# -*- coding: utf-8 -*-
|
import logging
|
import queue
|
import time
|
import lev2mdapi
|
from log_module import log
|
from log_module.log import logger_local_huaxin_l2_subscript, logger_system
|
from utils import tool
|
|
# Environment switch: when True the test endpoints below are selected.
IS_TEST = False

if IS_TEST:
    # Test-environment endpoints.
    Front_Address = "tcp://210.14.72.17:16900"
    Multicast_Address = "udp://224.224.2.19:7889"
    Multicast_Address2 = "udp://224.224.224.234:7890"
    Local_Interface_Address = "192.168.84.126"
else:
    # "Class B" endpoints, used whenever IS_TEST is off.
    Front_Address = "tcp://10.0.1.101:6900"
    Multicast_Address = "udp://224.224.2.19:7889"
    Multicast_Address2 = "udp://224.224.224.234:7890"
    Local_Interface_Address = "192.168.84.126"

# Per-feed subscription flags. None of them is read in the visible part of
# this file; they are kept because other modules may import them.
g_SubMarketData = False
g_SubTransaction = False
g_SubOrderDetail = False
g_SubXTSTick = False
g_SubXTSMarketData = False
g_SubNGTSTick = False
g_SubBondMarketData = False
g_SubBondTransaction = False
g_SubBondOrderDetail = False

# Shared module-level containers (not used in the visible part of this file).
set_codes_data_queue = queue.Queue()
market_code_dict = {}

# When True, SSE codes are (un)subscribed through the NGTS tick feed instead of
# the plain transaction feed. (The name spells "NGST"; it is kept as-is because
# it is a public module attribute.)
ENABLE_NGST = True
|
|
|
class L2TransactionDataManager:
    """Aggregate one security's trade records per order and collect big orders.

    Consecutive trades that carry the same buy (or sell) order number are summed
    into a running ``[order_no, volume, money]`` record. As soon as a trade with a
    different order number arrives, the finished record is archived as a tuple
    ``(order_no, volume, money)`` if its accumulated money is strictly greater
    than the threshold, and a new running record is started.

    The record that is still running is never archived until a trade with a
    different order number shows up.
    """

    def __init__(self, code, big_order_money_threshold=1e6):
        """
        @param code: security identifier this manager belongs to
        @param big_order_money_threshold: an order is archived as "big" when its
            accumulated money is strictly greater than this value (default 1e6,
            the value that used to be hard-coded)
        """
        self.code = code
        self.__big_order_money_threshold = big_order_money_threshold
        # Running [order_no, volume, money] of the most recent buy order
        self.__latest_buy_order = None
        self.__big_buy_orders = []
        # Running [order_no, volume, money] of the most recent sell order
        self.__latest_sell_order = None
        self.__big_sell_orders = []

    def get_big_buy_orders(self):
        """Return the live list of archived big buy orders ``(order_no, volume, money)``."""
        return self.__big_buy_orders

    def get_big_sell_orders(self):
        """Return the live list of archived big sell orders ``(order_no, volume, money)``."""
        return self.__big_sell_orders

    def add_transaction_data(self, data):
        """Fold one trade record into the running buy-side and sell-side totals.

        @param data: mapping with the keys "BuyNo", "SellNo", "TradePrice" and
            "TradeVolume"
        @raise KeyError: if one of those keys is missing (raised before any state
            is modified)
        """
        # Read every field up front so a malformed record cannot leave the
        # buy side updated while the sell side is not.
        buy_no = data["BuyNo"]
        sell_no = data["SellNo"]
        price = data["TradePrice"]
        volume = data["TradeVolume"]
        money = round(price * volume)

        self.__latest_buy_order = self.__accumulate(
            self.__latest_buy_order, buy_no, volume, money, self.__big_buy_orders)
        self.__latest_sell_order = self.__accumulate(
            self.__latest_sell_order, sell_no, volume, money, self.__big_sell_orders)

    def __accumulate(self, latest, order_no, volume, money, big_orders):
        """Update one side's running record and return the record to keep.

        @param latest: current running ``[order_no, volume, money]`` or None
        @param order_no: order number carried by the incoming trade
        @param volume: traded volume of the incoming trade
        @param money: traded money of the incoming trade
        @param big_orders: list that receives the finished record if it is big
        @return: the running record after applying the trade
        """
        if latest is None:
            latest = [order_no, 0, 0]
        if latest[0] == order_no:
            # Same order keeps trading: extend the running totals.
            latest[1] += volume
            latest[2] += money
            return latest
        # A different order number means the previous order is finished.
        if latest[2] > self.__big_order_money_threshold:
            big_orders.append(tuple(latest))
        return [order_no, volume, money]
