New file |
| | |
| | | # -*- coding: utf-8 -*- |
| | | import json |
| | | import logging |
| | | import os |
| | | import queue |
| | | import threading |
| | | import time |
| | | |
| | | import xmdapi |
| | | from huaxin_client import l1_subscript_codes_manager |
| | | from log_module.log import logger_system, logger_local_huaxin_l1 |
| | | |
| | | ################B类################## |
| | | from utils import socket_util, tool |
| | | |
# Multicast address of the class-B market-data feed (passed to RegisterMulticast in run()).
ADDRESS = "udp://224.224.1.19:7880"

# Endpoint that collected level-1 records are uploaded to.
SERVER_HOST = "43.138.167.68"
SERVER_PORT = 12881

# Market-data tuples waiting to be uploaded; producers use put_nowait().
level1_data_queue = queue.Queue()
| | | |
| | | |
def __send_response(sk, msg):
    """Send ``msg`` over ``sk`` and report whether the peer acknowledged it.

    The message is wrapped with ``socket_util.load_header`` before being sent.
    The reply is read with ``socket_util.recv_data`` and parsed as JSON.

    :param sk: connected socket-like object providing ``sendall``.
    :param msg: payload handed to ``socket_util.load_header``.
    :return: ``True`` only when a non-empty reply arrived and its ``"code"``
        field equals ``0``; ``False`` otherwise.
    """
    framed = socket_util.load_header(msg)
    sk.sendall(framed)
    reply, _header = socket_util.recv_data(sk)
    if not reply:
        return False
    return json.loads(reply).get("code") == 0
| | | |
| | | |
class MdSpi(xmdapi.CTORATstpXMdSpi):
    """Callback handler registered with the xmdapi market-data API.

    On front connection it sends a login request, on successful login it
    subscribes to the configured Shanghai/Shenzhen codes, and every market-data
    push is placed on ``level1_data_queue`` as a tuple.
    """

    def __init__(self, api, codes_sh, codes_sz):
        """Store the API handle and the codes to subscribe after login.

        :param api: market-data API object used for login and subscription.
        :param codes_sh: codes subscribed with ``TORA_TSTP_EXD_SSE``.
        :param codes_sz: codes subscribed with ``TORA_TSTP_EXD_SZSE``.
        """
        # Plain attribute assignment. The former retry loop with a bare
        # ``except`` around it could only hide a genuine error and sleep.
        self.codes_sh, self.codes_sz = codes_sh, codes_sz
        xmdapi.CTORATstpXMdSpi.__init__(self)
        self.__api = api

    def OnFrontConnected(self):
        """Send a login request as soon as the front connection is up."""
        print("OnFrontConnected")

        # Request login. The login user is currently not verified, so the
        # request fields can be left empty.
        login_req = xmdapi.CTORATstpReqUserLoginField()
        self.__api.ReqUserLogin(login_req, 1)

    def _subscribe_exchange(self, codes, exchange_id):
        """Subscribe ``codes`` on one exchange and print the return code."""
        if not codes:
            return
        ret = self.__api.SubscribeMarketData(codes, exchange_id)
        if ret != 0:
            print('SubscribeMarketData fail, ret[%d]' % ret)
        else:
            print('SubscribeMarketData success, ret[%d]' % ret)

    def subscribe_codes(self, codes_sh, codes_sz):
        """(Re)subscribe market data for the given code lists.

        An empty list for an exchange is skipped.

        :param codes_sh: codes subscribed with ``TORA_TSTP_EXD_SSE``.
        :param codes_sz: codes subscribed with ``TORA_TSTP_EXD_SZSE``.
        """
        print(f"订阅数量:sh-{len(codes_sh)} sz-{len(codes_sz)}")
        self._subscribe_exchange(codes_sh, xmdapi.TORA_TSTP_EXD_SSE)
        self._subscribe_exchange(codes_sz, xmdapi.TORA_TSTP_EXD_SZSE)

    def OnRspUserLogin(self, pRspUserLoginField, pRspInfoField, nRequestID):
        """Subscribe to the stored codes once login succeeds; print errors otherwise."""
        if pRspInfoField.ErrorID != 0:
            print('Login fail!!! [%d] [%d] [%s]'
                  % (nRequestID, pRspInfoField.ErrorID, pRspInfoField.ErrorMsg))
            return

        print('Login success! [%d]' % nRequestID)

        # Subscribing to market data:
        # - when sub_arr holds only the single contract "00000000" and
        #   ExchangeID is TORA_TSTP_EXD_SSE or TORA_TSTP_EXD_SZSE, every
        #   contract of that one market is subscribed;
        # - when sub_arr holds only "00000000" and ExchangeID is
        #   TORA_TSTP_EXD_COMM, every contract of all markets is subscribed;
        # - otherwise the contracts listed in sub_arr are subscribed.
        self.subscribe_codes(self.codes_sh, self.codes_sz)

    def OnRspSubMarketData(self, pSpecificSecurityField, pRspInfoField):
        """Print the outcome of a subscribe request."""
        if pRspInfoField.ErrorID == 0:
            print('OnRspSubMarketData: OK!')
        else:
            print('OnRspSubMarketData: Error! [%d] [%s]'
                  % (pRspInfoField.ErrorID, pRspInfoField.ErrorMsg))

    def OnRspUnSubMarketData(self, pSpecificSecurityField, pRspInfoField):
        """Print the outcome of an unsubscribe request."""
        if pRspInfoField.ErrorID == 0:
            print('OnRspUnSubMarketData: OK!')
        else:
            print('OnRspUnSubMarketData: Error! [%d] [%s]'
                  % (pRspInfoField.ErrorID, pRspInfoField.ErrorMsg))

    def OnRtnMarketData(self, pMarketDataField):
        """Queue one market-data push for upload.

        Securities whose name starts with "S" or contains "ST" are dropped
        (presumably ST-flagged stocks; confirm against the naming rules).
        """
        name = pMarketDataField.SecurityName
        if name.startswith("S") or "ST" in name:
            return
        # Tuple layout: (code, previous close, last price, total volume,
        # total turnover, update time).
        level1_data_queue.put_nowait((
            pMarketDataField.SecurityID, pMarketDataField.PreClosePrice, pMarketDataField.LastPrice,
            pMarketDataField.Volume, pMarketDataField.Turnover, pMarketDataField.UpdateTime))
