Administrator
2023-08-30 c265bb186a6b6e2a31689d599be59b5b6f10cf42
将交易合并进策略进程
1个文件已添加
7个文件已修改
413 ■■■■■ 已修改文件
huaxin_client/l2_client.py 7 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/l2_data_manager.py 29 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/l2_data_transform_protocol.py 7 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/trade_client.py 35 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/trade_transform_protocol.py 13 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
main.py 15 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/huaxin/huaxin_trade_api.py 47 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/huaxin/trade_server.py 260 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/l2_client.py
@@ -587,9 +587,8 @@
    global l2_data_callback
    l2_data_callback = _l2_data_callback
    l2_data_manager.run_upload_common()
    l2_data_manager.run_upload_trading_canceled()
    l2_data_manager.run_upload_common(l2_data_callback)
    l2_data_manager.run_upload_trading_canceled(l2_data_callback)
    l2_data_manager.run_log()
    # l2_data_manager.run_test(l2_data_callback)
    global l2CommandManager
@@ -601,6 +600,6 @@
if __name__ == "__main__":
    run(None, None)
    run(None, None, None)
    # spi.set_codes_data([("000333", 12000)])
    input()
huaxin_client/l2_data_manager.py
@@ -180,7 +180,7 @@
                if udatas:
                    start_time = time.time()
                    # upload_data(code, "l2_order", udatas)
                    l2_data_callback.OnL2Order(code,  udatas, int(time.time() * 1000))
                    l2_data_callback.OnL2Order(code, udatas, int(time.time() * 1000))
                    # l2_data_transaction_protocol.send_l2_order_detail(pipe, _mmap, code, udatas)
                    use_time = int((time.time() - start_time) * 1000)
                    if use_time > 20:
@@ -194,7 +194,7 @@
                pass
def __run_upload_trans(code):
def __run_upload_trans(code, l2_data_callback: L2DataCallBack):
    if code not in tmep_transaction_queue_dict:
        tmep_transaction_queue_dict[code] = queue.Queue()
    while True:
@@ -208,33 +208,38 @@
                temp = tmep_transaction_queue_dict[code].get()
                udatas.append(temp)
            if udatas:
                upload_data(code, "l2_trans", udatas)
                # upload_data(code, "l2_trans", udatas)
                l2_data_callback.OnL2Transaction(code, udatas)
            time.sleep(0.01)
        except Exception as e:
            logger_local_huaxin_l2_error.error(f"上传成交数据出错:{str(e)}")
def __run_upload_common():
def __run_upload_common(l2_data_callback: L2DataCallBack):
    print("__run_upload_common")
    while True:
        try:
            while not common_queue.empty():
                temp = common_queue.get()
                upload_data(temp[0], temp[1], temp[2])
                if temp[1] == "l2_market_data":
                    l2_data_callback.OnMarketData(temp[0], temp[2])
                else:
                    upload_data(temp[0], temp[1], temp[2])
            time.sleep(0.01)
        except Exception as e:
            logger_local_huaxin_l2_error.exception(e)
            logger_local_huaxin_l2_error.error(f"上传普通数据出错:{str(e)}")
def __run_upload_trading_canceled():
def __run_upload_trading_canceled(l2_data_callback: L2DataCallBack):
    print("__run_upload_trading_canceled")
    while True:
        try:
            temp = trading_canceled_queue.get()
            if temp:
                logger_local_huaxin_g_cancel.info(f"准备上报:{temp}")
                upload_data(temp[0], "trading_order_canceled", temp[1], new_sk=True)
                # upload_data(temp[0], "trading_order_canceled", temp[1], new_sk=True)
                l2_data_callback.OnTradingOrderCancel(temp[0], temp[1])
                logger_local_huaxin_g_cancel.info(f"上报成功:{temp}")
        except Exception as e:
            logger_local_huaxin_l2_error.exception(e)