|
|
|
class Lev2MdSpi(lev2mdapi.CTORATstpLev2MdSpi):
    """Callback handler for the Level-2 market-data API.

    After a successful login it subscribes to the codes given at construction
    time, tracks which codes the front has acknowledged, and feeds every trade
    record into a per-security ``L2TransactionDataManager``.

    NOTE: the containers declared directly below are class attributes, so they
    are shared by every instance of this class.
    """

    latest_codes_set = set()

    special_code_volume_for_order_dict = {}

    # Codes whose subscription has been acknowledged
    subscripted_codes = set()

    # Unique index of the last traded order per code
    # (not used in the visible part of this class)
    __last_transaction_keys_dict = {}

    # SecurityID -> L2TransactionDataManager that aggregates its trades
    __l2_transaction_data_dict = {}

    def __init__(self, api, codes):
        """
        @param api: market-data API object used for login and (un)subscription
        @param codes: iterable of security codes to subscribe after login
        """
        lev2mdapi.CTORATstpLev2MdSpi.__init__(self)
        self.__api = api
        self.is_login = False
        self.codes = codes
        self.codes_volume_and_price_dict = {}

    def __split_codes(self, codes):
        """Split codes by market and encode them to bytes.

        Codes whose market type is neither SZSE nor SSE are dropped.

        @param codes: iterable of security code strings
        @return: tuple ``(sse_codes, szse_codes)`` of lists of encoded codes
        """
        szse_codes = []
        sse_codes = []
        for code in codes:
            market_type = tool.get_market_type(code)
            if market_type == tool.MARKET_TYPE_SZSE:
                szse_codes.append(code.encode())
            elif market_type == tool.MARKET_TYPE_SSE:
                sse_codes.append(code.encode())
        return sse_codes, szse_codes

    # Unsubscribe
    def __unsubscribe(self, _codes):
        """Cancel the tick subscriptions of the given codes.

        @param _codes: iterable of security code strings
        """
        sh, sz = self.__split_codes(_codes)
        logger_local_huaxin_l2_subscript.info(f"取消订阅上证:{sh}")
        logger_local_huaxin_l2_subscript.info(f"取消订阅深证:{sz}")
        if sh:
            if ENABLE_NGST:
                result = self.__api.UnSubscribeNGTSTick(sh, lev2mdapi.TORA_TSTP_EXD_SSE)
                # Fixed: this used to be logged under the "subscribe result" label.
                logger_local_huaxin_l2_subscript.info(f"逐笔NGTS取消订阅结果sh:{result}")
            else:
                # Cancel the tick-by-tick transaction subscription
                self.__api.UnSubscribeTransaction(sh, lev2mdapi.TORA_TSTP_EXD_SSE)
        if sz:
            self.__api.UnSubscribeTransaction(sz, lev2mdapi.TORA_TSTP_EXD_SZSE)

    # Subscribe
    def __subscribe(self, _codes):
        """Subscribe the tick feeds of the given codes and log each request result.

        @param _codes: iterable of security code strings
        """
        sh, sz = self.__split_codes(_codes)
        logger_local_huaxin_l2_subscript.info(f"订阅上证:{sh}")
        logger_local_huaxin_l2_subscript.info(f"订阅深证:{sz}")
        if sh:
            if ENABLE_NGST:
                result = self.__api.SubscribeNGTSTick(sh, lev2mdapi.TORA_TSTP_EXD_SSE)
                logger_local_huaxin_l2_subscript.info(f"逐笔NGTS订阅结果sh:{result}")
            else:
                # Subscribe tick-by-tick transactions
                result = self.__api.SubscribeTransaction(sh, lev2mdapi.TORA_TSTP_EXD_SSE)
                logger_local_huaxin_l2_subscript.info(f"逐笔成交订阅结果sh:{result}")

        if sz:
            result = self.__api.SubscribeTransaction(sz, lev2mdapi.TORA_TSTP_EXD_SZSE)
            logger_local_huaxin_l2_subscript.info(f"逐笔成交订阅结果sz:{result}")

    def __handle_subscribe_response(self, pSpecificSecurity, pRspInfo, bIsLast):
        """Record an acknowledged subscription; shared by the OnRspSub* callbacks."""
        if pRspInfo["ErrorID"] == 0:
            print("订阅成功")
            self.subscripted_codes.add(pSpecificSecurity['SecurityID'])
        if bIsLast == 1:
            print("订阅响应结束", self.subscripted_codes)

    def __record_transaction(self, item):
        """Hand one trade record to the manager of its security, creating it on first use.

        @param item: trade dict; must contain "SecurityID" plus the keys read by
            ``L2TransactionDataManager.add_transaction_data``
        """
        security_id = item["SecurityID"]
        manager = self.__l2_transaction_data_dict.get(security_id)
        if manager is None:
            manager = L2TransactionDataManager(security_id)
            self.__l2_transaction_data_dict[security_id] = manager
        manager.add_transaction_data(item)

    def OnFrontConnected(self):
        """Front connection established: send a logout, wait one second, then log in."""
        print("OnFrontConnected")
        logger_system.info(f"l2_client OnFrontConnected 线程ID:{tool.get_thread_id()}")
        logout_req = lev2mdapi.CTORATstpUserLogoutField()
        self.__api.ReqUserLogout(logout_req, 1)
        time.sleep(1)
        # Request login
        login_req = lev2mdapi.CTORATstpReqUserLoginField()
        self.__api.ReqUserLogin(login_req, 2)

    def OnRspUserLogin(self, pRspUserLogin, pRspInfo, nRequestID, bIsLast):
        """Login response: on success mark the session logged in and subscribe ``self.codes``."""
        print("OnRspUserLogin: ErrorID[%d] ErrorMsg[%s] RequestID[%d] IsLast[%d]" % (
            pRspInfo['ErrorID'], pRspInfo['ErrorMsg'], nRequestID, bIsLast))
        if pRspInfo['ErrorID'] == 0:
            print("----L2行情登录成功----")
            self.is_login = True
            logger_system.info("L2行情登录成功")
            self.__subscribe(self.codes)

    def OnRspSubMarketData(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast):
        """Market-data subscription response; only printed."""
        print("OnRspSubMarketData")

    def OnRspSubIndex(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast):
        """Index subscription response; only printed."""
        print("OnRspSubIndex")

    def OnRspSubTransaction(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast):
        """Transaction subscription response: remember the code when ErrorID is 0."""
        print("OnRspSubTransaction")
        self.__handle_subscribe_response(pSpecificSecurity, pRspInfo, bIsLast)

    def OnRspSubNGTSTick(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast):
        """NGTS tick subscription response: remember the code when ErrorID is 0."""
        self.__handle_subscribe_response(pSpecificSecurity, pRspInfo, bIsLast)

    def OnRspUnSubNGTSTick(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast):
        """NGTS tick unsubscription response: forget the code."""
        try:
            code = pSpecificSecurity['SecurityID']
            logger_local_huaxin_l2_subscript.info(f"NGTS取消订阅:{code}")
            self.subscripted_codes.discard(code)
            if bIsLast == 1:
                print("取消订阅响应结束", self.subscripted_codes)
        except Exception as e:
            # Use the same project logger as the other callbacks of this class.
            logger_local_huaxin_l2_subscript.exception(e)

    def OnRtnTransaction(self, pTransaction):
        """
        Tick-by-tick transaction push.

        Records whose ExecType is b"2" are skipped (presumably cancellations -
        TODO confirm against the API reference); every other record is passed
        to the per-security aggregator.
        @param pTransaction: transaction record supplied by the API
        @return:
        """
        try:
            if pTransaction['ExecType'] == b"2":
                return
            item = {"SecurityID": pTransaction['SecurityID'], "TradePrice": pTransaction['TradePrice'],
                    "TradeVolume": pTransaction['TradeVolume'],
                    "OrderTime": pTransaction['TradeTime'], "MainSeq": pTransaction['MainSeq'],
                    "SubSeq": pTransaction['SubSeq'], "BuyNo": pTransaction['BuyNo'],
                    "SellNo": pTransaction['SellNo'],
                    "ExecType": pTransaction['ExecType'].decode()}
            self.__record_transaction(item)
        except Exception as e:
            # Log instead of raising, matching OnRtnNGTSTick.
            logger_local_huaxin_l2_subscript.exception(e)

    def OnRtnNGTSTick(self, pTick):
        """
        Tick-by-tick orders and trades of SSE stocks.

        Only ticks whose TickType is b'T' (trades) are processed; they are
        converted to the same dict layout as OnRtnTransaction with ExecType '1'.
        @param pTick: tick record supplied by the API
        @return:
        """
        try:
            if pTick['TickType'] == b'T':
                # Trade
                item = {"SecurityID": pTick['SecurityID'], "TradePrice": pTick['Price'],
                        "TradeVolume": pTick['Volume'],
                        "OrderTime": pTick['TickTime'], "MainSeq": pTick['MainSeq'],
                        "SubSeq": pTick['SubSeq'], "BuyNo": pTick['BuyNo'],
                        "SellNo": pTick['SellNo'],
                        "ExecType": '1'}
                self.__record_transaction(item)
        except Exception as e:
            logger_local_huaxin_l2_subscript.exception(e)
