Administrator
2024-02-27 122def357140a4a504710a57fe2bc1a8020aa7b1
新版L2数据传输协议测试
5个文件已修改
142 ■■■■ 已修改文件
huaxin_client/l2_client.py 49 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/l2_data_manager.py 41 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/l2_data_listen_manager.py 37 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
main.py 11 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/huaxin/huaxin_trade_server.py 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/l2_client.py
@@ -192,7 +192,8 @@
        d = self.codes_volume_and_price_dict.get(code)
        if d:
            min_volume, limit_up_price, special_price, buy_volume = d[0], d[1], d[2], d[3]
            self.l2_data_upload_manager.set_order_fileter_condition(code, min_volume, limit_up_price, special_price,buy_volume,
            self.l2_data_upload_manager.set_order_fileter_condition(code, min_volume, limit_up_price, special_price,
                                                                    buy_volume,
                                                                    {volume, constant.SHADOW_ORDER_VOLUME},
                                                                    time.time() + 3)
            huaxin_l2_log.info(logger_local_huaxin_l2_subscript, f"设置下单量监听:{code}-{volume}")
@@ -565,8 +566,47 @@
pipe_strategy = None
def test_add_codes(queue_r):
    """Replay a scripted scenario for code 603002 (manual test helper).

    Sequence, after an initial 5 second wait:

    1. Put an ``l2_cmd`` message carrying the first demo tuple on ``queue_r``.
    2. Wait 1 second, then feed two order details (volumes 275000 and 200)
       to ``spi.l2_data_upload_manager.add_l2_order_detail``.
    3. Put a ``listen_volume`` message (volume 100) on ``queue_r``.
    4. Wait 0.1 second, then feed one more order detail with volume 100.

    :param queue_r: queue that receives the JSON-encoded command messages
    """

    def _order_detail(volume, sub_seq, order_no):
        # Every scripted order shares all fields except these three.
        return {'SecurityID': '603002', 'Price': 6.35, 'Volume': volume, 'Side': "1", 'OrderType': '0',
                'OrderTime': '13000015',
                'MainSeq': 2, 'SubSeq': sub_seq, 'OrderNO': order_no, 'OrderStatus': 'D'}

    def _feed(detail):
        spi.l2_data_upload_manager.add_l2_order_detail(detail, 0)

    time.sleep(5)

    # Presumably (code, min_volume, limit_up_price, special_price, buy_volume)
    # -- verify against the l2_cmd handler. Only the first entry is sent.
    budget = 50 * 10000
    demo_datas = [(code, int(budget / price), price, special_price, 200)
                  for code, price, special_price in (("603002", 6.35, 6.00),
                                                     ("002654", 15.59, 15.3),
                                                     ("603701", 14.28, 14.00),
                                                     ("002908", 12.78, 12.00))]

    subscribe_cmd = {"type": "l2_cmd", "data": [demo_datas[0]]}
    queue_r.put_nowait(json.dumps(subscribe_cmd))
    time.sleep(1)

    _feed(_order_detail(275000, 6739147, 5512466))
    _feed(_order_detail(200, 6739148, 5512467))

    listen_cmd = {"type": "listen_volume", "data": {"code": "603002", "volume": 100}}
    queue_r.put_nowait(json.dumps(listen_cmd))
    time.sleep(0.1)

    # Same SubSeq/OrderNO as the previous order, as in the original script.
    _feed(_order_detail(100, 6739148, 5512467))
def run(queue_r: multiprocessing.Queue, order_queues: List[multiprocessing.Queue],
        transaction_queues: List[multiprocessing.Queue], market_queue: multiprocessing.Queue) -> None:
        transaction_queues: List[multiprocessing.Queue], market_queue: multiprocessing.Queue,
        order_ipc_hosts: list) -> None:
    # def test_add_codes():
    #     time.sleep(5)
    #     # if value:
@@ -614,12 +654,13 @@
        order_queue_distribute_manager = CodeQueueDistributeManager(order_queues)
        transaction_queue_distribute_manager = CodeQueueDistributeManager(transaction_queues)
        l2_data_upload_manager = L2DataUploadManager(order_queue_distribute_manager,
                                                     transaction_queue_distribute_manager, market_queue)
                                                     transaction_queue_distribute_manager, market_queue,
                                                     order_ipc_hosts)
        __init_l2(l2_data_upload_manager)
        l2_data_manager.run_upload_common()
        l2_data_manager.run_log()
        # 测试
        # threading.Thread(target=lambda: test_add_codes(),daemon=True).start()
        threading.Thread(target=lambda: test_add_codes(queue_r), daemon=True).start()
        global l2CommandManager
        l2CommandManager = command_manager.L2CommandManager()
        l2CommandManager.init(MyL2ActionCallback())
huaxin_client/l2_data_manager.py
@@ -30,9 +30,10 @@
# L2上传数据管理器
class L2DataUploadManager:
    # order_ipc_hosts:远程host
    def __init__(self, order_queue_distribute_manager: CodeQueueDistributeManager,
                 transaction_queue_distribute_manager: CodeQueueDistributeManager,
                 market_data_queue: multiprocessing.Queue):
                 market_data_queue: multiprocessing.Queue, order_ipc_hosts):
        self.order_queue_distribute_manager = order_queue_distribute_manager
        self.transaction_queue_distribute_manager = transaction_queue_distribute_manager
        self.market_data_queue = market_data_queue
