# -*- coding: utf-8 -*- import concurrent.futures import json import logging import os import threading import time from huaxin_client import command_manager from huaxin_client import constant from huaxin_client import socket_util import traderapi from huaxin_client.client_network import SendResponseSkManager from huaxin_client.log import logger # 正式账号 from log_module import async_log_util from log_module.log import logger_local_huaxin_trade_debug, logger_system, logger_trade from utils import tool ########B类######## UserID = '388000013349' # 登陆密码 Password = '110808' # 投资者账户 InvestorID = '388000013349' # 经济公司部门代码 DepartmentID = '0003' # 资金账户 AccountID = '388000013349' # 沪市股东账号 SSE_ShareHolderID = 'A641420991' # 深市股东账号 SZSE_ShareHolderID = '0345104949' LOCAL_IP = constant.LOCAL_IP FRONT_ADDRESS = "tcp://192.168.84.31:6500" FRONT_ADDRESS1 = "tcp://192.168.84.32:26500" # # 仿真 # from mylog import logger_trade_debug # # UserID = '00043201' # # 登陆密码 # Password = '45249973' # # 投资者账户 # InvestorID = '11160150' # # 经济公司部门代码 # DepartmentID = '0003' # # 资金账户 # AccountID = '00043201' # # 沪市股东账号 # SSE_ShareHolderID = 'A00043201' # # 深市股东账号 # SZSE_ShareHolderID = '700043201' # # 登录用户 # UserID = '00572083' # # 登陆密码 # Password = '16121950' # # 投资者账户 # InvestorID = '11160150' # # 经济公司部门代码 # DepartmentID = '0057' # # 资金账户 # AccountID = '00572083' # # 沪市股东账号 # SSE_ShareHolderID = 'A00572083' # # 深市股东账号 # SZSE_ShareHolderID = '700572083' TYPE_ORDER = 0 TYPE_CANCEL_ORDER = 1 TYPE_LIST_DELEGATE = 2 TYPE_LIST_TRADED = 3 TYPE_LIST_POSITION = 4 TYPE_LIST_MONEY = 5 # 成交 TYPE_DEAL = 6 ENABLE_ORDER = True class TradeSimpleApi: req_id = 0 __buy_sinfo_set = set() __sell_sinfo_set = set() __cancel_buy_sinfo_set = set() __cancel_sell_sinfo_set = set() @classmethod def set_login_info(cls, session_id, front_id): cls.__session_id = session_id cls.__front_id = front_id # sinfo def buy(self, code, count, price, sinfo, order_ref, shadow_price=None, cancel_shadow_order=True, shadow_volume=constant.SHADOW_ORDER_VOLUME): """ 下单 @param shadow_volume: 影子单的量 @param code: @param count: @param price: @param sinfo:char(32) @param order_ref: @param shadow_price: 影子单价格 @param cancel_shadow_order: 是否撤影子单 @return: """ if not ENABLE_ORDER: return if sinfo in self.__buy_sinfo_set: raise Exception(f'下单请求已经提交:{sinfo}') async_log_util.info(logger_trade, f"{code}华鑫本地真实下单开始") async_log_util.info(logger_local_huaxin_trade_debug, f"进入买入方法:code-{code} sinfo-{sinfo} order_ref-{order_ref}") self.__buy_sinfo_set.add(sinfo) self.req_id += 1 # 请求报单 req_field = traderapi.CTORATstpInputOrderField() # TORA_TSTP_EXD_COMM(0): 通用(内部使用) # TORA_TSTP_EXD_SSE(1): 上海交易所 # TORA_TSTP_EXD_SZSE(2): 深圳交易所 # TORA_TSTP_EXD_HK(3): 香港交易所 # TORA_TSTP_EXD_BSE(4): 北京证券交易所 if tool.is_sz_code(code): req_field.ExchangeID = traderapi.TORA_TSTP_EXD_SZSE req_field.ShareholderID = SZSE_ShareHolderID elif tool.is_sh_code(code): req_field.ExchangeID = traderapi.TORA_TSTP_EXD_SSE req_field.ShareholderID = SSE_ShareHolderID # 证券代码 req_field.SecurityID = code req_field.Direction = traderapi.TORA_TSTP_D_Buy req_field.VolumeTotalOriginal = count req_field.SInfo = sinfo req_field.OrderRef = order_ref ''' 上交所支持限价指令和最优五档剩撤、最优五档剩转限两种市价指令,对于科创板额外支持本方最优和对手方最优两种市价指令和盘后固定价格申报指令 深交所支持限价指令和立即成交剩余撤销、全额成交或撤销、本方最优、对手方最优和最优五档剩撤五种市价指令 限价指令和上交所科创板盘后固定价格申报指令需填写报单价格,其它市价指令无需填写报单价格 以下以上交所限价指令为例,其它指令参考开发指南相关说明填写OrderPriceType、TimeCondition和VolumeCondition三个字段: ''' req_field.LimitPrice = price req_field.OrderPriceType = traderapi.TORA_TSTP_OPT_LimitPrice req_field.TimeCondition = traderapi.TORA_TSTP_TC_GFD req_field.VolumeCondition = traderapi.TORA_TSTP_VC_AV ''' OrderRef为报单引用,类型为整型,该字段报单时为选填 若不填写,则系统会为每笔报单自动分配一个报单引用 若填写,则需保证同一个TCP会话下报单引用严格单调递增,不要求连续递增,至少需从1开始编号 ''' # req_field.OrderRef = 1 ''' InvestorID为选填,若填写则需保证填写正确 Operway为委托方式,根据券商要求填写,无特殊说明置空即可 终端自定义字段,终端可根据需要填写如下字段的值,该字段值不会被柜台系统修改,在报单回报和查询报单时返回给终端 ''' # req_field.SInfo = 'sinfo' # req_field.IInfo = 123 ''' 其它字段置空 ''' ret = api.ReqOrderInsert(req_field, self.req_id) if ret != 0: raise Exception('ReqOrderInsert fail, ret[%d]' % ret) async_log_util.info(logger_trade, f"{code}华鑫本地真实下单结束") # --------------------------------影子订单-------------------------------- if shadow_price: if order_ref: # 下一个影子订单 shadow_order_ref = order_ref + 1 shadow_sinfo = f"s_b_{order_ref}" req_field.LimitPrice = shadow_price req_field.SInfo = shadow_sinfo req_field.OrderRef = shadow_order_ref req_field.VolumeTotalOriginal = shadow_volume self.req_id += 1 ret = api.ReqOrderInsert(req_field, self.req_id) if ret != 0: raise Exception('ReqOrderInsert fail, ret[%d]' % ret) # 影子订单撤单 # 撤掉影子单 shadow_cancel_order_ref = shadow_order_ref + 1 # 深证停留50ms上证停留200ms delay_s = 0.05 if tool.is_sz_code(code) else 0.2 if cancel_shadow_order: self.cancel_buy(code, f"s_c_{shadow_order_ref}", order_sys_id=None, order_ref=shadow_order_ref, order_action_ref=None, delay_s=delay_s) return ret # sinfo def buy_new(self, code, order_info_list): """ 批量下单 @param code: @param order_info_list:[(量, 价, order_ref, sinfo)] @return: """ if not ENABLE_ORDER: return async_log_util.info(logger_trade, f"{code}华鑫本地真实下单开始") async_log_util.info(logger_local_huaxin_trade_debug, f"进入买入方法:code-{code} order_info_list-{order_info_list}") for order_info in order_info_list: count = order_info[0] price = order_info[1] order_ref = order_info[2] sinfo = order_info[3] if sinfo in self.__buy_sinfo_set: raise Exception(f'下单请求已经提交:{sinfo}') self.__buy_sinfo_set.add(sinfo) self.req_id += 1 # 请求报单 req_field = traderapi.CTORATstpInputOrderField() # TORA_TSTP_EXD_COMM(0): 通用(内部使用) # TORA_TSTP_EXD_SSE(1): 上海交易所 # TORA_TSTP_EXD_SZSE(2): 深圳交易所 # TORA_TSTP_EXD_HK(3): 香港交易所 # TORA_TSTP_EXD_BSE(4): 北京证券交易所 if tool.is_sz_code(code): req_field.ExchangeID = traderapi.TORA_TSTP_EXD_SZSE req_field.ShareholderID = SZSE_ShareHolderID elif tool.is_sh_code(code): req_field.ExchangeID = traderapi.TORA_TSTP_EXD_SSE req_field.ShareholderID = SSE_ShareHolderID # 证券代码 req_field.SecurityID = code req_field.Direction = traderapi.TORA_TSTP_D_Buy req_field.VolumeTotalOriginal = count req_field.SInfo = sinfo req_field.OrderRef = order_ref ''' 上交所支持限价指令和最优五档剩撤、最优五档剩转限两种市价指令,对于科创板额外支持本方最优和对手方最优两种市价指令和盘后固定价格申报指令 深交所支持限价指令和立即成交剩余撤销、全额成交或撤销、本方最优、对手方最优和最优五档剩撤五种市价指令 限价指令和上交所科创板盘后固定价格申报指令需填写报单价格,其它市价指令无需填写报单价格 以下以上交所限价指令为例,其它指令参考开发指南相关说明填写OrderPriceType、TimeCondition和VolumeCondition三个字段: ''' req_field.LimitPrice = price req_field.OrderPriceType = traderapi.TORA_TSTP_OPT_LimitPrice req_field.TimeCondition = traderapi.TORA_TSTP_TC_GFD req_field.VolumeCondition = traderapi.TORA_TSTP_VC_AV ''' OrderRef为报单引用,类型为整型,该字段报单时为选填 若不填写,则系统会为每笔报单自动分配一个报单引用 若填写,则需保证同一个TCP会话下报单引用严格单调递增,不要求连续递增,至少需从1开始编号 ''' # req_field.OrderRef = 1 ''' InvestorID为选填,若填写则需保证填写正确 Operway为委托方式,根据券商要求填写,无特殊说明置空即可 终端自定义字段,终端可根据需要填写如下字段的值,该字段值不会被柜台系统修改,在报单回报和查询报单时返回给终端 ''' # req_field.SInfo = 'sinfo' # req_field.IInfo = 123 ''' 其它字段置空 ''' ret = api.ReqOrderInsert(req_field, self.req_id) if ret != 0: raise Exception('ReqOrderInsert fail, ret[%d]' % ret) async_log_util.info(logger_trade, f"{code}华鑫本地真实下单结束") # 撤买 def cancel_buy(self, code, sinfo, order_sys_id=None, order_ref=None, order_action_ref=None, delay_s=0.0): if delay_s > 0: time.sleep(delay_s) if sinfo in self.__cancel_buy_sinfo_set: raise Exception(f'撤单请求已经提交:{sinfo}') async_log_util.info(logger_local_huaxin_trade_debug, f"进入撤单方法:code-{code} order_sys_id-{order_sys_id} order_ref-{order_ref} sinfo-{sinfo}") self.__cancel_buy_sinfo_set.add(sinfo) self.req_id += 1 # 请求撤单 req_field = traderapi.CTORATstpInputOrderActionField() if tool.is_sz_code(code): req_field.ExchangeID = traderapi.TORA_TSTP_EXD_SZSE elif tool.is_sh_code(code): req_field.ExchangeID = traderapi.TORA_TSTP_EXD_SSE req_field.ActionFlag = traderapi.TORA_TSTP_AF_Delete # 撤单支持以下两种方式定位原始报单: # (1)报单引用方式 # req_field.FrontID = self.__front_id # req_field.SessionID = self.__session_id # req_field.OrderRef = 1 # (2)系统报单编号方式 if order_sys_id: req_field.OrderSysID = order_sys_id elif order_ref is not None: req_field.OrderRef = order_ref req_field.SessionID = self.__session_id req_field.FrontID = self.__front_id if order_action_ref: req_field.OrderActionRef = order_action_ref # OrderActionRef报单操作引用,用法同报单引用,可根据需要选填 ''' 终端自定义字段,终端可根据需要填写如下字段的值,该字段值不会被柜台系统修改,在查询撤单时返回给终端 ''' req_field.SInfo = sinfo # req_field.IInfo = 123 ''' 委托方式字段根据券商要求填写,无特殊说明置空即可 其它字段置空 ''' ret = api.ReqOrderAction(req_field, self.req_id) if ret != 0: raise Exception('ReqOrderAction fail, ret[%d]' % ret) return # 卖 def sell(self, code, count, price, price_type, sinfo, order_ref=None): if sinfo in self.__sell_sinfo_set: raise Exception(f'下单请求已经提交:{sinfo}') self.__sell_sinfo_set.add(sinfo) self.req_id += 1 # 请求报单 req_field = traderapi.CTORATstpInputOrderField() # TORA_TSTP_EXD_COMM(0): 通用(内部使用) # TORA_TSTP_EXD_SSE(1): 上海交易所 # TORA_TSTP_EXD_SZSE(2): 深圳交易所 # TORA_TSTP_EXD_HK(3): 香港交易所 # TORA_TSTP_EXD_BSE(4): 北京证券交易所 if tool.is_sz_code(code): req_field.ExchangeID = traderapi.TORA_TSTP_EXD_SZSE req_field.ShareholderID = SZSE_ShareHolderID elif tool.is_sh_code(code): req_field.ExchangeID = traderapi.TORA_TSTP_EXD_SSE req_field.ShareholderID = SSE_ShareHolderID # 证券代码 req_field.SecurityID = code req_field.Direction = traderapi.TORA_TSTP_D_Sell req_field.VolumeTotalOriginal = count req_field.SInfo = sinfo ''' 上交所支持限价指令和最优五档剩撤、最优五档剩转限两种市价指令,对于科创板额外支持本方最优和对手方最优两种市价指令和盘后固定价格申报指令 深交所支持限价指令和立即成交剩余撤销、全额成交或撤销、本方最优、对手方最优和最优五档剩撤五种市价指令 限价指令和上交所科创板盘后固定价格申报指令需填写报单价格,其它市价指令无需填写报单价格 以下以上交所限价指令为例,其它指令参考开发指南相关说明填写OrderPriceType、TimeCondition和VolumeCondition三个字段: ''' print('卖 price', price, price_type) if price and price > 0: req_field.LimitPrice = price if price_type == 1: req_field.OrderPriceType = traderapi.TORA_TSTP_OPT_AnyPrice elif price_type == 2: req_field.OrderPriceType = traderapi.TORA_TSTP_OPT_LimitPrice elif price_type == 3: req_field.OrderPriceType = traderapi.TORA_TSTP_OPT_BestPrice elif price_type == 4: req_field.OrderPriceType = traderapi.TORA_TSTP_OPT_FixPrice elif price_type == 5: req_field.OrderPriceType = traderapi.TORA_TSTP_OPT_FiveLevelPrice elif price_type == 6: req_field.OrderPriceType = traderapi.TORA_TSTP_OPT_HomeBestPrice req_field.TimeCondition = traderapi.TORA_TSTP_TC_GFD req_field.VolumeCondition = traderapi.TORA_TSTP_VC_AV if order_ref: req_field.OrderRef = order_ref ''' OrderRef为报单引用,类型为整型,该字段报单时为选填 若不填写,则系统会为每笔报单自动分配一个报单引用 若填写,则需保证同一个TCP会话下报单引用严格单调递增,不要求连续递增,至少需从1开始编号 ''' # req_field.OrderRef = 1 ''' InvestorID为选填,若填写则需保证填写正确 Operway为委托方式,根据券商要求填写,无特殊说明置空即可 终端自定义字段,终端可根据需要填写如下字段的值,该字段值不会被柜台系统修改,在报单回报和查询报单时返回给终端 ''' # req_field.SInfo = 'sinfo' # req_field.IInfo = 123 ''' 其它字段置空 ''' ret = api.ReqOrderInsert(req_field, self.req_id) if ret != 0: raise Exception('ReqOrderInsert fail, ret[%d]' % ret) return # 撤卖 def cancel_sell(self, code, order_sys_id, sinfo): if sinfo in self.__cancel_sell_sinfo_set: raise Exception(f'撤单请求已经提交:{sinfo}') self.__cancel_sell_sinfo_set.add(sinfo) self.req_id += 1 # 请求撤单 req_field = traderapi.CTORATstpInputOrderActionField() if tool.is_sz_code(code): req_field.ExchangeID = traderapi.TORA_TSTP_EXD_SZSE elif tool.is_sh_code(code): req_field.ExchangeID = traderapi.TORA_TSTP_EXD_SSE req_field.ActionFlag = traderapi.TORA_TSTP_AF_Delete # 撤单支持以下两种方式定位原始报单: # (1)报单引用方式 # req_field.FrontID = self.__front_id # req_field.SessionID = self.__session_id # req_field.OrderRef = 1 # (2)系统报单编号方式 req_field.OrderSysID = order_sys_id # OrderActionRef报单操作引用,用法同报单引用,可根据需要选填 ''' 终端自定义字段,终端可根据需要填写如下字段的值,该字段值不会被柜台系统修改,在查询撤单时返回给终端 ''' req_field.SInfo = sinfo # req_field.IInfo = 123 ''' 委托方式字段根据券商要求填写,无特殊说明置空即可 其它字段置空 ''' ret = api.ReqOrderAction(req_field, self.req_id) if ret != 0: raise Exception('ReqOrderAction fail, ret[%d]' % ret) return # 查询当日可撤销的委托 def list_delegate_orders(self, is_cancel): self.req_id += 1 req_id = self.req_id req_field = traderapi.CTORATstpQryOrderField() # 以下字段不填表示不设过滤条件,即查询所有报单 # req_field.SecurityID = '600000' req_field.InsertTimeStart = '09:15:00' req_field.InsertTimeEnd = '15:00:00' # IsCancel字段填1表示只查询可撤报单 if is_cancel: req_field.IsCancel = 1 # req_field.SInfo = sinfo ret = api.ReqQryOrder(req_field, req_id) if ret != 0: raise Exception('ReqQryOrder fail, ret[%d]' % ret) return req_id # 查询当日成交的订单 def list_traded_orders(self): self.req_id += 1 req_id = self.req_id req_field = traderapi.CTORATstpQryTradeField() ret = api.ReqQryTrade(req_field, req_id) if ret != 0: raise Exception('ReqQryTrade fail, ret[%d]' % ret) return req_id # 查询持仓 def list_positions(self): self.req_id += 1 req_id = self.req_id req_field = traderapi.CTORATstpQryPositionField() ret = api.ReqQryPosition(req_field, req_id) if ret != 0: raise Exception('ReqQryPosition fail, ret[%d]' % ret) return req_id # 查询资金账户 def get_money_account(self): self.req_id += 1 req_id = self.req_id req_field = traderapi.CTORATstpQryTradingAccountField() req_field.CurrencyID = traderapi.TORA_TSTP_CID_CNY ret = api.ReqQryTradingAccount(req_field, req_id) if ret != 0: raise Exception('ReqQryTradingAccount fail, ret[%d]' % ret) return req_id def login(self): # 请求登录 login_req = traderapi.CTORATstpReqUserLoginField() # 支持以用户代码、资金账号和股东账号方式登录 # (1)以用户代码方式登录 login_req.LogInAccount = UserID login_req.LogInAccountType = traderapi.TORA_TSTP_LACT_UserID # (2)以资金账号方式登录 # login_req.DepartmentID = DepartmentID # login_req.LogInAccount = AccountID # login_req.LogInAccountType = traderapi.TORA_TSTP_LACT_AccountID # (3)以上海股东账号方式登录 # login_req.LogInAccount = SSE_ShareHolderID # login_req.LogInAccountType = traderapi.TORA_TSTP_LACT_SHAStock # (4)以深圳股东账号方式登录 # login_req.LogInAccount = SZSE_ShareHolderID # login_req.LogInAccountType = traderapi.TORA_TSTP_LACT_SZAStock # 支持以密码和指纹(移动设备)方式认证 # (1)密码认证 # 密码认证时AuthMode可不填 # login_req.AuthMode = traderapi.TORA_TSTP_AM_Password login_req.Password = Password # (2)指纹认证 # 非密码认证时AuthMode必填 # login_req.AuthMode = traderapi.TORA_TSTP_AM_FingerPrint # login_req.DeviceID = '03873902' # login_req.CertSerial = '9FAC09383D3920CAEFF039' # 终端信息采集 # UserProductInfo填写终端名称 login_req.UserProductInfo = 'jiabei' login_req.DynamicPassword = 'rxoB195F' # 按照监管要求填写终端信息 login_req.TerminalInfo = 'PC;IIP=123.112.154.118;IPORT=50361;LIP=192.168.118.107;MAC=54EE750B1713FCF8AE5CBD58;HD=TF655AY91GHRVL' # 以下内外网IP地址若不填则柜台系统自动采集,若填写则以终端填值为准报送 # login_req.MacAddress = '5C-87-9C-96-F3-E3' # login_req.InnerIPAddress = '10.0.1.102' # login_req.OuterIPAddress = '58.246.43.50' TradeSimpleApi.req_id += 1 ret = api.ReqUserLogin(login_req, TradeSimpleApi.req_id) if ret != 0: raise Exception('ReqUserLogin fail, ret[%d]' % ret) class TraderSpi(traderapi.CTORATstpTraderSpi): def __init__(self, callback): traderapi.CTORATstpTraderSpi.__init__(self) self.__front_id = 0 self.__session_id = 0 self.__data_callback = callback self.__temp_order_list_dict = {} self.__temp_position_list_dict = {} self.__temp_money_account_list_dict = {} self.call_back_thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=10) def OnFrontConnected(self) -> "void": logger.info('OnFrontConnected') # 获取终端信息 TradeSimpleApi.req_id += 1 ret = api.ReqGetConnectionInfo(TradeSimpleApi.req_id) if ret != 0: logger.info('ReqGetConnectionInfo fail, ret[%d]' % ret) def OnFrontDisconnected(self, nReason: "int") -> "void": logger.info('OnFrontDisconnected: [%d]' % nReason) def OnRspGetConnectionInfo(self, pConnectionInfoField: "CTORATstpConnectionInfoField", pRspInfoField: "CTORATstpRspInfoField", nRequestID: "int") -> "void": if pRspInfoField.ErrorID == 0: logger_trade.info('inner_ip_address[%s]' % pConnectionInfoField.InnerIPAddress) logger_trade.info('inner_port[%d]' % pConnectionInfoField.InnerPort) logger_trade.info('outer_ip_address[%s]' % pConnectionInfoField.OuterIPAddress) logger_trade.info('outer_port[%d]' % pConnectionInfoField.OuterPort) logger_trade.info('mac_address[%s]' % pConnectionInfoField.MacAddress) # 请求登录 login_req = traderapi.CTORATstpReqUserLoginField() # 支持以用户代码、资金账号和股东账号方式登录 # (1)以用户代码方式登录 login_req.LogInAccount = UserID login_req.LogInAccountType = traderapi.TORA_TSTP_LACT_UserID # (2)以资金账号方式登录 # login_req.DepartmentID = DepartmentID # login_req.LogInAccount = AccountID # login_req.LogInAccountType = traderapi.TORA_TSTP_LACT_AccountID # (3)以上海股东账号方式登录 # login_req.LogInAccount = SSE_ShareHolderID # login_req.LogInAccountType = traderapi.TORA_TSTP_LACT_SHAStock # (4)以深圳股东账号方式登录 # login_req.LogInAccount = SZSE_ShareHolderID # login_req.LogInAccountType = traderapi.TORA_TSTP_LACT_SZAStock # 支持以密码和指纹(移动设备)方式认证 # (1)密码认证 # 密码认证时AuthMode可不填 # login_req.AuthMode = traderapi.TORA_TSTP_AM_Password login_req.Password = Password # (2)指纹认证 # 非密码认证时AuthMode必填 # login_req.AuthMode = traderapi.TORA_TSTP_AM_FingerPrint # login_req.DeviceID = '03873902' # login_req.CertSerial = '9FAC09383D3920CAEFF039' # 终端信息采集 # UserProductInfo填写终端名称 login_req.UserProductInfo = 'jiabei' login_req.DynamicPassword = 'rxoB195F' # 按照监管要求填写终端信息 # a0:36:9f:ea:fb:bc login_req.TerminalInfo = f'PC;IIP=NA;IPORT=NA;LIP={LOCAL_IP};MAC=A0369FEAFBBC;HD=00e3aeeed512b6782d0043b96480e04e;@jiabei' # 以下内外网IP地址若不填则柜台系统自动采集,若填写则以终端填值为准报送 # login_req.MacAddress = '5C-87-9C-96-F3-E3' # login_req.InnerIPAddress = '10.0.1.102' # login_req.OuterIPAddress = '58.246.43.50' TradeSimpleApi.req_id += 1 ret = api.ReqUserLogin(login_req, TradeSimpleApi.req_id) if ret != 0: logger_trade.info('ReqUserLogin fail, ret[%d]' % ret) else: logger_trade.info('GetConnectionInfo fail, [%d] [%d] [%s]!!!' % ( nRequestID, pRspInfoField.ErrorID, pRspInfoField.ErrorMsg)) def OnRspUserLogin(self, pRspUserLoginField: "CTORATstpRspUserLoginField", pRspInfoField: "CTORATstpRspInfoField", nRequestID: "int") -> "void": if pRspInfoField.ErrorID == 0: logger_trade.info('Login success! [%d]' % nRequestID) self.__front_id = pRspUserLoginField.FrontID self.__session_id = pRspUserLoginField.SessionID TradeSimpleApi.set_login_info(self.__session_id, self.__front_id) if 1: # 查询股东账号 req_field = traderapi.CTORATstpQryShareholderAccountField() # 以下字段不填表示不设过滤条件,即查询所有股东账号 # req_field.ExchangeID = traderapi.TORA_TSTP_EXD_SSE TradeSimpleApi.req_id += 1 ret = api.ReqQryShareholderAccount(req_field, TradeSimpleApi.req_id) if ret != 0: logger.info('ReqQryShareholderAccount fail, ret[%d]' % ret) if 0: TradeSimpleApi().list_traded_orders() if 1: TradeSimpleApi().get_money_account() else: logger_trade.info('Login fail!!! [%d] [%d] [%s]' % (nRequestID, pRspInfoField.ErrorID, pRspInfoField.ErrorMsg)) def OnRspUserPasswordUpdate(self, pUserPasswordUpdateField: "CTORATstpUserPasswordUpdateField", pRspInfoField: "CTORATstpRspInfoField", nRequestID: "int") -> "void": if pRspInfoField.ErrorID == 0: logger.info('OnRspUserPasswordUpdate: OK! [%d]' % nRequestID) else: logger.info('OnRspUserPasswordUpdate: Error! [%d] [%d] [%s]' % (nRequestID, pRspInfoField.ErrorID, pRspInfoField.ErrorMsg)) def OnRspOrderInsert(self, pInputOrderField: "CTORATstpInputOrderField", pRspInfoField: "CTORATstpRspInfoField", nRequestID: "int") -> "void": try: if pRspInfoField.ErrorID == 0: async_log_util.info(logger_local_huaxin_trade_debug, '[%d] OnRspOrderInsert: OK! [%d]' % (round(time.time() * 1000), nRequestID)) else: async_log_util.error(logger_local_huaxin_trade_debug, f"OnRspOrderInsert 报单出错:{pRspInfoField.ErrorID}-{pRspInfoField.ErrorMsg}") self.call_back_thread_pool.submit(self.__data_callback, TYPE_ORDER, nRequestID, {"sinfo": pInputOrderField.SInfo, "orderStatus": -1, "orderStatusMsg": pRspInfoField.ErrorMsg}) except: pass # 撤单响应 def OnRspOrderAction(self, pInputOrderActionField: "CTORATstpInputOrderActionField", pRspInfoField: "CTORATstpRspInfoField", nRequestID: "int") -> "void": try: if pRspInfoField.ErrorID == 0: async_log_util.info(logger_local_huaxin_trade_debug, 'OnRspOrderAction: OK! [%d]' % nRequestID) self.call_back_thread_pool.submit(self.__data_callback, TYPE_CANCEL_ORDER, nRequestID, {"sinfo": pInputOrderActionField.SInfo, "orderSysID": pInputOrderActionField.OrderSysID, "cancel": 1}) else: async_log_util.info(logger_local_huaxin_trade_debug, 'OnRspOrderAction: Error! [%d] [%d] [%s]' % (nRequestID, pRspInfoField.ErrorID, pRspInfoField.ErrorMsg)) self.call_back_thread_pool.submit(self.__data_callback, TYPE_CANCEL_ORDER, nRequestID, {"sinfo": pInputOrderActionField.SInfo, "orderSysID": pInputOrderActionField.OrderSysID, "cancel": 0, "errorID": pRspInfoField.ErrorID, "errorMsg": pRspInfoField.ErrorMsg}) except: pass # 撤单错误回报 def OnErrRtnOrderAction(self, pInputOrderActionField: "CTORATstpInputOrderActionField", pRspInfoField: "CTORATstpRspInfoField", nRequestID: "int") -> "void": try: if pInputOrderActionField and pRspInfoField: async_log_util.info(logger_local_huaxin_trade_debug, 'OnErrRtnOrderAction: Error! [%s] [%s] [%s] [%s]' % (nRequestID, pInputOrderActionField.OrderSysID, pRspInfoField.ErrorID, pRspInfoField.ErrorMsg)) except Exception as e: async_log_util.info(logger_local_huaxin_trade_debug, f"OnErrRtnOrderAction: 撤单出错-{str(e)}") def OnRspInquiryJZFund(self, pRspInquiryJZFundField: "CTORATstpRspInquiryJZFundField", pRspInfoField: "CTORATstpRspInfoField", nRequestID: "int") -> "void": # try: # if pRspInfoField.ErrorID == 0: # logger.info('OnRspInquiryJZFund: OK! [%d] [%.2f] [%.2f]' # % (nRequestID, pRspInquiryJZFundField.UsefulMoney, pRspInquiryJZFundField.FetchLimit)) # else: # logger.info('OnRspInquiryJZFund: Error! [%d] [%d] [%s]' # % (nRequestID, pRspInfoField.ErrorID, pRspInfoField.ErrorMsg)) # except: # pass pass def OnRspTransferFund(self, pInputTransferFundField: "CTORATstpInputTransferFundField", pRspInfoField: "CTORATstpRspInfoField", nRequestID: "int") -> "void": # try: # if pRspInfoField.ErrorID == 0: # logger.info('OnRspTransferFund: OK! [%d]' % nRequestID) # else: # logger.info('OnRspTransferFund: Error! [%d] [%d] [%s]' # % (nRequestID, pRspInfoField.ErrorID, pRspInfoField.ErrorMsg)) # except: # pass pass def OnRtnOrder(self, pOrderField: "CTORATstpOrderField") -> "void": try: async_log_util.info(logger_local_huaxin_trade_debug, '[%d] OnRtnOrder: SInfo[%s] InvestorID[%s] SecurityID[%s] OrderRef[%d] OrderLocalID[%s] LimitPrice[%.2f] VolumeTotalOriginal[%d] OrderSysID[%s] OrderStatus[%s] InsertTime[%s]' % (round(time.time() * 1000), pOrderField.SInfo, pOrderField.InvestorID, pOrderField.SecurityID, pOrderField.OrderRef, pOrderField.OrderLocalID, pOrderField.LimitPrice, pOrderField.VolumeTotalOriginal, pOrderField.OrderSysID, pOrderField.OrderStatus, pOrderField.InsertTime)) if pOrderField.OrderStatus == traderapi.TORA_TSTP_OST_Unknown: pass # if queue_trade_w_l2_r is not None: # queue_trade_w_l2_r.put_nowait( # json.dumps({"type": "listen_volume", "data": {"code": pOrderField.SecurityID, # "volume": pOrderField.VolumeTotalOriginal}}).encode( # 'utf-8')) else: order_data = {"sinfo": pOrderField.SInfo, "securityID": pOrderField.SecurityID, "orderLocalID": pOrderField.OrderLocalID, "direction": pOrderField.Direction, "orderSysID": pOrderField.OrderSysID, "insertTime": pOrderField.InsertTime, "insertDate": pOrderField.InsertDate, "acceptTime": pOrderField.AcceptTime, "cancelTime": pOrderField.CancelTime, "limitPrice": pOrderField.LimitPrice, "accountID": pOrderField.AccountID, "orderRef": pOrderField.OrderRef, "turnover": pOrderField.Turnover, "volume": pOrderField.VolumeTotalOriginal, "volumeTraded": pOrderField.VolumeTraded, "orderStatus": pOrderField.OrderStatus, "orderSubmitStatus": pOrderField.OrderSubmitStatus, "statusMsg": pOrderField.StatusMsg} self.call_back_thread_pool.submit(self.__data_callback, TYPE_ORDER, 0, order_data) except Exception as e: async_log_util.error(logger_local_huaxin_trade_debug, "OnRtnOrder 出错") except: async_log_util.error(logger_local_huaxin_trade_debug, "OnRtnOrder 出错") def OnRtnTrade(self, pTradeField: "CTORATstpTradeField") -> "void": try: async_log_util.info(logger_local_huaxin_trade_debug, 'OnRtnTrade: TradeID[%s] InvestorID[%s] SecurityID[%s] OrderRef[%d] OrderLocalID[%s] Price[%.2f] Volume[%d]' % (pTradeField.TradeID, pTradeField.InvestorID, pTradeField.SecurityID, pTradeField.OrderRef, pTradeField.OrderLocalID, pTradeField.Price, pTradeField.Volume)) except: pass def OnRtnMarketStatus(self, pMarketStatusField: "CTORATstpMarketStatusField") -> "void": # TORA_TSTP_MKD_SHA(1): 上海A股 # TORA_TSTP_MKD_SZA(2): 深圳A股 # TORA_TSTP_MKD_BJMain(a):北京主板 # TORA_TSTP_MST_UnKnown( # ):未知 # TORA_TSTP_MST_BeforeTrading(0): 开盘前 # TORA_TSTP_MST_Continous(1): 连续交易 # TORA_TSTP_MST_Closed(2): 收盘 # TORA_TSTP_MST_OpenCallAuction(3): 开盘集合竞价 try: logger.info('OnRtnMarketStatus: MarketID[%s] MarketStatus[%s]' % (pMarketStatusField.MarketID, pMarketStatusField.MarketStatus)) except: pass def OnRspQrySecurity(self, pSecurityField: "CTORATstpSecurityField", pRspInfoField: "CTORATstpRspInfoField", nRequestID: "int", bIsLast: "bool") -> "void": if bIsLast != 1: logger.info( 'OnRspQrySecurity[%d]: SecurityID[%s] SecurityName[%s] MarketID[%s] OrderUnit[%s] OpenDate[%s] UpperLimitPrice[%.2f] LowerLimitPrice[%.2f]' % (nRequestID, pSecurityField.SecurityID, pSecurityField.SecurityName, pSecurityField.MarketID, pSecurityField.OrderUnit, pSecurityField.OpenDate, pSecurityField.UpperLimitPrice, pSecurityField.LowerLimitPrice)) else: logger.info('查询合约结束[%d] ErrorID[%d] ErrorMsg[%s]' % (nRequestID, pRspInfoField.ErrorID, pRspInfoField.ErrorMsg)) def OnRspQryInvestor(self, pInvestorField: "CTORATstpInvestorField", pRspInfoField: "CTORATstpRspInfoField", nRequestID: "int", bIsLast: "bool") -> "void": if bIsLast != 1: logger.info('OnRspQryInvestor[%d]: InvestorID[%s] InvestorName[%s] Operways[%s]' % (nRequestID, pInvestorField.InvestorID, pInvestorField.InvestorName, pInvestorField.Operways)) else: logger.info('查询投资者结束[%d] ErrorID[%d] ErrorMsg[%s]' % (nRequestID, pRspInfoField.ErrorID, pRspInfoField.ErrorMsg)) def OnRspQryShareholderAccount(self, pShareholderAccountField: "CTORATstpShareholderAccountField", pRspInfoField: "CTORATstpRspInfoField", nRequestID: "int", bIsLast: "bool") -> "void": if bIsLast != 1: logger_local_huaxin_trade_debug.info( 'OnRspQryShareholderAccount[%d]: InvestorID[%s] ExchangeID[%s] ShareholderID[%s]' % (nRequestID, pShareholderAccountField.InvestorID, pShareholderAccountField.ExchangeID, pShareholderAccountField.ShareholderID)) else: logger.info('查询股东账户结束[%d] ErrorID[%d] ErrorMsg[%s]' % (nRequestID, pRspInfoField.ErrorID, pRspInfoField.ErrorMsg)) def OnRspQryTradingAccount(self, pTradingAccountField: "CTORATstpTradingAccountField", pRspInfoField: "CTORATstpRspInfoField", nRequestID: "int", bIsLast: "bool") -> "void": try: if nRequestID not in self.__temp_money_account_list_dict: self.__temp_money_account_list_dict[nRequestID] = [] if bIsLast != 1: self.__temp_money_account_list_dict[nRequestID].append( {"departmentID": pTradingAccountField.DepartmentID, "investorID": pTradingAccountField.InvestorID, "accountID": pTradingAccountField.AccountID, "currencyID": pTradingAccountField.CurrencyID, "usefulMoney": round(pTradingAccountField.UsefulMoney, 2), "frozenCash": round(pTradingAccountField.FrozenCash, 2), "fetchLimit": round(pTradingAccountField.FetchLimit, 2), "preDeposit": round(pTradingAccountField.PreDeposit, 2), "commission": round(pTradingAccountField.Commission, 2) }) # logger.info( # 'OnRspQryTradingAccount[%d]: DepartmentID[%s] InvestorID[%s] AccountID[%s] CurrencyID[%s] UsefulMoney[%.2f] FetchLimit[%.2f]' # % (nRequestID, pTradingAccountField.DepartmentID, pTradingAccountField.InvestorID, # pTradingAccountField.AccountID, pTradingAccountField.CurrencyID, # pTradingAccountField.UsefulMoney, pTradingAccountField.FetchLimit)) else: results = self.__temp_money_account_list_dict.pop(nRequestID) self.call_back_thread_pool.submit(self.__data_callback, TYPE_LIST_MONEY, nRequestID, results) # logger.info('查询资金账号结束[%d] ErrorID[%d] ErrorMsg[%s]' # % (nRequestID, pRspInfoField.ErrorID, pRspInfoField.ErrorMsg)) except: pass def OnRspQryOrder(self, pOrderField: "CTORATstpOrderField", pRspInfoField: "CTORATstpRspInfoField", nRequestID: "int", bIsLast: "bool") -> "void": try: if nRequestID not in self.__temp_order_list_dict: self.__temp_order_list_dict[nRequestID] = [] if not bIsLast: # logger.info( # 'OnRspQryOrder[%d]: SecurityID[%s] OrderLocalID[%s] Direction[%s] OrderRef[%d] OrderSysID[%s] VolumeTraded[%d] OrderStatus[%s] OrderSubmitStatus[%s], StatusMsg[%s]' # % (nRequestID, pOrderField.SecurityID, pOrderField.OrderLocalID, pOrderField.Direction, # pOrderField.OrderRef, pOrderField.OrderSysID, # pOrderField.VolumeTraded, pOrderField.OrderStatus, pOrderField.OrderSubmitStatus, # pOrderField.StatusMsg)) self.__temp_order_list_dict[nRequestID].append( {"securityID": pOrderField.SecurityID, "orderLocalID": pOrderField.OrderLocalID, "direction": pOrderField.Direction, "orderSysID": pOrderField.OrderSysID, "insertTime": pOrderField.InsertTime, "insertDate": pOrderField.InsertDate, "acceptTime": pOrderField.AcceptTime, "cancelTime": pOrderField.CancelTime, "limitPrice": pOrderField.LimitPrice, "accountID": pOrderField.AccountID, "turnover": pOrderField.Turnover, "orderRef": pOrderField.OrderRef, "volume": pOrderField.VolumeTotalOriginal, "volumeTraded": pOrderField.VolumeTraded, "orderStatus": pOrderField.OrderStatus, "orderSubmitStatus": pOrderField.OrderSubmitStatus, "statusMsg": pOrderField.StatusMsg}) else: # logger.info('查询报单结束[%d] ErrorID[%d] ErrorMsg[%s]' # % (nRequestID, pRspInfoField.ErrorID, pRspInfoField.ErrorMsg)) self.call_back_thread_pool.submit(self.__data_callback, TYPE_LIST_DELEGATE, nRequestID, self.__temp_order_list_dict[nRequestID]) self.__temp_order_list_dict.pop(nRequestID) except: pass def OnRspQryPosition(self, pPositionField: "CTORATstpPositionField", pRspInfoField: "CTORATstpRspInfoField", nRequestID: "int", bIsLast: "bool") -> "void": try: if nRequestID not in self.__temp_position_list_dict: self.__temp_position_list_dict[nRequestID] = [] if bIsLast != 1: self.__temp_position_list_dict[nRequestID].append( {"investorID": pPositionField.InvestorID, "tradingDay": pPositionField.TradingDay, "securityName": pPositionField.SecurityName, "securityID": pPositionField.SecurityID, "historyPos": pPositionField.HistoryPos, "historyPosFrozen": pPositionField.HistoryPosFrozen, "todayBSPos": pPositionField.TodayBSPos, "todayBSPosFrozen": pPositionField.TodayBSPosFrozen, "historyPosPrice": pPositionField.HistoryPosPrice, "totalPosCost": pPositionField.TotalPosCost, "prePosition": pPositionField.PrePosition, "availablePosition": pPositionField.AvailablePosition, "currentPosition": pPositionField.CurrentPosition, "openPosCost": pPositionField.OpenPosCost, "todayCommission": pPositionField.TodayCommission, "todayTotalBuyAmount": pPositionField.TodayTotalBuyAmount, "todayTotalSellAmount": pPositionField.TodayTotalSellAmount}) else: # logger.info('查询持仓结束[%d] ErrorID[%d] ErrorMsg[%s]' # % (nRequestID, pRspInfoField.ErrorID, pRspInfoField.ErrorMsg)) self.call_back_thread_pool.submit(self.__data_callback, TYPE_LIST_POSITION, nRequestID, self.__temp_position_list_dict[nRequestID]) self.__temp_position_list_dict.pop(nRequestID) except: pass # 成交回报,参数pTradeField是一个CTORATstpTradeField类对象 def OnRtnTrade(self, pTradeField: "CTORATstpTradeField") -> "void": pass # logger.info("OnRtnTrade") # 查询成交响应,参数pTradeField是一个CTORATstpTradeField类对象 def OnRspQryTrade(self, pTradeField: "CTORATstpTradeField", pRspInfoField: "CTORATstpRspInfoField", nRequestID: "int", bIsLast: "bool") -> "void": try: # logger.info("查询成交响应") pass if nRequestID not in self.__temp_order_list_dict: self.__temp_order_list_dict[nRequestID] = [] if not bIsLast: self.__temp_order_list_dict[nRequestID].append( {"tradeID": pTradeField.TradeID, "securityID": pTradeField.SecurityID, "orderLocalID": pTradeField.OrderLocalID, "direction": pTradeField.Direction, "orderSysID": pTradeField.OrderSysID, "price": pTradeField.Price, "tradeTime": pTradeField.TradeTime, "volume": pTradeField.Volume, "tradeDate": pTradeField.TradeDate, "tradingDay": pTradeField.TradingDay, "pbuID": pTradeField.PbuID, "accountID": pTradeField.AccountID}) else: self.call_back_thread_pool.submit(self.__data_callback, TYPE_LIST_TRADED, nRequestID, self.__temp_order_list_dict[nRequestID]) self.__temp_order_list_dict.pop(nRequestID) except: pass # 获取响应发送socket global req_rid_dict req_rid_dict = {} class MyTradeActionCallback(command_manager.TradeActionCallback): __tradeSimpleApi = TradeSimpleApi() trade_thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=30) def OnTrade(self, client_id, request_id, sk, type_, data): if type_ == 1: async_log_util.info(logger_local_huaxin_trade_debug, f"\n---------------------\n请求下单:client_id-{client_id} request_id-{request_id} data:{data}") # 下单 # 1-买 2-卖 direction = data["direction"] code = data["code"] sinfo = data["sinfo"] # 老版本下单 volume = data.get("volume") price = data.get("price") order_ref = data.get("order_ref") shadow_price = data.get("shadow_price") shadow_volume = data.get("shadow_volume") cancel_shadow = data.get("cancel_shadow") # 新版下单 order_info_list = data.get("order_info_list") blocking = data.get("blocking") if cancel_shadow is None: cancel_shadow = True if shadow_volume is None: shadow_volume = constant.SHADOW_ORDER_VOLUME if direction == 1: async_log_util.info(logger_trade, f"{code}华鑫本地开始下单") # 买 try: if blocking: if not order_info_list: req_rid_dict[sinfo] = (client_id, request_id, sk, order_ref) # threading.Thread(target=lambda: self.__tradeSimpleApi.buy(code, volume, price, sinfo, order_ref), # daemon=True).start() if not order_info_list: self.trade_thread_pool.submit(self.__tradeSimpleApi.buy, code, volume, price, sinfo, order_ref, shadow_price, cancel_shadow, shadow_volume) else: if order_info_list: for i in range(len(order_info_list)): order_info = order_info_list[i] order_info_list[i] = (order_info[0], order_info[1], order_info[2], f"{sinfo}_{i}") if blocking: for x in order_info_list: req_rid_dict[x[3]] = (client_id, request_id, sk, x[2]) self.trade_thread_pool.submit(self.__tradeSimpleApi.buy_new, code, order_info_list) async_log_util.info(logger_trade, f"{code}华鑫本地下单线程结束") except Exception as e: send_response(json.dumps({"code": 1, "msg": str(e)}), TYPE_ORDER, client_id, request_id) async_log_util.info(logger_local_huaxin_trade_debug, f"买入结束:code-{code} sinfo-{sinfo}") elif direction == 2: try: price_type = data["price_type"] if blocking: req_rid_dict[sinfo] = (client_id, request_id, sk) self.__tradeSimpleApi.sell(code, volume, price, price_type, sinfo, order_ref) print("sell", req_rid_dict) except Exception as e: logging.exception(e) send_response(json.dumps({"code": 1, "msg": str(e)}), TYPE_ORDER, client_id, request_id) elif type_ == 2: async_log_util.info(logger_local_huaxin_trade_debug, f"\n---------------------\n请求撤单:client_id-{client_id} request_id-{request_id} data-{data}") # 撤单 direction = data["direction"] code = data["code"] orderSysID = data.get("orderSysID") orderRef = data.get("orderRef") orderActionRef = data.get("orderActionRef") sinfo = data["sinfo"] if direction == 1: # 撤买 try: if not orderSysID and orderRef is None: raise Exception("没有找到系统订单号或者报单引用") req_rid_dict[sinfo] = (client_id, request_id, sk) self.trade_thread_pool.submit( lambda: self.__tradeSimpleApi.cancel_buy(code, sinfo, order_sys_id=orderSysID, order_ref=orderRef, order_action_ref=orderActionRef)) async_log_util.info(logger_local_huaxin_trade_debug, f"撤单结束:code-{code} order_sys_id-{orderSysID} sinfo-{sinfo}") except Exception as e: send_response(json.dumps({"code": 1, "msg": str(e)}), TYPE_CANCEL_ORDER, client_id, request_id) elif direction == 2: # 撤卖 try: req_rid_dict[sinfo] = (client_id, request_id, sk) self.__tradeSimpleApi.cancel_sell(code, orderSysID, sinfo) except Exception as e: send_response(json.dumps({"code": 1, "msg": str(e)}), TYPE_CANCEL_ORDER, client_id, request_id) def OnDealList(self, client_id, request_id, sk): async_log_util.info(logger_local_huaxin_trade_debug, f"请求成交列表:client_id-{client_id} request_id-{request_id}") try: # print("开始请求成交列表") req_id = self.__tradeSimpleApi.list_traded_orders() req_rid_dict[req_id] = (client_id, request_id, sk) except Exception as e: logging.exception(e) SendResponseSkManager.send_error_response("common", request_id, client_id, str(e)) def OnDelegateList(self, client_id, request_id, sk, is_cancel): async_log_util.info(logger_local_huaxin_trade_debug, f"请求委托列表:client_id-{client_id} request_id-{request_id}") try: req_id = self.__tradeSimpleApi.list_delegate_orders(is_cancel) req_rid_dict[req_id] = (client_id, request_id, sk) except Exception as e: send_response(json.dumps({"code": 1, "msg": str(e)}), "common", client_id, request_id) def OnMoney(self, client_id, request_id, sk): async_log_util.info(logger_local_huaxin_trade_debug, f"请求账户:client_id-{client_id} request_id-{request_id}") try: req_id = self.__tradeSimpleApi.get_money_account() req_rid_dict[req_id] = (client_id, request_id, sk) except Exception as e: send_response(json.dumps({"code": 1, "msg": str(e)}), "common", client_id, request_id) def OnPositionList(self, client_id, request_id, sk): async_log_util.info(logger_local_huaxin_trade_debug, f"请求持仓:client_id-{client_id} request_id-{request_id}") try: req_id = self.__tradeSimpleApi.list_positions() req_rid_dict[req_id] = (client_id, request_id, sk) except Exception as e: send_response(json.dumps({"code": 1, "msg": str(e)}), "common", client_id, request_id) def OnTest(self, client_id, request_id, data, sk): send_response( json.dumps({"type": "response", "data": {"code": 0, "data": data}, "client_id": client_id, "request_id": request_id}), type, client_id, request_id, show_log=False, sk=sk) def __init_trade_data_server(): logger.info("初始化交易服务器") global api api = traderapi.CTORATstpTraderApi.CreateTstpTraderApi('./flow', False) # 创建回调对象 global spi spi = TraderSpi(traderapi_callback) # 注册回调接口 api.RegisterSpi(spi) # 注册多个交易前置服务地址,用逗号隔开 # api.RegisterFront('tcp://10.0.1.101:6500,tcp://10.0.1.101:26500') # 注册名字服务器地址,支持多服务地址逗号隔开 # api.RegisterNameServer('tcp://10.0.1.101:52370') # api.RegisterNameServer('tcp://10.0.1.101:52370,tcp://10.0.1.101:62370') if 1: # 模拟环境,TCP 直连Front方式 # 注册单个交易前置服务地址 ##B类服务器## logger.info(f"注册交易地址:{FRONT_ADDRESS}/{FRONT_ADDRESS1}") api.RegisterFront(FRONT_ADDRESS) # 正式环境主地址 api.RegisterFront(FRONT_ADDRESS1) # 正式环境备用地址 ##A类服务器## # api.RegisterFront("tcp://10.224.123.143:6500") # 正式环境主地址 # api.RegisterFront("tcp://10.224.123.147:26500") # 正式环境备用地址 # TD_TCP_FrontAddress = "tcp://210.14.72.21:4400" # 仿真交易环境 # TD_TCP_FrontAddress = "tcp://210.14.72.15:4400" # 24小时环境A套 # TD_TCP_FrontAddress="tcp://210.14.72.16:9500" #24小时环境B套 # api.RegisterFront(TD_TCP_FrontAddress) # 注册多个交易前置服务地址,用逗号隔开 形如: api.RegisterFront("tcp://10.0.1.101:6500,tcp://10.0.1.101:26500") # print("TD_TCP_FensAddress[sim or 24H]::%s\n" % TD_TCP_FrontAddress) else: # 模拟环境,FENS名字服务器方式 TD_TCP_FensAddress = "tcp://210.14.72.21:42370"; # 模拟环境通用fens地址 '''******************************************************************************** * 注册 fens 地址前还需注册 fens 用户信息,包括环境编号、节点编号、Fens 用户代码等信息 * 使用名字服务器的好处是当券商系统部署方式发生调整时外围终端无需做任何前置地址修改 * *****************************************************************************''' fens_user_info_field = traderapi.CTORATstpFensUserInfoField() fens_user_info_field.FensEnvID = "stock" # 必填项,暂时固定为“stock”表示普通现货柜台 fens_user_info_field.FensNodeID = "sim" # 必填项,生产环境需按实际填写,仿真环境为sim fens_user_info_field.FensNodeID, = "24a" # 必填项,生产环境需按实际填写,24小时A套环境为24a # fens_user_info_field.FensNodeID="24b" #必填项,生产环境需按实际填写,24小时B套环境为24b api.RegisterFensUserInfo(fens_user_info_field) api.RegisterNameServer(TD_TCP_FensAddress) # 注册名字服务器地址,支持多服务地址逗号隔开 形如:api.RegisterNameServer('tcp://10.0.1.101:52370,tcp://10.0.1.101:62370') print("TD_TCP_FensAddress[%s]::%s\n" % (fens_user_info_field.FensNodeID, TD_TCP_FensAddress)) # 订阅私有流 api.SubscribePrivateTopic(traderapi.TORA_TERT_QUICK) # 订阅公有流 api.SubscribePublicTopic(traderapi.TORA_TERT_QUICK) # 启动接口 api.Init() def __send_response(type, data_bytes): sk = SendResponseSkManager.create_send_response_sk() try: data_bytes = socket_util.load_header(data_bytes) sk.sendall(data_bytes) result, header_str = socket_util.recv_data(sk) result = json.loads(result) if result["code"] != 0: raise Exception(result['msg']) finally: sk.close() def send_response(data, type, _client_id, _request_id, show_log=True, sk=None): if show_log: async_log_util.debug(logger_local_huaxin_trade_debug, f"回调返回内容:{data}") if sk: # 采用的是socket通信 sk.sendall(socket_util.load_header(data.encode('utf-8'))) else: queue_strategy_trade_write.put_nowait(data) # 交易反馈回调 def __traderapi_callback(type, req_id, data): async_log_util.info(logger_local_huaxin_trade_debug, "回调:type-{} req_id-{}", type, req_id) key = req_id if type == TYPE_ORDER or type == TYPE_CANCEL_ORDER: key = data["sinfo"] try: if req_rid_dict and key in req_rid_dict: # TODO 处理批量下单 temp_params = req_rid_dict.pop(key) client_id, request_id = temp_params[0], temp_params[1] # 本地订单号-系统订单号映射 if len(temp_params) >= 4 and type == TYPE_ORDER: order_ref = temp_params[3] data["orderRef"] = order_ref async_log_util.info(logger_local_huaxin_trade_debug, "API回调 request_id-{}", request_id) # 测试 # send_response( # json.dumps({"type": "response", "data": {"code": 0, "data": data}, "client_id": client_id, # "request_id": request_id}), type, client_id, request_id, temp_params[2]) send_response( json.dumps({"type": "response", "data": {"code": 0, "data": data}, "client_id": client_id, "request_id": request_id}), type, client_id, request_id) async_log_util.info(logger_local_huaxin_trade_debug, "API回调结束 req_id-{} request_id-{}", req_id, request_id) else: async_log_util.info(logger_local_huaxin_trade_debug, "非API回调 req_id-{}", req_id) send_response( json.dumps({"type": "trade_callback", "data": {"code": 0, "data": data, "type": type}}), type, None, req_id) except Exception as e: logging.exception(e) # 采用异步回调 def traderapi_callback(type, req_id, data): __traderapi_callback(type, req_id, data) addr, port = constant.SERVER_IP, constant.SERVER_PORT def run(ipc_order_addr, ipc_cancel_order_addr, queue_strategy_trade_write_=None, queue_strategy_trade_read=None, queue_strategy_trade_read_for_read=None): """ 运行 @param ipc_order_addr: zmq下单命令ipc地址 @param ipc_cancel_order_addr: zmq撤单命令ipc地址 @param queue_strategy_trade_write_: @param queue_strategy_trade_read: @param queue_strategy_trade_read_for_read: @return: """ try: logger_system.info("交易进程ID:{}", os.getpid()) logger_system.info(f"trade 线程ID:{tool.get_thread_id()}") __init_trade_data_server() global queue_strategy_trade_write queue_strategy_trade_write = queue_strategy_trade_write_ # 运行日志同步 threading.Thread(target=lambda: async_log_util.run_sync(), daemon=True).start() global tradeCommandManager tradeCommandManager = command_manager.TradeCommandManager() tradeCommandManager.init(MyTradeActionCallback(), queue_strategy_trade_read, queue_strategy_trade_read_for_read) logger_system.info("华鑫交易服务启动") tradeCommandManager.run(ipc_order_addr, ipc_cancel_order_addr) except Exception as e: logger_system.exception(e) # 不需要运行命令解析 # tradeCommandManager.run() while True: time.sleep(2) if __name__ == "__main__": # 测试 # try: # ip_port = ("127.0.0.1", 10008) # server地址和端口号(最好是10000以后) # client = socket.socket(family=socket.AF_INET, type=socket.SOCK_STREAM) # 生成socket,连接server # client.connect(ip_port) # client.sendall("111111111111111111111111111111111111111".encode("utf-8")) # except Exception as e: # logging.exception(e) # print("远程服务器访问失败", str(e)) # # while True: # time.sleep(1) input()