Administrator
2023-09-25 c0bcfe746b97bc126636a658b1f01fc6a51f9f95
将华鑫订单交易成功独立出来处理
8个文件已修改
1个文件已添加
357 ■■■■ 已修改文件
huaxin_client/command_manager.py 9 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/trade_client.py 13 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
main.py 17 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/huaxin/huaxin_trade_api.py 75 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/huaxin/huaxin_trade_api_server.py 12 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/huaxin/huaxin_trade_data_update.py 42 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/huaxin/huaxin_trade_order_processor.py 180 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/huaxin/huaxin_trade_server.py 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/trade_huaxin.py 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/command_manager.py
@@ -74,11 +74,10 @@
        return cls._instance
    @classmethod
    def init(cls, trade_action_callback: TradeActionCallback, pipe_l2, pipe_strategy, queue_strategy_trade):
    def init(cls, trade_action_callback: TradeActionCallback, pipe_l2, queue_strategy_trade_read: multiprocessing.Queue):
        cls.action_callback = trade_action_callback
        cls.pipe_strategy = pipe_strategy
        cls.pipe_l2 = pipe_l2
        cls.queue_strategy_trade = queue_strategy_trade
        cls.queue_strategy_trade_read = queue_strategy_trade_read
    @classmethod
    def process_command(cls, _type, client_id, result_json, sk=None):
@@ -144,10 +143,10 @@
    # 维护连接数的稳定
    def run(self, blocking=True):
        if blocking:
            self.run_process_command(self.queue_strategy_trade)
            self.run_process_command(self.queue_strategy_trade_read)
        else:
            # 接受命令
            t1 = threading.Thread(target=lambda: self.run_process_command(self.queue_strategy_trade), daemon=True)
            t1 = threading.Thread(target=lambda: self.run_process_command(self.queue_strategy_trade_read), daemon=True)
            t1.start()
huaxin_client/trade_client.py
@@ -2,6 +2,7 @@
import concurrent.futures
import json
import logging
import multiprocessing
import os
import threading
import time
@@ -1013,7 +1014,7 @@
        # 采用的是socket通信
        sk.sendall(socket_util.load_header(data.encode('utf-8')))
    else:
        strategy_pipe.send(data)
        queue_strategy_trade_write.put_nowait(data)
# 交易反馈回调
@@ -1069,7 +1070,9 @@
addr, port = constant.SERVER_IP, constant.SERVER_PORT
def run(trade_response_: TradeResponse = None, pipe_l2=None, pipe_strategy=None, queue_strategy_trade=None):
def run(trade_response_: TradeResponse = None, pipe_l2=None, queue_strategy_trade_write_=None,
        queue_strategy_trade_read=None):
    try:
        logger_system.info("交易进程ID:{}", os.getpid())
        logger_system.info(f"trade 线程ID:{tool.get_thread_id()}")
@@ -1077,8 +1080,8 @@
        global l2pipe
        l2pipe = pipe_l2
        global strategy_pipe
        strategy_pipe = pipe_strategy
        global queue_strategy_trade_write
        queue_strategy_trade_write = queue_strategy_trade_write_
        global trade_response
        trade_response = trade_response_
@@ -1088,7 +1091,7 @@
        global tradeCommandManager
        tradeCommandManager = command_manager.TradeCommandManager()
        tradeCommandManager.init(MyTradeActionCallback(), l2pipe, pipe_strategy, queue_strategy_trade)
        tradeCommandManager.init(MyTradeActionCallback(), l2pipe, queue_strategy_trade_read)
        logger_system.info("华鑫交易服务启动")
        tradeCommandManager.run()
    except Exception as e:
main.py
@@ -4,7 +4,6 @@
import multiprocessing
import os
import sys
from db import redis_manager_delegate as redis_manager
import huaxin_client.trade_client
@@ -23,7 +22,7 @@
# from huaxin_api import trade_client, l2_client, l1_client
def createTradeServer(pipe_server, pipe_trade, pipe_l1, pipe_l2, ptl2_l2, psl2_l2, queue_strategy_trade):
def createTradeServer(pipe_server, queue_strategy_r_trade_w, pipe_l1, pipe_l2, ptl2_l2, psl2_l2, queue_strategy_w_trade_r):
    logger_system.info("策略进程ID:{}", os.getpid())
    log.close_print()
    # 初始化参数
