Administrator
4 天以前 48fb7a00951f91bdc707e5dd2d196e5bccb752c3
huaxin_client/l2_client.py
@@ -17,7 +17,7 @@
from huaxin_client.l2_data_manager import L2DataUploadManager
from log_module import log, async_log_util
from log_module.async_log_util import huaxin_l2_log
from log_module.log import logger_local_huaxin_l2_subscript, logger_system, logger_l2_codes_subscript
from log_module.log import logger_local_huaxin_l2_subscript, logger_system, logger_l2_codes_subscript, logger_debug
from utils import tool
###B类###
@@ -50,8 +50,7 @@
SZ_Securities = [b"002456", b"002849", b"002281", b"002336", b"000936", b"000920", b"000757", b"002896", b"002725",
                 b"000952", b"000526", b"000753", b"000681", b"002088", b"002436"]
SZ_Bond_Securities = [b"100303", b"109559", b"112617"]
set_codes_data_queue = queue.Queue()
market_code_dict = {}
set_codes_data_queue = queue.Queue(maxsize=1000)
ENABLE_NGST = True
@@ -106,6 +105,9 @@
            self.__api.UnSubscribeOrderDetail(sz, lev2mdapi.TORA_TSTP_EXD_SZSE)
            self.__api.UnSubscribeTransaction(sz, lev2mdapi.TORA_TSTP_EXD_SZSE)
            self.__api.UnSubscribeMarketData(sz, lev2mdapi.TORA_TSTP_EXD_SZSE)
    def subscribe_codes(self, _codes):
        self.__subscribe(_codes)
    def __subscribe(self, _codes):
        sh, sz = self.__split_codes(_codes)
@@ -167,6 +169,8 @@
        if add_codes:
            logger_system.info(f"新增L2订阅代码数量({'缓存' if from_cache else ''}):{len(add_codes)}")
            for c in add_codes:
                logger_l2_codes_subscript.info(f"l2委托数据过滤条件:{c} - {self.codes_volume_and_price_dict.get(c)}")
        logger_l2_codes_subscript.info("华鑫L2订阅结束,add-{} del-{}", len(add_codes), len(del_codes))
@@ -241,6 +245,8 @@
        if pRspInfo["ErrorID"] == 0:
            print("订阅成功")
            self.subscripted_codes.add(pSpecificSecurity['SecurityID'])
            # 初始化
            SubscriptDefend.set_l2_market_update(pSpecificSecurity['SecurityID'])
        if bIsLast == 1:
            print("订阅响应结束", self.subscripted_codes)
            l2_data_manager.add_subscript_codes(self.subscripted_codes)
@@ -262,6 +268,8 @@
        if pRspInfo["ErrorID"] == 0:
            print("订阅成功")
            self.subscripted_codes.add(pSpecificSecurity['SecurityID'])
            # 初始化
            SubscriptDefend.set_l2_market_update(pSpecificSecurity['SecurityID'])
        if bIsLast == 1:
            print("订阅响应结束", self.subscripted_codes)
            l2_data_manager.add_subscript_codes(self.subscripted_codes)
@@ -326,8 +334,8 @@
                 "avgAskPrice": pDepthMarketData["AvgAskPrice"],
                 "buy": buys,
                 "sell": sells}
            market_code_dict[pDepthMarketData['SecurityID']] = time.time()
            self.l2_data_upload_manager.add_market_data(d)
            SubscriptDefend.set_l2_market_update(pDepthMarketData['SecurityID'])
        except:
            pass
@@ -498,6 +506,45 @@
            print("first level sell [%d] : [%d]" % (sell_index, FirstLevelSellOrderVolumes[sell_index]))
class SubscriptDefend:
    """
    订阅守护
    定义:当订阅的代码超过一定时间没有回调数据时重新订阅
    """
    __l2_market_update_time = {}
    @classmethod
    def set_l2_market_update(cls, code):
        cls.__l2_market_update_time[code] = time.time()
    @classmethod
    def run(cls):
        while True:
            try:
                now_time = tool.get_now_time_as_int()
                if now_time < int("093015"):
                    continue
                if int("112945") < now_time < int("130015"):
                    continue
                if int("145645") < now_time:
                    continue
                if spi.subscripted_codes:
                    codes = []
                    for code in spi.subscripted_codes:
                        # 获取上次更新时间
                        update_time = cls.__l2_market_update_time.get(code)
                        if update_time and time.time() - update_time > 15:
                            # 需要重新订阅
                            codes.append(code)
                    if codes:
                        logger_debug.info(f"重新订阅:{codes}")
                        spi.subscribe_codes(codes)
            except:
                pass
            finally:
                time.sleep(15)
class MyL2ActionCallback(L2ActionCallback):
    def OnSetL2Position(self, codes_data):
@@ -621,7 +668,8 @@
        if queue_r is not None:
            t1 = threading.Thread(target=lambda: __receive_from_queue_trade(queue_r), daemon=True)
            t1.start()
        # 订阅守护
        threading.Thread(target=SubscriptDefend.run, daemon=True).start()
        # 初始化
        data_callback_distribute_manager = CodeDataCallbackDistributeManager(data_callbacks)
        l2_data_upload_manager = L2DataUploadManager(data_callback_distribute_manager)
@@ -676,13 +724,13 @@
             'OrderTime': '13000015',
             'MainSeq': 2, 'SubSeq': 6739148, 'OrderNO': 5512467, 'OrderStatus': 'D'}, 0)
    queue_r = multiprocessing.Queue()
    queue_r = multiprocessing.Queue(maxsize=1024)
    order_queues = []
    transaction_queues = []
    market_queue = multiprocessing.Queue()
    market_queue = multiprocessing.Queue(maxsize=1024)
    for i in range(20):
        order_queues.append(multiprocessing.Queue())
        transaction_queues.append(multiprocessing.Queue())
        order_queues.append(multiprocessing.Queue(maxsize=1024))
        transaction_queues.append(multiprocessing.Queue(maxsize=1024))
    threading.Thread(target=test_add_codes).start()
    run(queue_r, order_queues, transaction_queues, market_queue)