From 48fb7a00951f91bdc707e5dd2d196e5bccb752c3 Mon Sep 17 00:00:00 2001
From: Administrator <admin@example.com>
Date: 星期三, 18 六月 2025 18:41:30 +0800
Subject: [PATCH] 异常保护

---
 main.py |  207 ++++++++++++++++++++++++++++++++++++---------------
 1 files changed, 147 insertions(+), 60 deletions(-)

diff --git a/main.py b/main.py
index 0b12b5f..e3bac57 100644
--- a/main.py
+++ b/main.py
@@ -1,54 +1,66 @@
 """
 GUI绠$悊
 """
+import math
 
+import psutil
+
+import constant
+from code_attribute import gpcode_manager
+from l2.subscript import l2_subscript_manager
+from log_module import log
+from log_module.log import logger_l2_trade, logger_system
+import logging
 import multiprocessing
 import os
+import threading
 
-from db import redis_manager_delegate as redis_manager
+from task import task_manager
+from third_data import hx_qc_value_util
+from third_data.code_plate_key_manager import KPLPlateForbiddenManager
+from utils import shared_memery_util
+
+logger_system.info("绋嬪簭鍚姩Pre锛歿}", os.getpid())
+
 import huaxin_client.trade_client
 import huaxin_client.l2_client
 import huaxin_client.l1_client
-from log_module import log
-from log_module.log import logger_l2_trade, logger_system, logger_local_huaxin_l1
+from huaxin_client import l2_market_client, l2_client_v2
 
-from server import *
-
-# 浜ゆ槗鏈嶅姟
-from third_data import data_server
-from trade.huaxin import huaxin_trade_server, huaxin_trade_api_server
+from servers import server_util, huaxin_trade_server, server
 
 
-# from huaxin_api import trade_client, l2_client, l1_client
-
-
-def createTradeServer(pipe_server, queue_strategy_r_trade_w: multiprocessing.Queue, pipe_l1, pipe_l2, queue_trade_w_l2_r: multiprocessing.Queue, psl2_l2, queue_strategy_w_trade_r: multiprocessing.Queue):
+def run_strategy(queue_strategy_r_trade_w_: multiprocessing.Queue,
+                 queue_l1_w_strategy_r_: multiprocessing.Queue,
+                 queue_strategy_w_trade_r_: multiprocessing.Queue,
+                 queue_strategy_w_trade_r_for_read_: multiprocessing.Queue,
+                 trade_ipc_addr):
+    """
+    绛栫暐杩涚▼
+    @param queue_strategy_r_trade_w_:
+    @param queue_l1_w_strategy_r_:
+    @param queue_strategy_w_trade_r_:
+    @param queue_strategy_w_trade_r_for_read_:
+    @param trade_ipc_addr: 浜ゆ槗ipc鍦板潃(涓嬪崟鍦板潃, 鎾ゅ崟鍦板潃)
+    @return:
+    """
     logger_system.info("绛栫暐杩涚▼ID锛歿}", os.getpid())
     log.close_print()
     # 鍒濆鍖栧弬鏁�
-    global_data_loader.init()
+    server.global_data_loader.init()
 
-    # # 鏁版嵁鏈嶅姟
-    t1 = threading.Thread(target=createDataServer, name="createDataServer", daemon=True)
-    t1.start()
-    #
-    # 浜ゆ槗鎺ュ彛鏈嶅姟
-    t1 = threading.Thread(target=huaxin_trade_api_server.run, name="trade_api_server", args=(pipe_server, pipe_l2),
-                          daemon=True)
-    t1.start()
-    #
-    # redis鍚庡彴鏈嶅姟
-    t1 = threading.Thread(target=redis_manager.RedisUtils.run_loop, name="redis", daemon=True)
-    t1.start()
-    #
-    # 鍚姩L2璁㈤槄鏈嶅姟
-    t1 = threading.Thread(target=huaxin_client.l2_client.run, name="l2_client",
-                          args=(queue_trade_w_l2_r, psl2_l2, huaxin_trade_server.my_l2_data_callback),
-                          daemon=True)
-    t1.start()
+    # 寮�鍚暟鎹湇鍔″櫒
+    threading.Thread(target=server_util.run_data_server, name="createDataServer", daemon=True).start()
+
+    # 杩愯鏁版嵁鐩戝惉鏈嶅姟
+    threading.Thread(target=task_manager.run_data_listener, name="task_manager",
+                     args=(queue_other_w_l2_r, queue_l1_w_strategy_r_),
+                     daemon=True).start()
     #
     # 鍚姩鍗庨懌浜ゆ槗鏈嶅姟
-    huaxin_trade_server.run(queue_strategy_r_trade_w, pipe_l1, pipe_l2, queue_strategy_w_trade_r)
+    huaxin_trade_server.run(queue_strategy_r_trade_w_, queue_strategy_w_trade_r_,
+                            queue_strategy_w_trade_r_for_read_,
+                            trade_ipc_addr)
 
 
 # 涓绘湇鍔�
