From aa3392d3d5b0b858deedd44abd7c4020eb565f4c Mon Sep 17 00:00:00 2001 From: Administrator <admin@example.com> Date: 星期三, 16 八月 2023 15:28:42 +0800 Subject: [PATCH] ++++++++++++++++ bug修复 --- trade/trade_result_manager.py | 12 +- l2/l2_data_util.py | 2 l2/cancel_buy_strategy.py | 6 - l2/l2_data_manager_new.py | 163 +++++++++++++++++++++++----------------- 4 files changed, 101 insertions(+), 82 deletions(-) diff --git a/l2/cancel_buy_strategy.py b/l2/cancel_buy_strategy.py index 430cf2e..1e5f887 100644 --- a/l2/cancel_buy_strategy.py +++ b/l2/cancel_buy_strategy.py @@ -166,13 +166,11 @@ # 濡傛灉start_index涓巄uy_single_index鐩稿悓锛屽嵆鏄笅鍗曞悗鐨勭涓�娆¤绠� # 闇�瑕佹煡璇拱鍏ヤ俊鍙蜂箣鍓嶇殑鍚�1s鏄惁鏈夋定鍋滄挙鐨勬暟鎹� process_index = process_index_old - # 涓嬪崟娆℃暟 - place_order_count = trade_data_manager.PlaceOrderCountManager().get_place_order_count(code) if buy_single_index == start_index: # 绗�1娆¤绠楅渶瑕佽绠椾拱鍏ヤ俊鍙�-鎵ц浣嶇殑鍑�鍊� left_big_num = self.__compute_left_big_num(code, buy_single_index, buy_single_index, buy_exec_index, - total_data, place_order_count) + total_data, volume_rate_index) buy_num += left_big_num # 璁剧疆涔板叆淇″彿-涔板叆鎵ц浣嶇殑鏁版嵁涓嶉渶瑕佸鐞� start_index = end_index + 1 @@ -233,7 +231,6 @@ cancel_num += data["re"] * int(val["num"]) # 淇濆瓨鏁版嵁 - if need_cancel: rate__ = round(cancel_num / max(buy_num, 1), 2) if rate__ > cancel_rate_threshold: @@ -509,7 +506,6 @@ l2_log.cancel_debug(code, "H绾ф槸鍚﹂渶瑕佹挙鍗曪紝鏁版嵁鑼冨洿锛歿}-{} ", start_index, end_index) # 鑾峰彇涓嬪崟娆℃暟 - place_order_count = trade_data_manager.PlaceOrderCountManager().get_place_order_count(code) cancel_rate_threshold = self.__hCancelParamsManager.get_cancel_rate(volume_index) process_index = start_index # 鏄惁鏈夎娴嬬殑鏁版嵁鎾ゅ崟 diff --git a/l2/l2_data_manager_new.py b/l2/l2_data_manager_new.py index ec5f577..a2ce51c 100644 --- a/l2/l2_data_manager_new.py +++ b/l2/l2_data_manager_new.py @@ -13,7 +13,7 @@ from db import redis_manager_delegate as redis_manager from third_data.code_plate_key_manager import CodePlateKeyBuyManager from trade import trade_manager, trade_queue_manager, l2_trade_factor, l2_trade_util, \ - trade_result_manager, first_code_score_manager, current_price_process_manager + trade_result_manager, first_code_score_manager, current_price_process_manager, trade_data_manager from l2 import safe_count_manager, l2_data_manager, l2_data_log, l2_log, l2_data_source_util, code_price_manager, \ transaction_progress from l2.cancel_buy_strategy import SecondCancelBigNumComputer, HourCancelBigNumComputer, L2LimitUpMoneyStatisticUtil, \ @@ -32,7 +32,7 @@ import dask -from trade.trade_manager import TradeTargetCodeModeManager +from trade.trade_manager import TradeTargetCodeModeManager, AccountAvailableMoneyManager class L2DataManager: @@ -215,6 +215,17 @@ __last_buy_single_dict = {} __TradeBuyQueue = transaction_progress.TradeBuyQueue() __latest_process_unique_keys = {} + # 鍒濆鍖� + __TradePointManager = l2_data_manager.TradePointManager() + __SecondCancelBigNumComputer = SecondCancelBigNumComputer() + __HourCancelBigNumComputer = HourCancelBigNumComputer() + __LCancelBigNumComputer = LCancelBigNumComputer() + __TradeStateManager = trade_manager.TradeStateManager() + __CodesTradeStateManager = trade_manager.CodesTradeStateManager() + __PauseBuyCodesManager = gpcode_manager.PauseBuyCodesManager() + __Buy1PriceManager = code_price_manager.Buy1PriceManager() + __AccountAvailableMoneyManager = AccountAvailableMoneyManager() + __TradeBuyDataManager = trade_data_manager.TradeBuyDataManager() # 鑾峰彇浠g爜璇勫垎 @classmethod @@ -271,8 +282,8 @@ @classmethod def process_huaxin(cls, code, origin_datas): print("process_huaxin", code, len(origin_datas)) - origin_start_time = round(t.time() * 1000) datas = None + origin_start_time = round(t.time() * 1000) try: # 鍔犺浇鍘嗗彶鐨凩2鏁版嵁 is_normal = l2.l2_data_util.load_l2_data(code, load_latest=False) @@ -280,6 +291,7 @@ print("鍘嗗彶鏁版嵁寮傚父:", code) # 鏁版嵁涓嶆甯搁渶瑕佺姝氦鏄� l2_trade_util.forbidden_trade(code) + origin_start_time = round(t.time() * 1000) # 杞崲鏁版嵁鏍煎紡 _start_index = 0 total_datas = local_today_datas.get(code) @@ -369,7 +381,7 @@ # 鏃堕棿宸笉鑳藉お澶ф墠鑳藉鐞� if not l2_trade_util.is_in_forbidden_trade_codes(code): # 鍒ゆ柇鏄惁宸茬粡鎸傚崟 - state = trade_manager.CodesTradeStateManager().get_trade_state_cache(code) + state = cls.__CodesTradeStateManager.get_trade_state_cache(code) start_index = len(total_datas) - len(add_datas) end_index = len(total_datas) - 1 if state == trade_manager.TRADE_STATE_BUY_DELEGATED or state == trade_manager.TRADE_STATE_BUY_PLACE_ORDER or state == trade_manager.TRADE_STATE_BUY_SUCCESS: @@ -476,18 +488,19 @@ # return cancel_data, cancel_msg # S鎾� - @dask.delayed + # @dask.delayed def s_cancel(): _start_time = round(t.time() * 1000) # S鎾ゅ崟璁$畻锛岀湅绉掔骇澶у崟鎾ゅ崟 try: - b_need_cancel, b_cancel_data = SecondCancelBigNumComputer().need_cancel(code, buy_single_index, - buy_exec_index, start_index, - end_index, total_data, - code_volumn_manager.get_volume_rate_index( - buy_volume_rate), - cls.volume_rate_info[code][1], - is_first_code) + b_need_cancel, b_cancel_data = cls.__SecondCancelBigNumComputer.need_cancel(code, buy_single_index, + buy_exec_index, start_index, + end_index, total_data, + code_volumn_manager.get_volume_rate_index( + buy_volume_rate), + cls.volume_rate_info[code][ + 1], + is_first_code) if b_need_cancel: return b_cancel_data, "S澶у崟鎾ら攢姣斾緥瑙﹀彂闃堝��" except Exception as e: @@ -498,19 +511,19 @@ return None, "" # H鎾� - @dask.delayed + # @dask.delayed def h_cancel(): _start_time = round(t.time() * 1000) try: - b_need_cancel, b_cancel_data = HourCancelBigNumComputer().need_cancel(code, buy_single_index, - buy_exec_index, start_index, - end_index, total_data, - local_today_num_operate_map.get( - code), - code_volumn_manager.get_volume_rate_index( - buy_volume_rate), - cls.volume_rate_info[code][1], - is_first_code) + b_need_cancel, b_cancel_data = cls.__HourCancelBigNumComputer.need_cancel(code, buy_single_index, + buy_exec_index, start_index, + end_index, total_data, + local_today_num_operate_map.get( + code), + code_volumn_manager.get_volume_rate_index( + buy_volume_rate), + cls.volume_rate_info[code][1], + is_first_code) if b_need_cancel and b_cancel_data: return b_cancel_data, "H鎾ら攢姣斾緥瑙﹀彂闃堝��" except Exception as e: @@ -520,15 +533,15 @@ return None, "" # L鎾� - @dask.delayed + # @dask.delayed def l_cancel(): _start_time = round(t.time() * 1000) try: - b_need_cancel, b_cancel_data = LCancelBigNumComputer().need_cancel(code, - buy_exec_index, start_index, - end_index, total_data, - local_today_num_operate_map.get( - code), is_first_code) + b_need_cancel, b_cancel_data = cls.__LCancelBigNumComputer.need_cancel(code, + buy_exec_index, start_index, + end_index, total_data, + local_today_num_operate_map.get( + code), is_first_code) if b_need_cancel and b_cancel_data: return b_cancel_data, "L鎾ら攢姣斾緥瑙﹀彂闃堝��" except Exception as e: @@ -577,19 +590,27 @@ # 鑾峰彇涔板叆淇″彿璧峰鐐� buy_single_index, buy_exec_index, buy_compute_index, num, count, max_num_set, buy_volume_rate = cls.__get_order_begin_pos( code) + # 鑰佺増鏈挙鍗� + # f1 = compute_safe_count() + # f2 = compute_m_big_num() + # f3 = s_cancel() + # f4 = h_cancel() + # f5 = buy_1_cancel() + # f6 = sell_cancel() + # f7 = l_cancel() + # dask_result = is_need_cancel(f1, f2, f3, f4, f5, f6, f7) + # if is_first_code: + # dask_result = is_need_cancel(f3, f4, f7) + # l2_log.debug(code, "鎾ゅ崟璁$畻寮�濮�") + # cancel_data, cancel_msg = dask_result.compute() + # l2_log.debug(code, "鎾ゅ崟璁$畻缁撴潫") - f1 = compute_safe_count() - f2 = compute_m_big_num() - f3 = s_cancel() - f4 = h_cancel() - f5 = buy_1_cancel() - f6 = sell_cancel() - f7 = l_cancel() - dask_result = is_need_cancel(f1, f2, f3, f4, f5, f6, f7) - if is_first_code: - dask_result = is_need_cancel(f3, f4, f7) - l2_log.debug(code, "鎾ゅ崟璁$畻寮�濮�") - cancel_data, cancel_msg = dask_result.compute() + # 渚濇澶勭悊 + cancel_data, cancel_msg = s_cancel() + if not cancel_data: + cancel_data, cancel_msg = h_cancel() + if not cancel_data: + cancel_data, cancel_msg = l_cancel() l2_log.debug(code, "鎾ゅ崟璁$畻缁撴潫") _start_time = l2_data_log.l2_time(code, tool.get_now_timestamp() - _start_time, "宸蹭笅鍗�-鎾ゅ崟 鍒ゆ柇鏄惁闇�瑕佹挙鍗�") @@ -660,7 +681,7 @@ last_data_index) l2_log.debug(code, "鎵ц涔板叆鎴愬姛") ################涓嬪崟鎴愬姛澶勭悊################ - trade_result_manager.real_buy_success(code) + trade_result_manager.real_buy_success(code, cls.__TradePointManager) l2_log.debug(code, "澶勭悊涔板叆鎴愬姛") params_desc = cls.__l2PlaceOrderParamsManagerDict[code].get_buy_rank_desc() l2_log.debug(code, params_desc) @@ -714,7 +735,7 @@ @classmethod def __can_buy(cls, code): __start_time = t.time() - if not trade_manager.TradeStateManager().is_can_buy_cache(): + if not cls.__TradeStateManager.is_can_buy_cache(): return False, True, f"浠婃棩宸茬姝氦鏄�" # 涔嬪墠鐨勪唬鐮� # 棣栨澘浠g爜涓斿皻鏈定鍋滆繃鐨勪笉鑳戒笅鍗� @@ -841,10 +862,10 @@ @classmethod def __can_buy_first(cls, code): - if not trade_manager.TradeStateManager().is_can_buy_cache(): + if not cls.__TradeStateManager.is_can_buy_cache(): return False, True, f"浠婃棩宸茬姝氦鏄�" - if gpcode_manager.PauseBuyCodesManager().is_in_cache(code): + if cls.__PauseBuyCodesManager.is_in_cache(code): return False, True, f"璇ヤ唬鐮佽鏆傚仠浜ゆ槗" limit_up_price = gpcode_manager.get_limit_up_price(code) @@ -886,7 +907,7 @@ zyltgb = global_util.zyltgb_map.get(code) if zyltgb >= 200 * 100000000: - buy1_price = code_price_manager.Buy1PriceManager().get_buy1_price(code) + buy1_price = cls.__Buy1PriceManager.get_buy1_price(code) if buy1_price is None: return False, True, f"灏氭湭鑾峰彇鍒颁拱1浠�" dif = float(limit_up_price) - float(buy1_price) @@ -894,13 +915,13 @@ if dif > 0.10001: return False, True, f"鑷敱娴侀��200浜夸互涓婏紝涔�1鍓╀綑妗f暟澶т簬10妗o紝涔颁竴锛坽buy1_price}锛夋定鍋滐紙{limit_up_price}锛�" - open_limit_up_lowest_price = code_price_manager.Buy1PriceManager().get_open_limit_up_lowest_price(code) + open_limit_up_lowest_price = cls.__Buy1PriceManager.get_open_limit_up_lowest_price(code) price_pre_close = gpcode_manager.CodePrePriceManager.get_price_pre_cache(code) if open_limit_up_lowest_price and ( float(open_limit_up_lowest_price) - price_pre_close) / price_pre_close < 0.05: return False, True, f"鐐告澘鍚庢渶浣庝环璺岃嚦5%浠ヤ笅" - limit_up_info = code_price_manager.Buy1PriceManager().get_limit_up_info(code) + limit_up_info = cls.__Buy1PriceManager.get_limit_up_info(code) if limit_up_info[0] is None and False: total_data = local_today_datas.get(code) buy_single_index, buy_exec_index, buy_compute_index, num, count, max_num_set, buy_volume_rate = cls.__get_order_begin_pos( @@ -908,13 +929,13 @@ # 涔嬪墠娌℃湁娑ㄥ仠杩� # 缁熻涔板叆淇″彿浣嶅埌褰撳墠浣嶇疆娌℃湁鎾ょ殑澶у崟閲戦 min_money_w = l2_data_util.get_big_money_val(float(total_data[buy_single_index]["val"]["price"])) // 10000 - left_big_num = l2.cancel_buy_strategy.SecondCancelBigNumComputer().compute_left_big_num(code, - buy_single_index, - buy_exec_index, - total_data[-1][ - "index"], - total_data, - 0, min_money_w) + left_big_num = cls.__SecondCancelBigNumComputer.compute_left_big_num(code, + buy_single_index, + buy_exec_index, + total_data[-1][ + "index"], + total_data, + 0, min_money_w) if left_big_num > 0: # 閲嶆柊鑾峰彇鍒嗘暟涓庡垎鏁扮储寮� limit_up_time = limit_up_time_manager.LimitUpTimeManager().get_limit_up_time_cache(code) @@ -1141,7 +1162,7 @@ l2_log.debug(code, "save_limit_up_time") cls.__virtual_buy(code, buy_single_index, compute_index, capture_time) l2_log.debug(code, "__virtual_buy") - l2_data_manager.TradePointManager().delete_buy_cancel_point(code) + cls.__TradePointManager.delete_buy_cancel_point(code) l2_log.debug(code, "delete_buy_cancel_point") # 鏆傛椂涓嶉渶瑕� # f5 = dask.delayed(L2LimitUpMoneyStatisticUtil.process_data)(code, buy_single_index, @@ -1172,13 +1193,13 @@ # 鏁版嵁鏄惁澶勭悊瀹屾瘯 if compute_index >= compute_end_index: - need_cancel, cancel_data = SecondCancelBigNumComputer().need_cancel(code, buy_single_index, - compute_index, - buy_single_index, compute_index, - total_datas, is_first_code, - cls.volume_rate_info[code][1], - cls.volume_rate_info[code][1], - True) + need_cancel, cancel_data = cls.__SecondCancelBigNumComputer.need_cancel(code, buy_single_index, + compute_index, + buy_single_index, compute_index, + total_datas, is_first_code, + cls.volume_rate_info[code][1], + cls.volume_rate_info[code][1], + True) _start_time = l2_data_log.l2_time(code, tool.get_now_timestamp() - _start_time, "S绾уぇ鍗曞鐞嗚�楁椂", force=True) l2_log.debug(code, "鏁版嵁澶勭悊瀹屾瘯锛屼笅鍗�, 鏁版嵁鎴浘鏃堕棿-{}", capture_time) @@ -1190,10 +1211,10 @@ else: cls.__buy(code, capture_time, total_datas[compute_index], compute_index, is_first_code) else: - SecondCancelBigNumComputer().need_cancel(code, buy_single_index, compute_index, buy_single_index, - compute_index, total_datas, is_first_code, - cls.volume_rate_info[code][1], - cls.volume_rate_info[code][1], False) + cls.__SecondCancelBigNumComputer.need_cancel(code, buy_single_index, compute_index, buy_single_index, + compute_index, total_datas, is_first_code, + cls.volume_rate_info[code][1], + cls.volume_rate_info[code][1], False) l2_log.debug(code, "S绾уぇ鍗曞鐞�") _start_time = l2_data_log.l2_time(code, tool.get_now_timestamp() - _start_time, @@ -1225,17 +1246,17 @@ # 鑾峰彇涓嬪崟璧峰淇″彿 @classmethod def __get_order_begin_pos(cls, code): - buy_single_index, buy_exec_index, compute_index, num, count, max_num_set, volume_rate = l2_data_manager.TradePointManager().get_buy_compute_start_data_cache( + buy_single_index, buy_exec_index, compute_index, num, count, max_num_set, volume_rate = cls.__TradePointManager.get_buy_compute_start_data_cache( code) return buy_single_index, buy_exec_index, compute_index, num, count, max_num_set, volume_rate # 淇濆瓨涓嬪崟璧峰淇″彿 @classmethod - def __save_order_begin_data(self, code, buy_single_index, buy_exec_index, compute_index, num, count, max_num_set, + def __save_order_begin_data(cls, code, buy_single_index, buy_exec_index, compute_index, num, count, max_num_set, volume_rate): - TradePointManager().set_buy_compute_start_data(code, buy_single_index, buy_exec_index, compute_index, num, - count, - max_num_set, volume_rate) + cls.__TradePointManager.set_buy_compute_start_data(code, buy_single_index, buy_exec_index, compute_index, num, + count, + max_num_set, volume_rate) # 璁$畻涓嬪崟璧峰淇″彿 # compute_data_count 鐢ㄤ簬璁$畻鐨刲2鏁版嵁鏁伴噺 diff --git a/l2/l2_data_util.py b/l2/l2_data_util.py index 0804c78..ad6f487 100644 --- a/l2/l2_data_util.py +++ b/l2/l2_data_util.py @@ -152,7 +152,7 @@ # 淇濆瓨鏈�杩戠殑鏁版嵁 __start_time = round(time.time() * 1000) if datas: - RedisUtils.setex(_redisManager.getRedis(), "l2-data-latest-{}".format(code), tool.get_expire(), json.dumps(datas)) + RedisUtils.setex_async(_redisManager.getRedis(), "l2-data-latest-{}".format(code), tool.get_expire(), json.dumps(datas)) l2_data_log.l2_time(code, round(time.time() * 1000) - __start_time, "淇濆瓨鏈�杩憀2鏁版嵁鐢ㄦ椂") # 璁剧疆杩涘唴瀛� local_latest_datas[code] = datas diff --git a/trade/trade_result_manager.py b/trade/trade_result_manager.py index 52f4bdf..d0c1f79 100644 --- a/trade/trade_result_manager.py +++ b/trade/trade_result_manager.py @@ -15,10 +15,12 @@ def virtual_buy_success(code): + # 宸茬粡涓嶉渶瑕佷簡锛屾殏鏃舵敞閲婃帀 # 澧炲姞涓嬪崟璁$畻 - trade_data_manager.PlaceOrderCountManager().place_order(code) + # trade_data_manager.PlaceOrderCountManager().place_order(code) # 鍒犻櫎涔嬪墠鐨勬澘涓婂崠淇℃伅 - L2LimitUpSellStatisticUtil().delete(code) + # L2LimitUpSellStatisticUtil().delete(code) + pass # 铏氭嫙鎾ゆ垚鍔� @@ -35,7 +37,7 @@ # 鐪熷疄涔版垚鍔� -def real_buy_success(code): +def real_buy_success(code, tradePointManager): # @dask.delayed def clear_max_buy1_volume(code): # 涓嬪崟鎴愬姛锛岄渶瑕佸垹闄ゆ渶澶т拱1 @@ -67,14 +69,14 @@ logging.exception(e) logger_l2_error.exception(e) - buy_single_index, buy_exec_index, buy_compute_index, num, count, max_num_set, volume_rate = l2_data_manager.TradePointManager().get_buy_compute_start_data_cache( + buy_single_index, buy_exec_index, buy_compute_index, num, count, max_num_set, volume_rate = tradePointManager.get_buy_compute_start_data_cache( code) clear_max_buy1_volume(code) safe_count(code, buy_single_index, buy_exec_index) h_cancel(code, buy_single_index, buy_exec_index) l_cancel(code) - l2_data_manager.TradePointManager().delete_buy_cancel_point(code) + tradePointManager.delete_buy_cancel_point(code) # 鐪熷疄鎾ゆ垚鍔� -- Gitblit v1.8.0