# -*- coding: utf-8 -*-
|
import json
|
import logging
|
import os
|
import queue
|
import threading
|
import time
|
|
from huaxin_client import command_manager, l2_data_transform_protocol
|
from huaxin_client import constant
|
from huaxin_client import l2_data_manager
|
import lev2mdapi
|
from huaxin_client.command_manager import L2ActionCallback
|
from log_module import log, async_log_util
|
from log_module.log import logger_local_huaxin_l2_subscript, logger_system, logger_local_huaxin_l2_transaction, \
|
logger_local_huaxin_g_cancel, logger_l2_codes_subscript
|
from utils import tool
|
|
# ---------------------------------------------------------------------------
# Network endpoints.
# Front_Address is registered in TCP mode and Multicast_Address /
# Local_Interface_Address in multicast mode (see __init_l2). The second
# multicast address and the sender interface are only referenced by the
# commented-out alternatives there.
# ---------------------------------------------------------------------------
Front_Address = "tcp://10.0.1.101:6900"
Multicast_Address = "udp://224.224.2.19:7889"
Multicast_Address2 = "udp://224.224.224.234:7890"
Local_Interface_Address = "192.168.84.75"
Sender_Interface_Address = "10.0.1.101"

# ---------------------------------------------------------------------------
# Subscription switches. All are off; nothing in the visible part of this
# file reads them.
# ---------------------------------------------------------------------------
g_SubMarketData = g_SubTransaction = g_SubOrderDetail = False
g_SubXTSTick = g_SubXTSMarketData = g_SubNGTSTick = False
g_SubBondMarketData = g_SubBondTransaction = g_SubBondOrderDetail = False

# ---------------------------------------------------------------------------
# Sample security lists (bytes codes); not referenced by the visible code.
# NOTE(review): b"002849" appears in both SH_Securities and SZ_Securities -
# confirm whether the Shanghai entry is intended.
# ---------------------------------------------------------------------------
SH_Securities = [
    b"603000", b"600225", b"600469", b"600616", b"600059", b"002849",
    b"605188", b"603630", b"600105", b"603773", b"603915", b"603569",
    b"603322", b"603798", b"605198", b"603079", b"600415", b"600601",
]
SH_XTS_Securities = [b"018003", b"113565"]

SZ_Securities = [
    b"002456", b"002849", b"002281", b"002336", b"000936",
    b"000920", b"000757", b"002896", b"002725", b"000952",
    b"000526", b"000753", b"000681", b"002088", b"002436",
]
SZ_Bond_Securities = [b"100303", b"109559", b"112617"]

