Administrator
2024-01-31 d8e24c2ec1832ef718a15f307dd34f2937cc50ed
L1交易订阅修改
1个文件已修改
50 ■■■■■ 已修改文件
huaxin_client/l1_client_for_trade.py 50 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/l1_client_for_trade.py
@@ -1,16 +1,14 @@
# -*- coding: utf-8 -*-
import collections
import json
import logging
import multiprocessing
import os
import threading
import time
from huaxin_client import socket_util, l1_subscript_codes_manager
from huaxin_client import socket_util
import xmdapi
from huaxin_client import tool, constant
from log_module.log import logger_system, logger_local_huaxin_l1, logger_l2_codes_subscript
from huaxin_client import constant
from log_module.log import logger_system, logger_local_huaxin_l1
################B类##################
ADDRESS = "udp://224.224.1.19:7880"
@@ -68,9 +66,17 @@
            codes_sh, codes_sz = self.__seperate_codes(add_codes)
            logger_local_huaxin_l1.info(f"新增订阅:{codes_sh}  {codes_sz}")
            if codes_sh:
                self.__api.SubscribeMarketData(codes_sh, xmdapi.TORA_TSTP_EXD_SSE)
                ret = self.__api.SubscribeMarketData(codes_sh, xmdapi.TORA_TSTP_EXD_SSE)
                if ret != 0:
                    logger_local_huaxin_l1.info('SubscribeMarketData fail, ret[%d]' % ret)
                else:
                    logger_local_huaxin_l1.info('SubscribeMarketData success, ret[%d]' % ret)
            if codes_sz:
                self.__api.SubscribeMarketData(codes_sz, xmdapi.TORA_TSTP_EXD_SZSE)
                ret = self.__api.SubscribeMarketData(codes_sz, xmdapi.TORA_TSTP_EXD_SZSE)
                if ret != 0:
                    logger_local_huaxin_l1.info('SubscribeMarketData fail, ret[%d]' % ret)
                else:
                    logger_local_huaxin_l1.info('SubscribeMarketData success, ret[%d]' % ret)
        if del_codes:
            codes_sh, codes_sz = self.__seperate_codes(del_codes)
@@ -78,6 +84,8 @@
                self.__api.UnSubscribeMarketData(codes_sh, xmdapi.TORA_TSTP_EXD_SSE)
            if codes_sz:
                self.__api.UnSubscribeMarketData(codes_sz, xmdapi.TORA_TSTP_EXD_SZSE)
        self.__subscribed_codes = set(codes)
    def OnRspUserLogin(self, pRspUserLoginField, pRspInfoField, nRequestID):
        if pRspInfoField.ErrorID == 0:
@@ -104,8 +112,6 @@
    def OnRspSubMarketData(self, pSpecificSecurityField, pRspInfoField):
        if pRspInfoField.ErrorID == 0:
            print('OnRspSubMarketData: OK!')
            logger_local_huaxin_l1.info(f"订阅:{pSpecificSecurityField['SecurityID']}")
            self.__subscribed_codes.add(pSpecificSecurityField["SecurityID"])
        else:
            logger_local_huaxin_l1.info('OnRspSubMarketData: Error! [%d] [%s]'
                                        % (pRspInfoField.ErrorID, pRspInfoField.ErrorMsg))
@@ -113,8 +119,6 @@
    def OnRspUnSubMarketData(self, pSpecificSecurityField, pRspInfoField):
        if pRspInfoField.ErrorID == 0:
            print('OnRspUnSubMarketData: OK!')
            logger_local_huaxin_l1.info(f"取消订阅:{pSpecificSecurityField['SecurityID']}")
            self.__subscribed_codes.discard(pSpecificSecurityField["SecurityID"])
        else:
            print('OnRspUnSubMarketData: Error! [%d] [%s]'
                  % (pRspInfoField.ErrorID, pRspInfoField.ErrorMsg))
@@ -126,7 +130,8 @@
            return
        rate = 0
        self.l1_data_queue.append((
            pMarketDataField.SecurityID, pMarketDataField.LastPrice, rate, pMarketDataField.Volume, pMarketDataField.UpdateTime,
            pMarketDataField.SecurityID, pMarketDataField.LastPrice, rate, pMarketDataField.Volume,
            pMarketDataField.UpdateTime,
            pMarketDataField.BidPrice1, pMarketDataField.BidVolume1))
        # print(
@@ -166,13 +171,12 @@
def run(queue_l1_trade_w_strategy_r, queue_l1_trade_r_strategy_w):
    logger_local_huaxin_l1.info("运行l1_for_trade订阅服务")
    codes_sh = []
    codes_sz = []
    # 打印接口版本号
    print(xmdapi.CTORATstpXMdApi_GetApiVersion())
    # 创建接口对象
    api = xmdapi.CTORATstpXMdApi_CreateTstpXMdApi(xmdapi.TORA_TSTP_MST_MCAST)
    # api = xmdapi.CTORATstpXMdApi_CreateTstpXMdApi(xmdapi.TORA_TSTP_MST_TCP) # 测试
    # 创建回调对象
    global spi
@@ -191,6 +195,7 @@
    # -------------------------正式地址B类-------------------------------
    api.RegisterMulticast(ADDRESS, None, "")
    # api.RegisterFront("tcp://210.14.72.16:9402")  # 测试地址
    # -------------------------正式地址A类-------------------------------
    # api.RegisterMulticast("udp://224.224.1.9:7880", None, "")
@@ -224,5 +229,18 @@
    api.Release()
if __name__ == "__main__":
    pass
def test_run():
    def test_sub():
        time.sleep(5)
        queue_l1_trade_r_strategy_w.put_nowait(
            json.dumps({"type": "set_target_codes", "data": ["603825", "603767", "603778"]}))
    def read_data():
        while True:
            val = queue_l1_trade_w_strategy_r.get()
            print(val)
    queue_l1_trade_w_strategy_r, queue_l1_trade_r_strategy_w = multiprocessing.Queue(), multiprocessing.Queue()
    threading.Thread(target=test_sub, daemon=True).start()
    threading.Thread(target=read_data, daemon=True).start()
    run(queue_l1_trade_w_strategy_r, queue_l1_trade_r_strategy_w)