Administrator
2025-03-11 46f51dfb83f6e6a2784676bde64577e5f6f28cf0
huaxin_client/l2_data_manager_v2.py
@@ -2,6 +2,7 @@
import json
import logging
import marshal
import multiprocessing
import queue
import threading
import time
@@ -35,13 +36,20 @@
# L2上传数据管理器
class L2DataUploadManager:
    """
    L2逐笔委托/L2逐笔成交:通过共享内存+ZMQ上传数据
    L2市场行情: 通过普通数据上传队列写入
    L2订阅的代码: 通过普通数据上传队列写入
    """
    TYPE_DELEGATE = 1
    TYPE_TRANSACTION = 2
    TYPE_MARKET = 3
    def __init__(self, data_channel_distribute_manager: CodeDataChannelDistributeManager):
    def __init__(self, data_channel_distribute_manager: CodeDataChannelDistributeManager,
                 common_data_upload_queue: multiprocessing.Queue):
        self.data_channel_distribute_manager = data_channel_distribute_manager
        self.common_data_upload_queue = common_data_upload_queue
        # 代码分配的对象
        self.temp_order_queue_dict = {}
        self.temp_transaction_queue_dict = {}
@@ -144,13 +152,9 @@
                      data['SellNo'], data['ExecType'], time.time()))
    def add_market_data(self, data):
        """
        Forward one L2 market-data record to its consumers.

        The security code is read from ``data['securityID']``. The record is
        passed to ``OnMarketData(code, data)`` on the object returned by
        ``data_channel_distribute_manager.get_distributed_channel(code)`` when
        that object is truthy, and it is also put (non-blocking) on
        ``common_data_upload_queue`` as
        ``{"type": "l2_market", "data": (code, data)}``.

        NOTE(review): this listing looks like a diff with the +/- markers
        stripped, so the direct callback and the queue put may be the old and
        new versions of the same step -- confirm which one is live.

        :param data: market-data record; must support ``data['securityID']``
            (presumably a dict -- verify against the caller).
        """
        # Add to the upload queue
        # (legacy, disabled) self.market_data_queue.put_nowait(data)
        code = data['securityID']
        # TODO: switch to sending via zmq
        callback = self.data_channel_distribute_manager.get_distributed_channel(code)
        if callback:
            callback.OnMarketData(code, data)
        # Switched to sending through the common upload queue
        self.common_data_upload_queue.put_nowait({"type": "l2_market", "data": (code, data)})
    # 分配上传队列
    def distribute_upload_queue(self, code, _target_codes=None):
@@ -192,9 +196,6 @@
        if code in self.upload_l2_data_task_dict:
            self.upload_l2_data_task_dict.pop(code)
    def __upload_l2_data(self, code, _queue, datas):
        """
        Serialize one batch of data for ``code`` and enqueue it without blocking.

        The payload is the list ``[code, datas, time.time()]`` encoded with
        ``marshal.dumps``; the resulting bytes are handed to
        ``_queue.put_nowait``.

        :param code: security code the batch belongs to.
        :param _queue: queue-like object exposing ``put_nowait``.
        :param datas: batch to upload; must be marshal-serializable.
        """
        _queue.put_nowait(marshal.dumps([code, datas, time.time()]))
    # 处理订单数据并上传
    def __run_upload_order_task(self, code):
@@ -265,7 +266,8 @@
                                                 (code, temp_list))
                    # 通知获取数据
                    _socket = self.data_channel_distribute_manager.get_zmq_socket(zmq_host)
                    _socket.send(msgpack.packb({"type": self.TYPE_TRANSACTION, "data": {"memery_number": shared_memery_number}}))
                    _socket.send(
                        msgpack.packb({"type": self.TYPE_TRANSACTION, "data": {"memery_number": shared_memery_number}}))
                    _socket.recv_string()
                    temp_list = []
                else: