| | |
| | | |
| | | # 买入的大单订单号 |
| | | |
| | | def __init__(self, api, l2_data_upload_manager: L2DataUploadManager): |
| | | def __init__(self, api, l2_data_upload_manager: L2DataUploadManager, processor_index): |
| | | lev2mdapi.CTORATstpLev2MdSpi.__init__(self) |
| | | self.__api = api |
| | | self.is_login = False |
| | | self.l2_data_upload_manager = l2_data_upload_manager |
| | | self.codes_volume_and_price_dict = {} |
| | | self.processor_index = processor_index |
| | | |
| | | def __split_codes(self, codes): |
| | | szse_codes = [] |
| | |
| | | |
| | | def __subscribe(self, _codes): |
| | | sh, sz = self.__split_codes(_codes) |
| | | logger_local_huaxin_l2_subscript.info(f"订阅上证:{sh}") |
| | | logger_local_huaxin_l2_subscript.info(f"订阅深证:{sz}") |
| | | logger_local_huaxin_l2_subscript.info(f"订阅上证({self.processor_index}):{sh}") |
| | | logger_local_huaxin_l2_subscript.info(f"订阅深证({self.processor_index}):{sz}") |
| | | if sh: |
| | | if ENABLE_NGST: |
| | | result = self.__api.SubscribeNGTSTick(sh, lev2mdapi.TORA_TSTP_EXD_SSE) |
| | | logger_local_huaxin_l2_subscript.info(f"逐笔NGTS订阅结果sh:{result}") |
| | | logger_local_huaxin_l2_subscript.info(f"逐笔NGTS订阅结果sh({self.processor_index}):{result}") |
| | | else: |
| | | # 订阅逐笔委托 |
| | | result = self.__api.SubscribeOrderDetail(sh, lev2mdapi.TORA_TSTP_EXD_SSE) |
| | | logger_local_huaxin_l2_subscript.info(f"逐笔委托订阅结果sh:{result}") |
| | | logger_local_huaxin_l2_subscript.info(f"逐笔委托订阅结果sh({self.processor_index}):{result}") |
| | | # 订阅逐笔成交 |
| | | result = self.__api.SubscribeTransaction(sh, lev2mdapi.TORA_TSTP_EXD_SSE) |
| | | logger_local_huaxin_l2_subscript.info(f"逐笔成交订阅结果sh:{result}") |
| | | logger_local_huaxin_l2_subscript.info(f"逐笔成交订阅结果sh({self.processor_index}):{result}") |
| | | |
| | | result = self.__api.SubscribeMarketData(sh, lev2mdapi.TORA_TSTP_EXD_SSE) |
| | | logger_local_huaxin_l2_subscript.info(f"市场订阅结果sh:{result}") |
| | | logger_local_huaxin_l2_subscript.info(f"市场订阅结果sh({self.processor_index}):{result}") |
| | | if sz: |
| | | result = self.__api.SubscribeOrderDetail(sz, lev2mdapi.TORA_TSTP_EXD_SZSE) |
| | | logger_local_huaxin_l2_subscript.info(f"逐笔委托订阅结果sz:{result}") |
| | | logger_local_huaxin_l2_subscript.info(f"逐笔委托订阅结果sz({self.processor_index}):{result}") |
| | | result = self.__api.SubscribeTransaction(sz, lev2mdapi.TORA_TSTP_EXD_SZSE) |
| | | logger_local_huaxin_l2_subscript.info(f"逐笔成交订阅结果sz:{result}") |
| | | logger_local_huaxin_l2_subscript.info(f"逐笔成交订阅结果sz({self.processor_index}):{result}") |
| | | result = self.__api.SubscribeMarketData(sz, lev2mdapi.TORA_TSTP_EXD_SZSE) |
| | | logger_local_huaxin_l2_subscript.info(f"市场订阅结果sz:{result}") |
| | | logger_local_huaxin_l2_subscript.info(f"市场订阅结果sz({self.processor_index}):{result}") |
| | | |
| | | def __process_codes_data(self, codes_data, from_cache=False, delay=0.0): |
| | | |
| | |
| | | codes.add(code) |
| | | self.codes_volume_and_price_dict[code] = (d[1], d[2], d[3], d[4], d[5]) |
| | | self.l2_data_upload_manager.set_order_fileter_condition(code, d[1], round(float(d[2]), 2), d[3], d[4], d[5]) |
| | | logger_l2_codes_subscript.info("华鑫L2订阅总数:{}", len(codes)) |
| | | logger_l2_codes_subscript.info("华鑫L2订阅总数({}):{}", self.processor_index, len(codes)) |
| | | add_codes = codes - self.subscripted_codes |
| | | del_codes = self.subscripted_codes - codes |
| | | print("add del codes", add_codes, del_codes) |
| | |
| | | self.__unsubscribe(del_codes) |
| | | |
| | | if add_codes: |
| | | logger_system.info(f"新增L2订阅代码数量({'缓存' if from_cache else ''}):{len(add_codes)}") |
| | | logger_system.info(f"新增L2订阅代码数量({self.processor_index}) ({'缓存' if from_cache else ''}):{len(add_codes)}") |
| | | for c in add_codes: |
| | | logger_l2_codes_subscript.info(f"l2委托数据过滤条件:{c} - {self.codes_volume_and_price_dict.get(c)}") |
| | | logger_l2_codes_subscript.info( |
| | | f"l2委托数据过滤条件({self.processor_index}):{c} - {self.codes_volume_and_price_dict.get(c)}") |
| | | |
| | | logger_l2_codes_subscript.info("华鑫L2订阅结束,add-{} del-{}", len(add_codes), len(del_codes)) |
| | | logger_l2_codes_subscript.info("华鑫L2订阅结束({}),add-{} del-{}", self.processor_index, len(add_codes), |
| | | len(del_codes)) |
| | | |
| | | # 设置最近的代码列表 |
| | | self.latest_codes_set = codes |
| | |
| | | # 保存一份最新的数据 |
| | | self.__set_latest_datas(codes_data) |
| | | |
| | | @classmethod |
| | | def __set_latest_datas(cls, codes_data): |
| | | def __set_latest_datas(self, codes_data): |
| | | path_str = f"{constant.L2_CODES_INFO_PATH}_{self.processor_index}" |
| | | data_str = json.dumps([tool.get_now_date_str(), codes_data]) |
| | | with open(constant.L2_CODES_INFO_PATH, mode='w') as f: |
| | | with open(path_str, mode='w') as f: |
| | | f.write(data_str) |
| | | |
| | | @classmethod |
| | | def __get_latest_datas(cls): |
| | | if os.path.exists(constant.L2_CODES_INFO_PATH): |
| | | with open(constant.L2_CODES_INFO_PATH, mode='r') as f: |
| | | def __get_latest_datas(self): |
| | | path_str = f"{constant.L2_CODES_INFO_PATH}_{self.processor_index}" |
| | | if os.path.exists(path_str): |
| | | with open(path_str, mode='r') as f: |
| | | str_ = f.readline() |
| | | data_json = json.loads(str_) |
| | | if data_json[0] == tool.get_now_date_str(): |
| | |
| | | if pRspInfo['ErrorID'] == 0: |
| | | print("----L2行情登录成功----") |
| | | self.is_login = True |
| | | logger_system.info(f"L2行情登录成功") |
| | | logger_system.info(f"L2行情登录成功({self.processor_index})") |
| | | # 初始设置值 |
| | | if tool.trade_time_sub(tool.get_now_time_str(), "09:20:00") > 0: |
| | | threading.Thread( |
| | |
| | | def OnRspSubOrderDetail(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast): |
| | | print("OnRspSubOrderDetail", pRspInfo) |
| | | # try: |
| | | print("订阅结果:", pSpecificSecurity["ExchangeID"], pSpecificSecurity["SecurityID"], pRspInfo["ErrorID"], |
| | | print(f"订阅结果({self.processor_index}):", pSpecificSecurity["ExchangeID"], pSpecificSecurity["SecurityID"], |
| | | pRspInfo["ErrorID"], |
| | | pRspInfo["ErrorMsg"]) |
| | | async_log_util.info(logger_local_huaxin_l2_subscript, |
| | | f"订阅结果:{pSpecificSecurity['SecurityID']} {pRspInfo['ErrorID']} {pRspInfo['ErrorMsg']}") |
| | | f"订阅结果({self.processor_index}):{pSpecificSecurity['SecurityID']} {pRspInfo['ErrorID']} {pRspInfo['ErrorMsg']}") |
| | | if pRspInfo["ErrorID"] == 0: |
| | | print("订阅成功") |
| | | self.subscripted_codes.add(pSpecificSecurity['SecurityID']) |
| | |
| | | |
| | | def OnRspSubNGTSTick(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast): |
| | | async_log_util.info(logger_local_huaxin_l2_subscript, |
| | | f"NGTS订阅结果:{pSpecificSecurity['SecurityID']} {pRspInfo['ErrorID']} {pRspInfo['ErrorMsg']}") |
| | | f"NGTS订阅结果({self.processor_index}):{pSpecificSecurity['SecurityID']} {pRspInfo['ErrorID']} {pRspInfo['ErrorMsg']}") |
| | | if pRspInfo["ErrorID"] == 0: |
| | | print("订阅成功") |
| | | self.subscripted_codes.add(pSpecificSecurity['SecurityID']) |
| | |
| | | def OnRspUnSubNGTSTick(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast): |
| | | try: |
| | | code = pSpecificSecurity['SecurityID'] |
| | | logger_local_huaxin_l2_subscript.info(f"NGTS取消订阅:{code}") |
| | | logger_local_huaxin_l2_subscript.info(f"NGTS取消订阅({self.processor_index}):{code}") |
| | | self.subscripted_codes.discard(code) |
| | | if bIsLast == 1: |
| | | print("取消订阅响应结束", self.subscripted_codes) |
| | |
| | | logging.exception(e) |
| | | |
| | | |
| | | def __init_l2(l2_data_upload_manager): |
| | | def __init_l2(l2_data_upload_manager, processor_index): |
| | | print(lev2mdapi.CTORATstpLev2MdApi_GetApiVersion()) |
| | | # case 1: Tcp方式 |
| | | # g_SubMode=lev2mdapi.TORA_TSTP_MST_TCP |
| | |
| | | # case 2非缓存模式 |
| | | # api = lev2mdapi.CTORATstpLev2MdApi_CreateTstpLev2MdApi(g_SubMode, False) |
| | | global spi |
| | | spi = Lev2MdSpi(api, l2_data_upload_manager) |
| | | spi = Lev2MdSpi(api, l2_data_upload_manager, processor_index) |
| | | api.RegisterSpi(spi) |
| | | # -------------------正式模式------------------------------------- |
| | | if g_SubMode != lev2mdapi.TORA_TSTP_MST_MCAST: |
| | |
| | | time.sleep(10) |
| | | |
| | | |
| | | def run(queue_r: multiprocessing.Queue, queue_data_callback: multiprocessing.Queue, channel_list: list) -> None: |
| | | def run(queue_r: multiprocessing.Queue, queue_data_callback: multiprocessing.Queue, channel_list: list, |
| | | processor_index) -> None: |
| | | """ |
| | | 运行 |
| | | @param queue_r: |
| | | @param queue_data_callback: 低频数据回调队列 |
| | | @param channel_list: [((编号,multiprocessing.Array, zmq_address),(编号, multiprocessing.Array, zmq_address))] |
| | | @param processor_index:处理器索引 |
| | | @return: |
| | | """ |
| | | logger_system.info("L2进程ID:{}", os.getpid()) |
| | |
| | | threading.Thread(target=SubscriptDefend.run, daemon=True).start() |
| | | # 初始化 |
| | | data_channel_distribute_manager = CodeDataChannelDistributeManager(channel_list) |
| | | l2_data_upload_manager = L2DataUploadManager(data_channel_distribute_manager) |
| | | __init_l2(l2_data_upload_manager) |
| | | l2_data_upload_manager = L2DataUploadManager(data_channel_distribute_manager, queue_data_callback) |
| | | __init_l2(l2_data_upload_manager, processor_index) |
| | | l2_data_manager_v2.run_upload_common() |
| | | l2_data_manager_v2.run_log() |
| | | # TODO 测试 |