# -*- coding: utf-8 -*-
|
import decimal
|
import json
|
import logging
|
import multiprocessing
|
import os
|
import pickle
|
import queue
|
import time
|
import concurrent.futures
|
|
from huaxin_client import l1_subscript_codes_manager
|
from huaxin_client import constant
|
import lev2mdapi
|
from log_module.log import logger_local_huaxin_l2_subscript, logger_system, logger_local_huaxin_contact_debug
|
from utils import tool
|
|
# --- "B类" (type-B) environment endpoints ---
# NOTE(review): presumably "B类" names the deployment/network environment these
# addresses belong to - confirm.

# TCP front address, registered only when the subscription mode is not multicast.
Front_Address = "tcp://10.0.1.101:6900"

# Multicast group addresses for Level-2 market data.
Multicast_Address = "udp://224.224.2.19:7889"
Multicast_Address2 = "udp://224.224.224.234:7890"

# Local interface address passed to RegisterMulticast.
Local_Interface_Address = constant.LOCAL_IP

# Bounded queue (max 1000 entries); not consumed anywhere in the visible part of this file.
set_codes_data_queue = queue.Queue(maxsize=1000)

# Latest market snapshot per security: {security_id: snapshot_tuple}.
market_code_dict = {}
|
|
|
class Lev2MdSpi(lev2mdapi.CTORATstpLev2MdSpi):
    """SPI (callback handler) for the Level-2 market-data API.

    After a successful login it subscribes market data for the codes returned by
    ``l1_subscript_codes_manager.get_codes()`` and stores the latest snapshot of
    every security in the module-level ``market_code_dict``.
    """

    # Most recent code set passed to __process_codes_data (class-level default).
    latest_codes_set = set()

    special_code_volume_for_order_dict = {}

    # Codes that are already subscribed.
    # NOTE(review): nothing in this class ever adds to or removes from this set,
    # so the add/delete diff in __process_codes_data is always computed against
    # an empty set - confirm whether that is intended.
    subscripted_codes = set()

    def __init__(self, api):
        """Keep a reference to the API object used for login and subscription requests."""
        lev2mdapi.CTORATstpLev2MdSpi.__init__(self)
        self.__api = api
        self.is_login = False

    def __split_codes(self, codes):
        """Split ``codes`` by market.

        Returns:
            ``(sse_codes, szse_codes)`` - two lists of byte-encoded codes.
            Codes whose market type is neither SSE nor SZSE are dropped.
        """
        szse_codes = []
        sse_codes = []
        for code in codes:
            market_type = tool.get_market_type(code)
            if market_type == tool.MARKET_TYPE_SSE:
                sse_codes.append(code.encode())
            elif market_type == tool.MARKET_TYPE_SZSE:
                szse_codes.append(code.encode())
        return sse_codes, szse_codes

    def __unsubscribe(self, _codes):
        """Cancel the market-data subscription for ``_codes`` on both exchanges."""
        sh, sz = self.__split_codes(_codes)
        logger_local_huaxin_l2_subscript.info(f"取消订阅上证:{sh}")
        logger_local_huaxin_l2_subscript.info(f"取消订阅深证:{sz}")
        # Only issue a request for an exchange that actually has codes.
        if sh:
            self.__api.UnSubscribeMarketData(sh, lev2mdapi.TORA_TSTP_EXD_SSE)
        if sz:
            self.__api.UnSubscribeMarketData(sz, lev2mdapi.TORA_TSTP_EXD_SZSE)

    def __subscribe(self, _codes):
        """Subscribe market data for ``_codes`` on both exchanges and log each call's result."""
        sh, sz = self.__split_codes(_codes)
        logger_local_huaxin_l2_subscript.info(f"订阅上证:{sh}")
        logger_local_huaxin_l2_subscript.info(f"订阅深证:{sz}")
        # Only issue a request for an exchange that actually has codes.
        if sh:
            result = self.__api.SubscribeMarketData(sh, lev2mdapi.TORA_TSTP_EXD_SSE)
            logger_local_huaxin_l2_subscript.info(f"市场订阅结果sh:{result}")
        if sz:
            result = self.__api.SubscribeMarketData(sz, lev2mdapi.TORA_TSTP_EXD_SZSE)
            logger_local_huaxin_l2_subscript.info(f"市场订阅结果sz:{result}")

    def __process_codes_data(self, codes):
        """Subscribe codes that are new and unsubscribe codes that are gone.

        Raises:
            Exception: if not logged in yet and ``constant.TEST`` is falsy.
        """
        codes = set(codes)
        if not self.is_login and not constant.TEST:
            raise Exception("L2尚未登录")
        add_codes = codes - self.subscripted_codes
        del_codes = self.subscripted_codes - codes
        self.__subscribe(add_codes)
        self.__unsubscribe(del_codes)
        # Remember the most recent code set.
        self.latest_codes_set = codes

    # Subscribe codes: [code, ...]
    def set_codes_data(self, codes):
        """Apply a new target code set; failures are logged and never propagated."""
        try:
            self.__process_codes_data(codes)
        except Exception as e:
            logging.exception(e)
            logger_local_huaxin_l2_subscript.exception(e)

    @classmethod
    def __set_latest_datas(cls, codes_data):
        """Persist ``[today's date string, codes_data]`` as JSON to ``constant.L2_CODES_INFO_PATH``."""
        data_str = json.dumps([tool.get_now_date_str(), codes_data])
        with open(constant.L2_CODES_INFO_PATH, mode='w') as f:
            f.write(data_str)

    @classmethod
    def __get_latest_datas(cls):
        """Load the codes persisted by ``__set_latest_datas``.

        Returns:
            The stored codes when the file exists and was written today,
            otherwise ``[]`` (also for an empty or malformed file).
        """
        if not os.path.exists(constant.L2_CODES_INFO_PATH):
            return []
        try:
            with open(constant.L2_CODES_INFO_PATH, mode='r') as f:
                data_json = json.loads(f.readline())
            if data_json[0] == tool.get_now_date_str():
                return data_json[1]
        except (ValueError, LookupError, TypeError) as e:
            # An empty or truncated file must not break the caller; treat it as "no data".
            logger_local_huaxin_l2_subscript.exception(e)
        return []

    def OnFrontConnected(self):
        """Connection callback: send a logout request, wait one second, then request login."""
        logger_system.info(f"l2_client OnFrontConnected 线程ID:{tool.get_thread_id()}")
        logout_req = lev2mdapi.CTORATstpUserLogoutField()
        self.__api.ReqUserLogout(logout_req, 1)
        time.sleep(1)
        # Request login.
        login_req = lev2mdapi.CTORATstpReqUserLoginField()
        self.__api.ReqUserLogin(login_req, 2)

    def OnRspUserLogin(self, pRspUserLogin, pRspInfo, nRequestID, bIsLast):
        """Login response callback: on success subscribe L2 market data, on failure log the error."""
        if pRspInfo['ErrorID'] != 0:
            # A failed login used to be ignored silently; record it and make
            # sure a stale "logged in" flag does not survive.
            self.is_login = False
            logger_system.error(
                f"L2行情登录失败:ErrorID={pRspInfo['ErrorID']} ErrorMsg={pRspInfo['ErrorMsg']}")
            return
        self.is_login = True
        logger_system.info("L2行情登录成功")
        # Subscribe L2 for every code provided by the L1 subscription manager
        # (codes arrive as bytes, grouped by exchange).
        codes_sh, codes_sz = l1_subscript_codes_manager.get_codes()
        codes = {code.decode("utf-8") for group in (codes_sh, codes_sz) for code in group}
        self.set_codes_data(codes)

    def OnRtnMarketData(self, pDepthMarketData, FirstLevelBuyNum, FirstLevelBuyOrderVolumes, FirstLevelSellNum,
                        FirstLevelSellOrderVolumes):
        """Market-data callback: store the latest snapshot of the security in ``market_code_dict``.

        Stored tuple layout:
            (code, timestamp, last price, total traded volume, total traded value,
             5 bid levels [(price, volume), ...], 5 ask levels [(price, volume), ...])
        """
        md = pDepthMarketData
        security_id = md['SecurityID']
        buy = [(md[f'BidPrice{i}'], md[f'BidVolume{i}']) for i in range(1, 6)]
        sell = [(md[f'AskPrice{i}'], md[f'AskVolume{i}']) for i in range(1, 6)]
        market_code_dict[security_id] = (
            security_id, md['DataTimeStamp'], md['LastPrice'], md['TotalVolumeTrade'], md['TotalValueTrade'],
            buy, sell)
