Administrator
2025-06-09 8b7972581d0324e3f634b5b5a57a9ed7db1addaf
test_communication.py
@@ -1,5 +1,6 @@
import logging
import multiprocessing
import random
import threading
import time
@@ -7,6 +8,7 @@
import constant
from huaxin_client.communication.l2_communication import L2ChannelCommunicationParams, L2SharedMemoryDataUtil
from log_module.log import logger_debug
class L2Strategy:
@@ -24,6 +26,8 @@
                use_time = (time.time() - msg['time']) * 1000
                decoded_data, decoded_use_time = L2SharedMemoryDataUtil.get_data(shared_memory)
                print(f"获取到逐笔委托数据: 数据长度:{len(decoded_data)} 通信耗时:{use_time}ms 解码耗时:{decoded_use_time}s")
                if use_time > 1:
                    logger_debug.info(f"委托通信耗时:{use_time}")
            except Exception as e:
                logging.exception(e)
            finally:
@@ -38,6 +42,8 @@
                use_time = (time.time() - msg['time']) * 1000
                decoded_data, decoded_use_time = L2SharedMemoryDataUtil.get_data(shared_memory)
                print(f"获取到逐笔成交数据:  数据长度:{len(decoded_data)} 通信耗时:{use_time}ms 解码耗时:{decoded_use_time}s")
                if use_time > 1:
                    logger_debug.info(f"成交通信耗时:{use_time}")
            except Exception as e:
                logging.exception(e)
            finally:
@@ -49,6 +55,8 @@
                             args=(param.delegate_ipc_addr, param.delegate_data_shared_memory,), daemon=True).start()
            threading.Thread(target=self.rec_deal_data, args=(param.deal_ipc_addr, param.deal_data_shared_memory,),
                             daemon=True).start()
        while True:
            time.sleep(100)
class L2DataCollector:
@@ -62,30 +70,33 @@
        socket.connect(delegate_ipc_addr)
        while True:
            try:
                datas = ('000990', 7.52, 400, 93000030, 2012, 380422, 380421, 375477, '1') * 100
                datas = ('000990', 7.52, 400, 93000030, 2012, 380422, 380421, 375477, '1') * 150
                L2SharedMemoryDataUtil.set_data(datas, shared_memory)
                socket.send_json({'data': [], "time": time.time()})
                response = socket.recv_string()
                print("收到委托反馈")
            except Exception as e:
                logging.exception(e)
            finally:
                time.sleep(5)
                t = random.randint(0, 100)
                time.sleep(t / 1000)
    def send_deal_data(self, deal_ipc_addr, shared_memory):
        socket = self.context.socket(zmq.REQ)
        socket.connect(deal_ipc_addr)
        while True:
            try:
                datas = ('888888', 7.52, 400, 93000030, 2012, 380422, 380421, 375477, '1') * 100
                L2SharedMemoryDataUtil.set_data(datas, shared_memory)
                datas = ('888888', 7.52, 400, 93000030, 2012, 380422, 380421, 375477, '1') * 150
                use_time = L2SharedMemoryDataUtil.set_data(datas, shared_memory)
                if use_time > 0.0005:
                    logger_debug.info(f"数据装载耗时:{use_time}")
                socket.send_json({'data': [], "time": time.time()})
                response = socket.recv_string()
            except Exception as e:
                logging.exception(e)
            finally:
                time.sleep(10)
                t = random.randint(0, 100)
                time.sleep(t / 1000)
    def run(self):
        for param in self.commu_params:
@@ -94,6 +105,8 @@
                             daemon=True).start()
            threading.Thread(target=self.send_deal_data, args=(param.deal_ipc_addr, param.deal_data_shared_memory,),
                             daemon=True).start()
        while True:
            time.sleep(100)
if __name__ == "__main__":
@@ -119,3 +132,7 @@
    l2Process.start()
    while True:
        time.sleep(100)
if __name__ == "__main__1":
    datas = ('000990', 7.52, 400, 93000030, 2012, 380422, 380421, 375477, '1') * 100
    print(datas)