# -*- coding: utf-8 -*-
"""Level-1 market-data subscription client.

Subscribes to level-1 quotes through the ``xmdapi`` multicast interface,
buffers every received tick in ``level1_data_queue`` and periodically uploads
the buffered ticks to the server at ``SERVER_HOST:SERVER_PORT``.
"""
import json
import logging
import os
import queue
import threading
import time

import xmdapi

from huaxin_client import l1_subscript_codes_manager
from log_module.log import logger_system, logger_local_huaxin_l1, printlog
from utils import socket_util, tool

# ---------------- class-B line ----------------
ADDRESS = "udp://224.224.1.19:7880"

SERVER_HOST = '43.138.167.68'
SERVER_PORT = 10008

# Ticks produced by MdSpi.OnRtnMarketData and consumed by run().
level1_data_queue = queue.Queue()


def __send_response(sk, msg):
    """Send ``msg`` over ``sk`` and report whether the peer answered code 0.

    :param sk: connected socket-like object (needs ``sendall``).
    :param msg: payload handed to ``socket_util.load_header`` before sending
        (presumably that adds a length header - confirm in socket_util).
    :return: True only when a reply was received and its JSON ``code`` is 0.
    """
    msg = socket_util.load_header(msg)
    sk.sendall(msg)
    result, _header_str = socket_util.recv_data(sk)
    if result:
        result_json = json.loads(result)
        if result_json.get("code") == 0:
            return True
    return False


class MdSpi(xmdapi.CTORATstpXMdSpi):
    """Market-data callback handler: logs in, subscribes and queues ticks."""

    def __init__(self, api, codes_sh, codes_sz):
        """
        :param api: the xmdapi market-data API object used for requests.
        :param codes_sh: codes to subscribe on the SSE exchange.
        :param codes_sz: codes to subscribe on the SZSE exchange.
        """
        # The original wrapped this plain assignment in a 3-attempt retry
        # loop with a bare ``except``; an attribute assignment has nothing to
        # retry, and the bare ``except`` could only hide a real error.
        self.codes_sh, self.codes_sz = codes_sh, codes_sz
        xmdapi.CTORATstpXMdSpi.__init__(self)
        self.__api = api

    def OnFrontConnected(self):
        """Request login as soon as the front connection is established."""
        printlog("OnFrontConnected")
        # Request login. The login user is currently not verified, so the
        # request fields can be left empty.
        login_req = xmdapi.CTORATstpReqUserLoginField()
        self.__api.ReqUserLogin(login_req, 1)

    def subscribe_codes(self, codes_sh, codes_sz):
        """(Re)subscribe market data for the given SSE and SZSE code lists.

        An empty list for an exchange skips the request for that exchange.
        """
        printlog(f"订阅数量:sh-{len(codes_sh)} sz-{len(codes_sz)}")
        logger_system.info(f"订阅数量:sh-{len(codes_sh)} sz-{len(codes_sz)}")
        for codes, exchange_id in (
                (codes_sh, xmdapi.TORA_TSTP_EXD_SSE),
                (codes_sz, xmdapi.TORA_TSTP_EXD_SZSE),
        ):
            if not codes:
                continue
            ret = self.__api.SubscribeMarketData(codes, exchange_id)
            if ret != 0:
                printlog('SubscribeMarketData fail, ret[%d]' % ret)
            else:
                printlog('SubscribeMarketData success, ret[%d]' % ret)

    def OnRspUserLogin(self, pRspUserLoginField, pRspInfoField, nRequestID):
        """Login response: on success subscribe to the configured codes."""
        if pRspInfoField.ErrorID == 0:
            printlog('Login success! [%d]' % nRequestID)
            logger_system.info('Login success! [%d]' % nRequestID)
            # Subscribing to market data (vendor notes):
            # - sub_arr holding only "00000000" with ExchangeID
            #   TORA_TSTP_EXD_SSE or TORA_TSTP_EXD_SZSE subscribes to every
            #   contract of that single market;
            # - sub_arr holding only "00000000" with ExchangeID
            #   TORA_TSTP_EXD_COMM subscribes to every contract of all markets;
            # - otherwise only the contracts listed in sub_arr are subscribed.
            self.subscribe_codes(self.codes_sh, self.codes_sz)

            # sub_arr = [b'600004']
            # ret = self.__api.UnSubscribeMarketData(sub_arr, xmdapi.TORA_TSTP_EXD_SSE)
            # if ret != 0:
            #     printlog('UnSubscribeMarketData fail, ret[%d]' % ret)
            # else:
            #     printlog('SubscribeMarketData success, ret[%d]' % ret)
        else:
            logger_system.info('Login fail!!! [%d] [%d] [%s]' % (
                nRequestID, pRspInfoField.ErrorID, pRspInfoField.ErrorMsg))
            printlog('Login fail!!! [%d] [%d] [%s]' % (
                nRequestID, pRspInfoField.ErrorID, pRspInfoField.ErrorMsg))

    def OnRspSubMarketData(self, pSpecificSecurityField, pRspInfoField):
        """Log the outcome of a subscribe request."""
        if pRspInfoField.ErrorID == 0:
            printlog('OnRspSubMarketData: OK!')
        else:
            printlog('OnRspSubMarketData: Error! [%d] [%s]' % (
                pRspInfoField.ErrorID, pRspInfoField.ErrorMsg))

    def OnRspUnSubMarketData(self, pSpecificSecurityField, pRspInfoField):
        """Log the outcome of an unsubscribe request."""
        if pRspInfoField.ErrorID == 0:
            printlog('OnRspUnSubMarketData: OK!')
        else:
            printlog('OnRspUnSubMarketData: Error! [%d] [%s]' % (
                pRspInfoField.ErrorID, pRspInfoField.ErrorMsg))

    def OnRtnMarketData(self, pMarketDataField):
        """Queue one tick, skipping securities whose name marks them as S/ST.

        The queued tuple is: (code, previous close, last price, total volume,
        total turnover, five (price, volume) bid levels,
        five (price, volume) ask levels, update time).
        """
        security_name = pMarketDataField.SecurityName
        # Names starting with "S" or containing "ST" are not forwarded.
        if security_name.startswith("S") or "ST" in security_name:
            return
        level1_data_queue.put_nowait((
            pMarketDataField.SecurityID,
            pMarketDataField.PreClosePrice,
            pMarketDataField.LastPrice,
            pMarketDataField.Volume,
            pMarketDataField.Turnover,
            [(pMarketDataField.BidPrice1, pMarketDataField.BidVolume1),
             (pMarketDataField.BidPrice2, pMarketDataField.BidVolume2),
             (pMarketDataField.BidPrice3, pMarketDataField.BidVolume3),
             (pMarketDataField.BidPrice4, pMarketDataField.BidVolume4),
             (pMarketDataField.BidPrice5, pMarketDataField.BidVolume5)],
            [(pMarketDataField.AskPrice1, pMarketDataField.AskVolume1),
             (pMarketDataField.AskPrice2, pMarketDataField.AskVolume2),
             (pMarketDataField.AskPrice3, pMarketDataField.AskVolume3),
             (pMarketDataField.AskPrice4, pMarketDataField.AskVolume4),
             (pMarketDataField.AskPrice5, pMarketDataField.AskVolume5)],
            pMarketDataField.UpdateTime))


