# -*- coding: utf-8 -*-
|
import logging
|
import multiprocessing
|
import queue
|
import time
|
import lev2mdapi
|
from log_module import log
|
from log_module.log import logger_local_huaxin_l2_subscript, logger_system
|
from utils import tool
|
|
IS_TEST = False
|
|
###B类###
|
Front_Address = "tcp://10.0.1.101:6900"
|
Multicast_Address = "udp://224.224.2.19:7889"
|
Multicast_Address2 = "udp://224.224.224.234:7890"
|
Local_Interface_Address = "192.168.84.126"
|
|
###测试地址###
|
if IS_TEST:
|
Front_Address = "tcp://210.14.72.17:16900"
|
Multicast_Address = "udp://224.224.2.19:7889"
|
Multicast_Address2 = "udp://224.224.224.234:7890"
|
Local_Interface_Address = "192.168.84.126"
|
|
g_SubMarketData = False
|
g_SubTransaction = False
|
g_SubOrderDetail = False
|
g_SubXTSTick = False
|
g_SubXTSMarketData = False
|
g_SubNGTSTick = False
|
g_SubBondMarketData = False
|
g_SubBondTransaction = False
|
g_SubBondOrderDetail = False
|
set_codes_data_queue = queue.Queue(maxsize=10240)
|
market_code_dict = {}
|
|
ENABLE_NGST = True
|
|
|
class L2TransactionDataManager:
|
def __init__(self, code, accurate_buy=False):
|
"""
|
@param code:
|
@param accurate_buy: 是否需要精确的买单信息
|
"""
|
self.code = code
|
self.__latest_buy_order = None
|
self.__big_buy_orders = []
|
# 精确的买单信息,{买单号:订单信息}
|
self.__big_accurate_buy_order_dict = {}
|
self.__big_accurate_sell_order_dict = {}
|
self.__latest_sell_order = None
|
self.__big_sell_orders = []
|
self.big_accurate_buy_order_queue = queue.Queue(maxsize=10240)
|
self.big_accurate_sell_order_queue = queue.Queue(maxsize=10240)
|
self.big_buy_order_queue = queue.Queue(maxsize=10240)
|
self.big_sell_order_queue = queue.Queue(maxsize=10240)
|
self.accurate_buy = accurate_buy
|
self.__last_accurate_buy_count = 0
|
self.__last_accurate_sell_count = 0
|
|
def get_big_buy_orders(self):
|
return self.__big_buy_orders
|
|
def get_big_sell_orders(self):
|
return self.__big_sell_orders
|
|
def add_transaction_data_for_accurate(self, data):
|
"""
|
获取精确的买单信息
|
@param data:
|
@return:
|
"""
|
|
def format_timestamp(timestamp):
|
time_str = str(timestamp)
|
return int(time_str[:5] if time_str[0] == '9' else time_str[:6])
|
|
item = (data["BuyNo"], data["SellNo"], data["TradePrice"], data["TradeVolume"])
|
# item = {"SecurityID": pTransaction['SecurityID'], "TradePrice": pTransaction['TradePrice'],
|
# "TradeVolume": pTransaction['TradeVolume'],
|
# "OrderTime": pTransaction['TradeTime'], "MainSeq": pTransaction['MainSeq'],
|
# "SubSeq": pTransaction['SubSeq'], "BuyNo": pTransaction['BuyNo'],
|
# "SellNo": pTransaction['SellNo'],
|
# "ExecType": pTransaction['ExecType'].decode()}
|
money = round(item[2] * item[3])
|
volume = item[3]
|
price = item[2]
|
order_time = data["OrderTime"]
|
if item[0] not in self.__big_accurate_buy_order_dict:
|
# (买单号, 量, 金额, 时间, 最新成交价格)
|
self.__big_accurate_buy_order_dict[item[0]] = [item[0], 0, 0, order_time, price]
|
buy_order_info = self.__big_accurate_buy_order_dict[item[0]]
|
buy_order_info[1] += volume
|
buy_order_info[2] += money
|
buy_order_info[3] = order_time
|
buy_order_info[4] = price
|
# 将大单写入本地文件
|
if self.__latest_buy_order and self.__latest_buy_order[0] != item[0]:
|
# 有可能是大单成交完成, 判断上个订单是否是大单
|
last_buy_order = self.__big_accurate_buy_order_dict.get(self.__latest_buy_order[0])
|
try:
|
if last_buy_order[2] > 299e4:
|
self.big_accurate_buy_order_queue.put_nowait(last_buy_order)
|
except Exception as e:
|
print("数据:", last_buy_order, item)
|
raise e
|
# 如果数据过多需要移除过长时间的小金额数据
|
accurate_buy_count = len(self.__big_accurate_buy_order_dict.keys())
|
if accurate_buy_count > 10000 and accurate_buy_count - self.__last_accurate_buy_count > 2000:
|
# 超过1w条数据且新增2000条数据
|
# 超过1w条数据就要移除30分钟之前的数据
|
now_time_int = int(tool.trade_time_add_second(tool.get_now_time_str(), -3600).replace(":", ""))
|
try:
|
remove_order_nos = [x for x in self.__big_accurate_buy_order_dict if
|
now_time_int - format_timestamp(
|
self.__big_accurate_buy_order_dict[x][3]) > 0]
|
if remove_order_nos:
|
for order_no in remove_order_nos:
|
self.__big_accurate_buy_order_dict.pop(order_no)
|
finally:
|
self.__last_accurate_buy_count = len(self.__big_accurate_buy_order_dict)
|
|
# 统计卖单
|
if item[1] not in self.__big_accurate_sell_order_dict:
|
# (卖单号, 量, 金额, 时间, 最新成交价格)
|
self.__big_accurate_sell_order_dict[item[1]] = [item[1], 0, 0, order_time, price]
|
sell_order_info = self.__big_accurate_sell_order_dict[item[1]]
|
sell_order_info[1] += volume
|
sell_order_info[2] += money
|
sell_order_info[3] = order_time
|
sell_order_info[4] = price
|
if self.__latest_sell_order and self.__latest_sell_order[0] != item[1]:
|
# 有可能是大单成交完成, 判断上个订单是否是大单
|
last_sell_order = self.__big_accurate_sell_order_dict.get(self.__latest_sell_order[0])
|
if last_sell_order[2] > 299e4:
|
self.big_accurate_sell_order_queue.put_nowait(last_sell_order)
|
# 如果数据过多需要移除过长时间的小金额数据
|
accurate_sell_count = len(self.__big_accurate_sell_order_dict.keys())
|
if accurate_sell_count > 10000 and accurate_sell_count - self.__last_accurate_sell_count > 2000:
|
# 超过1w条数据且新增2000条数据
|
# 超过1w条数据就要移除30分钟之前的数据
|
now_time_int = int(tool.trade_time_add_second(tool.get_now_time_str(), -3600).replace(":", ""))
|
try:
|
remove_order_nos = [x for x in self.__big_accurate_sell_order_dict if
|
now_time_int - format_timestamp(
|
self.__big_accurate_sell_order_dict[x][3]) > 0]
|
if remove_order_nos:
|
for order_no in remove_order_nos:
|
self.__big_accurate_sell_order_dict.pop(order_no)
|
finally:
|
self.__last_accurate_sell_count = len(self.__big_accurate_sell_order_dict.keys())
|
|
def add_transaction_data(self, data):
|
item = (data["BuyNo"], data["SellNo"], data["TradePrice"], data["TradeVolume"])
|
# item = {"SecurityID": pTransaction['SecurityID'], "TradePrice": pTransaction['TradePrice'],
|
# "TradeVolume": pTransaction['TradeVolume'],
|
# "OrderTime": pTransaction['TradeTime'], "MainSeq": pTransaction['MainSeq'],
|
# "SubSeq": pTransaction['SubSeq'], "BuyNo": pTransaction['BuyNo'],
|
# "SellNo": pTransaction['SellNo'],
|
# "ExecType": pTransaction['ExecType'].decode()}
|
money = round(item[2] * item[3])
|
volume = item[3]
|
price = item[2]
|
order_time = data["OrderTime"]
|
|
if self.accurate_buy:
|
self.add_transaction_data_for_accurate(data)
|
|
if not self.__latest_buy_order:
|
# (买单号, 量, 金额, 时间, 最新成交价格)
|
self.__latest_buy_order = [item[0], 0, 0, order_time, price]
|
if self.__latest_buy_order[0] == item[0]:
|
self.__latest_buy_order[1] += volume
|
self.__latest_buy_order[2] += money
|
self.__latest_buy_order[3] = order_time
|
self.__latest_buy_order[4] = price
|
else:
|
if self.__latest_buy_order[2] > 1e6:
|
d = (self.__latest_buy_order[0], self.__latest_buy_order[1], self.__latest_buy_order[2],
|
self.__latest_buy_order[3], self.__latest_buy_order[4])
|
self.__big_buy_orders.append(d)
|
self.big_buy_order_queue.put_nowait(d)
|
|
self.__latest_buy_order = [item[0], volume, money, order_time, price]
|
|
if not self.__latest_sell_order:
|
self.__latest_sell_order = [item[1], 0, 0, order_time, price]
|
if self.__latest_sell_order[0] == item[1]:
|
self.__latest_sell_order[1] += volume
|
self.__latest_sell_order[2] += money
|
self.__latest_sell_order[3] = order_time
|
self.__latest_sell_order[4] = price
|
else:
|
if self.__latest_sell_order[2] > 1e6:
|
d = (self.__latest_sell_order[0], self.__latest_sell_order[1], self.__latest_sell_order[2],
|
self.__latest_sell_order[3], self.__latest_sell_order[4])
|
self.__big_sell_orders.append(d)
|
self.big_sell_order_queue.put_nowait(d)
|
self.__latest_sell_order = [item[1], volume, money, order_time, price]
|
|
|
# 买入的大单订单号
|
l2_transaction_data_dict = {}
|
|
|
class Lev2MdSpi(lev2mdapi.CTORATstpLev2MdSpi):
|
latest_codes_set = set()
|
|
special_code_volume_for_order_dict = {}
|
# 已经订阅的代码
|
subscripted_codes = set()
|
# 代码的上次成交的订单唯一索引
|
__last_transaction_keys_dict = {}
|
|
def __init__(self, api, codes, special_codes):
|
lev2mdapi.CTORATstpLev2MdSpi.__init__(self)
|
self.__api = api
|
self.is_login = False
|
self.codes = codes
|
self.codes_volume_and_price_dict = {}
|
self.special_codes = special_codes
|
|
def __split_codes(self, codes):
|
szse_codes = []
|
sse_codes = []
|
for code in codes:
|
market_type = tool.get_market_type(code)
|
if market_type == tool.MARKET_TYPE_SZSE:
|
szse_codes.append(code.encode())
|
elif market_type == tool.MARKET_TYPE_SSE:
|
sse_codes.append(code.encode())
|
return sse_codes, szse_codes
|
|
# 新增订阅
|
|
# 取消订阅
|
def __unsubscribe(self, _codes):
|
sh, sz = self.__split_codes(_codes)
|
logger_local_huaxin_l2_subscript.info(f"取消订阅上证:{sh}")
|
logger_local_huaxin_l2_subscript.info(f"取消订阅深证:{sz}")
|
if sh:
|
if ENABLE_NGST:
|
result = self.__api.UnSubscribeNGTSTick(sh, lev2mdapi.TORA_TSTP_EXD_SSE)
|
logger_local_huaxin_l2_subscript.info(f"逐笔NGTS订阅结果sh:{result}")
|
else:
|
# 取消订阅逐笔成交
|
self.__api.UnSubscribeTransaction(sh, lev2mdapi.TORA_TSTP_EXD_SSE)
|
if sz:
|
self.__api.UnSubscribeTransaction(sz, lev2mdapi.TORA_TSTP_EXD_SZSE)
|
|
def __subscribe(self, _codes):
|
sh, sz = self.__split_codes(_codes)
|
logger_local_huaxin_l2_subscript.info(f"订阅上证:{sh}")
|
logger_local_huaxin_l2_subscript.info(f"订阅深证:{sz}")
|
if sh:
|
if ENABLE_NGST:
|
result = self.__api.SubscribeNGTSTick(sh, lev2mdapi.TORA_TSTP_EXD_SSE)
|
logger_local_huaxin_l2_subscript.info(f"逐笔NGTS订阅结果sh:{result}")
|
else:
|
# 订阅逐笔成交
|
result = self.__api.SubscribeTransaction(sh, lev2mdapi.TORA_TSTP_EXD_SSE)
|
logger_local_huaxin_l2_subscript.info(f"逐笔成交订阅结果sh:{result}")
|
|
if sz:
|
result = self.__api.SubscribeTransaction(sz, lev2mdapi.TORA_TSTP_EXD_SZSE)
|
logger_local_huaxin_l2_subscript.info(f"逐笔成交订阅结果sz:{result}")
|
|
def OnFrontConnected(self):
|
print("OnFrontConnected")
|
logger_system.info(f"l2_client OnFrontConnected 线程ID:{tool.get_thread_id()}")
|
logout_req = lev2mdapi.CTORATstpUserLogoutField()
|
self.__api.ReqUserLogout(logout_req, 1)
|
time.sleep(1)
|
# 请求登录
|
login_req = lev2mdapi.CTORATstpReqUserLoginField()
|
self.__api.ReqUserLogin(login_req, 2)
|
|
def OnRspUserLogin(self, pRspUserLogin, pRspInfo, nRequestID, bIsLast):
|
print("OnRspUserLogin: ErrorID[%d] ErrorMsg[%s] RequestID[%d] IsLast[%d]" % (
|
pRspInfo['ErrorID'], pRspInfo['ErrorMsg'], nRequestID, bIsLast))
|
if pRspInfo['ErrorID'] == 0:
|
print("----L2行情登录成功----")
|
self.is_login = True
|
logger_system.info(f"L2行情登录成功")
|
self.__subscribe(self.codes)
|
|
def OnRspSubMarketData(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast):
|
print("OnRspSubMarketData")
|
|
def OnRspSubIndex(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast):
|
print("OnRspSubIndex")
|
|
def OnRspSubTransaction(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast):
|
print("OnRspSubTransaction")
|
if pRspInfo["ErrorID"] == 0:
|
print("订阅成功")
|
self.subscripted_codes.add(pSpecificSecurity['SecurityID'])
|
if bIsLast == 1:
|
print("订阅响应结束", self.subscripted_codes)
|
|
def OnRspSubNGTSTick(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast):
|
if pRspInfo["ErrorID"] == 0:
|
print("订阅成功")
|
self.subscripted_codes.add(pSpecificSecurity['SecurityID'])
|
if bIsLast == 1:
|
print("订阅响应结束", self.subscripted_codes)
|
|
def OnRspUnSubNGTSTick(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast):
|
try:
|
code = pSpecificSecurity['SecurityID']
|
logger_local_huaxin_l2_subscript.info(f"NGTS取消订阅:{code}")
|
self.subscripted_codes.discard(code)
|
if bIsLast == 1:
|
print("取消订阅响应结束", self.subscripted_codes)
|
except Exception as e:
|
logging.exception(e)
|
|
def OnRtnTransaction(self, pTransaction):
|
code = str(pTransaction['SecurityID'])
|
# min_volume, limit_up_price = self.codes_volume_and_price_dict.get(code)
|
# 输出逐笔成交数据
|
if pTransaction['ExecType'] == b"2":
|
pass
|
else:
|
item = {"SecurityID": pTransaction['SecurityID'], "TradePrice": pTransaction['TradePrice'],
|
"TradeVolume": pTransaction['TradeVolume'],
|
"OrderTime": pTransaction['TradeTime'], "MainSeq": pTransaction['MainSeq'],
|
"SubSeq": pTransaction['SubSeq'], "BuyNo": pTransaction['BuyNo'],
|
"SellNo": pTransaction['SellNo'],
|
"ExecType": pTransaction['ExecType'].decode()}
|
if item["SecurityID"] not in l2_transaction_data_dict:
|
l2_transaction_data_dict[item["SecurityID"]] = L2TransactionDataManager(item["SecurityID"], item[
|
"SecurityID"] in self.special_codes)
|
l2_transaction_data_dict[item["SecurityID"]].add_transaction_data(item)
|
|
def OnRtnNGTSTick(self, pTick):
|
"""
|
上证股票的逐笔委托与成交
|
@param pTick:
|
@return:
|
"""
|
try:
|
if pTick['TickType'] == b'T':
|
# 成交
|
item = {"SecurityID": pTick['SecurityID'], "TradePrice": pTick['Price'],
|
"TradeVolume": pTick['Volume'],
|
"OrderTime": pTick['TickTime'], "MainSeq": pTick['MainSeq'],
|
"SubSeq": pTick['SubSeq'], "BuyNo": pTick['BuyNo'],
|
"SellNo": pTick['SellNo'],
|
"ExecType": '1'}
|
if item["SecurityID"] not in l2_transaction_data_dict:
|
l2_transaction_data_dict[item["SecurityID"]] = L2TransactionDataManager(item["SecurityID"], item[
|
"SecurityID"] in self.special_codes)
|
l2_transaction_data_dict[item["SecurityID"]].add_transaction_data(item)
|
except Exception as e:
|
logger_local_huaxin_l2_subscript.exception(e)
|
|
|
def __init_l2(codes, special_codes):
|
print(lev2mdapi.CTORATstpLev2MdApi_GetApiVersion())
|
# case 1: Tcp方式
|
# g_SubMode=lev2mdapi.TORA_TSTP_MST_TCP
|
# case 2: 组播方式
|
g_SubMode = lev2mdapi.TORA_TSTP_MST_MCAST
|
if IS_TEST:
|
g_SubMode = lev2mdapi.TORA_TSTP_MST_TCP
|
# case 1缓存模式
|
api = lev2mdapi.CTORATstpLev2MdApi_CreateTstpLev2MdApi(g_SubMode, True)
|
# case 2非缓存模式
|
# api = lev2mdapi.CTORATstpLev2MdApi_CreateTstpLev2MdApi(g_SubMode, False)
|
global spi
|
spi = Lev2MdSpi(api, codes, special_codes)
|
api.RegisterSpi(spi)
|
# -------------------正式模式-------------------------------------
|
if g_SubMode != lev2mdapi.TORA_TSTP_MST_MCAST:
|
api.RegisterFront(Front_Address)
|
else:
|
# case 1 从一个组播地址收取行情
|
api.RegisterMulticast(Multicast_Address, Local_Interface_Address, "")
|
# api.RegisterMulticast(Multicast_Address2, Local_Interface_Address, "")
|
|
# case 2:注册多个组播地址同时收行情
|
# api.RegisterMulticast(Multicast_Address, Local_Interface_Address, "");
|
# api.RegisterMulticast(Multicast_Address2, Local_Interface_Address, "");
|
|
# case 3:efvi模式收行情
|
# api.RegisterMulticast(Multicast_Address, Local_Interface_Address, "", "enp101s0f0",4096, True);
|
|
# case 1 不绑核运行
|
api.Init()
|
|
|
def run(codes, _queue: multiprocessing.Queue, accurate_buy_order_queue: multiprocessing.Queue, special_codes) -> None:
|
"""
|
运行订阅
|
@param accurate_buy_order_queue: 精确大单队列
|
@param codes: 订阅的代码
|
@param _queue: 数据传输的队列
|
@param special_codes: 需要确定完整大单的代码
|
@return:
|
"""
|
try:
|
log.close_print()
|
__init_l2(codes, special_codes)
|
logger_system.info(f"L2订阅服务启动成功:")
|
except Exception as e:
|
logger_system.exception(e)
|
while True:
|
try:
|
# 读取一遍
|
for code in l2_transaction_data_dict:
|
l2_transaction_data_manager: L2TransactionDataManager = l2_transaction_data_dict[code]
|
|
while True:
|
if not l2_transaction_data_manager.big_buy_order_queue.empty():
|
result = l2_transaction_data_manager.big_buy_order_queue.get(block=False)
|
if result:
|
_queue.put_nowait((code, 0, result))
|
else:
|
break
|
|
while True:
|
if not l2_transaction_data_manager.big_accurate_buy_order_queue.empty():
|
result = l2_transaction_data_manager.big_accurate_buy_order_queue.get(block=False)
|
if result:
|
accurate_buy_order_queue.put_nowait((code, 0, result))
|
else:
|
break
|
|
while True:
|
if not l2_transaction_data_manager.big_accurate_sell_order_queue.empty():
|
result = l2_transaction_data_manager.big_accurate_sell_order_queue.get(block=False)
|
if result:
|
accurate_buy_order_queue.put_nowait((code, 1, result))
|
else:
|
break
|
|
while True:
|
if not l2_transaction_data_manager.big_sell_order_queue.empty():
|
result = l2_transaction_data_manager.big_sell_order_queue.get(block=False)
|
if result:
|
_queue.put_nowait((code, 1, result))
|
else:
|
break
|
except:
|
pass
|
finally:
|
time.sleep(1)
|
|
|
if __name__ == "__main__":
|
pass
|