Administrator
2023-11-13 b86057f378a6567842e41f93df25f9188f0dc25c
修改l2数据上传规则(未生效)
1个文件已修改
68 ■■■■■ 已修改文件
huaxin_client/l2_data_manager.py 68 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/l2_data_manager.py
@@ -14,6 +14,7 @@
from log_module import  async_log_util
from log_module.log import logger_local_huaxin_l2_error, logger_system
from utils import tool
import collections
# Module-level dict, empty at import time.
# NOTE(review): the name suggests it maps a code to the time of its most recent
# order-detail upload activity — confirm against the code that writes to it.
order_detail_upload_active_time_dict = {}
# Module-level dict, empty at import time.
# NOTE(review): presumably the transaction counterpart of the dict above
# (code -> last transaction upload activity time) — confirm against its writers.
transaction_upload_active_time_dict = {}
@@ -33,6 +34,8 @@
        self.order_queue_distribute_manager = order_queue_distribute_manager
        self.transaction_queue_distribute_manager = transaction_queue_distribute_manager
        self.market_data_queue = market_data_queue
        self.temp_order_queue_dict = {}
        self.temp_transaction_queue_dict = {}
    # 添加委托详情
    def add_l2_order_detail(self, data, start_time, istransaction=False):
@@ -63,11 +66,75 @@
    def distribute_upload_queue(self, code):
        self.order_queue_distribute_manager.distribute_queue(code)
        self.transaction_queue_distribute_manager.distribute_queue(code)
        self.temp_order_queue_dict[code] = collections.deque()
        self.temp_transaction_queue_dict[code] = collections.deque()
        # threading.Thread(target=lambda: self.__run_upload_order_task(code)).start()
        # threading.Thread(target=lambda: self.__run_upload_transaction_task(code)).start()
    def __upload_l2_data(self, code, _queue, datas):
        _queue.put_nowait((code, datas, time.time()))
    # 处理订单数据并上传
    def __run_upload_order_task(self, code):
        q: collections.deque = self.temp_order_queue_dict.get(code)
        temp_list = []
        queue_info = self.order_queue_distribute_manager.get_distributed_queue(code)
        upload_queue = queue_info[1]
        while True:
            try:
                if not q:
                    data = q.popleft()
                    temp_list.append(data)
                else:
                    if temp_list:
                        # 上传数据
                        self.__upload_l2_data(code, upload_queue, temp_list)
                        temp_list.clear()
                    else:
                        if code not in self.temp_order_queue_dict:
                            break
                        time.sleep(0.001)
            except:
                pass
            finally:
                pass
    # 处理成交数据并上传
    def __run_upload_transaction_task(self, code):
        q: collections.deque = self.temp_transaction_queue_dict.get(code)
        queue_info = self.transaction_queue_distribute_manager.get_distributed_queue(code)
        upload_queue = queue_info[1]
        temp_list = []
        while True:
            try:
                if not q:
                    data = q.popleft()
                    temp_list.append(data)
                else:
                    if temp_list:
                        # 上传数据
                        self.__upload_l2_data(code, upload_queue, temp_list)
                        temp_list.clear()
                    else:
                        if code not in self.temp_transaction_queue_dict:
                            break
                        time.sleep(0.002)
            except:
                pass
            finally:
                pass
    # 释放已经分配的队列
    def release_distributed_upload_queue(self, code):
        self.order_queue_distribute_manager.release_distribute_queue(code)
        self.transaction_queue_distribute_manager.release_distribute_queue(code)
        if code in self.temp_order_queue_dict:
            self.temp_order_queue_dict[code].clear()
            self.temp_order_queue_dict.pop(code)
        if code in self.temp_transaction_queue_dict:
            self.temp_transaction_queue_dict[code].clear()
            self.temp_transaction_queue_dict.pop(code)
def add_target_code(code):
@@ -81,6 +148,7 @@
    if code in target_codes_add_time:
        target_codes_add_time.pop(code)
def add_subscript_codes(codes):
    print("add_subscript_codes", codes)
    # 加入上传队列