# -*- coding: utf-8 -*-
|
import json
|
import logging
|
import os
|
import queue
|
import threading
|
import time
|
|
import xmdapi
|
from huaxin_client import l1_subscript_codes_manager
|
from log_module.log import logger_system, logger_local_huaxin_l1, printlog
|
|
################B类##################
|
from utils import socket_util, tool
|
|
# Multicast address that run() registers through api.RegisterMulticast().
ADDRESS = "udp://224.224.1.19:7880"
|
|
# Host and port handed to socket_util.create_socket() when uploading ticks
# and when requesting the target code list.
SERVER_HOST = '43.138.167.68'
|
SERVER_PORT = 10008
|
|
# Ticks are put here by MdSpi.OnRtnMarketData() (and test_add_datas()) and
# drained in batches by the upload loop in run().
level1_data_queue = queue.Queue()
|
|
|
def __send_response(sk, msg):
    """Send ``msg`` over ``sk`` and report whether the peer acknowledged it.

    The message is framed with ``socket_util.load_header`` before sending and
    the reply is read with ``socket_util.recv_data``.

    :param sk: connected socket-like object providing ``sendall``.
    :param msg: payload to frame and send.
    :return: ``True`` only when a non-empty JSON reply carries ``"code" == 0``;
        ``False`` otherwise.
    """
    framed = socket_util.load_header(msg)
    sk.sendall(framed)
    reply, _header = socket_util.recv_data(sk)
    # An empty reply counts as a failure.
    if not reply:
        return False
    return json.loads(reply).get("code") == 0
|
|
|
class MdSpi(xmdapi.CTORATstpXMdSpi):
    """Callback handler (SPI) for the Level-1 market-data API.

    Sends a login request when the front connects, subscribes to the
    configured code lists after a successful login, and pushes every received
    snapshot onto the module-level ``level1_data_queue``.
    """

    def __init__(self, api, codes_sh, codes_sz):
        """Remember the API handle and the initial subscription lists.

        :param api: market-data API object used for login and subscriptions.
        :param codes_sh: codes subscribed with ``TORA_TSTP_EXD_SSE``.
        :param codes_sz: codes subscribed with ``TORA_TSTP_EXD_SZSE``.
        """
        xmdapi.CTORATstpXMdSpi.__init__(self)
        self.__api = api
        # Plain attribute assignment cannot fail, so no retry logic is needed.
        self.codes_sh, self.codes_sz = codes_sh, codes_sz

    def OnFrontConnected(self):
        """Send the login request once the front connection is established."""
        printlog("OnFrontConnected")
        # The login user is currently not verified, so the request fields
        # can be left empty.
        login_req = xmdapi.CTORATstpReqUserLoginField()
        self.__api.ReqUserLogin(login_req, 1)

    def subscribe_codes(self, codes_sh, codes_sz):
        """(Re)subscribe market data for the given code lists.

        An empty list is skipped. The return code of each subscribe call is
        only reported through ``printlog``.

        :param codes_sh: codes to subscribe on ``TORA_TSTP_EXD_SSE``.
        :param codes_sz: codes to subscribe on ``TORA_TSTP_EXD_SZSE``.
        """
        printlog(f"订阅数量:sh-{len(codes_sh)} sz-{len(codes_sz)}")
        logger_system.info(f"订阅数量:sh-{len(codes_sh)} sz-{len(codes_sz)}")
        targets = (
            (codes_sh, xmdapi.TORA_TSTP_EXD_SSE),
            (codes_sz, xmdapi.TORA_TSTP_EXD_SZSE),
        )
        for codes, exchange_id in targets:
            if not codes:
                continue
            ret = self.__api.SubscribeMarketData(codes, exchange_id)
            if ret != 0:
                printlog('SubscribeMarketData fail, ret[%d]' % ret)
            else:
                printlog('SubscribeMarketData success, ret[%d]' % ret)

    def OnRspUserLogin(self, pRspUserLoginField, pRspInfoField, nRequestID):
        """Handle the login response; subscribe on success, log on failure."""
        if pRspInfoField.ErrorID == 0:
            printlog('Login success! [%d]' % nRequestID)
            logger_system.info('Login success! [%d]' % nRequestID)
            # Subscription rules:
            # - sub_arr holding only "00000000" with ExchangeID
            #   TORA_TSTP_EXD_SSE or TORA_TSTP_EXD_SZSE subscribes every
            #   contract of that single market;
            # - sub_arr holding only "00000000" with ExchangeID
            #   TORA_TSTP_EXD_COMM subscribes every contract of all markets;
            # - otherwise only the contracts listed in sub_arr are subscribed.
            self.subscribe_codes(self.codes_sh, self.codes_sz)
        else:
            failure = 'Login fail!!! [%d] [%d] [%s]' % (
                nRequestID, pRspInfoField.ErrorID, pRspInfoField.ErrorMsg)
            logger_system.info(failure)
            printlog(failure)

    def OnRspSubMarketData(self, pSpecificSecurityField, pRspInfoField):
        """Report the outcome of a subscribe request."""
        if pRspInfoField.ErrorID == 0:
            printlog('OnRspSubMarketData: OK!')
        else:
            printlog('OnRspSubMarketData: Error! [%d] [%s]'
                     % (pRspInfoField.ErrorID, pRspInfoField.ErrorMsg))

    def OnRspUnSubMarketData(self, pSpecificSecurityField, pRspInfoField):
        """Report the outcome of an unsubscribe request."""
        if pRspInfoField.ErrorID == 0:
            printlog('OnRspUnSubMarketData: OK!')
        else:
            printlog('OnRspUnSubMarketData: Error! [%d] [%s]'
                     % (pRspInfoField.ErrorID, pRspInfoField.ErrorMsg))

    def OnRtnMarketData(self, pMarketDataField):
        """Queue one market-data snapshot for upload.

        Securities whose name starts with "S" or contains "ST" are dropped
        (presumably special-treatment stocks; confirm with the data owner).

        The queued tuple is laid out as
        ``(code, pre-close price, last price, volume, turnover,
        [5 x (bid price, bid volume)], [5 x (ask price, ask volume)],
        update time)``.
        """
        md = pMarketDataField
        name = md.SecurityName
        if name.startswith("S") or "ST" in name:
            return
        bids = [
            (md.BidPrice1, md.BidVolume1),
            (md.BidPrice2, md.BidVolume2),
            (md.BidPrice3, md.BidVolume3),
            (md.BidPrice4, md.BidVolume4),
            (md.BidPrice5, md.BidVolume5),
        ]
        asks = [
            (md.AskPrice1, md.AskVolume1),
            (md.AskPrice2, md.AskVolume2),
            (md.AskPrice3, md.AskVolume3),
            (md.AskPrice4, md.AskVolume4),
            (md.AskPrice5, md.AskVolume5),
        ]
        level1_data_queue.put_nowait((
            md.SecurityID, md.PreClosePrice, md.LastPrice,
            md.Volume, md.Turnover,
            bids, asks,
            md.UpdateTime))
|
|
|
def __upload_codes_info(datas):
    """Upload one batch of Level-1 ticks to the server.

    The batch is wrapped in a ``{"type": "l1_data", "data": ...}`` JSON
    message, framed with ``socket_util.load_header`` and sent over a fresh
    socket that is always closed afterwards. No reply is read.

    :param datas: list of tick tuples to upload.
    """
    count = len(datas)
    printlog("上传数据数量", count)
    logger_system.info("上传数据数量:{}", count)
    # Note: a trade-time guard (tool.is_trade_time) is currently disabled.
    message = json.dumps({"type": "l1_data", "data": datas})
    packet = socket_util.load_header(message.encode("utf-8"))
    conn = socket_util.create_socket(SERVER_HOST, SERVER_PORT)
    try:
        conn.sendall(packet)
    finally:
        conn.close()
|
|
|
def __get_target_codes():
    """Ask the server for the Level-1 target codes.

    Sends a ``get_l1_target_codes`` request over a fresh socket, which is
    always closed afterwards.

    :return: the reply's ``data`` field when its ``code`` is 0, else ``None``.
    """
    request = json.dumps({"type": "get_l1_target_codes", "data": {}})
    packet = socket_util.load_header(request.encode("utf-8"))
    conn = socket_util.create_socket(SERVER_HOST, SERVER_PORT)
    try:
        conn.sendall(packet)
        body, _header = socket_util.recv_data(conn)
        reply = json.loads(body)
        return reply['data'] if reply['code'] == 0 else None
    finally:
        conn.close()
|
|
|
# Set to True by the first call to re_subscript(); later calls are no-ops.
is_re_subscript = False


def re_subscript(spi: MdSpi):
    """Re-subscribe ``spi`` to a freshly requested code list, at most once.

    The first call marks the module-level ``is_re_subscript`` flag and
    requests the target codes from ``l1_subscript_codes_manager``. The
    subscription is only replaced when both lists contain more than 100
    codes. Failures are logged instead of raised, so the caller is never
    interrupted; the flag stays set, so a failed attempt is not retried.

    :param spi: callback object whose ``subscribe_codes`` is invoked.
    """
    global is_re_subscript
    if is_re_subscript:
        return
    is_re_subscript = True
    try:
        codes_sh, codes_sz = l1_subscript_codes_manager.request_l1_subscript_target_codes()
        if len(codes_sh) > 100 and len(codes_sz) > 100:
            logger_local_huaxin_l1.info(f"重新订阅 sh-{len(codes_sh)} sz-{len(codes_sz)}")
            spi.subscribe_codes(codes_sh, codes_sz)
    except Exception as e:
        # Best effort: record the failure rather than swallowing it silently.
        logger_local_huaxin_l1.exception(e)
|
|
|
# NOTE(review): not referenced anywhere else in the visible part of this file;
# presumably meant to hold the codes of current positions - confirm or remove.
__position_codes = set()
|
|
|
def test_add_datas():
    """Push a fixed sample tick onto ``level1_data_queue`` every 3 seconds.

    Never returns. Intended for exercising the upload path manually; only
    the timestamp (``tool.get_now_time_str()``) changes between ticks.
    """
    while True:
        printlog("发送测试数据")
        bids = [(12.91, 100), (12.90, 100), (12.89, 100), (12.88, 100), (12.87, 100)]
        # The sample uses the same ladder on both sides, as a separate list.
        asks = list(bids)
        sample = (
            "000948", 12.91, 14.20, int(34.60 * 10000), int(4.9 * 1e8),
            bids,
            asks,
            tool.get_now_time_str(),
        )
        level1_data_queue.put_nowait(sample)
        time.sleep(3)
|
|
|
def _fetch_l1_codes_by_market(max_attempts=15, retry_interval=4):
    """Fetch the L1 target codes and split them by exchange.

    Stops at the first attempt that yields a non-empty code list. After an
    attempt that raises or returns nothing it waits ``retry_interval``
    seconds before trying again. Each attempt builds fresh lists, so a
    failure halfway through never leaves partial or duplicated entries.

    :return: ``(codes_sh, codes_sz)`` lists of UTF-8 encoded codes; both are
        empty when every attempt failed.
    """
    for _ in range(max_attempts):
        try:
            codes = __get_target_codes()
            if codes:
                codes_sh, codes_sz = [], []
                for code in codes:
                    market = tool.get_market_type(code)
                    if market == tool.MARKET_TYPE_SZSE:
                        codes_sz.append(code.encode('utf-8'))
                    elif market == tool.MARKET_TYPE_SSE:
                        codes_sh.append(code.encode('utf-8'))
                return codes_sh, codes_sz
        except Exception as e:
            logger_local_huaxin_l1.exception(e)
        time.sleep(retry_interval)
    return [], []


def _drain_level1_queue():
    """Remove and return every tick currently waiting in the queue."""
    drained = []
    while True:
        try:
            drained.append(level1_data_queue.get_nowait())
        except queue.Empty:
            return drained


def run():
    """Start the Level-1 subscription service and upload ticks forever.

    Steps: fetch the target codes, create the market-data API in multicast
    mode, register an ``MdSpi`` callback object, register the multicast
    address ``ADDRESS`` and initialise the API. Afterwards the queue filled
    by the callbacks is drained roughly every 0.5 seconds and each non-empty
    batch is uploaded. A batch whose upload fails is logged and dropped.

    This function does not return normally; the API object is released if
    the loop is ever left through an uncaught exception.
    """
    logger_local_huaxin_l1.info("运行l1订阅服务")
    codes_sh, codes_sz = _fetch_l1_codes_by_market()
    logger_system.info(f"获取L1订阅目标票数量:sh-{len(codes_sh)} sz-{len(codes_sz)}")

    # Print the API version.
    printlog(xmdapi.CTORATstpXMdApi_GetApiVersion())

    # Create the API object and hook up the callback object.
    api = xmdapi.CTORATstpXMdApi_CreateTstpXMdApi(xmdapi.TORA_TSTP_MST_MCAST)
    spi = MdSpi(api, codes_sh, codes_sz)
    api.RegisterSpi(spi)

    # Production class-B multicast address (the class-A alternative noted in
    # the original source is "udp://224.224.1.9:7880").
    api.RegisterMulticast(ADDRESS, None, "")

    # Start the API.
    api.Init()
    logger_system.info("L1订阅服务启动成功")

    try:
        while True:
            try:
                datas = _drain_level1_queue()
                if datas:
                    __upload_codes_info(datas)
            except Exception as e:
                logging.exception(e)
            finally:
                time.sleep(0.5)
    finally:
        # Release the API object when the loop is left.
        api.Release()
|
|
|
def run_async():
    """Log the current process id and start ``run`` in a daemon thread.

    Returns immediately; the thread object is not exposed to the caller.
    """
    pid = os.getpid()
    logger_system.info("L1进程ID:{}", pid)
    worker = threading.Thread(target=run, daemon=True)
    worker.start()
|
|
|
# Running this module directly currently does nothing; run() or run_async()
# has to be called explicitly.
if __name__ == "__main__":
|
pass
|