Administrator
2024-03-12 cb8227fef858f06290fea302f2c13a055695ca05
L2订单数据接收方式修改
5个文件已修改
44 ■■■■■ 已修改文件
huaxin_client/l2_client.py 6 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/l2_data_manager.py 16 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/l2_data_listen_manager.py 10 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
main.py 8 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/huaxin/huaxin_trade_server.py 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/l2_client.py
@@ -585,8 +585,7 @@
def run(queue_r: multiprocessing.Queue, order_queues: List[multiprocessing.Queue],
        transaction_queues: List[multiprocessing.Queue], market_queue: multiprocessing.Queue,
        order_ipc_hosts: list) -> None:
        transaction_queues: List[multiprocessing.Queue], market_queue: multiprocessing.Queue) -> None:
    # def test_add_codes():
    #     time.sleep(5)
    #     # if value:
@@ -634,8 +633,7 @@
        order_queue_distribute_manager = CodeQueueDistributeManager(order_queues)
        transaction_queue_distribute_manager = CodeQueueDistributeManager(transaction_queues)
        l2_data_upload_manager = L2DataUploadManager(order_queue_distribute_manager,
                                                     transaction_queue_distribute_manager, market_queue,
                                                     order_ipc_hosts)
                                                     transaction_queue_distribute_manager, market_queue)
        __init_l2(l2_data_upload_manager)
        l2_data_manager.run_upload_common()
        l2_data_manager.run_log()
huaxin_client/l2_data_manager.py
@@ -31,10 +31,9 @@
# L2上传数据管理器
class L2DataUploadManager:
    # order_ipc_hosts:远程host
    def __init__(self, order_queue_distribute_manager: CodeQueueDistributeManager,
                 transaction_queue_distribute_manager: CodeQueueDistributeManager,
                 market_data_queue: multiprocessing.Queue, order_ipc_hosts):
                 market_data_queue: multiprocessing.Queue):
        self.order_queue_distribute_manager = order_queue_distribute_manager
        self.transaction_queue_distribute_manager = transaction_queue_distribute_manager
        self.market_data_queue = market_data_queue
@@ -45,8 +44,6 @@
        self.upload_l2_data_task_dict = {}
        self.l2_order_codes = set()
        self.l2_transaction_codes = set()
        # 订单
        self.l2_order_upload_protocol = L2DataUploadProtocolManager(order_ipc_hosts)
    # 设置订单过滤条件
    # special_price:过滤的1手的价格
@@ -135,8 +132,7 @@
            self.temp_transaction_queue_dict[code] = collections.deque()
        if code not in self.temp_log_queue_dict:
            self.temp_log_queue_dict[code] = queue.Queue()
        # 分配订单上传协议
        self.l2_order_upload_protocol.distribute_upload_host(code)
        if code not in self.upload_l2_data_task_dict:
            t1 = threading.Thread(target=lambda: self.__run_upload_order_task(code), daemon=True)
@@ -151,7 +147,6 @@
    def release_distributed_upload_queue(self, code):
        self.order_queue_distribute_manager.release_distribute_queue(code)
        self.transaction_queue_distribute_manager.release_distribute_queue(code)
        self.l2_order_upload_protocol.release_distributed_upload_host(code)
        if code in self.temp_order_queue_dict:
            self.temp_order_queue_dict[code].clear()
            self.temp_order_queue_dict.pop(code)
@@ -166,9 +161,6 @@
    def __upload_l2_data(self, code, _queue, datas):
        _queue.put_nowait((code, datas, time.time()))
    def __upload_l2_order_data(self, code, datas):
        self.l2_order_upload_protocol.upload_data_as_json(code, (code, datas, time.time()))
    # 处理订单数据并上传
    def __run_upload_order_task(self, code):
@@ -187,8 +179,8 @@
                if temp_list:
                    # 上传数据
                    # self.__upload_l2_data(code, upload_queue, temp_list)
                    self.__upload_l2_order_data(code, temp_list)
                    self.__upload_l2_data(code, upload_queue, temp_list)
                    # self.__upload_l2_order_data(code, temp_list)
                    temp_list = []
                else:
                    if code not in self.temp_order_queue_dict:
l2/l2_data_listen_manager.py
@@ -129,18 +129,18 @@
            threading.Thread(target=lambda: self.__create_ipc_server(host), daemon=True).start()
    # 接收L2数据
    def receive_l2_data(self, order_queues, transaction_queues, market_queue, order_ipc_hosts):
    def receive_l2_data(self, order_queues, transaction_queues, market_queue):
        # TODO 暂时不通过队列接收数据
        # for q in order_queues:
        #     t1 = threading.Thread(target=lambda: self.__recive_l2_orders(q), daemon=True)
        #     t1.start()
        for q in order_queues:
            t1 = threading.Thread(target=lambda: self.__recive_l2_orders(q), daemon=True)
            t1.start()
        for q in transaction_queues:
            t2 = threading.Thread(target=lambda: self.__recive_transaction_orders(q), daemon=True)
            t2.start()
        t3 = threading.Thread(target=lambda: self.__recive_l2_markets(market_queue), daemon=True)
        t3.start()
        # 接收订单hosts
        self.__create_ipc_server_hosts(order_ipc_hosts)
        # self.__create_ipc_server_hosts(order_ipc_hosts)
    def get_active_count(self, type_):
        expire_time = time.time() - 5
main.py
@@ -29,7 +29,7 @@
                      queue_l1_w_strategy_r_: multiprocessing.Queue,
                      queue_strategy_w_trade_r_: multiprocessing.Queue,
                      queue_strategy_w_trade_r_for_read_: multiprocessing.Queue, order_queues_, transaction_queues_,
                      market_queue_, queue_l1_trade_r_strategy_w_, queue_l1_trade_w_strategy_r_, order_ipc_hosts_):
                      market_queue_, queue_l1_trade_r_strategy_w_, queue_l1_trade_w_strategy_r_):
    logger_system.info("策略进程ID:{}", os.getpid())
    log.close_print()
    # 初始化参数
@@ -53,7 +53,7 @@
    huaxin_trade_server.run(queue_strategy_r_trade_w_, queue_l1_w_strategy_r_, queue_strategy_w_trade_r_,
                            queue_strategy_w_trade_r_for_read_, order_queues_,
                            transaction_queues_, market_queue_,
                            queue_l1_trade_w_strategy_r_, order_ipc_hosts_)
                            queue_l1_trade_w_strategy_r_)
# 主服务
@@ -143,14 +143,14 @@
        # L2
        l2Process = multiprocessing.Process(
            target=huaxin_client.l2_client.run,
            args=(queue_other_w_l2_r, order_queues, transaction_queues, market_queue, order_ipc_hosts))
            args=(queue_other_w_l2_r, order_queues, transaction_queues, market_queue))
        l2Process.start()
        # 主进程
        createTradeServer(pss_strategy, queue_strategy_r_trade_w, queue_l1_w_strategy_r, queue_strategy_w_trade_r,
                          queue_strategy_w_trade_r_for_read,
                          order_queues, transaction_queues, market_queue, queue_l1_trade_r_strategy_w,
                          queue_l1_trade_w_strategy_r, order_ipc_hosts)
                          queue_l1_trade_w_strategy_r)
        # 将tradeServer作为主进程
        l1Process.join()
trade/huaxin/huaxin_trade_server.py
@@ -1680,7 +1680,7 @@
def run(queue_strategy_r_trade_w, queue_l1_w_strategy_r, queue_strategy_w_trade_r, queue_strategy_w_trade_r_for_read,
        order_queues, transaction_queues,
        market_queue, queue_l1_trade_w_strategy_r, order_ipc_hosts):
        market_queue, queue_l1_trade_w_strategy_r):
    logger_system.info(f"trade_server 线程ID:{tool.get_thread_id()}")
    try:
        # 执行一些初始化数据
@@ -1696,7 +1696,7 @@
        # 监听L2数据
        global l2DataListenManager
        l2DataListenManager = L2DataListenManager(my_l2_data_callback)
        l2DataListenManager.receive_l2_data(order_queues, transaction_queues, market_queue, order_ipc_hosts)
        l2DataListenManager.receive_l2_data(order_queues, transaction_queues, market_queue)
        # 启动交易服务
        huaxin_trade_api.run_pipe_trade(queue_strategy_r_trade_w, queue_strategy_w_trade_r,