# -*- coding: utf-8 -*-
"""Huaxin Level-2 market-data client used during the opening call auction.

The module registers a ``Lev2MdSpi`` callback object with the ``lev2mdapi``
library, subscribes to market data for the codes returned by
``l1_subscript_codes_manager.get_codes()``, keeps track of the securities
whose last price or best bid equals the computed limit-up price, and, between
09:20 and 09:25, periodically pushes that list to the strategy queue.
"""
import decimal
import json
import logging
import multiprocessing
import os
import queue
import time
import concurrent.futures

from huaxin_client import l1_subscript_codes_manager
from huaxin_client import constant
import lev2mdapi
from huaxin_client.l2_data_manager import L2DataUploadManager
from log_module.async_log_util import huaxin_l2_log
from log_module.log import logger_local_huaxin_l2_subscript, logger_system, logger_l2_codes_subscript, \
    hx_logger_l2_market_data_before_open, hx_logger_l2_debug
from utils import tool

# --- "Class B" endpoints ---
Front_Address = "tcp://10.0.1.101:6900"
Multicast_Address = "udp://224.224.2.19:7889"
Multicast_Address2 = "udp://224.224.224.234:7890"
Local_Interface_Address = constant.LOCAL_IP

set_codes_data_queue = queue.Queue(maxsize=1000)

# security id -> tuple describing the latest limit-up snapshot of that security.
# Written by Lev2MdSpi.OnRtnMarketData, read by run().
market_code_dict = {}