def __upload_codes_info(datas):
    """Send the buffered ticks to the server as one ``l1_data`` message.

    A new connection is opened per call and always closed afterwards; no
    reply is read.
    """
    printlog("上传数据数量", len(datas))
    logger_system.info("上传数据数量:{}", len(datas))
    # if not tool.is_trade_time():
    #     return
    data_bytes = socket_util.load_header(
        json.dumps({"type": "l1_data", "data": datas}).encode("utf-8"))
    # Upload the data.
    sk = socket_util.create_socket(SERVER_HOST, SERVER_PORT)
    try:
        sk.sendall(data_bytes)
    finally:
        sk.close()


def __get_target_codes():
    """Ask the server for the level-1 target codes.

    :return: the ``data`` field of the reply when its ``code`` is 0,
        otherwise None.
    """
    data_bytes = socket_util.load_header(
        json.dumps({"type": "get_l1_target_codes", "data": {}}).encode("utf-8"))
    sk = socket_util.create_socket(SERVER_HOST, SERVER_PORT)
    try:
        sk.sendall(data_bytes)
        datas_str, _header_str = socket_util.recv_data(sk)
        data = json.loads(datas_str)
        if data['code'] == 0:
            return data['data']
    finally:
        sk.close()
    return None


# Set once re_subscript() has been attempted; it is never reset in this file,
# so re-subscription happens at most once per process.
is_re_subscript = False


def re_subscript(spi: MdSpi):
    """Re-subscribe ``spi`` to freshly requested target codes (one shot).

    The new lists are applied only when both exchanges return more than 100
    codes. Failures are logged instead of propagated, so a failed refresh
    never disturbs the caller.
    """
    global is_re_subscript
    try:
        if is_re_subscript:
            return
        is_re_subscript = True
        codes_sh, codes_sz = l1_subscript_codes_manager.request_l1_subscript_target_codes()
        if len(codes_sh) > 100 and len(codes_sz) > 100:
            logger_local_huaxin_l1.info(f"重新订阅 sh-{len(codes_sh)} sz-{len(codes_sz)}")
            spi.subscribe_codes(codes_sh, codes_sz)
    except Exception as e:
        # Was a bare ``except: pass``: keep the best-effort contract but
        # leave a trace, and stop swallowing KeyboardInterrupt/SystemExit.
        logger_local_huaxin_l1.exception(e)