| | | |
| | | |
def __upload_codes_info(datas):
    """Upload a batch of level-1 records to ``SERVER_HOST:SERVER_PORT``.

    Nothing is sent when ``tool.is_trade_time()`` is false. Otherwise the batch
    is serialised as ``{"type": "l1_data", "data": datas}``, wrapped with
    ``socket_util.load_header`` and written to a freshly created socket, which
    is always closed afterwards.

    :param datas: JSON-serialisable collection of records to upload.
    """
    if tool.is_trade_time():
        payload = json.dumps({"type": "l1_data", "data": datas})
        framed = socket_util.load_header(payload)
        # One short-lived connection per batch.
        conn = socket_util.create_socket(SERVER_HOST, SERVER_PORT)
        try:
            conn.sendall(framed)
        finally:
            conn.close()
| | | |
| | | |
# Set to True by the first call to re_subscript(); nothing in this file resets it.
is_re_subscript = False


def re_subscript(spi: MdSpi):
    """Re-subscribe ``spi`` to freshly requested target codes, at most once.

    The first call marks the module-level ``is_re_subscript`` flag and then
    requests the target codes; later calls return immediately. The subscription
    is only issued when both code lists contain more than 100 entries.
    Failures are logged and never propagated to the caller.

    :param spi: the ``MdSpi`` whose ``subscribe_codes`` is invoked.
    """
    global is_re_subscript
    if is_re_subscript:
        return
    # The flag is set before the request, so a failed attempt is not retried.
    is_re_subscript = True
    try:
        codes_sh, codes_sz = l1_subscript_codes_manager.request_l1_subscript_target_codes()
        if len(codes_sh) > 100 and len(codes_sz) > 100:
            logger_local_huaxin_l1.info(f"重新订阅 sh-{len(codes_sh)} sz-{len(codes_sz)}")
            spi.subscribe_codes(codes_sh, codes_sz)
    except Exception as e:
        # Still best-effort, but the failure is now recorded instead of
        # being silently discarded.
        logger_local_huaxin_l1.exception(e)
