From 48fb7a00951f91bdc707e5dd2d196e5bccb752c3 Mon Sep 17 00:00:00 2001 From: Administrator <admin@example.com> Date: 星期三, 18 六月 2025 18:41:30 +0800 Subject: [PATCH] 异常保护 --- main.py | 221 +++++++++++++++++++++++++++++++++++++++---------------- 1 files changed, 156 insertions(+), 65 deletions(-) diff --git a/main.py b/main.py index edfd9b0..e3bac57 100644 --- a/main.py +++ b/main.py @@ -1,117 +1,208 @@ """ GUI绠$悊 """ +import math +import psutil + +import constant +from code_attribute import gpcode_manager +from l2.subscript import l2_subscript_manager +from log_module import log +from log_module.log import logger_l2_trade, logger_system +import logging import multiprocessing import os -import sys +import threading -from db import redis_manager_delegate as redis_manager +from task import task_manager +from third_data import hx_qc_value_util +from third_data.code_plate_key_manager import KPLPlateForbiddenManager +from utils import shared_memery_util + +logger_system.info("绋嬪簭鍚姩Pre锛歿}", os.getpid()) + import huaxin_client.trade_client import huaxin_client.l2_client import huaxin_client.l1_client -from log_module import log -from log_module.log import logger_l2_trade, logger_system, logger_local_huaxin_l1 +from huaxin_client import l2_market_client, l2_client_v2 -from server import * - -# 浜ゆ槗鏈嶅姟 -from third_data import data_server -from trade.huaxin import trade_server, trade_api_server +from servers import server_util, huaxin_trade_server, server -# from huaxin_api import trade_client, l2_client, l1_client - - -def createTradeServer(pipe_server, pipe_trade, pipe_l1, pipe_l2, ptl2_l2, psl2_l2, ptl2_trade, pst_trade): +def run_strategy(queue_strategy_r_trade_w_: multiprocessing.Queue, + queue_l1_w_strategy_r_: multiprocessing.Queue, + queue_strategy_w_trade_r_: multiprocessing.Queue, + queue_strategy_w_trade_r_for_read_: multiprocessing.Queue, + trade_ipc_addr): + """ + 绛栫暐杩涚▼ + @param queue_strategy_r_trade_w_: + @param queue_l1_w_strategy_r_: + @param queue_strategy_w_trade_r_: + @param queue_strategy_w_trade_r_for_read_: + @param trade_ipc_addr: 浜ゆ槗ipc鍦板潃(涓嬪崟鍦板潃, 鎾ゅ崟鍦板潃) + @return: + """ logger_system.info("绛栫暐杩涚▼ID锛歿}", os.getpid()) log.close_print() # 鍒濆鍖栧弬鏁� - global_data_loader.init() + server.global_data_loader.init() - # # 鏁版嵁鏈嶅姟 - t1 = threading.Thread(target=createDataServer, name="createDataServer", daemon=True) - t1.start() - # - # 浜ゆ槗鎺ュ彛鏈嶅姟 - t1 = threading.Thread(target=trade_api_server.run, name="trade_api_server", args=(pipe_server, pipe_l2), - daemon=True) - t1.start() - # - # redis鍚庡彴鏈嶅姟 - t1 = threading.Thread(target=redis_manager.RedisUtils.run_loop, name="redis", daemon=True) - t1.start() - # - # 鍚姩L2璁㈤槄鏈嶅姟 - t1 = threading.Thread(target=huaxin_client.l2_client.run, name="l2_client", - args=(ptl2_l2, psl2_l2, trade_server.my_l2_data_callback), - daemon=True) - t1.start() + # 寮�鍚暟鎹湇鍔″櫒 + threading.Thread(target=server_util.run_data_server, name="createDataServer", daemon=True).start() + + # 杩愯鏁版嵁鐩戝惉鏈嶅姟 + threading.Thread(target=task_manager.run_data_listener, name="task_manager", + args=(queue_other_w_l2_r, queue_l1_w_strategy_r_), + daemon=True).start() # # 鍚姩鍗庨懌浜ゆ槗鏈嶅姟 - t1 = threading.Thread( - target=lambda: trade_server.run(pipe_trade, pipe_l1, pipe_l2, huaxin_client.trade_client.process_cmd), - name="trade_server", daemon=True) - t1.start() - - huaxin_client.trade_client.run(trade_server.my_trade_response, ptl2_trade, pst_trade) + huaxin_trade_server.run(queue_strategy_r_trade_w_, queue_strategy_w_trade_r_, + queue_strategy_w_trade_r_for_read_, + trade_ipc_addr) # 涓绘湇鍔� def createServer(pipe): logger_system.info("create Server") laddr = "", 9001 - tcpserver = MyThreadingTCPServer(laddr, MyBaseRequestHandle, pipe_trade=pipe) # 娉ㄦ剰锛氬弬鏁版槸MyBaseRequestHandle try: + tcpserver = server.MyThreadingTCPServer(laddr, server.MyBaseRequestHandle, + pipe_trade=pipe) # 娉ㄦ剰锛氬弬鏁版槸MyBaseRequestHandle tcpserver.serve_forever() except Exception as e: logger_system.exception(e) logger_system.error(f"绔彛鏈嶅姟鍣細{laddr[1]} 鍚姩澶辫触") -def createDataServer(): - logger_system.info("create DataServer") - logger_system.info(f"createDataServer 绾跨▼ID:{tool.get_thread_id()}") - tcpserver = data_server.run("", 9004) - tcpserver.serve_forever() - try: - tcpserver.serve_forever() - except Exception as e: - logger_system.exception(e) - logger_system.error(f"绔彛鏈嶅姟鍣細{9004} 鍚姩澶辫触") +if __name__ == '__main__1': + huaxin_client.l2_client.test() + + +def __create_l2_subscript(): + channel_list = [] + for i in range(constant.HUAXIN_L2_MAX_CODES_COUNT): + # 鍒涘缓濮旀墭/鎴愪氦鐨勫叡浜暟缁勫拰ZMQ閫氫俊閫氶亾 + delegate_ipc_addr = f"ipc://order_{i}.ipc" + deal_ipc_addr = f"ipc://deal_{i}.ipc" + delegate = [0, shared_memery_util.create_array(), delegate_ipc_addr] + delegate[0] = shared_memery_util.get_number(delegate[1]) + deal = [0, shared_memery_util.create_array(), deal_ipc_addr] + deal[0] = shared_memery_util.get_number(deal[1]) + channel_list.append((delegate, deal)) + + # L2杩涚▼鏁伴噺 + l2_process_count = 8 + + base_channel_count = len(channel_list) // l2_process_count + left_count = len(channel_list) % l2_process_count + index = 0 + # ======鍒嗙粍====== + # 璁板綍姣忎釜鍒嗙粍鐨勬暟閲� + channel_count_list = [] + # 鏁版嵁鍥炶皟闃熷垪 + data_callback_queue_list = [] + # 娑堟伅浼犻�掗槦鍒� + sub_single_queue_list = [] + + for i in range(l2_process_count): + channel_count = base_channel_count + (1 if i < left_count else 0) + channel_count_list.append(channel_count) + # 璇ヨ繘绋嬩笅鐨勯�氶亾 + channels = channel_list[index:index + channel_count] + index += channel_count + # 璁㈤槄淇″彿闃熷垪, 鏁版嵁鍥炶皟闃熷垪锛堝洖璋冮娆″皬鐨勬暟鎹�氳繃杩欑鍥炶皟锛� + sub_single_queue, data_callback_queue = multiprocessing.Queue(maxsize=1024), multiprocessing.Queue(maxsize=1024) + sub_single_queue_list.append(sub_single_queue) + data_callback_queue_list.append(data_callback_queue) + l2_process = multiprocessing.Process(target=l2_client_v2.run, + args=(sub_single_queue, data_callback_queue, channels, i, )) + l2_process.start() + + l2_subscript_manager.process_manager = l2_subscript_manager.TargetCodeProcessManager(sub_single_queue_list, channel_count_list) + # 鐩戝惉L2甯傚満琛屾儏鏁版嵁 + huaxin_trade_server.run_l2_market_info_reciever(data_callback_queue_list) + # 鍚姩ZMQserver锛岄拡瀵瑰鎵橀槦鍒椾笌鎴愪氦闃熷垪杩涜鐩戝惉 + l2_subscript_manager.L2DataListener(channel_list).create_data_listener(huaxin_trade_server.my_l2_data_callback) if __name__ == '__main__': + # 鍙粦瀹�16-31涔嬮棿鐨勬牳 try: logger_l2_trade.info("鍚姩绋嬪簭") logger_system.info("鍚姩绋嬪簭--------") log.close_print() - # 绛栫暐涓巗erver闂寸殑閫氫俊 - pss_server, pss_strategy = multiprocessing.Pipe() - # 绛栫暐涓庝氦鏄撻棿鐨勯�氫俊 - pst_trade, pst_strategy = multiprocessing.Pipe() - # 浜ゆ槗涓巐2涔嬮棿鐨勯�氫俊 - ptl2_trade, ptl2_l2 = multiprocessing.Pipe() - # 绛栫暐涓巐2涔嬮棿鐨勯�氫俊 - psl2_strategy, psl2_l2 = multiprocessing.Pipe() - # l1涓庣瓥鐣ラ棿鐨勯�氫俊 - pl1t_l1, pl1t_strategy = multiprocessing.Pipe() + # L2璇诲叾浠栧啓 + queue_other_w_l2_r = multiprocessing.Queue(maxsize=1000) + # l1 + queue_l1_w_strategy_r = multiprocessing.Queue(maxsize=1000) + queue_l1_r_strategy_w = multiprocessing.Queue(maxsize=1000) - # 鎵樼鐜涓嬩笉鍒涘缓 - # serverProcess = multiprocessing.Process(target=createServer, args=(pss_server,)) - # serverProcess.start() + # 浜ゆ槗璇荤瓥鐣ュ啓 + queue_strategy_w_trade_r = multiprocessing.Queue(maxsize=1000) + queue_strategy_w_trade_r_for_read = multiprocessing.Queue(maxsize=1000) + # 绛栫暐璇讳氦鏄撳啓 + queue_strategy_r_trade_w = multiprocessing.Queue(maxsize=1000) + + # 涓嬪崟,鎾ゅ崟ipc鍦板潃 + order_ipc_addr, cancel_order_ipc_addr = "ipc://trade_order.ipc", "ipc://trade_cancel_order.ipc" + logger_system.info("涓昏繘绋婭D锛歿}", os.getpid()) + fix_codes = set() + open_buy_codes = gpcode_manager.BuyOpenLimitUpCodeManager().get_codes() + if open_buy_codes: + fix_codes |= set(open_buy_codes) + # 瑕佺洃鎺х殑楂樻爣 + high_codes = KPLPlateForbiddenManager().get_watch_high_codes() + if high_codes: + fix_codes |= set(high_codes) # L1璁㈤槄鏁版嵁 - l1Process = multiprocessing.Process(target=huaxin_client.l1_client.run, args=(pl1t_l1,)) + l1Process = multiprocessing.Process(target=huaxin_client.l1_client.run, + args=(queue_l1_w_strategy_r, queue_l1_r_strategy_w, + fix_codes,)) l1Process.start() + l2MarketProcess = multiprocessing.Process(target=l2_market_client.run, + args=(queue_l1_w_strategy_r,)) + l2MarketProcess.start() + + # 浜ゆ槗杩涚▼ + tradeProcess = multiprocessing.Process( + target=huaxin_client.trade_client.run, + args=(order_ipc_addr, cancel_order_ipc_addr, queue_strategy_r_trade_w, queue_strategy_w_trade_r, + queue_strategy_w_trade_r_for_read)) + tradeProcess.start() + # 姝ゅ灏哃2鐨勮繘绋嬩笌绛栫暐杩涚▼鍚堝苟 + + # 娴嬭瘯L2鍗曠嫭杩涚▼ + + if constant.IS_L2_NEW: + __create_l2_subscript() + else: + # 灏哃2鐨勮繘绋嬫敼涓虹嚎绋嬫墽琛� + threading.Thread(target=huaxin_client.l2_client.run, args=( + queue_other_w_l2_r, huaxin_trade_server.my_l2_data_callbacks), daemon=True).start() + + # 杩愯鍗庨懌澧炲�兼湇鍔¤繘绋� + threading.Thread(target=hx_qc_value_util.run, daemon=True).start() + + # 缁戞牳杩愯 + # psutil.Process(l1Process.pid).cpu_affinity([20]) + # psutil.Process(tradeProcess.pid).cpu_affinity([21, 22]) + # cpu_indexes = [i for i in range(23, 30)] + # psutil.Process(os.getpid()).cpu_affinity(cpu_indexes) # 涓昏繘绋� - createTradeServer(pss_strategy, pst_strategy, pl1t_strategy, psl2_strategy, ptl2_l2, psl2_l2, ptl2_trade, - pst_trade) + run_strategy(queue_strategy_r_trade_w, queue_l1_w_strategy_r, queue_strategy_w_trade_r, + queue_strategy_w_trade_r_for_read, + (order_ipc_addr, cancel_order_ipc_addr)) # 灏唗radeServer浣滀负涓昏繘绋� l1Process.join() + # l2Process.join() + tradeProcess.join() except Exception as e: + logging.exception(e) logger_system.exception(e) -- Gitblit v1.8.0