|
|
|
def __init_l2():
    """Create the Level-2 API, attach the SPI, register the data source and start the API.

    Side effect: the created ``Lev2MdSpi`` is stored in the module-level global ``spi``.
    """
    global spi

    # Subscription mode: multicast.
    # (TCP alternative: lev2mdapi.TORA_TSTP_MST_TCP)
    sub_mode = lev2mdapi.TORA_TSTP_MST_MCAST

    # Second argument True = cached mode; pass False for non-cached mode.
    api = lev2mdapi.CTORATstpLev2MdApi_CreateTstpLev2MdApi(sub_mode, True)
    spi = Lev2MdSpi(api)
    api.RegisterSpi(spi)

    # ------------------- production mode -------------------
    multicast_enabled = sub_mode == lev2mdapi.TORA_TSTP_MST_MCAST
    if multicast_enabled:
        # Receive market data from a single multicast address.
        # Alternatives noted by the original author:
        #   - register several multicast addresses (e.g. also Multicast_Address2)
        #     to receive from all of them at once;
        #   - efvi mode: RegisterMulticast(Multicast_Address, Local_Interface_Address,
        #     "", "enp101s0f0", 4096, True)
        api.RegisterMulticast(Multicast_Address, Local_Interface_Address, "")
    else:
        api.RegisterFront(Front_Address)

    # Start the API without CPU-core binding.
    api.Init()