|
|
|
def __init_l2(codes):
    """Create the Level-2 market-data API, attach a ``Lev2MdSpi`` for ``codes`` and start it.

    The handler is stored in the module-level global ``spi``.

    @param codes: security codes handed to ``Lev2MdSpi``
    """
    global spi

    print(lev2mdapi.CTORATstpLev2MdApi_GetApiVersion())

    # Multicast is the default subscription mode; the test environment uses TCP.
    sub_mode = lev2mdapi.TORA_TSTP_MST_TCP if IS_TEST else lev2mdapi.TORA_TSTP_MST_MCAST

    # Second argument True selects the cached mode; pass False for non-cached mode.
    api = lev2mdapi.CTORATstpLev2MdApi_CreateTstpLev2MdApi(sub_mode, True)

    spi = Lev2MdSpi(api, codes)
    api.RegisterSpi(spi)

    if sub_mode == lev2mdapi.TORA_TSTP_MST_MCAST:
        # Receive quotes from a single multicast address.
        # Alternatives noted by the original author:
        #   - also register Multicast_Address2 to receive from several addresses at once
        #   - efvi mode:
        #     api.RegisterMulticast(Multicast_Address, Local_Interface_Address, "", "enp101s0f0", 4096, True)
        api.RegisterMulticast(Multicast_Address, Local_Interface_Address, "")
    else:
        api.RegisterFront(Front_Address)

    # Start without binding to a CPU core.
    api.Init()
|
|
|
def run(codes) -> None:
    """Start the L2 subscription service for ``codes`` and then block forever.

    A start-up failure is logged through ``logger_system`` and not re-raised;
    in either case the function then sleeps in an endless loop.

    @param codes: security codes to subscribe
    """
    try:
        log.close_print()
        __init_l2(codes)
        logger_system.info("L2订阅服务启动成功:")
    except Exception as exc:
        logger_system.exception(exc)

    # Park the calling thread so the process stays alive.
    while True:
        time.sleep(2)
|
|
|
if __name__ == "__main__":
    # Manual run with two sample codes.
    sample_codes = {"000009", "601618"}
    run(sample_codes)
    # NOTE(review): run() appears to block indefinitely, so this prompt is
    # probably never reached - confirm whether it is still needed.
    input()
|