# ---------------------------------------------------------------------------
# Shared runtime state.
# ---------------------------------------------------------------------------
# The single Lev2MdSpi instance; assigned by __init_l2().
spi = None
# Queue for pending code-list updates (not consumed in the visible code).
set_codes_data_queue = queue.Queue()
# SecurityID -> time.time() of the most recent market-data snapshot.
market_code_dict = {}
|
|
|
class Lev2MdSpi(lev2mdapi.CTORATstpLev2MdSpi):
    """Callback handler (SPI) for the Level-2 market-data API.

    The instance keeps track of which codes are subscribed, applies the
    per-code volume / limit-up-price filters to tick-by-tick data and forwards
    whatever passes to ``l2_data_manager``.
    """

    # NOTE: the containers below are class attributes that are mutated in
    # place, so their contents are shared by every instance of this class.

    # Most recent code set passed to __process_codes_data.
    latest_codes_set = set()
    # code -> (minimum volume, limit-up price); filled by __process_codes_data.
    codes_volume_and_price_dict = {}
    # code -> (volume to watch for, expiry timestamp); see
    # set_code_special_watch_volume.
    special_code_volume_for_order_dict = {}
    # Codes whose order-detail subscription was acknowledged without error.
    subscripted_codes = set()
    # Per-code key of the last uploaded transaction. Only needed by the
    # "upload each order number at most once" de-duplication, which is
    # currently disabled.
    __last_transaction_keys_dict = {}

    # Turnover (price * volume) threshold used when a code has no configured
    # minimum volume.
    _DEFAULT_MIN_TURNOVER = 500000
    # A trade counts as "at the limit-up price" when its price differs from
    # the configured limit-up price by less than this amount.
    _LIMIT_UP_TOLERANCE = 0.201

    def __init__(self, api):
        """Bind the handler to ``api``, the market-data API object used for all requests."""
        lev2mdapi.CTORATstpLev2MdSpi.__init__(self)
        self.__api = api
        self.is_login = False

    # ------------------------------------------------------------------
    # Subscription management
    # ------------------------------------------------------------------
    def __split_codes(self, codes):
        """Split string codes by exchange.

        Codes starting with "00" go to the Shenzhen list and codes starting
        with "60" to the Shanghai list; any other prefix is ignored.

        :return: ``(sse_codes, szse_codes)``, each a list of encoded (bytes) codes.
        """
        szse_codes = [code.encode() for code in codes if code.startswith("00")]
        sse_codes = [code.encode() for code in codes if code.startswith("60")]
        return sse_codes, szse_codes

    def __unsubscribe(self, _codes):
        """Cancel order-detail, transaction and market-data subscriptions for ``_codes``."""
        sh, sz = self.__split_codes(_codes)
        logger_local_huaxin_l2_subscript.info(f"取消订阅上证:{sh}")
        logger_local_huaxin_l2_subscript.info(f"取消订阅深证:{sz}")
        targets = ((sh, lev2mdapi.TORA_TSTP_EXD_SSE), (sz, lev2mdapi.TORA_TSTP_EXD_SZSE))
        for codes, exchange in targets:
            if not codes:
                continue
            # Tick-by-tick orders, tick-by-tick trades, then snapshots.
            self.__api.UnSubscribeOrderDetail(codes, exchange)
            self.__api.UnSubscribeTransaction(codes, exchange)
            self.__api.UnSubscribeMarketData(codes, exchange)

    def __subscribe(self, _codes):
        """Subscribe order-detail, transaction and market-data feeds for ``_codes``.

        The return value of every request is written to the subscription log.
        """
        sh, sz = self.__split_codes(_codes)
        logger_local_huaxin_l2_subscript.info(f"订阅上证:{sh}")
        logger_local_huaxin_l2_subscript.info(f"订阅深证:{sz}")
        targets = (("sh", sh, lev2mdapi.TORA_TSTP_EXD_SSE), ("sz", sz, lev2mdapi.TORA_TSTP_EXD_SZSE))
        for tag, codes, exchange in targets:
            if not codes:
                continue
            # Tick-by-tick orders
            result = self.__api.SubscribeOrderDetail(codes, exchange)
            logger_local_huaxin_l2_subscript.info(f"逐笔委托订阅结果{tag}:{result}")
            # Tick-by-tick trades
            result = self.__api.SubscribeTransaction(codes, exchange)
            logger_local_huaxin_l2_subscript.info(f"逐笔成交订阅结果{tag}:{result}")
            # Market snapshots
            result = self.__api.SubscribeMarketData(codes, exchange)
            logger_local_huaxin_l2_subscript.info(f"市场订阅结果{tag}:{result}")

    def __process_codes_data(self, codes_data):
        """Make the subscribed set match ``codes_data``.

        :param codes_data: iterable of ``(code, minimum volume, limit-up price)``.
        :raises Exception: if the API is not logged in and ``constant.TEST`` is falsy.
        """
        if not self.is_login and not constant.TEST:
            raise Exception("L2尚未登录")

        codes = set()
        for d in codes_data:
            code = d[0]
            codes.add(code)
            self.codes_volume_and_price_dict[code] = (d[1], d[2])

        # Diff against what the server has already acknowledged.
        add_codes = codes - self.subscripted_codes
        del_codes = self.subscripted_codes - codes
        print("add del codes", add_codes, del_codes)

        for c in codes:
            l2_data_manager.add_target_code(c)
        for c in del_codes:
            l2_data_manager.del_target_code(c)
        for c in add_codes:
            # l2_data_callback is the module-level callback installed by run().
            l2_data_manager.run_upload_task(c, l2_data_callback)

        self.__subscribe(add_codes)
        self.__unsubscribe(del_codes)

        if add_codes:
            logger_system.info(f"新增L2订阅代码数量:{len(add_codes)}")
        logger_l2_codes_subscript.info("华鑫L2订阅结束,add-{} del-{}", len(add_codes), len(del_codes))

        # Remember the latest requested code set.
        self.latest_codes_set = codes

    def set_codes_data(self, codes_data):
        """Set the codes to subscribe: ``[(code, minimum volume, limit-up price)]``.

        Any failure is logged to the code-subscription log instead of being raised.
        """
        try:
            self.__process_codes_data(codes_data)
        except Exception as e:
            logger_l2_codes_subscript.exception(e)

    def set_code_special_watch_volume(self, code, volume):
        """For the next 2 seconds, let orders of ``code`` with exactly ``volume`` bypass the size filter."""
        self.special_code_volume_for_order_dict[code] = (volume, time.time() + 2)
        async_log_util.info(logger_local_huaxin_l2_subscript,f"设置下单量监听:{code}-{volume}")

    def _below_volume_filter(self, min_volume, price, volume):
        """Return True when a tick is too small to be forwarded.

        Without a configured ``min_volume`` the turnover (price * volume) is
        compared against the default threshold; otherwise the volume is
        compared against ``min_volume``.
        """
        if min_volume is None:
            return price * volume < self._DEFAULT_MIN_TURNOVER
        return volume < min_volume

    # ------------------------------------------------------------------
    # Connection / login callbacks
    # ------------------------------------------------------------------
    def OnFrontConnected(self):
        """Send a logout request, wait one second, then send a login request."""
        print("OnFrontConnected")
        logout_req = lev2mdapi.CTORATstpUserLogoutField()
        self.__api.ReqUserLogout(logout_req, 1)
        time.sleep(1)
        # Request login
        login_req = lev2mdapi.CTORATstpReqUserLoginField()
        self.__api.ReqUserLogin(login_req, 2)

    def OnRspUserLogin(self, pRspUserLogin, pRspInfo, nRequestID, bIsLast):
        """Mark the handler as logged in when the response carries ErrorID 0."""
        print("OnRspUserLogin: ErrorID[%d] ErrorMsg[%s] RequestID[%d] IsLast[%d]" % (
            pRspInfo['ErrorID'], pRspInfo['ErrorMsg'], nRequestID, bIsLast))
        if pRspInfo['ErrorID'] == 0:
            print("----L2行情登录成功----")
            self.is_login = True

    # ------------------------------------------------------------------
    # Subscription response callbacks
    # ------------------------------------------------------------------
    def OnRspSubMarketData(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast):
        print("OnRspSubMarketData")

    def OnRspSubIndex(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast):
        print("OnRspSubIndex")

    def OnRspSubTransaction(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast):
        print("OnRspSubTransaction")

    def OnRspSubOrderDetail(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast):
        """Record successfully subscribed codes; publish the set on the last response."""
        print("OnRspSubOrderDetail", pRspInfo)
        print("订阅结果:", pSpecificSecurity["ExchangeID"], pSpecificSecurity["SecurityID"], pRspInfo["ErrorID"],
              pRspInfo["ErrorMsg"])
        logger_local_huaxin_l2_subscript.info(
            f"订阅结果:{pSpecificSecurity['SecurityID']} {pRspInfo['ErrorID']} {pRspInfo['ErrorMsg']}")
        if pRspInfo["ErrorID"] == 0:
            print("订阅成功")
            self.subscripted_codes.add(pSpecificSecurity['SecurityID'])
        if bIsLast == 1:
            print("订阅响应结束", self.subscripted_codes)
            l2_data_manager.add_subscript_codes(self.subscripted_codes)

    def OnRspUnSubOrderDetail(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast):
        """Forget the unsubscribed code; publish the remaining set on the last response."""
        print("OnRspUnSubOrderDetail", bIsLast)
        try:
            code = pSpecificSecurity['SecurityID']
            self.subscripted_codes.discard(code)
            if bIsLast == 1:
                print("取消订阅响应结束", self.subscripted_codes)
                l2_data_manager.add_subscript_codes(self.subscripted_codes)
        except Exception as e:
            logging.exception(e)

    def OnRspSubBondMarketData(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast):
        print("OnRspSubBondMarketData")

    def OnRspSubBondTransaction(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast):
        print("OnRspSubBondTransaction")

    def OnRspSubBondOrderDetail(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast):
        print("OnRspSubBondOrderDetail")

    def OnRspSubXTSMarketData(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast):
        print("OnRspSubXTSMarketData")

    def OnRspSubXTSTick(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast):
        print("OnRspSubXTSTick")

    # Interface added in API version 4.0.5
    def OnRspSubNGTSTick(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast):
        print("OnRspSubNGTSTick")

    # ------------------------------------------------------------------
    # Stock data callbacks
    # ------------------------------------------------------------------
    def OnRtnMarketData(self, pDepthMarketData, FirstLevelBuyNum, FirstLevelBuyOrderVolumes, FirstLevelSellNum,
                        FirstLevelSellOrderVolumes):
        """Forward a snapshot (time, last price, total volume, bid 1-5, ask 1-5)."""
        d = {"dataTimeStamp": pDepthMarketData['DataTimeStamp'], "securityID": pDepthMarketData['SecurityID'],
             "lastPrice": pDepthMarketData['LastPrice'],
             "totalVolumeTrade": pDepthMarketData['TotalVolumeTrade'],
             "buy": [(pDepthMarketData[f'BidPrice{level}'], pDepthMarketData[f'BidVolume{level}'])
                     for level in range(1, 6)],
             "sell": [(pDepthMarketData[f'AskPrice{level}'], pDepthMarketData[f'AskVolume{level}'])
                      for level in range(1, 6)]}
        # Remember when this code last produced a snapshot.
        market_code_dict[pDepthMarketData['SecurityID']] = time.time()
        l2_data_manager.add_market_data(d)

    def OnRtnIndex(self, pIndex):
        """Print index quote data."""
        print(
            "OnRtnIndex SecurityID[%s] LastIndex[%.2f] LowIndex[%.2f] HighIndex[%.2f] TotalVolumeTraded[%d] Turnover[%.2f]" % (
                pIndex['SecurityID'],
                pIndex['LastIndex'],
                pIndex['LowIndex'],
                pIndex['HighIndex'],
                pIndex['TotalVolumeTraded'],
                pIndex['Turnover']))

    def OnRtnTransaction(self, pTransaction):
        """Handle one tick-by-tick transaction record.

        Records with ExecType ``b"2"`` are treated as cancellations and are
        forwarded as order details with status "D" when they pass the size
        filter. All other records are forwarded as transactions only when the
        trade price is within the limit-up tolerance of the code's configured
        limit-up price.
        """
        code = str(pTransaction['SecurityID'])
        # A code without configuration yields (None, None) instead of raising
        # on tuple-unpacking None.
        min_volume, limit_up_price = self.codes_volume_and_price_dict.get(code, (None, None))
        trade_price = pTransaction['TradePrice']
        trade_volume = pTransaction['TradeVolume']

        if pTransaction['ExecType'] == b"2":
            buy_no = pTransaction['BuyNo']
            sell_no = pTransaction['SellNo']
            trading_order_nos = l2_data_manager.get_latest_transaction_order_nos(code)
            if trading_order_nos and buy_no in trading_order_nos:
                # An order that was in the middle of being filled got cancelled.
                l2_data_manager.trading_order_canceled(code, buy_no)
                async_log_util.info(logger_local_huaxin_g_cancel, f"G撤撤单:{code} - {buy_no}")
            # The record's volume field is TradeVolume, which the rest of
            # this method already uses.
            if self._below_volume_filter(min_volume, trade_price, trade_volume):
                return
            # Cancellation, reported in order-detail shape.
            item = {"SecurityID": pTransaction['SecurityID'], "Price": trade_price,
                    "Volume": trade_volume,
                    "OrderType": "2",
                    "OrderTime": pTransaction['TradeTime'], "MainSeq": pTransaction['MainSeq'],
                    "SubSeq": pTransaction['SubSeq'],
                    "OrderStatus": "D"}
            if buy_no > 0:
                # Buy side
                item["OrderNO"] = buy_no
                item["Side"] = "1"
            elif sell_no > 0:
                # Sell side
                item["OrderNO"] = sell_no
                item["Side"] = "2"
            print("逐笔委托", item)
            l2_data_manager.add_l2_order_detail(item, True)
            return

        if limit_up_price is None:
            # Without a limit-up price the proximity check cannot be done, so
            # nothing is uploaded.
            return
        if abs(trade_price - limit_up_price) < self._LIMIT_UP_TOLERANCE:
            # Trade at (approximately) the limit-up price.
            item = {"SecurityID": pTransaction['SecurityID'], "TradePrice": trade_price,
                    "TradeVolume": trade_volume,
                    "OrderTime": pTransaction['TradeTime'], "MainSeq": pTransaction['MainSeq'],
                    "SubSeq": pTransaction['SubSeq'], "BuyNo": pTransaction['BuyNo'],
                    "SellNo": pTransaction['SellNo'],
                    "ExecType": pTransaction['ExecType'].decode()}
            # The "upload each order number at most once" de-duplication is
            # currently disabled, so every matching trade is forwarded.
            l2_data_manager.add_transaction_detail(item)

    def OnRtnOrderDetail(self, pOrderDetail):
        """Handle one tick-by-tick order record.

        An order whose volume equals a still-valid special watch volume is
        always forwarded. Otherwise a cancellation (OrderStatus ``b'D'``) of an
        order that is currently trading is reported, and the order is forwarded
        only if it passes the size filter.
        """
        code = str(pOrderDetail['SecurityID'])
        volume = pOrderDetail['Volume']

        can_listen = False
        watch = self.special_code_volume_for_order_dict.get(code)
        if watch is not None and watch[0] == volume:
            if watch[1] > time.time():
                # Special-volume watch is active.
                can_listen = True
            else:
                # Watch expired: drop it.
                self.special_code_volume_for_order_dict.pop(code, None)

        if not can_listen:
            if pOrderDetail['OrderStatus'] == b'D':
                trading_order_nos = l2_data_manager.get_latest_transaction_order_nos(code)
                if trading_order_nos and pOrderDetail['OrderNO'] in trading_order_nos:
                    # An order that was in the middle of being filled got cancelled.
                    l2_data_manager.trading_order_canceled(code, pOrderDetail['OrderNO'])
                    async_log_util.info(logger_local_huaxin_g_cancel, f"G撤撤单:{code} - {pOrderDetail['OrderNO']}")

            # A code without configuration falls back to the default
            # turnover filter instead of raising on tuple-unpacking None.
            min_volume, _ = self.codes_volume_and_price_dict.get(code, (None, None))
            if self._below_volume_filter(min_volume, pOrderDetail['Price'], volume):
                return

        # Forward the order; for Shanghai, OrderStatus b"D" denotes a cancellation.
        item = {"SecurityID": pOrderDetail['SecurityID'], "Price": pOrderDetail['Price'],
                "Volume": volume,
                "Side": pOrderDetail['Side'].decode(), "OrderType": pOrderDetail['OrderType'].decode(),
                "OrderTime": pOrderDetail['OrderTime'], "MainSeq": pOrderDetail['MainSeq'],
                "SubSeq": pOrderDetail['SubSeq'], "OrderNO": pOrderDetail['OrderNO'],
                "OrderStatus": pOrderDetail['OrderStatus'].decode()}
        print("逐笔委托", item)
        l2_data_manager.add_l2_order_detail(item)

    # ------------------------------------------------------------------
    # Bond / XTS / NGTS callbacks (print only)
    # ------------------------------------------------------------------
    @staticmethod
    def _print_depth_snapshot(tag, pDepthMarketData, FirstLevelBuyNum, FirstLevelBuyOrderVolumes,
                              FirstLevelSellNum, FirstLevelSellOrderVolumes):
        """Print a snapshot line prefixed with ``tag`` followed by the level-1 order queues."""
        print(
            "%s SecurityID[%s] LastPrice[%.2f] TotalVolumeTrade[%d] TotalValueTrade[%.2f] BidPrice1[%.2f] BidVolume1[%d] AskPrice1[%.2f] AskVolume1[%d]" % (
                tag,
                pDepthMarketData['SecurityID'],
                pDepthMarketData['LastPrice'],
                # The volume slot shows the traded volume, not the traded value.
                pDepthMarketData['TotalVolumeTrade'],
                pDepthMarketData['TotalValueTrade'],
                pDepthMarketData['BidPrice1'],
                pDepthMarketData['BidVolume1'],
                pDepthMarketData['AskPrice1'],
                pDepthMarketData['AskVolume1']))
        # Order volumes queued at the best bid
        for buy_index in range(0, FirstLevelBuyNum):
            print("first level buy [%d] : [%d]" % (buy_index, FirstLevelBuyOrderVolumes[buy_index]))
        # Order volumes queued at the best ask
        for sell_index in range(0, FirstLevelSellNum):
            print("first level sell [%d] : [%d]" % (sell_index, FirstLevelSellOrderVolumes[sell_index]))

    def OnRtnBondMarketData(self, pDepthMarketData, FirstLevelBuyNum, FirstLevelBuyOrderVolumes, FirstLevelSellNum,
                            FirstLevelSellOrderVolumes):
        """Print a bond market snapshot."""
        self._print_depth_snapshot("OnRtnBondMarketData", pDepthMarketData, FirstLevelBuyNum,
                                   FirstLevelBuyOrderVolumes, FirstLevelSellNum, FirstLevelSellOrderVolumes)

    def OnRtnBondTransaction(self, pTransaction):
        """Print a bond tick-by-tick transaction."""
        # ExecType is formatted with %s: it is compared with b"2" and decoded
        # in OnRtnTransaction, so %d would raise TypeError on such a value.
        print(
            "OnRtnBondTransaction SecurityID[%s] TradePrice[%.2f] TradeVolume[%d] TradeTime[%d] MainSeq[%d] SubSeq[%d] BuyNo[%d] SellNo[%d] ExecType[%s]" % (
                pTransaction['SecurityID'],
                pTransaction['TradePrice'],
                pTransaction['TradeVolume'],
                pTransaction['TradeTime'],
                pTransaction['MainSeq'],
                pTransaction['SubSeq'],
                pTransaction['BuyNo'],
                pTransaction['SellNo'],
                pTransaction['ExecType'],
            ))

    def OnRtnBondOrderDetail(self, pOrderDetail):
        """Print a bond tick-by-tick order."""
        print(
            "OnRtnBondOrderDetail SecurityID[%s] Price[%.2f] Volume[%d] Side[%s] OrderType[%s] OrderTime[%d] MainSeq[%d] SubSeq[%d]" % (
                pOrderDetail['SecurityID'],
                pOrderDetail['Price'],
                pOrderDetail['Volume'],
                pOrderDetail['Side'],
                pOrderDetail['OrderType'],
                pOrderDetail['OrderTime'],
                pOrderDetail['MainSeq'],
                pOrderDetail['SubSeq']))

    def OnRtnXTSMarketData(self, pDepthMarketData, FirstLevelBuyNum, FirstLevelBuyOrderVolumes, FirstLevelSellNum,
                           FirstLevelSellOrderVolumes):
        """Print an XTS market snapshot."""
        self._print_depth_snapshot("OnRtnXTSMarketData", pDepthMarketData, FirstLevelBuyNum,
                                   FirstLevelBuyOrderVolumes, FirstLevelSellNum, FirstLevelSellOrderVolumes)

    def OnRtnXTSTick(self, pTick):
        """Print a Shanghai bond tick."""
        print(
            "OnXTSTick TickType[%s] SecurityID[%s] Price[%.2f] Volume[%d] TickTime[%d] MainSeq[%d] SubSeq[%d] BuyNo[%d] SellNo[%d]" % (
                pTick['TickType'],
                pTick['SecurityID'],
                pTick['Price'],
                pTick['Volume'],
                pTick['TickTime'],
                pTick['MainSeq'],
                pTick['SubSeq'],
                pTick['BuyNo'],
                pTick['SellNo']))

    def OnRtnNGTSTick(self, pTick):
        """Print a Shanghai stock/fund tick."""
        print(
            "OnRtnNGTSTick TickType[%s] SecurityID[%s] Price[%.2f] Volume[%d] TickTime[%d] MainSeq[%d] SubSeq[%d] BuyNo[%d] SellNo[%d]" % (
                pTick['TickType'],
                pTick['SecurityID'],
                pTick['Price'],
                pTick['Volume'],
                pTick['TickTime'],
                pTick['MainSeq'],
                pTick['SubSeq'],
                pTick['BuyNo'],
                pTick['SellNo']))
