Administrator
2024-04-18 2811588dbd78314051c60c5c53cfefdd12dddd08
共享内存/zeromq通信测试
3个文件已添加
235 ■■■■■ 已修改文件
huaxin_client/communication/l2_communication.py 67 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test_communication.py 118 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test_communication.spec 50 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/communication/l2_communication.py
New file
@@ -0,0 +1,67 @@
import multiprocessing
import time
import msgpack
class L2ChannelCommunicationParams:
    """
    L2通信参数
    """
    def __init__(self, delegate_ipc_addr: str, deal_ipc_addr: str, delegate_data_shared_memory: multiprocessing.Array,
                 deal_data_shared_memory: multiprocessing.Array):
        """
        初始化参数
        @param delegate_ipc_addr: 逐笔委托ipc地址
        @param deal_ipc_addr: 逐笔成交ipc地址
        @param delegate_data_shared_memroy: 逐笔委托数据共享内存
        @param deal_data_shared_memroy: 逐笔成交数据共享内存
        """
        self.delegate_ipc_addr = delegate_ipc_addr
        self.deal_ipc_addr = deal_ipc_addr
        self.delegate_data_shared_memory = delegate_data_shared_memory
        self.deal_data_shared_memory = deal_data_shared_memory
class L2SharedMemoryDataUtil:
    """
     L2共享内存数据帮助
    """
    @classmethod
    def create_shared_memory(cls, size = 1024 * 1024):
        return multiprocessing.Array('i',size)
    @classmethod
    def set_data(cls, data, array: multiprocessing.Array):
        """
        将数据设置进入共享内存
        @param data: 设置的数据,设置数据最大为99994字节
        @param array: 目标地址
        @return: 数据设置耗时
        """
        time_ = time.time()
        str_ = msgpack.packb(data)
        lenth_count = 5
        str_lenth = "{0:0>5}".format(len(str_))
        lenth_bytes = str_lenth.encode('utf-8')
        array[0:lenth_count] = lenth_bytes
        array[lenth_count:len(str_) + lenth_count] = str_
        return time.time() - time_
    @classmethod
    def get_data(cls, array: multiprocessing.Array):
        """
        获取共享内存数据
        @param array: 内存地址
        @return: 数据,耗时
        """
        time_ = time.time()
        lenth_count = 5
        raw_data = array.get_obj()
        data_lenth = int(bytes(raw_data[:lenth_count]).decode('utf-8'))
        data = raw_data[lenth_count:data_lenth + lenth_count]
        data = msgpack.unpackb(bytes(data))
        return data, time.time() - time_
test_communication.py
New file
@@ -0,0 +1,118 @@
import multiprocessing
import threading
import time
import zmq
import constant
from huaxin_client.communication.l2_communication import L2ChannelCommunicationParams, L2SharedMemoryDataUtil
class L2Strategy:
    def __init__(self, commu_params):
        self.context = zmq.Context()
        self.commu_params = commu_params
    def rec_delegate_data(self, delegate_ipc_addr, shared_memory):
        socket = self.context.socket(zmq.REP)
        socket.bind(delegate_ipc_addr)
        while True:
            msg = socket.recv_json()
            try:
                use_time = (time.time() - msg['time']) * 1000
                decoded_data, decoded_use_time = L2SharedMemoryDataUtil.get_data(shared_memory)
                print(f"获取到逐笔委托数据: 通信耗时:{use_time}ms 解码耗时:{decoded_use_time}s")
            except:
                pass
            finally:
                socket.send_string("SUCCESS")
    def rec_deal_data(self, deal_ipc_addr, shared_memory):
        socket = self.context.socket(zmq.REP)
        socket.bind(deal_ipc_addr)
        while True:
            msg = socket.recv_json()
            try:
                use_time = (time.time() - msg['time']) * 1000
                decoded_data, decoded_use_time = L2SharedMemoryDataUtil.get_data(shared_memory)
                print(f"获取到逐笔成交数据: 通信耗时:{use_time}ms 解码耗时:{decoded_use_time}s")
            except:
                pass
            finally:
                socket.send_string("SUCCESS")
    def run(self):
        for param in self.commu_params:
            threading.Thread(target=self.rec_delegate_data,
                             args=(param.delegate_ipc_addr, param.delegate_data_shared_memory,), daemon=True).start()
            threading.Thread(target=self.rec_deal_data, args=(param.deal_ipc_addr, param.deal_data_shared_memory,),
                             daemon=True).start()
class L2DataCollector:
    def __init__(self, commu_params):
        self.context = zmq.Context()
        self.commu_params = commu_params
    def send_delegate_data(self, delegate_ipc_addr, shared_memory):
        socket = self.context.socket(zmq.REQ)
        socket.connect(delegate_ipc_addr)
        while True:
            try:
                datas = [('000990', 7.52, 400, 93000030, 2012, 380422, 380421, 375477, '1') * 100]
                L2SharedMemoryDataUtil.set_data(datas, shared_memory)
                socket.send_json({'data': [], "time": time.time()})
                response = socket.recv_string()
            except:
                pass
            finally:
                time.sleep(10)
    def send_deal_data(self, deal_ipc_addr, shared_memory):
        socket = self.context.socket(zmq.REQ)
        socket.connect(deal_ipc_addr)
        while True:
            try:
                datas = [('888888', 7.52, 400, 93000030, 2012, 380422, 380421, 375477, '1') * 100]
                L2SharedMemoryDataUtil.set_data(datas, shared_memory)
                socket.send_json({'data': [], "time": time.time()})
                response = socket.recv_string()
            except:
                pass
            finally:
                time.sleep(10)
    def run(self):
        for param in self.commu_params:
            threading.Thread(target=self.send_delegate_data,
                             args=(param.delegate_ipc_addr, param.delegate_data_shared_memory,),
                             daemon=True).start()
            threading.Thread(target=self.send_deal_data, args=(param.deal_ipc_addr, param.deal_data_shared_memory,),
                             daemon=True).start()
if __name__ == "__main__":
    channel_count = 100
    params = []
    for i in range(0, channel_count):
        delegate_data_array = L2SharedMemoryDataUtil.create_shared_memory()
        deal_data_array = L2SharedMemoryDataUtil.create_shared_memory()
        param = L2ChannelCommunicationParams(
            delegate_ipc_addr=f"ipc://order_{i}.ipc",
            deal_ipc_addr=f"ipc://deal_{i}.ipc",
            delegate_data_shared_memory=delegate_data_array,
            deal_data_shared_memory=deal_data_array)
        if constant.is_windows():
            param.delegate_ipc_addr = f"tcp://*:{10000 + i}"
            param.deal_ipc_addr = f"tcp://*:{11000 + i}"
        params.append(param)
    strategy = L2Strategy(params)
    dataCollector = L2DataCollector(params)
    strategyProcess = multiprocessing.Process(target=strategy.run, args=())
    l2Process = multiprocessing.Process(target=dataCollector.run, args=())
    strategyProcess.start()
    l2Process.start()
    while True:
        time.sleep(100)
test_communication.spec
New file
@@ -0,0 +1,50 @@
# -*- mode: python ; coding: utf-8 -*-
block_cipher = None
a = Analysis(
    ['test_communication.py'],
    pathex=[],
    binaries=[],
    datas=[],
    hiddenimports=[],
    hookspath=[],
    hooksconfig={},
    runtime_hooks=[],
    excludes=[],
    win_no_prefer_redirects=False,
    win_private_assemblies=False,
    noarchive=False,
    cipher=block_cipher,
)
pyz = PYZ(a.pure, a.zipped_data, cipher=block_cipher)
exe = EXE(
    pyz,
    a.scripts,
    [],
    exclude_binaries=True,
    name='test_communication',
    debug=False,
    bootloader_ignore_signals=False,
    strip=False,
    upx=True,
    console=True,
    disable_windowed_traceback=False,
    argv_emulation=False,
    target_arch=None,
    codesign_identity=None,
    entitlements_file=None,
)
coll = COLLECT(
    exe,
    a.binaries,
    a.zipfiles,
    a.datas,
    strip=False,
    upx=True,
    upx_exclude=[],
    name='test_communication',
)