@@ -263,17 +268,17 @@
        t.start()
    if code not in transaction_upload_active_time_dict or time.time() - transaction_upload_active_time_dict[code] > 2:
        t = threading.Thread(target=lambda: __run_upload_trans(code), daemon=True)
        t = threading.Thread(target=lambda: __run_upload_trans(code, l2_data_callback), daemon=True)
        t.start()
def run_upload_common():
    t = threading.Thread(target=lambda: __run_upload_common(), daemon=True)
def run_upload_common(l2_data_callback: L2DataCallBack):
    t = threading.Thread(target=lambda: __run_upload_common(l2_data_callback), daemon=True)
    t.start()
def run_upload_trading_canceled():
    t = threading.Thread(target=lambda: __run_upload_trading_canceled(), daemon=True)
def run_upload_trading_canceled(l2_data_callback: L2DataCallBack):
    t = threading.Thread(target=lambda: __run_upload_trading_canceled(l2_data_callback), daemon=True)
    t.start()
huaxin_client/l2_data_transform_protocol.py
@@ -8,8 +8,11 @@
    def OnL2Order(self, code, datas, timestamp):
        pass
    def OnL2Transaction(self, code, datas, timestamp):
    def OnL2Transaction(self, code, datas):
        pass
    def OnMarketData(self, code, datas, timestamp):
    def OnMarketData(self, code, datas):
        pass
    def OnTradingOrderCancel(self, code, buy_no):
        pass
huaxin_client/trade_client.py
@@ -13,6 +13,7 @@
from huaxin_client.log import logger
# 正式账号
from huaxin_client.trade_transform_protocol import TradeRequest, TradeResponse
from log_module.log import logger_local_huaxin_trade_debug, logger_system
UserID = '388000013349'
@@ -990,20 +991,24 @@
            logger_local_huaxin_trade_debug.info("API回调 request_id-{}", request_id)
            # 测试
            send_response(
                json.dumps({"type": "response", "data": {"code": 0, "data": data}, "client_id": client_id,
                            "request_id": request_id}), type, client_id, request_id, temp_params[2])
            # send_response(
            #     json.dumps({"type": "response", "data": {"code": 0, "data": data}, "client_id": client_id,
            #                 "request_id": request_id}), type, client_id, request_id, temp_params[2])
            trade_response.OnTradeResponse({"type": "response", "data": {"code": 0, "data": data}, "client_id": client_id, "request_id": request_id})
            logger_local_huaxin_trade_debug.info("API回调结束 req_id-{} request_id-{}", req_id, request_id)
            print("API回调结束")
        else:
            logger_local_huaxin_trade_debug.info("非API回调 req_id-{}", req_id)
            print("非API回调")
            # 非API回调
            send_response(
                json.dumps({"type": "trade_callback", "data": {"code": 0, "data": data, "type": type}}),
                type,
                None,
                req_id)
            trade_response.OnTradeCallback({"type": "trade_callback", "data": {"code": 0, "data": data, "type": type}})
            # # 非API回调
            # send_response(
            #     json.dumps({"type": "trade_callback", "data": {"code": 0, "data": data, "type": type}}),
            #     type,
            #     None,
            #     req_id)
            print("非API结束")
    except Exception as e:
        logging.exception(e)
@@ -1018,7 +1023,11 @@
addr, port = constant.SERVER_IP, constant.SERVER_PORT
def run(pipe_l2=None, pipe_strategy=None):
def process_cmd(tradeRequest: TradeRequest):
    tradeCommandManager.process_command(tradeRequest.type_, None, tradeRequest.data)
def run(trade_response_: TradeResponse, pipe_l2=None, pipe_strategy=None):
    logger_system.info("交易进程ID:{}", os.getpid())
    __init_trade_data_server()
    global l2pipe
@@ -1027,6 +1036,9 @@
    global strategy_pipe
    strategy_pipe = pipe_strategy
    global trade_response
    trade_response = trade_response_
    t1 = threading.Thread(target=lambda: trade_client_server.run(), daemon=True)
    t1.start()
