Administrator
2024-07-03 b2b9497053639a359dc50c05674dfa81169ed31e
外部API接口调整/增加历史K线更新接口
4个文件已修改
2个文件已添加
2424 ■■■■ 已修改文件
code_attribute/code_l1_data_manager.py 16 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
code_attribute/first_target_code_data_processor.py 17 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test/test.py 13 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
third_data/history_k_data_manager.py 50 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/huaxin/huaxin_trade_server.py 1152 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/huaxin/outside_api_command_callback.py 1176 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
code_attribute/code_l1_data_manager.py
New file
@@ -0,0 +1,16 @@
"""
代码L1数据管理
"""
class L1DataManager:
    """
    Process-wide holder of the most recent L1 price per stock code.

    All state lives on the class itself, so every caller shares the same mapping.
    """

    # code -> last price stored through set_l1_current_price
    __current_price_dict = {}

    @classmethod
    def set_l1_current_price(cls, code, price):
        """
        Store ``price`` as the current L1 price of ``code``, replacing any earlier value.
        @param code: stock code used as the lookup key
        @param price: price to remember
        """
        cls.__current_price_dict.update({code: price})

    @classmethod
    def get_l1_current_price(cls, code):
        """
        Look up the L1 price last stored for ``code``.
        @param code: stock code used as the lookup key
        @return: the stored price, or None when nothing has been stored for the code
        """
        prices = cls.__current_price_dict
        return prices[code] if code in prices else None
code_attribute/first_target_code_data_processor.py
@@ -12,6 +12,7 @@
from code_attribute.gpcode_manager import WantBuyCodesManager
from log_module.log import logger_first_code_record, logger_l2_codes_subscript
from third_data.code_plate_key_manager import CodesHisReasonAndBlocksManager
from third_data.history_k_data_manager import HistoryKDataManager
from third_data.history_k_data_util import HistoryKDatasUtils, JueJinApi
from ths import l2_code_operate
from trade import trade_data_manager, l2_trade_util
@@ -42,7 +43,11 @@
def process_first_codes_datas(dataList, request_id=None):
    logger_l2_codes_subscript.info(f"{request_id}加载l2代码相关数据")
    # 获取最近5天的交易日期,为后面的数据计算做准备
    HistoryKDatasUtils.get_latest_trading_date_cache(5)
    dates = HistoryKDatasUtils.get_latest_trading_date_cache(5)
    latest_trading_date = None
    if dates:
        latest_trading_date = dates[0]
    limit_up_price_dict = {}
    temp_codes = []
    codes = []