@@ -56,65 +68,140 @@
     logger_system.info("create Server")
     laddr = "", 9001
     try:
-        tcpserver = MyThreadingTCPServer(laddr, MyBaseRequestHandle, pipe_trade=pipe)  # 娉ㄦ剰锛氬弬鏁版槸MyBaseRequestHandle
+        tcpserver = server.MyThreadingTCPServer(laddr, server.MyBaseRequestHandle,
+                                                pipe_trade=pipe)  # 娉ㄦ剰锛氬弬鏁版槸MyBaseRequestHandle
         tcpserver.serve_forever()
     except Exception as e:
         logger_system.exception(e)
         logger_system.error(f"绔彛鏈嶅姟鍣細{laddr[1]} 鍚姩澶辫触")
 
 
-def createDataServer():
-    logger_system.info("create DataServer")
-    logger_system.info(f"createDataServer 绾跨▼ID:{tool.get_thread_id()}")
-    tcpserver = data_server.run("", 9004)
-    tcpserver.serve_forever()
-    try:
-        tcpserver.serve_forever()
-    except Exception as e:
-        logger_system.exception(e)
-        logger_system.error(f"绔彛鏈嶅姟鍣細{9004} 鍚姩澶辫触")
+if __name__ == '__main__1':
+    huaxin_client.l2_client.test()
+
+
+def __create_l2_subscript():
+    channel_list = []
+    for i in range(constant.HUAXIN_L2_MAX_CODES_COUNT):
+        # 鍒涘缓濮旀墭/鎴愪氦鐨勫叡浜暟缁勫拰ZMQ閫氫俊閫氶亾
+        delegate_ipc_addr = f"ipc://order_{i}.ipc"
+        deal_ipc_addr = f"ipc://deal_{i}.ipc"
+        delegate = [0, shared_memery_util.create_array(), delegate_ipc_addr]
+        delegate[0] = shared_memery_util.get_number(delegate[1])
+        deal = [0, shared_memery_util.create_array(), deal_ipc_addr]
+        deal[0] = shared_memery_util.get_number(deal[1])
+        channel_list.append((delegate, deal))
+
+    # L2杩涚▼鏁伴噺
+    l2_process_count = 8
+
+    base_channel_count = len(channel_list) // l2_process_count
+    left_count = len(channel_list) % l2_process_count
+    index = 0
+    # ======鍒嗙粍======
+    # 璁板綍姣忎釜鍒嗙粍鐨勬暟閲�
+    channel_count_list = []
+    # 鏁版嵁鍥炶皟闃熷垪
+    data_callback_queue_list = []
+    # 娑堟伅浼犻�掗槦鍒�
+    sub_single_queue_list = []
+
+    for i in range(l2_process_count):
+        channel_count = base_channel_count + (1 if i < left_count else 0)
+        channel_count_list.append(channel_count)
+        # 璇ヨ繘绋嬩笅鐨勯�氶亾
+        channels = channel_list[index:index + channel_count]
+        index += channel_count
+        # 璁㈤槄淇″彿闃熷垪, 鏁版嵁鍥炶皟闃熷垪锛堝洖璋冮娆″皬鐨勬暟鎹�氳繃杩欑鍥炶皟锛�
+        sub_single_queue, data_callback_queue = multiprocessing.Queue(maxsize=1024), multiprocessing.Queue(maxsize=1024)
+        sub_single_queue_list.append(sub_single_queue)
+        data_callback_queue_list.append(data_callback_queue)
+        l2_process = multiprocessing.Process(target=l2_client_v2.run,
+                                             args=(sub_single_queue, data_callback_queue, channels, i, ))
+        l2_process.start()
+
+    l2_subscript_manager.process_manager = l2_subscript_manager.TargetCodeProcessManager(sub_single_queue_list, channel_count_list)
+    # 鐩戝惉L2甯傚満琛屾儏鏁版嵁
+    huaxin_trade_server.run_l2_market_info_reciever(data_callback_queue_list)
+    # 鍚姩ZMQserver锛岄拡瀵瑰鎵橀槦鍒椾笌鎴愪氦闃熷垪杩涜鐩戝惉
+    l2_subscript_manager.L2DataListener(channel_list).create_data_listener(huaxin_trade_server.my_l2_data_callback)
 
 
 if __name__ == '__main__':
+    # 鍙粦瀹�16-31涔嬮棿鐨勬牳
     try:
         logger_l2_trade.info("鍚姩绋嬪簭")
         logger_system.info("鍚姩绋嬪簭--------")
         log.close_print()
-        # 绛栫暐涓巗erver闂寸殑閫氫俊
-        pss_server, pss_strategy = multiprocessing.Pipe()
 
