From 48fb7a00951f91bdc707e5dd2d196e5bccb752c3 Mon Sep 17 00:00:00 2001 From: Administrator <admin@example.com> Date: 星期三, 18 六月 2025 18:41:30 +0800 Subject: [PATCH] 异常保护 --- main.py | 72 +++++++++++++++++++++++++++++++---- 1 files changed, 63 insertions(+), 9 deletions(-) diff --git a/main.py b/main.py index 9f5c8e3..e3bac57 100644 --- a/main.py +++ b/main.py @@ -1,9 +1,13 @@ """ GUI绠$悊 """ +import math + import psutil +import constant from code_attribute import gpcode_manager +from l2.subscript import l2_subscript_manager from log_module import log from log_module.log import logger_l2_trade, logger_system import logging @@ -14,13 +18,14 @@ from task import task_manager from third_data import hx_qc_value_util from third_data.code_plate_key_manager import KPLPlateForbiddenManager +from utils import shared_memery_util logger_system.info("绋嬪簭鍚姩Pre锛歿}", os.getpid()) import huaxin_client.trade_client import huaxin_client.l2_client import huaxin_client.l1_client -from huaxin_client import l2_market_client +from huaxin_client import l2_market_client, l2_client_v2 from servers import server_util, huaxin_trade_server, server @@ -74,6 +79,54 @@ if __name__ == '__main__1': huaxin_client.l2_client.test() + +def __create_l2_subscript(): + channel_list = [] + for i in range(constant.HUAXIN_L2_MAX_CODES_COUNT): + # 鍒涘缓濮旀墭/鎴愪氦鐨勫叡浜暟缁勫拰ZMQ閫氫俊閫氶亾 + delegate_ipc_addr = f"ipc://order_{i}.ipc" + deal_ipc_addr = f"ipc://deal_{i}.ipc" + delegate = [0, shared_memery_util.create_array(), delegate_ipc_addr] + delegate[0] = shared_memery_util.get_number(delegate[1]) + deal = [0, shared_memery_util.create_array(), deal_ipc_addr] + deal[0] = shared_memery_util.get_number(deal[1]) + channel_list.append((delegate, deal)) + + # L2杩涚▼鏁伴噺 + l2_process_count = 8 + + base_channel_count = len(channel_list) // l2_process_count + left_count = len(channel_list) % l2_process_count + index = 0 + # ======鍒嗙粍====== + # 璁板綍姣忎釜鍒嗙粍鐨勬暟閲� + channel_count_list = [] + # 鏁版嵁鍥炶皟闃熷垪 + data_callback_queue_list = [] + # 娑堟伅浼犻�掗槦鍒� + sub_single_queue_list = [] + + for i in range(l2_process_count): + channel_count = base_channel_count + (1 if i < left_count else 0) + channel_count_list.append(channel_count) + # 璇ヨ繘绋嬩笅鐨勯�氶亾 + channels = channel_list[index:index + channel_count] + index += channel_count + # 璁㈤槄淇″彿闃熷垪, 鏁版嵁鍥炶皟闃熷垪锛堝洖璋冮娆″皬鐨勬暟鎹�氳繃杩欑鍥炶皟锛� + sub_single_queue, data_callback_queue = multiprocessing.Queue(maxsize=1024), multiprocessing.Queue(maxsize=1024) + sub_single_queue_list.append(sub_single_queue) + data_callback_queue_list.append(data_callback_queue) + l2_process = multiprocessing.Process(target=l2_client_v2.run, + args=(sub_single_queue, data_callback_queue, channels, i, )) + l2_process.start() + + l2_subscript_manager.process_manager = l2_subscript_manager.TargetCodeProcessManager(sub_single_queue_list, channel_count_list) + # 鐩戝惉L2甯傚満琛屾儏鏁版嵁 + huaxin_trade_server.run_l2_market_info_reciever(data_callback_queue_list) + # 鍚姩ZMQserver锛岄拡瀵瑰鎵橀槦鍒椾笌鎴愪氦闃熷垪杩涜鐩戝惉 + l2_subscript_manager.L2DataListener(channel_list).create_data_listener(huaxin_trade_server.my_l2_data_callback) + + if __name__ == '__main__': # 鍙粦瀹�16-31涔嬮棿鐨勬牳 try: @@ -123,14 +176,15 @@ queue_strategy_w_trade_r_for_read)) tradeProcess.start() # 姝ゅ灏哃2鐨勮繘绋嬩笌绛栫暐杩涚▼鍚堝苟 - # L2 - # l2Process = multiprocessing.Process( - # target=huaxin_client.l2_client.run, - # args=(queue_other_w_l2_r, order_queues, transaction_queues, market_queue, order_ipc_hosts,huaxin_trade_server.my_l2_data_callback)) - # l2Process.start() - # 灏哃2鐨勮繘绋嬫敼涓虹嚎绋嬫墽琛� - threading.Thread(target=huaxin_client.l2_client.run, args=( - queue_other_w_l2_r, huaxin_trade_server.my_l2_data_callbacks), daemon=True).start() + + # 娴嬭瘯L2鍗曠嫭杩涚▼ + + if constant.IS_L2_NEW: + __create_l2_subscript() + else: + # 灏哃2鐨勮繘绋嬫敼涓虹嚎绋嬫墽琛� + threading.Thread(target=huaxin_client.l2_client.run, args=( + queue_other_w_l2_r, huaxin_trade_server.my_l2_data_callbacks), daemon=True).start() # 杩愯鍗庨懌澧炲�兼湇鍔¤繘绋� threading.Thread(target=hx_qc_value_util.run, daemon=True).start() -- Gitblit v1.8.0