# -*- coding: utf-8 -*-
|
import decimal
|
import json
|
import logging
|
import multiprocessing
|
import os
|
import queue
|
import threading
|
import time
|
import concurrent.futures
|
from huaxin_client import constant
|
import lev2mdapi
|
from log_module.async_log_util import huaxin_l2_log
|
from log_module.log import logger_local_huaxin_l2_subscript, logger_system, logger_l2_codes_subscript, printlog
|
from utils import tool
|
|
### Class B ###
# Front address registered with the API when the subscription mode is not multicast.
Front_Address = "tcp://10.0.1.101:6900"
# Multicast group registered with the API in multicast mode.
Multicast_Address = "udp://224.224.2.19:7889"
# Second multicast group; only referenced from commented-out code in this file.
Multicast_Address2 = "udp://224.224.224.234:7890"
# Local interface address passed along when a multicast group is registered.
Local_Interface_Address = "192.168.84.71"

# Module-level containers; nothing in the visible part of this file reads or writes them.
set_codes_data_queue = queue.Queue()
market_code_dict = {}
|
|
|
class L2MarketDataCallback:
    """Receiver interface for batches of market snapshots; the default implementation does nothing."""

    def on_markets(self, datas):
        """
        Market snapshot callback.

        :param datas: [(code, previous close price, last price, total traded volume, total traded value,
                        five bid levels, five ask levels, update time)];
                      each level is a two-element tuple whose first element is the price
                      (the producer in this file fills the second element from the
                      BidVolumeN / AskVolumeN fields)
        :return: None
        """
        return None