|
|
|
class MyL2ActionCallback(L2ActionCallback):
    """Command callback that hands L2 subscription requests to the module-level ``spi``."""

    def OnSetL2Position(self, client_id, request_id, codes_data):
        """Apply a new set of codes to subscribe.

        ``client_id`` and ``request_id`` are not used. Any exception raised
        while applying the codes is logged and not propagated.
        """
        count = len(codes_data)
        print("L2订阅数量:", count)
        logger_l2_codes_subscript.info("华鑫L2代码处理队列获取到数据:数量-{}", count)
        try:
            spi.set_codes_data(codes_data)
        except Exception as err:
            logging.exception(err)
|
|
|
def __init_l2():
    """Create the Level-2 API object, attach a Lev2MdSpi to it and start it.

    The handler is stored in the module-level ``spi``. The subscription mode
    is fixed to multicast, so the multicast branch below is the one that runs.
    """
    global spi

    print(lev2mdapi.CTORATstpLev2MdApi_GetApiVersion())

    # Multicast mode; the alternative is TCP (lev2mdapi.TORA_TSTP_MST_TCP).
    sub_mode = lev2mdapi.TORA_TSTP_MST_MCAST

    # Second argument True selects cached mode (False would be non-cached).
    api = lev2mdapi.CTORATstpLev2MdApi_CreateTstpLev2MdApi(sub_mode, True)

    spi = Lev2MdSpi(api)
    api.RegisterSpi(spi)

    if sub_mode == lev2mdapi.TORA_TSTP_MST_MCAST:
        # Receive quotes from a single multicast address.
        api.RegisterMulticast(Multicast_Address, Local_Interface_Address, "")
    else:
        api.RegisterFront(Front_Address)

    # Run without binding to specific CPU cores.
    api.Init()