|
|
|
# Thread pool with three workers; not used in the visible part of this file.
__l2_cmd_thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=3)

# Placeholder for a strategy pipe; only ever set to None in the visible code.
pipe_strategy = None

# Codes seen in the most recent upload; only referenced by disabled (commented-out) code.
__latest_subscript_codes = set()
|
|
|
def __upload_codes_info(queue_l1_w_strategy_r: multiprocessing.Queue, datas):
    """Pickle a ``set_target_codes`` message carrying ``datas`` and enqueue it without blocking.

    Nothing is built or sent outside trading time. When ``queue_l1_w_strategy_r``
    is ``None`` the message is still serialized but not sent.

    Args:
        queue_l1_w_strategy_r: queue the pickled message is put on, or ``None``.
        datas: payload stored under ``message["data"]["data"]``.
    """
    if not tool.is_trade_time():
        return

    # Build the upload message; the request id and the "time" field are both
    # millisecond timestamps taken at build time.
    request_id = f"sb_{int(time.time() * 1000)}"
    message = {
        "type": "set_target_codes",
        "data": {"data": datas},
        "request_id": request_id,
        "time": round(time.time() * 1000, 0),
    }
    payload = pickle.dumps(message, protocol=pickle.HIGHEST_PROTOCOL)

    if queue_l1_w_strategy_r is None:
        return
    queue_l1_w_strategy_r.put_nowait(payload)
    # A disabled (commented-out) earlier revision compared the uploaded codes
    # with __latest_subscript_codes here and logged the newly added ones.
|
|
|
def run(queue_l1_w_strategy_r) -> None:
    """Process entry point: start the Level-2 client, then upload snapshots forever.

    Initialization errors are logged and do not stop the loop. Every two
    seconds (once the clock has reached 09:24:59) all snapshots currently held
    in ``market_code_dict`` are handed to ``__upload_codes_info``.

    Args:
        queue_l1_w_strategy_r: queue the pickled snapshots are put on; forwarded
            unchanged to ``__upload_codes_info``.
    """
    logger_system.info("L2进程ID:{}", os.getpid())
    logger_system.info(f"l2_client 线程ID:{tool.get_thread_id()}")
    try:
        __init_l2()
    except Exception as e:
        logger_system.exception(e)
    while True:
        # Nothing is uploaded before 09:24:59 (the original comment said 9:30,
        # but this is the threshold the code checks).
        if tool.get_now_time_str() < '09:24:59':
            # Sleep before re-checking: `continue` skips the `finally` below,
            # so without this the loop would spin at full CPU until the threshold.
            time.sleep(1)
            continue
        try:
            # Take the snapshot in a single call so that entries added by the
            # market-data callback (presumably on another thread) cannot raise
            # "dictionary changed size during iteration".
            list_ = list(market_code_dict.values())
            __upload_codes_info(queue_l1_w_strategy_r, list_)
        except Exception as e:
            # Keep the loop alive, but do not hide the failure (e.g. a full queue).
            logger_system.exception(e)
        finally:
            time.sleep(2)
|