#!/usr/bin/python # -*- coding: UTF-8 -*- import json import logging import queue import threading import time import command_manager import constant import l2_data_manager import lev2mdapi from command_manager import L2ActionCallback from mylog import logger_l2_orderdetail, logger_l2_transaction, logger_l2_subscript from trade_client import SendResponseSkManager Front_Address = "tcp://10.0.1.101:6900" Multicast_Address = "udp://224.224.2.19:7889" Multicast_Address2 = "udp://224.224.224.234:7890" Local_Interface_Address = "192.168.84.75" Sender_Interface_Address = "10.0.1.101" g_SubMarketData = False g_SubTransaction = False g_SubOrderDetail = False g_SubXTSTick = False g_SubXTSMarketData = False g_SubNGTSTick = False g_SubBondMarketData = False g_SubBondTransaction = False g_SubBondOrderDetail = False SH_Securities = [b"603000", b"600225", b"600469", b"600616", b"600059", b"002849", b"605188", b"603630", b"600105", b"603773", b"603915", b"603569", b"603322", b"603798", b"605198", b"603079", b"600415", b"600601"] SH_XTS_Securities = [b"018003", b"113565"] SZ_Securities = [b"002456", b"002849", b"002281", b"002336", b"000936", b"000920", b"000757", b"002896", b"002725", b"000952", b"000526", b"000753", b"000681", b"002088", b"002436"] SZ_Bond_Securities = [b"100303", b"109559", b"112617"] spi = None set_codes_data_queue = queue.Queue() market_code_dict = {} class Lev2MdSpi(lev2mdapi.CTORATstpLev2MdSpi): latest_codes_set = set() codes_volume_and_price_dict = {} special_code_volume_for_order_dict = {} # 已经订阅的代码 subscripted_codes = set() def __init__(self, api): lev2mdapi.CTORATstpLev2MdSpi.__init__(self) self.__api = api self.is_login = False def __split_codes(self, codes): szse_codes = [] sse_codes = [] for code in codes: if code.find("00") == 0: szse_codes.append(code.encode()) elif code.find("60") == 0: sse_codes.append(code.encode()) return sse_codes, szse_codes # 新增订阅 # 取消订阅 def __unsubscribe(self, _codes): sh, sz = self.__split_codes(_codes) logger_l2_subscript.info(f"取消订阅上证:{sh}") logger_l2_subscript.info(f"取消订阅深证:{sz}") if sh: # 取消订阅逐笔委托 self.__api.UnSubscribeOrderDetail(sh, lev2mdapi.TORA_TSTP_EXD_SSE) # 取消订阅逐笔成交 self.__api.UnSubscribeTransaction(sh, lev2mdapi.TORA_TSTP_EXD_SSE) self.__api.UnSubscribeMarketData(sh, lev2mdapi.TORA_TSTP_EXD_SSE) if sz: self.__api.UnSubscribeOrderDetail(sz, lev2mdapi.TORA_TSTP_EXD_SZSE) self.__api.UnSubscribeTransaction(sz, lev2mdapi.TORA_TSTP_EXD_SZSE) self.__api.UnSubscribeMarketData(sz, lev2mdapi.TORA_TSTP_EXD_SZSE) def __subscribe(self, _codes): sh, sz = self.__split_codes(_codes) logger_l2_subscript.info(f"订阅上证:{sh}") logger_l2_subscript.info(f"订阅深证:{sz}") if sh: # 订阅逐笔委托 result = self.__api.SubscribeOrderDetail(sh, lev2mdapi.TORA_TSTP_EXD_SSE) logger_l2_subscript.info(f"逐笔委托订阅结果sh:{result}") # 订阅逐笔成交 result = self.__api.SubscribeTransaction(sh, lev2mdapi.TORA_TSTP_EXD_SSE) logger_l2_subscript.info(f"逐笔成交订阅结果sh:{result}") result = self.__api.SubscribeMarketData(sh, lev2mdapi.TORA_TSTP_EXD_SSE) logger_l2_subscript.info(f"市场订阅结果sh:{result}") if sz: result = self.__api.SubscribeOrderDetail(sz, lev2mdapi.TORA_TSTP_EXD_SZSE) logger_l2_subscript.info(f"逐笔委托订阅结果sz:{result}") result = self.__api.SubscribeTransaction(sz, lev2mdapi.TORA_TSTP_EXD_SZSE) logger_l2_subscript.info(f"逐笔成交订阅结果sz:{result}") result = self.__api.SubscribeMarketData(sz, lev2mdapi.TORA_TSTP_EXD_SZSE) logger_l2_subscript.info(f"市场订阅结果sz:{result}") def __process_codes_data(self, codes_data): if not self.is_login and not constant.TEST: raise Exception("L2尚未登录") codes = set() for d in codes_data: code = d[0] codes.add(code) self.codes_volume_and_price_dict[code] = (d[1], d[2]) add_codes = codes - self.subscripted_codes del_codes = self.subscripted_codes - codes print("add del codes", add_codes, del_codes) for c in codes: l2_data_manager.target_codes.add(c) for c in del_codes: l2_data_manager.target_codes.discard(c) for c in add_codes: l2_data_manager.run_upload_task(c) self.__subscribe(add_codes) self.__unsubscribe(del_codes) # 设置最近的代码列表 self.latest_codes_set = codes # 订阅代码,[(代码,最低手数,涨停价)] def set_codes_data(self, codes_data): self.__process_codes_data(codes_data) def set_code_special_watch_volume(self, code, volume): # 有效期为3s self.special_code_volume_for_order_dict[code] = (volume, time.time() + 3) def OnFrontConnected(self): print("OnFrontConnected") logout_req = lev2mdapi.CTORATstpUserLogoutField() self.__api.ReqUserLogout(logout_req, 1) time.sleep(1) # 请求登录 login_req = lev2mdapi.CTORATstpReqUserLoginField() self.__api.ReqUserLogin(login_req, 2) def OnRspUserLogin(self, pRspUserLogin, pRspInfo, nRequestID, bIsLast): print("OnRspUserLogin: ErrorID[%d] ErrorMsg[%s] RequestID[%d] IsLast[%d]" % ( pRspInfo['ErrorID'], pRspInfo['ErrorMsg'], nRequestID, bIsLast)) if pRspInfo['ErrorID'] == 0: print("----L2行情登录成功----") self.is_login = True # t1 = threading.Thread(target=lambda: self.__set_codes_data(), daemon=True) # # 后台运行 # t1.start() def OnRspSubMarketData(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast): print("OnRspSubMarketData") def OnRspSubIndex(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast): print("OnRspSubIndex") def OnRspSubTransaction(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast): print("OnRspSubTransaction") def OnRspSubOrderDetail(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast): print("OnRspSubOrderDetail") # try: print("订阅结果:", pSpecificSecurity["ExchangeID"], pSpecificSecurity["SecurityID"], pRspInfo["ErrorID"], pRspInfo["ErrorMsg"]) logger_l2_subscript.info( f"订阅结果:{pSpecificSecurity['SecurityID']} {pRspInfo['ErrorID']} {pRspInfo['ErrorMsg']}") if pRspInfo["ErrorID"] == 0: print("订阅成功") self.subscripted_codes.add(pSpecificSecurity['SecurityID']) if bIsLast == 1: t1 = threading.Thread(target=lambda: l2_data_manager.add_subscript_codes(self.subscripted_codes), daemon=True) # 后台运行 t1.start() def OnRspUnSubOrderDetail(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast): print("OnRspUnSubOrderDetail") try: code = pSpecificSecurity['SecurityID'] self.subscripted_codes.discard(code) if bIsLast == 1: t1 = threading.Thread(target=lambda: l2_data_manager.add_subscript_codes(self.subscripted_codes), daemon=True) # 后台运行 t1.start() except Exception as e: logging.exception(e) def OnRspSubBondMarketData(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast): print("OnRspSubBondMarketData") def OnRspSubBondTransaction(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast): print("OnRspSubBondTransaction") def OnRspSubBondOrderDetail(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast): print("OnRspSubBondOrderDetail") def OnRspSubXTSMarketData(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast): print("OnRspSubXTSMarketData") def OnRspSubXTSTick(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast): print("OnRspSubXTSTick") # 4.0.5版本接口 def OnRspSubNGTSTick(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast): print("OnRspSubNGTSTick") def OnRtnMarketData(self, pDepthMarketData, FirstLevelBuyNum, FirstLevelBuyOrderVolumes, FirstLevelSellNum, FirstLevelSellOrderVolumes): # 传入:时间,现价,成交总量,买1,买2,买3,买4,买5,卖1,卖2,卖3,卖4,卖5 d = {"dataTimeStamp": pDepthMarketData['DataTimeStamp'], "securityID": pDepthMarketData['SecurityID'], "lastPrice": pDepthMarketData['LastPrice'], "totalVolumeTrade": pDepthMarketData['TotalVolumeTrade'], "buy": [(pDepthMarketData['BidPrice1'], pDepthMarketData['BidVolume1']), (pDepthMarketData['BidPrice2'], pDepthMarketData['BidVolume2']), (pDepthMarketData['BidPrice3'], pDepthMarketData['BidVolume3']), (pDepthMarketData['BidPrice4'], pDepthMarketData['BidVolume4']), (pDepthMarketData['BidPrice5'], pDepthMarketData['BidVolume5'])], "sell": [ (pDepthMarketData['AskPrice1'], pDepthMarketData['AskVolume1']), (pDepthMarketData['AskPrice2'], pDepthMarketData['AskVolume2']), (pDepthMarketData['AskPrice3'], pDepthMarketData['AskVolume3']), (pDepthMarketData['AskPrice4'], pDepthMarketData['AskVolume4']), (pDepthMarketData['AskPrice5'], pDepthMarketData['AskVolume5']) ]} market_code_dict[pDepthMarketData['SecurityID']] = time.time() t1 = threading.Thread(target=lambda: l2_data_manager.add_market_data(d), daemon=True) t1.start() # 输出行情快照数据 # print( # "OnRtnMarketData SecurityID[%s] LastPrice[%.2f] TotalVolumeTrade[%d] TotalValueTrade[%.2f] BidPrice1[%.2f] BidVolume1[%d] AskPrice1[%.2f] AskVolume1[%d]" % ( # pDepthMarketData['SecurityID'], # pDepthMarketData['LastPrice'], # pDepthMarketData['TotalValueTrade'], # pDepthMarketData['TotalValueTrade'], # pDepthMarketData['BidPrice1'], # pDepthMarketData['BidVolume1'], # pDepthMarketData['AskPrice1'], # pDepthMarketData['AskVolume1'])) # # 输出一档价位买队列前50笔委托数量 # for buy_index in range(0, FirstLevelBuyNum): # print("first level buy [%d] : [%d]" % (buy_index, FirstLevelBuyOrderVolumes[buy_index])) # # # 输出一档价位卖队列前50笔委托数量 # for sell_index in range(0, FirstLevelSellNum): # print("first level sell [%d] : [%d]" % (sell_index, FirstLevelSellOrderVolumes[sell_index])) def OnRtnIndex(self, pIndex): # 输出指数行情数据 print( "OnRtnIndex SecurityID[%s] LastIndex[%.2f] LowIndex[%.2f] HighIndex[%.2f] TotalVolumeTraded[%d] Turnover[%.2f]" % ( pIndex['SecurityID'], pIndex['LastIndex'], pIndex['LowIndex'], pIndex['HighIndex'], pIndex['TotalVolumeTraded'], pIndex['Turnover'])) def OnRtnTransaction(self, pTransaction): code = str(pTransaction['SecurityID']) min_volume, limit_up_price = self.codes_volume_and_price_dict.get(code) # 输出逐笔成交数据 if pTransaction['ExecType'] == b"2": if min_volume is None: # 默认筛选50w if pTransaction['TradePrice'] * pTransaction['Volume'] < 500000: return elif pTransaction['TradeVolume'] < min_volume: return # 撤单 item = {"SecurityID": pTransaction['SecurityID'], "Price": pTransaction['TradePrice'], "Volume": pTransaction['TradeVolume'], "OrderType": "2", "OrderTime": pTransaction['TradeTime'], "MainSeq": pTransaction['MainSeq'], "SubSeq": pTransaction['SubSeq'], "OrderStatus": "D"} buyNo = pTransaction['BuyNo'] sellNo = pTransaction['SellNo'] if buyNo > 0: # 买 item["OrderNO"] = buyNo item["Side"] = "1" elif sellNo > 0: # 卖 item["OrderNO"] = sellNo item["Side"] = "2" # 深证撤单 print("逐笔委托", item) l2_data_manager.add_l2_order_detail(item, True) else: if abs(pTransaction['TradePrice'] - limit_up_price) < 0.201: # 涨停价 # 成交 item = {"SecurityID": pTransaction['SecurityID'], "TradePrice": pTransaction['TradePrice'], "TradeVolume": pTransaction['TradeVolume'], "OrderTime": pTransaction['TradeTime'], "MainSeq": pTransaction['MainSeq'], "SubSeq": pTransaction['SubSeq'], "BuyNo": pTransaction['BuyNo'], "SellNo": pTransaction['SellNo'], "ExecType": pTransaction['ExecType'].decode()} print("逐笔成交", item) l2_data_manager.add_transaction_detail(item) logger_l2_transaction.info( "OnRtnTransaction SecurityID[%s] TradePrice[%.2f] TradeVolume[%d] TradeTime[%d] MainSeq[%d] SubSeq[%d] BuyNo[%d] SellNo[%d] ExecType[%s]" % ( pTransaction['SecurityID'], pTransaction['TradePrice'], pTransaction['TradeVolume'], pTransaction['TradeTime'], pTransaction['MainSeq'], pTransaction['SubSeq'], pTransaction['BuyNo'], pTransaction['SellNo'], pTransaction['ExecType'], )) def OnRtnOrderDetail(self, pOrderDetail): can_listen = False code = str(pOrderDetail['SecurityID']) if code in self.special_code_volume_for_order_dict and self.special_code_volume_for_order_dict[code][0] == \ pOrderDetail['Volume']: if self.special_code_volume_for_order_dict[code][1] > time.time(): # 特殊量监听 can_listen = True else: self.special_code_volume_for_order_dict.pop(code) if not can_listen: min_volume, limit_up_price = self.codes_volume_and_price_dict.get(code) if min_volume is None: # 默认筛选50w if pOrderDetail['Price'] * pOrderDetail['Volume'] < 500000: return elif pOrderDetail['Volume'] < min_volume: return # 输出逐笔委托数据 # 上证OrderStatus=b"D"表示撤单 item = {"SecurityID": pOrderDetail['SecurityID'], "Price": pOrderDetail['Price'], "Volume": pOrderDetail['Volume'], "Side": pOrderDetail['Side'].decode(), "OrderType": pOrderDetail['OrderType'].decode(), "OrderTime": pOrderDetail['OrderTime'], "MainSeq": pOrderDetail['MainSeq'], "SubSeq": pOrderDetail['SubSeq'], "OrderNO": pOrderDetail['OrderNO'], "OrderStatus": pOrderDetail['OrderStatus'].decode()} print("逐笔委托", item) l2_data_manager.add_l2_order_detail(item) logger_l2_orderdetail.info( "OnRtnOrderDetail SecurityID[%s] Price[%.2f] Volume[%d] Side[%s] OrderType[%s] OrderTime[%d] MainSeq[%d] SubSeq[%d] OrderNO[%s] OrderStatus[%s] Info1[%d] Info2[%d] Info3[%d]" % ( pOrderDetail['SecurityID'], pOrderDetail['Price'], pOrderDetail['Volume'], pOrderDetail['Side'], pOrderDetail['OrderType'], pOrderDetail['OrderTime'], pOrderDetail['MainSeq'], pOrderDetail['SubSeq'], pOrderDetail['OrderNO'], pOrderDetail['OrderStatus'], pOrderDetail['Info1'], pOrderDetail['Info2'], pOrderDetail['Info3'] )) def OnRtnBondMarketData(self, pDepthMarketData, FirstLevelBuyNum, FirstLevelBuyOrderVolumes, FirstLevelSellNum, FirstLevelSellOrderVolumes): # 输出行情快照数据 print( "OnRtnBondMarketData SecurityID[%s] LastPrice[%.2f] TotalVolumeTrade[%d] TotalValueTrade[%.2f] BidPrice1[%.2f] BidVolume1[%d] AskPrice1[%.2f] AskVolume1[%d]" % ( pDepthMarketData['SecurityID'], pDepthMarketData['LastPrice'], pDepthMarketData['TotalValueTrade'], pDepthMarketData['TotalValueTrade'], pDepthMarketData['BidPrice1'], pDepthMarketData['BidVolume1'], pDepthMarketData['AskPrice1'], pDepthMarketData['AskVolume1'])) # 输出一档价位买队列前50笔委托数量 for buy_index in range(0, FirstLevelBuyNum): print("first level buy [%d] : [%d]" % (buy_index, FirstLevelBuyOrderVolumes[buy_index])) # 输出一档价位卖队列前50笔委托数量 for sell_index in range(0, FirstLevelSellNum): print("first level sell [%d] : [%d]" % (sell_index, FirstLevelSellOrderVolumes[sell_index])) def OnRtnBondTransaction(self, pTransaction): # 输出逐笔成交数据 print( "OnRtnBondTransaction SecurityID[%s] TradePrice[%.2f] TradeVolume[%d] TradeTime[%d] MainSeq[%d] SubSeq[%d] BuyNo[%d] SellNo[%d] ExecType[%d]" % ( pTransaction['SecurityID'], pTransaction['TradePrice'], pTransaction['TradeVolume'], pTransaction['TradeTime'], pTransaction['MainSeq'], pTransaction['SubSeq'], pTransaction['BuyNo'], pTransaction['SellNo'], pTransaction['ExecType'], )) def OnRtnBondOrderDetail(self, pOrderDetail): # 输出逐笔委托数据 print( "OnRtnBondOrderDetail SecurityID[%s] Price[%.2f] Volume[%d] Side[%s] OrderType[%s] OrderTime[%d] MainSeq[%d] SubSeq[%d]" % ( pOrderDetail['SecurityID'], pOrderDetail['Price'], pOrderDetail['Volume'], pOrderDetail['Side'], pOrderDetail['OrderType'], pOrderDetail['OrderTime'], pOrderDetail['MainSeq'], pOrderDetail['SubSeq'])) def OnRtnXTSMarketData(self, pDepthMarketData, FirstLevelBuyNum, FirstLevelBuyOrderVolumes, FirstLevelSellNum, FirstLevelSellOrderVolumes): # 输出行情快照数据 print( "OnRtnXTSMarketData SecurityID[%s] LastPrice[%.2f] TotalVolumeTrade[%d] TotalValueTrade[%.2f] BidPrice1[%.2f] BidVolume1[%d] AskPrice1[%.2f] AskVolume1[%d]" % ( pDepthMarketData['SecurityID'], pDepthMarketData['LastPrice'], pDepthMarketData['TotalValueTrade'], pDepthMarketData['TotalValueTrade'], pDepthMarketData['BidPrice1'], pDepthMarketData['BidVolume1'], pDepthMarketData['AskPrice1'], pDepthMarketData['AskVolume1'])) # 输出一档价位买队列前50笔委托数量 for buy_index in range(0, FirstLevelBuyNum): print("first level buy [%d] : [%d]" % (buy_index, FirstLevelBuyOrderVolumes[buy_index])) # 输出一档价位卖队列前50笔委托数量 for sell_index in range(0, FirstLevelSellNum): print("first level sell [%d] : [%d]" % (sell_index, FirstLevelSellOrderVolumes[sell_index])) def OnRtnXTSTick(self, pTick): # 输出上海债券逐笔数据’ print( "OnXTSTick TickType[%s] SecurityID[%s] Price[%.2f] Volume[%d] TickTime[%d] MainSeq[%d] SubSeq[%d] BuyNo[%d] SellNo[%d]" % ( pTick['TickType'], pTick['SecurityID'], pTick['Price'], pTick['Volume'], pTick['TickTime'], pTick['MainSeq'], pTick['SubSeq'], pTick['BuyNo'], pTick['SellNo'])) def OnRtnNGTSTick(self, pTick): # 输出上海股基逐笔数据’ print( "OnRtnNGTSTick TickType[%s] SecurityID[%s] Price[%.2f] Volume[%d] TickTime[%d] MainSeq[%d] SubSeq[%d] BuyNo[%d] SellNo[%d]" % ( pTick['TickType'], pTick['SecurityID'], pTick['Price'], pTick['Volume'], pTick['TickTime'], pTick['MainSeq'], pTick['SubSeq'], pTick['BuyNo'], pTick['SellNo'])) class MyL2ActionCallback(L2ActionCallback): def OnSetL2Position(self, client_id, request_id, codes_data): print("接受到命令", client_id, request_id, codes_data) try: spi.set_codes_data(codes_data) SendResponseSkManager.send_normal_response("l2_cmd", SendResponseSkManager.load_response(client_id, request_id, {"code": 0, "msg": "设置成功"})) except Exception as e: logging.exception(e) SendResponseSkManager.send_error_response("common", request_id, client_id, str(e)) def __init_l2(): print(lev2mdapi.CTORATstpLev2MdApi_GetApiVersion()) # case 1: Tcp方式 # g_SubMode=lev2mdapi.TORA_TSTP_MST_TCP # case 2: 组播方式 g_SubMode = lev2mdapi.TORA_TSTP_MST_MCAST # case 1缓存模式 api = lev2mdapi.CTORATstpLev2MdApi_CreateTstpLev2MdApi(g_SubMode, True) # case 2非缓存模式 # api = lev2mdapi.CTORATstpLev2MdApi_CreateTstpLev2MdApi(g_SubMode, False) global spi spi = Lev2MdSpi(api) api.RegisterSpi(spi) if g_SubMode != lev2mdapi.TORA_TSTP_MST_MCAST: api.RegisterFront(Front_Address) else: # case 1 从一个组播地址收取行情 api.RegisterMulticast(Multicast_Address, Local_Interface_Address, "") # case 2:注册多个组播地址同时收行情 # api.RegisterMulticast(Multicast_Address, Local_Interface_Address, Sender_Interface_Address); # api.RegisterMulticast(Multicast_Address2, Local_Interface_Address, Sender_Interface_Address); # case 3:efvi模式收行情 # api.RegisterMulticast(Multicast_Address, Local_Interface_Address, Sender_Interface_Address, "enp101s0f0",4096, True); # case 1 不绑核运行 api.Init() def __receive_from_pipe(pipe): while True: try: value = pipe.recv() if value: value = value.decode("utf-8") data = json.loads(value) if data["type"] == "listen_volume": volume = data["data"]["volume"] code = data["data"]["code"] spi.set_code_special_watch_volume(code, volume) except: pass def run(pipe=None): __init_l2() if pipe is not None: t1 = threading.Thread(target=lambda: __receive_from_pipe(pipe)) # 后台运行 t1.setDaemon(True) t1.start() global l2CommandManager l2CommandManager = command_manager.L2CommandManager() l2CommandManager.init(constant.SERVER_IP, constant.SERVER_PORT, MyL2ActionCallback()) l2CommandManager.run() print("l2_client启动成功") if __name__ == "__main__": run() # spi.set_codes_data([("000333", 12000)]) input()