@@ -49,7 +48,7 @@
    t1.start()
    #
    # 启动华鑫交易服务
    huaxin_trade_server.run(pipe_trade, pipe_l1, pipe_l2, queue_strategy_trade)
    huaxin_trade_server.run(queue_strategy_r_trade_w, pipe_l1, pipe_l2, queue_strategy_w_trade_r)
# 主服务
@@ -83,8 +82,7 @@
        log.close_print()
        # 策略与server间的通信
        pss_server, pss_strategy = multiprocessing.Pipe()
        # 策略与交易间的通信
        pst_trade, pst_strategy = multiprocessing.Pipe()
        # 交易与l2之间的通信
        ptl2_trade, ptl2_l2 = multiprocessing.Pipe()
        # 策略与l2之间的通信
@@ -93,7 +91,10 @@
        # l1与策略间的通信
        pl1t_l1, pl1t_strategy = multiprocessing.Pipe()
        queue_strategy_trade = multiprocessing.Queue()
        # 交易读策略写
        queue_strategy_w_trade_r = multiprocessing.Queue()
        # 策略读交易写
        queue_strategy_r_trade_w = multiprocessing.Queue()
        # 托管环境下不创建
        # serverProcess = multiprocessing.Process(target=createServer, args=(pss_server,))
@@ -106,11 +107,11 @@
        # 交易进程
        tradeProcess = multiprocessing.Process(
            target=lambda: huaxin_client.trade_client.run(None, ptl2_trade, pst_trade, queue_strategy_trade))
            target=lambda: huaxin_client.trade_client.run(None, ptl2_trade, queue_strategy_r_trade_w, queue_strategy_w_trade_r))
        tradeProcess.start()
        # 主进程
        createTradeServer(pss_strategy, pst_strategy, pl1t_strategy, psl2_strategy, ptl2_l2, psl2_l2, queue_strategy_trade)
        createTradeServer(pss_strategy, queue_strategy_r_trade_w, pl1t_strategy, psl2_strategy, ptl2_l2, psl2_l2, queue_strategy_w_trade_r)
        # 将tradeServer作为主进程
        l1Process.join()
trade/huaxin/huaxin_trade_api.py
@@ -1,27 +1,24 @@
"""
交易API
"""
import copy
import json
import logging
import multiprocessing
import random
import threading
import time
from huaxin_client.trade_transform_protocol import TradeRequest
from log_module import async_log_util
from log_module.log import hx_logger_trade_debug, hx_logger_trade_loop, hx_logger_trade_callback, logger_trade, \
    logger_system
from trade.huaxin import huaxin_trade_data_update
from trade.huaxin.huaxin_trade_record_manager import TradeOrderIdManager
from trade.huaxin.huaxin_trade_order_processor import CancelOrderManager, HuaxinOrderEntity, TradeResultProcessor
from utils import socket_util, huaxin_util, tool
# 外部传入的交易队列
pipe_trade = None
def __run_recv_pipe_trade():
    logger_system.info(f"huaxin_trade_api __run_recv_pipe_trade 线程ID:{tool.get_thread_id()}")
def __run_recv_queue_trade(queue: multiprocessing.Queue):
    # 设置结果
    def __set_response(data_json):
        if 'request_id' not in data_json:
@@ -31,10 +28,12 @@
        # 设置响应内容
        set_response(data_json["request_id"], data_json['data'])
    if pipe_trade is not None:
    logger_system.info(f"huaxin_trade_api __run_recv_pipe_trade 线程ID:{tool.get_thread_id()}")
    if queue is not None:
        while True:
            try:
                val = pipe_trade.recv()
                val = queue.get()
                if val:
                    data_json = json.loads(val)
                    # 处理数据
@@ -43,7 +42,6 @@
                        # 主动触发的响应
                        async_log_util.info(hx_logger_trade_callback,
                                            f"response:request_id-{data_json['request_id']}")
                        # 设置响应内容
                        threading.Thread(target=lambda: __set_response(data_json), daemon=True).start()
                    elif type_ == "trade_callback":
                        try:
