# -*- coding: utf-8 -*-
"""Huaxin level-1 market data subscription service.

Registers an ``xmdapi`` multicast market-data client, keeps the latest
snapshot per security in ``level1_data_dict`` and periodically pushes the
strongest movers (plus any fixed codes) to the strategy process through a
queue.
"""
import json
import logging
import multiprocessing
import os
import threading
import time

from huaxin_client import socket_util, l1_subscript_codes_manager
import xmdapi
from huaxin_client import tool, constant
from log_module.log import logger_system, logger_local_huaxin_l1, logger_l2_codes_subscript, logger_debug
from third_data import custom_block_in_money_manager
from utils import tool as out_tool

# ---------------- class B multicast address ----------------
ADDRESS = "udp://224.224.1.19:7880"
# ---------------- class A multicast address ----------------
if constant.IS_A:
    ADDRESS = "udp://224.224.1.9:7880"

# Latest level-1 snapshot per security id. Written by MdSpi.OnRtnMarketData,
# read by the upload loop in run().
level1_data_dict = {
}


def __send_response(sk, msg):
    """Send ``msg`` over socket ``sk`` and report whether the peer acknowledged it.

    The message is wrapped with ``socket_util.load_header`` before sending.

    @param sk: connected socket-like object providing ``sendall``
    @param msg: payload to send
    @return: True only when a reply was received and its JSON ``code`` is 0
    """
    sk.sendall(socket_util.load_header(msg))
    result, header_str = socket_util.recv_data(sk)
    if not result:
        return False
    return json.loads(result).get("code") == 0


class MdSpi(xmdapi.CTORATstpXMdSpi):
    """Market-data callback handler that fills ``level1_data_dict``."""

    def __init__(self, api, codes_sh, codes_sz):
        """
        @param api: the xmdapi market-data API object used to send requests
        @param codes_sh: codes subscribed with exchange id TORA_TSTP_EXD_SSE
        @param codes_sz: codes subscribed with exchange id TORA_TSTP_EXD_SZSE
        """
        xmdapi.CTORATstpXMdSpi.__init__(self)
        self.codes_sh, self.codes_sz = codes_sh, codes_sz
        self.__api = api

    def OnFrontConnected(self):
        """Send the login request as soon as the front connection is up."""
        print("OnFrontConnected")
        # Login request; the user is currently not verified, so the request
        # fields can be left empty.
        login_req = xmdapi.CTORATstpReqUserLoginField()
        self.__api.ReqUserLogin(login_req, 1)

    def _subscribe_exchange(self, codes, exchange_id, label):
        """Subscribe ``codes`` on one exchange and log a non-zero return code."""
        if not codes:
            return
        ret = self.__api.SubscribeMarketData(codes, exchange_id)
        if ret != 0:
            logger_local_huaxin_l1.error(f"SubscribeMarketData fail, exchange[{label}] ret[{ret}]")

    def subscribe_codes(self, codes_sh, codes_sz):
        """(Re)subscribe market data for the given code lists.

        An empty list for an exchange skips that exchange.
        """
        self._subscribe_exchange(codes_sh, xmdapi.TORA_TSTP_EXD_SSE, "SSE")
        self._subscribe_exchange(codes_sz, xmdapi.TORA_TSTP_EXD_SZSE, "SZSE")

    def OnRspUserLogin(self, pRspUserLoginField, pRspInfoField, nRequestID):
        """Subscribe the configured codes once login succeeds.

        Subscription rules noted by the original author:
        - a single "00000000" entry with exchange TORA_TSTP_EXD_SSE or
          TORA_TSTP_EXD_SZSE subscribes every contract of that market;
        - a single "00000000" entry with exchange TORA_TSTP_EXD_COMM
          subscribes every contract of all markets;
        - otherwise only the listed contracts are subscribed.
        """
        if pRspInfoField.ErrorID == 0:
            self.subscribe_codes(self.codes_sh, self.codes_sz)
        else:
            logger_local_huaxin_l1.error(
                f"Login fail!!! [{nRequestID}] [{pRspInfoField.ErrorID}] [{pRspInfoField.ErrorMsg}]")

    def OnRspSubMarketData(self, pSpecificSecurityField, pRspInfoField):
        """Log a failed subscription response; successes need no action."""
        if pRspInfoField.ErrorID != 0:
            logger_local_huaxin_l1.error(
                f"OnRspSubMarketData: Error! [{pRspInfoField.ErrorID}] [{pRspInfoField.ErrorMsg}]")

    def OnRspUnSubMarketData(self, pSpecificSecurityField, pRspInfoField):
        """Log a failed unsubscription response; successes need no action."""
        if pRspInfoField.ErrorID != 0:
            logger_local_huaxin_l1.error(
                f"OnRspUnSubMarketData: Error! [{pRspInfoField.ErrorID}] [{pRspInfoField.ErrorMsg}]")

    def OnRtnMarketData(self, pMarketDataField):
        """Store the latest snapshot of one security in ``level1_data_dict``.

        Securities whose name starts with "S" or contains "ST" are ignored.
        """
        name = pMarketDataField.SecurityName
        if name.startswith("S") or "ST" in name:
            return
        close_price = pMarketDataField.PreClosePrice
        if not close_price:
            # No previous close: the rate cannot be computed, and a
            # ZeroDivisionError must not escape from the API callback.
            return
        # Prefer the best bid as the reference price when it is available.
        price = pMarketDataField.BidPrice1 or pMarketDataField.LastPrice
        rate = round((price - close_price) * 100 / close_price, 2)
        if out_tool.get_limit_up_rate(pMarketDataField.SecurityID) > 1.1001:
            # Codes whose limit-up ratio is above 10% (e.g. 20% boards) get
            # their rate halved.
            rate = rate / 2
        # (code, last price, rate, volume, receive time, bid1 price,
        #  bid1 volume, bid2 price, bid2 volume, update time)
        level1_data_dict[pMarketDataField.SecurityID] = (
            pMarketDataField.SecurityID, pMarketDataField.LastPrice, rate, pMarketDataField.Volume, time.time(),
            pMarketDataField.BidPrice1, pMarketDataField.BidVolume1, pMarketDataField.BidPrice2,
            pMarketDataField.BidVolume2, pMarketDataField.UpdateTime)


# Codes contained in the most recent upload; used to log newly added ones.
__latest_subscript_codes = set()


def __upload_codes_info(queue_l1_w_strategy_r: multiprocessing.Queue, datas):
    """Push the selected snapshots to the strategy process.

    Nothing is sent outside of trade time and pre-trade time.

    @param queue_l1_w_strategy_r: queue written by L1 and read by the strategy; may be None
    @param datas: list of snapshot tuples as stored in ``level1_data_dict``
    """
    if not tool.is_trade_time() and not tool.is_pre_trade_time():
        return
    # Upload the data
    type_ = "set_target_codes"
    request_id = f"sb_{int(time.time() * 1000)}"
    fdata = json.dumps(
        {"type": type_, "data": {"data": datas}, "request_id": request_id, "time": round(time.time() * 1000, 0)})
    if queue_l1_w_strategy_r is not None:
        queue_l1_w_strategy_r.put_nowait(fdata)
    # Record which codes are new compared with the previous upload
    codes = {x[0] for x in datas}
    add_codes = codes - __latest_subscript_codes
    __latest_subscript_codes.clear()
    __latest_subscript_codes.update(codes)
    if add_codes:
        logger_local_huaxin_l1.info(f"({request_id})新增加订阅的代码:{add_codes}")


def re_subscript(spi: MdSpi):
    """Fetch the current target codes and subscribe them again.

    The subscription is only sent when both lists hold more than 100 codes.
    Failures are logged and never propagated to the caller.
    """
    try:
        codes_sh, codes_sz = l1_subscript_codes_manager.request_l1_subscript_target_codes()
        if len(codes_sh) > 100 and len(codes_sz) > 100:
            logger_local_huaxin_l1.info(f"重新订阅 sh-{len(codes_sh)} sz-{len(codes_sz)}")
            spi.subscribe_codes(codes_sh, codes_sz)
    except Exception as e:
        logger_local_huaxin_l1.exception(e)


# Position codes most recently announced by the strategy process.
__position_codes = set()


def __read_from_strategy(queue_l1_r_strategy_w: multiprocessing.Queue):
    """Consume messages written by the strategy process, forever.

    A ``set_position_codes`` message replaces ``__position_codes``.

    @param queue_l1_r_strategy_w: queue read by L1 and written by the strategy
    """
    global __position_codes
    while True:
        try:
            data = queue_l1_r_strategy_w.get()
            if isinstance(data, str):
                data = json.loads(data)
            if data["type"] == "set_position_codes":
                __position_codes = set(data["data"])
                logger_local_huaxin_l1.info(f"收到策略消息:{data}", )
        except Exception as e:
            logger_debug.exception(e)
        finally:
            time.sleep(1)


def __run_subscript_task(spi):
    """Resubscribe once when the pre-trade window is reached, then stop.

    Polls ``tool.is_pre_trade_time()`` every 3 seconds. The original note
    says subscription starts between 9:19 and 9:29; the actual window is
    whatever ``tool.is_pre_trade_time`` defines.
    @return:
    """
    while True:
        try:
            if tool.is_pre_trade_time():
                re_subscript(spi)
                break
        except Exception as e:
            logger_local_huaxin_l1.exception(e)
        finally:
            time.sleep(3)


