New file |
| | |
| | | import multiprocessing |
| | | import time |
| | | |
| | | import msgpack |
| | | |
| | | |
class L2ChannelCommunicationParams:
    """Communication parameters for one L2 channel.

    Bundles the two endpoint addresses of a channel together with the
    shared-memory buffers that carry the channel's data.
    """

    def __init__(self, delegate_ipc_addr: str, deal_ipc_addr: str, delegate_data_shared_memory: multiprocessing.Array,
                 deal_data_shared_memory: multiprocessing.Array):
        """
        Store the channel parameters on the instance.

        @param delegate_ipc_addr: ipc address for tick-by-tick order (delegate) data
        @param deal_ipc_addr: ipc address for tick-by-tick deal (trade) data
        @param delegate_data_shared_memory: shared memory for tick-by-tick order data
        @param deal_data_shared_memory: shared memory for tick-by-tick deal data
        """
        # Endpoint addresses, one per data stream.
        self.delegate_ipc_addr, self.deal_ipc_addr = delegate_ipc_addr, deal_ipc_addr
        # Shared buffers, one per data stream.
        self.delegate_data_shared_memory, self.deal_data_shared_memory = (
            delegate_data_shared_memory,
            deal_data_shared_memory,
        )
| | | |
| | | |
class L2SharedMemoryDataUtil:
    """
    Helpers for moving msgpack-encoded L2 data through a shared array.

    Frame layout (written by ``set_data``, read by ``get_data``): the first
    ``_HEADER_LEN`` elements hold the payload length as zero-padded ASCII
    decimal digits; the payload bytes follow, one byte per array element.
    """

    # Number of ASCII digits used for the length header.
    _HEADER_LEN = 5
    # Largest payload length the header can express (99999 bytes).
    _MAX_PAYLOAD = 10 ** _HEADER_LEN - 1

    @classmethod
    def create_shared_memory(cls, size=1024 * 1024):
        """
        Allocate a shared array for use with ``set_data`` / ``get_data``.

        Every element stores exactly one byte of the frame, so an
        unsigned-byte array ('B') is used; a C-int array would reserve
        several bytes per stored byte.

        @param size: number of elements (bytes) in the buffer
        @return: the synchronized shared array
        """
        return multiprocessing.Array('B', size)

    @classmethod
    def set_data(cls, data, array: multiprocessing.Array):
        """
        Serialize data with msgpack and write it into shared memory.

        @param data: data to store; its msgpack encoding may be at most
                     99999 bytes (5-digit length header) and must fit in
                     the array together with the header
        @param array: destination shared array
        @return: time spent, in seconds
        @raise ValueError: if the encoded data is too large for the header
                           or for the array
        """
        started = time.perf_counter()
        payload = msgpack.packb(data)
        payload_len = len(payload)
        if payload_len > cls._MAX_PAYLOAD:
            raise ValueError(
                f"encoded data is {payload_len} bytes, exceeding the {cls._MAX_PAYLOAD}-byte frame limit")
        frame_end = cls._HEADER_LEN + payload_len
        if frame_end > len(array):
            raise ValueError(
                f"frame of {frame_end} bytes does not fit into shared memory of {len(array)} elements")
        header = str(payload_len).zfill(cls._HEADER_LEN).encode('utf-8')
        # One slice assignment so header and payload are stored by a single
        # array operation instead of two separate ones.
        array[0:frame_end] = header + payload
        return time.perf_counter() - started

    @classmethod
    def get_data(cls, array: multiprocessing.Array):
        """
        Read and decode the frame currently stored in shared memory.

        @param array: shared array previously filled by ``set_data``
        @return: (decoded data, time spent in seconds)
        @raise ValueError: if the buffer does not start with a valid
                           length header (e.g. nothing was written yet)
        """
        started = time.perf_counter()
        # get_obj() returns the underlying ctypes array, bypassing the
        # wrapper's lock.
        raw_data = array.get_obj()
        header = bytes(raw_data[:cls._HEADER_LEN])
        if len(header) != cls._HEADER_LEN or not header.isdigit():
            raise ValueError(f"shared memory does not contain a valid length header: {header!r}")
        payload_len = int(header)
        payload = bytes(raw_data[cls._HEADER_LEN:cls._HEADER_LEN + payload_len])
        data = msgpack.unpackb(payload)
        return data, time.perf_counter() - started
