Administrator
2024-03-08 2aece7c50cf3ab4b130448aad7a911172b2ff6e6
监听1手日志
2个文件已修改
40 ■■■■ 已修改文件
huaxin_client/l2_data_manager.py 34 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
log_module/log.py 6 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/l2_data_manager.py
@@ -13,7 +13,8 @@
from huaxin_client.code_queue_distribute_manager import CodeQueueDistributeManager
from log_module import async_log_util
from log_module.async_log_util import huaxin_l2_log
from log_module.log import logger_local_huaxin_l2_error, logger_system, logger_local_huaxin_l2_subscript
from log_module.log import logger_local_huaxin_l2_error, logger_system, logger_local_huaxin_l2_subscript, \
    logger_local_huaxin_l2_special_volume
from utils import tool
import collections
import zmq
@@ -63,7 +64,7 @@
            if item[2] >= filter_condition[0][0]:
                return item
            # 1手的买单满足价格
            if item[2] == 100: #and abs(filter_condition[0][2] - item[1]) < 0.001:
            if item[2] == 100:  # and abs(filter_condition[0][2] - item[1]) < 0.001:
                return item
            # 买量
            if item[2] == filter_condition[0][3]:
@@ -91,6 +92,10 @@
        # queue_info[1].put_nowait(
        #     (data['SecurityID'], data['Price'], data['Volume'], data['Side'], data['OrderType'], data['OrderTime'],
        #      data['MainSeq'], data['SubSeq'], data['OrderNO'], data['OrderStatus'], time.time(), start_time))
        if data['Volume'] == 100:
            log_queue = self.temp_log_queue_dict.get(code)
            if log_queue:
                log_queue.put_nowait(data)
        q: collections.deque = self.temp_order_queue_dict.get(code)
        q.append((data['SecurityID'], data['Price'], data['Volume'], data['Side'], data['OrderType'], data['OrderTime'],
@@ -123,10 +128,13 @@
            self.order_queue_distribute_manager.distribute_queue(code)
        if not self.transaction_queue_distribute_manager.get_distributed_queue(code):
            self.transaction_queue_distribute_manager.distribute_queue(code)
        if code not in self.temp_order_queue_dict:
            self.temp_order_queue_dict[code] = collections.deque()
        if code not in self.temp_transaction_queue_dict:
            self.temp_transaction_queue_dict[code] = collections.deque()
        if code not in self.temp_log_queue_dict:
            self.temp_log_queue_dict[code] = queue.Queue()
        # 分配订单上传协议
        self.l2_order_upload_protocol.distribute_upload_host(code)
@@ -135,7 +143,9 @@
            t1.start()
            t2 = threading.Thread(target=lambda: self.__run_upload_transaction_task(code), daemon=True)
            t2.start()
            self.upload_l2_data_task_dict[code] = (t1, t2)
            t3 = threading.Thread(target=lambda: self.__run_log_task(code), daemon=True)
            t3.start()
            self.upload_l2_data_task_dict[code] = (t1, t2, t3)
        # 释放已经分配的队列
    def release_distributed_upload_queue(self, code):
@@ -148,9 +158,11 @@
        if code in self.temp_transaction_queue_dict:
            self.temp_transaction_queue_dict[code].clear()
            self.temp_transaction_queue_dict.pop(code)
        if code in self.temp_log_queue_dict:
            self.temp_log_queue_dict.pop(code)
        if code in self.upload_l2_data_task_dict:
            self.upload_l2_data_task_dict.pop(code)
    def __upload_l2_data(self, code, _queue, datas):
        _queue.put_nowait((code, datas, time.time()))
@@ -217,6 +229,19 @@
                pass
            finally:
                pass
    def __run_log_task(self, code):
        q: queue.Queue = self.temp_log_queue_dict.get(code)
        while True:
            try:
                temp = q.get(timeout=10)
                huaxin_l2_log.info(logger_local_huaxin_l2_special_volume,
                                   f"{temp}")
            except:
                time.sleep(0.02)
            finally:
                if code not in self.temp_log_queue_dict:
                    break
class L2DataUploadProtocolManager:
@@ -378,7 +403,6 @@
def __test():
    """Placeholder for a manual data-distribution test; currently does nothing."""
    return None
def run_test():
log_module/log.py
@@ -281,6 +281,10 @@
                   filter=lambda record: record["extra"].get("name") == "local_huaxin_g_cancel",
                   rotation="00:00", compression="zip", enqueue=True)
        logger.add(self.get_local_huaxin_path("l2", "special_volume"),
                   filter=lambda record: record["extra"].get("name") == "local_huaxin_l2_special_volume",
                   rotation="00:00", compression="zip", enqueue=True)
        logger.add(self.get_local_huaxin_path("l2", "l2_buy_no"),
                   filter=lambda record: record["extra"].get("name") == "local_huaxin_l2_buy_no",
                   rotation="00:00", compression="zip", enqueue=True)
@@ -408,7 +412,7 @@
logger_local_huaxin_g_cancel = __mylogger.get_logger("local_huaxin_g_cancel")
logger_local_huaxin_l2_buy_no = __mylogger.get_logger("local_huaxin_l2_buy_no")
logger_local_huaxin_l1_trade_info = __mylogger.get_logger("local_huaxin_l1_trade_info")
logger_local_huaxin_l2_special_volume = __mylogger.get_logger("local_huaxin_l2_special_volume")