@@ -42,6 +43,8 @@
        self.upload_l2_data_task_dict = {}
        self.l2_order_codes = set()
        self.l2_transaction_codes = set()
        # 订单
        self.l2_order_upload_protocol = L2DataUploadProtocolManager(order_ipc_hosts)
    # 设置订单过滤条件
    # special_price:过滤的1手的价格
@@ -141,6 +144,8 @@
            self.temp_order_queue_dict[code] = collections.deque()
        if code not in self.temp_transaction_queue_dict:
            self.temp_transaction_queue_dict[code] = collections.deque()
        # 分配订单上传协议
        self.l2_order_upload_protocol.distribute_upload_host(code)
        if code not in self.upload_l2_data_task_dict:
            t1 = threading.Thread(target=lambda: self.__run_upload_order_task(code), daemon=True)
@@ -161,9 +166,13 @@
            self.temp_transaction_queue_dict.pop(code)
        if code in self.upload_l2_data_task_dict:
            self.upload_l2_data_task_dict.pop(code)
        self.l2_order_upload_protocol.release_distributed_upload_host(code)
    def __upload_l2_data(self, code, _queue, datas):
        _queue.put_nowait((code, datas, time.time()))
    def __upload_l2_order_data(self, code, datas):
        self.l2_order_upload_protocol.upload_data_as_json(code, (code, datas, time.time()))
    # 处理订单数据并上传
    def __run_upload_order_task(self, code):
@@ -182,7 +191,8 @@
                if temp_list:
                    # 上传数据
                    self.__upload_l2_data(code, upload_queue, temp_list)
                    # self.__upload_l2_data(code, upload_queue, temp_list)
                    self.__upload_l2_order_data(code, temp_list)
                    temp_list = []
                else:
                    if code not in self.temp_order_queue_dict:
@@ -225,7 +235,6 @@
                pass
# L2上传数据协议管理器
class L2DataUploadProtocolManager:
    # ipchosts IPC协议
@@ -235,7 +244,7 @@
        self.socket_client_dict = {}
        # 保存代码分配的client 格式:{code:(host, socket)}
        self.code_socket_client_dict = {}
        self.lock = threading.Lock()
        self.rlock = threading.RLock()
        context = zmq.Context()
        for host in self.ipchosts:
            socket = context.socket(zmq.REQ)
@@ -244,34 +253,31 @@
    # 获取
    def __get_available_ipchost(self):
        try:
            if len(self.code_socket_client_dict) >= len(self.socket_client_dict):
                raise Exception("无可用host")
            used_hosts = set([self.code_socket_client_dict[k][0] for k in self.code_socket_client_dict])
            for host in self.socket_client_dict:
                if host not in used_hosts:
                    return host, self.socket_client_dict[host]
        if len(self.code_socket_client_dict) >= len(self.socket_client_dict):
            raise Exception("无可用host")
        finally:
            self.lock.release()
        used_hosts = set([self.code_socket_client_dict[k][0] for k in self.code_socket_client_dict])
        for host in self.socket_client_dict:
            if host not in used_hosts:
                return host, self.socket_client_dict[host]
        raise Exception("无可用host")
    # 分配HOST
    def distribute_upload_host(self, code):
        self.lock.acquire()
        self.rlock.acquire()
        try:
            host_info = self.__get_available_ipchost()
            if host_info:
                self.code_socket_client_dict[code] = host_info
        finally:
            self.lock.release()
            self.rlock.release()
    def release_distributed_upload_host(self, code):
        self.lock.acquire()
        self.rlock.acquire()
        try:
            if code in self.code_socket_client_dict:
                self.code_socket_client_dict.pop(code)
        finally:
            self.lock.release()
            self.rlock.release()
    def upload_data_as_json(self, code, data):
        if code not in self.code_socket_client_dict:
@@ -386,6 +392,7 @@
    pass
def run_test():
    t = threading.Thread(target=lambda: __test(), daemon=True)
    t.start()
l2/l2_data_listen_manager.py
@@ -5,6 +5,8 @@
import threading
import time
import zmq
from log_module import async_log_util
from log_module.log import logger_debug
@@ -22,6 +24,7 @@
        self.__l2_order_active_time_dict = {}
        self.__l2_transaction_active_time_dict = {}
        self.__l2_market_active_time_dict = {}
        self.zmq_context = zmq.Context()
    # 接收L2逐笔委托数据
    def __recive_l2_orders(self, q: multiprocessing.Queue):
