Administrator
2025-06-03 c4ed4da4ac8b8bc24e0a3ed0e782e9248b4a511c
huaxin_client/trade_client.py
@@ -2,8 +2,12 @@
import concurrent.futures
import json
import logging
import multiprocessing
import os
import threading
import time
import zmq
from huaxin_client import command_manager
from huaxin_client import constant
@@ -18,6 +22,7 @@
from log_module.log import logger_local_huaxin_trade_debug, logger_system, logger_trade
from utils import tool
######## Class B ########
# NOTE(review): the user ID and login password are hardcoded in source — consider loading them from external configuration.
UserID = '388000013349'
# Login password
Password = '110808'
@@ -31,6 +36,30 @@
SSE_ShareHolderID = 'A641420991'
# Shenzhen market shareholder account
SZSE_ShareHolderID = '0345104949'
# Local IP; interpolated into the LIP field of the login TerminalInfo string
LOCAL_IP = constant.LOCAL_IP
# Primary and backup trading front addresses handed to api.RegisterFront
FRONT_ADDRESS = "tcp://192.168.84.31:6500"
FRONT_ADDRESS1 = "tcp://192.168.84.32:26500"
######## Class A ########
# When constant.IS_A is truthy, the account, network and front-address settings are reassigned below.
# NOTE(review): account IDs and the login password are hardcoded in source — consider loading them from external configuration.
if constant.IS_A:
    UserID = '388000013942'
    # Login password
    Password = '110808'
    # Investor account
    InvestorID = '388000013942'
    # Brokerage department code
    DepartmentID = '0003'
    # Funds account
    AccountID = '388000013942'
    # Shanghai market shareholder account
    SSE_ShareHolderID = 'A856881552'
    # Shenzhen market shareholder account
    SZSE_ShareHolderID = '0363800008'
    LOCAL_IP = "192.168.10.111"
    FRONT_ADDRESS = "tcp://10.224.123.143:6500"
    FRONT_ADDRESS1 = "tcp://10.224.123.147:26500"
# # 仿真
# from mylog import logger_trade_debug
@@ -89,8 +118,21 @@
        cls.__session_id = session_id
        cls.__front_id = front_id
    # sinfo char(32)
    def buy(self, code, count, price, sinfo, order_ref):
    # sinfo
    def buy(self, code, count, price, sinfo, order_ref, shadow_price=None, cancel_shadow_order=True,
            shadow_volume=constant.SHADOW_ORDER_VOLUME):
        """
        下单
        @param shadow_volume: 影子单的量
        @param code:
        @param count:
        @param price:
        @param sinfo:char(32)
        @param order_ref:
        @param shadow_price: 影子单价格
        @param cancel_shadow_order: 是否撤影子单
        @return:
        """
        if not ENABLE_ORDER:
            return
        if sinfo in self.__buy_sinfo_set:
@@ -107,10 +149,10 @@
        # TORA_TSTP_EXD_SZSE(2): 深圳交易所
        # TORA_TSTP_EXD_HK(3): 香港交易所
        # TORA_TSTP_EXD_BSE(4): 北京证券交易所
        if code.find('00') == 0:
        if tool.is_sz_code(code):
            req_field.ExchangeID = traderapi.TORA_TSTP_EXD_SZSE
            req_field.ShareholderID = SZSE_ShareHolderID
        elif code.find('60') == 0:
        elif tool.is_sh_code(code):
            req_field.ExchangeID = traderapi.TORA_TSTP_EXD_SSE
            req_field.ShareholderID = SSE_ShareHolderID
