| | |
| | | import threading |
| | | import time |
| | | |
| | | import zmq |
| | | |
| | | from log_module import async_log_util |
| | | from log_module.log import logger_debug |
| | | |
| | |
| | | self.__l2_order_active_time_dict = {} |
| | | self.__l2_transaction_active_time_dict = {} |
| | | self.__l2_market_active_time_dict = {} |
| | | self.zmq_context = zmq.Context() |
| | | |
| | | # 接收L2逐笔委托数据 |
| | | def __recive_l2_orders(self, q: multiprocessing.Queue): |
| | |
| | | finally: |
| | | self.__l2_market_active_time_dict[__id] = time.time() |
| | | |
| | | def __create_ipc_server(self, host): |
| | | socket = self.zmq_context.socket(zmq.REP) |
| | | socket.bind(host) |
| | | count = 0 |
| | | while True: |
| | | try: |
| | | data = socket.recv_json() |
| | | self.my_l2_data_callback.OnL2Order(data[0], data[1], data[2]) |
| | | socket.send_string("SUCCESS") |
| | | except Exception as e: |
| | | async_log_util.exception(logger_debug, e) |
| | | finally: |
| | | count += 1 |
| | | if count > 100: |
| | | count = 0 |
| | | # 记录活跃时间,每100次记录一次 |
| | | self.__l2_order_active_time_dict[host] = time.time() |
| | | |
| | | # 创建订单的IPC服务 |
| | | def __create_ipc_server_hosts(self, order_ipc_hosts): |
| | | for host in order_ipc_hosts: |
| | | threading.Thread(target=lambda: self.__create_ipc_server(host), daemon=True).start() |
| | | |
| | | # 接收L2数据 |
| | | def receive_l2_data(self, order_queues, transaction_queues, market_queue): |
| | | for q in order_queues: |
| | | t1 = threading.Thread(target=lambda: self.__recive_l2_orders(q), daemon=True) |
| | | t1.start() |
| | | def receive_l2_data(self, order_queues, transaction_queues, market_queue, order_ipc_hosts): |
| | | # TODO 暂时不通过队列接收数据 |
| | | # for q in order_queues: |
| | | # t1 = threading.Thread(target=lambda: self.__recive_l2_orders(q), daemon=True) |
| | | # t1.start() |
| | | for q in transaction_queues: |
| | | t2 = threading.Thread(target=lambda: self.__recive_transaction_orders(q), daemon=True) |
| | | t2.start() |
| | | t3 = threading.Thread(target=lambda: self.__recive_l2_markets(market_queue), daemon=True) |
| | | t3.start() |
| | | # 接收订单hosts |
| | | self.__create_ipc_server_hosts(order_ipc_hosts) |
| | | |
| | | def get_active_count(self, type_): |
| | | expire_time = time.time() - 5 |