"""Latency harness for L2 channel communication.

A collector process writes tick payloads into shared memory and notifies a
strategy process over ZeroMQ REQ/REP sockets; the strategy process reads the
payload back from shared memory and reports the transport and decode time.
"""
import logging
import multiprocessing
import random
import threading
import time

import zmq

import constant
from huaxin_client.communication.l2_communication import L2ChannelCommunicationParams, L2SharedMemoryDataUtil
from log_module.log import logger_debug

# Guards lazy creation of the per-instance ZeroMQ context. Kept at module
# level (not on the instance) so the instances stay picklable when they are
# handed to ``multiprocessing.Process`` under the "spawn" start method.
_CONTEXT_LOCK = threading.Lock()


class _LazyContextMixin:
    """Provide a ``context`` attribute that is created on first access.

    The context must be created in the process that uses it: a
    ``zmq.Context`` cannot be pickled for a spawned child and must not be
    shared across a fork. Subclasses set ``self._context = None`` in
    ``__init__``.
    """

    @property
    def context(self):
        """Return this instance's ``zmq.Context``, creating it if needed."""
        if self._context is None:
            with _CONTEXT_LOCK:
                if self._context is None:
                    self._context = zmq.Context()
        return self._context

    @context.setter
    def context(self, value):
        self._context = value


class L2Strategy(_LazyContextMixin):
    """Receiving side: answers notifications and reads shared-memory data."""

    def __init__(self, commu_params):
        """Store the channel parameters.

        Args:
            commu_params: iterable of channel parameter objects exposing
                ``delegate_ipc_addr``, ``deal_ipc_addr``,
                ``delegate_data_shared_memory`` and ``deal_data_shared_memory``.
        """
        self._context = None
        self.commu_params = commu_params

    def _receive_loop(self, ipc_addr, shared_memory, label):
        """Bind a REP socket on *ipc_addr* and serve notifications forever.

        For every notification the payload is read from *shared_memory*, the
        timings are printed, and ``"SUCCESS"`` is sent back. *label* is the
        data-kind text embedded in the printed/logged messages.
        """
        socket = self.context.socket(zmq.REP)
        socket.bind(ipc_addr)
        while True:
            msg = socket.recv_json()
            try:
                # Sender stamps time.time() in seconds; report milliseconds.
                use_time = (time.time() - msg['time']) * 1000
                decoded_data, decoded_use_time = L2SharedMemoryDataUtil.get_data(shared_memory)
                print(f"获取到逐笔{label}数据: 数据长度:{len(decoded_data)} 通信耗时:{use_time}ms 解码耗时:{decoded_use_time}s")
                if use_time > 1:
                    logger_debug.info(f"{label}通信耗时:{use_time}")
            except Exception as e:
                logging.exception(e)
            finally:
                # A REP socket must reply before it can receive again, so the
                # acknowledgement is sent even when processing failed.
                socket.send_string("SUCCESS")

    def rec_delegate_data(self, delegate_ipc_addr, shared_memory):
        """Serve delegate (order) notifications on *delegate_ipc_addr*; never returns."""
        self._receive_loop(delegate_ipc_addr, shared_memory, "委托")

    def rec_deal_data(self, deal_ipc_addr, shared_memory):
        """Serve deal (transaction) notifications on *deal_ipc_addr*; never returns."""
        self._receive_loop(deal_ipc_addr, shared_memory, "成交")

    def run(self):
        """Start two daemon receiver threads per channel, then block forever."""
        # Create the context here, in the process that runs the workers and
        # before any worker thread exists.
        _ = self.context
        for param in self.commu_params:
            threading.Thread(target=self.rec_delegate_data,
                             args=(param.delegate_ipc_addr, param.delegate_data_shared_memory,),
                             daemon=True).start()
            threading.Thread(target=self.rec_deal_data,
                             args=(param.deal_ipc_addr, param.deal_data_shared_memory,),
                             daemon=True).start()
        # Workers are daemon threads; keep the main thread alive.
        while True:
            time.sleep(100)