-        # 浜ゆ槗鍐橪2璇�
-        queue_trade_w_l2_r = multiprocessing.Queue()
-        # 绛栫暐涓巐2涔嬮棿鐨勯�氫俊
-        psl2_strategy, psl2_l2 = multiprocessing.Pipe()
-
-        # l1涓庣瓥鐣ラ棿鐨勯�氫俊
-        pl1t_l1, pl1t_strategy = multiprocessing.Pipe()
+        # L2璇诲叾浠栧啓
+        queue_other_w_l2_r = multiprocessing.Queue(maxsize=1000)
+        # l1
+        queue_l1_w_strategy_r = multiprocessing.Queue(maxsize=1000)
+        queue_l1_r_strategy_w = multiprocessing.Queue(maxsize=1000)
 
         # 浜ゆ槗璇荤瓥鐣ュ啓
-        queue_strategy_w_trade_r = multiprocessing.Queue()
+        queue_strategy_w_trade_r = multiprocessing.Queue(maxsize=1000)
+        queue_strategy_w_trade_r_for_read = multiprocessing.Queue(maxsize=1000)
         # 绛栫暐璇讳氦鏄撳啓
-        queue_strategy_r_trade_w = multiprocessing.Queue()
+        queue_strategy_r_trade_w = multiprocessing.Queue(maxsize=1000)
 
-        # 鎵樼鐜涓嬩笉鍒涘缓
-        # serverProcess = multiprocessing.Process(target=createServer, args=(pss_server,))
-        # serverProcess.start()
+        # 涓嬪崟,鎾ゅ崟ipc鍦板潃
+        order_ipc_addr, cancel_order_ipc_addr = "ipc://trade_order.ipc", "ipc://trade_cancel_order.ipc"
+
         logger_system.info("涓昏繘绋婭D锛歿}", os.getpid())
 
+        fix_codes = set()
+        open_buy_codes = gpcode_manager.BuyOpenLimitUpCodeManager().get_codes()
+        if open_buy_codes:
+            fix_codes |= set(open_buy_codes)
+        # 瑕佺洃鎺х殑楂樻爣
+        high_codes = KPLPlateForbiddenManager().get_watch_high_codes()
+        if high_codes:
+            fix_codes |= set(high_codes)
         # L1璁㈤槄鏁版嵁
-        l1Process = multiprocessing.Process(target=huaxin_client.l1_client.run, args=(pl1t_l1,))
+        l1Process = multiprocessing.Process(target=huaxin_client.l1_client.run,
+                                            args=(queue_l1_w_strategy_r, queue_l1_r_strategy_w,
+                                                  fix_codes,))
         l1Process.start()
+
+        l2MarketProcess = multiprocessing.Process(target=l2_market_client.run,
+                                                  args=(queue_l1_w_strategy_r,))
+        l2MarketProcess.start()
 
         # 浜ゆ槗杩涚▼
         tradeProcess = multiprocessing.Process(
-            target=lambda: huaxin_client.trade_client.run(None, queue_trade_w_l2_r, queue_strategy_r_trade_w, queue_strategy_w_trade_r))
+            target=huaxin_client.trade_client.run,
+            args=(order_ipc_addr, cancel_order_ipc_addr, queue_strategy_r_trade_w, queue_strategy_w_trade_r,
+                  queue_strategy_w_trade_r_for_read))
         tradeProcess.start()
+        # 姝ゅ灏哃2鐨勮繘绋嬩笌绛栫暐杩涚▼鍚堝苟
 
+        # 娴嬭瘯L2鍗曠嫭杩涚▼
+
+        if constant.IS_L2_NEW:
+            __create_l2_subscript()
+        else:
+            # 灏哃2鐨勮繘绋嬫敼涓虹嚎绋嬫墽琛�
+            threading.Thread(target=huaxin_client.l2_client.run, args=(
+                queue_other_w_l2_r, huaxin_trade_server.my_l2_data_callbacks), daemon=True).start()
+
+        # 杩愯鍗庨懌澧炲�兼湇鍔¤繘绋�
+        threading.Thread(target=hx_qc_value_util.run, daemon=True).start()
+
+        # 缁戞牳杩愯
+        # psutil.Process(l1Process.pid).cpu_affinity([20])
+        # psutil.Process(tradeProcess.pid).cpu_affinity([21, 22])
+        # cpu_indexes = [i for i in range(23, 30)]
+        # psutil.Process(os.getpid()).cpu_affinity(cpu_indexes)
         # 涓昏繘绋�
-        createTradeServer(pss_strategy, queue_strategy_r_trade_w, pl1t_strategy, psl2_strategy, queue_trade_w_l2_r, psl2_l2, queue_strategy_w_trade_r)
+        run_strategy(queue_strategy_r_trade_w, queue_l1_w_strategy_r, queue_strategy_w_trade_r,
+                     queue_strategy_w_trade_r_for_read,
+                     (order_ipc_addr, cancel_order_ipc_addr))
 
         # 灏唗radeServer浣滀负涓昏繘绋�
         l1Process.join()
+        # l2Process.join()
         tradeProcess.join()
     except Exception as e:
         logging.exception(e)

--
Gitblit v1.8.0