# -*- coding: utf-8 -*- import logging import multiprocessing import queue import time import lev2mdapi from l2.huaxin import l2_huaxin_util from log_module import log from log_module.log import logger_local_huaxin_l2_subscript, logger_system from utils import tool IS_TEST = False ###B类### Front_Address = "tcp://10.0.1.101:6900" Multicast_Address = "udp://224.224.2.19:7889" Multicast_Address2 = "udp://224.224.224.234:7890" Local_Interface_Address = "192.168.84.126" ###测试地址### if IS_TEST: Front_Address = "tcp://210.14.72.17:16900" Multicast_Address = "udp://224.224.2.19:7889" Multicast_Address2 = "udp://224.224.224.234:7890" Local_Interface_Address = "192.168.84.126" g_SubMarketData = False g_SubTransaction = False g_SubOrderDetail = False g_SubXTSTick = False g_SubXTSMarketData = False g_SubNGTSTick = False g_SubBondMarketData = False g_SubBondTransaction = False g_SubBondOrderDetail = False set_codes_data_queue = queue.Queue(maxsize=10240) market_code_dict = {} ENABLE_NGST = True class L2TransactionDataManager: def __init__(self, code, accurate_buy=False): """ @param code: @param accurate_buy: 是否需要精确的买单信息 """ self.code = code self.__latest_buy_order = None self.__big_buy_orders = [] # 精确的买单信息,{买单号:订单信息} self.__big_accurate_buy_order_dict = {} self.__big_accurate_sell_order_dict = {} self.__latest_sell_order = None self.__big_sell_orders = [] self.big_accurate_buy_order_queue = queue.Queue(maxsize=10240) self.big_accurate_sell_order_queue = queue.Queue(maxsize=10240) self.big_buy_order_queue = queue.Queue(maxsize=10240) self.big_sell_order_queue = queue.Queue(maxsize=10240) self.accurate_buy = accurate_buy self.__last_accurate_buy_count = 0 self.__last_accurate_sell_count = 0 def get_big_buy_orders(self): return self.__big_buy_orders def get_big_sell_orders(self): return self.__big_sell_orders def add_transaction_data_for_accurate(self, item, big_order_money_threshold=299e4): """ 获取精确的买单信息 @param big_order_money_threshold: 大单阈值 @param data: @return: """ def format_timestamp(timestamp): time_str = str(timestamp) return int(time_str[:5] if time_str[0] == '9' else time_str[:6]) money = round(item[2] * item[3]) volume = item[3] price = item[2] order_time = item[4] if item[0] not in self.__big_accurate_buy_order_dict: # (买单号, 量, 金额, 时间, 最新成交价格, 开始成交时间, 开始成交价格) self.__big_accurate_buy_order_dict[item[0]] = [item[0], 0, 0, order_time, price, order_time, price] buy_order_info = self.__big_accurate_buy_order_dict[item[0]] buy_order_info[1] += volume buy_order_info[2] += money buy_order_info[3] = order_time buy_order_info[4] = price # 将大单写入本地文件 if self.__latest_buy_order and self.__latest_buy_order[0] != item[0]: # 有可能是大单成交完成, 判断上个订单是否是大单 last_buy_order = self.__big_accurate_buy_order_dict.get(self.__latest_buy_order[0]) if last_buy_order[2] > big_order_money_threshold: self.big_accurate_buy_order_queue.put_nowait(last_buy_order) # 如果数据过多需要移除过长时间的小金额数据 accurate_buy_count = len(self.__big_accurate_buy_order_dict.keys()) if accurate_buy_count > 10000 and accurate_buy_count - self.__last_accurate_buy_count > 2000: # 超过1w条数据且新增2000条数据 # 超过1w条数据就要移除30分钟之前的数据 now_time_int = int( tool.trade_time_add_second(l2_huaxin_util.convert_time(order_time), -3600).replace(":", "")) try: remove_order_nos = [x for x in self.__big_accurate_buy_order_dict if format_timestamp(self.__big_accurate_buy_order_dict[x][3]) < now_time_int] if remove_order_nos: for order_no in remove_order_nos: self.__big_accurate_buy_order_dict.pop(order_no) finally: self.__last_accurate_buy_count = len(self.__big_accurate_buy_order_dict) # 统计卖单 if item[1] not in self.__big_accurate_sell_order_dict: # (卖单号, 量, 金额, 时间, 最新成交价格, 开始成交时间, 开始成交价格) self.__big_accurate_sell_order_dict[item[1]] = [item[1], 0, 0, order_time, price, order_time, price] sell_order_info = self.__big_accurate_sell_order_dict[item[1]] sell_order_info[1] += volume sell_order_info[2] += money sell_order_info[3] = order_time sell_order_info[4] = price if self.__latest_sell_order and self.__latest_sell_order[0] != item[1]: # 有可能是大单成交完成, 判断上个订单是否是大单 last_sell_order = self.__big_accurate_sell_order_dict.get(self.__latest_sell_order[0]) if last_sell_order[2] > big_order_money_threshold: self.big_accurate_sell_order_queue.put_nowait(last_sell_order) # 如果数据过多需要移除过长时间的小金额数据 accurate_sell_count = len(self.__big_accurate_sell_order_dict.keys()) if accurate_sell_count > 10000 and accurate_sell_count - self.__last_accurate_sell_count > 2000: # 超过1w条数据且新增2000条数据 # 超过1w条数据就要移除30分钟之前的数据 now_time_int = int( tool.trade_time_add_second(l2_huaxin_util.convert_time(order_time), -3600).replace(":", "")) try: remove_order_nos = [x for x in self.__big_accurate_sell_order_dict if now_time_int > format_timestamp( self.__big_accurate_sell_order_dict[x][3])] if remove_order_nos: for order_no in remove_order_nos: self.__big_accurate_sell_order_dict.pop(order_no) finally: self.__last_accurate_sell_count = len(self.__big_accurate_sell_order_dict.keys()) def add_transaction_data(self, data, big_order_money_threshold=299e4): item = (data["BuyNo"], data["SellNo"], data["TradePrice"], data["TradeVolume"], data["OrderTime"]) # item = {"SecurityID": pTransaction['SecurityID'], "TradePrice": pTransaction['TradePrice'], # "TradeVolume": pTransaction['TradeVolume'], # "OrderTime": pTransaction['TradeTime'], "MainSeq": pTransaction['MainSeq'], # "SubSeq": pTransaction['SubSeq'], "BuyNo": pTransaction['BuyNo'], # "SellNo": pTransaction['SellNo'], # "ExecType": pTransaction['ExecType'].decode()} money = round(item[2] * item[3]) volume = item[3] price = item[2] order_time = item[4] if self.accurate_buy: self.add_transaction_data_for_accurate(item, big_order_money_threshold=100e4) if not self.__latest_buy_order: # (买单号, 量, 金额, 时间, 最新成交价格) self.__latest_buy_order = [item[0], 0, 0, order_time, price] if self.__latest_buy_order[0] == item[0]: self.__latest_buy_order[1] += volume self.__latest_buy_order[2] += money self.__latest_buy_order[3] = order_time self.__latest_buy_order[4] = price else: if self.__latest_buy_order[2] > 1e6: d = (self.__latest_buy_order[0], self.__latest_buy_order[1], self.__latest_buy_order[2], self.__latest_buy_order[3], self.__latest_buy_order[4]) self.__big_buy_orders.append(d) self.big_buy_order_queue.put_nowait(d) self.__latest_buy_order = [item[0], volume, money, order_time, price] if not self.__latest_sell_order: self.__latest_sell_order = [item[1], 0, 0, order_time, price] if self.__latest_sell_order[0] == item[1]: self.__latest_sell_order[1] += volume self.__latest_sell_order[2] += money self.__latest_sell_order[3] = order_time self.__latest_sell_order[4] = price else: if self.__latest_sell_order[2] > 1e6: d = (self.__latest_sell_order[0], self.__latest_sell_order[1], self.__latest_sell_order[2], self.__latest_sell_order[3], self.__latest_sell_order[4]) self.__big_sell_orders.append(d) self.big_sell_order_queue.put_nowait(d) self.__latest_sell_order = [item[1], volume, money, order_time, price] # 买入的大单订单号 l2_transaction_data_dict = {} class Lev2MdSpi(lev2mdapi.CTORATstpLev2MdSpi): latest_codes_set = set() special_code_volume_for_order_dict = {} # 已经订阅的代码 subscripted_codes = set() # 代码的上次成交的订单唯一索引 __last_transaction_keys_dict = {} def __init__(self, api, codes, special_codes): lev2mdapi.CTORATstpLev2MdSpi.__init__(self) self.__api = api self.is_login = False self.codes = codes self.codes_volume_and_price_dict = {} self.special_codes = special_codes def __split_codes(self, codes): szse_codes = [] sse_codes = [] for code in codes: market_type = tool.get_market_type(code) if market_type == tool.MARKET_TYPE_SZSE: szse_codes.append(code.encode()) elif market_type == tool.MARKET_TYPE_SSE: sse_codes.append(code.encode()) return sse_codes, szse_codes # 新增订阅 # 取消订阅 def __unsubscribe(self, _codes): sh, sz = self.__split_codes(_codes) logger_local_huaxin_l2_subscript.info(f"取消订阅上证:{sh}") logger_local_huaxin_l2_subscript.info(f"取消订阅深证:{sz}") if sh: if ENABLE_NGST: result = self.__api.UnSubscribeNGTSTick(sh, lev2mdapi.TORA_TSTP_EXD_SSE) logger_local_huaxin_l2_subscript.info(f"逐笔NGTS订阅结果sh:{result}") else: # 取消订阅逐笔成交 self.__api.UnSubscribeTransaction(sh, lev2mdapi.TORA_TSTP_EXD_SSE) if sz: self.__api.UnSubscribeTransaction(sz, lev2mdapi.TORA_TSTP_EXD_SZSE) def __subscribe(self, _codes): sh, sz = self.__split_codes(_codes) logger_local_huaxin_l2_subscript.info(f"订阅上证:{sh}") logger_local_huaxin_l2_subscript.info(f"订阅深证:{sz}") if sh: if ENABLE_NGST: result = self.__api.SubscribeNGTSTick(sh, lev2mdapi.TORA_TSTP_EXD_SSE) logger_local_huaxin_l2_subscript.info(f"逐笔NGTS订阅结果sh:{result}") else: # 订阅逐笔成交 result = self.__api.SubscribeTransaction(sh, lev2mdapi.TORA_TSTP_EXD_SSE) logger_local_huaxin_l2_subscript.info(f"逐笔成交订阅结果sh:{result}") if sz: result = self.__api.SubscribeTransaction(sz, lev2mdapi.TORA_TSTP_EXD_SZSE) logger_local_huaxin_l2_subscript.info(f"逐笔成交订阅结果sz:{result}") def OnFrontConnected(self): print("OnFrontConnected") logger_system.info(f"l2_client OnFrontConnected 线程ID:{tool.get_thread_id()}") logout_req = lev2mdapi.CTORATstpUserLogoutField() self.__api.ReqUserLogout(logout_req, 1) time.sleep(1) # 请求登录 login_req = lev2mdapi.CTORATstpReqUserLoginField() self.__api.ReqUserLogin(login_req, 2) def OnRspUserLogin(self, pRspUserLogin, pRspInfo, nRequestID, bIsLast): print("OnRspUserLogin: ErrorID[%d] ErrorMsg[%s] RequestID[%d] IsLast[%d]" % ( pRspInfo['ErrorID'], pRspInfo['ErrorMsg'], nRequestID, bIsLast)) if pRspInfo['ErrorID'] == 0: print("----L2行情登录成功----") self.is_login = True logger_system.info(f"L2行情登录成功") self.__subscribe(self.codes) def OnRspSubMarketData(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast): print("OnRspSubMarketData") def OnRspSubIndex(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast): print("OnRspSubIndex") def OnRspSubTransaction(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast): print("OnRspSubTransaction") if pRspInfo["ErrorID"] == 0: print("订阅成功") self.subscripted_codes.add(pSpecificSecurity['SecurityID']) if bIsLast == 1: print("订阅响应结束", self.subscripted_codes) def OnRspSubNGTSTick(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast): if pRspInfo["ErrorID"] == 0: print("订阅成功") self.subscripted_codes.add(pSpecificSecurity['SecurityID']) if bIsLast == 1: print("订阅响应结束", self.subscripted_codes) def OnRspUnSubNGTSTick(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast): try: code = pSpecificSecurity['SecurityID'] logger_local_huaxin_l2_subscript.info(f"NGTS取消订阅:{code}") self.subscripted_codes.discard(code) if bIsLast == 1: print("取消订阅响应结束", self.subscripted_codes) except Exception as e: logging.exception(e) def OnRtnTransaction(self, pTransaction): code = str(pTransaction['SecurityID']) # min_volume, limit_up_price = self.codes_volume_and_price_dict.get(code) # 输出逐笔成交数据 if pTransaction['ExecType'] == b"2": pass else: item = {"SecurityID": pTransaction['SecurityID'], "TradePrice": pTransaction['TradePrice'], "TradeVolume": pTransaction['TradeVolume'], "OrderTime": pTransaction['TradeTime'], "MainSeq": pTransaction['MainSeq'], "SubSeq": pTransaction['SubSeq'], "BuyNo": pTransaction['BuyNo'], "SellNo": pTransaction['SellNo'], "ExecType": pTransaction['ExecType'].decode()} if item["SecurityID"] not in l2_transaction_data_dict: l2_transaction_data_dict[item["SecurityID"]] = L2TransactionDataManager(item["SecurityID"], item[ "SecurityID"] in self.special_codes) l2_transaction_data_dict[item["SecurityID"]].add_transaction_data(item) def OnRtnNGTSTick(self, pTick): """ 上证股票的逐笔委托与成交 @param pTick: @return: """ try: if pTick['TickType'] == b'T': # 成交 item = {"SecurityID": pTick['SecurityID'], "TradePrice": pTick['Price'], "TradeVolume": pTick['Volume'], "OrderTime": pTick['TickTime'], "MainSeq": pTick['MainSeq'], "SubSeq": pTick['SubSeq'], "BuyNo": pTick['BuyNo'], "SellNo": pTick['SellNo'], "ExecType": '1'} if item["SecurityID"] not in l2_transaction_data_dict: l2_transaction_data_dict[item["SecurityID"]] = L2TransactionDataManager(item["SecurityID"], item[ "SecurityID"] in self.special_codes) l2_transaction_data_dict[item["SecurityID"]].add_transaction_data(item) except Exception as e: logger_local_huaxin_l2_subscript.exception(e) def __init_l2(codes, special_codes): print(lev2mdapi.CTORATstpLev2MdApi_GetApiVersion()) # case 1: Tcp方式 # g_SubMode=lev2mdapi.TORA_TSTP_MST_TCP # case 2: 组播方式 g_SubMode = lev2mdapi.TORA_TSTP_MST_MCAST if IS_TEST: g_SubMode = lev2mdapi.TORA_TSTP_MST_TCP # case 1缓存模式 api = lev2mdapi.CTORATstpLev2MdApi_CreateTstpLev2MdApi(g_SubMode, True) # case 2非缓存模式 # api = lev2mdapi.CTORATstpLev2MdApi_CreateTstpLev2MdApi(g_SubMode, False) global spi spi = Lev2MdSpi(api, codes, special_codes) api.RegisterSpi(spi) # -------------------正式模式------------------------------------- if g_SubMode != lev2mdapi.TORA_TSTP_MST_MCAST: api.RegisterFront(Front_Address) else: # case 1 从一个组播地址收取行情 api.RegisterMulticast(Multicast_Address, Local_Interface_Address, "") # api.RegisterMulticast(Multicast_Address2, Local_Interface_Address, "") # case 2:注册多个组播地址同时收行情 # api.RegisterMulticast(Multicast_Address, Local_Interface_Address, ""); # api.RegisterMulticast(Multicast_Address2, Local_Interface_Address, ""); # case 3:efvi模式收行情 # api.RegisterMulticast(Multicast_Address, Local_Interface_Address, "", "enp101s0f0",4096, True); # case 1 不绑核运行 api.Init() def run(codes, _queue: multiprocessing.Queue, accurate_buy_order_queue: multiprocessing.Queue, special_codes) -> None: """ 运行订阅 @param accurate_buy_order_queue: 精确大单队列 @param codes: 订阅的代码 @param _queue: 数据传输的队列 @param special_codes: 需要确定完整大单的代码 @return: """ try: log.close_print() __init_l2(codes, special_codes) logger_system.info(f"L2订阅服务启动成功:") except Exception as e: logger_system.exception(e) while True: try: # 读取一遍 for code in l2_transaction_data_dict: l2_transaction_data_manager: L2TransactionDataManager = l2_transaction_data_dict[code] while True: if not l2_transaction_data_manager.big_buy_order_queue.empty(): result = l2_transaction_data_manager.big_buy_order_queue.get(block=False) if result: _queue.put_nowait((code, 0, result)) else: break while True: if not l2_transaction_data_manager.big_accurate_buy_order_queue.empty(): result = l2_transaction_data_manager.big_accurate_buy_order_queue.get(block=False) if result: accurate_buy_order_queue.put_nowait((code, 0, result)) else: break while True: if not l2_transaction_data_manager.big_accurate_sell_order_queue.empty(): result = l2_transaction_data_manager.big_accurate_sell_order_queue.get(block=False) if result: accurate_buy_order_queue.put_nowait((code, 1, result)) else: break while True: if not l2_transaction_data_manager.big_sell_order_queue.empty(): result = l2_transaction_data_manager.big_sell_order_queue.get(block=False) if result: _queue.put_nowait((code, 1, result)) else: break except: pass finally: time.sleep(1) if __name__ == "__main__": pass