@@ -1034,7 +1046,8 @@
    tradeCommandManager = command_manager.TradeCommandManager()
    tradeCommandManager.init(MyTradeActionCallback(), l2pipe, pipe_strategy)
    logger_system.info("华鑫交易服务启动")
    tradeCommandManager.run()
    # 不需要运行命令解析
    # tradeCommandManager.run()
    while True:
        time.sleep(2)
huaxin_client/trade_transform_protocol.py
New file
@@ -0,0 +1,13 @@
class TradeRequest:
    def __init__(self, type_, data, request_id):
        self.type_ = type_
        self.data = data
        self.request_id = request_id
class TradeResponse:
    def OnTradeResponse(self, data_json):
        pass
    def OnTradeCallback(self, data_json):
        pass
main.py
@@ -23,7 +23,7 @@
# from huaxin_api import trade_client, l2_client, l1_client
def createTradeServer(pipe_server, pipe_trade, pipe_l1, pipe_l2,ptl2_l2,psl2_l2):
def createTradeServer(pipe_server, pipe_trade, pipe_l1, pipe_l2, ptl2_l2, psl2_l2, ptl2_trade, pst_trade):
    logger_system.info("策略进程ID:{}", os.getpid())
    log.close_print()
    # 初始化参数
@@ -45,8 +45,13 @@
    t1 = threading.Thread(target=huaxin_client.l2_client.run, args=(ptl2_l2, psl2_l2, trade_server.my_l2_data_callback), daemon=True)
    t1.start()
    # 启动华鑫交易服务
    t1 = threading.Thread(target=huaxin_client.trade_client.run, args=(trade_server.my_trade_response, ptl2_trade, pst_trade),
                          daemon=True)
    t1.start()
    # 交易服务
    trade_server.run(pipe_trade, pipe_l1, pipe_l2)
    trade_server.run(pipe_trade, pipe_l1, pipe_l2, huaxin_client.trade_client.process_cmd)
# 主服务
@@ -94,14 +99,10 @@
    logger_system.info("主进程ID:{}", os.getpid())
    tradeServerProcess = multiprocessing.Process(target=createTradeServer,
                                                 args=(pss_strategy, pst_strategy, pl1t_strategy, psl2_strategy,ptl2_l2, psl2_l2))
                                                 args=(pss_strategy, pst_strategy, pl1t_strategy, psl2_strategy,ptl2_l2, psl2_l2,ptl2_trade,pst_trade))
    tradeServerProcess.start()
    #
    tradeProcess = multiprocessing.Process(target=huaxin_client.trade_client.run, args=(ptl2_trade, pst_trade,))
    tradeProcess.start()
    # L1订阅数据
    huaxin_client.l1_client.run(pl1t_l1)
    # 将tradeServer作为主进程
    tradeServerProcess.join()
    tradeProcess.join()
trade/huaxin/huaxin_trade_api.py
@@ -8,6 +8,7 @@
import time
from huaxin_client import trade_client_server
from huaxin_client.trade_transform_protocol import TradeRequest
from log_module import async_log_util
from log_module.log import hx_logger_trade_debug, hx_logger_trade_loop, hx_logger_trade_callback, logger_trade
from trade.huaxin import huaxin_trade_data_update
@@ -63,9 +64,12 @@
# 设置交易通信队列
def run_pipe_trade(pipe_trade_):
def run_pipe_trade(pipe_trade_, trade_cmd_callback_):
    global pipe_trade
    pipe_trade = pipe_trade_
    global trade_cmd_callback
    trade_cmd_callback = trade_cmd_callback_
    t1 = threading.Thread(target=lambda: __run_recv_pipe_trade(), daemon=True)
    t1.start()
@@ -76,7 +80,7 @@
# pipe的交易通道是否正常
def is_pipe_channel_normal():
    return trade_pipe_channel_error_count < 3
    return False
