Administrator
2024-03-13 bb2c58fb407a3783b3704b33df6a154207ae4199
更改L2数据接收方式
6个文件已修改
55 ■■■■■ 已修改文件
huaxin_client/l2_client.py 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/l2_data_manager.py 16 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/cancel_buy_strategy.py 6 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/l2_data_listen_manager.py 13 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
main.py 8 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/huaxin/huaxin_trade_server.py 8 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/l2_client.py
@@ -585,7 +585,7 @@
def run(queue_r: multiprocessing.Queue, order_queues: List[multiprocessing.Queue],
        transaction_queues: List[multiprocessing.Queue], market_queue: multiprocessing.Queue) -> None:
        transaction_queues: List[multiprocessing.Queue], market_queue: multiprocessing.Queue, order_ipc_hosts: list) -> None:
    # def test_add_codes():
    #     time.sleep(5)
    #     # if value:
@@ -633,7 +633,7 @@
        order_queue_distribute_manager = CodeQueueDistributeManager(order_queues)
        transaction_queue_distribute_manager = CodeQueueDistributeManager(transaction_queues)
        l2_data_upload_manager = L2DataUploadManager(order_queue_distribute_manager,
                                                     transaction_queue_distribute_manager, market_queue)
                                                     transaction_queue_distribute_manager, market_queue, order_ipc_hosts)
        __init_l2(l2_data_upload_manager)
        l2_data_manager.run_upload_common()
        l2_data_manager.run_log()
huaxin_client/l2_data_manager.py
@@ -34,7 +34,7 @@
class L2DataUploadManager:
    def __init__(self, order_queue_distribute_manager: CodeQueueDistributeManager,
                 transaction_queue_distribute_manager: CodeQueueDistributeManager,
                 market_data_queue: multiprocessing.Queue):
                 market_data_queue: multiprocessing.Queue, order_ipc_hosts):
        self.order_queue_distribute_manager = order_queue_distribute_manager
        self.transaction_queue_distribute_manager = transaction_queue_distribute_manager
        self.market_data_queue = market_data_queue
@@ -45,6 +45,7 @@
        self.upload_l2_data_task_dict = {}
        self.l2_order_codes = set()
        self.l2_transaction_codes = set()
        self.l2_order_upload_protocol = L2DataUploadProtocolManager(order_ipc_hosts)
    # 设置订单过滤条件
    # special_price:过滤的1手的价格
@@ -133,7 +134,8 @@
            self.temp_transaction_queue_dict[code] = collections.deque()
        if code not in self.temp_log_queue_dict:
            self.temp_log_queue_dict[code] = queue.Queue()
        # 分配订单上传协议
        self.l2_order_upload_protocol.distribute_upload_host(code)
        if code not in self.upload_l2_data_task_dict:
            t1 = threading.Thread(target=lambda: self.__run_upload_order_task(code), daemon=True)
@@ -148,6 +150,7 @@
    def release_distributed_upload_queue(self, code):
        self.order_queue_distribute_manager.release_distribute_queue(code)
        self.transaction_queue_distribute_manager.release_distribute_queue(code)
        self.l2_order_upload_protocol.release_distributed_upload_host(code)
        if code in self.temp_order_queue_dict:
            self.temp_order_queue_dict[code].clear()
            self.temp_order_queue_dict.pop(code)
@@ -162,6 +165,9 @@
    def __upload_l2_data(self, code, _queue, datas):
        _queue.put_nowait(marshal.dumps([code, datas, time.time()]))
    def __upload_l2_order_data(self, code, datas):
        self.l2_order_upload_protocol.upload_data_as_json(code, (code, datas, time.time()))
    # 处理订单数据并上传
    def __run_upload_order_task(self, code):
@@ -180,8 +186,8 @@
                if temp_list:
                    # 上传数据
                    self.__upload_l2_data(code, upload_queue, temp_list)
                    # self.__upload_l2_order_data(code, temp_list)
                    # self.__upload_l2_data(code, upload_queue, temp_list)
                    self.__upload_l2_order_data(code, temp_list)
                    temp_list = []
                else:
                    if code not in self.temp_order_queue_dict:
@@ -289,7 +295,7 @@
        if code not in self.code_socket_client_dict:
            raise Exception("尚未分配host")
        host, socket = self.code_socket_client_dict[code]
        socket.send_json(data)
        socket.send(marshal.dumps(data))
        socket.recv_string()
l2/cancel_buy_strategy.py
@@ -1130,7 +1130,11 @@
        self.__real_place_order_index_dict[code] = (index, is_default)
        RedisUtils.setex_async(self.__db, f"l_cancel_real_place_order_index-{code}", tool.get_expire(), index)
        if buy_single_index is not None:
            self.compute_watch_index(code, buy_single_index, buy_single_index, index)
            if code in self.__last_trade_progress_dict:
                self.compute_watch_index(code, buy_single_index, max(buy_single_index, self.__last_trade_progress_dict[code] + 1), index)
            else:
                self.compute_watch_index(code, buy_single_index, buy_single_index, index)
        if self.__last_trade_progress_dict.get(code):
            self.__compute_trade_progress_near_by_indexes(code, buy_single_index,
                                                          self.__last_trade_progress_dict.get(code) + 1, index)