@@ -102,16 +105,42 @@
            finally:
                self.__l2_market_active_time_dict[__id] = time.time()
    def __create_ipc_server(self, host):
        socket = self.zmq_context.socket(zmq.REP)
        socket.bind(host)
        count = 0
        while True:
            try:
                data = socket.recv_json()
                self.my_l2_data_callback.OnL2Order(data[0], data[1], data[2])
                socket.send_string("SUCCESS")
            except Exception as e:
                async_log_util.exception(logger_debug, e)
            finally:
                count += 1
                if count > 100:
                    count = 0
                    # 记录活跃时间,每100次记录一次
                    self.__l2_order_active_time_dict[host] = time.time()
    # 创建订单的IPC服务
    def __create_ipc_server_hosts(self, order_ipc_hosts):
        for host in order_ipc_hosts:
            threading.Thread(target=lambda: self.__create_ipc_server(host), daemon=True).start()
    # 接收L2数据
    def receive_l2_data(self, order_queues, transaction_queues, market_queue):
        for q in order_queues:
            t1 = threading.Thread(target=lambda: self.__recive_l2_orders(q), daemon=True)
            t1.start()
    def receive_l2_data(self, order_queues, transaction_queues, market_queue, order_ipc_hosts):
        # TODO 暂时不通过队列接收数据
        # for q in order_queues:
        #     t1 = threading.Thread(target=lambda: self.__recive_l2_orders(q), daemon=True)
        #     t1.start()
        for q in transaction_queues:
            t2 = threading.Thread(target=lambda: self.__recive_transaction_orders(q), daemon=True)
            t2.start()
        t3 = threading.Thread(target=lambda: self.__recive_l2_markets(market_queue), daemon=True)
        t3.start()
        # 接收订单hosts
        self.__create_ipc_server_hosts(order_ipc_hosts)
    def get_active_count(self, type_):
        expire_time = time.time() - 5
main.py
@@ -29,7 +29,7 @@
                      queue_l1_w_strategy_r_: multiprocessing.Queue,
                      queue_strategy_w_trade_r_: multiprocessing.Queue,
                      queue_strategy_w_trade_r_for_read_: multiprocessing.Queue, order_queues_, transaction_queues_,
                      market_queue_, queue_l1_trade_r_strategy_w_, queue_l1_trade_w_strategy_r_):
                      market_queue_, queue_l1_trade_r_strategy_w_, queue_l1_trade_w_strategy_r_, order_ipc_hosts_):
    logger_system.info("策略进程ID:{}", os.getpid())
    log.close_print()
    # 初始化参数
@@ -53,7 +53,7 @@
    huaxin_trade_server.run(queue_strategy_r_trade_w_, queue_l1_w_strategy_r_, queue_strategy_w_trade_r_,
                            queue_strategy_w_trade_r_for_read_, order_queues_,
                            transaction_queues_, market_queue_,
                            queue_l1_trade_w_strategy_r_)
                            queue_l1_trade_w_strategy_r_, order_ipc_hosts_)
# 主服务
@@ -131,23 +131,26 @@
        # 创建L2通信队列
        order_queues = []
        transaction_queues = []
        order_ipc_hosts = []
        market_queue = multiprocessing.Queue()
        for i in range(constant.HUAXIN_L2_MAX_CODES_COUNT):
            order_queues.append(multiprocessing.Queue())
        for i in range(constant.HUAXIN_L2_MAX_CODES_COUNT):
            transaction_queues.append(multiprocessing.Queue())
        for i in range(constant.HUAXIN_L2_MAX_CODES_COUNT):
            order_ipc_hosts.append(f"ipc://l2order{i}.ipc")
        # L2
        l2Process = multiprocessing.Process(
            target=huaxin_client.l2_client.run,
            args=(queue_other_w_l2_r, order_queues, transaction_queues, market_queue))
            args=(queue_other_w_l2_r, order_queues, transaction_queues, market_queue, order_ipc_hosts))
        l2Process.start()
        # 主进程
        createTradeServer(pss_strategy, queue_strategy_r_trade_w, queue_l1_w_strategy_r, queue_strategy_w_trade_r,
                          queue_strategy_w_trade_r_for_read,
                          order_queues, transaction_queues, market_queue, queue_l1_trade_r_strategy_w,
                          queue_l1_trade_w_strategy_r)
                          queue_l1_trade_w_strategy_r, order_ipc_hosts)
        # 将tradeServer作为主进程
        l1Process.join()
trade/huaxin/huaxin_trade_server.py
@@ -1607,7 +1607,7 @@
def run(queue_strategy_r_trade_w, queue_l1_w_strategy_r, queue_strategy_w_trade_r, queue_strategy_w_trade_r_for_read,
        order_queues, transaction_queues,
        market_queue, queue_l1_trade_w_strategy_r):
        market_queue, queue_l1_trade_w_strategy_r, order_ipc_hosts):
    logger_system.info(f"trade_server 线程ID:{tool.get_thread_id()}")
    try:
        # 执行一些初始化数据
@@ -1623,7 +1623,7 @@
        # 监听L2数据
        global l2DataListenManager
        l2DataListenManager = L2DataListenManager(my_l2_data_callback)
        l2DataListenManager.receive_l2_data(order_queues, transaction_queues, market_queue)
        l2DataListenManager.receive_l2_data(order_queues, transaction_queues, market_queue, order_ipc_hosts)
        # 启动交易服务
        huaxin_trade_api.run_pipe_trade(queue_strategy_r_trade_w, queue_strategy_w_trade_r,