|
|
|
class Lev2MdSpi(lev2mdapi.CTORATstpLev2MdSpi):
    """
    Level-2 market-data SPI.

    Requests a login when the front connects, subscribes the configured codes
    after a successful login, and keeps the latest market snapshot of every
    security in an in-memory dict that callers read via get_market_info_dict().
    """

    # Latest code set handed to __process_codes_data (that method rebinds it on the instance).
    latest_codes_set = set()

    # Not read or written by any method of this class.
    special_code_volume_for_order_dict = {}

    # Codes already subscribed.
    # NOTE(review): no method of this class adds to or removes from this set, so
    # __process_codes_data always diffs against whatever it holds (empty unless
    # it is populated from outside this class) - confirm that this is intended.
    subscripted_codes = set()

    def __init__(self, api, codes):
        """
        :param api: Level-2 API object used for login and (un)subscription requests
        :param codes: codes to subscribe once login succeeds
        """
        lev2mdapi.CTORATstpLev2MdSpi.__init__(self)
        self.__api = api
        self.is_login = False
        # Latest snapshot per security, keyed by SecurityID
        self.__market_info_dict = {}
        self.__codes = codes

    def __split_codes(self, codes):
        """
        Split codes by exchange and encode them to bytes.

        Codes that are neither Shenzhen nor Shanghai codes are dropped.

        :param codes: iterable of code strings
        :return: (sse_codes, szse_codes), two lists of bytes
        """
        szse_codes = []
        sse_codes = []
        for code in codes:
            if tool.is_sz_code(code):
                szse_codes.append(code.encode())
            elif tool.is_sh_code(code):
                sse_codes.append(code.encode())
        return sse_codes, szse_codes

    def __unsubscribe(self, _codes):
        """
        Send market-data unsubscribe requests for the given codes, per exchange.

        :param _codes: iterable of code strings
        """
        sh, sz = self.__split_codes(_codes)
        logger_local_huaxin_l2_subscript.info(f"取消订阅上证:{sh}")
        logger_local_huaxin_l2_subscript.info(f"取消订阅深证:{sz}")
        if sh:
            self.__api.UnSubscribeMarketData(sh, lev2mdapi.TORA_TSTP_EXD_SSE)
        if sz:
            self.__api.UnSubscribeMarketData(sz, lev2mdapi.TORA_TSTP_EXD_SZSE)

    def __subscribe(self, _codes):
        """
        Send market-data subscribe requests for the given codes, per exchange,
        and log the value each request call returns.

        :param _codes: iterable of code strings
        """
        sh, sz = self.__split_codes(_codes)
        logger_local_huaxin_l2_subscript.info(f"订阅上证:{sh}")
        logger_local_huaxin_l2_subscript.info(f"订阅深证:{sz}")
        if sh:
            result = self.__api.SubscribeMarketData(sh, lev2mdapi.TORA_TSTP_EXD_SSE)
            logger_local_huaxin_l2_subscript.info(f"市场订阅结果sh:{result}")
        if sz:
            result = self.__api.SubscribeMarketData(sz, lev2mdapi.TORA_TSTP_EXD_SZSE)
            logger_local_huaxin_l2_subscript.info(f"市场订阅结果sz:{result}")

    def __process_codes_data(self, codes):
        """
        Subscribe codes missing from subscripted_codes and unsubscribe codes
        that are in subscripted_codes but not in the new set.

        :param codes: iterable of code strings that should be subscribed
        :raises Exception: when not logged in and constant.TEST is falsy
        """
        codes = set(codes)
        if not self.is_login and not constant.TEST:
            raise Exception("L2尚未登录")
        add_codes = codes - self.subscripted_codes
        del_codes = self.subscripted_codes - codes
        printlog("add del codes", add_codes, del_codes)
        self.__subscribe(add_codes)
        self.__unsubscribe(del_codes)
        # Remember the most recent code set
        self.latest_codes_set = codes

    # Subscribe codes: [code, ...]
    def set_codes_data(self, codes):
        """
        Apply a new set of codes to subscribe; failures are logged, never raised.

        :param codes: collection of code strings
        """
        printlog("订阅代码数量:", len(codes))
        try:
            self.__process_codes_data(codes)
        except Exception as e:
            logging.exception(e)
            logger_l2_codes_subscript.exception(e)

    def get_market_info_dict(self):
        """
        :return: the live dict of latest snapshots keyed by SecurityID; it is
                 written by OnRtnMarketData, so copy it before iterating if
                 that callback may run concurrently
        """
        return self.__market_info_dict

    @classmethod
    def __set_latest_datas(cls, codes_data):
        """
        Persist codes_data together with today's date string as one JSON document.

        :param codes_data: JSON-serialisable data to store
        """
        data_str = json.dumps([tool.get_now_date_str(), codes_data])
        with open(constant.L2_CODES_INFO_PATH, mode='w') as f:
            f.write(data_str)

    @classmethod
    def __get_latest_datas(cls):
        """
        Load the data stored by __set_latest_datas.

        :return: the stored data when the file exists and carries today's date string, otherwise []
        """
        if os.path.exists(constant.L2_CODES_INFO_PATH):
            with open(constant.L2_CODES_INFO_PATH, mode='r') as f:
                str_ = f.readline()
                data_json = json.loads(str_)
                if data_json[0] == tool.get_now_date_str():
                    return data_json[1]
        return []

    def OnFrontConnected(self):
        """Front-connected callback: send a logout request, wait one second, then request login."""
        printlog("OnFrontConnected")
        logger_system.info(f"l2_client OnFrontConnected 线程ID:{tool.get_thread_id()}")
        logout_req = lev2mdapi.CTORATstpUserLogoutField()
        self.__api.ReqUserLogout(logout_req, 1)
        time.sleep(1)
        # Request login
        login_req = lev2mdapi.CTORATstpReqUserLoginField()
        self.__api.ReqUserLogin(login_req, 2)

    def OnRspUserLogin(self, pRspUserLogin, pRspInfo, nRequestID, bIsLast):
        """
        Login response callback: on success mark the session as logged in and
        subscribe the Shanghai/Shenzhen codes given to the constructor.
        """
        printlog("OnRspUserLogin: ErrorID[%d] ErrorMsg[%s] RequestID[%d] IsLast[%d]" % (
            pRspInfo['ErrorID'], pRspInfo['ErrorMsg'], nRequestID, bIsLast))
        if pRspInfo['ErrorID'] == 0:
            printlog("----L2行情登录成功----")
            self.is_login = True
            logger_system.info("L2行情登录成功")
            # Subscribe L2: __split_codes keeps only Shanghai/Shenzhen codes and
            # returns bytes, so decode back to str for set_codes_data.
            codes_sh, codes_sz = self.__split_codes(self.__codes)
            codes = {code.decode("utf-8") for code in codes_sh + codes_sz}
            self.set_codes_data(codes)

    def OnRtnMarketData(self, pDepthMarketData, FirstLevelBuyNum, FirstLevelBuyOrderVolumes, FirstLevelSellNum,
                        FirstLevelSellOrderVolumes):
        """
        Market snapshot callback: store the snapshot under its SecurityID,
        replacing any earlier one. Failures are logged and never propagate
        to the caller of this callback.

        Only pDepthMarketData is used; the remaining parameters are ignored.
        """
        try:
            levels = range(1, 6)
            d = {
                "dataTimeStamp": pDepthMarketData['DataTimeStamp'],
                "securityID": pDepthMarketData['SecurityID'],
                "preClosePrice": pDepthMarketData['PreClosePrice'],
                "lastPrice": pDepthMarketData['LastPrice'],
                "totalVolumeTrade": pDepthMarketData['TotalVolumeTrade'],
                "totalValueTrade": pDepthMarketData['TotalValueTrade'],
                "totalBidVolume": pDepthMarketData['TotalBidVolume'],
                "avgBidPrice": pDepthMarketData['AvgBidPrice'],
                "totalAskVolume": pDepthMarketData['TotalAskVolume'],
                "avgAskPrice": pDepthMarketData["AvgAskPrice"],
                # Five (price, volume) levels per side, level 1 first
                "buy": [(pDepthMarketData[f'BidPrice{i}'], pDepthMarketData[f'BidVolume{i}'])
                        for i in levels],
                "sell": [(pDepthMarketData[f'AskPrice{i}'], pDepthMarketData[f'AskVolume{i}'])
                         for i in levels],
            }
            self.__market_info_dict[d['securityID']] = d
        except Exception as e:
            # A bad snapshot is dropped, but leave a trace instead of hiding the failure.
            logger_local_huaxin_l2_subscript.exception(e)