l2/l2_data_listen_manager.py
@@ -114,7 +114,8 @@
        count = 0
        while True:
            try:
                data = socket.recv_json()
                data = socket.recv()
                data = marshal.loads(data)
                self.my_l2_data_callback.OnL2Order(data[0], data[1], data[2])
                socket.send_string("SUCCESS")
            except Exception as e:
@@ -132,18 +133,18 @@
            threading.Thread(target=lambda: self.__create_ipc_server(host), daemon=True).start()
    # 接收L2数据
    def receive_l2_data(self, order_queues, transaction_queues, market_queue):
    def receive_l2_data(self, order_queues, transaction_queues, market_queue, order_ipc_hosts):
        # TODO 暂时不通过队列接收数据
        for q in order_queues:
            t1 = threading.Thread(target=lambda: self.__recive_l2_orders(q), daemon=True)
            t1.start()
        # for q in order_queues:
        #     t1 = threading.Thread(target=lambda: self.__recive_l2_orders(q), daemon=True)
        #     t1.start()
        for q in transaction_queues:
            t2 = threading.Thread(target=lambda: self.__recive_transaction_orders(q), daemon=True)
            t2.start()
        t3 = threading.Thread(target=lambda: self.__recive_l2_markets(market_queue), daemon=True)
        t3.start()
        # 接收订单hosts
        # self.__create_ipc_server_hosts(order_ipc_hosts)
        self.__create_ipc_server_hosts(order_ipc_hosts)
    def get_active_count(self, type_):
        expire_time = time.time() - 5
main.py
@@ -29,7 +29,7 @@
                      queue_l1_w_strategy_r_: multiprocessing.Queue,
                      queue_strategy_w_trade_r_: multiprocessing.Queue,
                      queue_strategy_w_trade_r_for_read_: multiprocessing.Queue, order_queues_, transaction_queues_,
                      market_queue_, queue_l1_trade_r_strategy_w_, queue_l1_trade_w_strategy_r_):
                      market_queue_, queue_l1_trade_r_strategy_w_, queue_l1_trade_w_strategy_r_, order_ipc_hosts_):
    logger_system.info("策略进程ID:{}", os.getpid())
    log.close_print()
    # 初始化参数
@@ -53,7 +53,7 @@
    huaxin_trade_server.run(queue_strategy_r_trade_w_, queue_l1_w_strategy_r_, queue_strategy_w_trade_r_,
                            queue_strategy_w_trade_r_for_read_, order_queues_,
                            transaction_queues_, market_queue_,
                            queue_l1_trade_w_strategy_r_)
                            queue_l1_trade_w_strategy_r_, order_ipc_hosts_)
# 主服务
@@ -143,14 +143,14 @@
        # L2
        l2Process = multiprocessing.Process(
            target=huaxin_client.l2_client.run,
            args=(queue_other_w_l2_r, order_queues, transaction_queues, market_queue))
            args=(queue_other_w_l2_r, order_queues, transaction_queues, market_queue, order_ipc_hosts))
        l2Process.start()
        # 主进程
        createTradeServer(pss_strategy, queue_strategy_r_trade_w, queue_l1_w_strategy_r, queue_strategy_w_trade_r,
                          queue_strategy_w_trade_r_for_read,
                          order_queues, transaction_queues, market_queue, queue_l1_trade_r_strategy_w,
                          queue_l1_trade_w_strategy_r)
                          queue_l1_trade_w_strategy_r, order_ipc_hosts)
        # 将tradeServer作为主进程
        l1Process.join()
trade/huaxin/huaxin_trade_server.py
@@ -381,11 +381,11 @@
    @classmethod
    def l2_order(cls, code, _datas, timestamp):
        now_timestamp = int(time.time() * 1000)
        use_time = int((time.time() - timestamp) * 1000)
        thread_id = random.randint(0, 100000)
        l2_log.threadIds[code] = thread_id
        async_log_util.info(hx_logger_l2_orderdetail,
                            f"{code}#耗时:{int((time.time() - timestamp) * 1000)}-{thread_id}#{_datas}")
                            f"{code}#耗时:{use_time}-{thread_id}#{_datas}")
        # l2_data_log.l2_time_log(code, "开始处理L2逐笔委托")
        try:
@@ -1682,7 +1682,7 @@
def run(queue_strategy_r_trade_w, queue_l1_w_strategy_r, queue_strategy_w_trade_r, queue_strategy_w_trade_r_for_read,
        order_queues, transaction_queues,
        market_queue, queue_l1_trade_w_strategy_r):
        market_queue, queue_l1_trade_w_strategy_r,order_ipc_hosts):
    logger_system.info(f"trade_server 线程ID:{tool.get_thread_id()}")
    try:
        # 执行一些初始化数据
@@ -1698,7 +1698,7 @@
        # 监听L2数据
        global l2DataListenManager
        l2DataListenManager = L2DataListenManager(my_l2_data_callback)
        l2DataListenManager.receive_l2_data(order_queues, transaction_queues, market_queue)
        l2DataListenManager.receive_l2_data(order_queues, transaction_queues, market_queue, order_ipc_hosts)
        # 启动交易服务
        huaxin_trade_api.run_pipe_trade(queue_strategy_r_trade_w, queue_strategy_w_trade_r,