def _max_upload_count(now_time_int):
    """Return how many snapshots may be uploaded at time ``now_time_int`` (HHMMSS as int).

    Fewer entries are sent before formal trading starts, so that slow
    processing does not cause a data backlog.
    """
    if now_time_int < 92600:
        return 200
    if now_time_int < 92800:
        return 300
    if now_time_int < 92900:
        return 400
    return 500


def _select_upload_datas(snapshots, fixed_codes, threshold_rate, max_count):
    """Pick the snapshots to upload: fixed codes first, then movers by rate.

    Fixed codes are always included and appear exactly once; the remaining
    entries must have a rate of at least ``threshold_rate`` and are sorted by
    rate in descending order. The result is cut to ``max_count`` entries.
    """
    fixed = [d for d in snapshots if d[0] in fixed_codes]
    movers = [d for d in snapshots if d[0] not in fixed_codes and d[2] >= threshold_rate]
    movers.sort(key=lambda x: x[2], reverse=True)
    return (fixed + movers)[:max_count]


def run(queue_l1_w_strategy_r, queue_l1_r_strategy_w, fixed_codes=None):
    """
    Run the L1 subscription service (blocks forever).
    @param queue_l1_w_strategy_r: written by L1, read by the strategy
    @param queue_l1_r_strategy_w: read by L1, written by the strategy; when None
                                  no reader thread is started
    @param fixed_codes: codes whose data must always be returned
    @return:
    """
    if fixed_codes is None:
        fixed_codes = set()
    logger_local_huaxin_l1.info(f"运行l1订阅服务,固定代码:{fixed_codes}")
    codes_sh = []
    codes_sz = []
    # Up to 15 attempts, 4 seconds apart, to obtain the target codes.
    for i in range(15):
        try:
            logger_local_huaxin_l1.info("开始获取目标代码")
            codes_sh, codes_sz = l1_subscript_codes_manager.get_codes()
            logger_local_huaxin_l1.info(f"获取上证,深证代码数量:sh-{len(codes_sh)} sz-{len(codes_sz)}")
            break
        except Exception as e:
            logger_local_huaxin_l1.exception(e)
            time.sleep(4)
    logger_system.info(f"获取L1订阅目标票数量:sh-{len(codes_sh)} sz-{len(codes_sz)}")
    # Print the API version
    print(xmdapi.CTORATstpXMdApi_GetApiVersion())
    # Create the API object
    api = xmdapi.CTORATstpXMdApi_CreateTstpXMdApi(xmdapi.TORA_TSTP_MST_MCAST)
    # Create the callback object
    spi = MdSpi(api, codes_sh, codes_sz)
    # Register the callback interface
    api.RegisterSpi(spi)
    # Alternatives kept for reference:
    #   single front address:    api.RegisterFront("tcp://210.14.72.16:9402")
    #   several front addresses: api.RegisterFront("tcp://10.0.1.101:6402,tcp://10.0.1.101:16402")
    #   name server(s):          api.RegisterNameServer('tcp://224.224.3.19:7888')
    # Production multicast address (class A or B, chosen at import time)
    api.RegisterMulticast(ADDRESS, None, "")
    # Start the API
    api.Init()
    logger_system.info("L1订阅服务启动成功")

    if queue_l1_r_strategy_w is not None:
        threading.Thread(target=__read_from_strategy, args=(queue_l1_r_strategy_w,), daemon=True).start()
    threading.Thread(target=__run_subscript_task, args=(spi,), daemon=True).start()

    try:
        # Upload loop: every 3 seconds push the current selection.
        while True:
            try:
                # Snapshot the values in one call: the callback thread keeps
                # inserting keys, which would break a Python-level iteration.
                snapshots = list(level1_data_dict.values())
                if snapshots:
                    now_time_int = int(tool.get_now_time_str().replace(":", ""))
                    datas = _select_upload_datas(snapshots, fixed_codes, constant.L1_MIN_RATE,
                                                 _max_upload_count(now_time_int))
                    if len(datas) > 0:
                        logger_l2_codes_subscript.info("开始#华鑫L1上传代码:数量-{}", len(datas))
                        __upload_codes_info(queue_l1_w_strategy_r, datas)
            except Exception as e:
                logging.exception(e)
                logger_debug.exception(e)
            finally:
                time.sleep(3)
    finally:
        # Release the API object when the loop is left (e.g. on interrupt).
        api.Release()


def run_async(pipe_l2, queue_l1_r_strategy_w=None, fixed_codes=None):
    """Start ``run`` in a daemon thread.

    @param pipe_l2: passed to ``run`` as the queue written by L1
    @param queue_l1_r_strategy_w: optional queue read by L1, written by the strategy
    @param fixed_codes: optional codes whose data must always be returned
    """
    logger_system.info("L1进程ID:{}", os.getpid())
    t1 = threading.Thread(target=run, args=(pipe_l2, queue_l1_r_strategy_w, fixed_codes), daemon=True)
    t1.start()


if __name__ == "__main__":
    pass