| | | |
New file |
| | |
| | | import multiprocessing |
| | | import threading |
| | | import time |
| | | |
| | | import zmq |
| | | |
| | | import constant |
| | | from huaxin_client.communication.l2_communication import L2ChannelCommunicationParams, L2SharedMemoryDataUtil |
| | | |
| | | |
class L2Strategy:
    """
    Receiving side of the L2 channels.

    For every channel two REP sockets are served (order data and deal
    data). Each request is a JSON notification carrying a send timestamp;
    the payload itself is read from the channel's shared memory.
    """

    def __init__(self, commu_params):
        """
        @param commu_params: iterable of channel parameter objects exposing
                             delegate_ipc_addr, deal_ipc_addr,
                             delegate_data_shared_memory and
                             deal_data_shared_memory
        """
        # The ZeroMQ context is created on first use rather than here, so it
        # is built inside the process that actually runs the sockets.
        # NOTE(review): a context created in the parent is not expected to
        # be usable in a child process - confirm against the pyzmq docs.
        self.context = None
        self.commu_params = commu_params

    def _ensure_context(self):
        """Return the ZeroMQ context, creating it on first use."""
        if self.context is None:
            self.context = zmq.Context()
        return self.context

    def _serve(self, ipc_addr, shared_memory, label):
        """Bind a REP socket and answer every notification forever.

        @param ipc_addr: address to bind
        @param shared_memory: shared array the payload is read from
        @param label: stream name used in the log output
        """
        socket = self._ensure_context().socket(zmq.REP)
        socket.bind(ipc_addr)
        while True:
            msg = socket.recv_json()
            try:
                use_time = (time.time() - msg['time']) * 1000
                _, decoded_use_time = L2SharedMemoryDataUtil.get_data(shared_memory)
                print(f"获取到{label}数据: 通信耗时:{use_time}ms 解码耗时:{decoded_use_time}s")
            except Exception as exc:
                # Report the failure instead of silently dropping it; the
                # loop keeps serving subsequent requests.
                print(f"处理{label}数据出错: {exc!r}")
            finally:
                # A REP socket must reply before it can receive again.
                socket.send_string("SUCCESS")

    def rec_delegate_data(self, delegate_ipc_addr, shared_memory):
        """Serve tick-by-tick order (delegate) notifications; never returns."""
        self._serve(delegate_ipc_addr, shared_memory, "逐笔委托")

    def rec_deal_data(self, deal_ipc_addr, shared_memory):
        """Serve tick-by-tick deal notifications; never returns."""
        self._serve(deal_ipc_addr, shared_memory, "逐笔成交")

    def run(self, block=True):
        """
        Start one receiver thread per stream of every channel.

        @param block: when True (default) wait on the receiver threads.
                      They are daemon threads, so if this method is used
                      as a process target and returns, the process ends
                      and takes the receivers with it. Pass False to get
                      fire-and-forget behaviour.
        """
        if not self.commu_params:
            return
        # Create the context once, before any thread can race to create it.
        self._ensure_context()
        workers = []
        for param in self.commu_params:
            workers.append(threading.Thread(
                target=self.rec_delegate_data,
                args=(param.delegate_ipc_addr, param.delegate_data_shared_memory,),
                daemon=True))
            workers.append(threading.Thread(
                target=self.rec_deal_data,
                args=(param.deal_ipc_addr, param.deal_data_shared_memory,),
                daemon=True))
        for worker in workers:
            worker.start()
        if block:
            for worker in workers:
                worker.join()