@@ -51,11 +49,26 @@
                            data_json = data_json["data"]
                            ctype = data_json["type"]
                            # 记录交易反馈日志
                            async_log_util.info(hx_logger_trade_callback, data_json)
                            # 重新请求委托列表与资金
                            huaxin_trade_data_update.add_delegate_list("来自交易管道")
                            huaxin_trade_data_update.add_deal_list()
                            huaxin_trade_data_update.add_money_list()
                            async_log_util.info(hx_logger_trade_callback, f"{data_json}")
                            if ctype == 0:
                                data = data_json.get("data")
                                # 获取订单状态
                                code = data["securityID"]
                                accountID = data["accountID"]
                                orderStatus = data["orderStatus"]
                                orderRef = data["orderRef"]
                                orderSysID = data["orderSysID"]
                                insertTime = data.get("insertTime")
                                acceptTime = data.get("acceptTime")
                                order = HuaxinOrderEntity(code, orderStatus, orderRef, accountID, orderSysID,
                                                          insertTime=insertTime, acceptTime=acceptTime)
                                TradeResultProcessor.process_order(order)
                                # 订单相关回调
                                # 重新请求委托列表与资金
                                huaxin_trade_data_update.add_delegate_list("来自交易管道")
                                huaxin_trade_data_update.add_deal_list()
                                huaxin_trade_data_update.add_money_list()
                            # print("响应结果:", data_json['data'])
                        finally:
                            pass
@@ -64,13 +77,15 @@
# 设置交易通信队列
def run_pipe_trade(pipe_trade_, queue_strategy_trade_):
    global pipe_trade
    pipe_trade = pipe_trade_
    global queue_strategy_trade
    queue_strategy_trade = queue_strategy_trade_
    t1 = threading.Thread(target=lambda: __run_recv_pipe_trade(), daemon=True)
def run_pipe_trade(queue_strategy_r_trade_w_, queue_strategy_w_trade_r_):
    global queue_strategy_w_trade_r
    queue_strategy_w_trade_r = queue_strategy_w_trade_r_
    t1 = threading.Thread(target=lambda: __run_recv_queue_trade(queue_strategy_r_trade_w_), daemon=True)
    t1.start()
    t1 = threading.Thread(target=lambda: CancelOrderManager().run(cancel_order), daemon=True)
    t1.start()
# 交易通道的错误次数
@@ -251,8 +266,7 @@
                     "request_id": request_id}
        root_data = socket_util.encryp_client_params_sign(root_data)
        start_time = time.time()
        queue_strategy_trade.put_nowait(root_data)
        # pipe_trade.send(json.dumps(root_data).encode("utf-8"))
        queue_strategy_w_trade_r.put_nowait(root_data)
        use_time = int((time.time() - start_time) * 1000)
        if use_time > 10:
            async_log_util.info(hx_logger_trade_loop, f"发送耗时:request_id-{request_id} 耗时时间:{use_time}")
@@ -353,12 +367,22 @@
        huaxin_trade_data_update.add_money_list()
def cancel_order(direction, code, orderSysID, orderRef=None, blocking=False, sinfo=None, request_id=None):
__canceling_order_dict = {}
def cancel_order(direction, code, orderSysID, orderRef=None, blocking=False, sinfo=None, request_id=None,
                 recancel=False):
    if not recancel:
        CancelOrderManager.start_cancel(code, orderRef, orderSysID)
    if not sinfo:
        sinfo = f"cb_{code}_{round(time.time() * 1000)}_{random.randint(0, 10000)}"
    order_action_ref = huaxin_util.create_order_ref()
    if not request_id:
        request_id = __get_request_id(ClientSocketManager.CLIENT_TYPE_TRADE)
    # 加入撤单记录,用于校验最后的撤单是否成功
    if code not in __canceling_order_dict:
        __canceling_order_dict[code] = set()
    __canceling_order_dict[code].add(json.dumps((orderRef, orderSysID)))
    # 执行2次撤单,防止没有撤到
    for i in range(2):
        request_id = __request(ClientSocketManager.CLIENT_TYPE_TRADE,
@@ -444,5 +468,4 @@
if __name__ == "__main__":
    d = {"id": "123123"}
    print(d.pop("id"))
    pass
trade/huaxin/huaxin_trade_api_server.py
@@ -24,6 +24,7 @@
from trade.huaxin import huaxin_trade_api, huaxin_trade_record_manager, \
    huaxin_trade_data_update
from trade.huaxin.huaxin_trade_api import ClientSocketManager
from trade.huaxin.huaxin_trade_order_processor import TradeResultProcessor, HuaxinOrderEntity
from utils import socket_util, tool, huaxin_util, data_export_util
@@ -116,12 +117,9 @@
                                            # 交易所拒绝
                                            raise Exception(resultJSON['statusMsg'])
                                        else:
                                            trade_huaxin.order_success(resultJSON['securityId'],
                                                                       resultJSON['accountID'],
                                                                       resultJSON['orderSysID'],
                                                                       resultJSON['orderRef'],
                                                                       resultJSON['insertTime']
                                                                       )
                                            # code, orderStatus, orderRef, accountID, orderSysID, insertTime=None
                                            order = HuaxinOrderEntity(resultJSON['securityId'],statusCode,resultJSON['orderRef'],resultJSON['accountID'],resultJSON['orderSysID'],resultJSON['insertTime'])
                                            TradeResultProcessor.order_success(order)
                                            return_str = json.dumps({"code": 0})
                                    finally:
                                        # 更新委托列表
@@ -148,7 +146,7 @@
                            if result["code"] == 0:
                                if result["data"]["cancel"] == 1:
                                    # 撤单成功
                                    trade_huaxin.cancel_order_success(code, accountId, orderSysID)
                                    TradeResultProcessor.cancel_order_success(code, accountId, orderSysID)
                                    return_str = json.dumps({"code": 0})
                                else:
                                    # 撤单失败
trade/huaxin/huaxin_trade_data_update.py
@@ -13,12 +13,14 @@
from trade import trade_huaxin, trade_manager
from trade.huaxin import huaxin_trade_api, huaxin_trade_record_manager
from trade.huaxin.huaxin_trade_record_manager import TradeOrderIdManager
from trade.huaxin.huaxin_trade_order_processor import CancelOrderManager, HuaxinOrderEntity, TradeResultProcessor
from utils import huaxin_util
trade_data_request_queue = queue.Queue()
def __read_trade_data_queue():
# 主动更新数据
def __read_update_task_queue():
    logger_system.info("启动读取交易数据更新队列")
    while True:
        try:
@@ -40,30 +42,19 @@
                            if data:
                                codes = []
                                for d in data:
                                    if huaxin_util.is_can_cancel(d["orderStatus"]):
                                        codes.append(d["securityID"])
                                        # 设置下单成功
                                        new_place_order_index = trade_huaxin.order_success(d['securityID'],
                                                                                           d['accountID'],
                                                                                           d['orderSysID'],
                                                                                           d['orderRef'],
                                                                                           d['insertTime'])
                                        if new_place_order_index:
                                            buy_single_index, buy_exec_index, compute_index, num, count, max_num_set, volume_rate = TradePointManager().get_buy_compute_start_data_cache(
                                                d['securityID'])
                                            cancel_buy_strategy.set_real_place_position(d['securityID'],
                                                                                        new_place_order_index, buy_single_index)
                                    code = d["securityID"]
                                    orderStatus = d["orderStatus"]
                                    orderSysID = d.get("orderSysID")
                                    orderRef = d["orderRef"]
                                    accountID = d["accountID"]
                                    insertTime = d.get('insertTime')
                                    acceptTime = d.get('acceptTime')
                                    elif huaxin_util.is_canceled(d["orderStatus"]) or huaxin_util.is_deal(
                                            d["orderStatus"]):
                                        # 已经撤单/已经成交,需要处理临时保存的系统订单号
                                        TradeOrderIdManager().remove_order_id(d['securityID'],
                                                                              d['accountID'],
                                                                              d['orderSysID'])
                                        if huaxin_util.is_deal(d["orderStatus"]):
                                            # 成交之后处理
                                            trade_manager.buy_success(d['securityID'])
                                    order = HuaxinOrderEntity(code, orderStatus, orderRef, accountID, orderSysID,
                                                              insertTime=insertTime, acceptTime=acceptTime)
                                    TradeResultProcessor.process_order(order)
                                    if huaxin_util.is_can_cancel(orderStatus):
                                        codes.append(code)
                                if codes:
                                    try:
                                        trade_manager.process_trade_delegate_data([{"code": c} for c in codes])
@@ -99,7 +90,6 @@
                        if dataJSON["code"] == 0:
                            data = dataJSON["data"]
                            huaxin_trade_record_manager.PositionManager.add(data)
                    hx_logger_trade_debug.info(f"获取交易数据成功:{type_}")
                except Exception as e1:
                    # if str(e1).find("超时") >= 0:
@@ -136,5 +126,5 @@
# 运行
def run():
    t1 = threading.Thread(target=lambda: __read_trade_data_queue(), daemon=True)
    t1 = threading.Thread(target=lambda: __read_update_task_queue(), daemon=True)
    t1.start()
trade/huaxin/huaxin_trade_order_processor.py
New file
@@ -0,0 +1,180 @@
"""
华鑫交易结果处理器
"""
import copy
import json
import time
from l2 import cancel_buy_strategy, l2_data_util
from l2.huaxin import huaxin_delegate_postion_manager
from l2.l2_data_manager import TradePointManager
from log_module import async_log_util
from log_module.log import logger_trade, hx_logger_trade_debug
from trade import trade_manager
from trade.huaxin.huaxin_trade_record_manager import TradeOrderIdManager
from utils import huaxin_util, tool
class HuaxinOrderEntity:
    def __init__(self, code, orderStatus, orderRef, accountID, orderSysID, insertTime=None, acceptTime=None):
        self.code = code
        self.orderStatus = orderStatus
        self.orderRef = orderRef
        self.accountID = accountID
        self.orderSysID = orderSysID
        self.insertTime = insertTime
        self.acceptTime = acceptTime
class CancelOrderManager:
    __canceling_order_dict = {}
    __recancel_order_count = {}
    __instance = None
    def __new__(cls, *args, **kwargs):
        if not cls.__instance:
            cls.__instance = super(CancelOrderManager, cls).__new__(cls, *args, **kwargs)
        return cls.__instance
    # 开始撤单
    def start_cancel(self, code, order_ref, order_sys_id):
        if code not in self.__canceling_order_dict:
            self.__canceling_order_dict[code] = set()
        self.__canceling_order_dict[code].add(json.dumps((order_ref, order_sys_id, int(time.time() * 1000))))
    def __cancel_finish(self, code, order_ref, order_sys_id):
        if code not in self.__canceling_order_dict:
            return
        if not self.__canceling_order_dict[code]:
            return
        infos = copy.deepcopy(self.__canceling_order_dict[code])
        for info in infos:
            _info = json.loads(info)
            if _info[0] == order_ref or _info[1] == order_sys_id:
                # 匹配到目标数据
                self.__canceling_order_dict[code].discard(info)
    def __add_recancel_count(self, code, order_ref, order_sys_id):
        key = f"{code}_{order_ref}_{order_sys_id}"
        if key not in self.__recancel_order_count:
            self.__recancel_order_count[key] = 0
        self.__recancel_order_count[key] += 1
    def __can_recancel(self, code, order_ref, order_sys_id):
        key = f"{code}_{order_ref}_{order_sys_id}"
        if key not in self.__recancel_order_count:
            return True
        return self.__recancel_order_count[key] < 2
    # 撤单成功
    def cancel_success(self, code, order_ref, order_sys_id):
        self.__cancel_finish(code, order_ref, order_sys_id)
    # 买入成功
    def buy_success(self, code, order_ref, order_sys_id):
        self.__cancel_finish(code, order_ref, order_sys_id)
    # 传入重新下单
    def run(self, re_cancel_method):
        while True:
            try:
                if self.__canceling_order_dict:
                    for code in self.__canceling_order_dict:
                        infos = self.__canceling_order_dict[code]
                        infos = copy.deepcopy(infos)
                        for info in infos:
                            _info = json.loads(info)
                            timestamp = _info[2]
                            # 查询是否还能重新撤单
                            if not self.__can_recancel(code, _info[0], _info[1]):
                                self.__canceling_order_dict[code].discard(info)
                                continue
                            if time.time() * 1000 - timestamp > 100:
                                async_log_util.info(logger_trade, f"{code}触发重新撤单:{info}")
                                # 100ms后才进行
                                self.__add_recancel_count(code, _info[0], _info[1])
                                re_cancel_method(1, code, _info[1], orderRef=_info[0], recancel=True)
                time.sleep(0.05)
            except:
                pass
class TradeResultProcessor:
    __TradeOrderIdManager = TradeOrderIdManager()
    @classmethod
    def process_order(cls, order: HuaxinOrderEntity):
        if huaxin_util.is_can_cancel(order.orderStatus):
            # 设置下单成功
            new_place_order_index = cls.order_success(order)
            if new_place_order_index:
                buy_single_index, buy_exec_index, compute_index, num, count, max_num_set, volume_rate = TradePointManager().get_buy_compute_start_data_cache(
                    order.code)
                cancel_buy_strategy.set_real_place_position(order.code,
                                                            new_place_order_index,
                                                            buy_single_index)
        elif huaxin_util.is_canceled(order.orderStatus) or huaxin_util.is_deal(
                order.orderStatus):
            # 已经撤单/已经成交,需要处理临时保存的系统订单号
            cls.__TradeOrderIdManager.remove_order_id(order.code,
                                                      order.accountID,
                                                      order.orderSysID)
            if huaxin_util.is_deal(order.orderStatus):
                # 成交之后处理
                trade_manager.buy_success(order.orderStatus)
                CancelOrderManager().buy_success(order.code, order.orderRef, order.orderSysID)
            elif huaxin_util.is_canceled(order.orderStatus):
                CancelOrderManager().cancel_success(order.code, order.orderRef, order.orderSysID)
    @classmethod
    def order_success(cls, order: HuaxinOrderEntity):
        # 加入系统订单号
        cls.__TradeOrderIdManager.add_order_id(order.code, order.accountID, order.orderSysID)
        # 删除临时订单号
        cls.__TradeOrderIdManager.remove_order_ref(order.code, order.orderRef)
        # 根据插入时间判断下单位置是否正确
        try:
            place_index = huaxin_delegate_postion_manager.get_place_order_position(order.code)
            if place_index and order.acceptTime:
                # 大致判断是否为真实下单位置
                total_datas = l2_data_util.local_today_datas.get(order.code)
                if total_datas:
                    if 0 < tool.trade_time_sub(order.acceptTime, total_datas[place_index]["val"]["time"]) < 4:
                        # 4s内才会校验
                        volume = total_datas[place_index]["val"]["num"]
                        for i in range(place_index + 1, len(total_datas)):
                            if total_datas[i]["val"]["num"] == volume and order.acceptTime == total_datas[i]["val"][
                                "time"]:
                                huaxin_delegate_postion_manager.set_place_order_position(order.code, i)
                                async_log_util.info(hx_logger_trade_debug, "{}校验真实下单成功,{}->{}", order.code, place_index,
                                                    i)
                                return i
                    else:
                        raise Exception(
                            f"不满足校验条件,真实下单时间:{order.acceptTime}  预估下单时间:{total_datas[place_index]['val']['time']}")
                else:
                    raise Exception("未获取到L2数据")
            else:
                raise Exception(f"尚未获取到数据(place_index-{place_index} acceptTime-{order.acceptTime})")
        except Exception as e:
            async_log_util.warning(hx_logger_trade_debug, "{}校验真实下单位置出错:{}", order.code, str(e))
        return None
    @classmethod
    def cancel_order_success(cls, code, accountId, orderSysID):
        cls.__TradeOrderIdManager.remove_order_id(code, accountId, orderSysID)
def cancel_order(direction, code, orderSysID, orderRef=None, blocking=False, sinfo=None, request_id=None,
                 recancel=False):
    pass
if __name__ == "__main__":
    CancelOrderManager().start_cancel("000333", 1, "123123")
    # CancelOrderManager().cancel_success("000333", 1, "123123")
    # CancelOrderManager().buy_success("000333", 1, "123123")
    CancelOrderManager().run(cancel_order)
trade/huaxin/huaxin_trade_server.py
@@ -818,7 +818,7 @@
my_trade_response = MyTradeResponse()
def run(pipe_trade, pipe_l1, pipe_l2, queue_strategy_trade: multiprocessing.Queue):
def run(queue_strategy_r_trade_w, pipe_l1, pipe_l2, queue_strategy_w_trade_r):
    logger_system.info(f"trade_server 线程ID:{tool.get_thread_id()}")
    try:
        # 执行一些初始化数据
@@ -832,7 +832,7 @@
        manager.run(blocking=False)
        # 启动交易服务
        huaxin_trade_api.run_pipe_trade(pipe_trade, queue_strategy_trade)
        huaxin_trade_api.run_pipe_trade(queue_strategy_r_trade_w, queue_strategy_w_trade_r)
        # 监听l1那边传过来的代码
        t1 = threading.Thread(target=lambda: __recv_pipe_l1(pipe_l1), daemon=True)
trade/trade_huaxin.py
@@ -112,11 +112,6 @@
        hx_logger_trade_debug.warning("{}校验真实下单位置出错:{}", code, str(e))
    return None
def cancel_order_success(code, accountId, orderSysID):
    __TradeOrderIdManager.remove_order_id(code, accountId, orderSysID)
# 撤单
def cancel_order(code, msg=''):
    if not constant.TRADE_ENABLE: