# -*- coding: utf-8 -*-
"""Huaxin (TORA) Level-2 market-data client.

Creates the Level-2 API, logs in, keeps the set of subscribed codes in sync
with the commands received from the trade process, and forwards market
snapshots, order details and transactions to ``L2DataUploadManager``.
"""
import json
import logging
import multiprocessing
import os
import queue
import threading
import time
import concurrent.futures

from huaxin_client import command_manager
from huaxin_client import constant
from huaxin_client import l2_data_manager
import lev2mdapi
from huaxin_client.code_queue_distribute_manager import CodeDataCallbackDistributeManager
from huaxin_client.command_manager import L2ActionCallback
from huaxin_client.l2_data_manager import L2DataUploadManager
from log_module import log, async_log_util
from log_module.async_log_util import huaxin_l2_log
from log_module.log import logger_local_huaxin_l2_subscript, logger_system, logger_l2_codes_subscript, logger_debug
from utils import tool

# ---- Class-B environment ----
Front_Address = "tcp://10.0.1.101:6900"
Multicast_Address = "udp://224.224.2.19:7889"
Multicast_Address2 = "udp://224.224.224.234:7890"
Local_Interface_Address = constant.LOCAL_IP

# ---- Class-A environment (overrides the values above) ----
if constant.IS_A:
    Front_Address = "tcp://10.0.1.101:6900"
    Multicast_Address = "udp://224.224.22.3:8889"
    Multicast_Address2 = "udp://224.224.224.231:4889"
    Local_Interface_Address = "172.16.22.111"

g_SubMarketData = False
g_SubTransaction = False
g_SubOrderDetail = False
g_SubXTSTick = False
g_SubXTSMarketData = False
g_SubNGTSTick = False
g_SubBondMarketData = False
g_SubBondTransaction = False
g_SubBondOrderDetail = False

SH_Securities = [b"603000", b"600225", b"600469", b"600616", b"600059", b"002849", b"605188", b"603630", b"600105",
                 b"603773", b"603915", b"603569", b"603322", b"603798", b"605198", b"603079", b"600415", b"600601"]
SH_XTS_Securities = [b"018003", b"113565"]
SZ_Securities = [b"002456", b"002849", b"002281", b"002336", b"000936", b"000920", b"000757", b"002896",
                 b"002725", b"000952", b"000526", b"000753", b"000681", b"002088", b"002436"]
SZ_Bond_Securities = [b"100303", b"109559", b"112617"]

set_codes_data_queue = queue.Queue(maxsize=1000)

# When True, SSE codes use the merged NGTS tick stream instead of separate
# order-detail / transaction subscriptions.
ENABLE_NGST = True


