| | |
| | | |
| | | # 活动时间 |
| | | from huaxin_client.code_queue_distribute_manager import CodeQueueDistributeManager, CodeDataCallbackDistributeManager |
| | | from huaxin_client.l2_data_transform_protocol import L2DataCallBack |
| | | from log_module import async_log_util |
| | | from log_module.async_log_util import huaxin_l2_log |
| | | from log_module.log import logger_local_huaxin_l2_error, logger_system, logger_local_huaxin_l2_subscript, \ |
| | |
| | | class L2DataUploadManager: |
| | | def __init__(self, order_queue_distribute_manager: CodeQueueDistributeManager, |
| | | transaction_queue_distribute_manager: CodeQueueDistributeManager, |
| | | market_data_queue: multiprocessing.Queue, order_ipc_hosts, |
| | | market_data_queue: multiprocessing.Queue, |
| | | data_callback_distribute_manager: CodeDataCallbackDistributeManager): |
| | | |
| | | self.order_queue_distribute_manager = order_queue_distribute_manager |
| | |
| | | self.upload_l2_data_task_dict = {} |
| | | self.l2_order_codes = set() |
| | | self.l2_transaction_codes = set() |
| | | self.l2_order_upload_protocol = L2DataUploadProtocolManager(order_ipc_hosts) |
| | | |
| | | # Set the order filter conditions
| | | # special_price: the price of the 1-lot (one hand) orders to filter out
| | |
| | | self.temp_transaction_queue_dict[code] = collections.deque() |
| | | if code not in self.temp_log_queue_dict: |
| | | self.temp_log_queue_dict[code] = queue.Queue() |
| | | # Allocate the order upload protocol for this code
| | | if not constant.is_windows(): |
| | | self.l2_order_upload_protocol.distribute_upload_host(code) |
| | | |
| | | if code not in self.upload_l2_data_task_dict: |
| | | t1 = threading.Thread(target=lambda: self.__run_upload_order_task(code), daemon=True) |
| | | t1.start() |
| | |
| | | # Release what is held for `code`: asks the order and transaction queue
| | | # distribute managers, the order upload protocol manager and the data
| | | # callback distribute manager to release their allocation for `code`, then
| | | # empties the code's temporary order queue if one exists.
| | | # `code` is the key used by all of those managers (presumably a stock code
| | | # string - confirm against callers).
| | | # NOTE(review): distribute_upload_host(code) is only called when
| | | # `not constant.is_windows()`, but the release below is unconditional -
| | | # confirm release_distributed_upload_host() tolerates a code that was never
| | | # distributed.
| | | def release_distributed_upload_queue(self, code):
| | | self.order_queue_distribute_manager.release_distribute_queue(code)
| | | self.transaction_queue_distribute_manager.release_distribute_queue(code)
| | | self.l2_order_upload_protocol.release_distributed_upload_host(code)
| | | self.data_callback_distribute_manager.release_distribute_callback(code)
| | | if code in self.temp_order_queue_dict:
| | | # clear() empties the container in place; the dict entry itself is kept.
| | | self.temp_order_queue_dict[code].clear()
| | |
| | | |
| | | # Serialize one batch for `code` and put it on `_queue` without blocking.
| | | # The payload is marshal.dumps([code, datas, time.time()]): the third
| | | # element is the timestamp taken at enqueue time, and the consumer has to
| | | # marshal.loads() the bytes to get the list back. `datas` must therefore be
| | | # built only from types that marshal supports.
| | | # NOTE(review): assuming `_queue` is a queue.Queue / multiprocessing.Queue
| | | # (confirm), put_nowait() raises queue.Full when a bounded queue is full;
| | | # nothing here catches it.
| | | def __upload_l2_data(self, code, _queue, datas):
| | | _queue.put_nowait(marshal.dumps([code, datas, time.time()]))
| | | |
| | | # Send one batch of order data for `code` through the order upload protocol
| | | # manager (self.l2_order_upload_protocol).
| | | # The payload is the tuple (code, datas, time.time()) - the same three
| | | # fields __upload_l2_data sends, but passed as a tuple to
| | | # upload_data_as_json() instead of being marshalled onto a queue.
| | | # NOTE(review): upload_data_as_json() is defined elsewhere; presumably it
| | | # JSON-encodes the payload, so `datas` would need to be JSON-serializable -
| | | # confirm.
| | | def __upload_l2_order_data(self, code, datas):
| | | self.l2_order_upload_protocol.upload_data_as_json(code, (code, datas, time.time()))
| | | |
| | | # Process the order data and upload it
| | | def __run_upload_order_task(self, code): |