| | |
| | | import queue |
| | | import threading |
| | | import time |
| | | |
| | | import constant |
| | | from huaxin_client import socket_util |
| | | |
| | | from huaxin_client.client_network import SendResponseSkManager |
| | | |
| | | # 活动时间 |
| | | from huaxin_client.code_queue_distribute_manager import CodeQueueDistributeManager |
| | | from huaxin_client.code_queue_distribute_manager import CodeQueueDistributeManager, CodeDataCallbackDistributeManager |
| | | from huaxin_client.l2_data_transform_protocol import L2DataCallBack |
| | | from log_module import async_log_util |
| | | from log_module.async_log_util import huaxin_l2_log |
| | | from log_module.log import logger_local_huaxin_l2_error, logger_system, logger_local_huaxin_l2_subscript, \ |
| | |
| | | class L2DataUploadManager: |
| | | def __init__(self, order_queue_distribute_manager: CodeQueueDistributeManager, |
| | | transaction_queue_distribute_manager: CodeQueueDistributeManager, |
| | | market_data_queue: multiprocessing.Queue, order_ipc_hosts): |
| | | market_data_queue: multiprocessing.Queue, order_ipc_hosts, data_callback_distribute_manager:CodeDataCallbackDistributeManager): |
| | | |
| | | self.order_queue_distribute_manager = order_queue_distribute_manager |
| | | self.transaction_queue_distribute_manager = transaction_queue_distribute_manager |
| | | self.market_data_queue = market_data_queue |
| | | self.data_callback_distribute_manager = data_callback_distribute_manager |
| | | # 代码分配的对象 |
| | | self.temp_order_queue_dict = {} |
| | | self.temp_transaction_queue_dict = {} |
| | | self.temp_log_queue_dict = {} |
| | | |
| | | self.filter_order_condition_dict = {} |
| | | self.upload_l2_data_task_dict = {} |
| | | self.l2_order_codes = set() |
| | |
| | | |
| | | def add_market_data(self, data): |
| | | # 加入上传队列 |
| | | self.market_data_queue.put_nowait(data) |
| | | # self.market_data_queue.put_nowait(data) |
| | | code = data['securityID'] |
| | | callback = self.data_callback_distribute_manager.get_distributed_callback(code) |
| | | if callback: |
| | | callback.OnMarketData(code, data) |
| | | |
| | | # 分配上传队列 |
| | | def distribute_upload_queue(self, code): |
| | |
| | | self.order_queue_distribute_manager.distribute_queue(code) |
| | | if not self.transaction_queue_distribute_manager.get_distributed_queue(code): |
| | | self.transaction_queue_distribute_manager.distribute_queue(code) |
| | | if not self.data_callback_distribute_manager.get_distributed_callback(code): |
| | | self.data_callback_distribute_manager.distribute_callback(code) |
| | | |
| | | |
| | | if code not in self.temp_order_queue_dict: |
| | | self.temp_order_queue_dict[code] = collections.deque() |
| | |
| | | if code not in self.temp_log_queue_dict: |
| | | self.temp_log_queue_dict[code] = queue.Queue() |
| | | # 分配订单上传协议 |
| | | self.l2_order_upload_protocol.distribute_upload_host(code) |
| | | if not constant.is_windows(): |
| | | self.l2_order_upload_protocol.distribute_upload_host(code) |
| | | |
| | | if code not in self.upload_l2_data_task_dict: |
| | | t1 = threading.Thread(target=lambda: self.__run_upload_order_task(code), daemon=True) |
| | |
| | | self.order_queue_distribute_manager.release_distribute_queue(code) |
| | | self.transaction_queue_distribute_manager.release_distribute_queue(code) |
| | | self.l2_order_upload_protocol.release_distributed_upload_host(code) |
| | | self.data_callback_distribute_manager.release_distribute_callback(code) |
| | | if code in self.temp_order_queue_dict: |
| | | self.temp_order_queue_dict[code].clear() |
| | | self.temp_order_queue_dict.pop(code) |
| | |
| | | if temp_list: |
| | | # 上传数据 |
| | | # self.__upload_l2_data(code, upload_queue, temp_list) |
| | | self.__upload_l2_order_data(code, temp_list) |
| | | # self.__upload_l2_order_data(code, temp_list) |
| | | self.data_callback_distribute_manager.get_distributed_callback(code).OnL2Order(code, temp_list, time.time()) |
| | | temp_list = [] |
| | | |
| | | else: |
| | | if code not in self.temp_order_queue_dict: |
| | | self.l2_order_codes.discard(code) |
| | |
| | | temp_list.append(data) |
| | | if temp_list: |
| | | # 上传数据 |
| | | self.__upload_l2_data(code, upload_queue, temp_list) |
| | | # self.__upload_l2_data(code, upload_queue, temp_list) |
| | | self.data_callback_distribute_manager.get_distributed_callback(code).OnL2Transaction(code, temp_list) |
| | | temp_list = [] |
| | | else: |
| | | if code not in self.temp_transaction_queue_dict: |
| | |
| | | self.code_socket_client_dict = {} |
| | | self.rlock = threading.RLock() |
| | | context = zmq.Context() |
| | | if constant.is_windows(): |
| | | return |
| | | for host in self.ipchosts: |
| | | socket = context.socket(zmq.REQ) |
| | | socket.connect(host) |