# 测试交易通道
@@ -240,23 +244,15 @@
    if not request_id:
        request_id = __get_request_id(_type)
    try:
        async_log_util.info(hx_logger_trade_loop, "请求发送开始:client_id-{} request_id-{} is_pipe-{}", 0, request_id, is_pipe)
        async_log_util.info(hx_logger_trade_loop, "请求发送开始:client_id-{} request_id-{} is_pipe-{}", 0, request_id,
                            is_pipe)
        root_data = {"type": _type,
                     "data": data,
                     "request_id": request_id}
        root_data = socket_util.encryp_client_params_sign(root_data)
        if not is_pipe:
            sk = socket_util.create_socket("127.0.0.1", trade_client_server.SERVER_PORT)
            sk.sendall(socket_util.load_header(json.dumps(root_data).encode("utf-8")))
            try:
                if blocking:
                    result_data, header_str = socket_util.recv_data(sk)
                    if result_data:
                        data_json = json.loads(result_data)
                        set_response(request_id, data_json["data"])
            finally:
                sk.close()
            trade_cmd_callback(TradeRequest(_type, root_data, request_id))
        else:
            pipe_trade.send(json.dumps(root_data).encode("utf-8"))
        async_log_util.info(hx_logger_trade_loop, "请求发送成功:request_id-{}", request_id)
@@ -315,13 +311,6 @@
                        __TradeOrderIdManager.remove_local_order_id(code, localOrderId)
        except:
            pass
    else:
        # 被动触发
        pass
@@ -346,7 +335,8 @@
                            "local_order_id": local_order_id,
                            "volume": volume,
                            "price_type": price_type,
                            "price": price, "sinfo": sinfo}, request_id=request_id, blocking=blocking, is_pipe=is_pipe_channel_normal())
                            "price": price, "sinfo": sinfo}, request_id=request_id, blocking=blocking,
                           is_pipe=is_pipe_channel_normal())
    if blocking:
        try:
            return __read_response(request_id, blocking)
@@ -365,7 +355,8 @@
                            "direction": direction,
                            "code": code,
                            "localOrderID": localOrderID,
                            "orderSysID": orderSysID, "sinfo": sinfo}, request_id=request_id, blocking=blocking, is_pipe=is_pipe_channel_normal())
                            "orderSysID": orderSysID, "sinfo": sinfo}, request_id=request_id, blocking=blocking,
                           is_pipe=is_pipe_channel_normal())
    try:
        return __read_response(request_id, blocking)
    finally:
@@ -392,28 +383,32 @@
# 获取成交列表
def get_deal_list(blocking=True):
    request_id = __request(ClientSocketManager.CLIENT_TYPE_DEAL_LIST,
                           {"type": ClientSocketManager.CLIENT_TYPE_DEAL_LIST}, blocking=blocking, is_pipe=is_pipe_channel_normal())
                           {"type": ClientSocketManager.CLIENT_TYPE_DEAL_LIST}, blocking=blocking,
                           is_pipe=is_pipe_channel_normal())
    return __read_response(request_id, blocking)
# 获取持仓列表
def get_position_list(blocking=True):
    request_id = __request(ClientSocketManager.CLIENT_TYPE_POSITION_LIST,
                           {"type": ClientSocketManager.CLIENT_TYPE_POSITION_LIST}, blocking=blocking, is_pipe=is_pipe_channel_normal())
                           {"type": ClientSocketManager.CLIENT_TYPE_POSITION_LIST}, blocking=blocking,
                           is_pipe=is_pipe_channel_normal())
    return __read_response(request_id, blocking)
# 获取账户资金状况
def get_money(blocking=True):
    request_id = __request(ClientSocketManager.CLIENT_TYPE_MONEY,
                           {"type": ClientSocketManager.CLIENT_TYPE_MONEY}, blocking=blocking, is_pipe=is_pipe_channel_normal())
                           {"type": ClientSocketManager.CLIENT_TYPE_MONEY}, blocking=blocking,
                           is_pipe=is_pipe_channel_normal())
    return __read_response(request_id, blocking)