class Lev2MdSpi(lev2mdapi.CTORATstpLev2MdSpi):
    """SPI callback handler for the Level-2 market-data API.

    Tracks which codes are subscribed and converts the API's callback payloads
    into plain dicts that are handed to ``l2_data_upload_manager``.
    """

    latest_codes_set = set()
    special_code_volume_for_order_dict = {}
    # Codes that are currently subscribed
    subscripted_codes = set()
    # Unique key of the last transaction per code (used by the de-duplication
    # that is currently disabled in OnRtnTransaction)
    __last_transaction_keys_dict = {}

    def __init__(self, api, l2_data_upload_manager: L2DataUploadManager):
        """Store the API handle and the upload manager.

        @param api: Level-2 API object used for login and (un)subscription.
        @param l2_data_upload_manager: sink for all converted market data.
        """
        lev2mdapi.CTORATstpLev2MdSpi.__init__(self)
        self.__api = api
        self.is_login = False
        self.l2_data_upload_manager = l2_data_upload_manager
        # code -> (d[1], d[2], d[3], d[4], d[5]) of the latest codes_data entry
        self.codes_volume_and_price_dict = {}

    def __split_codes(self, codes):
        """Split codes by market and encode them to bytes.

        Codes whose market type is neither SZSE nor SSE are dropped.

        @return: ``(sse_codes, szse_codes)`` as lists of ``bytes``.
        """
        szse_codes = []
        sse_codes = []
        for code in codes:
            market_type = tool.get_market_type(code)
            if market_type == tool.MARKET_TYPE_SZSE:
                szse_codes.append(code.encode())
            elif market_type == tool.MARKET_TYPE_SSE:
                sse_codes.append(code.encode())
        return sse_codes, szse_codes

    def __unsubscribe(self, _codes):
        """Cancel the subscriptions of the given codes on both exchanges."""
        sh, sz = self.__split_codes(_codes)
        logger_local_huaxin_l2_subscript.info(f"取消订阅上证:{sh}")
        logger_local_huaxin_l2_subscript.info(f"取消订阅深证:{sz}")
        if sh:
            if ENABLE_NGST:
                result = self.__api.UnSubscribeNGTSTick(sh, lev2mdapi.TORA_TSTP_EXD_SSE)
                logger_local_huaxin_l2_subscript.info(f"逐笔NGTS订阅结果sh:{result}")
            else:
                # Cancel order-detail subscription
                self.__api.UnSubscribeOrderDetail(sh, lev2mdapi.TORA_TSTP_EXD_SSE)
                # Cancel transaction subscription
                self.__api.UnSubscribeTransaction(sh, lev2mdapi.TORA_TSTP_EXD_SSE)
            self.__api.UnSubscribeMarketData(sh, lev2mdapi.TORA_TSTP_EXD_SSE)
        if sz:
            self.__api.UnSubscribeOrderDetail(sz, lev2mdapi.TORA_TSTP_EXD_SZSE)
            self.__api.UnSubscribeTransaction(sz, lev2mdapi.TORA_TSTP_EXD_SZSE)
            self.__api.UnSubscribeMarketData(sz, lev2mdapi.TORA_TSTP_EXD_SZSE)

    def subscribe_codes(self, _codes):
        """Public entry point to (re)subscribe the given codes."""
        self.__subscribe(_codes)

    def __subscribe(self, _codes):
        """Subscribe the given codes and log the return value of every API call."""
        sh, sz = self.__split_codes(_codes)
        logger_local_huaxin_l2_subscript.info(f"订阅上证:{sh}")
        logger_local_huaxin_l2_subscript.info(f"订阅深证:{sz}")
        if sh:
            if ENABLE_NGST:
                result = self.__api.SubscribeNGTSTick(sh, lev2mdapi.TORA_TSTP_EXD_SSE)
                logger_local_huaxin_l2_subscript.info(f"逐笔NGTS订阅结果sh:{result}")
            else:
                # Subscribe order details
                result = self.__api.SubscribeOrderDetail(sh, lev2mdapi.TORA_TSTP_EXD_SSE)
                logger_local_huaxin_l2_subscript.info(f"逐笔委托订阅结果sh:{result}")
                # Subscribe transactions
                result = self.__api.SubscribeTransaction(sh, lev2mdapi.TORA_TSTP_EXD_SSE)
                logger_local_huaxin_l2_subscript.info(f"逐笔成交订阅结果sh:{result}")
            result = self.__api.SubscribeMarketData(sh, lev2mdapi.TORA_TSTP_EXD_SSE)
            logger_local_huaxin_l2_subscript.info(f"市场订阅结果sh:{result}")
        if sz:
            result = self.__api.SubscribeOrderDetail(sz, lev2mdapi.TORA_TSTP_EXD_SZSE)
            logger_local_huaxin_l2_subscript.info(f"逐笔委托订阅结果sz:{result}")
            result = self.__api.SubscribeTransaction(sz, lev2mdapi.TORA_TSTP_EXD_SZSE)
            logger_local_huaxin_l2_subscript.info(f"逐笔成交订阅结果sz:{result}")
            result = self.__api.SubscribeMarketData(sz, lev2mdapi.TORA_TSTP_EXD_SZSE)
            logger_local_huaxin_l2_subscript.info(f"市场订阅结果sz:{result}")

    def __process_codes_data(self, codes_data, from_cache=False, delay=0.0):
        """Apply a new target code list: update filters, queues and subscriptions.

        @param codes_data: iterable of entries indexed 0..5; ``d[0]`` is the code
            and ``d[1]``..``d[5]`` are forwarded as the order filter condition.
        @param from_cache: True when the data comes from the on-disk cache; it is
            then ignored if live data has already been applied.
        @param delay: seconds to sleep before processing.
        @raise Exception: when not logged in and ``constant.TEST`` is falsy.
        """
        if from_cache and self.codes_volume_and_price_dict:
            return
        if not self.is_login and not constant.TEST:
            raise Exception("L2尚未登录")
        if delay > 0:
            time.sleep(delay)
        codes = set()
        for d in codes_data:
            code = d[0]
            codes.add(code)
            self.codes_volume_and_price_dict[code] = (d[1], d[2], d[3], d[4], d[5])
            self.l2_data_upload_manager.set_order_fileter_condition(code, d[1], round(float(d[2]), 2), d[3], d[4],
                                                                    d[5])
        logger_l2_codes_subscript.info("华鑫L2订阅总数:{}", len(codes))
        add_codes = codes - self.subscripted_codes
        del_codes = self.subscripted_codes - codes
        print("add del codes", add_codes, del_codes)
        try:
            for c in del_codes:
                self.l2_data_upload_manager.release_distributed_upload_queue(c)
                l2_data_manager.del_target_code(c)
            for c in codes:
                self.l2_data_upload_manager.distribute_upload_queue(c, codes)
                l2_data_manager.add_target_code(c)
        except Exception as e:
            # TODO: clean up the data that has not been released yet
            logger_system.error(f"L2代码分配上传队列出错:{str(e)}")
            logger_system.exception(e)
        # Subscription changes are still applied even if queue distribution failed.
        self.__subscribe(add_codes)
        self.__unsubscribe(del_codes)
        if add_codes:
            logger_system.info(f"新增L2订阅代码数量({'缓存' if from_cache else ''}):{len(add_codes)}")
            for c in add_codes:
                logger_l2_codes_subscript.info(
                    f"l2委托数据过滤条件:{c} - {self.codes_volume_and_price_dict.get(c)}")
        logger_l2_codes_subscript.info("华鑫L2订阅结束,add-{} del-{}", len(add_codes), len(del_codes))
        # Remember the most recent code list
        self.latest_codes_set = codes

    def set_codes_data(self, codes_data):
        """Apply a new target code list and always persist it to the cache file.

        Processing errors are logged, not raised; the cache is written in
        ``finally`` so the latest request survives a failed processing run.
        """
        try:
            self.__process_codes_data(codes_data)
        except Exception as e:
            logger_l2_codes_subscript.exception(e)
        finally:
            # Keep a copy of the latest data
            self.__set_latest_datas(codes_data)

    @classmethod
    def __set_latest_datas(cls, codes_data):
        """Write ``[today, codes_data]`` as JSON to ``constant.L2_CODES_INFO_PATH``."""
        data_str = json.dumps([tool.get_now_date_str(), codes_data])
        with open(constant.L2_CODES_INFO_PATH, mode='w') as f:
            f.write(data_str)

    @classmethod
    def __get_latest_datas(cls):
        """Return the cached codes_data if it was saved today, else ``[]``.

        A missing, empty or malformed cache file yields ``[]`` (the error is
        logged) instead of raising.
        """
        if not os.path.exists(constant.L2_CODES_INFO_PATH):
            return []
        try:
            with open(constant.L2_CODES_INFO_PATH, mode='r') as f:
                data_json = json.loads(f.readline())
            if data_json[0] == tool.get_now_date_str():
                return data_json[1]
        except (OSError, ValueError, LookupError, TypeError) as e:
            # json.JSONDecodeError is a ValueError; LookupError/TypeError cover
            # a payload that is valid JSON but not the expected [date, data] list.
            logger_l2_codes_subscript.exception(e)
        return []

    def OnFrontConnected(self):
        """Front connected: send a logout request, wait 1s, then request login."""
        print("OnFrontConnected")
        logger_system.info(f"l2_client OnFrontConnected 线程ID:{tool.get_thread_id()}")
        logout_req = lev2mdapi.CTORATstpUserLogoutField()
        self.__api.ReqUserLogout(logout_req, 1)
        time.sleep(1)
        # Request login
        login_req = lev2mdapi.CTORATstpReqUserLoginField()
        self.__api.ReqUserLogin(login_req, 2)

    def OnRspUserLogin(self, pRspUserLogin, pRspInfo, nRequestID, bIsLast):
        """Login response: mark as logged in and restore cached codes after 09:20."""
        print("OnRspUserLogin: ErrorID[%d] ErrorMsg[%s] RequestID[%d] IsLast[%d]" % (
            pRspInfo['ErrorID'], pRspInfo['ErrorMsg'], nRequestID, bIsLast))
        if pRspInfo['ErrorID'] == 0:
            print("----L2行情登录成功----")
            self.is_login = True
            logger_system.info("L2行情登录成功")
            # Restore the initial code list from the cache
            if tool.trade_time_sub(tool.get_now_time_str(), "09:20:00") > 0:

                def _restore_from_cache():
                    # Runs in a background thread: log failures instead of
                    # letting the thread die with an unhandled exception.
                    try:
                        self.__process_codes_data(self.__get_latest_datas(), from_cache=True, delay=60)
                    except Exception as e:
                        logger_l2_codes_subscript.exception(e)

                threading.Thread(target=_restore_from_cache, daemon=True).start()

    def OnRspSubMarketData(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast):
        print("OnRspSubMarketData")

    def OnRspSubIndex(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast):
        print("OnRspSubIndex")

    def OnRspSubTransaction(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast):
        print("OnRspSubTransaction")

    def OnRspSubOrderDetail(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast):
        """Order-detail subscribe response: record successfully subscribed codes."""
        print("OnRspSubOrderDetail", pRspInfo)
        print("订阅结果:", pSpecificSecurity["ExchangeID"], pSpecificSecurity["SecurityID"], pRspInfo["ErrorID"],
              pRspInfo["ErrorMsg"])
        async_log_util.info(logger_local_huaxin_l2_subscript,
                            f"订阅结果:{pSpecificSecurity['SecurityID']} {pRspInfo['ErrorID']} {pRspInfo['ErrorMsg']}")
        if pRspInfo["ErrorID"] == 0:
            print("订阅成功")
            self.subscripted_codes.add(pSpecificSecurity['SecurityID'])
            # Initialise the last-update time used by SubscriptDefend
            SubscriptDefend.set_l2_market_update(pSpecificSecurity['SecurityID'])
        if bIsLast == 1:
            print("订阅响应结束", self.subscripted_codes)
            l2_data_manager.add_subscript_codes(self.subscripted_codes)

    def OnRspUnSubOrderDetail(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast):
        """Order-detail unsubscribe response: forget the code."""
        print("OnRspUnSubOrderDetail", bIsLast)
        try:
            code = pSpecificSecurity['SecurityID']
            self.subscripted_codes.discard(code)
            if bIsLast == 1:
                print("取消订阅响应结束", self.subscripted_codes)
                l2_data_manager.add_subscript_codes(self.subscripted_codes)
        except Exception as e:
            logging.exception(e)

    def OnRspSubNGTSTick(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast):
        """NGTS tick subscribe response: record successfully subscribed codes."""
        async_log_util.info(logger_local_huaxin_l2_subscript,
                            f"NGTS订阅结果:{pSpecificSecurity['SecurityID']} {pRspInfo['ErrorID']} {pRspInfo['ErrorMsg']}")
        if pRspInfo["ErrorID"] == 0:
            print("订阅成功")
            self.subscripted_codes.add(pSpecificSecurity['SecurityID'])
            # Initialise the last-update time used by SubscriptDefend
            SubscriptDefend.set_l2_market_update(pSpecificSecurity['SecurityID'])
        if bIsLast == 1:
            print("订阅响应结束", self.subscripted_codes)
            l2_data_manager.add_subscript_codes(self.subscripted_codes)

    def OnRspUnSubNGTSTick(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast):
        """NGTS tick unsubscribe response: forget the code."""
        try:
            code = pSpecificSecurity['SecurityID']
            logger_local_huaxin_l2_subscript.info(f"NGTS取消订阅:{code}")
            self.subscripted_codes.discard(code)
            if bIsLast == 1:
                print("取消订阅响应结束", self.subscripted_codes)
                l2_data_manager.add_subscript_codes(self.subscripted_codes)
        except Exception as e:
            logging.exception(e)

    def OnRspSubBondMarketData(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast):
        print("OnRspSubBondMarketData")

    def OnRspSubBondTransaction(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast):
        print("OnRspSubBondTransaction")

    def OnRspSubBondOrderDetail(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast):
        print("OnRspSubBondOrderDetail")

    def OnRspSubXTSMarketData(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast):
        print("OnRspSubXTSMarketData")

    def OnRspSubXTSTick(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast):
        print("OnRspSubXTSTick")

    def OnRtnMarketData(self, pDepthMarketData, FirstLevelBuyNum, FirstLevelBuyOrderVolumes, FirstLevelSellNum,
                        FirstLevelSellOrderVolumes):
        """Market snapshot: upload price levels and refresh the code's update time.

        Levels 1-5 are always included; levels 6-10 are appended until the
        first level with a falsy volume.
        """
        try:
            buys = [(pDepthMarketData[f'BidPrice{i}'], pDepthMarketData[f'BidVolume{i}']) for i in range(1, 6)]
            for i in range(6, 11):
                if not pDepthMarketData[f"BidVolume{i}"]:
                    break
                buys.append((pDepthMarketData[f'BidPrice{i}'], pDepthMarketData[f'BidVolume{i}']))

            sells = [(pDepthMarketData[f'AskPrice{i}'], pDepthMarketData[f'AskVolume{i}']) for i in range(1, 6)]
            for i in range(6, 11):
                if not pDepthMarketData[f"AskVolume{i}"]:
                    break
                sells.append((pDepthMarketData[f'AskPrice{i}'], pDepthMarketData[f'AskVolume{i}']))

            d = {"dataTimeStamp": pDepthMarketData['DataTimeStamp'],
                 "securityID": pDepthMarketData['SecurityID'],
                 "lastPrice": pDepthMarketData['LastPrice'],
                 "totalVolumeTrade": pDepthMarketData['TotalVolumeTrade'],
                 "totalValueTrade": pDepthMarketData['TotalValueTrade'],
                 "totalAskVolume": pDepthMarketData['TotalAskVolume'],
                 "avgAskPrice": pDepthMarketData["AvgAskPrice"],
                 "buy": buys,
                 "sell": sells}
            self.l2_data_upload_manager.add_market_data(d)
            SubscriptDefend.set_l2_market_update(pDepthMarketData['SecurityID'])
        except Exception as e:
            # Never let an error escape into the API's callback thread, but do
            # not hide it either.
            logger_local_huaxin_l2_subscript.exception(e)

    def OnRtnIndex(self, pIndex):
        # Print index market data
        print(
            "OnRtnIndex SecurityID[%s] LastIndex[%.2f] LowIndex[%.2f] HighIndex[%.2f] TotalVolumeTraded[%d] Turnover[%.2f]" % (
                pIndex['SecurityID'],
                pIndex['LastIndex'],
                pIndex['LowIndex'],
                pIndex['HighIndex'],
                pIndex['TotalVolumeTraded'],
                pIndex['Turnover']))

    def OnRtnTransaction(self, pTransaction):
        """Transaction tick: ``ExecType == b"2"`` is a cancel, anything else a trade.

        Cancels are uploaded as order details with ``OrderStatus`` "D";
        trades are uploaded as transaction details.
        """
        if pTransaction['ExecType'] == b"2":
            # Cancel
            item = {"SecurityID": pTransaction['SecurityID'],
                    "Price": pTransaction['TradePrice'],
                    "Volume": pTransaction['TradeVolume'],
                    "OrderType": "2",
                    "OrderTime": pTransaction['TradeTime'],
                    "MainSeq": pTransaction['MainSeq'],
                    "SubSeq": pTransaction['SubSeq'],
                    "OrderStatus": "D"}
            buyNo = pTransaction['BuyNo']
            sellNo = pTransaction['SellNo']
            if buyNo > 0:
                # Buy side
                item["OrderNO"] = buyNo
                item["Side"] = "1"
            elif sellNo > 0:
                # Sell side
                item["OrderNO"] = sellNo
                item["Side"] = "2"
            self.l2_data_upload_manager.add_l2_order_detail(item, 0, True)
        else:
            # Trade
            item = {"SecurityID": pTransaction['SecurityID'],
                    "TradePrice": pTransaction['TradePrice'],
                    "TradeVolume": pTransaction['TradeVolume'],
                    "OrderTime": pTransaction['TradeTime'],
                    "MainSeq": pTransaction['MainSeq'],
                    "SubSeq": pTransaction['SubSeq'],
                    "BuyNo": pTransaction['BuyNo'],
                    "SellNo": pTransaction['SellNo'],
                    "ExecType": pTransaction['ExecType'].decode()}
            # De-duplication (upload the same order number at most once) is
            # temporarily disabled:
            # key = f"{item['SecurityID']}_{item['TradePrice']}_{item['BuyNo']}"
            # if self.__last_transaction_keys_dict.get(code) == key:
            #     return
            # self.__last_transaction_keys_dict[code] = key
            self.l2_data_upload_manager.add_transaction_detail(item)

    def OnRtnOrderDetail(self, pOrderDetail):
        """Order-detail tick: convert to a dict and upload it."""
        # For SSE, OrderStatus == b"D" means cancel
        item = {"SecurityID": pOrderDetail['SecurityID'],
                "Price": pOrderDetail['Price'],
                "Volume": pOrderDetail['Volume'],
                "Side": pOrderDetail['Side'].decode(),
                "OrderType": pOrderDetail['OrderType'].decode(),
                "OrderTime": pOrderDetail['OrderTime'],
                "MainSeq": pOrderDetail['MainSeq'],
                "SubSeq": pOrderDetail['SubSeq'],
                "OrderNO": pOrderDetail['OrderNO'],
                "OrderStatus": pOrderDetail['OrderStatus'].decode()}
        self.l2_data_upload_manager.add_l2_order_detail(item, 0)

    def OnRtnNGTSTick(self, pTick):
        """
        Merged order/transaction tick stream for SSE stocks.

        ``TickType`` b'T' is uploaded as a transaction; b'A' and b'D' are
        uploaded as order details. Other tick types are ignored.
        @param pTick: tick payload from the API
        @return: None
        """
        try:
            if pTick['TickType'] == b'T':
                # Trade
                item = {"SecurityID": pTick['SecurityID'],
                        "TradePrice": pTick['Price'],
                        "TradeVolume": pTick['Volume'],
                        "OrderTime": pTick['TickTime'],
                        "MainSeq": pTick['MainSeq'],
                        "SubSeq": pTick['SubSeq'],
                        "BuyNo": pTick['BuyNo'],
                        "SellNo": pTick['SellNo'],
                        "ExecType": '1'}
                self.l2_data_upload_manager.add_transaction_detail(item)
            elif pTick['TickType'] == b'A' or pTick['TickType'] == b'D':
                # Order added / order cancelled
                item = {"SecurityID": pTick['SecurityID'],
                        "Price": pTick['Price'],
                        "Volume": pTick['Volume'],
                        "Side": pTick['Side'].decode(),
                        "OrderType": pTick['TickType'].decode(),
                        "OrderTime": pTick['TickTime'],
                        "MainSeq": pTick['MainSeq'],
                        "SubSeq": pTick['SubSeq'],
                        "OrderNO": '',
                        "OrderStatus": pTick['TickType'].decode()}
                # The order number lives in BuyNo or SellNo depending on the side
                if pTick['Side'] == b'1':
                    item['OrderNO'] = pTick['BuyNo']
                elif pTick['Side'] == b'2':
                    item['OrderNO'] = pTick['SellNo']
                self.l2_data_upload_manager.add_l2_order_detail(item, 0)
        except Exception as e:
            logger_local_huaxin_l2_subscript.exception(e)

    def OnRtnBondMarketData(self, pDepthMarketData, FirstLevelBuyNum, FirstLevelBuyOrderVolumes, FirstLevelSellNum,
                            FirstLevelSellOrderVolumes):
        # Print the market snapshot
        # NOTE(review): the TotalVolumeTrade[%d] slot is filled with
        # 'TotalValueTrade' (used twice) - looks like a copy/paste slip; confirm
        # the field name on the bond snapshot struct before changing it.
        print(
            "OnRtnBondMarketData SecurityID[%s] LastPrice[%.2f] TotalVolumeTrade[%d] TotalValueTrade[%.2f] BidPrice1[%.2f] BidVolume1[%d] AskPrice1[%.2f] AskVolume1[%d]" % (
                pDepthMarketData['SecurityID'],
                pDepthMarketData['LastPrice'],
                pDepthMarketData['TotalValueTrade'],
                pDepthMarketData['TotalValueTrade'],
                pDepthMarketData['BidPrice1'],
                pDepthMarketData['BidVolume1'],
                pDepthMarketData['AskPrice1'],
                pDepthMarketData['AskVolume1']))

        # Print the order volumes queued at the first bid level
        for buy_index in range(0, FirstLevelBuyNum):
            print("first level buy  [%d] : [%d]" % (buy_index, FirstLevelBuyOrderVolumes[buy_index]))

        # Print the order volumes queued at the first ask level
        for sell_index in range(0, FirstLevelSellNum):
            print("first level sell [%d] : [%d]" % (sell_index, FirstLevelSellOrderVolumes[sell_index]))

    def OnRtnBondTransaction(self, pTransaction):
        # Print the transaction tick
        # NOTE(review): ExecType is compared against bytes elsewhere in this
        # class; "%d" may not accept it - confirm before relying on this output.
        print(
            "OnRtnBondTransaction SecurityID[%s] TradePrice[%.2f] TradeVolume[%d] TradeTime[%d] MainSeq[%d] SubSeq[%d] BuyNo[%d] SellNo[%d] ExecType[%d]" % (
                pTransaction['SecurityID'],
                pTransaction['TradePrice'],
                pTransaction['TradeVolume'],
                pTransaction['TradeTime'],
                pTransaction['MainSeq'],
                pTransaction['SubSeq'],
                pTransaction['BuyNo'],
                pTransaction['SellNo'],
                pTransaction['ExecType'],
            ))

    def OnRtnBondOrderDetail(self, pOrderDetail):
        # Print the order-detail tick
        print(
            "OnRtnBondOrderDetail SecurityID[%s] Price[%.2f] Volume[%d] Side[%s] OrderType[%s] OrderTime[%d] MainSeq[%d] SubSeq[%d]" % (
                pOrderDetail['SecurityID'],
                pOrderDetail['Price'],
                pOrderDetail['Volume'],
                pOrderDetail['Side'],
                pOrderDetail['OrderType'],
                pOrderDetail['OrderTime'],
                pOrderDetail['MainSeq'],
                pOrderDetail['SubSeq']))

    def OnRtnXTSMarketData(self, pDepthMarketData, FirstLevelBuyNum, FirstLevelBuyOrderVolumes, FirstLevelSellNum,
                           FirstLevelSellOrderVolumes):
        # Print the market snapshot
        # NOTE(review): same 'TotalValueTrade'-used-twice pattern as in
        # OnRtnBondMarketData - confirm the intended field.
        print(
            "OnRtnXTSMarketData SecurityID[%s] LastPrice[%.2f] TotalVolumeTrade[%d] TotalValueTrade[%.2f] BidPrice1[%.2f] BidVolume1[%d] AskPrice1[%.2f] AskVolume1[%d]" % (
                pDepthMarketData['SecurityID'],
                pDepthMarketData['LastPrice'],
                pDepthMarketData['TotalValueTrade'],
                pDepthMarketData['TotalValueTrade'],
                pDepthMarketData['BidPrice1'],
                pDepthMarketData['BidVolume1'],
                pDepthMarketData['AskPrice1'],
                pDepthMarketData['AskVolume1']))

        # Print the order volumes queued at the first bid level
        for buy_index in range(0, FirstLevelBuyNum):
            print("first level buy  [%d] : [%d]" % (buy_index, FirstLevelBuyOrderVolumes[buy_index]))

        # Print the order volumes queued at the first ask level
        for sell_index in range(0, FirstLevelSellNum):
            print("first level sell [%d] : [%d]" % (sell_index, FirstLevelSellOrderVolumes[sell_index]))


class SubscriptDefend:
    """
    Subscription watchdog.
    When a subscribed code has received no callback data for a while, it is
    subscribed again.
    """
    # code -> time.time() of the last subscribe response / market snapshot
    __l2_market_update_time = {}

    @classmethod
    def set_l2_market_update(cls, code):
        """Record "now" as the last time data was seen for ``code``."""
        cls.__l2_market_update_time[code] = time.time()

    @classmethod
    def run(cls):
        """Loop forever, every 15s re-subscribing codes silent for more than 15s.

        Nothing is checked before 09:30:15, between 11:29:45 and 13:00:15, or
        after 14:56:45. Errors are logged and the loop continues.
        """
        while True:
            try:
                now_time = tool.get_now_time_as_int()
                if now_time < int("093015"):
                    continue
                if int("112945") < now_time < int("130015"):
                    continue
                if int("145645") < now_time:
                    continue
                # The watchdog thread is started before __init_l2() creates the
                # global ``spi``, so it may not exist yet.
                current_spi = globals().get("spi")
                if current_spi is None:
                    continue
                codes = []
                # Iterate a snapshot: API callbacks add/discard codes on this
                # set from another thread.
                for code in tuple(current_spi.subscripted_codes):
                    # Last update time of this code
                    update_time = cls.__l2_market_update_time.get(code)
                    if update_time and time.time() - update_time > 15:
                        # Needs to be subscribed again
                        codes.append(code)
                if codes:
                    logger_debug.info(f"重新订阅:{codes}")
                    current_spi.subscribe_codes(codes)
            except Exception as e:
                logger_debug.exception(e)
            finally:
                # Runs on every path, including the ``continue`` branches above
                time.sleep(15)


class MyL2ActionCallback(L2ActionCallback):
    """Bridges L2 commands to the global ``spi``."""

    def OnSetL2Position(self, codes_data):
        """Forward a new target code list to ``spi.set_codes_data``."""
        huaxin_l2_log.info(logger_l2_codes_subscript, "华鑫L2代码处理队列获取到数据:数量-{}", len(codes_data))
        try:
            spi.set_codes_data(codes_data)
        except Exception as e:
            logging.exception(e)


def __init_l2(l2_data_upload_manager):
    """Create the Level-2 API in multicast mode, register the SPI and start it.

    Publishes the created ``Lev2MdSpi`` as the module-level global ``spi``.
    """
    global spi
    print(lev2mdapi.CTORATstpLev2MdApi_GetApiVersion())
    # Subscription mode, case 1: TCP
    # g_SubMode=lev2mdapi.TORA_TSTP_MST_TCP
    # Subscription mode, case 2: multicast
    g_SubMode = lev2mdapi.TORA_TSTP_MST_MCAST

    # Case 1: cached mode
    api = lev2mdapi.CTORATstpLev2MdApi_CreateTstpLev2MdApi(g_SubMode, True)
    # Case 2: non-cached mode
    # api = lev2mdapi.CTORATstpLev2MdApi_CreateTstpLev2MdApi(g_SubMode, False)
    spi = Lev2MdSpi(api, l2_data_upload_manager)
    api.RegisterSpi(spi)
    # ------------------- production mode -------------------
    if g_SubMode != lev2mdapi.TORA_TSTP_MST_MCAST:
        api.RegisterFront(Front_Address)
    else:
        # Case 1: receive market data from a single multicast address
        api.RegisterMulticast(Multicast_Address, Local_Interface_Address, "")
        # api.RegisterMulticast(Multicast_Address2, Local_Interface_Address, "")

        # Case 2: register several multicast addresses at the same time
        # api.RegisterMulticast(Multicast_Address, Local_Interface_Address, "");
        # api.RegisterMulticast(Multicast_Address2, Local_Interface_Address, "");

        # Case 3: receive market data in efvi mode
        # api.RegisterMulticast(Multicast_Address, Local_Interface_Address, "", "enp101s0f0",4096, True);

    # Case 1: run without CPU pinning
    api.Init()


__l2_cmd_thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=3)


def __receive_from_queue_trade(queue_trade_w_l2_r: multiprocessing.Queue):
    """Consume JSON commands from the trade process and dispatch ``l2_cmd`` ones.

    Runs forever. Each ``l2_cmd`` message is handed to the command thread pool;
    errors are logged and the loop continues.
    """
    logger_system.info(f"l2_client __receive_from_pipe_trade 线程ID:{tool.get_thread_id()}")
    while True:
        try:
            value = queue_trade_w_l2_r.get()
            if value:
                if isinstance(value, bytes):
                    value = value.decode("utf-8")
                data = json.loads(value)
                _type = data["type"]
                if _type == "l2_cmd":
                    __start_time = time.time()
                    # Dispatch on the thread pool. The payload is bound as a
                    # default argument: a plain closure would read ``data``
                    # when the task runs, by which time this loop may already
                    # have rebound it to a later command.
                    __l2_cmd_thread_pool.submit(
                        lambda cmd=data: l2CommandManager.process_command(command_manager.CLIENT_TYPE_CMD_L2, None,
                                                                          cmd))
                    use_time = time.time() - __start_time
                    if use_time > 0.005:
                        huaxin_l2_log.info(logger_local_huaxin_l2_subscript, f"l2_cmd耗时:{use_time}s")
        except Exception as e:
            logging.exception(e)


pipe_strategy = None


def test_add_codes(queue_r):
    """Manual test helper: enqueue a demo command, then feed fake order details."""
    time.sleep(10)
    # NOTE(review): these tuples have 5 fields, but Lev2MdSpi reads indexes 0..5
    # of every codes_data entry - confirm the expected shape.
    demo_datas = [("603002", int(50 * 10000 / 6.35), 6.35, 6.00, 200),
                  ("002654", int(50 * 10000 / 15.59), 15.59, 15.3, 200),
                  ("603701", int(50 * 10000 / 14.28), 14.28, 14.00, 200),
                  ("002908", int(50 * 10000 / 12.78), 12.78, 12.00, 200)]

    queue_r.put_nowait(json.dumps({"type": "l2_cmd", "data": [demo_datas[0]]}))
    time.sleep(10)
    while True:
        try:
            spi.l2_data_upload_manager.add_l2_order_detail(
                {'SecurityID': '603002', 'Price': 6.35, 'Volume': 275000, 'Side': "1", 'OrderType': '0',
                 'OrderTime': '13000015',
                 'MainSeq': 2, 'SubSeq': 6739147, 'OrderNO': 5512466, 'OrderStatus': 'D'}, 0)
            spi.l2_data_upload_manager.add_l2_order_detail(
                {'SecurityID': '603002', 'Price': 6.35, 'Volume': 200, 'Side': "1", 'OrderType': '0',
                 'OrderTime': '13000015',
                 'MainSeq': 2, 'SubSeq': 6739148, 'OrderNO': 5512467, 'OrderStatus': 'D'}, 0)
            time.sleep(0.1)
            spi.l2_data_upload_manager.add_l2_order_detail(
                {'SecurityID': '603002', 'Price': 6.35, 'Volume': 100, 'Side': "1", 'OrderType': '0',
                 'OrderTime': '13000015',
                 'MainSeq': 2, 'SubSeq': 6739148, 'OrderNO': 5512467, 'OrderStatus': 'D'}, 0)
        except Exception as e:
            logging.exception(e)
        finally:
            time.sleep(10)


