Administrator
2024-02-27 824b434b3e1555ac50ff58d412171a052a953d8a
zeromq集成测试
2个文件已修改
75 ■■■■■ 已修改文件
huaxin_client/l2_data_manager.py 65 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test/test_pyzmq.py 10 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/l2_data_manager.py
@@ -16,6 +16,7 @@
from log_module.log import logger_local_huaxin_l2_error, logger_system, logger_local_huaxin_l2_subscript
from utils import tool
import collections
import zmq
order_detail_upload_active_time_dict = {}
transaction_upload_active_time_dict = {}
@@ -224,6 +225,62 @@
                pass
# L2上传数据协议管理器
class L2DataUploadProtocolManager:
    # ipchosts IPC协议
    def __init__(self, ipchosts):
        self.ipchosts = ipchosts
        # 所有的client
        self.socket_client_dict = {}
        # 保存代码分配的client 格式:{code:(host, socket)}
        self.code_socket_client_dict = {}
        self.lock = threading.Lock()
        context = zmq.Context()
        for host in self.ipchosts:
            socket = context.socket(zmq.REQ)
            socket.connect(host)
            self.socket_client_dict[host] = socket
    # 获取
    def __get_available_ipchost(self):
        try:
            if len(self.code_socket_client_dict) >= len(self.socket_client_dict):
                raise Exception("无可用host")
            used_hosts = set([self.code_socket_client_dict[k][0] for k in self.code_socket_client_dict])
            for host in self.socket_client_dict:
                if host not in used_hosts:
                    return host, self.socket_client_dict[host]
            raise Exception("无可用host")
        finally:
            self.lock.release()
    # 分配HOST
    def distribute_upload_host(self, code):
        self.lock.acquire()
        try:
            host_info = self.__get_available_ipchost()
            if host_info:
                self.code_socket_client_dict[code] = host_info
        finally:
            self.lock.release()
    def release_distributed_upload_host(self, code):
        self.lock.acquire()
        try:
            if code in self.code_socket_client_dict:
                self.code_socket_client_dict.pop(code)
        finally:
            self.lock.release()
    def upload_data_as_json(self, code, data):
        if code not in self.code_socket_client_dict:
            raise Exception("尚未分配host")
        host, socket = self.code_socket_client_dict[code]
        socket.send_json(data)
        socket.recv_string()
def add_target_code(code):
    target_codes.add(code)
    # 记录代码加入时间
@@ -335,4 +392,10 @@
def test():
    pass
    ipclist = []
    for i in range(0, 70):
        ipclist.append(f"ipc://l2order{i}.ipc")
    manager = L2DataUploadProtocolManager(ipclist)
    code = "000333"
    manager.distribute_upload_host(code)
    manager.upload_data_as_json(code, {"test": "test"})
test/test_pyzmq.py
@@ -2,7 +2,7 @@
import multiprocessing
import threading
import time
from huaxin_client import l2_data_manager
import zmq
@@ -45,7 +45,7 @@
    response = socket_client_dict[i].recv_string()
if __name__ == "__main__":
if __name__ == "__main__1":
    serverProcess = multiprocessing.Process(
        target=create_server)
@@ -59,3 +59,9 @@
            threading.Thread(target=lambda: send_data(context, i), daemon=True).start()
        time.sleep(5)
    input()
if __name__ == "__main__":
    l2_data_manager.test()
    pass