From c20c3c10635ce78db4a86ce9c0bb1d02e90f525d Mon Sep 17 00:00:00 2001 From: Administrator <admin@example.com> Date: 星期二, 08 八月 2023 17:40:42 +0800 Subject: [PATCH] 单例+缓存优化 --- l2/cancel_buy_strategy.py | 557 ++++++++++++++++++++++++++++++------------------------- 1 files changed, 307 insertions(+), 250 deletions(-) diff --git a/l2/cancel_buy_strategy.py b/l2/cancel_buy_strategy.py index a349020..663e63d 100644 --- a/l2/cancel_buy_strategy.py +++ b/l2/cancel_buy_strategy.py @@ -32,64 +32,78 @@ __sCancelParamsManager = l2_trade_factor.SCancelParamsManager __s_big_num_cancel_compute_data_cache = {} + __instance = None + + def __new__(cls, *args, **kwargs): + if not cls.__instance: + cls.__instance = super(SecondCancelBigNumComputer, cls).__new__(cls, *args, **kwargs) + cls.__load_datas() + return cls.__instance + @classmethod def __get_redis(cls): return cls.__redis_manager.getRedis() - # 淇濆瓨缁撴潫浣嶇疆 @classmethod - def __save_compute_data(cls, code, process_index, buy_num, cancel_num): - CodeDataCacheUtil.set_cache(cls.__s_big_num_cancel_compute_data_cache, code, + def __load_datas(cls): + __redis = cls.__get_redis() + try: + keys = RedisUtils.keys(__redis, "s_big_num_cancel_compute_data-*") + for k in keys: + code = k.split("-")[-1] + val = RedisUtils.get(__redis, k) + val = json.loads(val) + tool.CodeDataCacheUtil.set_cache(cls.__s_big_num_cancel_compute_data_cache, code, val) + finally: + RedisUtils.realse(__redis) + + # 淇濆瓨缁撴潫浣嶇疆 + def __save_compute_data(self, code, process_index, buy_num, cancel_num): + CodeDataCacheUtil.set_cache(self.__s_big_num_cancel_compute_data_cache, code, (process_index, buy_num, cancel_num)) key = "s_big_num_cancel_compute_data-{}".format(code) - RedisUtils.setex_async(cls.__db, key, tool.get_expire(), + RedisUtils.setex_async(self.__db, key, tool.get_expire(), json.dumps((process_index, buy_num, cancel_num))) - @classmethod - def __get_compute_data(cls, code): + def __get_compute_data(self, code): key = "s_big_num_cancel_compute_data-{}".format(code) - val = RedisUtils.get(cls.__get_redis(), key) + val = RedisUtils.get(self.__get_redis(), key) if val is None: return -1, 0, 0 val = json.loads(val) return val[0], val[1], val[2] - @classmethod - def __get_compute_data_cache(cls, code): - cache_result = CodeDataCacheUtil.get_cache(cls.__s_big_num_cancel_compute_data_cache, code) + def __get_compute_data_cache(self, code): + cache_result = CodeDataCacheUtil.get_cache(self.__s_big_num_cancel_compute_data_cache, code) if cache_result[0]: return cache_result[1] - val = cls.__get_compute_data(code) - CodeDataCacheUtil.set_cache(cls.__s_big_num_cancel_compute_data_cache, code, val) + val = self.__get_compute_data(code) + CodeDataCacheUtil.set_cache(self.__s_big_num_cancel_compute_data_cache, code, val) return val - @classmethod - def __clear_data(cls, code): - CodeDataCacheUtil.clear_cache(cls.__s_big_num_cancel_compute_data_cache, code) + def __clear_data(self, code): + CodeDataCacheUtil.clear_cache(self.__s_big_num_cancel_compute_data_cache, code) ks = ["s_big_num_cancel_compute_data-{}".format(code)] for key in ks: - RedisUtils.delete_async(cls.__db, key) + RedisUtils.delete_async(self.__db, key) - @classmethod - def clear_data(cls): + def clear_data(self): ks = ["s_big_num_cancel_compute_data-*"] for key in ks: - keys = RedisUtils.keys(cls.__get_redis(), key) + keys = RedisUtils.keys(self.__get_redis(), key) for k in keys: code = k.replace("s_big_num_cancel_compute_data-", "") - cls.__clear_data(code) + self.__clear_data(code) # 璁$畻鍑�澶у崟 - @classmethod - def __compute_left_big_num(cls, code, buy_single_index, start_index, end_index, total_data, volume_rate_index): + def __compute_left_big_num(self, code, buy_single_index, start_index, end_index, total_data, volume_rate_index): # 鐐圭伀澶у崟鏁伴噺 - fire_count = cls.__sCancelParamsManager.get_max_exclude_count(volume_rate_index) - return cls.compute_left_big_num(code, buy_single_index, start_index, end_index, total_data, fire_count, - constant.S_CANCEL_MIN_MONEY) + fire_count = self.__sCancelParamsManager.get_max_exclude_count(volume_rate_index) + return self.compute_left_big_num(code, buy_single_index, start_index, end_index, total_data, fire_count, + constant.S_CANCEL_MIN_MONEY) # 璁$畻鏈挙鐨勬�绘墜鏁� - @classmethod - def compute_left_big_num(cls, code, buy_single_index, start_index, end_index, total_data, fire_count, min_money_w): + def compute_left_big_num(self, code, buy_single_index, start_index, end_index, total_data, fire_count, min_money_w): # 鑾峰彇澶у崟鐨勬渶灏忔墜鏁� left_big_num = 0 for i in range(start_index, end_index + 1): @@ -128,8 +142,7 @@ left_big_num -= val["num"] * data["re"] return left_big_num - @classmethod - def need_cancel(cls, code, buy_single_index, buy_exec_index, start_index, end_index, total_data, is_first_code, + def need_cancel(self, code, buy_single_index, buy_exec_index, start_index, end_index, total_data, is_first_code, buy_volume_rate_index, volume_rate_index, need_cancel=True): @@ -150,7 +163,7 @@ break # 鑾峰彇澶勭悊杩涘害 - process_index_old, buy_num, cancel_num = cls.__get_compute_data_cache(code) + process_index_old, buy_num, cancel_num = self.__get_compute_data_cache(code) # 濡傛灉start_index涓巄uy_single_index鐩稿悓锛屽嵆鏄笅鍗曞悗鐨勭涓�娆¤绠� # 闇�瑕佹煡璇拱鍏ヤ俊鍙蜂箣鍓嶇殑鍚�1s鏄惁鏈夋定鍋滄挙鐨勬暟鎹� @@ -160,8 +173,8 @@ if buy_single_index == start_index: # 绗�1娆¤绠楅渶瑕佽绠椾拱鍏ヤ俊鍙�-鎵ц浣嶇殑鍑�鍊� - left_big_num = cls.__compute_left_big_num(code, buy_single_index, buy_single_index, buy_exec_index, - total_data, place_order_count) + left_big_num = self.__compute_left_big_num(code, buy_single_index, buy_single_index, buy_exec_index, + total_data, place_order_count) buy_num += left_big_num # 璁剧疆涔板叆淇″彿-涔板叆鎵ц浣嶇殑鏁版嵁涓嶉渶瑕佸鐞� start_index = end_index + 1 @@ -180,8 +193,8 @@ # if buy_index is not None and a_start_index <= buy_index <= a_end_index: # # 鍦ㄤ拱鍏ヤ俊鍙蜂箣鍚� # cls.__save_cancel_data(code, i) - range_seconds = cls.__sCancelParamsManager.get_buy_time_range(buy_volume_rate_index) - cancel_rate_threshold = cls.__sCancelParamsManager.get_cancel_rate(volume_rate_index) + range_seconds = self.__sCancelParamsManager.get_buy_time_range(buy_volume_rate_index) + cancel_rate_threshold = self.__sCancelParamsManager.get_cancel_rate(volume_rate_index) try: for i in range(start_index, end_index + 1): data = total_data[i] @@ -240,13 +253,12 @@ buy_num, round(cancel_num / max(buy_num, 1), 2), cancel_rate_threshold, range_seconds) # 淇濆瓨澶勭悊杩涘害涓庢暟鎹� - cls.__save_compute_data(code, process_index, buy_num, cancel_num) + self.__save_compute_data(code, process_index, buy_num, cancel_num) return False, None # 涓嬪崟鎴愬姛 - @classmethod - def cancel_success(cls, code): - cls.__clear_data(code) + def cancel_success(self, code): + self.__clear_data(code) # --------------------------------H鎾�------------------------------- @@ -263,172 +275,204 @@ __cancel_traded_progress_cache = {} __cancel_compute_data_cache = {} + __instance = None + + def __new__(cls, *args, **kwargs): + if not cls.__instance: + cls.__instance = super(HourCancelBigNumComputer, cls).__new__(cls, *args, **kwargs) + cls.__load_datas() + return cls.__instance + @classmethod def __get_redis(cls): return cls.__redis_manager.getRedis() - # 淇濆瓨鎴愪氦浣嶇疆鍒版墽琛屼綅缃殑鎻芥嫭鑼冨洿鏁版嵁 @classmethod - def __save_watch_index_set(cls, code, datas, process_index, finish): - CodeDataCacheUtil.set_cache(cls.__cancel_watch_indexs_cache, code, (list(datas), process_index, finish)) + def __load_datas(cls): + __redis = cls.__get_redis() + try: + keys = RedisUtils.keys(__redis, "h_cancel_watch_indexs-*") + for k in keys: + code = k.split("-")[-1] + val = RedisUtils.get(__redis, k) + val = json.loads(val) + CodeDataCacheUtil.set_cache(cls.__cancel_watch_indexs_cache, code, val) + + keys = RedisUtils.keys(__redis, "h_cancel_watch_indexs_exec-*") + for k in keys: + code = k.split("-")[-1] + val = RedisUtils.get(__redis, k) + val = json.loads(val) + CodeDataCacheUtil.set_cache(cls.__cancel_watch_indexs_exec_cache, code, val) + + keys = RedisUtils.keys(__redis, "h_cancel_watch_canceled_indexs-*") + for k in keys: + code = k.split("-")[-1] + val = RedisUtils.smembers(__redis, k) + CodeDataCacheUtil.set_cache(cls.__cancel_watch_canceled_indexs_cache, code, val) + + keys = RedisUtils.keys(__redis, "h_cancel_traded_progress-*") + for k in keys: + code = k.split("-")[-1] + val = RedisUtils.get(__redis, k) + val = json.loads(val) + CodeDataCacheUtil.set_cache(cls.__cancel_traded_progress_cache, code, val) + + keys = RedisUtils.keys(__redis, "h_cancel_compute_data-*") + for k in keys: + code = k.split("-")[-1] + val = RedisUtils.get(__redis, k) + val = json.loads(val) + CodeDataCacheUtil.set_cache(cls.__cancel_compute_data_cache, code, val) + + finally: + RedisUtils.realse(__redis) + + # 淇濆瓨鎴愪氦浣嶇疆鍒版墽琛屼綅缃殑鎻芥嫭鑼冨洿鏁版嵁 + + def __save_watch_index_set(self, code, datas, process_index, finish): + CodeDataCacheUtil.set_cache(self.__cancel_watch_indexs_cache, code, (list(datas), process_index, finish)) key = f"h_cancel_watch_indexs-{code}" - RedisUtils.setex_async(cls.__db, key, tool.get_expire(), + RedisUtils.setex_async(self.__db, key, tool.get_expire(), json.dumps((list(datas), process_index, finish))) # 淇濆瓨鎴愪氦杩涘害 - @classmethod - def __get_watch_index_set(cls, code): + + def __get_watch_index_set(self, code): key = f"h_cancel_watch_indexs-{code}" - val = RedisUtils.get(cls.__get_redis(), key) + val = RedisUtils.get(self.__get_redis(), key) if val is None: return None, -1, False val = json.loads(val) return val[0], val[1], val[2] - @classmethod - def __get_watch_index_set_cache(cls, code): - cache_result = CodeDataCacheUtil.get_cache(cls.__cancel_watch_indexs_cache, code) + def __get_watch_index_set_cache(self, code): + cache_result = CodeDataCacheUtil.get_cache(self.__cancel_watch_indexs_cache, code) if cache_result[0]: return cache_result[1] - val = cls.__get_watch_index_set(code) - CodeDataCacheUtil.set_cache(cls.__cancel_watch_indexs_cache, code, val) + val = self.__get_watch_index_set(code) + CodeDataCacheUtil.set_cache(self.__cancel_watch_indexs_cache, code, val) return val # 淇濆瓨鎵ц浣嶇疆鍚庨潰鐨勫畧鎶ゆ暟鎹� - @classmethod - def __save_watch_index_set_after_exec(cls, code, datas, process_index, total_count, big_num_count, finished): - CodeDataCacheUtil.set_cache(cls.__cancel_watch_indexs_exec_cache, code, + + def __save_watch_index_set_after_exec(self, code, datas, process_index, total_count, big_num_count, finished): + CodeDataCacheUtil.set_cache(self.__cancel_watch_indexs_exec_cache, code, (list(datas), process_index, total_count, big_num_count, finished)) key = f"h_cancel_watch_indexs_exec-{code}" - RedisUtils.setex_async(cls.__db, key, tool.get_expire(), + RedisUtils.setex_async(self.__db, key, tool.get_expire(), json.dumps((list(datas), process_index, total_count, big_num_count, finished))) # 淇濆瓨鎴愪氦杩涘害 - @classmethod - def __get_watch_index_set_after_exec(cls, code): + def __get_watch_index_set_after_exec(self, code): key = f"h_cancel_watch_indexs_exec-{code}" - val = RedisUtils.get(cls.__get_redis(), key) + val = RedisUtils.get(self.__get_redis(), key) if val is None: return [], -1, 0, 0, False val = json.loads(val) return val[0], val[1], val[2], val[3], val[4] - @classmethod - def __get_watch_index_set_after_exec_cache(cls, code): - cache_result = CodeDataCacheUtil.get_cache(cls.__cancel_watch_indexs_exec_cache, code) + def __get_watch_index_set_after_exec_cache(self, code): + cache_result = CodeDataCacheUtil.get_cache(self.__cancel_watch_indexs_exec_cache, code) if cache_result[0]: return cache_result[1] - val = cls.__get_watch_index_set_after_exec(code) - CodeDataCacheUtil.set_cache(cls.__cancel_watch_indexs_exec_cache, code, val) + val = self.__get_watch_index_set_after_exec(code) + CodeDataCacheUtil.set_cache(self.__cancel_watch_indexs_exec_cache, code, val) return val # 淇濆瓨宸茬粡鎾ゅ崟鐨勭洃鍚綅缃� - @classmethod - def __add_watch_canceled_indexes(cls, code, indexes): - if code not in cls.__cancel_watch_canceled_indexs_cache: - cls.__cancel_watch_canceled_indexs_cache[code] = set() + def __add_watch_canceled_indexes(self, code, indexes): + if code not in self.__cancel_watch_canceled_indexs_cache: + self.__cancel_watch_canceled_indexs_cache[code] = set() key = f"h_cancel_watch_canceled_indexs-{code}" for index in indexes: - cls.__cancel_watch_canceled_indexs_cache[code].add(index) - RedisUtils.sadd_async(cls.__db, key, index) - RedisUtils.expire_async(cls.__db, key, tool.get_expire()) + self.__cancel_watch_canceled_indexs_cache[code].add(index) + RedisUtils.sadd_async(self.__db, key, index) + RedisUtils.expire_async(self.__db, key, tool.get_expire()) - @classmethod - def __get_watch_canceled_index(cls, code): + def __get_watch_canceled_index(self, code): key = f"h_cancel_watch_canceled_indexs-{code}" - return RedisUtils.smembers(cls.__get_redis(), key) + return RedisUtils.smembers(self.__get_redis(), key) - @classmethod - def __get_watch_canceled_index_cache(cls, code): - cache_result = CodeDataCacheUtil.get_cache(cls.__cancel_watch_canceled_indexs_cache, code) + def __get_watch_canceled_index_cache(self, code): + cache_result = CodeDataCacheUtil.get_cache(self.__cancel_watch_canceled_indexs_cache, code) if cache_result[0]: return cache_result[1] - val = cls.__get_watch_canceled_index(code) - CodeDataCacheUtil.set_cache(cls.__cancel_watch_canceled_indexs_cache, code, val) + val = self.__get_watch_canceled_index(code) + CodeDataCacheUtil.set_cache(self.__cancel_watch_canceled_indexs_cache, code, val) return val - @classmethod - def __del_watch_canceled_index(cls, code): - CodeDataCacheUtil.clear_cache(cls.__cancel_watch_canceled_indexs_cache, code) + def __del_watch_canceled_index(self, code): + CodeDataCacheUtil.clear_cache(self.__cancel_watch_canceled_indexs_cache, code) key = f"h_cancel_watch_canceled_indexs-{code}" - RedisUtils.delete(cls.__get_redis(), key) + RedisUtils.delete(self.__get_redis(), key) # 淇濆瓨鎴愪氦杩涘害 - @classmethod - def __save_traded_progress(cls, code, origin_process_index, latest_process_index): - CodeDataCacheUtil.set_cache(cls.__cancel_traded_progress_cache, code, + def __save_traded_progress(self, code, origin_process_index, latest_process_index): + CodeDataCacheUtil.set_cache(self.__cancel_traded_progress_cache, code, (origin_process_index, latest_process_index)) key = "h_cancel_traded_progress-{}".format(code) - RedisUtils.setex_async(cls.__db, key, tool.get_expire(), + RedisUtils.setex_async(self.__db, key, tool.get_expire(), json.dumps((origin_process_index, latest_process_index))) - @classmethod - def __get_traded_progress(cls, code): + def __get_traded_progress(self, code): key = "h_cancel_traded_progress-{}".format(code) - val = RedisUtils.get(cls.__get_redis(), key) + val = RedisUtils.get(self.__get_redis(), key) if val is None: return None, None val = json.loads(val) return val[0], val[1] - @classmethod - def __get_traded_progress_cache(cls, code): - cache_result = CodeDataCacheUtil.get_cache(cls.__cancel_traded_progress_cache, code) + def __get_traded_progress_cache(self, code): + cache_result = CodeDataCacheUtil.get_cache(self.__cancel_traded_progress_cache, code) if cache_result[0]: return cache_result[1] - val = cls.__get_traded_progress(code) - CodeDataCacheUtil.set_cache(cls.__cancel_traded_progress_cache, code, val) + val = self.__get_traded_progress(code) + CodeDataCacheUtil.set_cache(self.__cancel_traded_progress_cache, code, val) return val # 淇濆瓨缁撶畻浣嶇疆 - @classmethod - def __save_compute_data(cls, code, process_index, cancel_num): - CodeDataCacheUtil.set_cache(cls.__cancel_compute_data_cache, code, + def __save_compute_data(self, code, process_index, cancel_num): + CodeDataCacheUtil.set_cache(self.__cancel_compute_data_cache, code, (process_index, cancel_num)) key = "h_cancel_compute_data-{}".format(code) - RedisUtils.setex_async(cls.__db, key, tool.get_expire(), json.dumps((process_index, cancel_num))) + RedisUtils.setex_async(self.__db, key, tool.get_expire(), json.dumps((process_index, cancel_num))) - @classmethod - def __get_compute_data(cls, code): + def __get_compute_data(self, code): key = "h_cancel_compute_data-{}".format(code) - val = RedisUtils.get(cls.__get_redis(), key) + val = RedisUtils.get(self.__get_redis(), key) if val is None: return -1, 0 val = json.loads(val) return val[0], val[1] - @classmethod - def __get_compute_data_cache(cls, code): - cache_result = CodeDataCacheUtil.get_cache(cls.__cancel_compute_data_cache, code) + def __get_compute_data_cache(self, code): + cache_result = CodeDataCacheUtil.get_cache(self.__cancel_compute_data_cache, code) if cache_result[0]: return cache_result[1] - val = cls.__get_compute_data(code) - CodeDataCacheUtil.set_cache(cls.__cancel_compute_data_cache, code, val) + val = self.__get_compute_data(code) + CodeDataCacheUtil.set_cache(self.__cancel_compute_data_cache, code, val) return val - @classmethod - def __del_compute_data(cls, code): - CodeDataCacheUtil.clear_cache(cls.__cancel_compute_data_cache, code) + def __del_compute_data(self, code): + CodeDataCacheUtil.clear_cache(self.__cancel_compute_data_cache, code) key = "h_cancel_compute_data-{}".format(code) - RedisUtils.delete(cls.__get_redis(), key) + RedisUtils.delete(self.__get_redis(), key) - @classmethod - def __clear_data(cls, code): - - CodeDataCacheUtil.clear_cache(cls.__cancel_watch_indexs_cache, code) - CodeDataCacheUtil.clear_cache(cls.__cancel_traded_progress_cache, code) - CodeDataCacheUtil.clear_cache(cls.__cancel_watch_canceled_indexs_cache, code) - CodeDataCacheUtil.clear_cache(cls.__cancel_watch_indexs_exec_cache, code) - CodeDataCacheUtil.clear_cache(cls.__cancel_compute_data_cache, code) + def __clear_data(self, code): + CodeDataCacheUtil.clear_cache(self.__cancel_watch_indexs_cache, code) + CodeDataCacheUtil.clear_cache(self.__cancel_traded_progress_cache, code) + CodeDataCacheUtil.clear_cache(self.__cancel_watch_canceled_indexs_cache, code) + CodeDataCacheUtil.clear_cache(self.__cancel_watch_indexs_exec_cache, code) + CodeDataCacheUtil.clear_cache(self.__cancel_compute_data_cache, code) ks = ["h_cancel_compute_data-{}".format(code), f"h_cancel_watch_indexs_exec-{code}", f"h_cancel_watch_indexs-{code}", f"h_cancel_traded_progress-{code}", f"h_cancel_watch_canceled_indexs-{code}"] for key in ks: - RedisUtils.delete(cls.__get_redis(), key) + RedisUtils.delete(self.__get_redis(), key) - @classmethod - def need_cancel(cls, code, buy_single_index, buy_exec_index, start_index, end_index, total_data, + def need_cancel(self, code, buy_single_index, buy_exec_index, start_index, end_index, total_data, local_today_num_operate_map, buy_volume_index, volume_index, is_first_code): @@ -436,14 +480,14 @@ total_data[buy_exec_index]["val"]["time"]) if time_space >= constant.S_CANCEL_EXPIRE_TIME - 1: # 寮�濮嬭绠楅渶瑕佺洃鎺х殑鍗� - cls.__compute_watch_indexs_after_single(code, buy_single_index, buy_exec_index, total_data, - local_today_num_operate_map, buy_volume_index) + self.__compute_watch_indexs_after_single(code, buy_single_index, buy_exec_index, total_data, + local_today_num_operate_map, buy_volume_index) # 瀹堟姢30s浠ュ鐨勬暟鎹� if time_space <= constant.S_CANCEL_EXPIRE_TIME: return False, None # 鑾峰彇鎴愪氦杩涘害 - origin_progress_index, latest_progress_index = cls.__get_traded_progress_cache(code) + origin_progress_index, latest_progress_index = self.__get_traded_progress_cache(code) if latest_progress_index is None: latest_progress_index = -1 # 鐩戝惉鐨勬暟鎹� @@ -454,7 +498,7 @@ total_nums = 1 if origin_progress_index is not None: # 鑾峰彇鎴愪氦浣嶇疆鍒版墽琛屼綅缃殑鐩戞帶鏁版嵁 - watch_indexs = cls.__get_watch_index_set_cache(code)[0] + watch_indexs = self.__get_watch_index_set_cache(code)[0] # 鐩戝惉鐨勬�绘暟 for indexs in watch_indexs: index = indexs[0] @@ -464,7 +508,7 @@ watch_indexs_dict[index] = indexs total_nums += total_data[index]["val"]["num"] * indexs[2] # 鑾峰彇鍒版墽琛屼綅鍚庣殑鐩戝惉鏁版嵁 - datas, process_index, total_count, big_num_count, finished = cls.__get_watch_index_set_after_exec_cache(code) + datas, process_index, total_count, big_num_count, finished = self.__get_watch_index_set_after_exec_cache(code) if datas: for indexs in datas: index = indexs[0] @@ -473,17 +517,17 @@ watch_indexs_dict[index] = indexs total_nums += total_data[index]["val"]["num"] * indexs[2] - processed_index, cancel_num = cls.__get_compute_data_cache(code) + processed_index, cancel_num = self.__get_compute_data_cache(code) l2_log.cancel_debug(code, "H绾ф槸鍚﹂渶瑕佹挙鍗曪紝鏁版嵁鑼冨洿锛歿}-{} ", start_index, end_index) # 鑾峰彇涓嬪崟娆℃暟 place_order_count = trade_data_manager.PlaceOrderCountManager().get_place_order_count(code) - cancel_rate_threshold = cls.__hCancelParamsManager.get_cancel_rate(volume_index) + cancel_rate_threshold = self.__hCancelParamsManager.get_cancel_rate(volume_index) process_index = start_index # 鏄惁鏈夎娴嬬殑鏁版嵁鎾ゅ崟 has_watch_canceled = False # 鑾峰彇涔嬪墠宸茬粡鎾ゅ崟鐨勬暟鎹� - old_canceld_indexs = cls.__get_watch_canceled_index_cache(code) + old_canceld_indexs = self.__get_watch_canceled_index_cache(code) # 閲嶆柊璁$畻鎾ゅ崟 cancel_num = 0 if old_canceld_indexs: @@ -521,9 +565,9 @@ len(watch_indexs_dict.keys())) l2_log.trade_record(code, "H鎾�", "'index':{} , 'rate':{} ,'target_rate':{}", i, rate__, cancel_rate_threshold) - cls.__add_watch_canceled_indexes(code, temp_watch_canceled_index) + self.__add_watch_canceled_indexes(code, temp_watch_canceled_index) return True, data - cls.__add_watch_canceled_indexes(code, temp_watch_canceled_index) + self.__add_watch_canceled_indexes(code, temp_watch_canceled_index) rate__ = round(cancel_num / total_nums, 4) if rate__ > cancel_rate_threshold: @@ -543,9 +587,9 @@ logger_l2_h_cancel.info( f"code-{code} H绾ф挙鍗曡绠楃粨鏋� 鑼冨洿锛歿start_index}-{end_index} 澶勭悊杩涘害锛歿process_index} 鐩爣姣斾緥锛歿cancel_rate_threshold} 鍙栨秷璁$畻缁撴灉:{cancel_num}/{total_nums}") # H鎾ゅ凡鎾よ鍗� - logger_l2_h_cancel.info(f"code-{code} H鎾ゅ凡鎾よ鍗曪細{cls.__get_watch_canceled_index_cache(code)}") + logger_l2_h_cancel.info(f"code-{code} H鎾ゅ凡鎾よ鍗曪細{self.__get_watch_canceled_index_cache(code)}") # 淇濆瓨澶勭悊杩涘害涓庢暟鎹� - cls.__save_compute_data(code, process_index, cancel_num) + self.__save_compute_data(code, process_index, cancel_num) # 鏈夎娴嬫暟鎹挙鍗� if has_watch_canceled: now_rate = round(cancel_num / total_nums, 4) @@ -557,33 +601,30 @@ return False, None # 涓嬪崟鎴愬姛 - @classmethod - def place_order_success(cls, code, buy_single_index, buy_exec_index, total_data, local_today_num_operate_map): - cls.__clear_data(code) + def place_order_success(self, code, buy_single_index, buy_exec_index, total_data, local_today_num_operate_map): + self.__clear_data(code) # 璁剧疆鎴愪氦杩涘害 - @classmethod - def set_trade_progress(cls, code, data_time, buy_exec_index, index, total_data, local_today_num_operate_map): - cls.__tradeBuyQueue.set_traded_index(code, index) + def set_trade_progress(self, code, data_time, buy_exec_index, index, total_data, local_today_num_operate_map): + self.__tradeBuyQueue.set_traded_index(code, index) # 濡傛灉鑾峰彇鏃堕棿涓庢墽琛屾椂闂村皬浜�29鍒欎笉闇�瑕佸鐞� if buy_exec_index is None or buy_exec_index < 0 or tool.trade_time_sub(data_time, total_data[buy_exec_index]["val"][ "time"]) < constant.S_CANCEL_EXPIRE_TIME - 1: return # 淇濆瓨鎴愪氦杩涘害 - origin_index, latest_index = cls.__get_traded_progress_cache(code) + origin_index, latest_index = self.__get_traded_progress_cache(code) if origin_index is None: - cls.__save_traded_progress(code, index, index) + self.__save_traded_progress(code, index, index) # 璁$畻鎻芥嫭鑼冨洿 - cls.__compute_watch_indexs_between_traded_exec(code, index, buy_exec_index, total_data, - local_today_num_operate_map) + self.__compute_watch_indexs_between_traded_exec(code, index, buy_exec_index, total_data, + local_today_num_operate_map) else: - cls.__save_traded_progress(code, origin_index, index) + self.__save_traded_progress(code, origin_index, index) logger_l2_h_cancel.info(f"code-{code} 鎴愪氦杩涘害锛歿index} 鏁版嵁缁撴潫浣嶇疆锛�" + str(total_data[-1]["index"])) # 娑ㄥ仠涔版槸鍚︽挙鍗� - @classmethod - def __get_limit_up_buy_no_canceled_count(cls, code, index, total_data, local_today_num_operate_map, + def __get_limit_up_buy_no_canceled_count(self, code, index, total_data, local_today_num_operate_map, MAX_EXPIRE_CANCEL_TIME=None): data = None try: @@ -619,13 +660,12 @@ # 璁$畻鎺掑悕鍓峃鐨勫ぇ鍗� # 杩囨椂鏁版嵁 - @classmethod - def __compute_top_n_num(cls, code, start_index, total_data, local_today_num_operate_map, count): + def __compute_top_n_num(self, code, start_index, total_data, local_today_num_operate_map, count): # 鎵惧埌杩樻湭鎾ょ殑TOPN澶у崟 watch_set = set() for i in range(start_index, total_data[-1]["index"] + 1): - not_cancel_count = cls.__get_limit_up_buy_no_canceled_count(code, i, total_data, - local_today_num_operate_map) + not_cancel_count = self.__get_limit_up_buy_no_canceled_count(code, i, total_data, + local_today_num_operate_map) if not_cancel_count > 0: watch_set.add((i, total_data[i]["val"]["num"], not_cancel_count)) # 閽堟寜鐓ф墜鏁版帓搴� @@ -637,14 +677,13 @@ return watch_set # 浠庢垚浜や綅缃埌鎵ц浣嶇疆 - @classmethod - def __compute_watch_indexs_between_traded_exec(cls, code, progress_index, buy_exec_index, total_data, + def __compute_watch_indexs_between_traded_exec(self, code, progress_index, buy_exec_index, total_data, local_today_num_operate_map): total_count = 0 watch_set = set() big_num_count = 0 for i in range(progress_index, buy_exec_index): - left_count = cls.__get_limit_up_buy_no_canceled_count(code, i, total_data, local_today_num_operate_map) + left_count = self.__get_limit_up_buy_no_canceled_count(code, i, total_data, local_today_num_operate_map) if left_count > 0: data = total_data[i] val = data["val"] @@ -659,15 +698,14 @@ final_watch_list = list(watch_set) final_watch_list.sort(key=lambda x: x[0]) logger_l2_h_cancel.info(f"code-{code} H鎾ょ洃鎺ф垚浜や綅鍒版墽琛屼綅:{final_watch_list}") - cls.__save_watch_index_set(code, final_watch_list, buy_exec_index, True) + self.__save_watch_index_set(code, final_watch_list, buy_exec_index, True) # 鍒犻櫎鍘熸潵鐨勮绠楁暟鎹� # cls.__del_compute_data(code) # 璁$畻鎵ц浣嶇疆涔嬪悗鐨勯渶瑕佺洃鍚殑鏁版嵁 - @classmethod - def __compute_watch_indexs_after_single(cls, code, buy_single_index, buy_exec_index, total_data, + def __compute_watch_indexs_after_single(self, code, buy_single_index, buy_exec_index, total_data, local_today_num_operate_map, buy_volume_index): - watch_list, process_index_old, total_count_old, big_num_count_old, finish = cls.__get_watch_index_set_after_exec_cache( + watch_list, process_index_old, total_count_old, big_num_count_old, finish = self.__get_watch_index_set_after_exec_cache( code) if watch_list and finish: # 宸茬粡璁$畻瀹屼簡涓嶉渶瑕佸啀杩涜璁$畻 @@ -683,17 +721,17 @@ big_num_count = big_num_count_old total_count = total_count_old # H鎾ゅ崟 - MIN_H_COUNT = cls.__hCancelParamsManager.get_max_watch_count(buy_volume_index) + MIN_H_COUNT = self.__hCancelParamsManager.get_max_watch_count(buy_volume_index) # 浠庝拱鍏ヤ俊鍙蜂綅3鏉℃暟鎹紑濮嬭绠� for i in range(buy_single_index + 3, total_data[-1]["index"] + 1): if i <= process_index_old: continue process_index = i - left_count = cls.__get_limit_up_buy_no_canceled_count(code, i, total_data, - local_today_num_operate_map, - tool.trade_time_add_second( - total_data[buy_exec_index]["val"]["time"], - constant.S_CANCEL_EXPIRE_TIME)) + left_count = self.__get_limit_up_buy_no_canceled_count(code, i, total_data, + local_today_num_operate_map, + tool.trade_time_add_second( + total_data[buy_exec_index]["val"]["time"], + constant.S_CANCEL_EXPIRE_TIME)) if left_count > 0: data = total_data[i] val = data["val"] @@ -720,22 +758,21 @@ final_watch_list.sort(key=lambda x: x[0]) logger_l2_h_cancel.info(f"code-{code} H鎾ょ洃鎺ф墽琛屼綅鐩搁偦鍗�:{final_watch_list} 鐩爣璁$畻鏁伴噺锛歿MIN_H_COUNT}") # 淇濆瓨璁$畻鑼冨洿 - cls.__save_watch_index_set_after_exec(code, final_watch_list, process_index, total_count, big_num_count, - finished) + self.__save_watch_index_set_after_exec(code, final_watch_list, process_index, total_count, big_num_count, + finished) # 鍒犻櫎鍘熸潵鐨勮绠楁暟鎹� # cls.__del_compute_data(code) # 鑾峰彇H鎾ょ洃鍚殑鏁版嵁绱㈠紩鑼冨洿 # 杩斿洖鐩戝惉鑼冨洿涓庡凡鎾ゅ崟绱㈠紩 - @classmethod - def get_watch_index_dict(cls, code): - origin_progress_index, latest_progress_index = cls.__get_traded_progress_cache(code) + def get_watch_index_dict(self, code): + origin_progress_index, latest_progress_index = self.__get_traded_progress_cache(code) # 鐩戝惉鐨勬暟鎹� watch_indexs_dict = {} total_nums = 0 if origin_progress_index is not None: # 鑾峰彇鎴愪氦浣嶇疆鍒版墽琛屼綅缃殑鐩戞帶鏁版嵁 - watch_indexs = cls.__get_watch_index_set_cache(code)[0] + watch_indexs = self.__get_watch_index_set_cache(code)[0] # 鐩戝惉鐨勬�绘暟 for indexs in watch_indexs: index = indexs[0] @@ -744,12 +781,12 @@ # 鍙绠楁渶杩戠殑鎵ц浣嶄箣鍚庣殑鏁版嵁 watch_indexs_dict[index] = indexs # 鑾峰彇鍒版墽琛屼綅鍚庣殑鐩戝惉鏁版嵁 - datas, process_index, total_count, big_num_count, finished = cls.__get_watch_index_set_after_exec_cache(code) + datas, process_index, total_count, big_num_count, finished = self.__get_watch_index_set_after_exec_cache(code) if datas: for indexs in datas: index = indexs[0] watch_indexs_dict[index] = indexs - return watch_indexs_dict, cls.__get_watch_canceled_index_cache(code) + return watch_indexs_dict, self.__get_watch_canceled_index_cache(code) # ---------------------------------D鎾�------------------------------- @@ -760,57 +797,71 @@ __redis_manager = redis_manager.RedisManager(0) __cancel_real_order_index_cache = {} + __instance = None + + def __new__(cls, *args, **kwargs): + if not cls.__instance: + cls.__instance = super(DCancelBigNumComputer, cls).__new__(cls, *args, **kwargs) + cls.__load_datas() + return cls.__instance + + @classmethod + def __load_datas(cls): + __redis = cls.__get_redis() + try: + keys = RedisUtils.keys(__redis, "d_cancel_real_order_index-*") + for k in keys: + code = k.split("-")[-1] + val = RedisUtils.get(__redis, k) + CodeDataCacheUtil.set_cache(cls.__cancel_real_order_index_cache, code, int(val)) + finally: + RedisUtils.realse(__redis) + @classmethod def __get_redis(cls): return cls.__redis_manager.getRedis() - @classmethod - def __set_real_order_index(cls, code, index): - CodeDataCacheUtil.set_cache(cls.__cancel_real_order_index_cache, code, index) - RedisUtils.setex_async(cls.__db, f"d_cancel_real_order_index-{code}", tool.get_expire(), f"{index}") + def __set_real_order_index(self, code, index): + CodeDataCacheUtil.set_cache(self.__cancel_real_order_index_cache, code, index) + RedisUtils.setex_async(self.__db, f"d_cancel_real_order_index-{code}", tool.get_expire(), f"{index}") - @classmethod - def __del_real_order_index(cls, code): - CodeDataCacheUtil.clear_cache(cls.__cancel_real_order_index_cache, code) - RedisUtils.delete_async(cls.__db, f"d_cancel_real_order_index-{code}") + def __del_real_order_index(self, code): + CodeDataCacheUtil.clear_cache(self.__cancel_real_order_index_cache, code) + RedisUtils.delete_async(self.__db, f"d_cancel_real_order_index-{code}") - @classmethod - def __get_real_order_index(cls, code): - val = RedisUtils.get(cls.__db, f"d_cancel_real_order_index-{code}") + def __get_real_order_index(self, code): + val = RedisUtils.get(self.__db, f"d_cancel_real_order_index-{code}") if val: return int(val) return None - @classmethod - def __get_real_order_index_cache(cls, code): - cache_result = CodeDataCacheUtil.get_cache(cls.__cancel_real_order_index_cache, code) + def __get_real_order_index_cache(self, code): + cache_result = CodeDataCacheUtil.get_cache(self.__cancel_real_order_index_cache, code) if cache_result[0]: return cache_result[1] - val = cls.__get_real_order_index(code) - CodeDataCacheUtil.set_cache(cls.__cancel_real_order_index_cache, code, val) + val = self.__get_real_order_index(code) + CodeDataCacheUtil.set_cache(self.__cancel_real_order_index_cache, code, val) return val - @classmethod - def clear(cls, code=None): + def clear(self, code=None): if code: - cls.__del_real_order_index(code) + self.__del_real_order_index(code) else: - keys = RedisUtils.keys(cls.__get_redis(), "d_cancel_real_order_index-*") + keys = RedisUtils.keys(self.__get_redis(), "d_cancel_real_order_index-*") if keys: for k in keys: code = k.replace("d_cancel_real_order_index-", "") - cls.__del_real_order_index(code) + self.__del_real_order_index(code) # 璁剧疆鎴愪氦浣� - @classmethod - def set_trade_progress(cls, code, index, buy_exec_index, total_data, local_today_num_operate_map, m_value, + def set_trade_progress(self, code, index, buy_exec_index, total_data, local_today_num_operate_map, m_value, limit_up_price): # 绂讳笅鍗曟墽琛屼綅2鍒嗛挓鍐呯殑鏈夋晥 if tool.trade_time_sub(total_data[-1]['val']['time'], total_data[buy_exec_index]['val']['time']) > constant.D_CANCEL_EXPIRE_TIME: return False, "瓒呰繃D鎾ゅ畧鎶ゆ椂闂�" - real_order_index = cls.__get_real_order_index_cache(code) + real_order_index = self.__get_real_order_index_cache(code) if not real_order_index: return False, "灏氭湭鑾峰彇鍒扮湡瀹炰笅鍗曚綅缃�" @@ -837,18 +888,15 @@ return False, "" # 璁剧疆鐪熷疄鐨勪笅鍗曚綅缃� - @classmethod - def set_real_order_index(cls, code, index): - cls.__set_real_order_index(code, index) + def set_real_order_index(self, code, index): + self.__set_real_order_index(code, index) logger_l2_d_cancel.info(f"{code}涓嬪崟浣嶇疆璁剧疆锛歿index}") - @classmethod - def place_order_success(cls, code): - cls.clear(code) + def place_order_success(self, code): + self.clear(code) - @classmethod - def cancel_success(cls, code): - cls.clear(code) + def cancel_success(self, code): + self.clear(code) # ---------------------------------L鎾�------------------------------- @@ -859,64 +907,78 @@ __last_trade_progress_dict = {} __cancel_watch_index_cache = {} + __instance = None + + def __new__(cls, *args, **kwargs): + if not cls.__instance: + cls.__instance = super(LCancelBigNumComputer, cls).__new__(cls, *args, **kwargs) + cls.__load_datas() + return cls.__instance + + @classmethod + def __load_datas(cls): + __redis = cls.__get_redis() + try: + keys = RedisUtils.keys(__redis, "l_cancel_watch_index-*") + for k in keys: + code = k.split("-")[-1] + val = RedisUtils.get(__redis, k) + CodeDataCacheUtil.set_cache(cls.__cancel_watch_index_cache, code, int(val)) + finally: + RedisUtils.realse(__redis) + @classmethod def __get_redis(cls): return cls.__redis_manager.getRedis() - @classmethod - def __add_watch_indexes(cls, code, indexes): + def __add_watch_indexes(self, code, indexes): if not indexes: return - if code not in cls.__cancel_watch_index_cache: - cls.__cancel_watch_index_cache[code] = set() + if code not in self.__cancel_watch_index_cache: + self.__cancel_watch_index_cache[code] = set() for index in indexes: - cls.__cancel_watch_index_cache[code].add(index) - RedisUtils.sadd_async(cls.__db, f"l_cancel_watch_index-{code}", index) - RedisUtils.expire_async(cls.__db, f"l_cancel_watch_index-{code}", tool.get_expire()) + self.__cancel_watch_index_cache[code].add(index) + RedisUtils.sadd_async(self.__db, f"l_cancel_watch_index-{code}", index) + RedisUtils.expire_async(self.__db, f"l_cancel_watch_index-{code}", tool.get_expire()) - @classmethod - def __del_watch_indexes(cls, code, indexes): + def __del_watch_indexes(self, code, indexes): if not indexes: return for index in indexes: - if code in cls.__cancel_watch_index_cache: - cls.__cancel_watch_index_cache[code].discard(index) - RedisUtils.srem_async(cls.__db, f"l_cancel_watch_index-{code}", index) + if code in self.__cancel_watch_index_cache: + self.__cancel_watch_index_cache[code].discard(index) + RedisUtils.srem_async(self.__db, f"l_cancel_watch_index-{code}", index) - @classmethod - def __get_watch_indexes(cls, code): - return RedisUtils.smembers(cls.__get_redis(), f"l_cancel_watch_index-{code}") + def __get_watch_indexes(self, code): + return RedisUtils.smembers(self.__get_redis(), f"l_cancel_watch_index-{code}") - @classmethod - def __get_watch_indexes_cache(cls, code): - cache_result = CodeDataCacheUtil.get_cache(cls.__cancel_watch_index_cache, code) + def __get_watch_indexes_cache(self, code): + cache_result = CodeDataCacheUtil.get_cache(self.__cancel_watch_index_cache, code) if cache_result[0]: return cache_result[1] - val = cls.__get_watch_indexes(code) - cls.__cancel_watch_index_cache[code] = val - CodeDataCacheUtil.set_cache(cls.__cancel_watch_index_cache, code, val) + val = self.__get_watch_indexes(code) + self.__cancel_watch_index_cache[code] = val + CodeDataCacheUtil.set_cache(self.__cancel_watch_index_cache, code, val) return val - @classmethod - def del_watch_index(cls, code): - CodeDataCacheUtil.clear_cache(cls.__cancel_watch_index_cache, code) - RedisUtils.delete_async(cls.__db, f"l_cancel_watch_index-{code}") + def del_watch_index(self, code): + CodeDataCacheUtil.clear_cache(self.__cancel_watch_index_cache, code) + RedisUtils.delete_async(self.__db, f"l_cancel_watch_index-{code}") - @classmethod - def clear(cls, code=None): + def clear(self, code=None): if code: - cls.del_watch_index(code) + self.del_watch_index(code) else: - keys = RedisUtils.keys(cls.__get_redis(), f"l_cancel_watch_index-*") + keys = RedisUtils.keys(self.__get_redis(), f"l_cancel_watch_index-*") for k in keys: code = k.replace("l_cancel_watch_index-", "") - cls.del_watch_index(code) + self.del_watch_index(code) # 璁剧疆鎴愪氦浣嶇疆,鎴愪氦浣嶇疆鍙樺寲涔嬪悗鐩稿簲鐨勭洃鍚暟鎹篃浼氬彂鐢熷彉鍖� - @classmethod - def set_trade_progress(cls, code, index, total_data): - old_watch_indexes = cls.__get_watch_indexes_cache(code) - if cls.__last_trade_progress_dict.get(code) == index and len( + + def set_trade_progress(self, code, index, total_data): + old_watch_indexes = self.__get_watch_indexes_cache(code) + if self.__last_trade_progress_dict.get(code) == index and len( old_watch_indexes) >= constant.L_CANCEL_MAX_WATCH_COUNT: # 鎴愪氦杩涘害灏氭湭鍙戠敓鍙樺寲涓斿凡缁忕洃鍚埌浜嗚冻澶熺殑鏁版嵁 return @@ -947,18 +1009,17 @@ # 鏁版嵁缁存姢 add_indexes = watch_indexes - old_watch_indexes delete_indexes = old_watch_indexes - watch_indexes - cls.__add_watch_indexes(code, add_indexes) - cls.__del_watch_indexes(code, delete_indexes) + self.__add_watch_indexes(code, add_indexes) + self.__del_watch_indexes(code, delete_indexes) - @classmethod - def need_cancel(cls, code, buy_exec_index, start_index, end_index, total_data, local_today_num_operate_map, + def need_cancel(self, code, buy_exec_index, start_index, end_index, total_data, local_today_num_operate_map, is_first_code): time_space = tool.trade_time_sub(total_data[start_index]["val"]["time"], total_data[buy_exec_index]["val"]["time"]) # 瀹堟姢S鎾や互澶栫殑鏁版嵁 if time_space <= constant.S_CANCEL_EXPIRE_TIME or int(tool.get_now_time_str().replace(":", "")) > int("145000"): return False, None - watch_indexes = cls.__get_watch_indexes_cache(code) + watch_indexes = self.__get_watch_indexes_cache(code) if not watch_indexes: return False, None watch_indexes = set([int(i) for i in watch_indexes]) @@ -994,13 +1055,11 @@ return False, None - @classmethod - def place_order_success(cls, code): - cls.clear(code) + def place_order_success(self, code): + self.clear(code) - @classmethod - def cancel_success(cls, code): - cls.clear(code) + def cancel_success(self, code): + self.clear(code) # --------------------------------灏佸崟棰濆彉鍖栨挙------------------------ @@ -1032,9 +1091,7 @@ else: old_num += num old_to = to_index - key = "l2_limit_up_second_money-{}-{}".format(code, time.replace(":", "")) - RedisUtils.setex(self.__get_redis(), key, tool.get_expire(), json.dumps((old_num, old_from, old_to))) def __get_l2_second_money_record(self, code, time): -- Gitblit v1.8.0