|
|
|
def __receive_from_pipe_trade(pipe):
    """Receive JSON messages from the trade pipe until the pipe is closed.

    A message of type ``"listen_volume"`` registers a special watch volume on
    the module-level ``spi``; other message types are ignored. Errors while
    handling a single message are logged and the loop keeps going.

    :param pipe: object with a blocking ``recv()`` that returns a JSON document
        as ``bytes`` or ``str``.
    """
    while True:
        try:
            value = pipe.recv()
        except (EOFError, OSError) as e:
            # The other end is gone or the handle is unusable. Every further
            # recv() would fail the same way, so stop instead of spinning.
            logging.exception(e)
            return
        except Exception as e:
            logging.exception(e)
            continue

        if not value:
            continue
        try:
            if isinstance(value, (bytes, bytearray)):
                value = value.decode("utf-8")
            data = json.loads(value)
            if data["type"] == "listen_volume":
                volume = data["data"]["volume"]
                code = data["data"]["code"]
                spi.set_code_special_watch_volume(code, volume)
        except Exception as e:
            logging.exception(e)
|
|
|
def __receive_from_pipe_strategy(pipe_):
    """Receive JSON messages from the strategy pipe until the pipe is closed.

    A message whose ``data.type`` is ``"l2_cmd"`` is passed to the module-level
    ``l2CommandManager``; other messages are ignored. Errors while handling a
    single message are logged and the loop keeps going.

    :param pipe_: object with a blocking ``recv()`` that returns a JSON document.
    """
    while True:
        try:
            val = pipe_.recv()
        except (EOFError, OSError) as e:
            # The other end is gone or the handle is unusable. Every further
            # recv() would fail the same way, so stop instead of spinning.
            logging.exception(e)
            return
        except Exception as e:
            logging.exception(e)
            continue

        if not val:
            continue
        try:
            print("L2客户端接受到数据")
            data = json.loads(val)
            if data["data"]["type"] == "l2_cmd":
                l2CommandManager.process_command(command_manager.CLIENT_TYPE_CMD_L2, None, data)
        except Exception as e:
            logging.exception(e)
