| | |
| | | # -*- coding: utf-8 -*- |
| | | import collections |
| | | import json |
| | | import logging |
| | | import multiprocessing |
| | | import os |
| | | import threading |
| | | import time |
| | | |
| | | from huaxin_client import socket_util, l1_subscript_codes_manager |
| | | from huaxin_client import socket_util |
| | | import xmdapi |
| | | from huaxin_client import tool, constant |
| | | from log_module.log import logger_system, logger_local_huaxin_l1, logger_l2_codes_subscript |
| | | from huaxin_client import constant |
| | | from log_module.log import logger_system, logger_local_huaxin_l1 |
| | | |
| | | ################B类################## |
| | | ADDRESS = "udp://224.224.1.19:7880" |
| | |
| | | codes_sh, codes_sz = self.__seperate_codes(add_codes) |
| | | logger_local_huaxin_l1.info(f"新增订阅:{codes_sh} {codes_sz}") |
| | | if codes_sh: |
| | | self.__api.SubscribeMarketData(codes_sh, xmdapi.TORA_TSTP_EXD_SSE) |
| | | ret = self.__api.SubscribeMarketData(codes_sh, xmdapi.TORA_TSTP_EXD_SSE) |
| | | if ret != 0: |
| | | logger_local_huaxin_l1.info('SubscribeMarketData fail, ret[%d]' % ret) |
| | | else: |
| | | logger_local_huaxin_l1.info('SubscribeMarketData success, ret[%d]' % ret) |
| | | if codes_sz: |
| | | self.__api.SubscribeMarketData(codes_sz, xmdapi.TORA_TSTP_EXD_SZSE) |
| | | ret = self.__api.SubscribeMarketData(codes_sz, xmdapi.TORA_TSTP_EXD_SZSE) |
| | | if ret != 0: |
| | | logger_local_huaxin_l1.info('SubscribeMarketData fail, ret[%d]' % ret) |
| | | else: |
| | | logger_local_huaxin_l1.info('SubscribeMarketData success, ret[%d]' % ret) |
| | | |
| | | if del_codes: |
| | | codes_sh, codes_sz = self.__seperate_codes(del_codes) |
| | |
| | | self.__api.UnSubscribeMarketData(codes_sh, xmdapi.TORA_TSTP_EXD_SSE) |
| | | if codes_sz: |
| | | self.__api.UnSubscribeMarketData(codes_sz, xmdapi.TORA_TSTP_EXD_SZSE) |
| | | |
| | | self.__subscribed_codes = set(codes) |
| | | |
| | | def OnRspUserLogin(self, pRspUserLoginField, pRspInfoField, nRequestID): |
| | | if pRspInfoField.ErrorID == 0: |
| | |
| | | def OnRspSubMarketData(self, pSpecificSecurityField, pRspInfoField): |
| | | if pRspInfoField.ErrorID == 0: |
| | | print('OnRspSubMarketData: OK!') |
| | | logger_local_huaxin_l1.info(f"订阅:{pSpecificSecurityField['SecurityID']}") |
| | | self.__subscribed_codes.add(pSpecificSecurityField["SecurityID"]) |
| | | else: |
| | | logger_local_huaxin_l1.info('OnRspSubMarketData: Error! [%d] [%s]' |
| | | % (pRspInfoField.ErrorID, pRspInfoField.ErrorMsg)) |
| | |
| | | def OnRspUnSubMarketData(self, pSpecificSecurityField, pRspInfoField): |
| | | if pRspInfoField.ErrorID == 0: |
| | | print('OnRspUnSubMarketData: OK!') |
| | | logger_local_huaxin_l1.info(f"取消订阅:{pSpecificSecurityField['SecurityID']}") |
| | | self.__subscribed_codes.discard(pSpecificSecurityField["SecurityID"]) |
| | | else: |
| | | print('OnRspUnSubMarketData: Error! [%d] [%s]' |
| | | % (pRspInfoField.ErrorID, pRspInfoField.ErrorMsg)) |
| | |
| | | return |
| | | rate = 0 |
| | | self.l1_data_queue.append(( |
| | | pMarketDataField.SecurityID, pMarketDataField.LastPrice, rate, pMarketDataField.Volume, pMarketDataField.UpdateTime, |
| | | pMarketDataField.SecurityID, pMarketDataField.LastPrice, rate, pMarketDataField.Volume, |
| | | pMarketDataField.UpdateTime, |
| | | pMarketDataField.BidPrice1, pMarketDataField.BidVolume1)) |
| | | |
| | | # print( |
| | |
| | | |
| | | def run(queue_l1_trade_w_strategy_r, queue_l1_trade_r_strategy_w): |
| | | logger_local_huaxin_l1.info("运行l1_for_trade订阅服务") |
| | | codes_sh = [] |
| | | codes_sz = [] |
| | | # 打印接口版本号 |
| | | print(xmdapi.CTORATstpXMdApi_GetApiVersion()) |
| | | |
| | | # 创建接口对象 |
| | | api = xmdapi.CTORATstpXMdApi_CreateTstpXMdApi(xmdapi.TORA_TSTP_MST_MCAST) |
| | | # api = xmdapi.CTORATstpXMdApi_CreateTstpXMdApi(xmdapi.TORA_TSTP_MST_TCP) # 测试 |
| | | |
| | | # 创建回调对象 |
| | | global spi |
| | |
| | | |
| | | # -------------------------正式地址B类------------------------------- |
| | | api.RegisterMulticast(ADDRESS, None, "") |
| | | # api.RegisterFront("tcp://210.14.72.16:9402") # 测试地址 |
| | | |
| | | # -------------------------正式地址A类------------------------------- |
| | | # api.RegisterMulticast("udp://224.224.1.9:7880", None, "") |
| | |
| | | api.Release() |
| | | |
| | | |
| | | if __name__ == "__main__": |
| | | pass |
| | | def test_run(): |
| | | def test_sub(): |
| | | time.sleep(5) |
| | | queue_l1_trade_r_strategy_w.put_nowait( |
| | | json.dumps({"type": "set_target_codes", "data": ["603825", "603767", "603778"]})) |
| | | |
| | | def read_data(): |
| | | while True: |
| | | val = queue_l1_trade_w_strategy_r.get() |
| | | print(val) |
| | | |
| | | queue_l1_trade_w_strategy_r, queue_l1_trade_r_strategy_w = multiprocessing.Queue(), multiprocessing.Queue() |
| | | threading.Thread(target=test_sub, daemon=True).start() |
| | | threading.Thread(target=read_data, daemon=True).start() |
| | | run(queue_l1_trade_w_strategy_r, queue_l1_trade_r_strategy_w) |