| | |
| | | |
| | | # L2上传数据管理器 |
| | | class L2DataUploadManager: |
| | | # Constructor: stores the two queue-distribute managers and the market-data queue on |
| | | # the instance, creates an empty task dict and two empty code sets, and builds the |
| | | # order upload protocol manager from order_ipc_hosts. |
| | | # order_ipc_hosts: remote hosts (passed straight to L2DataUploadProtocolManager). |
| | | # NOTE(review): the two consecutive `market_data_queue` parameter rows look like the |
| | | # old and new revision of the same signature line shown together in a diff view, and |
| | | # the bare row in the middle of the body appears to be an elided hunk - confirm |
| | | # against the real file. |
| | | def __init__(self, order_queue_distribute_manager: CodeQueueDistributeManager, |
| | | transaction_queue_distribute_manager: CodeQueueDistributeManager, |
| | | market_data_queue: multiprocessing.Queue): |
| | | market_data_queue: multiprocessing.Queue, order_ipc_hosts): |
| | | self.order_queue_distribute_manager = order_queue_distribute_manager |
| | | self.transaction_queue_distribute_manager = transaction_queue_distribute_manager |
| | | self.market_data_queue = market_data_queue |
| | |
| | | self.upload_l2_data_task_dict = {} |
| | | self.l2_order_codes = set() |
| | | self.l2_transaction_codes = set() |
| | | # Orders: protocol manager that uploads order data to the given hosts |
| | | self.l2_order_upload_protocol = L2DataUploadProtocolManager(order_ipc_hosts) |
| | | |
| | | # 设置订单过滤条件 |
| | | # special_price:过滤的1手的价格 |
| | |
| | | self.temp_order_queue_dict[code] = collections.deque() |
| | | if code not in self.temp_transaction_queue_dict: |
| | | self.temp_transaction_queue_dict[code] = collections.deque() |
| | | # 分配订单上传协议 |
| | | self.l2_order_upload_protocol.distribute_upload_host(code) |
| | | |
| | | if code not in self.upload_l2_data_task_dict: |
| | | t1 = threading.Thread(target=lambda: self.__run_upload_order_task(code), daemon=True) |
| | |
| | | self.temp_transaction_queue_dict.pop(code) |
| | | if code in self.upload_l2_data_task_dict: |
| | | self.upload_l2_data_task_dict.pop(code) |
| | | self.l2_order_upload_protocol.release_distributed_upload_host(code) |
| | | |
| | | # Put a (code, datas, current time.time()) tuple on _queue via put_nowait. |
| | | # NOTE(review): presumably _queue is a queue.Queue / multiprocessing.Queue, in which |
| | | # case put_nowait raises queue.Full instead of blocking - confirm against callers. |
| | | def __upload_l2_data(self, code, _queue, datas): |
| | | _queue.put_nowait((code, datas, time.time())) |
| | | |
| | | # Hand order data for `code` to the order upload protocol manager. The payload is the |
| | | # tuple (code, datas, time.time()), i.e. the same shape __upload_l2_data enqueues. |
| | | def __upload_l2_order_data(self, code, datas): |
| | | self.l2_order_upload_protocol.upload_data_as_json(code, (code, datas, time.time())) |
| | | |
| | | # 处理订单数据并上传 |
| | | def __run_upload_order_task(self, code): |
| | |
| | | |
| | | if temp_list: |
| | | # 上传数据 |
| | | self.__upload_l2_data(code, upload_queue, temp_list) |
| | | # self.__upload_l2_data(code, upload_queue, temp_list) |
| | | self.__upload_l2_order_data(code, temp_list) |
| | | temp_list = [] |
| | | else: |
| | | if code not in self.temp_order_queue_dict: |
| | |
| | | pass |
| | | |
| | | |
| | | # L2上传数据协议管理器 |
| | | class L2DataUploadProtocolManager: |
| | | |
| | | # ipchosts IPC协议 |
| | |
| | | self.socket_client_dict = {} |
| | | # 保存代码分配的client 格式:{code:(host, socket)} |
| | | self.code_socket_client_dict = {} |
| | | self.lock = threading.Lock() |
| | | self.rlock = threading.RLock() |
| | | context = zmq.Context() |
| | | for host in self.ipchosts: |
| | | socket = context.socket(zmq.REQ) |
| | |
| | | |
| | | # Get an available host. |
| | | # Returns (host, self.socket_client_dict[host]) for the first key of |
| | | # socket_client_dict that is not already the host of an entry in |
| | | # code_socket_client_dict; raises Exception("无可用host") when no host is free. |
| | | # NOTE(review): the check / scan / raise sequence appears twice below - once inside a |
| | | # try/finally that releases self.lock, once after it. This looks like two revisions |
| | | # of the method shown together in a diff view; confirm against the real file which |
| | | # one is current (and whether this method is still meant to release self.lock). |
| | | def __get_available_ipchost(self): |
| | | try: |
| | | # Fail early when at least as many codes are assigned as there are hosts. |
| | | if len(self.code_socket_client_dict) >= len(self.socket_client_dict): |
| | | raise Exception("无可用host") |
| | | # Hosts currently in use: first element of each stored (host, socket) pair. |
| | | used_hosts = set([self.code_socket_client_dict[k][0] for k in self.code_socket_client_dict]) |
| | | for host in self.socket_client_dict: |
| | | if host not in used_hosts: |
| | | return host, self.socket_client_dict[host] |
| | | if len(self.code_socket_client_dict) >= len(self.socket_client_dict): |
| | | raise Exception("无可用host") |
| | | finally: |
| | | self.lock.release() |
| | | # Hosts currently in use: first element of each stored (host, socket) pair. |
| | | used_hosts = set([self.code_socket_client_dict[k][0] for k in self.code_socket_client_dict]) |
| | | for host in self.socket_client_dict: |
| | | if host not in used_hosts: |
| | | return host, self.socket_client_dict[host] |
| | | raise Exception("无可用host") |
| | | |
| | | # Allocate a host for `code`. |
| | | # Asks __get_available_ipchost() for a free (host, socket) pair and, when the result |
| | | # is truthy, records it in code_socket_client_dict under `code`. The release calls |
| | | # sit in `finally`, so they run even if the lookup raises. |
| | | # NOTE(review): both self.lock and self.rlock are acquired and released here; the |
| | | # paired rows look like the old and new revision of one line shown together in a |
| | | # diff view - confirm which lock the real file uses. |
| | | def distribute_upload_host(self, code): |
| | | self.lock.acquire() |
| | | self.rlock.acquire() |
| | | try: |
| | | host_info = self.__get_available_ipchost() |
| | | if host_info: |
| | | self.code_socket_client_dict[code] = host_info |
| | | finally: |
| | | self.lock.release() |
| | | self.rlock.release() |
| | | |
| | | # Release the host previously allocated to `code`: removes the entry for `code` from |
| | | # code_socket_client_dict if one exists, and does nothing otherwise. |
| | | # NOTE(review): both self.lock and self.rlock are acquired and released here; the |
| | | # paired rows look like the old and new revision of one line shown together in a |
| | | # diff view - confirm which lock the real file uses. |
| | | def release_distributed_upload_host(self, code): |
| | | self.lock.acquire() |
| | | self.rlock.acquire() |
| | | try: |
| | | if code in self.code_socket_client_dict: |
| | | self.code_socket_client_dict.pop(code) |
| | | finally: |
| | | self.lock.release() |
| | | self.rlock.release() |
| | | |
| | | def upload_data_as_json(self, code, data): |
| | | if code not in self.code_socket_client_dict: |
| | |
| | | pass |
| | | |
| | | |
| | | |
| | | # Start a daemon thread that calls __test() and return immediately without joining it. |
| | | # __test is not defined in the visible part of the file. |
| | | def run_test(): |
| | | t = threading.Thread(target=lambda: __test(), daemon=True) |
| | | t.start() |