|
|
|
# Strategy-side pipe passed to run(); stored at module level by run().
pipe_strategy = None


def run(pipe_trade, _pipe_strategy, _l2_data_callback: l2_data_transform_protocol.L2DataCallBack) -> None:
    """Start the Level-2 client and then block forever.

    :param pipe_trade: pipe carrying trade-side messages, or None to skip its receiver thread.
    :param _pipe_strategy: pipe carrying strategy commands, or None to skip its receiver thread.
    :param _l2_data_callback: callback handed to the ``l2_data_manager`` upload tasks.

    Any exception raised during start-up is logged to the system log; the
    function then still enters its idle loop and never returns.
    """
    global pipe_strategy, l2_data_callback, l2CommandManager

    logger_system.info("L2进程ID:{}", os.getpid())
    logger_system.info(f"l2_client 线程ID:{tool.get_thread_id()}")
    try:
        log.close_print()

        # NOTE(review): both receiver threads are started before `spi`,
        # `l2_data_callback` and `l2CommandManager` exist, so a message that
        # arrives during start-up fails inside the receiver and is only
        # logged - confirm whether this ordering is intentional.
        if pipe_trade is not None:
            trade_thread = threading.Thread(target=__receive_from_pipe_trade, args=(pipe_trade,), daemon=True)
            trade_thread.start()
        if _pipe_strategy is not None:
            pipe_strategy = _pipe_strategy
            strategy_thread = threading.Thread(target=__receive_from_pipe_strategy, args=(_pipe_strategy,),
                                               daemon=True)
            strategy_thread.start()

        __init_l2()

        # Install the data callback and start the l2_data_manager workers.
        l2_data_callback = _l2_data_callback
        l2_data_manager.run_upload_common(l2_data_callback)
        l2_data_manager.run_upload_trading_canceled(l2_data_callback)
        l2_data_manager.run_log()
        l2_data_manager.run_upload_daemon(l2_data_callback)

        # Command manager that routes L2 commands to MyL2ActionCallback.
        l2CommandManager = command_manager.L2CommandManager()
        l2CommandManager.init(MyL2ActionCallback())
        logger_system.info("L2订阅服务启动成功")
    except Exception as err:
        logger_system.exception(err)

    # Idle loop so that run() never returns.
    while True:
        time.sleep(2)
|
|
|
if __name__ == "__main__":
    # Stand-alone start: no pipes and no data callback.
    run(None, None, None)
    # NOTE(review): run() ends in an endless sleep loop, so this line looks
    # unreachable - confirm before relying on it.
    input()
|