class Lev2MdSpi(lev2mdapi.CTORATstpLev2MdSpi):
    """SPI callbacks for the Level-2 market-data API.

    Handles connection/login, (un)subscription of market data and the
    per-tick bookkeeping of limit-up securities in ``market_code_dict``.
    """

    latest_codes_set = set()
    special_code_volume_for_order_dict = {}
    # Codes that are already subscribed.
    # NOTE(review): nothing in this file ever adds to this set, so every call
    # to set_codes_data() re-subscribes the full list - confirm this is intended.
    subscripted_codes = set()
    # Codes currently at the limit-up price (shared at class level).
    __limit_up_codes = set()

    def __init__(self, api, l2_data_upload_manager: L2DataUploadManager):
        """Store the API handle and the (optional) upload manager."""
        lev2mdapi.CTORATstpLev2MdSpi.__init__(self)
        self.__api = api
        self.is_login = False
        self.l2_data_upload_manager = l2_data_upload_manager

    def __split_codes(self, codes):
        """Split ``codes`` by exchange.

        Returns:
            ``(sse_codes, szse_codes)`` - two lists of byte-encoded codes.
            Codes whose market type is neither SSE nor SZSE are dropped.
        """
        szse_codes = []
        sse_codes = []
        for code in codes:
            market_type = tool.get_market_type(code)
            if market_type == tool.MARKET_TYPE_SSE:
                sse_codes.append(code.encode())
            elif market_type == tool.MARKET_TYPE_SZSE:
                szse_codes.append(code.encode())
        return sse_codes, szse_codes

    def __unsubscribe(self, _codes):
        """Cancel the market-data subscription for ``_codes``."""
        sh, sz = self.__split_codes(_codes)
        logger_local_huaxin_l2_subscript.info(f"取消订阅上证:{sh}")
        logger_local_huaxin_l2_subscript.info(f"取消订阅深证:{sz}")
        if sh:
            self.__api.UnSubscribeMarketData(sh, lev2mdapi.TORA_TSTP_EXD_SSE)
        if sz:
            self.__api.UnSubscribeMarketData(sz, lev2mdapi.TORA_TSTP_EXD_SZSE)

    def __subscribe(self, _codes):
        """Subscribe to market data for ``_codes`` and log the API result."""
        sh, sz = self.__split_codes(_codes)
        logger_local_huaxin_l2_subscript.info(f"订阅上证:{sh}")
        logger_local_huaxin_l2_subscript.info(f"订阅深证:{sz}")
        if sh:
            result = self.__api.SubscribeMarketData(sh, lev2mdapi.TORA_TSTP_EXD_SSE)
            logger_local_huaxin_l2_subscript.info(f"市场订阅结果sh:{result}")
        if sz:
            result = self.__api.SubscribeMarketData(sz, lev2mdapi.TORA_TSTP_EXD_SZSE)
            logger_local_huaxin_l2_subscript.info(f"市场订阅结果sz:{result}")

    def __process_codes_data(self, codes):
        """Bring the subscription set in line with ``codes``.

        Raises:
            Exception: if the client is not logged in and ``constant.TEST``
                is falsy.
        """
        codes = set(codes)
        if not self.is_login and not constant.TEST:
            raise Exception("L2尚未登录")
        add_codes = codes - self.subscripted_codes
        del_codes = self.subscripted_codes - codes
        self.__subscribe(add_codes)
        self.__unsubscribe(del_codes)
        # Remember the most recent code list.
        self.latest_codes_set = codes

    def set_codes_data(self, codes):
        """Subscribe to ``codes`` (an iterable of code strings).

        Failures are logged and never propagated to the caller.
        """
        try:
            self.__process_codes_data(codes)
        except Exception as e:
            logging.exception(e)
            logger_l2_codes_subscript.exception(e)

    @classmethod
    def __set_latest_datas(cls, codes_data):
        """Persist ``codes_data`` together with today's date string."""
        data_str = json.dumps([tool.get_now_date_str(), codes_data])
        with open(constant.L2_CODES_INFO_PATH, mode='w') as f:
            f.write(data_str)

    @classmethod
    def __get_latest_datas(cls):
        """Return the persisted code data if it was written today, else ``[]``."""
        if os.path.exists(constant.L2_CODES_INFO_PATH):
            with open(constant.L2_CODES_INFO_PATH, mode='r') as f:
                str_ = f.readline()
            if not str_.strip():
                # An empty cache file carries no data; json.loads("") would raise.
                return []
            data_json = json.loads(str_)
            if data_json[0] == tool.get_now_date_str():
                return data_json[1]
        return []

    def OnFrontConnected(self):
        """Connection callback: send a logout request, wait 1s, then log in."""
        logger_system.info(f"l2_client OnFrontConnected 线程ID:{tool.get_thread_id()}")
        logout_req = lev2mdapi.CTORATstpUserLogoutField()
        self.__api.ReqUserLogout(logout_req, 1)
        time.sleep(1)
        # Request login.
        login_req = lev2mdapi.CTORATstpReqUserLoginField()
        self.__api.ReqUserLogin(login_req, 2)

    def OnRspUserLogin(self, pRspUserLogin, pRspInfo, nRequestID, bIsLast):
        """Login callback: on success subscribe to the L1 manager's code list."""
        if pRspInfo['ErrorID'] == 0:
            self.is_login = True
            logger_system.info(f"L2行情登录成功")
            # Initial values (disabled):
            # threading.Thread(
            #     target=lambda: self.__process_codes_data(self.__get_latest_datas()),
            #     daemon=True).start()
            # Subscribe to L2; get_codes() returns byte strings per exchange.
            codes_sh, codes_sz = l1_subscript_codes_manager.get_codes()
            codes = {code.decode("utf-8") for code in codes_sh}
            codes.update(code.decode("utf-8") for code in codes_sz)
            self.set_codes_data(codes)
        else:
            # Make a rejected login visible instead of failing silently.
            logger_system.error(f"L2行情登录失败 ErrorID:{pRspInfo['ErrorID']}")

    def OnRtnMarketData(self, pDepthMarketData, FirstLevelBuyNum, FirstLevelBuyOrderVolumes, FirstLevelSellNum,
                        FirstLevelSellOrderVolumes):
        """Market-data callback: maintain the limit-up set and ``market_code_dict``.

        A security counts as limit-up when its last price or its best bid is
        within 0.001 of ``PreClosePrice * tool.get_limit_up_rate(code)``
        (normalised through ``tool.to_price``). Errors are reported through
        the async logger and never propagate into the API thread.
        """
        # NOTE(review): the record is read both with ['Field'] and with
        # .Field access below, exactly as before - confirm both are supported.
        try:
            d = {"dataTimeStamp": pDepthMarketData['DataTimeStamp'],
                 "securityID": pDepthMarketData['SecurityID'],
                 "preClosePrice": pDepthMarketData['PreClosePrice'],
                 "lastPrice": pDepthMarketData['LastPrice'],
                 "totalBidVolume": pDepthMarketData['TotalBidVolume'],
                 "avgBidPrice": pDepthMarketData['AvgBidPrice'],
                 "totalAskVolume": pDepthMarketData['TotalAskVolume'],
                 "avgAskPrice": pDepthMarketData["AvgAskPrice"]
                 # "buy": [(pDepthMarketData['BidPrice1'], pDepthMarketData['BidVolume1']),
                 #         (pDepthMarketData['BidPrice2'], pDepthMarketData['BidVolume2']),
                 #         (pDepthMarketData['BidPrice3'], pDepthMarketData['BidVolume3']),
                 #         (pDepthMarketData['BidPrice4'], pDepthMarketData['BidVolume4']),
                 #         (pDepthMarketData['BidPrice5'], pDepthMarketData['BidVolume5'])],
                 # "sell": [
                 #     (pDepthMarketData['AskPrice1'], pDepthMarketData['AskVolume1']),
                 #     (pDepthMarketData['AskPrice2'], pDepthMarketData['AskVolume2']),
                 #     (pDepthMarketData['AskPrice3'], pDepthMarketData['AskVolume3']),
                 #     (pDepthMarketData['AskPrice4'], pDepthMarketData['AskVolume4']),
                 #     (pDepthMarketData['AskPrice5'], pDepthMarketData['AskVolume5'])
                 # ]
                 }
            limit_up_count = len(self.__limit_up_codes)
            # Compute the limit-up price for this security.
            limit_up_price = float(
                tool.to_price(decimal.Decimal(str(pDepthMarketData['PreClosePrice'])) * decimal.Decimal(
                    tool.get_limit_up_rate(pDepthMarketData['SecurityID']))))
            at_limit_up = (abs(limit_up_price - pDepthMarketData['LastPrice']) < 0.001
                           or abs(limit_up_price - pDepthMarketData['BidPrice1']) < 0.001)
            if at_limit_up:
                huaxin_l2_log.info(hx_logger_l2_market_data_before_open, f"{d}")
                self.__limit_up_codes.add(pDepthMarketData['SecurityID'])
            else:
                self.__limit_up_codes.discard(pDepthMarketData['SecurityID'])

            if pDepthMarketData.SecurityID in self.__limit_up_codes:
                # (code, bid1 price, rate (fixed 0.1), total bid volume, receive time,
                #  bid1 price, bid1 volume, bid2 price, bid2 volume, update time, pre-close)
                market_code_dict[pDepthMarketData.SecurityID] = (
                    pDepthMarketData.SecurityID, pDepthMarketData.BidPrice1, 0.1, pDepthMarketData.TotalBidVolume,
                    time.time(),
                    pDepthMarketData.BidPrice1,
                    pDepthMarketData.BidVolume1, pDepthMarketData.BidPrice2, pDepthMarketData.BidVolume2,
                    pDepthMarketData.UpdateTime, pDepthMarketData.PreClosePrice)
            else:
                # Single-step removal; no error when the code is absent.
                market_code_dict.pop(pDepthMarketData.SecurityID, None)

            if limit_up_count != len(self.__limit_up_codes):
                huaxin_l2_log.info(hx_logger_l2_market_data_before_open, f"涨停代码:{self.__limit_up_codes}")
        except Exception as e:
            # Previously a bare ``except: pass``; keep the callback non-fatal but
            # leave a trace so a broken feed/field is not silently ignored.
            huaxin_l2_log.info(hx_logger_l2_debug, f"OnRtnMarketData异常:{e!r}")