# 设置L2订阅数据
def set_l2_codes_data(codes_data, blocking=True):
    request_id = __request(ClientSocketManager.CLIENT_TYPE_CMD_L2,
                           {"type": ClientSocketManager.CLIENT_TYPE_CMD_L2, "data": codes_data}, blocking=blocking, is_pipe=is_pipe_channel_normal())
                           {"type": ClientSocketManager.CLIENT_TYPE_CMD_L2, "data": codes_data}, blocking=blocking,
                           is_pipe=is_pipe_channel_normal())
    return __read_response(request_id, blocking)
trade/huaxin/trade_server.py
@@ -26,6 +26,7 @@
from db.redis_manager_delegate import RedisUtils
from huaxin_client import l1_subscript_codes_manager, l2_data_transform_protocol
from huaxin_client.client_network import SendResponseSkManager
from huaxin_client.trade_transform_protocol import TradeResponse
from l2 import l2_data_manager_new, l2_log, code_price_manager, l2_data_util, l2_data_manager, transaction_progress
from l2.cancel_buy_strategy import HourCancelBigNumComputer, LCancelBigNumComputer, DCancelBigNumComputer, \
    GCancelBigNumComputer, SecondCancelBigNumComputer
@@ -161,24 +162,13 @@
                    elif data_json["type"] == "response":
                        # 主动触发的响应
                        try:
                            client_id = data_json.get("client_id")
                            hx_logger_trade_callback.info(f"response:request_id-{data_json['request_id']}")
                            # 设置响应内容
                            trade_api.set_response(data_json["request_id"], data_json['data'])
                            my_trade_response.OnTradeResponse(data_json)
                        finally:
                            sk.sendall(socket_util.load_header(json.dumps({"code": 0}).encode(encoding='utf-8')))
                    elif data_json["type"] == "trade_callback":
                        try:
                            # 交易回调
                            data_json = data_json["data"]
                            type_ = data_json["type"]
                            # 记录交易反馈日志
                            hx_logger_trade_callback.info(data_json)
                            # 重新请求委托列表与资金
                            huaxin_trade_data_update.add_delegate_list()
                            huaxin_trade_data_update.add_deal_list()
                            huaxin_trade_data_update.add_money_list()
                            # print("响应结果:", data_json['data'])
                            my_trade_response.OnTradeCallback(data_json)
                        finally:
                            sk.sendall(socket_util.load_header(json.dumps({"code": 0}).encode(encoding='utf-8')))
                    elif data_json["type"] == "l2_order":
