# -*- coding: utf-8 -*-
|
import collections
|
import json
|
import multiprocessing
|
import threading
|
import time
|
|
import xmdapi
|
from huaxin_client import constant
|
from log_module.log import logger_system, logger_local_huaxin_l1, logger_local_huaxin_l1_trade_info, printlog
|
|
################B类##################
|
from utils import socket_util, tool
|
|
ADDRESS = "udp://224.224.1.19:7880"
|
|
level1_data_dict = {
|
|
}
|
|
|
def __send_response(sk, msg):
|
msg = socket_util.load_header(msg)
|
sk.sendall(msg)
|
result, header_str = socket_util.recv_data(sk)
|
if result:
|
result_json = json.loads(result)
|
if result_json.get("code") == 0:
|
return True
|
return False
|
|
|
class MdSpi(xmdapi.CTORATstpXMdSpi):
|
l1_data_queue = collections.deque()
|
__subscribed_codes = set()
|
|
def __init__(self, api):
|
xmdapi.CTORATstpXMdSpi.__init__(self)
|
self.__api = api
|
|
def OnFrontConnected(self):
|
printlog("OnFrontConnected")
|
|
# 请求登录,目前未校验登录用户,请求域置空即可
|
login_req = xmdapi.CTORATstpReqUserLoginField()
|
self.__api.ReqUserLogin(login_req, 1)
|
|
def __seperate_codes(self, codes):
|
codes_sh = []
|
codes_sz = []
|
for code in codes:
|
if tool.is_sh_code(code):
|
codes_sh.append(code.encode("utf-8"))
|
elif tool.is_sz_code(code):
|
codes_sz.append(code.encode("utf-8"))
|
return codes_sh, codes_sz
|
|
# 订阅代码
|
def subscribe(self, codes: set):
|
del_codes = self.__subscribed_codes - codes
|
add_codes = codes - self.__subscribed_codes
|
if codes:
|
codes_sh, codes_sz = self.__seperate_codes(codes)
|
logger_local_huaxin_l1.info(f"新增订阅:{codes_sh} {codes_sz}")
|
if codes_sh:
|
ret = self.__api.SubscribeMarketData(codes_sh, xmdapi.TORA_TSTP_EXD_SSE)
|
if ret != 0:
|
logger_local_huaxin_l1.info('SubscribeMarketData fail, ret[%d]' % ret)
|
else:
|
logger_local_huaxin_l1.info('SubscribeMarketData success, ret[%d]' % ret)
|
if codes_sz:
|
ret = self.__api.SubscribeMarketData(codes_sz, xmdapi.TORA_TSTP_EXD_SZSE)
|
if ret != 0:
|
logger_local_huaxin_l1.info('SubscribeMarketData fail, ret[%d]' % ret)
|
else:
|
logger_local_huaxin_l1.info('SubscribeMarketData success, ret[%d]' % ret)
|
|
if del_codes:
|
codes_sh, codes_sz = self.__seperate_codes(del_codes)
|
if codes_sh:
|
self.__api.UnSubscribeMarketData(codes_sh, xmdapi.TORA_TSTP_EXD_SSE)
|
if codes_sz:
|
self.__api.UnSubscribeMarketData(codes_sz, xmdapi.TORA_TSTP_EXD_SZSE)
|
|
self.__subscribed_codes = set(codes)
|
|
def OnRspUserLogin(self, pRspUserLoginField, pRspInfoField, nRequestID):
|
if pRspInfoField.ErrorID == 0:
|
logger_local_huaxin_l1.info('Login success! [%d]' % nRequestID)
|
|
'''
|
订阅行情
|
当sub_arr中只有一个"00000000"的合约且ExchangeID填TORA_TSTP_EXD_SSE或TORA_TSTP_EXD_SZSE时,订阅单市场所有合约行情
|
当sub_arr中只有一个"00000000"的合约且ExchangeID填TORA_TSTP_EXD_COMM时,订阅全市场所有合约行情
|
其它情况,订阅sub_arr集合中的合约行情
|
'''
|
# sub_arr = [b'600004']
|
# ret = self.__api.UnSubscribeMarketData(sub_arr, xmdapi.TORA_TSTP_EXD_SSE)
|
# if ret != 0:
|
# printlog('UnSubscribeMarketData fail, ret[%d]' % ret)
|
# else:
|
# printlog('SubscribeMarketData success, ret[%d]' % ret)
|
|
|
else:
|
logger_local_huaxin_l1.info('Login fail!!! [%d] [%d] [%s]'
|
% (nRequestID, pRspInfoField.ErrorID, pRspInfoField.ErrorMsg))
|
|
def OnRspSubMarketData(self, pSpecificSecurityField, pRspInfoField):
|
if pRspInfoField.ErrorID == 0:
|
printlog('OnRspSubMarketData: OK!')
|
else:
|
logger_local_huaxin_l1.info('OnRspSubMarketData: Error! [%d] [%s]'
|
% (pRspInfoField.ErrorID, pRspInfoField.ErrorMsg))
|
|
def OnRspUnSubMarketData(self, pSpecificSecurityField, pRspInfoField):
|
if pRspInfoField.ErrorID == 0:
|
printlog('OnRspUnSubMarketData: OK!')
|
else:
|
printlog('OnRspUnSubMarketData: Error! [%d] [%s]'
|
% (pRspInfoField.ErrorID, pRspInfoField.ErrorMsg))
|
|
def OnRtnMarketData(self, pMarketDataField):
|
if pMarketDataField.SecurityName.find("S") == 0:
|
return
|
if pMarketDataField.SecurityName.find("ST") >= 0:
|
return
|
rate = 0
|
item = (
|
pMarketDataField.SecurityID, pMarketDataField.LastPrice, rate, pMarketDataField.Volume,
|
pMarketDataField.UpdateTime,
|
pMarketDataField.BidPrice1, pMarketDataField.BidVolume1, pMarketDataField.BidPrice2,
|
pMarketDataField.BidVolume2)
|
self.l1_data_queue.append(item)
|
logger_local_huaxin_l1_trade_info.info(f"获取到L1数据:{item}")
|
|
# printlog(
|
# "SecurityID[%s] SecurityName[%s] LastPrice[%.2f] Volume[%d] Turnover[%.2f] BidPrice1[%.2f] BidVolume1[%d] AskPrice1[%.2f] AskVolume1[%d] UpperLimitPrice[%.2f] LowerLimitPrice[%.2f]"
|
# % (pMarketDataField.SecurityID, pMarketDataField.SecurityName, pMarketDataField.LastPrice,
|
# pMarketDataField.Volume,
|
# pMarketDataField.Turnover, pMarketDataField.BidPrice1, pMarketDataField.BidVolume1,
|
# pMarketDataField.AskPrice1,
|
# pMarketDataField.AskVolume1, pMarketDataField.UpperLimitPrice, pMarketDataField.LowerLimitPrice))
|
|
|
def __upload_codes_info(queue_l1_w_strategy_r: multiprocessing.Queue, datas):
|
# 上传数据
|
type_ = "upload_l1_trade_datas"
|
request_id = f"sb_{int(time.time() * 1000)}"
|
fdata = json.dumps(
|
{"type": type_, "data": {"data": datas}, "request_id": request_id, "time": round(time.time() * 1000, 0)})
|
if queue_l1_w_strategy_r is not None:
|
queue_l1_w_strategy_r.put_nowait(fdata)
|
|
|
def __read_from_strategy(queue_l1_r_strategy_w: multiprocessing.Queue):
|
while True:
|
try:
|
data = queue_l1_r_strategy_w.get()
|
if type(data) == str:
|
data = json.loads(data)
|
if data["type"] == "set_target_codes":
|
codes = set(data["data"])
|
spi.subscribe(codes)
|
logger_local_huaxin_l1.info(f"收到策略消息:{data}", )
|
except Exception as e:
|
logger_local_huaxin_l1.exception(e)
|
finally:
|
time.sleep(1)
|
|
|
def run(queue_l1_trade_w_strategy_r, queue_l1_trade_r_strategy_w):
|
logger_local_huaxin_l1.info("运行l1_for_trade订阅服务")
|
# 打印接口版本号
|
printlog(xmdapi.CTORATstpXMdApi_GetApiVersion())
|
|
# 创建接口对象
|
api = xmdapi.CTORATstpXMdApi_CreateTstpXMdApi(xmdapi.TORA_TSTP_MST_MCAST)
|
# api = xmdapi.CTORATstpXMdApi_CreateTstpXMdApi(xmdapi.TORA_TSTP_MST_TCP) # 测试
|
|
# 创建回调对象
|
global spi
|
spi = MdSpi(api)
|
|
# 注册回调接口
|
api.RegisterSpi(spi)
|
|
# 注册单个行情前置服务地址
|
# api.RegisterFront("tcp://210.14.72.16:9402")
|
# 注册多个行情前置服务地址,用逗号隔开
|
# api.RegisterFront("tcp://10.0.1.101:6402,tcp://10.0.1.101:16402")
|
# 注册名字服务器地址,支持多服务地址逗号隔开
|
# api.RegisterNameServer('tcp://224.224.3.19:7888')
|
# api.RegisterNameServer('tcp://10.0.1.101:52370,tcp://10.0.1.101:62370')
|
|
# -------------------------正式地址B类-------------------------------
|
api.RegisterMulticast(ADDRESS, None, "")
|
# api.RegisterFront("tcp://210.14.72.16:9402") # 测试地址
|
|
# -------------------------正式地址A类-------------------------------
|
# api.RegisterMulticast("udp://224.224.1.9:7880", None, "")
|
|
# 启动接口
|
api.Init()
|
|
logger_system.info("L1订阅服务启动成功")
|
# TODO 测试链路
|
# spi.l1_data_queue.append((
|
# "000969", 9.46, 9.11, 771000*100, time.time(),9.46,10000))
|
# spi.l1_data_queue.append((
|
# "002292", 8.06, 9.96, 969500 * 100, time.time(),8.06,10000))
|
|
threading.Thread(target=__read_from_strategy, args=(queue_l1_trade_r_strategy_w,), daemon=True).start()
|
# 等待程序结束
|
while True:
|
try:
|
temp_datas = []
|
while len(spi.l1_data_queue) > 0:
|
data = spi.l1_data_queue.popleft()
|
temp_datas.append(data)
|
if temp_datas:
|
# 上传代码数据
|
__upload_codes_info(queue_l1_trade_w_strategy_r, temp_datas)
|
except Exception as e:
|
logger_local_huaxin_l1.exception(e)
|
finally:
|
time.sleep(0.01)
|
# 释放接口对象
|
api.Release()
|
|
|
def test_run():
|
def test_sub():
|
time.sleep(5)
|
queue_l1_trade_r_strategy_w.put_nowait(
|
json.dumps({"type": "set_target_codes", "data": ["603990"]}))
|
|
def read_data():
|
while True:
|
val = queue_l1_trade_w_strategy_r.get()
|
printlog(val)
|
|
queue_l1_trade_w_strategy_r, queue_l1_trade_r_strategy_w = multiprocessing.Queue(), multiprocessing.Queue()
|
threading.Thread(target=test_sub, daemon=True).start()
|
threading.Thread(target=read_data, daemon=True).start()
|
run(queue_l1_trade_w_strategy_r, queue_l1_trade_r_strategy_w)
|