class L2DataCollector(_LazyContextMixin):
    """Sending side: writes payloads to shared memory and notifies the receiver."""

    def __init__(self, commu_params):
        """Store the channel parameters (same objects as given to ``L2Strategy``)."""
        self._context = None
        self.commu_params = commu_params

    def _send_loop(self, ipc_addr, shared_memory, security_code, log_load_time):
        """Connect a REQ socket to *ipc_addr* and publish sample data forever.

        Each round writes the sample payload into *shared_memory*, sends a
        timestamped notification, waits for the reply, then sleeps a random
        0-100 ms. When *log_load_time* is true, a shared-memory write slower
        than 0.0005 (as returned by ``set_data``) is logged.
        """
        socket = self.context.socket(zmq.REQ)
        socket.connect(ipc_addr)
        # The sample payload never changes, so build it once.
        datas = (security_code, 7.52, 400, 93000030, 2012, 380422, 380421, 375477, '1') * 150
        while True:
            try:
                use_time = L2SharedMemoryDataUtil.set_data(datas, shared_memory)
                if log_load_time and use_time > 0.0005:
                    logger_debug.info(f"数据装载耗时:{use_time}")
                socket.send_json({'data': [], "time": time.time()})
                # The reply content is not used; receiving it completes the
                # REQ/REP round so the next send is allowed.
                socket.recv_string()
            except Exception as e:
                logging.exception(e)
            finally:
                time.sleep(random.randint(0, 100) / 1000)

    def send_delegate_data(self, delegate_ipc_addr, shared_memory):
        """Publish sample delegate (order) data on *delegate_ipc_addr*; never returns."""
        self._send_loop(delegate_ipc_addr, shared_memory, '000990', log_load_time=False)

    def send_deal_data(self, deal_ipc_addr, shared_memory):
        """Publish sample deal (transaction) data on *deal_ipc_addr*; never returns."""
        self._send_loop(deal_ipc_addr, shared_memory, '888888', log_load_time=True)

    def run(self):
        """Start two daemon sender threads per channel, then block forever."""
        # Create the context here, in the process that runs the workers and
        # before any worker thread exists.
        _ = self.context
        for param in self.commu_params:
            threading.Thread(target=self.send_delegate_data,
                             args=(param.delegate_ipc_addr, param.delegate_data_shared_memory,),
                             daemon=True).start()
            threading.Thread(target=self.send_deal_data,
                             args=(param.deal_ipc_addr, param.deal_data_shared_memory,),
                             daemon=True).start()
        # Workers are daemon threads; keep the main thread alive.
        while True:
            time.sleep(100)


def main():
    """Create 100 channels and run receiver and sender in separate processes."""
    channel_count = 100
    params = []
    for i in range(channel_count):
        delegate_data_array = L2SharedMemoryDataUtil.create_shared_memory()
        deal_data_array = L2SharedMemoryDataUtil.create_shared_memory()
        param = L2ChannelCommunicationParams(
            delegate_ipc_addr=f"ipc://order_{i}.ipc",
            deal_ipc_addr=f"ipc://deal_{i}.ipc",
            delegate_data_shared_memory=delegate_data_array,
            deal_data_shared_memory=deal_data_array)
        if constant.is_windows():
            # Use loopback TCP endpoints instead of ipc:// on Windows.
            param.delegate_ipc_addr = f"tcp://127.0.0.1:{15000 + i}"
            param.deal_ipc_addr = f"tcp://127.0.0.1:{16000 + i}"
        params.append(param)

    strategy = L2Strategy(params)
    data_collector = L2DataCollector(params)
    strategy_process = multiprocessing.Process(target=strategy.run, args=())
    l2_process = multiprocessing.Process(target=data_collector.run, args=())
    strategy_process.start()
    l2_process.start()
    # Block for as long as the children are alive.
    strategy_process.join()
    l2_process.join()


if __name__ == "__main__":
    main()

# Disabled scratch snippet: the guard compares against "__main__1", so this
# does not run when the file is executed as a script.
if __name__ == "__main__1":
    datas = ('000990', 7.52, 400, 93000030, 2012, 380422, 380421, 375477, '1') * 100
    print(datas)