@@ -197,71 +187,7 @@
                            data = data_json["data"]
                            code = data["code"]
                            datas = data["data"]
                            hx_logger_l2_transaction.info(f"{code}#{datas}")
                            if datas:
                                # 设置成交价
                                current_price_process_manager.set_trade_price(code, datas[-1][1])
                            try:
                                buyno_map = l2_data_util.local_today_buyno_map.get(code)
                                hx_logger_l2_transaction.info(
                                    f"{code}的买入订单号数量:{len(buyno_map.keys()) if buyno_map else 0}")
                                buy_progress_index = None
                                for i in range(len(datas) - 1, -1, -1):
                                    d = datas[i]
                                    buy_no = f"{d[6]}"
                                    if buyno_map and buy_no in buyno_map:
                                        hx_logger_l2_transaction.info(f"{code}成交进度:{buyno_map[buy_no]}")
                                        buy_progress_index = buyno_map[buy_no]["index"]
                                        break
                                # 获取执行位时间
                                buy_single_index, buy_exec_index, compute_index, num, count, max_num_set, volume_rate = l2_data_manager.TradePointManager().get_buy_compute_start_data_cache(
                                    code)
                                if True:
                                    if buy_progress_index is not None:
                                        self.__TradeBuyQueue.set_traded_index(code, buy_progress_index)
                                        total_datas = l2_data_util.local_today_datas.get(code)
                                        num_operate_map = l2_data_util.local_today_num_operate_map.get(
                                            code)
                                        logger_l2_trade_buy_queue.info("获取成交位置成功: code-{} index-{}", code,
                                                                       buy_progress_index)
                                        buy_time = total_datas[buy_progress_index]["val"]["time"]
                                        limit_up_price = gpcode_manager.get_limit_up_price(code)
                                        if buy_exec_index:
                                            need_cancel, msg = DCancelBigNumComputer().set_trade_progress(code,
                                                                                                          buy_progress_index,
                                                                                                          buy_exec_index,
                                                                                                          total_datas,
                                                                                                          num_operate_map,
                                                                                                          num * 100 * float(
                                                                                                              limit_up_price),
                                                                                                          limit_up_price)
                                            if need_cancel:
                                                L2TradeDataProcessor.cancel_buy(code, f"D撤:{msg}", source="d_cancel")
                                        f1 = dask.delayed(HourCancelBigNumComputer().set_trade_progress)(code, buy_time,
                                                                                                         buy_exec_index,
                                                                                                         buy_progress_index,
                                                                                                         total_datas,
                                                                                                         num_operate_map)
                                        f2 = dask.delayed(LCancelBigNumComputer().set_trade_progress)(code,
                                                                                                      buy_progress_index,
                                                                                                      total_datas)
                                        f3 = dask.delayed(
                                            deal_big_money_manager.DealComputeProgressManager().set_trade_progress)(
                                            code,
                                            buy_progress_index,
                                            total_datas,
                                            num_operate_map)
                                        f4 = dask.delayed(
                                            SecondCancelBigNumComputer().set_transaction_index)(
                                            code,
                                            buy_progress_index)
                                        dask.compute(f1, f2, f3, f4)
                            except Exception as e:
                                hx_logger_l2_transaction.exception(e)
                            TradeServerProcessor.l2_transaction(code, datas)
                        finally:
                            sk.sendall(socket_util.load_header(json.dumps({"code": 0}).encode(encoding='utf-8')))
                    elif data_json["type"] == "l2_market_data":
