Administrator
2024-03-18 23cecbb7d4ddf4149cd6b106bfaad415db45bcc3
L2日志修改
5个文件已修改
36 ■■■■ 已修改文件
huaxin_client/l2_client.py 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/l2_data_manager.py 12 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/l2_data_listen_manager.py 5 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
main.py 11 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/huaxin/huaxin_trade_server.py 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/l2_client.py
@@ -590,7 +590,7 @@
def run(queue_r: multiprocessing.Queue, order_queues: List[multiprocessing.Queue],
        transaction_queues: List[multiprocessing.Queue], market_queue: multiprocessing.Queue, order_ipc_hosts: list, data_callbacks:list) -> None:
        transaction_queues: List[multiprocessing.Queue], market_queue: multiprocessing.Queue, data_callbacks:list) -> None:
    logger_system.info("L2进程ID:{}", os.getpid())
    logger_system.info(f"l2_client 线程ID:{tool.get_thread_id()}")
@@ -605,7 +605,7 @@
        transaction_queue_distribute_manager = CodeQueueDistributeManager(transaction_queues)
        data_callback_distribute_manager = CodeDataCallbackDistributeManager(data_callbacks)
        l2_data_upload_manager = L2DataUploadManager(order_queue_distribute_manager,
                                                     transaction_queue_distribute_manager, market_queue, order_ipc_hosts, data_callback_distribute_manager)
                                                     transaction_queue_distribute_manager, market_queue, data_callback_distribute_manager)
        __init_l2(l2_data_upload_manager)
        l2_data_manager.run_upload_common()
        l2_data_manager.run_log()
huaxin_client/l2_data_manager.py
@@ -14,7 +14,6 @@
# 活动时间
from huaxin_client.code_queue_distribute_manager import CodeQueueDistributeManager, CodeDataCallbackDistributeManager
from huaxin_client.l2_data_transform_protocol import L2DataCallBack
from log_module import async_log_util
from log_module.async_log_util import huaxin_l2_log
from log_module.log import logger_local_huaxin_l2_error, logger_system, logger_local_huaxin_l2_subscript, \
@@ -37,7 +36,7 @@
class L2DataUploadManager:
    def __init__(self, order_queue_distribute_manager: CodeQueueDistributeManager,
                 transaction_queue_distribute_manager: CodeQueueDistributeManager,
                 market_data_queue: multiprocessing.Queue, order_ipc_hosts,
                 market_data_queue: multiprocessing.Queue,
                 data_callback_distribute_manager: CodeDataCallbackDistributeManager):
        self.order_queue_distribute_manager = order_queue_distribute_manager
@@ -53,7 +52,6 @@
        self.upload_l2_data_task_dict = {}
        self.l2_order_codes = set()
        self.l2_transaction_codes = set()
        self.l2_order_upload_protocol = L2DataUploadProtocolManager(order_ipc_hosts)
    # 设置订单过滤条件
    # special_price:过滤的1手的价格
@@ -148,10 +146,6 @@
            self.temp_transaction_queue_dict[code] = collections.deque()
        if code not in self.temp_log_queue_dict:
            self.temp_log_queue_dict[code] = queue.Queue()
        # 分配订单上传协议
        if not constant.is_windows():
            self.l2_order_upload_protocol.distribute_upload_host(code)
        if code not in self.upload_l2_data_task_dict:
            t1 = threading.Thread(target=lambda: self.__run_upload_order_task(code), daemon=True)
            t1.start()
@@ -165,7 +159,6 @@
    def release_distributed_upload_queue(self, code):
        self.order_queue_distribute_manager.release_distribute_queue(code)
        self.transaction_queue_distribute_manager.release_distribute_queue(code)
        self.l2_order_upload_protocol.release_distributed_upload_host(code)
        self.data_callback_distribute_manager.release_distribute_callback(code)
        if code in self.temp_order_queue_dict:
            self.temp_order_queue_dict[code].clear()
@@ -181,9 +174,6 @@
    def __upload_l2_data(self, code, _queue, datas):
        _queue.put_nowait(marshal.dumps([code, datas, time.time()]))
    def __upload_l2_order_data(self, code, datas):
        self.l2_order_upload_protocol.upload_data_as_json(code, (code, datas, time.time()))
    # 处理订单数据并上传
    def __run_upload_order_task(self, code):
l2/l2_data_listen_manager.py
@@ -134,7 +134,7 @@
            threading.Thread(target=lambda: self.__create_ipc_server(host), daemon=True).start()
    # 接收L2数据
    def receive_l2_data(self, order_queues, transaction_queues, market_queue, order_ipc_hosts):
    def receive_l2_data(self, order_queues, transaction_queues, market_queue):
        # TODO 暂时不通过队列接收数据
        # for q in order_queues:
        #     t1 = threading.Thread(target=lambda: self.__recive_l2_orders(q), daemon=True)
@@ -144,9 +144,6 @@
            t2.start()
        t3 = threading.Thread(target=lambda: self.__recive_l2_markets(market_queue), daemon=True)
        t3.start()
        # 接收订单hosts
        if not constant.is_windows():
            self.__create_ipc_server_hosts(order_ipc_hosts)
    def get_active_count(self, type_):
        expire_time = time.time() - 5
main.py
@@ -29,7 +29,7 @@
                      queue_l1_w_strategy_r_: multiprocessing.Queue,
                      queue_strategy_w_trade_r_: multiprocessing.Queue,
                      queue_strategy_w_trade_r_for_read_: multiprocessing.Queue, order_queues_, transaction_queues_,
                      market_queue_, queue_l1_trade_r_strategy_w_, queue_l1_trade_w_strategy_r_, order_ipc_hosts_):
                      market_queue_, queue_l1_trade_r_strategy_w_, queue_l1_trade_w_strategy_r_):
    logger_system.info("策略进程ID:{}", os.getpid())
    log.close_print()
    # 初始化参数
@@ -53,7 +53,7 @@
    huaxin_trade_server.run(queue_strategy_r_trade_w_, queue_l1_w_strategy_r_, queue_strategy_w_trade_r_,
                            queue_strategy_w_trade_r_for_read_, order_queues_,
                            transaction_queues_, market_queue_,
                            queue_l1_trade_w_strategy_r_, order_ipc_hosts_)
                            queue_l1_trade_w_strategy_r_)
# 主服务
@@ -131,14 +131,11 @@
        # 创建L2通信队列
        order_queues = []
        transaction_queues = []
        order_ipc_hosts = []
        market_queue = multiprocessing.Queue()
        for i in range(constant.HUAXIN_L2_MAX_CODES_COUNT):
            order_queues.append(multiprocessing.Queue())
        for i in range(constant.HUAXIN_L2_MAX_CODES_COUNT):
            transaction_queues.append(multiprocessing.Queue())
        for i in range(constant.HUAXIN_L2_MAX_CODES_COUNT):
            order_ipc_hosts.append(f"ipc://l2order{i}.ipc")
        # 此处将L2的进程与策略进程合并
        # L2
@@ -148,14 +145,14 @@
        # l2Process.start()
        # 将L2的进程改为进程执行
        threading.Thread(target=huaxin_client.l2_client.run, args=(
            queue_other_w_l2_r, order_queues, transaction_queues, market_queue, order_ipc_hosts,
            queue_other_w_l2_r, order_queues, transaction_queues, market_queue,
            huaxin_trade_server.my_l2_data_callbacks), daemon=True).start()
        # 主进程
        createTradeServer(pss_strategy, queue_strategy_r_trade_w, queue_l1_w_strategy_r, queue_strategy_w_trade_r,
                          queue_strategy_w_trade_r_for_read,
                          order_queues, transaction_queues, market_queue, queue_l1_trade_r_strategy_w,
                          queue_l1_trade_w_strategy_r, order_ipc_hosts)
                          queue_l1_trade_w_strategy_r)
        # 将tradeServer作为主进程
        l1Process.join()
trade/huaxin/huaxin_trade_server.py
@@ -1685,7 +1685,7 @@
def run(queue_strategy_r_trade_w, queue_l1_w_strategy_r, queue_strategy_w_trade_r, queue_strategy_w_trade_r_for_read,
        order_queues, transaction_queues,
        market_queue, queue_l1_trade_w_strategy_r, order_ipc_hosts):
        market_queue, queue_l1_trade_w_strategy_r):
    logger_system.info(f"trade_server 线程ID:{tool.get_thread_id()}")
    try:
        # 执行一些初始化数据
@@ -1701,7 +1701,7 @@
        # 监听L2数据
        global l2DataListenManager
        l2DataListenManager = L2DataListenManager(my_l2_data_callback)
        l2DataListenManager.receive_l2_data(order_queues, transaction_queues, market_queue, order_ipc_hosts)
        l2DataListenManager.receive_l2_data(order_queues, transaction_queues, market_queue)
        # 启动交易服务
        huaxin_trade_api.run_pipe_trade(queue_strategy_r_trade_w, queue_strategy_w_trade_r,