Administrator
2024-06-26 d9a3b754d14f93375cd3d91afa49be8deb07975c
订阅bug修改
3个文件已修改
19 ■■■■ 已修改文件
huaxin_client/code_queue_distribute_manager.py 6 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/l2_client.py 3 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/l2_data_manager.py 10 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/code_queue_distribute_manager.py
@@ -70,7 +70,7 @@
        return None
    # 为代码分配队列
    def distribute_callback(self, code):
    def distribute_callback(self, code, target_codes=None):
        if code in self.distibuted_code_callback_dict:
            return self.distibuted_code_callback_dict.get(code)
        callback_info = self.get_available_callback()
@@ -78,8 +78,12 @@
            distibuted_callbacks_ids = set()
            for code in self.distibuted_code_callback_dict:
                distibuted_callbacks_ids.add(self.distibuted_code_callback_dict[code][0])
                # 如果代码没在目标代码中就移除
                if target_codes and code not in target_codes:
                    self.release_distribute_callback(code)
            logger_local_huaxin_l2_error.info(f"已经分配的代码:{self.distibuted_code_callback_dict.keys()}")
            logger_local_huaxin_l2_error.info(f"已经分配的callbackid:{distibuted_callbacks_ids}")
            # 删除已经没在目标代码中的分配
            raise Exception("无可用的回调对象")
        self.distibuted_code_callback_dict[code] = callback_info
        return callback_info
huaxin_client/l2_client.py
@@ -148,6 +148,7 @@
            codes.add(code)
            self.codes_volume_and_price_dict[code] = (d[1], d[2], d[3], d[4])
            self.l2_data_upload_manager.set_order_fileter_condition(code, d[1], round(float(d[2]), 2), d[3], d[4])
        logger_l2_codes_subscript.info("华鑫L2订阅总数:{}", len(codes))
        add_codes = codes - self.subscripted_codes
        del_codes = self.subscripted_codes - codes
        print("add del codes", add_codes, del_codes)
@@ -156,7 +157,7 @@
                self.l2_data_upload_manager.release_distributed_upload_queue(c)
                l2_data_manager.del_target_code(c)
            for c in codes:
                self.l2_data_upload_manager.distribute_upload_queue(c)
                self.l2_data_upload_manager.distribute_upload_queue(c, codes)
                l2_data_manager.add_target_code(c)
        except Exception as e:  # TODO 清除原来还没释放掉的数据
            logger_system.error(f"L2代码分配上传队列出错:{str(e)}")
huaxin_client/l2_data_manager.py
@@ -132,9 +132,15 @@
            callback.OnMarketData(code, data)
    # 分配上传队列
    def distribute_upload_queue(self, code):
    def distribute_upload_queue(self, code, _target_codes=None):
        """
        分配上传队列
        @param code: 代码
        @param _target_codes: 所有的目标代码
        @return:
        """
        if not self.data_callback_distribute_manager.get_distributed_callback(code):
            self.data_callback_distribute_manager.distribute_callback(code)
            self.data_callback_distribute_manager.distribute_callback(code, _target_codes)
        if code not in self.temp_order_queue_dict:
            self.temp_order_queue_dict[code] = collections.deque()