@@ -150,17 +192,122 @@
        '''
        其它字段置空
        '''
        # 给L2发送消息
        if l2pipe is not None:
            l2pipe.send(json.dumps({"type": "listen_volume", "data": {"code": code, "volume": count}}).encode('utf-8'))
        ret = api.ReqOrderInsert(req_field, self.req_id)
        if ret != 0:
            raise Exception('ReqOrderInsert fail, ret[%d]' % ret)
        async_log_util.info(logger_trade, f"{code}华鑫本地真实下单结束")
        return
        # --------------------------------影子订单--------------------------------
        if shadow_price:
            if order_ref:
                # 下一个影子订单
                shadow_order_ref = order_ref + 1
                shadow_sinfo = f"s_b_{order_ref}"
                req_field.LimitPrice = shadow_price
                req_field.SInfo = shadow_sinfo
                req_field.OrderRef = shadow_order_ref
                req_field.VolumeTotalOriginal = shadow_volume
                self.req_id += 1
                ret = api.ReqOrderInsert(req_field, self.req_id)
                if ret != 0:
                    raise Exception('ReqOrderInsert fail, ret[%d]' % ret)
                # 影子订单撤单
                # 撤掉影子单
                shadow_cancel_order_ref = shadow_order_ref + 1
                # 深证停留50ms上证停留200ms
                delay_s = 0.05 if tool.is_sz_code(code) else 0.2
                if cancel_shadow_order:
                    self.cancel_buy(code, f"s_c_{shadow_order_ref}", order_sys_id=None,
                                    order_ref=shadow_order_ref,
                                    order_action_ref=None, delay_s=delay_s)
        return ret
    # sinfo
    def buy_new(self, code, order_info_list):
        """
        批量下单
        @param code:
        @param order_info_list:[(量, 价, order_ref, sinfo)]
        @return:
        """
        if not ENABLE_ORDER:
            return
        async_log_util.info(logger_trade, f"{code}华鑫本地真实下单开始")
        async_log_util.info(logger_local_huaxin_trade_debug,
                            f"进入买入方法:code-{code} order_info_list-{order_info_list}")
        for order_info in order_info_list:
            count = order_info[0]
            price = order_info[1]
            order_ref = order_info[2]
            sinfo = order_info[3]
            if sinfo in self.__buy_sinfo_set:
                raise Exception(f'下单请求已经提交:{sinfo}')
            self.__buy_sinfo_set.add(sinfo)
            self.req_id += 1
            # 请求报单
            req_field = traderapi.CTORATstpInputOrderField()
            # TORA_TSTP_EXD_COMM(0): 通用(内部使用)
            # TORA_TSTP_EXD_SSE(1): 上海交易所
            # TORA_TSTP_EXD_SZSE(2): 深圳交易所
            # TORA_TSTP_EXD_HK(3): 香港交易所
            # TORA_TSTP_EXD_BSE(4): 北京证券交易所
            if tool.is_sz_code(code):
                req_field.ExchangeID = traderapi.TORA_TSTP_EXD_SZSE
                req_field.ShareholderID = SZSE_ShareHolderID
            elif tool.is_sh_code(code):
                req_field.ExchangeID = traderapi.TORA_TSTP_EXD_SSE
                req_field.ShareholderID = SSE_ShareHolderID
            # 证券代码
            req_field.SecurityID = code
            req_field.Direction = traderapi.TORA_TSTP_D_Buy
            req_field.VolumeTotalOriginal = count
            req_field.SInfo = sinfo
            req_field.OrderRef = order_ref
            '''
            上交所支持限价指令和最优五档剩撤、最优五档剩转限两种市价指令,对于科创板额外支持本方最优和对手方最优两种市价指令和盘后固定价格申报指令
            深交所支持限价指令和立即成交剩余撤销、全额成交或撤销、本方最优、对手方最优和最优五档剩撤五种市价指令
            限价指令和上交所科创板盘后固定价格申报指令需填写报单价格,其它市价指令无需填写报单价格
            以下以上交所限价指令为例,其它指令参考开发指南相关说明填写OrderPriceType、TimeCondition和VolumeCondition三个字段:
            '''
            req_field.LimitPrice = price
            req_field.OrderPriceType = traderapi.TORA_TSTP_OPT_LimitPrice
            req_field.TimeCondition = traderapi.TORA_TSTP_TC_GFD
            req_field.VolumeCondition = traderapi.TORA_TSTP_VC_AV
            '''
            OrderRef为报单引用,类型为整型,该字段报单时为选填
            若不填写,则系统会为每笔报单自动分配一个报单引用
            若填写,则需保证同一个TCP会话下报单引用严格单调递增,不要求连续递增,至少需从1开始编号
            '''
            # req_field.OrderRef = 1
            '''
            InvestorID为选填,若填写则需保证填写正确
            Operway为委托方式,根据券商要求填写,无特殊说明置空即可
            终端自定义字段,终端可根据需要填写如下字段的值,该字段值不会被柜台系统修改,在报单回报和查询报单时返回给终端
            '''
            # req_field.SInfo = 'sinfo'
            # req_field.IInfo = 123
            '''
            其它字段置空
            '''
            ret = api.ReqOrderInsert(req_field, self.req_id)
            if ret != 0:
                raise Exception('ReqOrderInsert fail, ret[%d]' % ret)
        async_log_util.info(logger_trade, f"{code}华鑫本地真实下单结束")
    # 撤买
    def cancel_buy(self, code, sinfo, order_sys_id=None, order_ref=None, order_action_ref=None):
    def cancel_buy(self, code, sinfo, order_sys_id=None, order_ref=None, order_action_ref=None, delay_s=0.0):
        if delay_s > 0:
            time.sleep(delay_s)
        if sinfo in self.__cancel_buy_sinfo_set:
            raise Exception(f'撤单请求已经提交:{sinfo}')
        async_log_util.info(logger_local_huaxin_trade_debug,
@@ -169,9 +316,9 @@
        self.req_id += 1
        # 请求撤单
        req_field = traderapi.CTORATstpInputOrderActionField()
        if code.find('00') == 0:
        if tool.is_sz_code(code):
            req_field.ExchangeID = traderapi.TORA_TSTP_EXD_SZSE
        elif code.find('60') == 0:
        elif tool.is_sh_code(code):
            req_field.ExchangeID = traderapi.TORA_TSTP_EXD_SSE
        req_field.ActionFlag = traderapi.TORA_TSTP_AF_Delete
@@ -208,8 +355,29 @@
            raise Exception('ReqOrderAction fail, ret[%d]' % ret)
        return
    # 批量撤买
    def batch_cancel_buy(self, code, order_infos, sinfos, order_action_refs, delay_s=0.0):
        """
        批量撤单
        @param code:
        @param order_infos:[(order_ref, order_sys_id)]
        @param sinfos:
        @param order_action_refs:
        @param delay_s:
        @return:
        """
        for i in range(len(order_infos)):
            order_ref, order_sys_id = order_infos[i][0],  order_infos[i][1]
            sinfo = sinfos[i]
            order_action_ref = order_action_refs[i]
            if order_sys_id:
                self.cancel_buy(code, sinfo, order_sys_id=order_sys_id, order_action_ref=order_action_ref, delay_s=delay_s)
            else:
                self.cancel_buy(code, sinfo,  order_ref=order_ref, order_action_ref=order_action_ref, delay_s=delay_s)
        return
    # 卖
    def sell(self, code, count, price, price_type, sinfo):
    def sell(self, code, count, price, price_type, sinfo, order_ref=None):
        if sinfo in self.__sell_sinfo_set:
            raise Exception(f'下单请求已经提交:{sinfo}')
        self.__sell_sinfo_set.add(sinfo)
@@ -221,10 +389,10 @@
        # TORA_TSTP_EXD_SZSE(2): 深圳交易所
        # TORA_TSTP_EXD_HK(3): 香港交易所
        # TORA_TSTP_EXD_BSE(4): 北京证券交易所
        if code.find('00') == 0:
        if tool.is_sz_code(code):
            req_field.ExchangeID = traderapi.TORA_TSTP_EXD_SZSE
            req_field.ShareholderID = SZSE_ShareHolderID
        elif code.find('60') == 0:
        elif tool.is_sh_code(code):
            req_field.ExchangeID = traderapi.TORA_TSTP_EXD_SSE
            req_field.ShareholderID = SSE_ShareHolderID
@@ -259,6 +427,8 @@
        req_field.TimeCondition = traderapi.TORA_TSTP_TC_GFD
        req_field.VolumeCondition = traderapi.TORA_TSTP_VC_AV
        if order_ref:
            req_field.OrderRef = order_ref
        '''
        OrderRef为报单引用,类型为整型,该字段报单时为选填
@@ -291,9 +461,9 @@
        self.req_id += 1
        # 请求撤单
        req_field = traderapi.CTORATstpInputOrderActionField()
        if code.find('00') == 0:
        if tool.is_sz_code(code):
            req_field.ExchangeID = traderapi.TORA_TSTP_EXD_SZSE
        elif code.find('60') == 0:
        elif tool.is_sh_code(code):
            req_field.ExchangeID = traderapi.TORA_TSTP_EXD_SSE
        req_field.ActionFlag = traderapi.TORA_TSTP_AF_Delete
@@ -405,6 +575,9 @@
        # 终端信息采集
        # UserProductInfo填写终端名称
        login_req.UserProductInfo = 'jiabei'
        login_req.DynamicPassword = 'rxoB195F'
        # 按照监管要求填写终端信息
        login_req.TerminalInfo = 'PC;IIP=123.112.154.118;IPORT=50361;LIP=192.168.118.107;MAC=54EE750B1713FCF8AE5CBD58;HD=TF655AY91GHRVL'
        # 以下内外网IP地址若不填则柜台系统自动采集,若填写则以终端填值为准报送
@@ -445,11 +618,11 @@
    def OnRspGetConnectionInfo(self, pConnectionInfoField: "CTORATstpConnectionInfoField",
                               pRspInfoField: "CTORATstpRspInfoField", nRequestID: "int") -> "void":
        if pRspInfoField.ErrorID == 0:
            logger.info('inner_ip_address[%s]' % pConnectionInfoField.InnerIPAddress)
            logger.info('inner_port[%d]' % pConnectionInfoField.InnerPort)
            logger.info('outer_ip_address[%s]' % pConnectionInfoField.OuterIPAddress)
            logger.info('outer_port[%d]' % pConnectionInfoField.OuterPort)
            logger.info('mac_address[%s]' % pConnectionInfoField.MacAddress)
            logger_trade.info('inner_ip_address[%s]' % pConnectionInfoField.InnerIPAddress)
            logger_trade.info('inner_port[%d]' % pConnectionInfoField.InnerPort)
            logger_trade.info('outer_ip_address[%s]' % pConnectionInfoField.OuterIPAddress)
            logger_trade.info('outer_port[%d]' % pConnectionInfoField.OuterPort)
            logger_trade.info('mac_address[%s]' % pConnectionInfoField.MacAddress)
            # 请求登录
            login_req = traderapi.CTORATstpReqUserLoginField()
@@ -483,8 +656,10 @@
            # 终端信息采集
            # UserProductInfo填写终端名称
            login_req.UserProductInfo = 'jiabei'
            login_req.DynamicPassword = 'rxoB195F'
            # 按照监管要求填写终端信息
            login_req.TerminalInfo = 'PC;IIP=NA;IPORT=NA;LIP=192.168.84.75;MAC=5C6F69CC2B40;HD=004bc76004aff0882b9052ba0eb00506;@jiabei'
            # a0:36:9f:ea:fb:bc
            login_req.TerminalInfo = f'PC;IIP=NA;IPORT=NA;LIP={LOCAL_IP};MAC=A0369FEAFBBC;HD=00e3aeeed512b6782d0043b96480e04e;@jiabei'
            # 以下内外网IP地址若不填则柜台系统自动采集,若填写则以终端填值为准报送
            # login_req.MacAddress = '5C-87-9C-96-F3-E3'
            # login_req.InnerIPAddress = '10.0.1.102'
@@ -493,20 +668,20 @@
            TradeSimpleApi.req_id += 1
            ret = api.ReqUserLogin(login_req, TradeSimpleApi.req_id)
            if ret != 0:
                logger.info('ReqUserLogin fail, ret[%d]' % ret)
                logger_trade.info('ReqUserLogin fail, ret[%d]' % ret)
        else:
            logger.info('GetConnectionInfo fail, [%d] [%d] [%s]!!!' % (
            logger_trade.info('GetConnectionInfo fail, [%d] [%d] [%s]!!!' % (
                nRequestID, pRspInfoField.ErrorID, pRspInfoField.ErrorMsg))
    def OnRspUserLogin(self, pRspUserLoginField: "CTORATstpRspUserLoginField", pRspInfoField: "CTORATstpRspInfoField",
                       nRequestID: "int") -> "void":
        if pRspInfoField.ErrorID == 0:
            logger.info('Login success! [%d]' % nRequestID)
            logger_trade.info('Login success! [%d]' % nRequestID)
            self.__front_id = pRspUserLoginField.FrontID
            self.__session_id = pRspUserLoginField.SessionID
            TradeSimpleApi.set_login_info(self.__session_id, self.__front_id)
            if 0:
            if 1:
                # 查询股东账号
                req_field = traderapi.CTORATstpQryShareholderAccountField()
@@ -525,8 +700,8 @@
                TradeSimpleApi().get_money_account()
        else:
            logger.info('Login fail!!! [%d] [%d] [%s]'
                        % (nRequestID, pRspInfoField.ErrorID, pRspInfoField.ErrorMsg))
            logger_trade.info('Login fail!!! [%d] [%d] [%s]'
                              % (nRequestID, pRspInfoField.ErrorID, pRspInfoField.ErrorMsg))
    def OnRspUserPasswordUpdate(self, pUserPasswordUpdateField: "CTORATstpUserPasswordUpdateField",
                                pRspInfoField: "CTORATstpRspInfoField", nRequestID: "int") -> "void":
@@ -579,12 +754,12 @@
                            pRspInfoField: "CTORATstpRspInfoField", nRequestID: "int") -> "void":
        try:
            if pInputOrderActionField and pRspInfoField:
                async_log_util.info(logger_local_huaxin_trade_debug, 'OnErrRtnOrderAction: Error! [%d] [%d] [%d] [%s]'
                async_log_util.info(logger_local_huaxin_trade_debug, 'OnErrRtnOrderAction: Error! [%s] [%s] [%s] [%s]'
                                    % (nRequestID, pInputOrderActionField.OrderSysID,
                                       pRspInfoField.ErrorID,
                                       pRspInfoField.ErrorMsg))
        except:
            async_log_util.info(logger_local_huaxin_trade_debug, "OnErrRtnOrderAction: 撤单出错")
        except Exception as e:
            async_log_util.info(logger_local_huaxin_trade_debug, f"OnErrRtnOrderAction: 撤单出错-{str(e)}")
    def OnRspInquiryJZFund(self, pRspInquiryJZFundField: "CTORATstpRspInquiryJZFundField",
                           pRspInfoField: "CTORATstpRspInfoField", nRequestID: "int") -> "void":
@@ -620,17 +795,25 @@
                                   pOrderField.OrderRef, pOrderField.OrderLocalID,
                                   pOrderField.LimitPrice, pOrderField.VolumeTotalOriginal, pOrderField.OrderSysID,
                                   pOrderField.OrderStatus, pOrderField.InsertTime))
            if pOrderField.OrderStatus != traderapi.TORA_TSTP_OST_Unknown:
            if pOrderField.OrderStatus == traderapi.TORA_TSTP_OST_Unknown:
                pass
            #     if queue_trade_w_l2_r is not None:
            #         queue_trade_w_l2_r.put_nowait(
            #             json.dumps({"type": "listen_volume", "data": {"code": pOrderField.SecurityID,
            #                                                           "volume": pOrderField.VolumeTotalOriginal}}).encode(
            #                 'utf-8'))
            else:
                order_data = {"sinfo": pOrderField.SInfo, "securityID": pOrderField.SecurityID,
                              "orderLocalID": pOrderField.OrderLocalID,
                              "direction": pOrderField.Direction, "orderSysID": pOrderField.OrderSysID,
                              "insertTime": pOrderField.InsertTime, "insertDate": pOrderField.InsertDate,
                              "acceptTime": pOrderField.AcceptTime, "cancelTime": pOrderField.CancelTime,
                              "limitPrice": pOrderField.LimitPrice, "accountID": pOrderField.AccountID,
                              "turnover": pOrderField.Turnover,
                              "volume": pOrderField.VolumeTotalOriginal,
                              "volumeTraded": pOrderField.VolumeTraded, "orderStatus": pOrderField.OrderStatus,
                              "orderSubmitStatus": pOrderField.OrderSubmitStatus, "statusMsg": pOrderField.StatusMsg}
                              "orderRef": pOrderField.OrderRef, "turnover": pOrderField.Turnover,
                              "volume": pOrderField.VolumeTotalOriginal, "volumeTraded": pOrderField.VolumeTraded,
                              "orderStatus": pOrderField.OrderStatus,
                              "orderSubmitStatus": pOrderField.OrderSubmitStatus,
                              "statusMsg": pOrderField.StatusMsg}
                self.call_back_thread_pool.submit(self.__data_callback, TYPE_ORDER, 0, order_data)
        except Exception as e:
            async_log_util.error(logger_local_huaxin_trade_debug, "OnRtnOrder 出错")
@@ -689,9 +872,10 @@
                                   pRspInfoField: "CTORATstpRspInfoField", nRequestID: "int",
                                   bIsLast: "bool") -> "void":
        if bIsLast != 1:
            logger.info('OnRspQryShareholderAccount[%d]: InvestorID[%s] ExchangeID[%s] ShareholderID[%s]'
                        % (nRequestID, pShareholderAccountField.InvestorID, pShareholderAccountField.ExchangeID,
                           pShareholderAccountField.ShareholderID))
            logger_local_huaxin_trade_debug.info(
                'OnRspQryShareholderAccount[%d]: InvestorID[%s] ExchangeID[%s] ShareholderID[%s]'
                % (nRequestID, pShareholderAccountField.InvestorID, pShareholderAccountField.ExchangeID,
                   pShareholderAccountField.ShareholderID))
        else:
            logger.info('查询股东账户结束[%d] ErrorID[%d] ErrorMsg[%s]'
                        % (nRequestID, pRspInfoField.ErrorID, pRspInfoField.ErrorMsg))
@@ -708,7 +892,9 @@
                     "usefulMoney": round(pTradingAccountField.UsefulMoney, 2),
                     "frozenCash": round(pTradingAccountField.FrozenCash, 2),
                     "fetchLimit": round(pTradingAccountField.FetchLimit, 2),
                     "preDeposit": round(pTradingAccountField.PreDeposit, 2)})
                     "preDeposit": round(pTradingAccountField.PreDeposit, 2),
                     "commission": round(pTradingAccountField.Commission, 2)
                     })
                # logger.info(
                #     'OnRspQryTradingAccount[%d]: DepartmentID[%s] InvestorID[%s] AccountID[%s] CurrencyID[%s] UsefulMoney[%.2f] FetchLimit[%.2f]'
                #     % (nRequestID, pTradingAccountField.DepartmentID, pTradingAccountField.InvestorID,
@@ -740,10 +926,11 @@
                     "insertTime": pOrderField.InsertTime, "insertDate": pOrderField.InsertDate,
                     "acceptTime": pOrderField.AcceptTime, "cancelTime": pOrderField.CancelTime,
                     "limitPrice": pOrderField.LimitPrice, "accountID": pOrderField.AccountID,
                     "turnover": pOrderField.Turnover,
                     "turnover": pOrderField.Turnover, "orderRef": pOrderField.OrderRef,
                     "volume": pOrderField.VolumeTotalOriginal,
                     "volumeTraded": pOrderField.VolumeTraded, "orderStatus": pOrderField.OrderStatus,
                     "orderSubmitStatus": pOrderField.OrderSubmitStatus, "statusMsg": pOrderField.StatusMsg})
                     "orderSubmitStatus": pOrderField.OrderSubmitStatus, "statusMsg": pOrderField.StatusMsg,"sinfo": pOrderField.SInfo
                     })
            else:
                # logger.info('查询报单结束[%d] ErrorID[%d] ErrorMsg[%s]'
                #             % (nRequestID, pRspInfoField.ErrorID, pRspInfoField.ErrorMsg))
@@ -820,29 +1007,55 @@
class MyTradeActionCallback(command_manager.TradeActionCallback):
    __tradeSimpleApi = TradeSimpleApi()
    trade_thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=10)
    trade_thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=30)
    def OnTrade(self, client_id, request_id, sk, type_, data):
        if type_ == 1:
            async_log_util.info(logger_local_huaxin_trade_debug,
                                f"\n---------------------\n请求下单:client_id-{client_id} request_id-{request_id}")
                                f"\n---------------------\n请求下单:client_id-{client_id} request_id-{request_id}  data:{data}")
            # 下单
            # 1-买 2-卖
            direction = data["direction"]
            code = data["code"]
            volume = data["volume"]
            price = data["price"]
            sinfo = data["sinfo"]
            order_ref = data.get("order_ref")
            sinfo = data["sinfo"]
            # 老版本下单
            volume = data.get("volume")
            price = data.get("price")
            order_ref = data.get("order_ref")
            shadow_price = data.get("shadow_price")
            shadow_volume = data.get("shadow_volume")
            cancel_shadow = data.get("cancel_shadow")
            # 新版下单
            order_info_list = data.get("order_info_list")
            blocking = data.get("blocking")
            if cancel_shadow is None:
                cancel_shadow = True
            if shadow_volume is None:
                shadow_volume = constant.SHADOW_ORDER_VOLUME
            if direction == 1:
                async_log_util.info(logger_trade, f"{code}华鑫本地开始下单")
                # 买
                try:
                    req_rid_dict[sinfo] = (client_id, request_id, sk, order_ref)
                    if blocking:
                        if not order_info_list:
                            req_rid_dict[sinfo] = (client_id, request_id, sk, order_ref)
                    # threading.Thread(target=lambda: self.__tradeSimpleApi.buy(code, volume, price, sinfo, order_ref),
                    #                  daemon=True).start()
                    self.trade_thread_pool.submit(self.__tradeSimpleApi.buy, code, volume, price, sinfo, order_ref)
                    if not order_info_list:
                        self.trade_thread_pool.submit(self.__tradeSimpleApi.buy, code, volume, price, sinfo, order_ref,
                                                      shadow_price, cancel_shadow, shadow_volume)
                    else:
                        if order_info_list:
                            for i in range(len(order_info_list)):
                                order_info = order_info_list[i]
                                order_info_list[i] = (order_info[0], order_info[1], order_info[2], f"{sinfo}_{i}")
                        if blocking:
                            for x in order_info_list:
                                req_rid_dict[x[3]] = (client_id, request_id, sk, x[2])
                        self.trade_thread_pool.submit(self.__tradeSimpleApi.buy_new, code, order_info_list)
                    async_log_util.info(logger_trade, f"{code}华鑫本地下单线程结束")
                except Exception as e:
                    send_response(json.dumps({"code": 1, "msg": str(e)}), TYPE_ORDER, client_id,
@@ -852,8 +1065,9 @@
            elif direction == 2:
                try:
                    price_type = data["price_type"]
                    req_rid_dict[sinfo] = (client_id, request_id, sk)
                    self.__tradeSimpleApi.sell(code, volume, price, price_type, sinfo)
                    if blocking:
                        req_rid_dict[sinfo] = (client_id, request_id, sk)
                    self.__tradeSimpleApi.sell(code, volume, price, price_type, sinfo, order_ref)
                    print("sell", req_rid_dict)
                except Exception as e:
                    logging.exception(e)
@@ -869,18 +1083,34 @@
            orderSysID = data.get("orderSysID")
            orderRef = data.get("orderRef")
            orderActionRef = data.get("orderActionRef")
            sinfo = data["sinfo"]
            sinfo = data.get("sinfo")
            # =====批量撤单采用此种方法======
            # [(orderRef, orderSysID)]
            orderInfos = data.get("orderInfos")
            orderActionRefs = data.get("orderActionRefs")
            sinfos = data.get("sinfos")
            if direction == 1:
                # 撤买
                # 撤买 ,可采用单个撤单与批量撤单
                try:
                    if not orderSysID and orderRef is None:
                        raise Exception("没有找到系统订单号或者报单引用")
                    req_rid_dict[sinfo] = (client_id, request_id, sk)
                    self.trade_thread_pool.submit(
                        lambda: self.__tradeSimpleApi.cancel_buy(code, sinfo, order_sys_id=orderSysID,
                                                                 order_ref=orderRef, order_action_ref=orderActionRef))
                    async_log_util.info(logger_local_huaxin_trade_debug,
                                        f"撤单结束:code-{code} order_sys_id-{orderSysID} sinfo-{sinfo}")
                    if orderInfos:
                        # 批量撤买
                        if len(orderInfos) != len(orderActionRefs) or len(orderInfos) != len(sinfos):
                            raise Exception("批量撤单:订单数量与orderActionRefs/sinfos数量不匹配")
                        req_rid_dict[sinfo] = (client_id, request_id, sk)
                        self.trade_thread_pool.submit(
                            lambda: self.__tradeSimpleApi.batch_cancel_buy(code, orderInfos, sinfos, orderActionRefs))
                        async_log_util.info(logger_local_huaxin_trade_debug,
                                            f"批量撤单结束:code-{code} order_infos-{orderInfos} sinfos-{sinfos} order_action_refs-{orderActionRefs}")
                    else:
                        if not orderSysID and orderRef is None:
                            raise Exception("没有找到系统订单号或者报单引用")
                        req_rid_dict[sinfo] = (client_id, request_id, sk)
                        self.trade_thread_pool.submit(
                            lambda: self.__tradeSimpleApi.cancel_buy(code, sinfo, order_sys_id=orderSysID,
                                                                     order_ref=orderRef, order_action_ref=orderActionRef))
                        async_log_util.info(logger_local_huaxin_trade_debug,
                                            f"撤单结束:code-{code} order_sys_id-{orderSysID} sinfo-{sinfo}")
                except Exception as e:
                    send_response(json.dumps({"code": 1, "msg": str(e)}), TYPE_CANCEL_ORDER, client_id,
                                  request_id)
@@ -955,9 +1185,14 @@
    if 1:  # 模拟环境,TCP 直连Front方式
        # 注册单个交易前置服务地址
        ##B类服务器##
        logger.info(f"注册交易地址:{FRONT_ADDRESS}/{FRONT_ADDRESS1}")
        api.RegisterFront(FRONT_ADDRESS)  # 正式环境主地址
        api.RegisterFront(FRONT_ADDRESS1)  # 正式环境备用地址
        api.RegisterFront("tcp://192.168.84.31:6500")  # 正式环境主地址
        api.RegisterFront("tcp://192.168.84.32:26500")  # 正式环境备用地址
        ##A类服务器##
        # api.RegisterFront("tcp://10.224.123.143:6500")  # 正式环境主地址
        # api.RegisterFront("tcp://10.224.123.147:26500")  # 正式环境备用地址
        # TD_TCP_FrontAddress = "tcp://210.14.72.21:4400"  # 仿真交易环境
        # TD_TCP_FrontAddress = "tcp://210.14.72.15:4400"  # 24小时环境A套
@@ -1010,7 +1245,7 @@
        # 采用的是socket通信
        sk.sendall(socket_util.load_header(data.encode('utf-8')))
    else:
        strategy_pipe.send(data)
        queue_strategy_trade_write.put_nowait(data)
# 交易反馈回调
@@ -1021,6 +1256,7 @@
        key = data["sinfo"]
    try:
        if req_rid_dict and key in req_rid_dict:
            # TODO 处理批量下单
            temp_params = req_rid_dict.pop(key)
            client_id, request_id = temp_params[0], temp_params[1]
            # 本地订单号-系统订单号映射
@@ -1033,27 +1269,18 @@
            # send_response(
            #     json.dumps({"type": "response", "data": {"code": 0, "data": data}, "client_id": client_id,
            #                 "request_id": request_id}), type, client_id, request_id, temp_params[2])
            if trade_response:
                trade_response.OnTradeResponse(
                    {"type": "response", "data": {"code": 0, "data": data}, "client_id": client_id,
                     "request_id": request_id})
            else:
                send_response(
                    json.dumps({"type": "response", "data": {"code": 0, "data": data}, "client_id": client_id,
                                "request_id": request_id}), type, client_id, request_id, temp_params[2])
            send_response(
                json.dumps({"type": "response", "data": {"code": 0, "data": data}, "client_id": client_id,
                            "request_id": request_id}), type, client_id, request_id)
            async_log_util.info(logger_local_huaxin_trade_debug, "API回调结束 req_id-{} request_id-{}", req_id, request_id)
        else:
            async_log_util.info(logger_local_huaxin_trade_debug, "非API回调 req_id-{}", req_id)
            if trade_response:
                trade_response.OnTradeCallback({"type": "trade_callback", "data": {"code": 0, "data": data, "type": type}})
            # # 非API回调
            else:
                send_response(
                    json.dumps({"type": "trade_callback", "data": {"code": 0, "data": data, "type": type}}),
                    type,
                    None,
                    req_id)
            send_response(
                json.dumps({"type": "trade_callback", "data": {"code": 0, "data": data, "type": type}}),
                type,
                None,
                req_id)
    except Exception as e:
        logging.exception(e)
@@ -1066,43 +1293,33 @@
addr, port = constant.SERVER_IP, constant.SERVER_PORT
def process_cmd(tradeRequest: TradeRequest):
    """Forward a trade request's type and data to the module-level command manager.

    The middle argument of process_command is always passed as None.
    """
    command_type = tradeRequest.type_
    payload = tradeRequest.data
    tradeCommandManager.process_command(command_type, None, payload)
def __test():
    """Manual check of the cancel path: send ten cancel commands, two seconds apart.

    Every command targets the same hard-coded security code and system order
    id; only the sinfo and request id change with the loop counter.
    """
    code = "600190"
    orderSysID = "0190000229"
    for attempt in range(10):
        request_id = f"test-{attempt}"
        data = {"type": "trade", "trade_type": 2,
                "direction": 0,
                "code": code,
                "localOrderID": "",
                "orderSysID": orderSysID, "sinfo": f"test_cancel_{attempt}"}
        envelope = {"type": "trade", "data": data, "request_id": request_id}
        process_cmd(TradeRequest("trade", envelope, request_id))
        time.sleep(2)
def run(trade_response_: TradeResponse=None, pipe_l2=None, pipe_strategy=None):
def run(ipc_order_addr, ipc_cancel_order_addr, queue_strategy_trade_write_=None,
        queue_strategy_trade_read=None, queue_strategy_trade_read_for_read=None):
    """
    运行
    @param ipc_order_addr: zmq下单命令ipc地址
    @param ipc_cancel_order_addr: zmq撤单命令ipc地址
    @param queue_strategy_trade_write_:
    @param queue_strategy_trade_read:
    @param queue_strategy_trade_read_for_read:
    @return:
    """
    try:
        logger_system.info("交易进程ID:{}", os.getpid())
        logger_system.info(f"trade 线程ID:{tool.get_thread_id()}")
        __init_trade_data_server()
        global l2pipe
        l2pipe = pipe_l2
        global strategy_pipe
        strategy_pipe = pipe_strategy
        global queue_strategy_trade_write
        queue_strategy_trade_write = queue_strategy_trade_write_
        global trade_response
        trade_response = trade_response_
        # 运行日志同步
        threading.Thread(target=lambda: async_log_util.run_sync(), daemon=True).start()
        global tradeCommandManager
        tradeCommandManager = command_manager.TradeCommandManager()
        tradeCommandManager.init(MyTradeActionCallback(), l2pipe, pipe_strategy)
        tradeCommandManager.init(MyTradeActionCallback(), queue_strategy_trade_read, queue_strategy_trade_read_for_read)
        logger_system.info("华鑫交易服务启动")
        tradeCommandManager.run(ipc_order_addr, ipc_cancel_order_addr)
    except Exception as e:
        logger_system.exception(e)
    # 不需要运行命令解析
@@ -1125,5 +1342,4 @@
    # while True:
    #     time.sleep(1)
    run()
    input()