def __init_l2(l2_data_upload_manager):
    """Create the Level-2 API, register the SPI and the feed address, and start it.

    The created SPI is stored in the module-level ``spi`` global.
    """
    # print(lev2mdapi.CTORATstpLev2MdApi_GetApiVersion())
    # case 1: TCP mode
    # g_SubMode=lev2mdapi.TORA_TSTP_MST_TCP
    # case 2: multicast mode
    g_SubMode = lev2mdapi.TORA_TSTP_MST_MCAST

    # case 1: cached mode
    api = lev2mdapi.CTORATstpLev2MdApi_CreateTstpLev2MdApi(g_SubMode, True)
    # case 2: non-cached mode
    # api = lev2mdapi.CTORATstpLev2MdApi_CreateTstpLev2MdApi(g_SubMode, False)
    global spi
    spi = Lev2MdSpi(api, l2_data_upload_manager)
    api.RegisterSpi(spi)
    # ------------------- production mode -------------------
    if g_SubMode != lev2mdapi.TORA_TSTP_MST_MCAST:
        api.RegisterFront(Front_Address)
    else:
        # case 1: receive market data from a single multicast address
        api.RegisterMulticast(Multicast_Address, Local_Interface_Address, "")
        # api.RegisterMulticast(Multicast_Address2, Local_Interface_Address, "")

        # case 2: register several multicast addresses at once
        # api.RegisterMulticast(Multicast_Address, Local_Interface_Address, "");
        # api.RegisterMulticast(Multicast_Address2, Local_Interface_Address, "");

        # case 3: receive market data in efvi mode
        # api.RegisterMulticast(Multicast_Address, Local_Interface_Address, "", "enp101s0f0",4096, True);

    # case 1: run without CPU pinning
    api.Init()