__position_codes = set()


def test_add_datas():
    """Debug helper: push one fixed fake tick into the queue every 3 seconds."""
    while True:
        printlog("发送测试数据")
        level1_data_queue.put_nowait(
            ("000948", 12.91, 14.20, int(34.60 * 10000), int(4.9 * 1e8),
             [(12.91, 100), (12.90, 100), (12.89, 100), (12.88, 100), (12.87, 100)],
             [(12.91, 100), (12.90, 100), (12.89, 100), (12.88, 100), (12.87, 100)],
             tool.get_now_time_str()))
        time.sleep(3)


def _fetch_target_codes(max_attempts=15, retry_delay=4):
    """Fetch the target codes and split them into (codes_sh, codes_sz).

    Retries up to ``max_attempts`` times, sleeping ``retry_delay`` seconds
    after every attempt that raised or returned nothing. Returns two empty
    lists when every attempt fails. Codes are returned utf-8 encoded.
    """
    for _ in range(max_attempts):
        try:
            codes = __get_target_codes()
            if codes:
                # Build fresh lists per attempt so that a failure halfway
                # through never leaves partial (later duplicated) results.
                codes_sh, codes_sz = [], []
                for code in codes:
                    market = tool.get_market_type(code)
                    if market == tool.MARKET_TYPE_SZSE:
                        codes_sz.append(code.encode('utf-8'))
                    elif market == tool.MARKET_TYPE_SSE:
                        codes_sh.append(code.encode('utf-8'))
                return codes_sh, codes_sz
        except Exception as e:
            logger_local_huaxin_l1.exception(e)
        time.sleep(retry_delay)
    return [], []


def _drain_level1_queue():
    """Return every tick currently buffered in ``level1_data_queue``."""
    datas = []
    while True:
        try:
            datas.append(level1_data_queue.get_nowait())
        except queue.Empty:
            return datas


def run():
    """Start the level-1 subscription and upload queued ticks forever.

    Blocks the calling thread: after the API is initialised it drains the
    queue and uploads the batch, then sleeps 0.5s, in an endless loop.
    """
    logger_local_huaxin_l1.info("运行l1订阅服务")
    codes_sh, codes_sz = _fetch_target_codes()
    logger_system.info(f"获取L1订阅目标票数量:sh-{len(codes_sh)} sz-{len(codes_sz)}")

    # Print the interface version.
    printlog(xmdapi.CTORATstpXMdApi_GetApiVersion())
    # Create the API object.
    api = xmdapi.CTORATstpXMdApi_CreateTstpXMdApi(xmdapi.TORA_TSTP_MST_MCAST)
    # Create the callback object.
    spi = MdSpi(api, codes_sh, codes_sz)
    # Register the callback interface.
    api.RegisterSpi(spi)

    # Register a single market-data front address:
    # api.RegisterFront("tcp://210.14.72.16:9402")
    # Register several front addresses, comma separated:
    # api.RegisterFront("tcp://10.0.1.101:6402,tcp://10.0.1.101:16402")
    # Register name-server addresses, comma separated:
    # api.RegisterNameServer('tcp://224.224.3.19:7888')
    # api.RegisterNameServer('tcp://10.0.1.101:52370,tcp://10.0.1.101:62370')

    # ---------------- production address, class B ----------------
    api.RegisterMulticast(ADDRESS, None, "")
    # ---------------- production address, class A ----------------
    # api.RegisterMulticast("udp://224.224.1.9:7880", None, "")

    # Start the interface.
    api.Init()
    logger_system.info("L1订阅服务启动成功")

    # Link test:
    # level1_data_dict["000969"] = (
    #     "000969", 9.46, 9.11, 771000*100, time.time())
    # level1_data_dict["002292"] = (
    #     "002292", 8.06, 9.96, 969500 * 100, time.time())
    # Test feed:
    # threading.Thread(target=lambda: test_add_datas(), daemon=True).start()

    try:
        while True:
            try:
                datas = _drain_level1_queue()
                if datas:
                    __upload_codes_info(datas)
            except Exception as e:
                # Same logger as the rest of this function (was root logging).
                logger_local_huaxin_l1.exception(e)
            finally:
                time.sleep(0.5)
    finally:
        # Release the API object. This used to sit after the endless loop
        # and could never run; it now runs if the loop is ever left.
        api.Release()


def run_async():
    """Run :func:`run` in a daemon thread and return immediately."""
    logger_system.info("L1进程ID:{}", os.getpid())
    t1 = threading.Thread(target=run, daemon=True)
    t1.start()


if __name__ == "__main__":
    pass