| | | |
| | | |
# NOTE(review): not referenced anywhere in the visible part of this file.
__position_codes = set()


def test_add_datas():
    """Push a fixed sample record onto ``level1_data_queue`` every 3 seconds, forever.

    The record mirrors the tuple layout queued by ``MdSpi.OnRtnMarketData``,
    with the current time string from ``tool.get_now_time_str()`` as the last
    field. The function never returns.
    """
    while True:
        sample = ("000948", 12.91, 14.20, int(34.60 * 10000), 4.9 * 1e8, tool.get_now_time_str())
        level1_data_queue.put_nowait(sample)
        time.sleep(3)
| | | |
| | | |
def _drain_level1_queue():
    """Remove and return every record currently waiting in ``level1_data_queue``."""
    drained = []
    while True:
        try:
            drained.append(level1_data_queue.get_nowait())
        except queue.Empty:
            return drained


def run():
    """Start the level-1 subscription service and upload queued data forever.

    Steps:
      1. Load the Shanghai/Shenzhen code lists, retrying up to 15 times with a
         4 second pause after each failure. If every attempt fails, the lists
         stay empty.
      2. Create the multicast market-data API, register an ``MdSpi`` callback
         object and the multicast address, then initialise the API.
      3. Every 0.5 seconds, drain ``level1_data_queue`` and upload the batch.
         Upload errors are logged and the loop continues.

    The function does not return normally. The API object is released if the
    loop is left through an exception that is not caught inside it.
    """
    logger_local_huaxin_l1.info("运行l1订阅服务")
    codes_sh = []
    codes_sz = []
    for _ in range(15):
        try:
            codes_sh, codes_sz = l1_subscript_codes_manager.get_codes()
            logger_local_huaxin_l1.info(f"获取上证,深证代码数量:sh-{len(codes_sh)} sz-{len(codes_sz)}")
            break
        except Exception as e:
            logger_local_huaxin_l1.exception(e)
            time.sleep(4)
    logger_system.info(f"获取L1订阅目标票数量:sh-{len(codes_sh)} sz-{len(codes_sz)}")
    # Print the API version.
    print(xmdapi.CTORATstpXMdApi_GetApiVersion())

    # Create the API object (multicast mode).
    api = xmdapi.CTORATstpXMdApi_CreateTstpXMdApi(xmdapi.TORA_TSTP_MST_MCAST)

    # Create the callback object and register it with the API.
    spi = MdSpi(api, codes_sh, codes_sz)
    api.RegisterSpi(spi)

    # Alternative connection modes, kept for reference:
    #   single front address:    api.RegisterFront("tcp://210.14.72.16:9402")
    #   several front addresses: api.RegisterFront("tcp://10.0.1.101:6402,tcp://10.0.1.101:16402")
    #   name server(s):          api.RegisterNameServer('tcp://10.0.1.101:52370,tcp://10.0.1.101:62370')
    #   production class-A feed: api.RegisterMulticast("udp://224.224.1.9:7880", None, "")

    # Production class-B multicast address.
    api.RegisterMulticast(ADDRESS, None, "")

    # Start the API.
    api.Init()

    logger_system.info("L1订阅服务启动成功")
    # NOTE(review): the original marks this thread as a test hook ("TODO: test").
    # It injects a synthetic record for "000948" into the upload queue every
    # 3 seconds. Confirm whether it should run outside of testing.
    threading.Thread(target=test_add_datas, daemon=True).start()

    try:
        while True:
            try:
                # Each record is (code, previous close, last price, total volume,
                # total turnover, update time), as queued by MdSpi.OnRtnMarketData.
                datas = _drain_level1_queue()
                if datas:
                    __upload_codes_info(datas)
            except Exception as e:
                logging.exception(e)
            finally:
                time.sleep(0.5)
    finally:
        # Previously placed after the infinite loop and therefore unreachable.
        api.Release()
| | | |
| | | |
def run_async():
    """Log the current process id and start ``run()`` on a daemon thread."""
    logger_system.info("L1进程ID:{}", os.getpid())
    worker = threading.Thread(target=run, daemon=True)
    worker.start()


# Running this module directly does nothing; the service is started via run_async()/run().
if __name__ == "__main__":
    pass