New file |
| | |
| | | # -*- coding: utf-8 -*- |
| | | import logging |
| | | import queue |
| | | import time |
| | | import lev2mdapi |
| | | from log_module import log |
| | | from log_module.log import logger_local_huaxin_l2_subscript, logger_system |
| | | from utils import tool |
| | | |
| | | IS_TEST = True |
| | | |
| | | ###B类### |
| | | Front_Address = "tcp://10.0.1.101:6900" |
| | | Multicast_Address = "udp://224.224.2.19:7889" |
| | | Multicast_Address2 = "udp://224.224.224.234:7890" |
| | | Local_Interface_Address = "192.168.84.126" |
| | | |
| | | ###测试地址### |
| | | if IS_TEST: |
| | | Front_Address = "tcp://210.14.72.17:16900" |
| | | Multicast_Address = "udp://224.224.2.19:7889" |
| | | Multicast_Address2 = "udp://224.224.224.234:7890" |
| | | Local_Interface_Address = "192.168.84.126" |
| | | |
| | | g_SubMarketData = False |
| | | g_SubTransaction = False |
| | | g_SubOrderDetail = False |
| | | g_SubXTSTick = False |
| | | g_SubXTSMarketData = False |
| | | g_SubNGTSTick = False |
| | | g_SubBondMarketData = False |
| | | g_SubBondTransaction = False |
| | | g_SubBondOrderDetail = False |
| | | set_codes_data_queue = queue.Queue() |
| | | market_code_dict = {} |
| | | |
| | | ENABLE_NGST = True |
| | | |
| | | |
| | | class L2TransactionDataManager: |
| | | def __init__(self, code): |
| | | self.code = code |
| | | self.__latest_buy_order = None |
| | | self.__big_buy_orders = [] |
| | | self.__latest_sell_order = None |
| | | self.__big_sell_orders = [] |
| | | |
| | | def get_big_buy_orders(self): |
| | | return self.__big_buy_orders |
| | | |
| | | def get_big_sell_orders(self): |
| | | return self.__big_sell_orders |
| | | |
| | | def add_transaction_data(self, data): |
| | | item = (data["BuyNo"], data["SellNo"], data["TradePrice"], data["TradeVolume"]) |
| | | # item = {"SecurityID": pTransaction['SecurityID'], "TradePrice": pTransaction['TradePrice'], |
| | | # "TradeVolume": pTransaction['TradeVolume'], |
| | | # "OrderTime": pTransaction['TradeTime'], "MainSeq": pTransaction['MainSeq'], |
| | | # "SubSeq": pTransaction['SubSeq'], "BuyNo": pTransaction['BuyNo'], |
| | | # "SellNo": pTransaction['SellNo'], |
| | | # "ExecType": pTransaction['ExecType'].decode()} |
| | | money = round(item[2] * item[3]) |
| | | volume = item[3] |
| | | if not self.__latest_buy_order: |
| | | self.__latest_buy_order = [item[0], 0, 0] |
| | | if self.__latest_buy_order[0] == item[0]: |
| | | self.__latest_buy_order[1] += volume |
| | | self.__latest_buy_order[2] += money |
| | | else: |
| | | if self.__latest_buy_order[2] > 1e6: |
| | | self.__big_buy_orders.append((self.__latest_buy_order[0],self.__latest_buy_order[1], self.__latest_buy_order[2])) |
| | | self.__latest_buy_order = [item[0],volume, money] |
| | | |
| | | if not self.__latest_sell_order: |
| | | self.__latest_sell_order = [item[1], 0, 0] |
| | | if self.__latest_sell_order[0] == item[1]: |
| | | self.__latest_sell_order[1] += volume |
| | | self.__latest_sell_order[2] += money |
| | | else: |
| | | if self.__latest_sell_order[2] > 1e6: |
| | | self.__big_sell_orders.append((self.__latest_sell_order[0],self.__latest_sell_order[1], self.__latest_sell_order[2])) |
| | | self.__latest_sell_order = [item[1], volume, money] |
| | | |
| | | |
| | | class Lev2MdSpi(lev2mdapi.CTORATstpLev2MdSpi): |
| | | latest_codes_set = set() |
| | | |
| | | special_code_volume_for_order_dict = {} |
| | | # 已经订阅的代码 |
| | | subscripted_codes = set() |
| | | # 代码的上次成交的订单唯一索引 |
| | | __last_transaction_keys_dict = {} |
| | | |
| | | # 买入的大单订单号 |
| | | __l2_transaction_data_dict = {} |
| | | |
| | | def __init__(self, api, codes): |
| | | lev2mdapi.CTORATstpLev2MdSpi.__init__(self) |
| | | self.__api = api |
| | | self.is_login = False |
| | | self.codes = codes |
| | | self.codes_volume_and_price_dict = {} |
| | | |
| | | def __split_codes(self, codes): |
| | | szse_codes = [] |
| | | sse_codes = [] |
| | | for code in codes: |
| | | market_type = tool.get_market_type(code) |
| | | if market_type == tool.MARKET_TYPE_SZSE: |
| | | szse_codes.append(code.encode()) |
| | | elif market_type == tool.MARKET_TYPE_SSE: |
| | | sse_codes.append(code.encode()) |
| | | return sse_codes, szse_codes |
| | | |
| | | # 新增订阅 |
| | | |
| | | # 取消订阅 |
| | | def __unsubscribe(self, _codes): |
| | | sh, sz = self.__split_codes(_codes) |
| | | logger_local_huaxin_l2_subscript.info(f"取消订阅上证:{sh}") |
| | | logger_local_huaxin_l2_subscript.info(f"取消订阅深证:{sz}") |
| | | if sh: |
| | | if ENABLE_NGST: |
| | | result = self.__api.UnSubscribeNGTSTick(sh, lev2mdapi.TORA_TSTP_EXD_SSE) |
| | | logger_local_huaxin_l2_subscript.info(f"逐笔NGTS订阅结果sh:{result}") |
| | | else: |
| | | # 取消订阅逐笔成交 |
| | | self.__api.UnSubscribeTransaction(sh, lev2mdapi.TORA_TSTP_EXD_SSE) |
| | | if sz: |
| | | self.__api.UnSubscribeTransaction(sz, lev2mdapi.TORA_TSTP_EXD_SZSE) |
| | | |
| | | def __subscribe(self, _codes): |
| | | sh, sz = self.__split_codes(_codes) |
| | | logger_local_huaxin_l2_subscript.info(f"订阅上证:{sh}") |
| | | logger_local_huaxin_l2_subscript.info(f"订阅深证:{sz}") |
| | | if sh: |
| | | if ENABLE_NGST: |
| | | result = self.__api.SubscribeNGTSTick(sh, lev2mdapi.TORA_TSTP_EXD_SSE) |
| | | logger_local_huaxin_l2_subscript.info(f"逐笔NGTS订阅结果sh:{result}") |
| | | else: |
| | | # 订阅逐笔成交 |
| | | result = self.__api.SubscribeTransaction(sh, lev2mdapi.TORA_TSTP_EXD_SSE) |
| | | logger_local_huaxin_l2_subscript.info(f"逐笔成交订阅结果sh:{result}") |
| | | |
| | | if sz: |
| | | result = self.__api.SubscribeTransaction(sz, lev2mdapi.TORA_TSTP_EXD_SZSE) |
| | | logger_local_huaxin_l2_subscript.info(f"逐笔成交订阅结果sz:{result}") |
| | | |
| | | def OnFrontConnected(self): |
| | | print("OnFrontConnected") |
| | | logger_system.info(f"l2_client OnFrontConnected 线程ID:{tool.get_thread_id()}") |
| | | logout_req = lev2mdapi.CTORATstpUserLogoutField() |
| | | self.__api.ReqUserLogout(logout_req, 1) |
| | | time.sleep(1) |
| | | # 请求登录 |
| | | login_req = lev2mdapi.CTORATstpReqUserLoginField() |
| | | self.__api.ReqUserLogin(login_req, 2) |
| | | |
| | | def OnRspUserLogin(self, pRspUserLogin, pRspInfo, nRequestID, bIsLast): |
| | | print("OnRspUserLogin: ErrorID[%d] ErrorMsg[%s] RequestID[%d] IsLast[%d]" % ( |
| | | pRspInfo['ErrorID'], pRspInfo['ErrorMsg'], nRequestID, bIsLast)) |
| | | if pRspInfo['ErrorID'] == 0: |
| | | print("----L2行情登录成功----") |
| | | self.is_login = True |
| | | logger_system.info(f"L2行情登录成功") |
| | | self.__subscribe(self.codes) |
| | | |
| | | def OnRspSubMarketData(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast): |
| | | print("OnRspSubMarketData") |
| | | |
| | | def OnRspSubIndex(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast): |
| | | print("OnRspSubIndex") |
| | | |
| | | def OnRspSubTransaction(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast): |
| | | print("OnRspSubTransaction") |
| | | if pRspInfo["ErrorID"] == 0: |
| | | print("订阅成功") |
| | | self.subscripted_codes.add(pSpecificSecurity['SecurityID']) |
| | | if bIsLast == 1: |
| | | print("订阅响应结束", self.subscripted_codes) |
| | | |
| | | def OnRspSubNGTSTick(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast): |
| | | if pRspInfo["ErrorID"] == 0: |
| | | print("订阅成功") |
| | | self.subscripted_codes.add(pSpecificSecurity['SecurityID']) |
| | | if bIsLast == 1: |
| | | print("订阅响应结束", self.subscripted_codes) |
| | | |
| | | def OnRspUnSubNGTSTick(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast): |
| | | try: |
| | | code = pSpecificSecurity['SecurityID'] |
| | | logger_local_huaxin_l2_subscript.info(f"NGTS取消订阅:{code}") |
| | | self.subscripted_codes.discard(code) |
| | | if bIsLast == 1: |
| | | print("取消订阅响应结束", self.subscripted_codes) |
| | | except Exception as e: |
| | | logging.exception(e) |
| | | |
| | | def OnRtnTransaction(self, pTransaction): |
| | | code = str(pTransaction['SecurityID']) |
| | | # min_volume, limit_up_price = self.codes_volume_and_price_dict.get(code) |
| | | # 输出逐笔成交数据 |
| | | if pTransaction['ExecType'] == b"2": |
| | | pass |
| | | else: |
| | | item = {"SecurityID": pTransaction['SecurityID'], "TradePrice": pTransaction['TradePrice'], |
| | | "TradeVolume": pTransaction['TradeVolume'], |
| | | "OrderTime": pTransaction['TradeTime'], "MainSeq": pTransaction['MainSeq'], |
| | | "SubSeq": pTransaction['SubSeq'], "BuyNo": pTransaction['BuyNo'], |
| | | "SellNo": pTransaction['SellNo'], |
| | | "ExecType": pTransaction['ExecType'].decode()} |
| | | if item["SecurityID"] not in self.__l2_transaction_data_dict: |
| | | self.__l2_transaction_data_dict[item["SecurityID"]] = L2TransactionDataManager(item["SecurityID"]) |
| | | self.__l2_transaction_data_dict[item["SecurityID"]].add_transaction_data(item) |
| | | |
| | | def OnRtnNGTSTick(self, pTick): |
| | | """ |
| | | 上证股票的逐笔委托与成交 |
| | | @param pTick: |
| | | @return: |
| | | """ |
| | | try: |
| | | if pTick['TickType'] == b'T': |
| | | # 成交 |
| | | item = {"SecurityID": pTick['SecurityID'], "TradePrice": pTick['Price'], |
| | | "TradeVolume": pTick['Volume'], |
| | | "OrderTime": pTick['TickTime'], "MainSeq": pTick['MainSeq'], |
| | | "SubSeq": pTick['SubSeq'], "BuyNo": pTick['BuyNo'], |
| | | "SellNo": pTick['SellNo'], |
| | | "ExecType": '1'} |
| | | if item["SecurityID"] not in self.__l2_transaction_data_dict: |
| | | self.__l2_transaction_data_dict[item["SecurityID"]] = L2TransactionDataManager(item["SecurityID"]) |
| | | self.__l2_transaction_data_dict[item["SecurityID"]].add_transaction_data(item) |
| | | except Exception as e: |
| | | logger_local_huaxin_l2_subscript.exception(e) |
| | | |
| | | |
| | | def __init_l2(codes): |
| | | print(lev2mdapi.CTORATstpLev2MdApi_GetApiVersion()) |
| | | # case 1: Tcp方式 |
| | | # g_SubMode=lev2mdapi.TORA_TSTP_MST_TCP |
| | | # case 2: 组播方式 |
| | | g_SubMode = lev2mdapi.TORA_TSTP_MST_MCAST |
| | | if IS_TEST: |
| | | g_SubMode = lev2mdapi.TORA_TSTP_MST_TCP |
| | | # case 1缓存模式 |
| | | api = lev2mdapi.CTORATstpLev2MdApi_CreateTstpLev2MdApi(g_SubMode, True) |
| | | # case 2非缓存模式 |
| | | # api = lev2mdapi.CTORATstpLev2MdApi_CreateTstpLev2MdApi(g_SubMode, False) |
| | | global spi |
| | | spi = Lev2MdSpi(api, codes) |
| | | api.RegisterSpi(spi) |
| | | # -------------------正式模式------------------------------------- |
| | | if g_SubMode != lev2mdapi.TORA_TSTP_MST_MCAST: |
| | | api.RegisterFront(Front_Address) |
| | | else: |
| | | # case 1 从一个组播地址收取行情 |
| | | api.RegisterMulticast(Multicast_Address, Local_Interface_Address, "") |
| | | # api.RegisterMulticast(Multicast_Address2, Local_Interface_Address, "") |
| | | |
| | | # case 2:注册多个组播地址同时收行情 |
| | | # api.RegisterMulticast(Multicast_Address, Local_Interface_Address, ""); |
| | | # api.RegisterMulticast(Multicast_Address2, Local_Interface_Address, ""); |
| | | |
| | | # case 3:efvi模式收行情 |
| | | # api.RegisterMulticast(Multicast_Address, Local_Interface_Address, "", "enp101s0f0",4096, True); |
| | | |
| | | # case 1 不绑核运行 |
| | | api.Init() |
| | | |
| | | |
| | | def run(codes) -> None: |
| | | try: |
| | | log.close_print() |
| | | __init_l2(codes) |
| | | logger_system.info(f"L2订阅服务启动成功:") |
| | | except Exception as e: |
| | | logger_system.exception(e) |
| | | while True: |
| | | time.sleep(2) |
| | | |
| | | |
| | | if __name__ == "__main__": |
| | | run({"000009", "601618"}) |
| | | input() |