@@ -269,42 +195,7 @@
                            data = data_json["data"]
                            code = data["code"]
                            data = data["data"]
                            time_str = f"{data['dataTimeStamp']}"
                            if time_str.startswith("9"):
                                time_str = "0" + time_str
                            time_str = time_str[:6]
                            time_str = f"{time_str[0:2]}:{time_str[2:4]}:{time_str[4:6]}"
                            buy_1_price, buy_1_volume = data["buy"][0]
                            sell_1_price, sell_1_volume = data["sell"][0]
                            limit_up_price = gpcode_manager.get_limit_up_price(code)
                            if limit_up_price is not None:
                                # 处理买1,卖1信息
                                code_price_manager.Buy1PriceManager().process(code, buy_1_price, time_str,
                                                                              limit_up_price,
                                                                              sell_1_price, sell_1_volume // 100)
                                pre_close_price = round(float(limit_up_price) / 1.1, 2)
                                # 如果涨幅大于7%就读取板块
                                price_rate = (buy_1_price - pre_close_price) / pre_close_price
                                if price_rate > 0.07:
                                    if not self.__KPLCodeJXBlockManager.get_jx_blocks_cache(code):
                                        blocks = kpl_api.getCodeJingXuanBlocks(code)
                                        self.__KPLCodeJXBlockManager.save_jx_blocks(code, blocks)
                                elif price_rate > 0.03:
                                    # 添加备用板块
                                    if not self.__KPLCodeJXBlockManager.get_jx_blocks_cache(code, by=True):
                                        blocks = kpl_api.getCodeJingXuanBlocks(code)
                                        self.__KPLCodeJXBlockManager.save_jx_blocks(code, blocks, by=True)
                                # 更新板块信息
                                yesterday_codes = kpl_data_manager.get_yesterday_limit_up_codes()
                                CodePlateKeyBuyManager.update_can_buy_blocks(code,
                                                                             kpl_data_manager.KPLLimitUpDataRecordManager.latest_origin_datas,
                                                                             kpl_data_manager.KPLLimitUpDataRecordManager.total_datas,
                                                                             yesterday_codes,
                                                                             block_info.get_before_blocks_dict())
                            hx_logger_l2_market_data.info(f"{code}#{data}")
                            TradeServerProcessor.l2_market_data(code, data)
                        finally:
                            sk.sendall(socket_util.load_header(json.dumps({"code": 0}).encode(encoding='utf-8')))
                    elif data_json["type"] == "l2_subscript_codes":
@@ -401,6 +292,9 @@
# 交易服务处理器
class TradeServerProcessor:
    __TradeBuyQueue = transaction_progress.TradeBuyQueue()
    __KPLCodeJXBlockManager = KPLCodeJXBlockManager()
    # 设置目标代码
    @classmethod
    def set_target_codes(cls, data_json):
@@ -415,6 +309,113 @@
                            f"{code}#耗时:{int(time.time() * 1000) - timestamp}-{now_timestamp}#{_datas}")
        l2_log.threadIds[code] = random.randint(0, 100000)
        l2_data_manager_new.L2TradeDataProcessor.process_huaxin(code, _datas)
    @classmethod
    def l2_transaction(cls, code, datas):
        hx_logger_l2_transaction.info(f"{code}#{datas}")
        if datas:
            # 设置成交价
            current_price_process_manager.set_trade_price(code, datas[-1][1])
        try:
            buyno_map = l2_data_util.local_today_buyno_map.get(code)
            hx_logger_l2_transaction.info(
                f"{code}的买入订单号数量:{len(buyno_map.keys()) if buyno_map else 0}")
            buy_progress_index = None
            for i in range(len(datas) - 1, -1, -1):
                d = datas[i]
                buy_no = f"{d[6]}"
                if buyno_map and buy_no in buyno_map:
                    hx_logger_l2_transaction.info(f"{code}成交进度:{buyno_map[buy_no]}")
                    buy_progress_index = buyno_map[buy_no]["index"]
                    break
            # 获取执行位时间
            buy_single_index, buy_exec_index, compute_index, num, count, max_num_set, volume_rate = l2_data_manager.TradePointManager().get_buy_compute_start_data_cache(
                code)
            if True:
                if buy_progress_index is not None:
                    cls.__TradeBuyQueue.set_traded_index(code, buy_progress_index)
                    total_datas = l2_data_util.local_today_datas.get(code)
                    num_operate_map = l2_data_util.local_today_num_operate_map.get(
                        code)
                    logger_l2_trade_buy_queue.info("获取成交位置成功: code-{} index-{}", code,
                                                   buy_progress_index)
                    buy_time = total_datas[buy_progress_index]["val"]["time"]
                    limit_up_price = gpcode_manager.get_limit_up_price(code)
                    if buy_exec_index:
                        need_cancel, msg = DCancelBigNumComputer().set_trade_progress(code,
                                                                                      buy_progress_index,
                                                                                      buy_exec_index,
                                                                                      total_datas,
                                                                                      num_operate_map,
                                                                                      num * 100 * float(
                                                                                          limit_up_price),
                                                                                      limit_up_price)
                        if need_cancel:
                            L2TradeDataProcessor.cancel_buy(code, f"D撤:{msg}", source="d_cancel")
                    f1 = dask.delayed(HourCancelBigNumComputer().set_trade_progress)(code, buy_time,
                                                                                     buy_exec_index,
                                                                                     buy_progress_index,
                                                                                     total_datas,
                                                                                     num_operate_map)
                    f2 = dask.delayed(LCancelBigNumComputer().set_trade_progress)(code,
                                                                                  buy_progress_index,
                                                                                  total_datas)
                    f3 = dask.delayed(
                        deal_big_money_manager.DealComputeProgressManager().set_trade_progress)(
                        code,
                        buy_progress_index,
                        total_datas,
                        num_operate_map)
                    f4 = dask.delayed(
                        SecondCancelBigNumComputer().set_transaction_index)(
                        code,
                        buy_progress_index)
                    dask.compute(f1, f2, f3, f4)
        except Exception as e:
            hx_logger_l2_transaction.exception(e)
    @classmethod
    def l2_market_data(cls, code, data):
        time_str = f"{data['dataTimeStamp']}"
        if time_str.startswith("9"):
            time_str = "0" + time_str
        time_str = time_str[:6]
        time_str = f"{time_str[0:2]}:{time_str[2:4]}:{time_str[4:6]}"
        buy_1_price, buy_1_volume = data["buy"][0]
        sell_1_price, sell_1_volume = data["sell"][0]
        limit_up_price = gpcode_manager.get_limit_up_price(code)
        if limit_up_price is not None:
            # 处理买1,卖1信息
            code_price_manager.Buy1PriceManager().process(code, buy_1_price, time_str,
                                                          limit_up_price,
                                                          sell_1_price, sell_1_volume // 100)
            pre_close_price = round(float(limit_up_price) / 1.1, 2)
            # 如果涨幅大于7%就读取板块
            price_rate = (buy_1_price - pre_close_price) / pre_close_price
            if price_rate > 0.07:
                if not cls.__KPLCodeJXBlockManager.get_jx_blocks_cache(code):
                    blocks = kpl_api.getCodeJingXuanBlocks(code)
                    cls.__KPLCodeJXBlockManager.save_jx_blocks(code, blocks)
            elif price_rate > 0.03:
                # 添加备用板块
                if not cls.__KPLCodeJXBlockManager.get_jx_blocks_cache(code, by=True):
                    blocks = kpl_api.getCodeJingXuanBlocks(code)
                    cls.__KPLCodeJXBlockManager.save_jx_blocks(code, blocks, by=True)
            # 更新板块信息
            yesterday_codes = kpl_data_manager.get_yesterday_limit_up_codes()
            CodePlateKeyBuyManager.update_can_buy_blocks(code,
                                                         kpl_data_manager.KPLLimitUpDataRecordManager.latest_origin_datas,
                                                         kpl_data_manager.KPLLimitUpDataRecordManager.total_datas,
                                                         yesterday_codes,
                                                         block_info.get_before_blocks_dict())
        hx_logger_l2_market_data.info(f"{code}#{data}")
def clear_invalid_client():
@@ -764,18 +765,37 @@
    def OnL2Order(self, code, datas, timestamp):
        TradeServerProcessor.l2_order(code, datas, timestamp)
    def OnL2Transaction(self, code, datas, timestamp):
        pass
    def OnL2Transaction(self, code, datas):
        TradeServerProcessor.l2_transaction(code, datas)
    def OnMarketData(self, code, datas, timestamp):
        pass
    def OnMarketData(self, code, datas):
        TradeServerProcessor.l2_market_data(code, datas)
class MyTradeResponse(TradeResponse):
    def OnTradeCallback(self, data_json):
        data_json = data_json["data"]
        type_ = data_json["type"]
        # 记录交易反馈日志
        hx_logger_trade_callback.info(data_json)
        # 重新请求委托列表与资金
        huaxin_trade_data_update.add_delegate_list()
        huaxin_trade_data_update.add_deal_list()
        huaxin_trade_data_update.add_money_list()
    def OnTradeResponse(self, data_json):
        hx_logger_trade_callback.info(f"response:request_id-{data_json['request_id']}")
        # 设置响应内容
        trade_api.set_response(data_json["request_id"], data_json['data'])
# 回调
my_l2_data_callback = MyL2DataCallback()
my_trade_response = MyTradeResponse()
def run(pipe_trade, pipe_l1, pipe_l2):
def run(pipe_trade, pipe_l1, pipe_l2, trade_cmd_callback):
    # 执行一些初始化数据
    block_info.init()
@@ -787,7 +807,7 @@
    manager.run(blocking=False)
    # 启动交易服务
    huaxin_trade_api.run_pipe_trade(pipe_trade)
    huaxin_trade_api.run_pipe_trade(pipe_trade, trade_cmd_callback)
    # 监听l1那边传过来的代码
    t1 = threading.Thread(target=lambda: __recv_pipe_l1(pipe_l1), daemon=True)