@@ -95,7 +100,12 @@
            if limit_up_price is None:
                continue
            try:
                volumes_data = init_data_util.get_volumns_by_code(code, 150)
                # 首先从缓存里面获取
                volumes_data = None
                if latest_trading_date:
                    volumes_data = HistoryKDataManager().get_history_bars(code, latest_trading_date)
                if not volumes_data:
                    volumes_data = init_data_util.get_volumns_by_code(code, 150)
                volumes = init_data_util.parse_max_volume(code, volumes_data[:90],
                                                          code_nature_analyse.is_new_top(code,
                                                                                         limit_up_price,
@@ -126,7 +136,8 @@
                            l2_trade_util.forbidden_trade(code,
                                                          f"无辨识度,涨停价({limit_up_price})>50")
                            continue
                    if code_nature_analyse.is_price_too_high_in_days(code, volumes_data, limit_up_price)[0] and code.find("30")!=0:
                    if code_nature_analyse.is_price_too_high_in_days(code, volumes_data, limit_up_price)[
                        0] and code.find("30") != 0:
                        # 判断是否太高
                        l2_trade_util.forbidden_trade(code, "6天内股价长得太高")
                        continue
test/test.py
@@ -4,7 +4,8 @@
from code_attribute.code_data_util import ZYLTGBUtil
from code_attribute.gpcode_manager import CodesNameManager
from huaxin_client import l1_subscript_codes_manager
from third_data.history_k_data_util import JueJinHttpApi
from third_data.history_k_data_manager import HistoryKDataManager
from third_data.history_k_data_util import JueJinHttpApi, HistoryKDatasUtils
from utils import tool, init_data_util
@@ -25,5 +26,11 @@
if __name__ == "__main__":
    # Ad-hoc manual check, not an automated test: fetch the bars for one code and
    # print the first entry, then print which codes have cached history bars for a day.
    datas = init_data_util.get_volumns_by_code("000333")
    print(datas[0])
    # codes = ["600859", "000333"]
    # for code in codes:
    #     datas = init_data_util.get_volumns_by_code(code)
    #     HistoryKDataManager().save_history_bars(code, datas[0]['bob'].strftime("%Y-%m-%d"), datas, force=True)
    # get_history_bars_codes returns the set of codes whose cache file name contains the day
    codes = HistoryKDataManager().get_history_bars_codes("2024-07-02")
    print(codes)
    # print(count)
    # print(HistoryKDataManager().get_history_bars(code, "2024-07-02"))
third_data/history_k_data_manager.py
@@ -3,8 +3,45 @@
"""
import datetime
import os
import threading
import constant
from huaxin_client import l1_subscript_codes_manager
from log_module.log import logger_debug
from third_data import history_k_data_util
from utils import tool, init_data_util
def update_history_k_bars():
    """
    Update the cached history K-line bars for every subscribed code that has no
    cache entry for the previous trading day yet.

    The codes to refresh are computed synchronously; the actual download and save
    runs in a daemon thread, so this function returns before the update finishes.
    @return: number of codes scheduled for update in this run
    @raise Exception: when the previous trading date cannot be obtained
    """

    def update(codes_):
        # Best effort per code: a failure (including an empty result, where datas[0]
        # raises) is logged and must not stop the remaining codes.
        for code in codes_:
            try:
                datas = init_data_util.get_volumns_by_code(code)
                # The cache entry is keyed by the date of the first returned bar
                HistoryKDataManager().save_history_bars(code, datas[0]['bob'].strftime("%Y-%m-%d"), datas)
            except Exception as e:
                logger_debug.exception(e)

    previous_trading_date = history_k_data_util.JueJinApi.get_previous_trading_date(tool.get_now_date_str())
    if previous_trading_date is None:
        raise Exception("上一个交易日获取失败")
    # Collect the subscribed SH and SZ codes. Each list is checked on its own: the
    # previous version only looked at the SH list, which silently skipped the SZ codes
    # when SH was empty and failed when SH was present but SZ was None.
    codes_sh, codes_sz = l1_subscript_codes_manager.get_codes(False)
    codes = set()
    for market_codes in (codes_sh, codes_sz):
        if market_codes:
            codes.update(code_byte.decode() for code_byte in market_codes)
    # Skip codes that already have bars cached for the previous trading day
    codes_record = HistoryKDataManager().get_history_bars_codes(previous_trading_date)
    codes = codes - codes_record
    threading.Thread(target=update, args=(codes,), daemon=True).start()
    return len(codes)
class HistoryKDataManager:
@@ -97,17 +134,18 @@
                return datas
        return None
    def get_history_bars_code_count(self, day):
    def get_history_bars_codes(self, day):
        """
        获取某一天的历史K线的数量
        获取某一天的历史K线的代码数据
        @param day:
        @return:
        @return: 代码集合
        """
        dir_path = self.__get_cache_dir()
        count = 0
        codes = set()
        for root, dirs, files in os.walk(dir_path):
            for file in files:
                # 输出文件的绝对路径
                if file.find(day) >= 0:
                    count += 1
        return count
                    codes.add(file.split("_")[1][:6])
        return codes
trade/huaxin/huaxin_trade_server.py
@@ -11,26 +11,18 @@
import threading
import time
import psutil
import requests
import schedule
import huaxin_client.constant
import constant
import inited_data
import outside_api_command_manager
from cancel_strategy.s_l_h_cancel_strategy import SCancelBigNumComputer
from code_attribute import gpcode_manager, code_volumn_manager, global_data_loader, zyltgb_util
from code_attribute.code_data_util import ZYLTGBUtil
from code_attribute.gpcode_manager import CodePrePriceManager, CodesNameManager, WantBuyCodesManager
from db import mysql_data_delegate as mysql_data, redis_manager_delegate as redis_manager
from db.redis_manager_delegate import RedisUtils
from huaxin_client import l1_subscript_codes_manager, l2_data_transform_protocol
from huaxin_client.client_network import SendResponseSkManager
from code_attribute.code_l1_data_manager import L1DataManager
from code_attribute.gpcode_manager import CodePrePriceManager
from huaxin_client import l2_data_transform_protocol
from huaxin_client.trade_transform_protocol import TradeResponse
from l2 import l2_data_manager_new, l2_log, code_price_manager, l2_data_util, transaction_progress, \
    l2_data_source_util, cancel_buy_strategy, l2_data_log
    l2_data_source_util, l2_data_log
from l2.cancel_buy_strategy import GCancelBigNumComputer, \
    DCancelBigNumComputer
from l2.code_price_manager import Buy1PriceManager
@@ -39,33 +31,24 @@
from l2.l2_data_manager import TradePointManager, OrderBeginPosInfo
from l2.l2_data_util import L2DataUtil
from l2.l2_sell_manager import L2MarketSellManager
from l2.l2_transaction_data_manager import HuaXinBuyOrderManager, BigOrderDealManager
from l2.l2_transaction_data_processor import HuaXinTransactionDatasProcessor
from l2.place_order_single_data_manager import L2TradeSingleCallback, L2TradeSingleDataManager
from log_module import async_log_util, log_export
from log_module.log import hx_logger_contact_debug, hx_logger_trade_callback, \
    hx_logger_l2_orderdetail, hx_logger_l2_market_data, logger_l2_g_cancel, logger_debug, \
    logger_system, logger_trade, logger_trade_position_api_request, logger_request_api, \
    logger_local_huaxin_l1_trade_info, logger_real_place_order_position, logger_device, logger_l2_codes_subscript
from output import l2_output_util
from third_data import block_info, kpl_data_manager, kpl_util, history_k_data_util, kpl_api
    logger_system, logger_trade, logger_local_huaxin_l1_trade_info, logger_l2_codes_subscript
from third_data import block_info, kpl_data_manager
from third_data.code_plate_key_manager import KPLCodeJXBlockManager, CodePlateKeyBuyManager
from third_data.history_k_data_util import JueJinApi, HistoryKDatasUtils
from third_data.kpl_data_manager import KPLDataManager
from third_data.kpl_util import KPLDataType
from third_data.history_k_data_util import JueJinApi
from trade import trade_manager, l2_trade_util, \
    trade_data_manager, trade_constant
import l2_data_util as l2_data_util_old
from trade.huaxin import huaxin_trade_api as trade_api, huaxin_trade_api, huaxin_trade_data_update, \
    huaxin_trade_record_manager, huaxin_trade_order_processor, huaxin_sell_util
from trade.huaxin.huaxin_trade_record_manager import PositionManager, DealRecordManager, DelegateRecordManager
from trade.sell import sell_manager
from trade.sell.sell_rule_manager import TradeRuleManager, SellRule
from trade.trade_manager import TradeTargetCodeModeManager, AutoCancelSellModeManager, \
    CodesTradeStateManager
from settings.trade_setting import MarketSituationManager
from utils import socket_util, data_export_util, middle_api_protocol, tool, huaxin_util, output_util, global_util
    huaxin_trade_record_manager, huaxin_sell_util
from trade.huaxin.outside_api_command_callback import OutsideApiCommandCallback
from trade.sell.sell_rule_manager import TradeRuleManager
from trade.trade_manager import CodesTradeStateManager
from utils import socket_util, middle_api_protocol, tool, huaxin_util, global_util
trade_data_request_queue = queue.Queue()
@@ -294,7 +277,6 @@
    __GCancelBigNumComputer = GCancelBigNumComputer()
    __sell_thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=10)
    __process_l1_data_thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=10)
    __current_price_dict = {}
    current_buy1_dict = {}
    __updating_jx_blocks_codes = set()
@@ -358,13 +340,13 @@
            code = d[0]
            # 格式 (代码,现价,涨幅,量,更新时间,买1价格,买1量)
            price = d[1]
            cls.__current_price_dict[code] = price
            L1DataManager.set_l1_current_price(code, price)
            cls.current_buy1_dict[code] = (d[5], d[6])
    # 获取L1现价
    @classmethod
    def get_l1_current_price(cls, code):
        return cls.__current_price_dict.get(code)
        return L1DataManager.get_l1_current_price(code)
    # 设置目标代码
    @classmethod
@@ -641,1112 +623,6 @@
            except Exception as e:
                logger_local_huaxin_l1_trade_info.exception(e)
                logging.exception(e)
class OutsideApiCommandCallback(outside_api_command_manager.ActionCallback):
    __cancel_sell_thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=8)
    __DealRecordManager = DealRecordManager()
    __code_sell_way_dict = {}
    @classmethod
    def __send_response(cls, data_bytes):
        sk = SendResponseSkManager.create_send_response_sk(addr=huaxin_client.constant.SERVER_IP,
                                                           port=huaxin_client.constant.SERVER_PORT)
        try:
            data_bytes = socket_util.load_header(data_bytes)
            sk.sendall(data_bytes)
            result, header_str = socket_util.recv_data(sk)
            result = json.loads(result)
            if result["code"] != 0:
                raise Exception(result['msg'])
        finally:
            sk.close()
    def send_response(self, data, _client_id, _request_id):
        data_bytes = json.dumps({"type": "response", "data": data, "client_id": _client_id,
                                 "request_id": _request_id}).encode('utf-8')
        for i in range(3):
            try:
                self.__send_response(data_bytes)
                # print("发送数据成功")
                break
            except Exception as e1:
                logging.exception(e1)
    # 撤长期没有成交的单
    def __cancel_not_deal_order(self, code, order_ref, timeout=3):
        time.sleep(timeout)
        # 撤买单
        huaxin_trade_api.cancel_order(1, code, "", orderRef=order_ref)
    # 交易
    def OnTrade(self, client_id, request_id, data):
        """
        Handle an external trade command: place an order or cancel orders.

        ``data["trade_type"]`` selects the action. For TRADE_TYPE_ORDER the order is a
        sell when ``direction == 2`` and otherwise goes through huaxin_trade_api.order;
        for TRADE_TYPE_CANCEL_ORDER a single order is cancelled by ``orderSysID`` or,
        failing that, all cancellable orders of ``code`` are cancelled.
        Any exception is logged and reported back as {"code": 1, "msg": ...}.
        @param client_id: id of the requesting client, echoed in the response
        @param request_id: id of the request, echoed in the response
        @param data: command payload (dict)
        """
        try:
            trade_type = data["trade_type"]
            if trade_type == outside_api_command_manager.TRADE_TYPE_ORDER:
                code = data["code"]
                direction = data["direction"]
                volume = data["volume"]
                price_type = data["price_type"]
                price = data["price"]
                sinfo = data["sinfo"]
                if direction == 2:
                    # Sell branch.
                    # price_type: 0-price cage 1-limit-down price 2-limit-up price 3-current price 4-buy-5 price
                    async_log_util.info(logger_trade, f"API卖: 接收数据-{data}")
                    current_price = TradeServerProcessor.get_l1_current_price(code)
                    limit_down_price = gpcode_manager.get_limit_down_price(code)
                    limit_up_price = gpcode_manager.get_limit_up_price(code)
                    order_ref = huaxin_util.create_order_ref()
                    try:
                        result = huaxin_sell_util.start_sell(code, volume, price_type, limit_up_price, limit_down_price,
                                                             current_price, blocking=True, request_id=request_id,
                                                             order_ref=order_ref)
                        async_log_util.info(logger_trade, f"API卖结果: {result}")
                        self.send_response(result, client_id, request_id)
                    except Exception as e:
                        # An error whose text contains "超时" (timeout) is still answered as a
                        # success carrying the order reference; any other error is re-raised
                        if str(e).find("超时") >= 0:
                            self.send_response({"code": 0, "data": {"orderRef": order_ref}}, client_id, request_id)
                        else:
                            raise e
                else:
                    if not price:
                        # No price supplied: derive one from the time of day
                        if tool.trade_time_sub(tool.get_now_time_str(), "09:30:00") < 0:
                            # Before the open
                            limit_down_price = gpcode_manager.get_limit_down_price(code)
                            if not limit_down_price:
                                raise Exception("尚未获取跌停价")
                            # 0.01 above the limit-down price
                            price = round(float(limit_down_price) + 0.01, 2)
                        else:
                            # After the open
                            # No price was passed in, so buy at the latest price
                            current_price = TradeServerProcessor.get_l1_current_price(code)
                            if not current_price:
                                raise Exception("尚未获取到现价")
                            # Check the buy-1 amount (price * volume)
                            price = round(float(current_price), 2)
                            buy1_info = TradeServerProcessor.current_buy1_dict.get(code)
                            if buy1_info and buy1_info[0] * buy1_info[1] > 50 * 10000:
                                # If buy-1 is above 500,000, go up one tick
                                # NOTE(review): the result of this float addition is not re-rounded
                                # unless the limit-up cap below applies - confirm that is acceptable
                                price += 0.01
                            limit_up_price = gpcode_manager.get_limit_up_price(code)
                            if limit_up_price and price > float(limit_up_price):
                                price = round(float(limit_up_price), 2)
                        order_ref = huaxin_util.create_order_ref()
                        result = huaxin_trade_api.order(direction, code, volume, price, price_type=price_type,
                                                        sinfo=sinfo, order_ref=order_ref,
                                                        blocking=True, request_id=request_id)
                        # Schedule a cancel of this order after a delay.
                        # NOTE(review): the original comment said "within 2s", but
                        # __cancel_not_deal_order is called with its default timeout=3 - confirm
                        self.__cancel_sell_thread_pool.submit(self.__cancel_not_deal_order, code, order_ref)
                    else:
                        result = huaxin_trade_api.order(direction, code, volume, price, price_type=price_type,
                                                        sinfo=sinfo,
                                                        blocking=True, request_id=request_id)
                    self.send_response({"code": 0, "data": result}, client_id, request_id)
            elif trade_type == outside_api_command_manager.TRADE_TYPE_CANCEL_ORDER:
                # print("manual cancel:", data)
                code = data["code"]
                direction = data["direction"]
                accountID = data["accountID"]
                orderSysID = data["orderSysID"]
                sinfo = data["sinfo"]
                if orderSysID:
                    # Cancel exactly one order identified by its system id
                    result = huaxin_trade_api.cancel_order(direction, code, orderSysID, sinfo=sinfo,
                                                           blocking=True, request_id=request_id)
                    self.send_response({"code": 0, "data": result}, client_id, request_id)
                elif code:
                    # No order id: cancel every cancellable sell order of the code, then the buy
                    msg_list = []
                    try:
                        sell_count = 0
                        sell_orders = huaxin_trade_order_processor.TradeResultProcessor.get_huaxin_sell_order_by_code(
                            code)
                        if sell_orders:
                            for sell_order in sell_orders:
                                if huaxin_util.is_can_cancel(sell_order.orderStatus):
                                    sell_count += 1
                                    huaxin_trade_api.cancel_order(direction, code, sell_order.orderRef, blocking=False)
                        msg_list.append(f"撤卖单数量:{sell_count}")
                    except Exception as e:
                        # A failure while cancelling sells is logged only; the buy cancel still runs
                        logger_debug.exception(e)
                    can_cancel = l2_data_manager_new.L2TradeDataProcessor.cancel_buy(code, "手动撤单")
                    if not can_cancel:
                        msg_list.append(f"无法撤买单")
                    else:
                        msg_list.append(f"已撤买单")
                    self.send_response({"code": 0, "data": {"code": 0, "msg": ";".join(msg_list)}}, client_id,
                                       request_id)
        except Exception as e:
            logger_debug.exception(e)
            self.send_response({"code": 1, "msg": str(e)}, client_id, request_id)
    # 交易状态
    def OnTradeState(self, client_id, request_id, data):
        try:
            operate = data["operate"]
            if operate == outside_api_command_manager.OPERRATE_SET:
                state = data["state"]
                if state:
                    trade_manager.TradeStateManager().open_buy()
                else:
                    trade_manager.TradeStateManager().close_buy()
                self.send_response({"code": 0, "msg": ("开启成功" if state else "关闭成功")}, client_id, request_id)
            elif operate == outside_api_command_manager.OPERRATE_GET:
                can_buy = trade_manager.TradeStateManager().is_can_buy_cache()
                self.send_response({"code": 0, "data": {"can_buy": can_buy}}, client_id, request_id)
        except Exception as e:
            self.send_response({"code": 1, "msg": str(e)}, client_id, request_id)
    # 交易模式
    def OnTradeMode(self, client_id, request_id, data):
        try:
            operate = data["operate"]
            if operate == outside_api_command_manager.OPERRATE_SET:
                mode = data["mode"]
                TradeTargetCodeModeManager().set_mode(mode)
                self.send_response({"code": 0, "data": {"mode": mode}}, client_id, request_id)
            elif operate == outside_api_command_manager.OPERRATE_GET:
                mode = TradeTargetCodeModeManager().get_mode_cache()
                self.send_response({"code": 0, "data": {"mode": mode}}, client_id, request_id)
        except Exception as e:
            self.send_response({"code": 1, "msg": str(e)}, client_id, request_id)
    def OnSellRule(self, client_id, request_id, data):
        try:
            operate = data["operate"]
            if operate == outside_api_command_manager.OPERRATE_ADD:
                data = data["data"]
                code = data["code"]
                type = data["type"]
                buy1_price = data.get("buy1_price")
                if not buy1_price:
                    buy1_price = gpcode_manager.get_limit_up_price(code)
                    if not buy1_price:
                        raise Exception("尚未获取到涨停价")
                rule = SellRule(code=data["code"], buy1_volume=data["buy1_volume"], buy1_price=buy1_price,
                                sell_volume=data.get("sell_volume"), sell_price_type=data.get("sell_price_type"),
                                end_time=data["end_time"], type=type)
                TradeRuleManager().add_rule(rule)
                self.send_response({"code": 0, "data": {}}, client_id, request_id)
            elif operate == outside_api_command_manager.OPERRATE_SET:
                data = data["data"]
                code = data["code"]
                buy1_price = data.get("buy1_price")
                if not buy1_price:
                    buy1_price = gpcode_manager.get_limit_up_price(code)
                    if not buy1_price:
                        raise Exception("尚未获取到涨停价")
                rule = SellRule(id_=data["id"], code=data["code"], buy1_volume=data["buy1_volume"],
                                buy1_price=buy1_price,
                                sell_volume=data.get("sell_volume"), sell_price_type=data.get("sell_price_type"),
                                end_time=data["end_time"])
                TradeRuleManager().update_rule(rule)
                self.send_response({"code": 0, "data": {}}, client_id, request_id)
            elif operate == outside_api_command_manager.OPERRATE_DELETE:
                data = data["data"]
                TradeRuleManager().del_rule(data["id"])
                self.send_response({"code": 0, "data": {}}, client_id, request_id)
            elif operate == outside_api_command_manager.OPERRATE_GET:
                rules = TradeRuleManager().list_rules()
                fresults = []
                for rule in rules:
                    fresults.append(rule.to_dict())
                self.send_response({"code": 0, "data": fresults}, client_id, request_id)
        except Exception as e:
            self.send_response({"code": 1, "msg": str(e)}, client_id, request_id)
        pass
    # 代码名单
    def OnCodeList(self, client_id, request_id, data):
        """
        Add a code to, remove a code from, or list one of the code lists.

        ``data["code_list_type"]`` selects the list (want-buy, black, white, pause-buy,
        must-buy) and ``data["operate"]`` the action (SET adds, DELETE removes, GET lists).
        After a SET, a code without a known name has its name fetched and stored.
        GET answers with "name:code" strings; SET/DELETE answer with {"code": 0}.
        Errors are answered with {"code": 1, "msg": ...}.
        @param client_id: id of the requesting client, echoed in the response
        @param request_id: id of the request, echoed in the response
        @param data: command payload (dict); "code" is optional and read with .get
        """
        try:
            code_list_type = data["code_list_type"]
            operate = data["operate"]
            code = data.get("code")
            # Default answer for SET/DELETE; GET branches replace it with the listing
            fresult = {"code": 0}
            if code_list_type == outside_api_command_manager.CODE_LIST_WANT:
                if operate == outside_api_command_manager.OPERRATE_SET:
                    gpcode_manager.WantBuyCodesManager().add_code(code)
                    name = gpcode_manager.get_code_name(code)
                    if not name:
                        # Name unknown locally: look it up and store it
                        results = HistoryKDatasUtils.get_gp_codes_names([code])
                        if results:
                            gpcode_manager.CodesNameManager.add_first_code_name(code, results[code])
                elif operate == outside_api_command_manager.OPERRATE_DELETE:
                    gpcode_manager.WantBuyCodesManager().remove_code(code)
                elif operate == outside_api_command_manager.OPERRATE_GET:
                    codes = gpcode_manager.WantBuyCodesManager().list_code_cache()
                    datas = []
                    for code in codes:
                        name = gpcode_manager.get_code_name(code)
                        datas.append(f"{name}:{code}")
                    fresult = {"code": 0, "data": datas}
            elif code_list_type == outside_api_command_manager.CODE_LIST_BLACK:
                if operate == outside_api_command_manager.OPERRATE_SET:
                    # First cancel the buy order manually; a failure here is only logged
                    try:
                        l2_data_manager_new.L2TradeDataProcessor.cancel_buy(code, "手动拉黑")
                    except Exception as e:
                        logger_debug.exception(e)
                    l2_trade_util.forbidden_trade(code, msg="手动加入 trade_server")
                    # Blacklisting also removes the code from the want-buy list
                    WantBuyCodesManager().remove_code(code)
                    name = gpcode_manager.get_code_name(code)
                    if not name:
                        results = HistoryKDatasUtils.get_gp_codes_names([code])
                        if results:
                            gpcode_manager.CodesNameManager.add_first_code_name(code, results[code])
                elif operate == outside_api_command_manager.OPERRATE_DELETE:
                    l2_trade_util.remove_from_forbidden_trade_codes(code)
                    # NOTE(review): removing from the blacklist also adds the code to the
                    # want-buy list, even if it was never there before - confirm this is intended
                    WantBuyCodesManager().add_code(code)
                elif operate == outside_api_command_manager.OPERRATE_GET:
                    codes = gpcode_manager.BlackListCodeManager().list_codes_cache()
                    datas = []
                    for code in codes:
                        name = gpcode_manager.get_code_name(code)
                        datas.append(f"{name}:{code}")
                    fresult = {"code": 0, "data": datas}
            elif code_list_type == outside_api_command_manager.CODE_LIST_WHITE:
                if operate == outside_api_command_manager.OPERRATE_SET:
                    gpcode_manager.WhiteListCodeManager().add_code(code)
                    name = gpcode_manager.get_code_name(code)
                    if not name:
                        results = HistoryKDatasUtils.get_gp_codes_names([code])
                        if results:
                            gpcode_manager.CodesNameManager.add_first_code_name(code, results[code])
                elif operate == outside_api_command_manager.OPERRATE_DELETE:
                    gpcode_manager.WhiteListCodeManager().remove_code(code)
                elif operate == outside_api_command_manager.OPERRATE_GET:
                    codes = gpcode_manager.WhiteListCodeManager().list_codes_cache()
                    datas = []
                    for code in codes:
                        name = gpcode_manager.get_code_name(code)
                        datas.append(f"{name}:{code}")
                    fresult = {"code": 0, "data": datas}
            elif code_list_type == outside_api_command_manager.CODE_LIST_PAUSE_BUY:
                if operate == outside_api_command_manager.OPERRATE_SET:
                    gpcode_manager.PauseBuyCodesManager().add_code(code)
                    name = gpcode_manager.get_code_name(code)
                    if not name:
                        results = HistoryKDatasUtils.get_gp_codes_names([code])
                        if results:
                            gpcode_manager.CodesNameManager.add_first_code_name(code, results[code])
                elif operate == outside_api_command_manager.OPERRATE_DELETE:
                    gpcode_manager.PauseBuyCodesManager().remove_code(code)
                elif operate == outside_api_command_manager.OPERRATE_GET:
                    codes = gpcode_manager.PauseBuyCodesManager().list_code_cache()
                    datas = []
                    for code in codes:
                        name = gpcode_manager.get_code_name(code)
                        datas.append(f"{name}:{code}")
                    fresult = {"code": 0, "data": datas}
            elif code_list_type == outside_api_command_manager.CODE_LIST_MUST_BUY:
                if operate == outside_api_command_manager.OPERRATE_SET:
                    gpcode_manager.MustBuyCodesManager().add_code(code)
                    name = gpcode_manager.get_code_name(code)
                    if not name:
                        results = HistoryKDatasUtils.get_gp_codes_names([code])
                        if results:
                            gpcode_manager.CodesNameManager.add_first_code_name(code, results[code])
                elif operate == outside_api_command_manager.OPERRATE_DELETE:
                    gpcode_manager.MustBuyCodesManager().remove_code(code)
                elif operate == outside_api_command_manager.OPERRATE_GET:
                    codes = gpcode_manager.MustBuyCodesManager().list_code_cache()
                    datas = []
                    for code in codes:
                        name = gpcode_manager.get_code_name(code)
                        datas.append(f"{name}:{code}")
                    fresult = {"code": 0, "data": datas}
            self.send_response(fresult, client_id, request_id)
        except Exception as e:
            self.send_response({"code": 1, "msg": str(e)}, client_id, request_id)
    def OnExportL2(self, client_id, request_id, data):
        try:
            code = data["code"]
            excel_file_name = data_export_util.export_l2_excel(code)
            # print("导出L2数据目录:", excel_file_name)
            self.send_response({"code": 0, "data": {}, "msg": ""}, client_id, request_id)
        except Exception as e:
            logging.exception(e)
            self.send_response({"code": 1, "msg": str(e)}, client_id, request_id)
    def OnEveryDayInit(self, client_id, request_id, data):
        try:
            inited_data.everyday_init()
            self.send_response({"code": 0, "data": {}, "msg": ""}, client_id, request_id)
        except Exception as e:
            self.send_response({"code": 1, "msg": str(e)}, client_id, request_id)
    def OnRefreshTradeData(self, client_id, request_id, data):
        try:
            sync_type = data["ctype"]
            if sync_type == "delegate_list":
                huaxin_trade_data_update.add_delegate_list("API主动请求")
            elif sync_type == "deal_list":
                huaxin_trade_data_update.add_deal_list()
            elif sync_type == "money":
                huaxin_trade_data_update.add_money_list()
            elif sync_type == "position_list":
                huaxin_trade_data_update.add_position_list()
            self.send_response({"code": 0, "data": {}, "msg": ""}, client_id, request_id)
        except Exception as e:
            self.send_response({"code": 1, "msg": str(e)}, client_id, request_id)
    def OnGetCodeAttribute(self, client_id, request_id, data):
        try:
            code = data["code"]
            # 查询是否想买单/白名单/黑名单/暂不买
            code_name = gpcode_manager.get_code_name(code)
            want = gpcode_manager.WantBuyCodesManager().is_in_cache(code)
            white = gpcode_manager.WhiteListCodeManager().is_in_cache(code)
            black = l2_trade_util.is_in_forbidden_trade_codes(code)
            pause_buy = gpcode_manager.PauseBuyCodesManager().is_in_cache(code)
            desc_list = []
            if want:
                desc_list.append("【想买单】")
            if white:
                desc_list.append("【白名单】")
            if black:
                desc_list.append("【黑名单】")
            if pause_buy:
                desc_list.append("【暂不买】")
            result = {"code": 0, "data": {"code_info": (code, code_name), "desc": "".join(desc_list)}}
            self.send_response(result, client_id, request_id)
        except Exception as e:
            self.send_response({"code": 1, "msg": str(e)}, client_id, request_id)
    def OnGetCodeTradeState(self, client_id, request_id, data):
        try:
            code = data["code"]
            state = trade_manager.CodesTradeStateManager().get_trade_state(code)
            result = {"code": 0, "data": {"state": state}}
            self.send_response(result, client_id, request_id)
        except Exception as e:
            self.send_response({"code": 1, "msg": str(e)}, client_id, request_id)
    def OnGetEnvInfo(self, client_id, request_id, data):
        try:
            fdata = {}
            try:
                date = JueJinApi.get_previous_trading_date(tool.get_now_date_str())
                if date:
                    fdata["juejin"] = 1
            except Exception as e:
                fdata["juejin"] = 0
            fdata["kpl"] = {}
            # 获取开盘啦数据
            kpl_types = [KPLDataType.LIMIT_UP.value, KPLDataType.JINGXUAN_RANK.value,
                         KPLDataType.INDUSTRY_RANK.value]
            for kpl_type in kpl_types:
                if kpl_type in KPLDataManager.kpl_data_update_info:
                    fdata["kpl"][kpl_type] = KPLDataManager.kpl_data_update_info.get(kpl_type)
            try:
                # 验证redis
                RedisUtils.get(redis_manager.RedisManager(0).getRedis(), "test")
                fdata["redis"] = 1
            except:
                fdata["redis"] = 0
            try:
                # 验证mysql
                mysql_data.Mysqldb().select_one("select 1")
                fdata["mysql"] = 1
            except:
                fdata["mysql"] = 0
            try:
                # redis异步任务数量
                fdata["redis_async_task_count"] = redis_manager.RedisUtils.get_async_task_count()
            except:
                pass
            # 获取交易通道
            try:
                can_access = huaxin_trade_api.test_trade_channel()
                fdata["trade_channel_access"] = 1 if can_access else 0
            except Exception as e:
                logger_debug.exception(e)
                fdata["trade_channel_access"] = 0
            # 获取CPU与内存适用情况
            memory_info = psutil.virtual_memory()
            cpu_percent = psutil.cpu_percent(interval=1)
            fdata["device"] = {"cpu": cpu_percent, "memery": memory_info.percent}
            logger_device.info(fdata["device"])
            # 获取今日自由流通量的更新
            try:
                count = ZYLTGBUtil.count_today_updated_volume_codes()
                fdata["today_zylt_updated_count"] = count
            except Exception as e:
                logger_debug.exception(e)
                fdata["today_zylt_updated_count"] = -1
            # 获取交易通道
            result = {"code": 0, "data": fdata, "msg": ""}
            # print("OnGetEnvInfo 成功")
            self.send_response(result, client_id, request_id)
        except Exception as e:
            self.send_response({"code": 1, "msg": str(e)}, client_id, request_id)
    # 同步L2订阅代码
    def OnSyncL2SubscriptCodes(self, client_id, request_id):
        logger_debug.debug("OnSyncL2SubscriptCodes")
        try:
            codes_sh, codes_sz = l1_subscript_codes_manager.request_l1_subscript_target_codes()
            if codes_sh and codes_sz:
                l1_subscript_codes_manager.save_codes(codes_sh, codes_sz)
            result = {"code": 0, "data": {"codes_sh": len(codes_sh), "codes_sz": len(codes_sz)}}
            self.send_response(result, client_id, request_id)
        except Exception as e:
            logger_debug.error(e)
    def OnSystemLog(self, client_id, request_id, data):
        try:
            start_index = data["start_index"]
            count = data["count"]
            # 读取系统日志
            logs_data = log_export.load_system_log()
            total_count = len(logs_data)
            if start_index >= 0:
                logs_data = logs_data[start_index:start_index + count]
            result = {"code": 0, "data": {"total_count": total_count, "list": logs_data}}
            self.send_response(result, client_id, request_id)
        except Exception as e:
            logger_debug.error(e)
    def OnGetFromDataServer(self, client_id, request_id, data):
        path = data["path"]
        params = data["params"]
        params_strs = []
        if params:
            for k in params:
                params_strs.append(f"{k}={params[k]}")
        if params_strs:
            path += "?"
            path += "&".join(params_strs)
        try:
            # 获取参数
            response = requests.get(f"http://127.0.0.1:9004{path}")
            if response.status_code == 200:
                self.send_response(response.text, client_id, request_id)
            else:
                self.send_response(json.dumps({"code": 1, "msg": f"网络请求状态错误:{response.status_code}"}), client_id,
                                   request_id)
        except:
            self.send_response(json.dumps({"code": 1, "msg": "网络请求出错"}), client_id, request_id)
    # 代码的交易信息
    def OnGetCodeTradeInfo(self, client_id, request_id, data):
        try:
            code = data["code"]
            # 获取交易信息,
            # 获取正在成交的位置/获取下单位置/获取成交速率
            total_datas = l2_data_util.local_today_datas.get(code)
            if total_datas is None:
                total_datas = []
            trade_index, is_default = transaction_progress.TradeBuyQueue().get_traded_index(code)
            trade_speed = transaction_progress.TradeBuyQueue().get_trade_speed(code)
            # 下单位置
            place_order_index = SCancelBigNumComputer().get_real_place_order_index_cache(code)
            fdata = {}
            if trade_index and place_order_index:
                # 有成交进度位与下单位
                total_count = 0
                total_money = 0
                big_money_300_indexs = []
                big_money_200_indexs = []
                for i in range(trade_index, place_order_index):
                    data = total_datas[i]
                    val = data["val"]
                    if not L2DataUtil.is_limit_up_price_buy(val):
                        continue
                    # 是否已经撤单
                    left_count = l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_no_canceled_count_v2(code, i,
                                                                                                             total_datas,
                                                                                                             l2_data_util.local_today_canceled_buyno_map.get(
                                                                                                                 code))
                    if left_count <= 0:
                        continue
                    total_count += left_count
                    money = val["num"] * int(val["price"] * 100)
                    total_money += money
                    if money >= 300 * 10000:
                        big_money_300_indexs.append(i)
                    elif money >= 200 * 10000:
                        big_money_200_indexs.append(i)
                fdata["waiting_for_trade"] = f"{total_count}笔&{output_util.money_desc(total_money)}"
                total_count = 0
                total_money = 0
                for i in big_money_300_indexs:
                    data = total_datas[i]
                    val = data["val"]
                    total_count += 1
                    money = val["num"] * int(val["price"] * 100)
                    total_money += money
                fdata["big_num_300"] = {"desc": f"{total_count}笔&{output_util.money_desc(total_money)}",
                                        "datas": [output_util.format_l2_data(total_datas[x]) for x in
                                                  big_money_300_indexs]}
                total_count = 0
                total_money = 0
                for i in big_money_200_indexs:
                    data = total_datas[i]
                    val = data["val"]
                    total_count += 1
                    money = val["num"] * int(val["price"] * 100)
                    total_money += money
                fdata["big_num_200"] = {"desc": f"{total_count}笔&{output_util.money_desc(total_money)}",
                                        "datas": [output_util.format_l2_data(total_datas[x]) for x in
                                                  big_money_200_indexs]}
                if trade_speed:
                    seconds = int(total_money / trade_speed)
                    h = seconds // 3600
                    m = seconds % 3600 // 60
                    s = seconds % 60
                    fdata["trade_speed"] = f"{trade_speed}元/秒"
                    fdata["trade_use_time"] = "{:0>2d}:{:0>2d}:{:0>2d}".format(h, m, s)
                    fdata["trade_time"] = tool.trade_time_add_second(tool.get_now_time_str(), seconds)
            result = {"code": 0, "data": fdata}
            self.send_response(result, client_id, request_id)
        except Exception as e:
            logging.exception(e)
            self.send_response(json.dumps({"code": 1, "msg": f"数据处理出错:{e}"}), client_id, request_id)
    def OnGetActiveListenCount(self, client_id, request_id):
        try:
            order = 0  # l2DataListenManager.get_active_count(L2DataListenManager.TYPE_ORDER)
            transaction = 0  # l2DataListenManager.get_active_count(L2DataListenManager.TYPE_TRANSACTION)
            market = 0  # l2DataListenManager.get_active_count(L2DataListenManager.TYPE_MARKET)
            result = {"code": 0, "data": {"order": order, "transaction": transaction, "market": market}}
            self.send_response(result, client_id, request_id)
        except Exception as e:
            logging.exception(e)
            self.send_response(json.dumps({"code": 1, "msg": f"数据处理出错:{e}"}), client_id, request_id)
    def OnSaveRunningData(self, client_id, request_id):
        try:
            inited_data.save_running_data()
        except Exception as e:
            logging.exception(e)
            self.send_response(json.dumps({"code": 1, "msg": f"数据处理出错:{e}"}), client_id, request_id)
    def OnGetCodePositionInfo(self, client_id, request_id, data):
        code = data.get("code")
        __start_time = time.time()
        try:
            if not tool.is_can_buy_code(code):
                raise Exception("非主板代码")
            # 获取代码基本信息
            # 查询是否想买单/白名单/黑名单/暂不买
            code_name = gpcode_manager.get_code_name(code)
            want = gpcode_manager.WantBuyCodesManager().is_in_cache(code)
            white = gpcode_manager.WhiteListCodeManager().is_in_cache(code)
            black = l2_trade_util.is_in_forbidden_trade_codes(code)
            pause_buy = gpcode_manager.PauseBuyCodesManager().is_in_cache(code)
            desc_list = []
            if want:
                desc_list.append("【想买单】")
            if white:
                desc_list.append("【白名单】")
            if black:
                desc_list.append("【黑名单】")
            if pause_buy:
                desc_list.append("【暂不买】")
            # 获取持仓
            positions = PositionManager.latest_positions
            trade_rules_count = len(TradeRuleManager().list_can_excut_rules_cache())
            fdata = {"code": code, "total": 0, "available": 0, "sell_orders": [], "sell_rules_count": trade_rules_count,
                     "cost_price": 0, "cost_price_rate": 0,
                     "code_info": (code, code_name), "desc": "".join(desc_list)}
            if positions:
                for d in positions:
                    code_name = gpcode_manager.get_code_name(d["securityID"])
                    if not code_name:
                        # 判断是否有名称
                        results = HistoryKDatasUtils.get_gp_codes_names([d["securityID"]])
                        threading.Thread(
                            target=CodesNameManager.add_first_code_name(d["securityID"],
                                                                        results[d["securityID"]])).start()
                    if d["prePosition"] <= 0:
                        continue
                    if d["securityID"] != code:
                        continue
                    fdata["total"] = d["prePosition"]
                    fdata["available"] = d["availablePosition"]
                    fdata["cost_price"] = round(float(d["historyPosPrice"]), 2)
                    deal_order_system_id_infos = {}
                    # 获取已经卖的单数
                    deal_list = self.__DealRecordManager.list_sell_by_code_cache(code)
                    if deal_list:
                        for d in deal_list:
                            if d["orderSysID"] not in deal_order_system_id_infos:
                                deal_order_system_id_infos[d["orderSysID"]] = [d["volume"], d["tradeTime"]]
                            else:
                                deal_order_system_id_infos[d["orderSysID"]][0] += d["volume"]
                    # 获取9:30之前的卖委托
                    current_delegates = DelegateRecordManager().list_current_delegates(code)
                    if current_delegates:
                        for d in current_delegates:
                            if d["orderSysID"] not in deal_order_system_id_infos:
                                deal_order_system_id_infos[d["orderSysID"]] = [d["volume"], d["insertTime"]]
                    deal_list = [deal_order_system_id_infos[k] for k in deal_order_system_id_infos]
                    deal_list.sort(key=lambda x: x[1])
                    # TODO 测试
                    # deal_list.clear()
                    # fdata["available"] = fdata["total"]
                    fdata["sell_orders"] = [k[0] for k in deal_list]
                    break
            # 有现价就获取现价
            current_price = TradeServerProcessor.get_l1_current_price(code)
            if current_price:
                fdata["cost_price"] = current_price
                pre_close_price = CodePrePriceManager.get_price_pre_cache(code)
                if current_price and pre_close_price:
                    rate = round((float(current_price) - float(pre_close_price)) / float(pre_close_price), 4)
                    fdata["cost_price_rate"] = rate
                # 获取涨幅
            async_log_util.info(logger_trade_position_api_request, f"{fdata}")
            result = {"code": 0, "data": fdata}
            self.send_response(result, client_id, request_id)
        except Exception as e:
            logging.exception(e)
            self.send_response({"code": 1, "msg": f"数据处理出错:{e}"}, client_id, request_id)
        finally:
            use_time = time.time() - __start_time
            if use_time > 0.01:
                # 耗时10ms以上才记录日志
                async_log_util.info(logger_trade_position_api_request, f"{code}请求持仓耗时:{use_time * 1000}ms")
    def OnCommonRequest(self, client_id, request_id, data):
        # 通用请求
        ctype = data["ctype"]
        __start_time = time.time()
        try:
            if ctype == "get_sell_result":
                order_ref = data["order_ref"]
                order_entity = huaxin_trade_order_processor.TradeResultProcessor.get_huaxin_order_by_order_ref(
                    order_ref)
                result = {}
                if not order_entity:
                    result = {"code": 1, "msg": f"没有获取到订单状态"}
                else:
                    code_name = gpcode_manager.get_code_name(order_entity.code)
                    result = {}
                    if huaxin_util.is_canceled(order_entity.orderStatus):
                        result = {"code": 0,
                                  "data": {"orderStatus": order_entity.orderStatus, "code": order_entity.code,
                                           "msg": f"【{order_entity.code}({code_name})】已撤单"}}
                    elif huaxin_util.is_deal(order_entity.orderStatus):
                        result = {"code": 0,
                                  "data": {"orderStatus": order_entity.orderStatus, "code": order_entity.code,
                                           "msg": f"【{order_entity.code}({code_name})】已经成交"}}
                    else:
                        result = {"code": 0,
                                  "data": {"orderStatus": order_entity.orderStatus, "code": order_entity.code,
                                           "msg": f"【{order_entity.code}({code_name})】已挂单"}}
                self.send_response(result, client_id, request_id)
            elif ctype == "get_position_codes":
                # 获取今日可卖的持仓代码
                codes = PositionManager.get_position_codes()
                result = {"code": 0,
                          "data": codes}
                self.send_response(result, client_id, request_id)
            elif ctype == "market_situation":
                try:
                    operate = data["operate"]
                    if operate == outside_api_command_manager.OPERRATE_SET:
                        situation = data["situation"]
                        MarketSituationManager().set_situation(situation)
                        self.send_response({"code": 0, "data": {"situation": situation}}, client_id, request_id)
                    elif operate == outside_api_command_manager.OPERRATE_GET:
                        situation = MarketSituationManager().get_situation_cache()
                        self.send_response({"code": 0, "data": {"situation": situation}}, client_id, request_id)
                except Exception as e:
                    self.send_response({"code": 1, "msg": str(e)}, client_id, request_id)
            elif ctype == "get_kpl_limit_up_datas":
                # 获取开盘啦涨停队列
                try:
                    datas = kpl_data_manager.KPLDataManager.get_from_file(kpl_util.KPLDataType.LIMIT_UP,
                                                                          tool.get_now_date_str())
                    self.send_response({"code": 0, "data": datas}, client_id, request_id)
                except Exception as e:
                    self.send_response({"code": 1, "msg": str(e)}, client_id, request_id)
            elif ctype == "get_delegated_buy_code_infos":
                account_available_money = trade_manager.AccountAvailableMoneyManager().get_available_money_cache()
                # 获取委托中的代码
                # current_delegates = huaxin_trade_record_manager.DelegateRecordManager().list_current_delegates()
                current_delegates, update_time = huaxin_trade_record_manager.DelegateRecordManager.list_by_day(
                    tool.get_now_date_str("%Y%m%d"), None,
                    [huaxin_util.TORA_TSTP_OST_Accepted, huaxin_util.TORA_TSTP_OST_PartTraded])
                fdatas = []
                if current_delegates:
                    for c in current_delegates:
                        try:
                            if int(c["direction"]) != huaxin_util.TORA_TSTP_D_Buy:
                                continue
                            code = c["securityID"]
                            orderSysID = c.get("orderSysID")
                            code_name = gpcode_manager.get_code_name(code)
                            # 获取下单位置信息
                            order_begin_pos = TradePointManager().get_buy_compute_start_data_cache(code)
                            if order_begin_pos is None or order_begin_pos.buy_single_index is None:
                                continue
                            l2_data_util.load_l2_data(code)
                            total_datas = l2_data_util.local_today_datas.get(code)
                            trade_index, is_default = transaction_progress.TradeBuyQueue().get_traded_index(code)
                            if trade_index is None:
                                trade_index = 0
                            # 下单位置
                            place_order_index = SCancelBigNumComputer().get_real_place_order_index_cache(code)
                            # 计算信号位置到真实下单位置的总买(不管是否已撤)
                            total_nums = 0
                            for i in range(order_begin_pos.buy_single_index, place_order_index):
                                data = total_datas[i]
                                val = data["val"]
                                if not L2DataUtil.is_limit_up_price_buy(val):
                                    continue
                                total_nums += val["num"]
                            # 计算已成交/已撤单的数量
                            deal_or_cancel_num = 0
                            for i in range(order_begin_pos.buy_single_index, trade_index + 1):
                                data = total_datas[i]
                                val = data["val"]
                                if not L2DataUtil.is_limit_up_price_buy(val):
                                    continue
                                deal_or_cancel_num += val["num"]
                            # 获取剩下的笔数
                            total_left_count = 0
                            total_left_num = 0
                            for i in range(trade_index + 1, place_order_index):
                                data = total_datas[i]
                                val = data["val"]
                                if not L2DataUtil.is_limit_up_price_buy(val):
                                    continue
                                if val["num"] * float(val["price"]) < 5000:
                                    continue
                                left_count = l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_no_canceled_count_v2(
                                    code,
                                    i,
                                    total_datas,
                                    l2_data_util.local_today_canceled_buyno_map.get(
                                        code))
                                if left_count > 0:
                                    total_left_count += left_count
                                    total_left_num += val["num"] * left_count
                            # 获取正在成交
                            dealing_info = HuaXinBuyOrderManager.get_dealing_order_info(code)
                            if dealing_info:
                                if str(total_datas[trade_index]["val"]["orderNo"]) == str(dealing_info[0]):
                                    total_left_num += (total_datas[trade_index]["val"]["num"] - dealing_info[1] // 100)
                            limit_up_price = gpcode_manager.get_limit_up_price(code)
                            buy1_money = Buy1PriceManager().get_latest_buy1_money(code)
                            if buy1_money is None:
                                buy1_money = 0
                            # 获取已经成交的大单数量
                            total_big_num = 0
                            total_big_count = 0
                            is_ge_code = tool.is_ge_code(code)
                            for i in range(0, trade_index):
                                val = total_datas[i]["val"]
                                if not L2DataUtil.is_limit_up_price_buy(val):
                                    continue
                                # 是不是大单
                                if not l2_data_util_old.is_big_money(val, is_ge_code):
                                    continue
                                canceled_data = l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_canceled_data_v2(
                                    code,
                                    i,
                                    total_datas,
                                    l2_data_util.local_today_canceled_buyno_map.get(
                                        code))
                                if not canceled_data:
                                    total_big_count += 1
                                else:
                                    total_big_num -= canceled_data["val"]["num"]
                                total_big_num += val["num"]
                            not_deal_total_big_num_pre = 0
                            not_deal_total_big_count_pre = 0
                            not_deal_total_big_num_after = 0
                            not_deal_total_big_count_after = 0
                            is_ge_code = tool.is_ge_code(code)
                            for i in range(trade_index, total_datas[-1]["index"] + 1):
                                val = total_datas[i]["val"]
                                if not L2DataUtil.is_limit_up_price_buy(val):
                                    continue
                                # 是不是大单
                                if not l2_data_util_old.is_big_money(val, is_ge_code):
                                    continue
                                canceled_data = l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_canceled_data_v2(
                                    code,
                                    i,
                                    total_datas,
                                    l2_data_util.local_today_canceled_buyno_map.get(
                                        code))
                                if not canceled_data:
                                    if i < place_order_index:
                                        not_deal_total_big_count_pre += 1
                                    else:
                                        not_deal_total_big_count_after += 1
                                else:
                                    if i < place_order_index:
                                        not_deal_total_big_num_pre -= canceled_data["val"]["num"]
                                    else:
                                        not_deal_total_big_num_after -= canceled_data["val"]["num"]
                                if i < place_order_index:
                                    not_deal_total_big_num_pre += val["num"]
                                else:
                                    not_deal_total_big_num_after += val["num"]
                            real_place_order_after_count = 0
                            real_place_order_after_num = 0
                            is_ge_code = tool.is_ge_code(code)
                            # 统计真实下单位置后面未撤的金额
                            for i in range(place_order_index, total_datas[-1]["index"]):
                                val = total_datas[i]["val"]
                                if not L2DataUtil.is_limit_up_price_buy(val):
                                    continue
                                # 是不是大单
                                if not l2_data_util_old.is_big_money(val, is_ge_code):
                                    continue
                                canceled_data = l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_canceled_data_v2(
                                    code,
                                    i,
                                    total_datas,
                                    l2_data_util.local_today_canceled_buyno_map.get(
                                        code))
                                if not canceled_data:
                                    real_place_order_after_count += 1
                                    real_place_order_after_num += val["num"]
                            # 获取当日的量比
                            volume_rate = code_volumn_manager.get_volume_rate(code)
                            # 是否需要注意
                            need_pay_attention = (total_left_count <= 10 or total_left_num * float(
                                limit_up_price) * 100 < 1500 * 10000) and (
                                                         real_place_order_after_count <= 10 or real_place_order_after_num * float(
                                                     limit_up_price) * 100 < 1500 * 10000)
                            # 统计真实下单位是否距离大单位置过近
                            is_near_big_order = False
                            try:
                                count = 0
                                for i in range(place_order_index - 1, -1, -1):
                                    data = total_datas[i]
                                    val = data["val"]
                                    if not L2DataUtil.is_limit_up_price_buy(val):
                                        continue
                                    money = val["num"] * float(val["price"])
                                    if money < 50 * 100:
                                        continue
                                    left_count = l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_no_canceled_count_v2(
                                        code,
                                        i,
                                        total_datas,
                                        l2_data_util.local_today_canceled_buyno_map.get(
                                            code))
                                    if left_count <= 0:
                                        continue
                                    if money >= 299 * 100:
                                        if count < 1:
                                            is_near_big_order = True
                                    else:
                                        count += 1
                                        if count >= 1:
                                            break
                            except:
                                pass
                            fdata = {"id": orderSysID, "code_info": (code, code_name), "total_num": total_nums,
                                     "finish_num": deal_or_cancel_num,
                                     "buy1_money": output_util.money_desc(buy1_money),
                                     "big_num_count": total_big_count,
                                     "big_num_money": output_util.money_desc(
                                         total_big_num * float(limit_up_price) * 100),
                                     "not_deal_big_num_count": (
                                         not_deal_total_big_count_pre, not_deal_total_big_count_after),
                                     "not_deal_big_num_money": (output_util.money_desc(
                                         not_deal_total_big_num_pre * float(limit_up_price) * 100),
                                                                output_util.money_desc(
                                                                    not_deal_total_big_num_after * float(
                                                                        limit_up_price) * 100)),
                                     "left_count": total_left_count,
                                     "volume_rate": volume_rate,
                                     "left_money": output_util.money_desc(total_left_num * float(limit_up_price) * 100),
                                     "pay_attention": need_pay_attention,
                                     "trade_progress_percent": round(
                                         total_left_num * float(limit_up_price) * 100 * 100 / buy1_money, 2),  # 成交进度比例
                                     "limit_up_price": float(gpcode_manager.get_limit_up_price(code)),
                                     "is_near_big_order": is_near_big_order,
                                     "block": '',
                                     "trade_queue": []
                                     }
                            limit_up_data = kpl_data_manager.KPLLimitUpDataRecordManager.record_code_dict.get(code)
                            # 获取当前板块
                            try:
                                can_buy_result = CodePlateKeyBuyManager.can_buy(code)
                                if can_buy_result:
                                    if can_buy_result[0]:
                                        fdata['block'] = ",".join(
                                            [f"{x[0]}-{x[1] + 1}({x[2]}&{x[3] - x[2]})" for x in can_buy_result[0]])
                                    else:
                                        if can_buy_result[1]:
                                            if limit_up_data:
                                                fdata['block'] = f"{limit_up_data[5]}-独苗"
                                            else:
                                                fdata['block'] = f"独苗"
                            except:
                                pass
                            # 获取涨停时间
                            if limit_up_data:
                                fdata['limit_up_time'] = tool.to_time_str(limit_up_data[2])
                                # 获取委托队列
                            try:
                                real_place_order_index = SCancelBigNumComputer().get_real_place_order_index_cache(code)
                                if real_place_order_index is not None:
                                    trade_queue = l2_output_util.get_trade_queue_at_near_place_order(code,
                                                                                                     real_place_order_index,
                                                                                                     9)
                                    fdata['trade_queue'] = trade_queue
                                # 自由流通股本
                                zyltgb = global_util.zyltgb_map.get(code)
                                if zyltgb is not None:
                                    fdata['zyltgb'] = output_util.money_desc(zyltgb)
                            except:
                                pass
                            fdatas.append(fdata)
                        except Exception as e:
                            logger_debug.exception(e)
                result = {"code": 0, "data": {"account_available_money": account_available_money, "delegates": fdatas}}
                self.send_response(result, client_id, request_id)
            elif ctype == "set_real_place_order_index":
                # 设置真实下单位置
                code = data["code"]
                real_order_index = data["index"]
                order_begin_pos = TradePointManager().get_buy_compute_start_data_cache(code)
                if order_begin_pos is None or order_begin_pos.buy_exec_index is None or order_begin_pos.buy_exec_index < 0:
                    raise Exception("尚未下单")
                cancel_buy_strategy.set_real_place_position(code, real_order_index,
                                                            buy_single_index=order_begin_pos.buy_single_index,
                                                            is_default=False)
                # 更新日志
                async_log_util.info(logger_real_place_order_position, f"真实下单位置(矫正):{code}-{real_order_index}")
                result = {"code": 0, "data": {}}
                self.send_response(result, client_id, request_id)
            elif ctype == "get_positions":
                # 获取所有持仓信息
                positions = PositionManager.latest_positions
                fdatas = []
                if positions:
                    for d in positions:
                        code_ = d["securityID"]
                        code_name = gpcode_manager.get_code_name(d["securityID"])
                        if not code_name:
                            # 判断是否有名称
                            results = HistoryKDatasUtils.get_gp_codes_names([code_])
                            threading.Thread(
                                target=CodesNameManager.add_first_code_name(code_,
                                                                            results[code_])).start()
                        if d["prePosition"] <= 0:
                            continue
                        fdatas.append({"code": code_, "code_name": code_name, "total": d["prePosition"],
                                       "available": d["availablePosition"]})
                result = {"code": 0, "data": fdatas}
                self.send_response(result, client_id, request_id)
            elif ctype == "set_code_sell_way":
                # 设置卖出方式
                # mode : 1-均分  2-百分比
                sell_manager.set_code_sell_way(data)
                result = {"code": 0, "data": {}}
                self.send_response(result, client_id, request_id)
            elif ctype == "get_buy1_info":
                # 获取代码的买1信息
                code = data["code"]
                results = HistoryKDatasUtils.get_gp_current_info([code])
                item = results[0]["quotes"][0]
                result = {"code": 0, "data": {"price": item["bid_p"], "volume": item["bid_v"]}}
                self.send_response(result, client_id, request_id)
            elif ctype == "auto_cancel_sell_mode":
                try:
                    operate = data["operate"]
                    if operate == outside_api_command_manager.OPERRATE_SET:
                        mode = data["mode"]
                        AutoCancelSellModeManager().set_mode(mode)
                        self.send_response({"code": 0, "data": {"mode": mode}}, client_id, request_id)
                    elif operate == outside_api_command_manager.OPERRATE_GET:
                        sell_mode = AutoCancelSellModeManager().get_mode()
                        self.send_response({"code": 0, "data": {"mode": sell_mode}}, client_id, request_id)
                except Exception as e:
                    self.send_response({"code": 1, "msg": str(e)}, client_id, request_id)
            elif ctype == "set_per_code_buy_money":
                # 设置单只票的买入金额
                money = data["money"]
                if money > 30000:
                    raise Exception("最多只能设置3w")
                constant.BUY_MONEY_PER_CODE = money
                self.send_response({"code": 0, "data": {"money": constant.BUY_MONEY_PER_CODE}}, client_id, request_id)
            elif ctype == "get_per_code_buy_money":
                self.send_response({"code": 0, "data": {"money": constant.BUY_MONEY_PER_CODE}}, client_id, request_id)
            elif ctype == "repaire_task":
                # 修复任务
                kpl_data_manager.PullTask.repaire_pull_task()
                self.send_response({"code": 0, "data": {}}, client_id, request_id)
            elif ctype == "get_trade_queue":
                code = data["code"]
                count = data.get("count")
                if count is None:
                    count = 100
                real_place_order_index = SCancelBigNumComputer().get_real_place_order_index_cache(code)
                trade_queue = l2_output_util.get_trade_queue(code, real_place_order_index, count)
                self.send_response({"code": 0, "data": trade_queue}, client_id, request_id)
            elif ctype == "get_deal_big_money_list":
                # 获取大单成交列表
                code = data["code"]
                data_list = BigOrderDealManager().get_total_buy_money_list(code)
                bigger_money = l2_data_util_old.get_big_money_val(float(gpcode_manager.get_limit_up_price(code)), tool.is_ge_code(code))
                fdatas = []
                for d in data_list:
                    if d < bigger_money:
                        continue
                    fdatas.append(d)
                results = [output_util.money_desc(d) for d in fdatas]
                self.send_response({"code": 0, "data": results}, client_id, request_id)
            elif ctype == "refresh_zylt_volume":
                update_count = zyltgb_util.update_all_zylt_volumes()
                self.send_response({"code": 0, "data": {}, "msg": f"更新代码数量:{update_count}"}, client_id, request_id)
            elif ctype == "get_today_updated_zylt_volume_count":
                # 获取今日已经更新的自由流通量的代码数量
                count = ZYLTGBUtil.count_today_updated_volume_codes()
                self.send_response({"code": 0, "data": {"count": count}}, client_id, request_id)
        except Exception as e:
            logging.exception(e)
            self.send_response({"code": 1, "msg": f"数据处理出错:{e}"}, client_id, request_id)
        finally:
            use_time = time.time() - __start_time
            if use_time > 5:
                logger_request_api.info(f"common_request请求时间过长,ctype-{ctype}")
class MyL2DataCallback(l2_data_transform_protocol.L2DataCallBack):
trade/huaxin/outside_api_command_callback.py
New file
@@ -0,0 +1,1176 @@
import concurrent.futures
import json
import logging
import threading
import time
import psutil
import requests
import huaxin_client.constant
import constant
import inited_data
import outside_api_command_manager
from cancel_strategy.s_l_h_cancel_strategy import SCancelBigNumComputer
from code_attribute import gpcode_manager, code_volumn_manager, zyltgb_util
from code_attribute.code_data_util import ZYLTGBUtil
from code_attribute.code_l1_data_manager import L1DataManager
from code_attribute.gpcode_manager import CodePrePriceManager, CodesNameManager, WantBuyCodesManager
from db import mysql_data_delegate as mysql_data, redis_manager_delegate as redis_manager
from db.redis_manager_delegate import RedisUtils
from huaxin_client import l1_subscript_codes_manager
from huaxin_client.client_network import SendResponseSkManager
from l2 import l2_data_manager_new, l2_data_util, transaction_progress, \
    l2_data_source_util, cancel_buy_strategy
from l2.code_price_manager import Buy1PriceManager
from l2.l2_data_manager import TradePointManager
from l2.l2_data_util import L2DataUtil
from l2.l2_transaction_data_manager import HuaXinBuyOrderManager, BigOrderDealManager
from log_module import async_log_util, log_export
from log_module.log import logger_debug, \
    logger_trade, logger_trade_position_api_request, logger_request_api, \
    logger_real_place_order_position, logger_device
from output import l2_output_util
from third_data import kpl_data_manager, kpl_util, history_k_data_manager
from third_data.code_plate_key_manager import CodePlateKeyBuyManager
from third_data.history_k_data_manager import HistoryKDataManager
from third_data.history_k_data_util import JueJinApi, HistoryKDatasUtils
from third_data.kpl_data_manager import KPLDataManager
from third_data.kpl_util import KPLDataType
from trade import trade_manager, l2_trade_util
import l2_data_util as l2_data_util_old
from trade.huaxin import huaxin_trade_api, huaxin_trade_data_update, \
    huaxin_trade_record_manager, huaxin_trade_order_processor, huaxin_sell_util
from trade.huaxin.huaxin_trade_record_manager import PositionManager, DealRecordManager, DelegateRecordManager
from trade.sell import sell_manager
from trade.sell.sell_rule_manager import TradeRuleManager, SellRule
from trade.trade_manager import TradeTargetCodeModeManager, AutoCancelSellModeManager
from settings.trade_setting import MarketSituationManager
from utils import socket_util, data_export_util, tool, huaxin_util, output_util, global_util
class OutsideApiCommandCallback(outside_api_command_manager.ActionCallback):
    __cancel_sell_thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=8)
    __DealRecordManager = DealRecordManager()
    __code_sell_way_dict = {}
    @classmethod
    def __send_response(cls, data_bytes):
        sk = SendResponseSkManager.create_send_response_sk(addr=huaxin_client.constant.SERVER_IP,
                                                           port=huaxin_client.constant.SERVER_PORT)
        try:
            data_bytes = socket_util.load_header(data_bytes)
            sk.sendall(data_bytes)
            result, header_str = socket_util.recv_data(sk)
            result = json.loads(result)
            if result["code"] != 0:
                raise Exception(result['msg'])
        finally:
            sk.close()
    def send_response(self, data, _client_id, _request_id):
        data_bytes = json.dumps({"type": "response", "data": data, "client_id": _client_id,
                                 "request_id": _request_id}).encode('utf-8')
        for i in range(3):
            try:
                self.__send_response(data_bytes)
                # print("发送数据成功")
                break
            except Exception as e1:
                logging.exception(e1)
    # 撤长期没有成交的单
    def __cancel_not_deal_order(self, code, order_ref, timeout=3):
        time.sleep(timeout)
        # 撤买单
        huaxin_trade_api.cancel_order(1, code, "", orderRef=order_ref)
    # 交易
    def OnTrade(self, client_id, request_id, data):
        try:
            trade_type = data["trade_type"]
            if trade_type == outside_api_command_manager.TRADE_TYPE_ORDER:
                code = data["code"]
                direction = data["direction"]
                volume = data["volume"]
                price_type = data["price_type"]
                price = data["price"]
                sinfo = data["sinfo"]
                if direction == 2:
                    # price_type: 0-价格笼子 1-跌停价  2-涨停价 3-现价 4-买5价
                    async_log_util.info(logger_trade, f"API卖: 接收数据-{data}")
                    current_price = L1DataManager.get_l1_current_price(code)
                    limit_down_price = gpcode_manager.get_limit_down_price(code)
                    limit_up_price = gpcode_manager.get_limit_up_price(code)
                    order_ref = huaxin_util.create_order_ref()
                    try:
                        result = huaxin_sell_util.start_sell(code, volume, price_type, limit_up_price, limit_down_price,
                                                             current_price, blocking=True, request_id=request_id,
                                                             order_ref=order_ref)
                        async_log_util.info(logger_trade, f"API卖结果: {result}")
                        self.send_response(result, client_id, request_id)
                    except Exception as e:
                        if str(e).find("超时") >= 0:
                            self.send_response({"code": 0, "data": {"orderRef": order_ref}}, client_id, request_id)
                        else:
                            raise e
                else:
                    if not price:
                        if tool.trade_time_sub(tool.get_now_time_str(), "09:30:00") < 0:
                            # 开盘之前
                            limit_down_price = gpcode_manager.get_limit_down_price(code)
                            if not limit_down_price:
                                raise Exception("尚未获取跌停价")
                            # 比跌停价高1分
                            price = round(float(limit_down_price) + 0.01, 2)
                        else:
                            # 开盘之后
                            # 没有传入价格,以最新价买入
                            current_price = L1DataManager.get_l1_current_price(code)
                            if not current_price:
                                raise Exception("尚未获取到现价")
                            # 获取买1金额
                            price = round(float(current_price), 2)
                            buy1_info = L1DataManager.current_buy1_dict.get(code)
                            if buy1_info and buy1_info[0] * buy1_info[1] > 50 * 10000:
                                # 如果买1在50w以上就加一档
                                price += 0.01
                            limit_up_price = gpcode_manager.get_limit_up_price(code)
                            if limit_up_price and price > float(limit_up_price):
                                price = round(float(limit_up_price), 2)
                        order_ref = huaxin_util.create_order_ref()
                        result = huaxin_trade_api.order(direction, code, volume, price, price_type=price_type,
                                                        sinfo=sinfo, order_ref=order_ref,
                                                        blocking=True, request_id=request_id)
                        # 2s内没成交就撤单
                        self.__cancel_sell_thread_pool.submit(self.__cancel_not_deal_order, code, order_ref)
                    else:
                        result = huaxin_trade_api.order(direction, code, volume, price, price_type=price_type,
                                                        sinfo=sinfo,
                                                        blocking=True, request_id=request_id)
                    self.send_response({"code": 0, "data": result}, client_id, request_id)
            elif trade_type == outside_api_command_manager.TRADE_TYPE_CANCEL_ORDER:
                # print("手动撤单:", data)
                code = data["code"]
                direction = data["direction"]
                accountID = data["accountID"]
                orderSysID = data["orderSysID"]
                sinfo = data["sinfo"]
                if orderSysID:
                    result = huaxin_trade_api.cancel_order(direction, code, orderSysID, sinfo=sinfo,
                                                           blocking=True, request_id=request_id)
                    self.send_response({"code": 0, "data": result}, client_id, request_id)
                elif code:
                    msg_list = []
                    try:
                        sell_count = 0
                        sell_orders = huaxin_trade_order_processor.TradeResultProcessor.get_huaxin_sell_order_by_code(
                            code)
                        if sell_orders:
                            for sell_order in sell_orders:
                                if huaxin_util.is_can_cancel(sell_order.orderStatus):
                                    sell_count += 1
                                    huaxin_trade_api.cancel_order(direction, code, sell_order.orderRef, blocking=False)
                        msg_list.append(f"撤卖单数量:{sell_count}")
                    except Exception as e:
                        logger_debug.exception(e)
                    can_cancel = l2_data_manager_new.L2TradeDataProcessor.cancel_buy(code, "手动撤单")
                    if not can_cancel:
                        msg_list.append(f"无法撤买单")
                    else:
                        msg_list.append(f"已撤买单")
                    self.send_response({"code": 0, "data": {"code": 0, "msg": ";".join(msg_list)}}, client_id,
                                       request_id)
        except Exception as e:
            logger_debug.exception(e)
            self.send_response({"code": 1, "msg": str(e)}, client_id, request_id)
    # 交易状态
    def OnTradeState(self, client_id, request_id, data):
        try:
            operate = data["operate"]
            if operate == outside_api_command_manager.OPERRATE_SET:
                state = data["state"]
                if state:
                    trade_manager.TradeStateManager().open_buy()
                else:
                    trade_manager.TradeStateManager().close_buy()
                self.send_response({"code": 0, "msg": ("开启成功" if state else "关闭成功")}, client_id, request_id)
            elif operate == outside_api_command_manager.OPERRATE_GET:
                can_buy = trade_manager.TradeStateManager().is_can_buy_cache()
                self.send_response({"code": 0, "data": {"can_buy": can_buy}}, client_id, request_id)
        except Exception as e:
            self.send_response({"code": 1, "msg": str(e)}, client_id, request_id)
    # 交易模式
    def OnTradeMode(self, client_id, request_id, data):
        try:
            operate = data["operate"]
            if operate == outside_api_command_manager.OPERRATE_SET:
                mode = data["mode"]
                TradeTargetCodeModeManager().set_mode(mode)
                self.send_response({"code": 0, "data": {"mode": mode}}, client_id, request_id)
            elif operate == outside_api_command_manager.OPERRATE_GET:
                mode = TradeTargetCodeModeManager().get_mode_cache()
                self.send_response({"code": 0, "data": {"mode": mode}}, client_id, request_id)
        except Exception as e:
            self.send_response({"code": 1, "msg": str(e)}, client_id, request_id)
    def OnSellRule(self, client_id, request_id, data):
        try:
            operate = data["operate"]
            if operate == outside_api_command_manager.OPERRATE_ADD:
                data = data["data"]
                code = data["code"]
                type = data["type"]
                buy1_price = data.get("buy1_price")
                if not buy1_price:
                    buy1_price = gpcode_manager.get_limit_up_price(code)
                    if not buy1_price:
                        raise Exception("尚未获取到涨停价")
                rule = SellRule(code=data["code"], buy1_volume=data["buy1_volume"], buy1_price=buy1_price,
                                sell_volume=data.get("sell_volume"), sell_price_type=data.get("sell_price_type"),
                                end_time=data["end_time"], type=type)
                TradeRuleManager().add_rule(rule)
                self.send_response({"code": 0, "data": {}}, client_id, request_id)
            elif operate == outside_api_command_manager.OPERRATE_SET:
                data = data["data"]
                code = data["code"]
                buy1_price = data.get("buy1_price")
                if not buy1_price:
                    buy1_price = gpcode_manager.get_limit_up_price(code)
                    if not buy1_price:
                        raise Exception("尚未获取到涨停价")
                rule = SellRule(id_=data["id"], code=data["code"], buy1_volume=data["buy1_volume"],
                                buy1_price=buy1_price,
                                sell_volume=data.get("sell_volume"), sell_price_type=data.get("sell_price_type"),
                                end_time=data["end_time"])
                TradeRuleManager().update_rule(rule)
                self.send_response({"code": 0, "data": {}}, client_id, request_id)
            elif operate == outside_api_command_manager.OPERRATE_DELETE:
                data = data["data"]
                TradeRuleManager().del_rule(data["id"])
                self.send_response({"code": 0, "data": {}}, client_id, request_id)
            elif operate == outside_api_command_manager.OPERRATE_GET:
                rules = TradeRuleManager().list_rules()
                fresults = []
                for rule in rules:
                    fresults.append(rule.to_dict())
                self.send_response({"code": 0, "data": fresults}, client_id, request_id)
        except Exception as e:
            self.send_response({"code": 1, "msg": str(e)}, client_id, request_id)
        pass
    # 代码名单
    def OnCodeList(self, client_id, request_id, data):
        try:
            code_list_type = data["code_list_type"]
            operate = data["operate"]
            code = data.get("code")
            fresult = {"code": 0}
            if code_list_type == outside_api_command_manager.CODE_LIST_WANT:
                if operate == outside_api_command_manager.OPERRATE_SET:
                    gpcode_manager.WantBuyCodesManager().add_code(code)
                    name = gpcode_manager.get_code_name(code)
                    if not name:
                        results = HistoryKDatasUtils.get_gp_codes_names([code])
                        if results:
                            gpcode_manager.CodesNameManager.add_first_code_name(code, results[code])
                elif operate == outside_api_command_manager.OPERRATE_DELETE:
                    gpcode_manager.WantBuyCodesManager().remove_code(code)
                elif operate == outside_api_command_manager.OPERRATE_GET:
                    codes = gpcode_manager.WantBuyCodesManager().list_code_cache()
                    datas = []
                    for code in codes:
                        name = gpcode_manager.get_code_name(code)
                        datas.append(f"{name}:{code}")
                    fresult = {"code": 0, "data": datas}
            elif code_list_type == outside_api_command_manager.CODE_LIST_BLACK:
                if operate == outside_api_command_manager.OPERRATE_SET:
                    # 先手动撤单
                    try:
                        l2_data_manager_new.L2TradeDataProcessor.cancel_buy(code, "手动拉黑")
                    except Exception as e:
                        logger_debug.exception(e)
                    l2_trade_util.forbidden_trade(code, msg="手动加入 trade_server")
                    WantBuyCodesManager().remove_code(code)
                    name = gpcode_manager.get_code_name(code)
                    if not name:
                        results = HistoryKDatasUtils.get_gp_codes_names([code])
                        if results:
                            gpcode_manager.CodesNameManager.add_first_code_name(code, results[code])
                elif operate == outside_api_command_manager.OPERRATE_DELETE:
                    l2_trade_util.remove_from_forbidden_trade_codes(code)
                    WantBuyCodesManager().add_code(code)
                elif operate == outside_api_command_manager.OPERRATE_GET:
                    codes = gpcode_manager.BlackListCodeManager().list_codes_cache()
                    datas = []
                    for code in codes:
                        name = gpcode_manager.get_code_name(code)
                        datas.append(f"{name}:{code}")
                    fresult = {"code": 0, "data": datas}
            elif code_list_type == outside_api_command_manager.CODE_LIST_WHITE:
                if operate == outside_api_command_manager.OPERRATE_SET:
                    gpcode_manager.WhiteListCodeManager().add_code(code)
                    name = gpcode_manager.get_code_name(code)
                    if not name:
                        results = HistoryKDatasUtils.get_gp_codes_names([code])
                        if results:
                            gpcode_manager.CodesNameManager.add_first_code_name(code, results[code])
                elif operate == outside_api_command_manager.OPERRATE_DELETE:
                    gpcode_manager.WhiteListCodeManager().remove_code(code)
                elif operate == outside_api_command_manager.OPERRATE_GET:
                    codes = gpcode_manager.WhiteListCodeManager().list_codes_cache()
                    datas = []
                    for code in codes:
                        name = gpcode_manager.get_code_name(code)
                        datas.append(f"{name}:{code}")
                    fresult = {"code": 0, "data": datas}
            elif code_list_type == outside_api_command_manager.CODE_LIST_PAUSE_BUY:
                if operate == outside_api_command_manager.OPERRATE_SET:
                    gpcode_manager.PauseBuyCodesManager().add_code(code)
                    name = gpcode_manager.get_code_name(code)
                    if not name:
                        results = HistoryKDatasUtils.get_gp_codes_names([code])
                        if results:
                            gpcode_manager.CodesNameManager.add_first_code_name(code, results[code])
                elif operate == outside_api_command_manager.OPERRATE_DELETE:
                    gpcode_manager.PauseBuyCodesManager().remove_code(code)
                elif operate == outside_api_command_manager.OPERRATE_GET:
                    codes = gpcode_manager.PauseBuyCodesManager().list_code_cache()
                    datas = []
                    for code in codes:
                        name = gpcode_manager.get_code_name(code)
                        datas.append(f"{name}:{code}")
                    fresult = {"code": 0, "data": datas}
            elif code_list_type == outside_api_command_manager.CODE_LIST_MUST_BUY:
                if operate == outside_api_command_manager.OPERRATE_SET:
                    gpcode_manager.MustBuyCodesManager().add_code(code)
                    name = gpcode_manager.get_code_name(code)
                    if not name:
                        results = HistoryKDatasUtils.get_gp_codes_names([code])
                        if results:
                            gpcode_manager.CodesNameManager.add_first_code_name(code, results[code])
                elif operate == outside_api_command_manager.OPERRATE_DELETE:
                    gpcode_manager.MustBuyCodesManager().remove_code(code)
                elif operate == outside_api_command_manager.OPERRATE_GET:
                    codes = gpcode_manager.MustBuyCodesManager().list_code_cache()
                    datas = []
                    for code in codes:
                        name = gpcode_manager.get_code_name(code)
                        datas.append(f"{name}:{code}")
                    fresult = {"code": 0, "data": datas}
            self.send_response(fresult, client_id, request_id)
        except Exception as e:
            self.send_response({"code": 1, "msg": str(e)}, client_id, request_id)
    def OnExportL2(self, client_id, request_id, data):
        try:
            code = data["code"]
            excel_file_name = data_export_util.export_l2_excel(code)
            # print("导出L2数据目录:", excel_file_name)
            self.send_response({"code": 0, "data": {}, "msg": ""}, client_id, request_id)
        except Exception as e:
            logging.exception(e)
            self.send_response({"code": 1, "msg": str(e)}, client_id, request_id)
    def OnEveryDayInit(self, client_id, request_id, data):
        try:
            inited_data.everyday_init()
            self.send_response({"code": 0, "data": {}, "msg": ""}, client_id, request_id)
        except Exception as e:
            self.send_response({"code": 1, "msg": str(e)}, client_id, request_id)
    def OnRefreshTradeData(self, client_id, request_id, data):
        try:
            sync_type = data["ctype"]
            if sync_type == "delegate_list":
                huaxin_trade_data_update.add_delegate_list("API主动请求")
            elif sync_type == "deal_list":
                huaxin_trade_data_update.add_deal_list()
            elif sync_type == "money":
                huaxin_trade_data_update.add_money_list()
            elif sync_type == "position_list":
                huaxin_trade_data_update.add_position_list()
            self.send_response({"code": 0, "data": {}, "msg": ""}, client_id, request_id)
        except Exception as e:
            self.send_response({"code": 1, "msg": str(e)}, client_id, request_id)
    def OnGetCodeAttribute(self, client_id, request_id, data):
        try:
            code = data["code"]
            # 查询是否想买单/白名单/黑名单/暂不买
            code_name = gpcode_manager.get_code_name(code)
            want = gpcode_manager.WantBuyCodesManager().is_in_cache(code)
            white = gpcode_manager.WhiteListCodeManager().is_in_cache(code)
            black = l2_trade_util.is_in_forbidden_trade_codes(code)
            pause_buy = gpcode_manager.PauseBuyCodesManager().is_in_cache(code)
            desc_list = []
            if want:
                desc_list.append("【想买单】")
            if white:
                desc_list.append("【白名单】")
            if black:
                desc_list.append("【黑名单】")
            if pause_buy:
                desc_list.append("【暂不买】")
            result = {"code": 0, "data": {"code_info": (code, code_name), "desc": "".join(desc_list)}}
            self.send_response(result, client_id, request_id)
        except Exception as e:
            self.send_response({"code": 1, "msg": str(e)}, client_id, request_id)
    def OnGetCodeTradeState(self, client_id, request_id, data):
        try:
            code = data["code"]
            state = trade_manager.CodesTradeStateManager().get_trade_state(code)
            result = {"code": 0, "data": {"state": state}}
            self.send_response(result, client_id, request_id)
        except Exception as e:
            self.send_response({"code": 1, "msg": str(e)}, client_id, request_id)
    def OnGetEnvInfo(self, client_id, request_id, data):
        try:
            fdata = {}
            try:
                date = JueJinApi.get_previous_trading_date(tool.get_now_date_str())
                if date:
                    fdata["juejin"] = 1
            except Exception as e:
                fdata["juejin"] = 0
            fdata["kpl"] = {}
            # 获取开盘啦数据
            kpl_types = [KPLDataType.LIMIT_UP.value, KPLDataType.JINGXUAN_RANK.value,
                         KPLDataType.INDUSTRY_RANK.value]
            for kpl_type in kpl_types:
                if kpl_type in KPLDataManager.kpl_data_update_info:
                    fdata["kpl"][kpl_type] = KPLDataManager.kpl_data_update_info.get(kpl_type)
            try:
                # 验证redis
                RedisUtils.get(redis_manager.RedisManager(0).getRedis(), "test")
                fdata["redis"] = 1
            except:
                fdata["redis"] = 0
            try:
                # 验证mysql
                mysql_data.Mysqldb().select_one("select 1")
                fdata["mysql"] = 1
            except:
                fdata["mysql"] = 0
            try:
                # redis异步任务数量
                fdata["redis_async_task_count"] = redis_manager.RedisUtils.get_async_task_count()
            except:
                pass
            # 获取交易通道
            try:
                can_access = huaxin_trade_api.test_trade_channel()
                fdata["trade_channel_access"] = 1 if can_access else 0
            except Exception as e:
                logger_debug.exception(e)
                fdata["trade_channel_access"] = 0
            # 获取CPU与内存适用情况
            memory_info = psutil.virtual_memory()
            cpu_percent = psutil.cpu_percent(interval=1)
            fdata["device"] = {"cpu": cpu_percent, "memery": memory_info.percent}
            logger_device.info(fdata["device"])
            # 获取今日自由流通量的更新
            try:
                count = ZYLTGBUtil.count_today_updated_volume_codes()
                fdata["today_zylt_updated_count"] = count
            except Exception as e:
                logger_debug.exception(e)
                fdata["today_zylt_updated_count"] = -1
            # 获取交易通道
            result = {"code": 0, "data": fdata, "msg": ""}
            # print("OnGetEnvInfo 成功")
            self.send_response(result, client_id, request_id)
        except Exception as e:
            self.send_response({"code": 1, "msg": str(e)}, client_id, request_id)
    # 同步L2订阅代码
    def OnSyncL2SubscriptCodes(self, client_id, request_id):
        logger_debug.debug("OnSyncL2SubscriptCodes")
        try:
            codes_sh, codes_sz = l1_subscript_codes_manager.request_l1_subscript_target_codes()
            if codes_sh and codes_sz:
                l1_subscript_codes_manager.save_codes(codes_sh, codes_sz)
            result = {"code": 0, "data": {"codes_sh": len(codes_sh), "codes_sz": len(codes_sz)}}
            self.send_response(result, client_id, request_id)
        except Exception as e:
            logger_debug.error(e)
    def OnSystemLog(self, client_id, request_id, data):
        try:
            start_index = data["start_index"]
            count = data["count"]
            # 读取系统日志
            logs_data = log_export.load_system_log()
            total_count = len(logs_data)
            if start_index >= 0:
                logs_data = logs_data[start_index:start_index + count]
            result = {"code": 0, "data": {"total_count": total_count, "list": logs_data}}
            self.send_response(result, client_id, request_id)
        except Exception as e:
            logger_debug.error(e)
    def OnGetFromDataServer(self, client_id, request_id, data):
        path = data["path"]
        params = data["params"]
        params_strs = []
        if params:
            for k in params:
                params_strs.append(f"{k}={params[k]}")
        if params_strs:
            path += "?"
            path += "&".join(params_strs)
        try:
            # 获取参数
            response = requests.get(f"http://127.0.0.1:9004{path}")
            if response.status_code == 200:
                self.send_response(response.text, client_id, request_id)
            else:
                self.send_response(json.dumps({"code": 1, "msg": f"网络请求状态错误:{response.status_code}"}), client_id,
                                   request_id)
        except:
            self.send_response(json.dumps({"code": 1, "msg": "网络请求出错"}), client_id, request_id)
    # 代码的交易信息
    def OnGetCodeTradeInfo(self, client_id, request_id, data):
        try:
            code = data["code"]
            # 获取交易信息,
            # 获取正在成交的位置/获取下单位置/获取成交速率
            total_datas = l2_data_util.local_today_datas.get(code)
            if total_datas is None:
                total_datas = []
            trade_index, is_default = transaction_progress.TradeBuyQueue().get_traded_index(code)
            trade_speed = transaction_progress.TradeBuyQueue().get_trade_speed(code)
            # 下单位置
            place_order_index = SCancelBigNumComputer().get_real_place_order_index_cache(code)
            fdata = {}
            if trade_index and place_order_index:
                # 有成交进度位与下单位
                total_count = 0
                total_money = 0
                big_money_300_indexs = []
                big_money_200_indexs = []
                for i in range(trade_index, place_order_index):
                    data = total_datas[i]
                    val = data["val"]
                    if not L2DataUtil.is_limit_up_price_buy(val):
                        continue
                    # 是否已经撤单
                    left_count = l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_no_canceled_count_v2(code, i,
                                                                                                             total_datas,
                                                                                                             l2_data_util.local_today_canceled_buyno_map.get(
                                                                                                                 code))
                    if left_count <= 0:
                        continue
                    total_count += left_count
                    money = val["num"] * int(val["price"] * 100)
                    total_money += money
                    if money >= 300 * 10000:
                        big_money_300_indexs.append(i)
                    elif money >= 200 * 10000:
                        big_money_200_indexs.append(i)
                fdata["waiting_for_trade"] = f"{total_count}笔&{output_util.money_desc(total_money)}"
                total_count = 0
                total_money = 0
                for i in big_money_300_indexs:
                    data = total_datas[i]
                    val = data["val"]
                    total_count += 1
                    money = val["num"] * int(val["price"] * 100)
                    total_money += money
                fdata["big_num_300"] = {"desc": f"{total_count}笔&{output_util.money_desc(total_money)}",
                                        "datas": [output_util.format_l2_data(total_datas[x]) for x in
                                                  big_money_300_indexs]}
                total_count = 0
                total_money = 0
                for i in big_money_200_indexs:
                    data = total_datas[i]
                    val = data["val"]
                    total_count += 1
                    money = val["num"] * int(val["price"] * 100)
                    total_money += money
                fdata["big_num_200"] = {"desc": f"{total_count}笔&{output_util.money_desc(total_money)}",
                                        "datas": [output_util.format_l2_data(total_datas[x]) for x in
                                                  big_money_200_indexs]}
                if trade_speed:
                    seconds = int(total_money / trade_speed)
                    h = seconds // 3600
                    m = seconds % 3600 // 60
                    s = seconds % 60
                    fdata["trade_speed"] = f"{trade_speed}元/秒"
                    fdata["trade_use_time"] = "{:0>2d}:{:0>2d}:{:0>2d}".format(h, m, s)
                    fdata["trade_time"] = tool.trade_time_add_second(tool.get_now_time_str(), seconds)
            result = {"code": 0, "data": fdata}
            self.send_response(result, client_id, request_id)
        except Exception as e:
            logging.exception(e)
            self.send_response(json.dumps({"code": 1, "msg": f"数据处理出错:{e}"}), client_id, request_id)
    def OnGetActiveListenCount(self, client_id, request_id):
        try:
            order = 0  # l2DataListenManager.get_active_count(L2DataListenManager.TYPE_ORDER)
            transaction = 0  # l2DataListenManager.get_active_count(L2DataListenManager.TYPE_TRANSACTION)
            market = 0  # l2DataListenManager.get_active_count(L2DataListenManager.TYPE_MARKET)
            result = {"code": 0, "data": {"order": order, "transaction": transaction, "market": market}}
            self.send_response(result, client_id, request_id)
        except Exception as e:
            logging.exception(e)
            self.send_response(json.dumps({"code": 1, "msg": f"数据处理出错:{e}"}), client_id, request_id)
    def OnSaveRunningData(self, client_id, request_id):
        try:
            inited_data.save_running_data()
        except Exception as e:
            logging.exception(e)
            self.send_response(json.dumps({"code": 1, "msg": f"数据处理出错:{e}"}), client_id, request_id)
    def OnGetCodePositionInfo(self, client_id, request_id, data):
        code = data.get("code")
        __start_time = time.time()
        try:
            if not tool.is_can_buy_code(code):
                raise Exception("非主板代码")
            # 获取代码基本信息
            # 查询是否想买单/白名单/黑名单/暂不买
            code_name = gpcode_manager.get_code_name(code)
            want = gpcode_manager.WantBuyCodesManager().is_in_cache(code)
            white = gpcode_manager.WhiteListCodeManager().is_in_cache(code)
            black = l2_trade_util.is_in_forbidden_trade_codes(code)
            pause_buy = gpcode_manager.PauseBuyCodesManager().is_in_cache(code)
            desc_list = []
            if want:
                desc_list.append("【想买单】")
            if white:
                desc_list.append("【白名单】")
            if black:
                desc_list.append("【黑名单】")
            if pause_buy:
                desc_list.append("【暂不买】")
            # 获取持仓
            positions = PositionManager.latest_positions
            trade_rules_count = len(TradeRuleManager().list_can_excut_rules_cache())
            fdata = {"code": code, "total": 0, "available": 0, "sell_orders": [], "sell_rules_count": trade_rules_count,
                     "cost_price": 0, "cost_price_rate": 0,
                     "code_info": (code, code_name), "desc": "".join(desc_list)}
            if positions:
                for d in positions:
                    code_name = gpcode_manager.get_code_name(d["securityID"])
                    if not code_name:
                        # 判断是否有名称
                        results = HistoryKDatasUtils.get_gp_codes_names([d["securityID"]])
                        threading.Thread(
                            target=CodesNameManager.add_first_code_name(d["securityID"],
                                                                        results[d["securityID"]])).start()
                    if d["prePosition"] <= 0:
                        continue
                    if d["securityID"] != code:
                        continue
                    fdata["total"] = d["prePosition"]
                    fdata["available"] = d["availablePosition"]
                    fdata["cost_price"] = round(float(d["historyPosPrice"]), 2)
                    deal_order_system_id_infos = {}
                    # 获取已经卖的单数
                    deal_list = self.__DealRecordManager.list_sell_by_code_cache(code)
                    if deal_list:
                        for d in deal_list:
                            if d["orderSysID"] not in deal_order_system_id_infos:
                                deal_order_system_id_infos[d["orderSysID"]] = [d["volume"], d["tradeTime"]]
                            else:
                                deal_order_system_id_infos[d["orderSysID"]][0] += d["volume"]
                    # 获取9:30之前的卖委托
                    current_delegates = DelegateRecordManager().list_current_delegates(code)
                    if current_delegates:
                        for d in current_delegates:
                            if d["orderSysID"] not in deal_order_system_id_infos:
                                deal_order_system_id_infos[d["orderSysID"]] = [d["volume"], d["insertTime"]]
                    deal_list = [deal_order_system_id_infos[k] for k in deal_order_system_id_infos]
                    deal_list.sort(key=lambda x: x[1])
                    # TODO 测试
                    # deal_list.clear()
                    # fdata["available"] = fdata["total"]
                    fdata["sell_orders"] = [k[0] for k in deal_list]
                    break
            # 有现价就获取现价
            current_price = L1DataManager.get_l1_current_price(code)
            if current_price:
                fdata["cost_price"] = current_price
                pre_close_price = CodePrePriceManager.get_price_pre_cache(code)
                if current_price and pre_close_price:
                    rate = round((float(current_price) - float(pre_close_price)) / float(pre_close_price), 4)
                    fdata["cost_price_rate"] = rate
                # 获取涨幅
            async_log_util.info(logger_trade_position_api_request, f"{fdata}")
            result = {"code": 0, "data": fdata}
            self.send_response(result, client_id, request_id)
        except Exception as e:
            logging.exception(e)
            self.send_response({"code": 1, "msg": f"数据处理出错:{e}"}, client_id, request_id)
        finally:
            use_time = time.time() - __start_time
            if use_time > 0.01:
                # 耗时10ms以上才记录日志
                async_log_util.info(logger_trade_position_api_request, f"{code}请求持仓耗时:{use_time * 1000}ms")
    def OnCommonRequest(self, client_id, request_id, data):
        # 通用请求
        ctype = data["ctype"]
        __start_time = time.time()
        try:
            if ctype == "get_sell_result":
                order_ref = data["order_ref"]
                order_entity = huaxin_trade_order_processor.TradeResultProcessor.get_huaxin_order_by_order_ref(
                    order_ref)
                result = {}
                if not order_entity:
                    result = {"code": 1, "msg": f"没有获取到订单状态"}
                else:
                    code_name = gpcode_manager.get_code_name(order_entity.code)
                    result = {}
                    if huaxin_util.is_canceled(order_entity.orderStatus):
                        result = {"code": 0,
                                  "data": {"orderStatus": order_entity.orderStatus, "code": order_entity.code,
                                           "msg": f"【{order_entity.code}({code_name})】已撤单"}}
                    elif huaxin_util.is_deal(order_entity.orderStatus):
                        result = {"code": 0,
                                  "data": {"orderStatus": order_entity.orderStatus, "code": order_entity.code,
                                           "msg": f"【{order_entity.code}({code_name})】已经成交"}}
                    else:
                        result = {"code": 0,
                                  "data": {"orderStatus": order_entity.orderStatus, "code": order_entity.code,
                                           "msg": f"【{order_entity.code}({code_name})】已挂单"}}
                self.send_response(result, client_id, request_id)
            elif ctype == "get_position_codes":
                # 获取今日可卖的持仓代码
                codes = PositionManager.get_position_codes()
                result = {"code": 0,
                          "data": codes}
                self.send_response(result, client_id, request_id)
            elif ctype == "market_situation":
                try:
                    operate = data["operate"]
                    if operate == outside_api_command_manager.OPERRATE_SET:
                        situation = data["situation"]
                        MarketSituationManager().set_situation(situation)
                        self.send_response({"code": 0, "data": {"situation": situation}}, client_id, request_id)
                    elif operate == outside_api_command_manager.OPERRATE_GET:
                        situation = MarketSituationManager().get_situation_cache()
                        self.send_response({"code": 0, "data": {"situation": situation}}, client_id, request_id)
                except Exception as e:
                    self.send_response({"code": 1, "msg": str(e)}, client_id, request_id)
            elif ctype == "get_kpl_limit_up_datas":
                # 获取开盘啦涨停队列
                try:
                    datas = kpl_data_manager.KPLDataManager.get_from_file(kpl_util.KPLDataType.LIMIT_UP,
                                                                          tool.get_now_date_str())
                    self.send_response({"code": 0, "data": datas}, client_id, request_id)
                except Exception as e:
                    self.send_response({"code": 1, "msg": str(e)}, client_id, request_id)
            elif ctype == "get_delegated_buy_code_infos":
                account_available_money = trade_manager.AccountAvailableMoneyManager().get_available_money_cache()
                # 获取委托中的代码
                # current_delegates = huaxin_trade_record_manager.DelegateRecordManager().list_current_delegates()
                current_delegates, update_time = huaxin_trade_record_manager.DelegateRecordManager.list_by_day(
                    tool.get_now_date_str("%Y%m%d"), None,
                    [huaxin_util.TORA_TSTP_OST_Accepted, huaxin_util.TORA_TSTP_OST_PartTraded])
                fdatas = []
                if current_delegates:
                    for c in current_delegates:
                        try:
                            if int(c["direction"]) != huaxin_util.TORA_TSTP_D_Buy:
                                continue
                            code = c["securityID"]
                            orderSysID = c.get("orderSysID")
                            code_name = gpcode_manager.get_code_name(code)
                            # 获取下单位置信息
                            order_begin_pos = TradePointManager().get_buy_compute_start_data_cache(code)
                            if order_begin_pos is None or order_begin_pos.buy_single_index is None:
                                continue
                            l2_data_util.load_l2_data(code)
                            total_datas = l2_data_util.local_today_datas.get(code)
                            trade_index, is_default = transaction_progress.TradeBuyQueue().get_traded_index(code)
                            if trade_index is None:
                                trade_index = 0
                            # 下单位置
                            place_order_index = SCancelBigNumComputer().get_real_place_order_index_cache(code)
                            # 计算信号位置到真实下单位置的总买(不管是否已撤)
                            total_nums = 0
                            for i in range(order_begin_pos.buy_single_index, place_order_index):
                                data = total_datas[i]
                                val = data["val"]
                                if not L2DataUtil.is_limit_up_price_buy(val):
                                    continue
                                total_nums += val["num"]
                            # 计算已成交/已撤单的数量
                            deal_or_cancel_num = 0
                            for i in range(order_begin_pos.buy_single_index, trade_index + 1):
                                data = total_datas[i]
                                val = data["val"]
                                if not L2DataUtil.is_limit_up_price_buy(val):
                                    continue
                                deal_or_cancel_num += val["num"]
                            # 获取剩下的笔数
                            total_left_count = 0
                            total_left_num = 0
                            for i in range(trade_index + 1, place_order_index):
                                data = total_datas[i]
                                val = data["val"]
                                if not L2DataUtil.is_limit_up_price_buy(val):
                                    continue
                                if val["num"] * float(val["price"]) < 5000:
                                    continue
                                left_count = l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_no_canceled_count_v2(
                                    code,
                                    i,
                                    total_datas,
                                    l2_data_util.local_today_canceled_buyno_map.get(
                                        code))
                                if left_count > 0:
                                    total_left_count += left_count
                                    total_left_num += val["num"] * left_count
                            # 获取正在成交
                            dealing_info = HuaXinBuyOrderManager.get_dealing_order_info(code)
                            if dealing_info:
                                if str(total_datas[trade_index]["val"]["orderNo"]) == str(dealing_info[0]):
                                    total_left_num += (total_datas[trade_index]["val"]["num"] - dealing_info[1] // 100)
                            limit_up_price = gpcode_manager.get_limit_up_price(code)
                            buy1_money = Buy1PriceManager().get_latest_buy1_money(code)
                            if buy1_money is None:
                                buy1_money = 0
                            # 获取已经成交的大单数量
                            total_big_num = 0
                            total_big_count = 0
                            is_ge_code = tool.is_ge_code(code)
                            for i in range(0, trade_index):
                                val = total_datas[i]["val"]
                                if not L2DataUtil.is_limit_up_price_buy(val):
                                    continue
                                # 是不是大单
                                if not l2_data_util_old.is_big_money(val, is_ge_code):
                                    continue
                                canceled_data = l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_canceled_data_v2(
                                    code,
                                    i,
                                    total_datas,
                                    l2_data_util.local_today_canceled_buyno_map.get(
                                        code))
                                if not canceled_data:
                                    total_big_count += 1
                                else:
                                    total_big_num -= canceled_data["val"]["num"]
                                total_big_num += val["num"]
                            not_deal_total_big_num_pre = 0
                            not_deal_total_big_count_pre = 0
                            not_deal_total_big_num_after = 0
                            not_deal_total_big_count_after = 0
                            is_ge_code = tool.is_ge_code(code)
                            for i in range(trade_index, total_datas[-1]["index"] + 1):
                                val = total_datas[i]["val"]
                                if not L2DataUtil.is_limit_up_price_buy(val):
                                    continue
                                # 是不是大单
                                if not l2_data_util_old.is_big_money(val, is_ge_code):
                                    continue
                                canceled_data = l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_canceled_data_v2(
                                    code,
                                    i,
                                    total_datas,
                                    l2_data_util.local_today_canceled_buyno_map.get(
                                        code))
                                if not canceled_data:
                                    if i < place_order_index:
                                        not_deal_total_big_count_pre += 1
                                    else:
                                        not_deal_total_big_count_after += 1
                                else:
                                    if i < place_order_index:
                                        not_deal_total_big_num_pre -= canceled_data["val"]["num"]
                                    else:
                                        not_deal_total_big_num_after -= canceled_data["val"]["num"]
                                if i < place_order_index:
                                    not_deal_total_big_num_pre += val["num"]
                                else:
                                    not_deal_total_big_num_after += val["num"]
                            real_place_order_after_count = 0
                            real_place_order_after_num = 0
                            is_ge_code = tool.is_ge_code(code)
                            # 统计真实下单位置后面未撤的金额
                            for i in range(place_order_index, total_datas[-1]["index"]):
                                val = total_datas[i]["val"]
                                if not L2DataUtil.is_limit_up_price_buy(val):
                                    continue
                                # 是不是大单
                                if not l2_data_util_old.is_big_money(val, is_ge_code):
                                    continue
                                canceled_data = l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_canceled_data_v2(
                                    code,
                                    i,
                                    total_datas,
                                    l2_data_util.local_today_canceled_buyno_map.get(
                                        code))
                                if not canceled_data:
                                    real_place_order_after_count += 1
                                    real_place_order_after_num += val["num"]
                            # 获取当日的量比
                            volume_rate = code_volumn_manager.get_volume_rate(code)
                            # 是否需要注意
                            need_pay_attention = (total_left_count <= 10 or total_left_num * float(
                                limit_up_price) * 100 < 1500 * 10000) and (
                                                         real_place_order_after_count <= 10 or real_place_order_after_num * float(
                                                     limit_up_price) * 100 < 1500 * 10000)
                            # 统计真实下单位是否距离大单位置过近
                            is_near_big_order = False
                            try:
                                count = 0
                                for i in range(place_order_index - 1, -1, -1):
                                    data = total_datas[i]
                                    val = data["val"]
                                    if not L2DataUtil.is_limit_up_price_buy(val):
                                        continue
                                    money = val["num"] * float(val["price"])
                                    if money < 50 * 100:
                                        continue
                                    left_count = l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_no_canceled_count_v2(
                                        code,
                                        i,
                                        total_datas,
                                        l2_data_util.local_today_canceled_buyno_map.get(
                                            code))
                                    if left_count <= 0:
                                        continue
                                    if money >= 299 * 100:
                                        if count < 1:
                                            is_near_big_order = True
                                    else:
                                        count += 1
                                        if count >= 1:
                                            break
                            except:
                                pass
                            fdata = {"id": orderSysID, "code_info": (code, code_name), "total_num": total_nums,
                                     "finish_num": deal_or_cancel_num,
                                     "buy1_money": output_util.money_desc(buy1_money),
                                     "big_num_count": total_big_count,
                                     "big_num_money": output_util.money_desc(
                                         total_big_num * float(limit_up_price) * 100),
                                     "not_deal_big_num_count": (
                                         not_deal_total_big_count_pre, not_deal_total_big_count_after),
                                     "not_deal_big_num_money": (output_util.money_desc(
                                         not_deal_total_big_num_pre * float(limit_up_price) * 100),
                                                                output_util.money_desc(
                                                                    not_deal_total_big_num_after * float(
                                                                        limit_up_price) * 100)),
                                     "left_count": total_left_count,
                                     "volume_rate": volume_rate,
                                     "left_money": output_util.money_desc(total_left_num * float(limit_up_price) * 100),
                                     "pay_attention": need_pay_attention,
                                     "trade_progress_percent": round(
                                         total_left_num * float(limit_up_price) * 100 * 100 / buy1_money, 2),  # 成交进度比例
                                     "limit_up_price": float(gpcode_manager.get_limit_up_price(code)),
                                     "is_near_big_order": is_near_big_order,
                                     "block": '',
                                     "trade_queue": []
                                     }
                            limit_up_data = kpl_data_manager.KPLLimitUpDataRecordManager.record_code_dict.get(code)
                            # 获取当前板块
                            try:
                                can_buy_result = CodePlateKeyBuyManager.can_buy(code)
                                if can_buy_result:
                                    if can_buy_result[0]:
                                        fdata['block'] = ",".join(
                                            [f"{x[0]}-{x[1] + 1}({x[2]}&{x[3] - x[2]})" for x in can_buy_result[0]])
                                    else:
                                        if can_buy_result[1]:
                                            if limit_up_data:
                                                fdata['block'] = f"{limit_up_data[5]}-独苗"
                                            else:
                                                fdata['block'] = f"独苗"
                            except:
                                pass
                            # 获取涨停时间
                            if limit_up_data:
                                fdata['limit_up_time'] = tool.to_time_str(limit_up_data[2])
                                # 获取委托队列
                            try:
                                real_place_order_index = SCancelBigNumComputer().get_real_place_order_index_cache(code)
                                if real_place_order_index is not None:
                                    trade_queue = l2_output_util.get_trade_queue_at_near_place_order(code,
                                                                                                     real_place_order_index,
                                                                                                     9)
                                    fdata['trade_queue'] = trade_queue
                                # 自由流通股本
                                zyltgb = global_util.zyltgb_map.get(code)
                                if zyltgb is not None:
                                    fdata['zyltgb'] = output_util.money_desc(zyltgb)
                            except:
                                pass
                            fdatas.append(fdata)
                        except Exception as e:
                            logger_debug.exception(e)
                result = {"code": 0, "data": {"account_available_money": account_available_money, "delegates": fdatas}}
                self.send_response(result, client_id, request_id)
            elif ctype == "set_real_place_order_index":
                # 设置真实下单位置
                code = data["code"]
                real_order_index = data["index"]
                order_begin_pos = TradePointManager().get_buy_compute_start_data_cache(code)
                if order_begin_pos is None or order_begin_pos.buy_exec_index is None or order_begin_pos.buy_exec_index < 0:
                    raise Exception("尚未下单")
                cancel_buy_strategy.set_real_place_position(code, real_order_index,
                                                            buy_single_index=order_begin_pos.buy_single_index,
                                                            is_default=False)
                # 更新日志
                async_log_util.info(logger_real_place_order_position, f"真实下单位置(矫正):{code}-{real_order_index}")
                result = {"code": 0, "data": {}}
                self.send_response(result, client_id, request_id)
            elif ctype == "get_positions":
                # 获取所有持仓信息
                positions = PositionManager.latest_positions
                fdatas = []
                if positions:
                    for d in positions:
                        code_ = d["securityID"]
                        code_name = gpcode_manager.get_code_name(d["securityID"])
                        if not code_name:
                            # 判断是否有名称
                            results = HistoryKDatasUtils.get_gp_codes_names([code_])
                            threading.Thread(
                                target=CodesNameManager.add_first_code_name(code_,
                                                                            results[code_])).start()
                        if d["prePosition"] <= 0:
                            continue
                        fdatas.append({"code": code_, "code_name": code_name, "total": d["prePosition"],
                                       "available": d["availablePosition"]})
                result = {"code": 0, "data": fdatas}
                self.send_response(result, client_id, request_id)
            elif ctype == "set_code_sell_way":
                # 设置卖出方式
                # mode : 1-均分  2-百分比
                sell_manager.set_code_sell_way(data)
                result = {"code": 0, "data": {}}
                self.send_response(result, client_id, request_id)
            elif ctype == "get_buy1_info":
                # 获取代码的买1信息
                code = data["code"]
                results = HistoryKDatasUtils.get_gp_current_info([code])
                item = results[0]["quotes"][0]
                result = {"code": 0, "data": {"price": item["bid_p"], "volume": item["bid_v"]}}
                self.send_response(result, client_id, request_id)
            elif ctype == "auto_cancel_sell_mode":
                try:
                    operate = data["operate"]
                    if operate == outside_api_command_manager.OPERRATE_SET:
                        mode = data["mode"]
                        AutoCancelSellModeManager().set_mode(mode)
                        self.send_response({"code": 0, "data": {"mode": mode}}, client_id, request_id)
                    elif operate == outside_api_command_manager.OPERRATE_GET:
                        sell_mode = AutoCancelSellModeManager().get_mode()
                        self.send_response({"code": 0, "data": {"mode": sell_mode}}, client_id, request_id)
                except Exception as e:
                    self.send_response({"code": 1, "msg": str(e)}, client_id, request_id)
            elif ctype == "set_per_code_buy_money":
                # 设置单只票的买入金额
                money = data["money"]
                if money > 30000:
                    raise Exception("最多只能设置3w")
                constant.BUY_MONEY_PER_CODE = money
                self.send_response({"code": 0, "data": {"money": constant.BUY_MONEY_PER_CODE}}, client_id, request_id)
            elif ctype == "get_per_code_buy_money":
                self.send_response({"code": 0, "data": {"money": constant.BUY_MONEY_PER_CODE}}, client_id, request_id)
            elif ctype == "repaire_task":
                # 修复任务
                kpl_data_manager.PullTask.repaire_pull_task()
                self.send_response({"code": 0, "data": {}}, client_id, request_id)
            elif ctype == "get_trade_queue":
                code = data["code"]
                count = data.get("count")
                if count is None:
                    count = 100
                real_place_order_index = SCancelBigNumComputer().get_real_place_order_index_cache(code)
                trade_queue = l2_output_util.get_trade_queue(code, real_place_order_index, count)
                self.send_response({"code": 0, "data": trade_queue}, client_id, request_id)
            elif ctype == "get_deal_big_money_list":
                # 获取大单成交列表
                code = data["code"]
                data_list = BigOrderDealManager().get_total_buy_money_list(code)
                bigger_money = l2_data_util_old.get_big_money_val(float(gpcode_manager.get_limit_up_price(code)),
                                                                  tool.is_ge_code(code))
                fdatas = []
                for d in data_list:
                    if d < bigger_money:
                        continue
                    fdatas.append(d)
                results = [output_util.money_desc(d) for d in fdatas]
                self.send_response({"code": 0, "data": results}, client_id, request_id)
            elif ctype == "refresh_zylt_volume":
                update_count = zyltgb_util.update_all_zylt_volumes()
                self.send_response({"code": 0, "data": {}, "msg": f"更新代码数量:{update_count}"}, client_id, request_id)
            elif ctype == "get_today_updated_zylt_volume_count":
                # 获取今日已经更新的自由流通量的代码数量
                count = ZYLTGBUtil.count_today_updated_volume_codes()
                self.send_response({"code": 0, "data": {"count": count}}, client_id, request_id)
            elif ctype == "get_history_bars_codes_count":
                # 获取历史K线更新的代码数量
                dates = HistoryKDatasUtils.get_latest_trading_date_cache(5)
                latest_trading_date = None
                if dates:
                    latest_trading_date = dates[0]
                if latest_trading_date is None:
                    raise Exception("没有获取到上一个交易日的日期")
                codes = HistoryKDataManager().get_history_bars_codes(latest_trading_date)
                self.send_response({"code": 0, "data": {"count": len(codes)}}, client_id, request_id)
            # 更新代码的K线
            elif ctype == "update_history_k_bars":
                # 更新历史K线
                count = history_k_data_manager.update_history_k_bars()
                self.send_response({"code": 0, "data": {"count": count}}, client_id, request_id)
        except Exception as e:
            logging.exception(e)
            self.send_response({"code": 1, "msg": f"数据处理出错:{e}"}, client_id, request_id)
        finally:
            use_time = time.time() - __start_time
            if use_time > 5:
                logger_request_api.info(f"common_request请求时间过长,ctype-{ctype}")