def run(queue_r: multiprocessing.Queue, data_callbacks: list) -> None:
    """Entry point of the L2 process: start all workers, then block forever.

    @param queue_r: queue carrying JSON commands from the trade process; when
        None no command receiver thread is started.
    @param data_callbacks: passed to ``CodeDataCallbackDistributeManager``.
    Publishes the command manager as the module-level ``l2CommandManager``.
    Start-up errors are logged; the function never returns.
    """
    global l2CommandManager
    logger_system.info("L2进程ID:{}", os.getpid())
    logger_system.info(f"l2_client 线程ID:{tool.get_thread_id()}")
    try:
        log.close_print()
        if queue_r is not None:
            t1 = threading.Thread(target=lambda: __receive_from_queue_trade(queue_r), daemon=True)
            t1.start()

        # Subscription watchdog
        threading.Thread(target=SubscriptDefend.run, daemon=True).start()

        # Initialisation
        data_callback_distribute_manager = CodeDataCallbackDistributeManager(data_callbacks)
        l2_data_upload_manager = L2DataUploadManager(data_callback_distribute_manager)
        __init_l2(l2_data_upload_manager)
        l2_data_manager.run_upload_common()
        l2_data_manager.run_log()
        # TODO: test hook
        # threading.Thread(target=lambda: test_add_codes(queue_r), daemon=True).start()
        l2CommandManager = command_manager.L2CommandManager()
        l2CommandManager.init(MyL2ActionCallback())
        logger_system.info("L2订阅服务启动成功")
    except Exception as e:
        logger_system.exception(e)
    # Keep the process alive; all work happens on daemon threads / API callbacks
    while True:
        time.sleep(2)