|
|
|
def __init_l2(codes):
    """
    Create the Level-2 API and its SPI, register the data source and start the API.

    The created SPI is published as the module-level global ``spi``.

    :param codes: codes handed to the SPI for subscription after login
    """
    global spi

    printlog(lev2mdapi.CTORATstpLev2MdApi_GetApiVersion())

    # Subscription mode: multicast. TCP alternative: lev2mdapi.TORA_TSTP_MST_TCP
    sub_mode = lev2mdapi.TORA_TSTP_MST_MCAST

    # Second argument True selects cached mode; pass False for non-cached mode.
    api = lev2mdapi.CTORATstpLev2MdApi_CreateTstpLev2MdApi(sub_mode, True)

    spi = Lev2MdSpi(api, codes)
    api.RegisterSpi(spi)

    # ------------------- production mode -------------------
    if sub_mode == lev2mdapi.TORA_TSTP_MST_MCAST:
        # Case 1: receive market data from a single multicast address
        api.RegisterMulticast(Multicast_Address, Local_Interface_Address, "")
        # Case 2: register several multicast addresses at the same time
        # api.RegisterMulticast(Multicast_Address, Local_Interface_Address, "")
        # api.RegisterMulticast(Multicast_Address2, Local_Interface_Address, "")
        # Case 3: receive market data in efvi mode
        # api.RegisterMulticast(Multicast_Address, Local_Interface_Address, "", "enp101s0f0", 4096, True)
    else:
        api.RegisterFront(Front_Address)

    # Start without binding to a CPU core
    api.Init()
|
|
|
# Three-worker thread pool; nothing in the visible part of this file submits work to it.
__l2_cmd_thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=3)

# Starts as None; not referenced elsewhere in the visible part of this file.
pipe_strategy = None
|
|
|
def run(subscript_codes, market_data_call_back: L2MarketDataCallback) -> None:
    """
    Start the Level-2 client and push the cached market snapshots to the callback.

    Never returns: after initialisation it loops forever, delivering one batch
    of snapshots every 2 seconds. Initialisation errors and per-cycle errors
    are logged and do not stop the loop.

    :param subscript_codes: codes to subscribe
    :param market_data_call_back: data callback; ``on_markets`` receives a list of
        (code, previous close price, last price, total traded volume, total traded value,
        bid levels, ask levels, update time) tuples. A falsy value disables delivery.
    :return:
    """
    logger_system.info(f"L2进程ID:{os.getpid()}")
    logger_system.info(f"l2_client 线程ID:{tool.get_thread_id()}")
    try:
        # Initialise the API/SPI, then start the log thread
        __init_l2(subscript_codes)
        threading.Thread(target=huaxin_l2_log.run_sync, daemon=True).start()
    except Exception as e:
        logger_system.exception(e)
    while True:
        try:
            # ``spi`` only exists once __init_l2 got far enough to create it; that
            # failure was logged above, so just idle instead of raising every cycle.
            active_spi = globals().get("spi")
            if active_spi is None:
                continue
            # Copy first: the SPI callback thread may insert new securities while we iterate.
            snapshots = list(active_spi.get_market_info_dict().values())
            # Convert to
            # (code, previous close, last price, total volume, total value, bids, asks, update time)
            fdatas = [
                (d["securityID"], d["preClosePrice"], d['lastPrice'], d['totalVolumeTrade'],
                 d['totalValueTrade'], d['buy'], d['sell'], d['dataTimeStamp'])
                for d in snapshots
            ]
            if market_data_call_back:
                market_data_call_back.on_markets(fdatas)
        except Exception as e:
            # Keep polling, but make conversion/consumer failures visible.
            logger_system.exception(e)
        finally:
            time.sleep(2)
|
|
|
if __name__ == "__main__":
    # Manual run with a single code. run() requires a callback argument;
    # the base L2MarketDataCallback ignores every batch it receives.
    subscript_codes = {"000333"}
    run(subscript_codes, L2MarketDataCallback())
|