| | |
| | | import json |
| | | import logging |
| | | import marshal |
| | | import multiprocessing |
| | | import queue |
| | | import threading |
| | | import time |
| | |
| | | from huaxin_client.client_network import SendResponseSkManager |
| | | |
| | | # Active time |
| | | from huaxin_client.code_queue_distribute_manager import CodeQueueDistributeManager, CodeDataCallbackDistributeManager |
| | | from huaxin_client.code_queue_distribute_manager import CodeDataCallbackDistributeManager |
| | | from log_module import async_log_util |
| | | from log_module.async_log_util import huaxin_l2_log |
| | | from log_module.log import logger_local_huaxin_l2_error, logger_system, logger_local_huaxin_l2_subscript, \ |
| | |
| | | |
| | | # L2 data upload manager |
| | | class L2DataUploadManager: |
# Constructor for the upload manager.
# NOTE(review): two `def __init__` headers appear in this span with no
# indentation, which looks like a rendered diff whose +/- markers were lost.
# Presumably only one of the two variants exists in the real file -- confirm
# against the repository before relying on either signature.
#
# Variant 1 (four parameters): stores the order-queue distribute manager, the
# transaction-queue distribute manager and the market-data queue on the instance.
| | | def __init__(self, order_queue_distribute_manager: CodeQueueDistributeManager, |
| | | transaction_queue_distribute_manager: CodeQueueDistributeManager, |
| | | market_data_queue: multiprocessing.Queue, |
| | | data_callback_distribute_manager: CodeDataCallbackDistributeManager): |
| | | |
| | | self.order_queue_distribute_manager = order_queue_distribute_manager |
| | | self.transaction_queue_distribute_manager = transaction_queue_distribute_manager |
| | | self.market_data_queue = market_data_queue |
# Variant 2 (one parameter): takes only the callback distribute manager.
| | | def __init__(self, data_callback_distribute_manager: CodeDataCallbackDistributeManager): |
| | | self.data_callback_distribute_manager = data_callback_distribute_manager |
| | | # Objects allocated per code |
# Starts empty. release_distributed_upload_queue() calls .clear() on the entry
# stored under a code, so the values are presumably per-code containers
# (TODO confirm the exact container type).
| | | self.temp_order_queue_dict = {} |
| | |
| | | |
| | | # Allocate the upload queues |
# Ensure `code` has an allocation in each distribute manager: for every manager
# whose lookup for `code` returns a falsy value, ask that manager to allocate
# one. Returns nothing.
# NOTE(review): the order/transaction checks and the callback check may belong
# to different revisions (this span looks like a diff with its markers
# stripped) -- confirm which checks are live in the real file.
| | | def distribute_upload_queue(self, code): |
# Order queue: allocate only when no queue is currently distributed for `code`.
| | | if not self.order_queue_distribute_manager.get_distributed_queue(code): |
| | | self.order_queue_distribute_manager.distribute_queue(code) |
# Transaction queue: same check-then-allocate step.
| | | if not self.transaction_queue_distribute_manager.get_distributed_queue(code): |
| | | self.transaction_queue_distribute_manager.distribute_queue(code) |
# Data callback: same check-then-allocate step on the callback manager.
| | | if not self.data_callback_distribute_manager.get_distributed_callback(code): |
| | | self.data_callback_distribute_manager.distribute_callback(code) |
| | | |
| | |
| | | # Release the queues that have already been allocated |
| | | |
# Release whatever was allocated for `code` in each distribute manager, then
# empty the temporary order container kept for that code. Returns nothing.
# NOTE(review): the order/transaction release calls and the callback release
# call may belong to different revisions (this span looks like a diff with its
# markers stripped) -- confirm which calls are live in the real file.
| | | def release_distributed_upload_queue(self, code): |
| | | self.order_queue_distribute_manager.release_distribute_queue(code) |
| | | self.transaction_queue_distribute_manager.release_distribute_queue(code) |
| | | self.data_callback_distribute_manager.release_distribute_callback(code) |
# Only codes that already have a temporary entry are touched.
| | | if code in self.temp_order_queue_dict: |
# Empties the container stored under `code`; the dict key itself is kept.
| | | self.temp_order_queue_dict[code].clear() |
| | |
| | | def __run_upload_order_task(self, code): |
| | | q: collections.deque = self.temp_order_queue_dict.get(code) |
| | | temp_list = [] |
| | | queue_info = self.order_queue_distribute_manager.get_distributed_queue(code) |
| | | upload_queue = queue_info[1] |
| | | while True: |
| | | try: |
| | | while len(q) > 0: |
| | |
| | | # Process transaction data and upload it |
| | | def __run_upload_transaction_task(self, code): |
| | | q: collections.deque = self.temp_transaction_queue_dict.get(code) |
| | | queue_info = self.transaction_queue_distribute_manager.get_distributed_queue(code) |
| | | upload_queue = queue_info[1] |
| | | temp_list = [] |
| | | while True: |
| | | try: |