| | | |
| | | |
class L2DataCollector:
    """
    Sending side of the L2 channels.

    For every channel two REQ sockets are driven (order data and deal
    data). Each cycle writes sample data into the channel's shared memory,
    sends a JSON notification with the send timestamp and waits for the
    reply.
    """

    def __init__(self, commu_params):
        """
        @param commu_params: iterable of channel parameter objects exposing
                             delegate_ipc_addr, deal_ipc_addr,
                             delegate_data_shared_memory and
                             deal_data_shared_memory
        """
        # The ZeroMQ context is created on first use rather than here, so it
        # is built inside the process that actually runs the sockets.
        # NOTE(review): a context created in the parent is not expected to
        # be usable in a child process - confirm against the pyzmq docs.
        self.context = None
        self.commu_params = commu_params

    @staticmethod
    def to_connect_address(addr):
        """
        Turn a bind-style address into one usable with connect().

        A wildcard host (``tcp://*:port``) is replaced by the loopback
        address; any other address is returned unchanged.
        """
        return addr.replace("://*:", "://127.0.0.1:", 1)

    def _ensure_context(self):
        """Return the ZeroMQ context, creating it on first use."""
        if self.context is None:
            self.context = zmq.Context()
        return self.context

    def _publish(self, ipc_addr, shared_memory, code, label):
        """Connect a REQ socket and push sample data every 10 seconds, forever.

        @param ipc_addr: address of the receiving side
        @param shared_memory: shared array the payload is written to
        @param code: security code placed in each sample record
        @param label: stream name used in the error output
        """
        socket = self._ensure_context().socket(zmq.REQ)
        socket.connect(self.to_connect_address(ipc_addr))
        # 100 identical sample records; invariant, so built once.
        datas = [(code, 7.52, 400, 93000030, 2012, 380422, 380421, 375477, '1')] * 100
        while True:
            try:
                L2SharedMemoryDataUtil.set_data(datas, shared_memory)
                socket.send_json({'data': [], "time": time.time()})
                # A REQ socket must read the reply before the next send.
                socket.recv_string()
            except Exception as exc:
                # Report the failure instead of silently dropping it.
                print(f"发送{label}数据出错: {exc!r}")
            finally:
                time.sleep(10)

    def send_delegate_data(self, delegate_ipc_addr, shared_memory):
        """Send tick-by-tick order (delegate) sample data; never returns."""
        self._publish(delegate_ipc_addr, shared_memory, '000990', "逐笔委托")

    def send_deal_data(self, deal_ipc_addr, shared_memory):
        """Send tick-by-tick deal sample data; never returns."""
        self._publish(deal_ipc_addr, shared_memory, '888888', "逐笔成交")

    def run(self, block=True):
        """
        Start one sender thread per stream of every channel.

        @param block: when True (default) wait on the sender threads.
                      They are daemon threads, so if this method is used
                      as a process target and returns, the process ends
                      and takes the senders with it. Pass False to get
                      fire-and-forget behaviour.
        """
        if not self.commu_params:
            return
        # Create the context once, before any thread can race to create it.
        self._ensure_context()
        workers = []
        for param in self.commu_params:
            workers.append(threading.Thread(
                target=self.send_delegate_data,
                args=(param.delegate_ipc_addr, param.delegate_data_shared_memory,),
                daemon=True))
            workers.append(threading.Thread(
                target=self.send_deal_data,
                args=(param.deal_ipc_addr, param.deal_data_shared_memory,),
                daemon=True))
        for worker in workers:
            worker.start()
        if block:
            for worker in workers:
                worker.join()
| | | |
| | | |
if __name__ == "__main__":
    # Build the parameters for every channel: two shared buffers and two
    # endpoint addresses each.
    channel_count = 100
    params = []
    for i in range(channel_count):
        delegate_buffer = L2SharedMemoryDataUtil.create_shared_memory()
        deal_buffer = L2SharedMemoryDataUtil.create_shared_memory()
        if constant.is_windows():
            # On Windows TCP endpoints are used in place of ipc endpoints.
            delegate_addr, deal_addr = f"tcp://*:{10000 + i}", f"tcp://*:{11000 + i}"
        else:
            delegate_addr, deal_addr = f"ipc://order_{i}.ipc", f"ipc://deal_{i}.ipc"
        params.append(L2ChannelCommunicationParams(
            delegate_ipc_addr=delegate_addr,
            deal_ipc_addr=deal_addr,
            delegate_data_shared_memory=delegate_buffer,
            deal_data_shared_memory=deal_buffer))

    # The receiver and the sender each run in their own process; the
    # receiver process is started first.
    strategy = L2Strategy(params)
    collector = L2DataCollector(params)
    strategy_process = multiprocessing.Process(target=strategy.run)
    collector_process = multiprocessing.Process(target=collector.run)
    strategy_process.start()
    collector_process.start()

    # Keep the parent process alive indefinitely.
    while True:
        time.sleep(100)
New file |
| | |
| | | # -*- mode: python ; coding: utf-8 -*- |
| | | |
| | | |
# No cipher is configured; the value is passed to Analysis and PYZ below.
block_cipher = None

# Name shared by the executable and the collected output.
app_name = 'test_communication'

# Analyse the entry script; every optional list/dict is left empty.
analysis = Analysis(
    ['test_communication.py'],
    pathex=[],
    binaries=[],
    datas=[],
    hiddenimports=[],
    hookspath=[],
    hooksconfig={},
    runtime_hooks=[],
    excludes=[],
    win_no_prefer_redirects=False,
    win_private_assemblies=False,
    noarchive=False,
    cipher=block_cipher,
)

# Archive built from the analysis' pure modules and zipped data.
python_archive = PYZ(analysis.pure, analysis.zipped_data, cipher=block_cipher)

# Console executable; binaries are excluded here and handed to COLLECT.
executable = EXE(
    python_archive,
    analysis.scripts,
    [],
    exclude_binaries=True,
    name=app_name,
    debug=False,
    bootloader_ignore_signals=False,
    strip=False,
    upx=True,
    console=True,
    disable_windowed_traceback=False,
    argv_emulation=False,
    target_arch=None,
    codesign_identity=None,
    entitlements_file=None,
)

# Gather the executable together with binaries, zip files and data files.
bundle = COLLECT(
    executable,
    analysis.binaries,
    analysis.zipfiles,
    analysis.datas,
    strip=False,
    upx=True,
    upx_exclude=[],
    name=app_name,
)