def test():
    """Manual smoke test: start ``run`` and push demo data through it."""

    def test_add_codes():
        time.sleep(5)
        # NOTE(review): these tuples have 3 fields, but Lev2MdSpi reads indexes
        # 0..5 of every codes_data entry - confirm the expected shape.
        demo_datas = [("603002", int(50 * 10000 / 6.35), 6.35), ("002654", int(50 * 10000 / 15.59), 15.59),
                      ("603701", int(50 * 10000 / 14.28), 14.28), ("002908", int(50 * 10000 / 12.78), 12.78)]

        queue_r.put_nowait(json.dumps({"type": "l2_cmd", "data": [demo_datas[0]]}))
        time.sleep(1)
        spi.l2_data_upload_manager.add_l2_order_detail(
            {'SecurityID': '603002', 'Price': 6.35, 'Volume': 275000, 'Side': "1", 'OrderType': '0',
             'OrderTime': '13000015',
             'MainSeq': 2, 'SubSeq': 6739147, 'OrderNO': 5512466, 'OrderStatus': 'D'}, 0)
        spi.l2_data_upload_manager.add_l2_order_detail(
            {'SecurityID': '603002', 'Price': 6.35, 'Volume': 200, 'Side': "1", 'OrderType': '0',
             'OrderTime': '13000015',
             'MainSeq': 2, 'SubSeq': 6739148, 'OrderNO': 5512467, 'OrderStatus': 'D'}, 0)
        queue_r.put_nowait(json.dumps({"type": "listen_volume", "data": {"code": "603002", "volume": 100}}))
        time.sleep(0.1)
        spi.l2_data_upload_manager.add_l2_order_detail(
            {'SecurityID': '603002', 'Price': 6.35, 'Volume': 100, 'Side': "1", 'OrderType': '0',
             'OrderTime': '13000015',
             'MainSeq': 2, 'SubSeq': 6739148, 'OrderNO': 5512467, 'OrderStatus': 'D'}, 0)

    queue_r = multiprocessing.Queue(maxsize=1024)
    order_queues = []
    transaction_queues = []
    market_queue = multiprocessing.Queue(maxsize=1024)
    for i in range(20):
        order_queues.append(multiprocessing.Queue(maxsize=1024))
        transaction_queues.append(multiprocessing.Queue(maxsize=1024))
    threading.Thread(target=test_add_codes).start()

    # NOTE(review): run() is declared as run(queue_r, data_callbacks) - this
    # 4-argument call raises TypeError. The queues built above are not
    # obviously the callbacks run() expects, so the right replacement cannot be
    # determined from this file; update the harness against the current API.
    run(queue_r, order_queues, transaction_queues, market_queue)


if __name__ == "__main__":
    input()