| | |
| | | return None |
| | | |
| | | # 为代码分配队列 |
| | | def distribute_callback(self, code): |
| | | def distribute_callback(self, code, target_codes=None): |
| | | if code in self.distibuted_code_callback_dict: |
| | | return self.distibuted_code_callback_dict.get(code) |
| | | callback_info = self.get_available_callback() |
| | |
| | | distibuted_callbacks_ids = set() |
| | | for code in self.distibuted_code_callback_dict: |
| | | distibuted_callbacks_ids.add(self.distibuted_code_callback_dict[code][0]) |
| | | # 如果代码没在目标代码中就移除 |
| | | if target_codes and code not in target_codes: |
| | | self.release_distribute_callback(code) |
| | | logger_local_huaxin_l2_error.info(f"已经分配的代码:{self.distibuted_code_callback_dict.keys()}") |
| | | logger_local_huaxin_l2_error.info(f"已经分配的callbackid:{distibuted_callbacks_ids}") |
| | | # 删除已经没在目标代码中的分配 |
| | | raise Exception("无可用的回调对象") |
| | | self.distibuted_code_callback_dict[code] = callback_info |
| | | return callback_info |
| | |
| | | codes.add(code) |
| | | self.codes_volume_and_price_dict[code] = (d[1], d[2], d[3], d[4]) |
| | | self.l2_data_upload_manager.set_order_fileter_condition(code, d[1], round(float(d[2]), 2), d[3], d[4]) |
| | | logger_l2_codes_subscript.info("华鑫L2订阅总数:{}", len(codes)) |
| | | add_codes = codes - self.subscripted_codes |
| | | del_codes = self.subscripted_codes - codes |
| | | print("add del codes", add_codes, del_codes) |
| | |
| | | self.l2_data_upload_manager.release_distributed_upload_queue(c) |
| | | l2_data_manager.del_target_code(c) |
| | | for c in codes: |
| | | self.l2_data_upload_manager.distribute_upload_queue(c) |
| | | self.l2_data_upload_manager.distribute_upload_queue(c, codes) |
| | | l2_data_manager.add_target_code(c) |
| | | except Exception as e: # TODO 清除原来还没释放掉的数据 |
| | | logger_system.error(f"L2代码分配上传队列出错:{str(e)}") |
| | |
| | | callback.OnMarketData(code, data) |
| | | |
| | | # 分配上传队列 |
| | | def distribute_upload_queue(self, code): |
| | | def distribute_upload_queue(self, code, _target_codes=None): |
| | | """ |
| | | 分配上传队列 |
| | | @param code: 代码 |
| | | @param _target_codes: 所有的目标代码 |
| | | @return: |
| | | """ |
| | | if not self.data_callback_distribute_manager.get_distributed_callback(code): |
| | | self.data_callback_distribute_manager.distribute_callback(code) |
| | | self.data_callback_distribute_manager.distribute_callback(code, _target_codes) |
| | | |
| | | if code not in self.temp_order_queue_dict: |
| | | self.temp_order_queue_dict[code] = collections.deque() |