From 256248e09fa5e44c18a2cd7b45b203d09aa4a3ee Mon Sep 17 00:00:00 2001 From: Administrator <admin@example.com> Date: 星期三, 10 九月 2025 10:06:37 +0800 Subject: [PATCH] bug修复 --- l2/l2_transaction_data_processor.py | 148 +++++++++++++++++++++++++++++++++++-------------- 1 files changed, 105 insertions(+), 43 deletions(-) diff --git a/l2/l2_transaction_data_processor.py b/l2/l2_transaction_data_processor.py index abbc61f..438b8e2 100644 --- a/l2/l2_transaction_data_processor.py +++ b/l2/l2_transaction_data_processor.py @@ -22,24 +22,35 @@ from log_module import async_log_util from log_module.log import hx_logger_l2_debug, logger_l2_trade_buy_queue, logger_debug, hx_logger_l2_upload, \ logger_trade, logger_l2_trade -from trade import current_price_process_manager, trade_constant +from trade import current_price_process_manager, trade_constant, trade_manager import concurrent.futures from trade.buy_radical import radical_buy_strategy from trade.buy_radical.radical_buy_data_manager import RadicalBuyDataManager, EveryLimitupBigDealOrderManager -from utils import tool +from utils import tool, trade_util class HuaXinTransactionDatasProcessor: __statistic_thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=constant.HUAXIN_L2_MAX_CODES_COUNT + 2) __TradeBuyQueue = transaction_progress.TradeBuyQueue() + # 闈炴定鍋滄垚浜ゆ椂闂� + __not_limit_up_time_dict = {} + # 鏈�杩戞垚浜ゆ暟鎹瓧鍏� + __latest_transaction_data_dict = {} # 璁$畻鎴愪氦杩涘害 @classmethod - def __compute_latest_trade_progress(cls, code, fdatas): + def __compute_latest_trade_progress(cls, code, fdatas, buy_exec_index=None): + """ + 璁$畻鐪熷疄涓嬪崟浣嶇疆 + @param code: + @param fdatas: + @param buy_exec_index: + @return:鐪熷疄涓嬪崟浣嶇疆, 鏄惁鏄繎浼艰绠� + """ buyno_map = l2_data_util.local_today_buyno_map.get(code) if not buyno_map: - return None + return None, False buy_progress_index = None for i in range(len(fdatas) - 1, -1, -1): d = fdatas[i] @@ -49,7 +60,37 @@ if L2DataUtil.is_limit_up_price_buy(buyno_map[buy_no]["val"]): buy_progress_index = buyno_map[buy_no]["index"] break - return buy_progress_index + if buy_progress_index is None and buy_exec_index is not None and buy_exec_index >= 0: + # 娌℃湁鎵惧埌鐪熷疄鎴愪氦杩涘害浣嶄笖鏈変拱鍏ユ墽琛屼綅缃� + # 鏍规嵁鏈�杩戠殑鎴愪氦涔板崟鍙疯绠楃湡瀹炴垚浜や綅缃� + try: + latest_buy_order_no = fdatas[-1][0][6] + total_datas = l2_data_util.local_today_datas.get(code) + if tool.trade_time_sub(total_datas[-1]["val"]["time"], total_datas[buy_exec_index]["val"]["time"]) < 60: + # 涓嬪崟60s鍐呮墠杩欐牱璁$畻 + # 鍙栨渶鏂扮殑鎴愪氦涔板崟鍙凤紝鍦ㄤ袱涓定鍋滃鎵樹拱涔嬮棿鐨勫悗涓�涓暟鎹� + index_1 = None + max_index = total_datas[-1]["index"] + for i in range(max_index, -1, -1): + data = total_datas[i] + val = data['val'] + if not L2DataUtil.is_limit_up_price_buy(val): + continue + if val['orderNo'] < latest_buy_order_no: + index_1 = i + break + if index_1 is not None: + for i in range(index_1 + 1, max_index + 1): + data = total_datas[i] + val = data['val'] + if not L2DataUtil.is_limit_up_price_buy(val): + continue + if val['orderNo'] > latest_buy_order_no: + buy_progress_index = i + return buy_progress_index, True + except Exception as e: + async_log_util.info(logger_debug, f"璁$畻鐪熷疄鎴愪氦杩涘害浣嶅嚭閿欙細{str(e)}") + return buy_progress_index, False @classmethod def statistic_big_order_infos(cls, code, fdatas, order_begin_pos: OrderBeginPosInfo): @@ -57,7 +98,7 @@ 缁熻澶у崟鎴愪氦 @param code: @param fdatas: 鏍煎紡锛歔(鏁版嵁鏈韩, 鏄惁涓诲姩涔�, 鏄惁娑ㄥ仠, 鎬绘垚浜ら, 涓嶅惈ms鏃堕棿锛屽惈ms鏃堕棿)] - @return: + @return: 澶т拱鍗曞垪琛�,澶у崠鍗曞垪琛� """ def statistic_big_buy_data(): @@ -126,6 +167,7 @@ else: buy_datas = statistic_big_buy_data() sell_datas = statistic_big_sell_data() + return buy_datas, sell_datas # L鎾ょ殑姣斾緥涓庝拱鍗栧ぇ鍗曟棤鐩存帴鍏崇郴浜� # if buy_datas or sell_datas: # buy_money = BigOrderDealManager().get_total_buy_money(code) @@ -139,6 +181,8 @@ # q.append((data['SecurityID'], data['TradePrice'], data['TradeVolume'], # data['OrderTime'], data['MainSeq'], data['SubSeq'], data['BuyNo'], # data['SellNo'], data['ExecType'])) + if o_datas: + cls.__latest_transaction_data_dict[code] = o_datas[-1] fdatas = [ [d, d[6] > d[7], limit_up_price == d[1], d[1] * d[2], '', ''] for d in o_datas] @@ -155,11 +199,19 @@ try: current_price_process_manager.set_trade_price(code, fdatas[-1][0][1]) if not fdatas[-1][2]: - # 娌℃湁娑ㄥ仠 - EveryLimitupBigDealOrderManager.open_limit_up(code, f"鏈�鏂版垚浜や环锛歿fdatas[-1][0][1]}") - radical_buy_strategy.clear_data(code, msg=f"娌℃湁娑ㄥ仠锛歿fdatas[-1][0]}") - except: - pass + if code not in cls.__not_limit_up_time_dict: + cls.__not_limit_up_time_dict[code] = fdatas[-1][5] + last_time = cls.__not_limit_up_time_dict[code] + # 鐐告澘鏃堕棿鎸佺画500ms浠ヤ笂绠楃偢鏉� + if tool.trade_time_sub_with_ms(fdatas[-1][5], last_time) > 500: + # 娌℃湁娑ㄥ仠 + EveryLimitupBigDealOrderManager.open_limit_up(code, f"鏈�鏂版垚浜や环锛歿fdatas[-1][0][1]}") + radical_buy_strategy.clear_data(code, msg=f"娌℃湁娑ㄥ仠锛歿fdatas[-1][0]}") + else: + if code in cls.__not_limit_up_time_dict: + cls.__not_limit_up_time_dict.pop(code) + except Exception as e: + async_log_util.error(logger_debug, f"L2鎴愪氦寮�鏉胯绠楅敊璇細{str(e)}") total_datas = l2_data_util.local_today_datas.get(code) use_time_list = [] @@ -179,29 +231,21 @@ _start_time = time.time() # 澶у崟缁熻 # cls.__statistic_thread_pool.submit(cls.statistic_big_order_infos, code, datas, order_begin_pos) + big_sell_list = [] try: - cls.statistic_big_order_infos(code, fdatas, order_begin_pos) + big_buy_list, big_sell_list = cls.statistic_big_order_infos(code, fdatas, order_begin_pos) except Exception as e: async_log_util.error(hx_logger_l2_debug, f"缁熻澶у崟鍑洪敊锛歿str(e)}") use_time_list.append(("缁熻澶у崟鏁版嵁", time.time() - _start_time)) _start_time = time.time() try: + last_data = fdatas[-1] # 缁熻涓婃澘鏃堕棿 - try: - for d in fdatas: - if d[1]: - # 涓诲姩涔� - if d[2]: - # 娑ㄥ仠 - current_price_process_manager.set_latest_not_limit_up_time(code, d[5]) - else: - # 涓诲姩鍗栵紙鏉夸笂锛� - if d[2]: - L2LimitUpSellDataManager.clear_data(code) - break - except: - pass + if last_data[1] and last_data[2]: + current_price_process_manager.set_latest_not_limit_up_time(code, last_data[5]) + if not last_data[1] and last_data[2]: + L2LimitUpSellDataManager.clear_data(code) big_sell_order_info = None # 缁熻鍗栧崟 big_sell_order_info = HuaXinSellOrderStatisticManager.statistic_continue_limit_up_sell_transaction_datas( @@ -212,17 +256,18 @@ _start_time = time.time() if is_placed_order: - - LCancelBigNumComputer().set_big_sell_order_info(code, big_sell_order_info) - # need_cancel, cancel_msg = SCancelBigNumComputer().set_big_sell_order_info_for_cancel(code, # big_sell_order_info, # order_begin_pos) need_cancel, cancel_msg = False, "" cancel_type = None - if need_cancel: - cancel_msg = f"S鎾�:{cancel_msg}" - cancel_type = trade_constant.CANCEL_TYPE_S + try: + can_cancel, cancel_data = LCancelBigNumComputer().add_big_sell_order_deal_list(code, big_sell_list) + if can_cancel: + need_cancel, cancel_msg = True, f"L鍚庡ぇ鍗栧崟鎴愪氦鍙犲姞瑙﹀彂鎾ゅ崟锛歿big_sell_list}" + except Exception as e: + async_log_util.error(logger_debug, f"L鍚庡ぇ鍗栧崟鎴愪氦鍙犲姞瑙﹀彂鎾ゅ崟:{str(e)}") + if not need_cancel: need_cancel, cancel_msg = FCancelBigNumComputer().need_cancel_for_p(code, order_begin_pos) @@ -250,12 +295,17 @@ # if big_money_count > 0: # LCancelRateManager.compute_big_num_deal_rate(code) - buy_progress_index = cls.__compute_latest_trade_progress(code, fdatas) + trade_state = trade_manager.CodesTradeStateManager().get_trade_state_cache(code) + # 璁$畻鎴愪氦杩涘害 + buy_progress_index, is_similar = cls.__compute_latest_trade_progress(code, fdatas, + order_begin_pos.buy_exec_index if trade_util.is_delegated( + trade_state) else -1) if buy_progress_index is not None: buy_progress_index_changed = cls.__TradeBuyQueue.set_traded_index(code, buy_progress_index, total_datas) - l2_log.info(code, logger_l2_trade_buy_queue, "鑾峰彇鎴愪氦浣嶇疆鎴愬姛锛� code-{} index-{}", code, buy_progress_index) + l2_log.info(code, logger_l2_trade_buy_queue, "鑾峰彇鎴愪氦浣嶇疆鎴愬姛锛� code-{} index-{} is_similar-{}", code, + buy_progress_index, is_similar) if is_placed_order: # NewGCancelBigNumComputer().set_trade_progress(code, order_begin_pos.buy_single_index, # buy_progress_index) @@ -326,7 +376,6 @@ big_sell_order_info = HuaXinSellOrderStatisticManager.statistic_continue_limit_up_sell_transaction_datas( code, fdatas, limit_up_price) - LCancelBigNumComputer().set_big_sell_order_info(code, big_sell_order_info) need_cancel, cancel_msg = False, "" cancel_type = None if not need_cancel: @@ -337,23 +386,28 @@ L2TradeDataProcessor.cancel_buy(code, cancel_msg, cancel_type=cancel_type) # 缁熻娑ㄥ仠涓诲姩鍗栨垚浜わ紝涓轰簡F鎾ゅ噯澶囨暟鎹� HuaXinSellOrderStatisticManager.statistic_active_sell_deal_volume(code, fdatas, limit_up_price) + trade_state = trade_manager.CodesTradeStateManager().get_trade_state_cache(code) # 璁$畻鎴愪氦杩涘害 - buy_progress_index = cls.__compute_latest_trade_progress(code, fdatas) - if buy_progress_index is not None: + _buy_progress_index, _is_similar = cls.__compute_latest_trade_progress(code, fdatas, + order_begin_pos.buy_exec_index if trade_util.is_delegated( + trade_state) else -1) + if _buy_progress_index is not None: total_datas = l2_data_util.local_today_datas.get(code) - buy_progress_index_changed = cls.__TradeBuyQueue.set_traded_index(code, buy_progress_index, + buy_progress_index_changed = cls.__TradeBuyQueue.set_traded_index(code, _buy_progress_index, total_datas) - async_log_util.info(logger_l2_trade_buy_queue, "鑾峰彇鎴愪氦浣嶇疆鎴愬姛锛� code-{} index-{}", code, - buy_progress_index) + async_log_util.info(logger_l2_trade_buy_queue, "鑾峰彇鎴愪氦浣嶇疆鎴愬姛锛� code-{} index-{} similar-{}", code, + _buy_progress_index, _is_similar) if is_placed_order: LCancelBigNumComputer().set_trade_progress(code, order_begin_pos.buy_single_index, - buy_progress_index, + _buy_progress_index, total_datas) - cancel_result = FCancelBigNumComputer().need_cancel_for_deal_fast(code, buy_progress_index) + cancel_result = FCancelBigNumComputer().need_cancel_for_deal_fast(code, _buy_progress_index) if cancel_result[0]: L2TradeDataProcessor.cancel_buy(code, f"F鎾�:{cancel_result[1]}", cancel_type=trade_constant.CANCEL_TYPE_F) + if o_datas: + cls.__latest_transaction_data_dict[code] = o_datas[-1] limit_up_price = gpcode_manager.get_limit_up_price_as_num(code) # =====鏍煎紡鍖栨暟鎹�===== # 鏁村舰鏁版嵁锛屾牸寮忥細[(鏁版嵁鏈韩, 鏄惁涓诲姩涔�, 鏄惁娑ㄥ仠, 鎬绘垚浜ら, 涓嶅惈ms鏃堕棿锛屽惈ms鏃堕棿)] @@ -417,7 +471,11 @@ # 濡傛灉鏄鍔ㄤ拱灏辨洿鏂版垚浜よ繘搴� if not fdatas[-1][1]: - buy_progress_index = cls.__compute_latest_trade_progress(code, fdatas) + trade_state = trade_manager.CodesTradeStateManager().get_trade_state_cache(code) + # 璁$畻鎴愪氦杩涘害 + buy_progress_index, is_similar = cls.__compute_latest_trade_progress(code, fdatas, + order_begin_pos.buy_exec_index if trade_util.is_delegated( + trade_state) else -1) if buy_progress_index is not None: total_datas = l2_data_util.local_today_datas.get(code) cls.__TradeBuyQueue.set_traded_index(code, buy_progress_index, @@ -437,3 +495,7 @@ if _start_time - __start_time > 5: l2_log.info(code, hx_logger_l2_upload, f"{code}澶勭悊鎴愪氦鐢ㄦ椂锛歿_start_time - __start_time} 鏁版嵁鏁伴噺锛歿len(fdatas)} 璇︽儏:{use_time_list}") + + @classmethod + def get_latest_transaction_data(cls, code): + return cls.__latest_transaction_data_dict.get(code) -- Gitblit v1.8.0