__l2_cmd_thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=3)
pipe_strategy = None

# Codes contained in the most recent upload; used to log newly added ones.
__latest_subscript_codes = set()


def __upload_codes_info(queue_l1_w_strategy_r: multiprocessing.Queue, datas):
    """Push ``datas`` to the strategy queue as a ``set_target_codes`` message.

    Does nothing when ``tool.is_trade_time()`` is falsy. When the queue is
    ``None`` nothing is sent, but the newly-added-codes log is still updated.

    Args:
        queue_l1_w_strategy_r: destination queue, or ``None``.
        datas: sequence of records whose first element is the security code.

    Raises:
        queue.Full: propagated from ``put_nowait`` when the queue is full.
    """
    if not tool.is_trade_time():
        return
    # Build and send the payload.
    type_ = "set_target_codes"
    request_id = f"sb_{int(time.time() * 1000)}"
    fdata = json.dumps(
        {"type": type_, "data": {"data": datas}, "request_id": request_id,
         "time": round(time.time() * 1000, 0)})
    if queue_l1_w_strategy_r is not None:
        queue_l1_w_strategy_r.put_nowait(fdata)
    # Record the codes that were not part of the previous upload.
    codes = {x[0] for x in datas}
    add_codes = codes - __latest_subscript_codes
    __latest_subscript_codes.clear()
    __latest_subscript_codes.update(codes)
    if add_codes:
        hx_logger_l2_market_data_before_open.info(f"({request_id})新增加订阅的代码:{add_codes}")


def run(queue_l1_w_strategy_r) -> None:
    """Process entry point: start the L2 client and publish limit-up codes.

    From 09:20 until 09:25 the current content of ``market_code_dict`` is
    filtered, sorted and uploaded every two seconds; the function returns
    once the time reaches 09:25.

    Args:
        queue_l1_w_strategy_r: queue receiving the JSON messages, or ``None``.
    """
    logger_system.info("L2进程ID:{}", os.getpid())
    logger_system.info(f"l2_client 线程ID:{tool.get_thread_id()}")
    try:
        # log.close_print()
        # Initialisation
        # data_callback_distribute_manager = CodeDataCallbackDistributeManager(data_callbacks)
        # l2_data_upload_manager = L2DataUploadManager(data_callback_distribute_manager)
        __init_l2(None)
    except Exception as e:
        logger_system.exception(e)

    while True:
        now_str = tool.get_now_time_str()
        if tool.trade_time_sub(now_str, "09:25:00") >= 0:
            # Only the call-auction window is of interest.
            break
        # Only read data between 09:20 and 09:25.
        if tool.trade_time_sub(now_str, "09:20:00") < 0:
            # Sleep while waiting; a bare ``continue`` here would spin the CPU.
            time.sleep(1)
            continue
        try:
            # Snapshot in one step: the SPI callback thread adds/removes keys
            # concurrently, so iterating the live dict could raise.
            # Record layout starts with (code, price, rate, volume, time).
            list_ = list(market_code_dict.values())
            plist = []  # placeholder for position codes; always empty here
            # Drop records whose rate is below the configured minimum.
            flist = sorted((d for d in list_ if d[2] >= constant.L1_MIN_RATE),
                           key=lambda x: x[2], reverse=True)
            datas = flist[:1000]
            hx_logger_l2_debug.info(f"集合竞价涨停:{datas}")
            # Append the position codes.
            datas.extend(plist)
            __upload_codes_info(queue_l1_w_strategy_r, datas)
        except Exception as e:
            # Keep looping, but do not hide the failure (e.g. a full queue).
            logger_system.exception(e)
        finally:
            time.sleep(2)


if __name__ == "__main__":
    run(None)