Administrator
2024-03-18 5c5a34e67334e53e3ac9da5adc85ccc8ec0529c4
huaxin_client/l2_data_manager.py
@@ -2,7 +2,6 @@
import json
import logging
import marshal
import multiprocessing
import queue
import threading
import time
@@ -13,7 +12,7 @@
from huaxin_client.client_network import SendResponseSkManager
# 活动时间
from huaxin_client.code_queue_distribute_manager import CodeQueueDistributeManager, CodeDataCallbackDistributeManager
from huaxin_client.code_queue_distribute_manager import CodeDataCallbackDistributeManager
from log_module import async_log_util
from log_module.async_log_util import huaxin_l2_log
from log_module.log import logger_local_huaxin_l2_error, logger_system, logger_local_huaxin_l2_subscript, \
@@ -34,14 +33,7 @@
# L2上传数据管理器
class L2DataUploadManager:
    def __init__(self, order_queue_distribute_manager: CodeQueueDistributeManager,
                 transaction_queue_distribute_manager: CodeQueueDistributeManager,
                 market_data_queue: multiprocessing.Queue,
                 data_callback_distribute_manager: CodeDataCallbackDistributeManager):
        self.order_queue_distribute_manager = order_queue_distribute_manager
        self.transaction_queue_distribute_manager = transaction_queue_distribute_manager
        self.market_data_queue = market_data_queue
    def __init__(self, data_callback_distribute_manager: CodeDataCallbackDistributeManager):
        self.data_callback_distribute_manager = data_callback_distribute_manager
        # 代码分配的对象
        self.temp_order_queue_dict = {}
@@ -133,10 +125,6 @@
    # 分配上传队列
    def distribute_upload_queue(self, code):
        if not self.order_queue_distribute_manager.get_distributed_queue(code):
            self.order_queue_distribute_manager.distribute_queue(code)
        if not self.transaction_queue_distribute_manager.get_distributed_queue(code):
            self.transaction_queue_distribute_manager.distribute_queue(code)
        if not self.data_callback_distribute_manager.get_distributed_callback(code):
            self.data_callback_distribute_manager.distribute_callback(code)
@@ -157,8 +145,6 @@
        # 释放已经分配的队列
    def release_distributed_upload_queue(self, code):
        self.order_queue_distribute_manager.release_distribute_queue(code)
        self.transaction_queue_distribute_manager.release_distribute_queue(code)
        self.data_callback_distribute_manager.release_distribute_callback(code)
        if code in self.temp_order_queue_dict:
            self.temp_order_queue_dict[code].clear()
@@ -179,8 +165,6 @@
    def __run_upload_order_task(self, code):
        q: collections.deque = self.temp_order_queue_dict.get(code)
        temp_list = []
        queue_info = self.order_queue_distribute_manager.get_distributed_queue(code)
        upload_queue = queue_info[1]
        while True:
            try:
                while len(q) > 0:
@@ -217,8 +201,6 @@
    # 处理成交数据并上传
    def __run_upload_transaction_task(self, code):
        q: collections.deque = self.temp_transaction_queue_dict.get(code)
        queue_info = self.transaction_queue_distribute_manager.get_distributed_queue(code)
        upload_queue = queue_info[1]
        temp_list = []
        while True:
            try: