From c20c3c10635ce78db4a86ce9c0bb1d02e90f525d Mon Sep 17 00:00:00 2001 From: Administrator <admin@example.com> Date: 星期二, 08 八月 2023 17:40:42 +0800 Subject: [PATCH] 单例+缓存优化 --- trade/trade_result_manager.py | 26 +- l2/huaxin/huaxin_delegate_postion_manager.py | 7 constant.py | 2 trade/huaxin/trade_server.py | 19 test/l2_trade_test.py | 32 +- server.py | 21 third_data/data_server.py | 2 third_data/block_info.py | 13 inited_data.py | 4 log_module/log.py | 6 l2/cancel_buy_strategy.py | 557 +++++++++++++++++++++++------------------- l2/l2_data_manager_new.py | 68 ++-- output/code_info_output.py | 2 13 files changed, 420 insertions(+), 339 deletions(-) diff --git a/constant.py b/constant.py index ec29210..c3ca5ea 100644 --- a/constant.py +++ b/constant.py @@ -109,7 +109,7 @@ D_CANCEL_RATE = 0.5 # L鎾� -L_CANCEL_MAX_WATCH_COUNT = 5 +L_CANCEL_MAX_WATCH_COUNT = 15 # 鎾ゅ崟姣斾緥 L_CANCEL_RATE = 0.6 # 鏈�灏忛噾棰� diff --git a/inited_data.py b/inited_data.py index b38f6b8..ff0753b 100644 --- a/inited_data.py +++ b/inited_data.py @@ -88,9 +88,9 @@ # 娓呯┖鏆傚仠浜ゆ槗浠g爜 gpcode_manager.PauseBuyCodesManager().clear() # 娓呴櫎L鎾ゆ暟鎹� - LCancelBigNumComputer.clear() + LCancelBigNumComputer().clear() # 娓呴櫎D鎾ゆ暟鎹� - DCancelBigNumComputer.clear() + DCancelBigNumComputer().clear() # 姣忔棩鍒濆鍖� diff --git a/l2/cancel_buy_strategy.py b/l2/cancel_buy_strategy.py index a349020..663e63d 100644 --- a/l2/cancel_buy_strategy.py +++ b/l2/cancel_buy_strategy.py @@ -32,64 +32,78 @@ __sCancelParamsManager = l2_trade_factor.SCancelParamsManager __s_big_num_cancel_compute_data_cache = {} + __instance = None + + def __new__(cls, *args, **kwargs): + if not cls.__instance: + cls.__instance = super(SecondCancelBigNumComputer, cls).__new__(cls, *args, **kwargs) + cls.__load_datas() + return cls.__instance + @classmethod def __get_redis(cls): return cls.__redis_manager.getRedis() - # 淇濆瓨缁撴潫浣嶇疆 @classmethod - def __save_compute_data(cls, code, process_index, buy_num, cancel_num): - CodeDataCacheUtil.set_cache(cls.__s_big_num_cancel_compute_data_cache, code, + def __load_datas(cls): + __redis = cls.__get_redis() + try: + keys = RedisUtils.keys(__redis, "s_big_num_cancel_compute_data-*") + for k in keys: + code = k.split("-")[-1] + val = RedisUtils.get(__redis, k) + val = json.loads(val) + tool.CodeDataCacheUtil.set_cache(cls.__s_big_num_cancel_compute_data_cache, code, val) + finally: + RedisUtils.realse(__redis) + + # 淇濆瓨缁撴潫浣嶇疆 + def __save_compute_data(self, code, process_index, buy_num, cancel_num): + CodeDataCacheUtil.set_cache(self.__s_big_num_cancel_compute_data_cache, code, (process_index, buy_num, cancel_num)) key = "s_big_num_cancel_compute_data-{}".format(code) - RedisUtils.setex_async(cls.__db, key, tool.get_expire(), + RedisUtils.setex_async(self.__db, key, tool.get_expire(), json.dumps((process_index, buy_num, cancel_num))) - @classmethod - def __get_compute_data(cls, code): + def __get_compute_data(self, code): key = "s_big_num_cancel_compute_data-{}".format(code) - val = RedisUtils.get(cls.__get_redis(), key) + val = RedisUtils.get(self.__get_redis(), key) if val is None: return -1, 0, 0 val = json.loads(val) return val[0], val[1], val[2] - @classmethod - def __get_compute_data_cache(cls, code): - cache_result = CodeDataCacheUtil.get_cache(cls.__s_big_num_cancel_compute_data_cache, code) + def __get_compute_data_cache(self, code): + cache_result = CodeDataCacheUtil.get_cache(self.__s_big_num_cancel_compute_data_cache, code) if cache_result[0]: return cache_result[1] - val = cls.__get_compute_data(code) - CodeDataCacheUtil.set_cache(cls.__s_big_num_cancel_compute_data_cache, code, val) + val = self.__get_compute_data(code) + CodeDataCacheUtil.set_cache(self.__s_big_num_cancel_compute_data_cache, code, val) return val - @classmethod - def __clear_data(cls, code): - CodeDataCacheUtil.clear_cache(cls.__s_big_num_cancel_compute_data_cache, code) + def __clear_data(self, code): + CodeDataCacheUtil.clear_cache(self.__s_big_num_cancel_compute_data_cache, code) ks = ["s_big_num_cancel_compute_data-{}".format(code)] for key in ks: - RedisUtils.delete_async(cls.__db, key) + RedisUtils.delete_async(self.__db, key) - @classmethod - def clear_data(cls): + def clear_data(self): ks = ["s_big_num_cancel_compute_data-*"] for key in ks: - keys = RedisUtils.keys(cls.__get_redis(), key) + keys = RedisUtils.keys(self.__get_redis(), key) for k in keys: code = k.replace("s_big_num_cancel_compute_data-", "") - cls.__clear_data(code) + self.__clear_data(code) # 璁$畻鍑�澶у崟 - @classmethod - def __compute_left_big_num(cls, code, buy_single_index, start_index, end_index, total_data, volume_rate_index): + def __compute_left_big_num(self, code, buy_single_index, start_index, end_index, total_data, volume_rate_index): # 鐐圭伀澶у崟鏁伴噺 - fire_count = cls.__sCancelParamsManager.get_max_exclude_count(volume_rate_index) - return cls.compute_left_big_num(code, buy_single_index, start_index, end_index, total_data, fire_count, - constant.S_CANCEL_MIN_MONEY) + fire_count = self.__sCancelParamsManager.get_max_exclude_count(volume_rate_index) + return self.compute_left_big_num(code, buy_single_index, start_index, end_index, total_data, fire_count, + constant.S_CANCEL_MIN_MONEY) # 璁$畻鏈挙鐨勬�绘墜鏁� - @classmethod - def compute_left_big_num(cls, code, buy_single_index, start_index, end_index, total_data, fire_count, min_money_w): + def compute_left_big_num(self, code, buy_single_index, start_index, end_index, total_data, fire_count, min_money_w): # 鑾峰彇澶у崟鐨勬渶灏忔墜鏁� left_big_num = 0 for i in range(start_index, end_index + 1): @@ -128,8 +142,7 @@ left_big_num -= val["num"] * data["re"] return left_big_num - @classmethod - def need_cancel(cls, code, buy_single_index, buy_exec_index, start_index, end_index, total_data, is_first_code, + def need_cancel(self, code, buy_single_index, buy_exec_index, start_index, end_index, total_data, is_first_code, buy_volume_rate_index, volume_rate_index, need_cancel=True): @@ -150,7 +163,7 @@ break # 鑾峰彇澶勭悊杩涘害 - process_index_old, buy_num, cancel_num = cls.__get_compute_data_cache(code) + process_index_old, buy_num, cancel_num = self.__get_compute_data_cache(code) # 濡傛灉start_index涓巄uy_single_index鐩稿悓锛屽嵆鏄笅鍗曞悗鐨勭涓�娆¤绠� # 闇�瑕佹煡璇拱鍏ヤ俊鍙蜂箣鍓嶇殑鍚�1s鏄惁鏈夋定鍋滄挙鐨勬暟鎹� @@ -160,8 +173,8 @@ if buy_single_index == start_index: # 绗�1娆¤绠楅渶瑕佽绠椾拱鍏ヤ俊鍙�-鎵ц浣嶇殑鍑�鍊� - left_big_num = cls.__compute_left_big_num(code, buy_single_index, buy_single_index, buy_exec_index, - total_data, place_order_count) + left_big_num = self.__compute_left_big_num(code, buy_single_index, buy_single_index, buy_exec_index, + total_data, place_order_count) buy_num += left_big_num # 璁剧疆涔板叆淇″彿-涔板叆鎵ц浣嶇殑鏁版嵁涓嶉渶瑕佸鐞� start_index = end_index + 1 @@ -180,8 +193,8 @@ # if buy_index is not None and a_start_index <= buy_index <= a_end_index: # # 鍦ㄤ拱鍏ヤ俊鍙蜂箣鍚� # cls.__save_cancel_data(code, i) - range_seconds = cls.__sCancelParamsManager.get_buy_time_range(buy_volume_rate_index) - cancel_rate_threshold = cls.__sCancelParamsManager.get_cancel_rate(volume_rate_index) + range_seconds = self.__sCancelParamsManager.get_buy_time_range(buy_volume_rate_index) + cancel_rate_threshold = self.__sCancelParamsManager.get_cancel_rate(volume_rate_index) try: for i in range(start_index, end_index + 1): data = total_data[i] @@ -240,13 +253,12 @@ buy_num, round(cancel_num / max(buy_num, 1), 2), cancel_rate_threshold, range_seconds) # 淇濆瓨澶勭悊杩涘害涓庢暟鎹� - cls.__save_compute_data(code, process_index, buy_num, cancel_num) + self.__save_compute_data(code, process_index, buy_num, cancel_num) return False, None # 涓嬪崟鎴愬姛 - @classmethod - def cancel_success(cls, code): - cls.__clear_data(code) + def cancel_success(self, code): + self.__clear_data(code) # --------------------------------H鎾�------------------------------- @@ -263,172 +275,204 @@ __cancel_traded_progress_cache = {} __cancel_compute_data_cache = {} + __instance = None + + def __new__(cls, *args, **kwargs): + if not cls.__instance: + cls.__instance = super(HourCancelBigNumComputer, cls).__new__(cls, *args, **kwargs) + cls.__load_datas() + return cls.__instance + @classmethod def __get_redis(cls): return cls.__redis_manager.getRedis() - # 淇濆瓨鎴愪氦浣嶇疆鍒版墽琛屼綅缃殑鎻芥嫭鑼冨洿鏁版嵁 @classmethod - def __save_watch_index_set(cls, code, datas, process_index, finish): - CodeDataCacheUtil.set_cache(cls.__cancel_watch_indexs_cache, code, (list(datas), process_index, finish)) + def __load_datas(cls): + __redis = cls.__get_redis() + try: + keys = RedisUtils.keys(__redis, "h_cancel_watch_indexs-*") + for k in keys: + code = k.split("-")[-1] + val = RedisUtils.get(__redis, k) + val = json.loads(val) + CodeDataCacheUtil.set_cache(cls.__cancel_watch_indexs_cache, code, val) + + keys = RedisUtils.keys(__redis, "h_cancel_watch_indexs_exec-*") + for k in keys: + code = k.split("-")[-1] + val = RedisUtils.get(__redis, k) + val = json.loads(val) + CodeDataCacheUtil.set_cache(cls.__cancel_watch_indexs_exec_cache, code, val) + + keys = RedisUtils.keys(__redis, "h_cancel_watch_canceled_indexs-*") + for k in keys: + code = k.split("-")[-1] + val = RedisUtils.smembers(__redis, k) + CodeDataCacheUtil.set_cache(cls.__cancel_watch_canceled_indexs_cache, code, val) + + keys = RedisUtils.keys(__redis, "h_cancel_traded_progress-*") + for k in keys: + code = k.split("-")[-1] + val = RedisUtils.get(__redis, k) + val = json.loads(val) + CodeDataCacheUtil.set_cache(cls.__cancel_traded_progress_cache, code, val) + + keys = RedisUtils.keys(__redis, "h_cancel_compute_data-*") + for k in keys: + code = k.split("-")[-1] + val = RedisUtils.get(__redis, k) + val = json.loads(val) + CodeDataCacheUtil.set_cache(cls.__cancel_compute_data_cache, code, val) + + finally: + RedisUtils.realse(__redis) + + # 淇濆瓨鎴愪氦浣嶇疆鍒版墽琛屼綅缃殑鎻芥嫭鑼冨洿鏁版嵁 + + def __save_watch_index_set(self, code, datas, process_index, finish): + CodeDataCacheUtil.set_cache(self.__cancel_watch_indexs_cache, code, (list(datas), process_index, finish)) key = f"h_cancel_watch_indexs-{code}" - RedisUtils.setex_async(cls.__db, key, tool.get_expire(), + RedisUtils.setex_async(self.__db, key, tool.get_expire(), json.dumps((list(datas), process_index, finish))) # 淇濆瓨鎴愪氦杩涘害 - @classmethod - def __get_watch_index_set(cls, code): + + def __get_watch_index_set(self, code): key = f"h_cancel_watch_indexs-{code}" - val = RedisUtils.get(cls.__get_redis(), key) + val = RedisUtils.get(self.__get_redis(), key) if val is None: return None, -1, False val = json.loads(val) return val[0], val[1], val[2] - @classmethod - def __get_watch_index_set_cache(cls, code): - cache_result = CodeDataCacheUtil.get_cache(cls.__cancel_watch_indexs_cache, code) + def __get_watch_index_set_cache(self, code): + cache_result = CodeDataCacheUtil.get_cache(self.__cancel_watch_indexs_cache, code) if cache_result[0]: return cache_result[1] - val = cls.__get_watch_index_set(code) - CodeDataCacheUtil.set_cache(cls.__cancel_watch_indexs_cache, code, val) + val = self.__get_watch_index_set(code) + CodeDataCacheUtil.set_cache(self.__cancel_watch_indexs_cache, code, val) return val # 淇濆瓨鎵ц浣嶇疆鍚庨潰鐨勫畧鎶ゆ暟鎹� - @classmethod - def __save_watch_index_set_after_exec(cls, code, datas, process_index, total_count, big_num_count, finished): - CodeDataCacheUtil.set_cache(cls.__cancel_watch_indexs_exec_cache, code, + + def __save_watch_index_set_after_exec(self, code, datas, process_index, total_count, big_num_count, finished): + CodeDataCacheUtil.set_cache(self.__cancel_watch_indexs_exec_cache, code, (list(datas), process_index, total_count, big_num_count, finished)) key = f"h_cancel_watch_indexs_exec-{code}" - RedisUtils.setex_async(cls.__db, key, tool.get_expire(), + RedisUtils.setex_async(self.__db, key, tool.get_expire(), json.dumps((list(datas), process_index, total_count, big_num_count, finished))) # 淇濆瓨鎴愪氦杩涘害 - @classmethod - def __get_watch_index_set_after_exec(cls, code): + def __get_watch_index_set_after_exec(self, code): key = f"h_cancel_watch_indexs_exec-{code}" - val = RedisUtils.get(cls.__get_redis(), key) + val = RedisUtils.get(self.__get_redis(), key) if val is None: return [], -1, 0, 0, False val = json.loads(val) return val[0], val[1], val[2], val[3], val[4] - @classmethod - def __get_watch_index_set_after_exec_cache(cls, code): - cache_result = CodeDataCacheUtil.get_cache(cls.__cancel_watch_indexs_exec_cache, code) + def __get_watch_index_set_after_exec_cache(self, code): + cache_result = CodeDataCacheUtil.get_cache(self.__cancel_watch_indexs_exec_cache, code) if cache_result[0]: return cache_result[1] - val = cls.__get_watch_index_set_after_exec(code) - CodeDataCacheUtil.set_cache(cls.__cancel_watch_indexs_exec_cache, code, val) + val = self.__get_watch_index_set_after_exec(code) + CodeDataCacheUtil.set_cache(self.__cancel_watch_indexs_exec_cache, code, val) return val # 淇濆瓨宸茬粡鎾ゅ崟鐨勭洃鍚綅缃� - @classmethod - def __add_watch_canceled_indexes(cls, code, indexes): - if code not in cls.__cancel_watch_canceled_indexs_cache: - cls.__cancel_watch_canceled_indexs_cache[code] = set() + def __add_watch_canceled_indexes(self, code, indexes): + if code not in self.__cancel_watch_canceled_indexs_cache: + self.__cancel_watch_canceled_indexs_cache[code] = set() key = f"h_cancel_watch_canceled_indexs-{code}" for index in indexes: - cls.__cancel_watch_canceled_indexs_cache[code].add(index) - RedisUtils.sadd_async(cls.__db, key, index) - RedisUtils.expire_async(cls.__db, key, tool.get_expire()) + self.__cancel_watch_canceled_indexs_cache[code].add(index) + RedisUtils.sadd_async(self.__db, key, index) + RedisUtils.expire_async(self.__db, key, tool.get_expire()) - @classmethod - def __get_watch_canceled_index(cls, code): + def __get_watch_canceled_index(self, code): key = f"h_cancel_watch_canceled_indexs-{code}" - return RedisUtils.smembers(cls.__get_redis(), key) + return RedisUtils.smembers(self.__get_redis(), key) - @classmethod - def __get_watch_canceled_index_cache(cls, code): - cache_result = CodeDataCacheUtil.get_cache(cls.__cancel_watch_canceled_indexs_cache, code) + def __get_watch_canceled_index_cache(self, code): + cache_result = CodeDataCacheUtil.get_cache(self.__cancel_watch_canceled_indexs_cache, code) if cache_result[0]: return cache_result[1] - val = cls.__get_watch_canceled_index(code) - CodeDataCacheUtil.set_cache(cls.__cancel_watch_canceled_indexs_cache, code, val) + val = self.__get_watch_canceled_index(code) + CodeDataCacheUtil.set_cache(self.__cancel_watch_canceled_indexs_cache, code, val) return val - @classmethod - def __del_watch_canceled_index(cls, code): - CodeDataCacheUtil.clear_cache(cls.__cancel_watch_canceled_indexs_cache, code) + def __del_watch_canceled_index(self, code): + CodeDataCacheUtil.clear_cache(self.__cancel_watch_canceled_indexs_cache, code) key = f"h_cancel_watch_canceled_indexs-{code}" - RedisUtils.delete(cls.__get_redis(), key) + RedisUtils.delete(self.__get_redis(), key) # 淇濆瓨鎴愪氦杩涘害 - @classmethod - def __save_traded_progress(cls, code, origin_process_index, latest_process_index): - CodeDataCacheUtil.set_cache(cls.__cancel_traded_progress_cache, code, + def __save_traded_progress(self, code, origin_process_index, latest_process_index): + CodeDataCacheUtil.set_cache(self.__cancel_traded_progress_cache, code, (origin_process_index, latest_process_index)) key = "h_cancel_traded_progress-{}".format(code) - RedisUtils.setex_async(cls.__db, key, tool.get_expire(), + RedisUtils.setex_async(self.__db, key, tool.get_expire(), json.dumps((origin_process_index, latest_process_index))) - @classmethod - def __get_traded_progress(cls, code): + def __get_traded_progress(self, code): key = "h_cancel_traded_progress-{}".format(code) - val = RedisUtils.get(cls.__get_redis(), key) + val = RedisUtils.get(self.__get_redis(), key) if val is None: return None, None val = json.loads(val) return val[0], val[1] - @classmethod - def __get_traded_progress_cache(cls, code): - cache_result = CodeDataCacheUtil.get_cache(cls.__cancel_traded_progress_cache, code) + def __get_traded_progress_cache(self, code): + cache_result = CodeDataCacheUtil.get_cache(self.__cancel_traded_progress_cache, code) if cache_result[0]: return cache_result[1] - val = cls.__get_traded_progress(code) - CodeDataCacheUtil.set_cache(cls.__cancel_traded_progress_cache, code, val) + val = self.__get_traded_progress(code) + CodeDataCacheUtil.set_cache(self.__cancel_traded_progress_cache, code, val) return val # 淇濆瓨缁撶畻浣嶇疆 - @classmethod - def __save_compute_data(cls, code, process_index, cancel_num): - CodeDataCacheUtil.set_cache(cls.__cancel_compute_data_cache, code, + def __save_compute_data(self, code, process_index, cancel_num): + CodeDataCacheUtil.set_cache(self.__cancel_compute_data_cache, code, (process_index, cancel_num)) key = "h_cancel_compute_data-{}".format(code) - RedisUtils.setex_async(cls.__db, key, tool.get_expire(), json.dumps((process_index, cancel_num))) + RedisUtils.setex_async(self.__db, key, tool.get_expire(), json.dumps((process_index, cancel_num))) - @classmethod - def __get_compute_data(cls, code): + def __get_compute_data(self, code): key = "h_cancel_compute_data-{}".format(code) - val = RedisUtils.get(cls.__get_redis(), key) + val = RedisUtils.get(self.__get_redis(), key) if val is None: return -1, 0 val = json.loads(val) return val[0], val[1] - @classmethod - def __get_compute_data_cache(cls, code): - cache_result = CodeDataCacheUtil.get_cache(cls.__cancel_compute_data_cache, code) + def __get_compute_data_cache(self, code): + cache_result = CodeDataCacheUtil.get_cache(self.__cancel_compute_data_cache, code) if cache_result[0]: return cache_result[1] - val = cls.__get_compute_data(code) - CodeDataCacheUtil.set_cache(cls.__cancel_compute_data_cache, code, val) + val = self.__get_compute_data(code) + CodeDataCacheUtil.set_cache(self.__cancel_compute_data_cache, code, val) return val - @classmethod - def __del_compute_data(cls, code): - CodeDataCacheUtil.clear_cache(cls.__cancel_compute_data_cache, code) + def __del_compute_data(self, code): + CodeDataCacheUtil.clear_cache(self.__cancel_compute_data_cache, code) key = "h_cancel_compute_data-{}".format(code) - RedisUtils.delete(cls.__get_redis(), key) + RedisUtils.delete(self.__get_redis(), key) - @classmethod - def __clear_data(cls, code): - - CodeDataCacheUtil.clear_cache(cls.__cancel_watch_indexs_cache, code) - CodeDataCacheUtil.clear_cache(cls.__cancel_traded_progress_cache, code) - CodeDataCacheUtil.clear_cache(cls.__cancel_watch_canceled_indexs_cache, code) - CodeDataCacheUtil.clear_cache(cls.__cancel_watch_indexs_exec_cache, code) - CodeDataCacheUtil.clear_cache(cls.__cancel_compute_data_cache, code) + def __clear_data(self, code): + CodeDataCacheUtil.clear_cache(self.__cancel_watch_indexs_cache, code) + CodeDataCacheUtil.clear_cache(self.__cancel_traded_progress_cache, code) + CodeDataCacheUtil.clear_cache(self.__cancel_watch_canceled_indexs_cache, code) + CodeDataCacheUtil.clear_cache(self.__cancel_watch_indexs_exec_cache, code) + CodeDataCacheUtil.clear_cache(self.__cancel_compute_data_cache, code) ks = ["h_cancel_compute_data-{}".format(code), f"h_cancel_watch_indexs_exec-{code}", f"h_cancel_watch_indexs-{code}", f"h_cancel_traded_progress-{code}", f"h_cancel_watch_canceled_indexs-{code}"] for key in ks: - RedisUtils.delete(cls.__get_redis(), key) + RedisUtils.delete(self.__get_redis(), key) - @classmethod - def need_cancel(cls, code, buy_single_index, buy_exec_index, start_index, end_index, total_data, + def need_cancel(self, code, buy_single_index, buy_exec_index, start_index, end_index, total_data, local_today_num_operate_map, buy_volume_index, volume_index, is_first_code): @@ -436,14 +480,14 @@ total_data[buy_exec_index]["val"]["time"]) if time_space >= constant.S_CANCEL_EXPIRE_TIME - 1: # 寮�濮嬭绠楅渶瑕佺洃鎺х殑鍗� - cls.__compute_watch_indexs_after_single(code, buy_single_index, buy_exec_index, total_data, - local_today_num_operate_map, buy_volume_index) + self.__compute_watch_indexs_after_single(code, buy_single_index, buy_exec_index, total_data, + local_today_num_operate_map, buy_volume_index) # 瀹堟姢30s浠ュ鐨勬暟鎹� if time_space <= constant.S_CANCEL_EXPIRE_TIME: return False, None # 鑾峰彇鎴愪氦杩涘害 - origin_progress_index, latest_progress_index = cls.__get_traded_progress_cache(code) + origin_progress_index, latest_progress_index = self.__get_traded_progress_cache(code) if latest_progress_index is None: latest_progress_index = -1 # 鐩戝惉鐨勬暟鎹� @@ -454,7 +498,7 @@ total_nums = 1 if origin_progress_index is not None: # 鑾峰彇鎴愪氦浣嶇疆鍒版墽琛屼綅缃殑鐩戞帶鏁版嵁 - watch_indexs = cls.__get_watch_index_set_cache(code)[0] + watch_indexs = self.__get_watch_index_set_cache(code)[0] # 鐩戝惉鐨勬�绘暟 for indexs in watch_indexs: index = indexs[0] @@ -464,7 +508,7 @@ watch_indexs_dict[index] = indexs total_nums += total_data[index]["val"]["num"] * indexs[2] # 鑾峰彇鍒版墽琛屼綅鍚庣殑鐩戝惉鏁版嵁 - datas, process_index, total_count, big_num_count, finished = cls.__get_watch_index_set_after_exec_cache(code) + datas, process_index, total_count, big_num_count, finished = self.__get_watch_index_set_after_exec_cache(code) if datas: for indexs in datas: index = indexs[0] @@ -473,17 +517,17 @@ watch_indexs_dict[index] = indexs total_nums += total_data[index]["val"]["num"] * indexs[2] - processed_index, cancel_num = cls.__get_compute_data_cache(code) + processed_index, cancel_num = self.__get_compute_data_cache(code) l2_log.cancel_debug(code, "H绾ф槸鍚﹂渶瑕佹挙鍗曪紝鏁版嵁鑼冨洿锛歿}-{} ", start_index, end_index) # 鑾峰彇涓嬪崟娆℃暟 place_order_count = trade_data_manager.PlaceOrderCountManager().get_place_order_count(code) - cancel_rate_threshold = cls.__hCancelParamsManager.get_cancel_rate(volume_index) + cancel_rate_threshold = self.__hCancelParamsManager.get_cancel_rate(volume_index) process_index = start_index # 鏄惁鏈夎娴嬬殑鏁版嵁鎾ゅ崟 has_watch_canceled = False # 鑾峰彇涔嬪墠宸茬粡鎾ゅ崟鐨勬暟鎹� - old_canceld_indexs = cls.__get_watch_canceled_index_cache(code) + old_canceld_indexs = self.__get_watch_canceled_index_cache(code) # 閲嶆柊璁$畻鎾ゅ崟 cancel_num = 0 if old_canceld_indexs: @@ -521,9 +565,9 @@ len(watch_indexs_dict.keys())) l2_log.trade_record(code, "H鎾�", "'index':{} , 'rate':{} ,'target_rate':{}", i, rate__, cancel_rate_threshold) - cls.__add_watch_canceled_indexes(code, temp_watch_canceled_index) + self.__add_watch_canceled_indexes(code, temp_watch_canceled_index) return True, data - cls.__add_watch_canceled_indexes(code, temp_watch_canceled_index) + self.__add_watch_canceled_indexes(code, temp_watch_canceled_index) rate__ = round(cancel_num / total_nums, 4) if rate__ > cancel_rate_threshold: @@ -543,9 +587,9 @@ logger_l2_h_cancel.info( f"code-{code} H绾ф挙鍗曡绠楃粨鏋� 鑼冨洿锛歿start_index}-{end_index} 澶勭悊杩涘害锛歿process_index} 鐩爣姣斾緥锛歿cancel_rate_threshold} 鍙栨秷璁$畻缁撴灉:{cancel_num}/{total_nums}") # H鎾ゅ凡鎾よ鍗� - logger_l2_h_cancel.info(f"code-{code} H鎾ゅ凡鎾よ鍗曪細{cls.__get_watch_canceled_index_cache(code)}") + logger_l2_h_cancel.info(f"code-{code} H鎾ゅ凡鎾よ鍗曪細{self.__get_watch_canceled_index_cache(code)}") # 淇濆瓨澶勭悊杩涘害涓庢暟鎹� - cls.__save_compute_data(code, process_index, cancel_num) + self.__save_compute_data(code, process_index, cancel_num) # 鏈夎娴嬫暟鎹挙鍗� if has_watch_canceled: now_rate = round(cancel_num / total_nums, 4) @@ -557,33 +601,30 @@ return False, None # 涓嬪崟鎴愬姛 - @classmethod - def place_order_success(cls, code, buy_single_index, buy_exec_index, total_data, local_today_num_operate_map): - cls.__clear_data(code) + def place_order_success(self, code, buy_single_index, buy_exec_index, total_data, local_today_num_operate_map): + self.__clear_data(code) # 璁剧疆鎴愪氦杩涘害 - @classmethod - def set_trade_progress(cls, code, data_time, buy_exec_index, index, total_data, local_today_num_operate_map): - cls.__tradeBuyQueue.set_traded_index(code, index) + def set_trade_progress(self, code, data_time, buy_exec_index, index, total_data, local_today_num_operate_map): + self.__tradeBuyQueue.set_traded_index(code, index) # 濡傛灉鑾峰彇鏃堕棿涓庢墽琛屾椂闂村皬浜�29鍒欎笉闇�瑕佸鐞� if buy_exec_index is None or buy_exec_index < 0 or tool.trade_time_sub(data_time, total_data[buy_exec_index]["val"][ "time"]) < constant.S_CANCEL_EXPIRE_TIME - 1: return # 淇濆瓨鎴愪氦杩涘害 - origin_index, latest_index = cls.__get_traded_progress_cache(code) + origin_index, latest_index = self.__get_traded_progress_cache(code) if origin_index is None: - cls.__save_traded_progress(code, index, index) + self.__save_traded_progress(code, index, index) # 璁$畻鎻芥嫭鑼冨洿 - cls.__compute_watch_indexs_between_traded_exec(code, index, buy_exec_index, total_data, - local_today_num_operate_map) + self.__compute_watch_indexs_between_traded_exec(code, index, buy_exec_index, total_data, + local_today_num_operate_map) else: - cls.__save_traded_progress(code, origin_index, index) + self.__save_traded_progress(code, origin_index, index) logger_l2_h_cancel.info(f"code-{code} 鎴愪氦杩涘害锛歿index} 鏁版嵁缁撴潫浣嶇疆锛�" + str(total_data[-1]["index"])) # 娑ㄥ仠涔版槸鍚︽挙鍗� - @classmethod - def __get_limit_up_buy_no_canceled_count(cls, code, index, total_data, local_today_num_operate_map, + def __get_limit_up_buy_no_canceled_count(self, code, index, total_data, local_today_num_operate_map, MAX_EXPIRE_CANCEL_TIME=None): data = None try: @@ -619,13 +660,12 @@ # 璁$畻鎺掑悕鍓峃鐨勫ぇ鍗� # 杩囨椂鏁版嵁 - @classmethod - def __compute_top_n_num(cls, code, start_index, total_data, local_today_num_operate_map, count): + def __compute_top_n_num(self, code, start_index, total_data, local_today_num_operate_map, count): # 鎵惧埌杩樻湭鎾ょ殑TOPN澶у崟 watch_set = set() for i in range(start_index, total_data[-1]["index"] + 1): - not_cancel_count = cls.__get_limit_up_buy_no_canceled_count(code, i, total_data, - local_today_num_operate_map) + not_cancel_count = self.__get_limit_up_buy_no_canceled_count(code, i, total_data, + local_today_num_operate_map) if not_cancel_count > 0: watch_set.add((i, total_data[i]["val"]["num"], not_cancel_count)) # 閽堟寜鐓ф墜鏁版帓搴� @@ -637,14 +677,13 @@ return watch_set # 浠庢垚浜や綅缃埌鎵ц浣嶇疆 - @classmethod - def __compute_watch_indexs_between_traded_exec(cls, code, progress_index, buy_exec_index, total_data, + def __compute_watch_indexs_between_traded_exec(self, code, progress_index, buy_exec_index, total_data, local_today_num_operate_map): total_count = 0 watch_set = set() big_num_count = 0 for i in range(progress_index, buy_exec_index): - left_count = cls.__get_limit_up_buy_no_canceled_count(code, i, total_data, local_today_num_operate_map) + left_count = self.__get_limit_up_buy_no_canceled_count(code, i, total_data, local_today_num_operate_map) if left_count > 0: data = total_data[i] val = data["val"] @@ -659,15 +698,14 @@ final_watch_list = list(watch_set) final_watch_list.sort(key=lambda x: x[0]) logger_l2_h_cancel.info(f"code-{code} H鎾ょ洃鎺ф垚浜や綅鍒版墽琛屼綅:{final_watch_list}") - cls.__save_watch_index_set(code, final_watch_list, buy_exec_index, True) + self.__save_watch_index_set(code, final_watch_list, buy_exec_index, True) # 鍒犻櫎鍘熸潵鐨勮绠楁暟鎹� # cls.__del_compute_data(code) # 璁$畻鎵ц浣嶇疆涔嬪悗鐨勯渶瑕佺洃鍚殑鏁版嵁 - @classmethod - def __compute_watch_indexs_after_single(cls, code, buy_single_index, buy_exec_index, total_data, + def __compute_watch_indexs_after_single(self, code, buy_single_index, buy_exec_index, total_data, local_today_num_operate_map, buy_volume_index): - watch_list, process_index_old, total_count_old, big_num_count_old, finish = cls.__get_watch_index_set_after_exec_cache( + watch_list, process_index_old, total_count_old, big_num_count_old, finish = self.__get_watch_index_set_after_exec_cache( code) if watch_list and finish: # 宸茬粡璁$畻瀹屼簡涓嶉渶瑕佸啀杩涜璁$畻 @@ -683,17 +721,17 @@ big_num_count = big_num_count_old total_count = total_count_old # H鎾ゅ崟 - MIN_H_COUNT = cls.__hCancelParamsManager.get_max_watch_count(buy_volume_index) + MIN_H_COUNT = self.__hCancelParamsManager.get_max_watch_count(buy_volume_index) # 浠庝拱鍏ヤ俊鍙蜂綅3鏉℃暟鎹紑濮嬭绠� for i in range(buy_single_index + 3, total_data[-1]["index"] + 1): if i <= process_index_old: continue process_index = i - left_count = cls.__get_limit_up_buy_no_canceled_count(code, i, total_data, - local_today_num_operate_map, - tool.trade_time_add_second( - total_data[buy_exec_index]["val"]["time"], - constant.S_CANCEL_EXPIRE_TIME)) + left_count = self.__get_limit_up_buy_no_canceled_count(code, i, total_data, + local_today_num_operate_map, + tool.trade_time_add_second( + total_data[buy_exec_index]["val"]["time"], + constant.S_CANCEL_EXPIRE_TIME)) if left_count > 0: data = total_data[i] val = data["val"] @@ -720,22 +758,21 @@ final_watch_list.sort(key=lambda x: x[0]) logger_l2_h_cancel.info(f"code-{code} H鎾ょ洃鎺ф墽琛屼綅鐩搁偦鍗�:{final_watch_list} 鐩爣璁$畻鏁伴噺锛歿MIN_H_COUNT}") # 淇濆瓨璁$畻鑼冨洿 - cls.__save_watch_index_set_after_exec(code, final_watch_list, process_index, total_count, big_num_count, - finished) + self.__save_watch_index_set_after_exec(code, final_watch_list, process_index, total_count, big_num_count, + finished) # 鍒犻櫎鍘熸潵鐨勮绠楁暟鎹� # cls.__del_compute_data(code) # 鑾峰彇H鎾ょ洃鍚殑鏁版嵁绱㈠紩鑼冨洿 # 杩斿洖鐩戝惉鑼冨洿涓庡凡鎾ゅ崟绱㈠紩 - @classmethod - def get_watch_index_dict(cls, code): - origin_progress_index, latest_progress_index = cls.__get_traded_progress_cache(code) + def get_watch_index_dict(self, code): + origin_progress_index, latest_progress_index = self.__get_traded_progress_cache(code) # 鐩戝惉鐨勬暟鎹� watch_indexs_dict = {} total_nums = 0 if origin_progress_index is not None: # 鑾峰彇鎴愪氦浣嶇疆鍒版墽琛屼綅缃殑鐩戞帶鏁版嵁 - watch_indexs = cls.__get_watch_index_set_cache(code)[0] + watch_indexs = self.__get_watch_index_set_cache(code)[0] # 鐩戝惉鐨勬�绘暟 for indexs in watch_indexs: index = indexs[0] @@ -744,12 +781,12 @@ # 鍙绠楁渶杩戠殑鎵ц浣嶄箣鍚庣殑鏁版嵁 watch_indexs_dict[index] = indexs # 鑾峰彇鍒版墽琛屼綅鍚庣殑鐩戝惉鏁版嵁 - datas, process_index, total_count, big_num_count, finished = cls.__get_watch_index_set_after_exec_cache(code) + datas, process_index, total_count, big_num_count, finished = self.__get_watch_index_set_after_exec_cache(code) if datas: for indexs in datas: index = indexs[0] watch_indexs_dict[index] = indexs - return watch_indexs_dict, cls.__get_watch_canceled_index_cache(code) + return watch_indexs_dict, self.__get_watch_canceled_index_cache(code) # ---------------------------------D鎾�------------------------------- @@ -760,57 +797,71 @@ __redis_manager = redis_manager.RedisManager(0) __cancel_real_order_index_cache = {} + __instance = None + + def __new__(cls, *args, **kwargs): + if not cls.__instance: + cls.__instance = super(DCancelBigNumComputer, cls).__new__(cls, *args, **kwargs) + cls.__load_datas() + return cls.__instance + + @classmethod + def __load_datas(cls): + __redis = cls.__get_redis() + try: + keys = RedisUtils.keys(__redis, "d_cancel_real_order_index-*") + for k in keys: + code = k.split("-")[-1] + val = RedisUtils.get(__redis, k) + CodeDataCacheUtil.set_cache(cls.__cancel_real_order_index_cache, code, int(val)) + finally: + RedisUtils.realse(__redis) + @classmethod def __get_redis(cls): return cls.__redis_manager.getRedis() - @classmethod - def __set_real_order_index(cls, code, index): - CodeDataCacheUtil.set_cache(cls.__cancel_real_order_index_cache, code, index) - RedisUtils.setex_async(cls.__db, f"d_cancel_real_order_index-{code}", tool.get_expire(), f"{index}") + def __set_real_order_index(self, code, index): + CodeDataCacheUtil.set_cache(self.__cancel_real_order_index_cache, code, index) + RedisUtils.setex_async(self.__db, f"d_cancel_real_order_index-{code}", tool.get_expire(), f"{index}") - @classmethod - def __del_real_order_index(cls, code): - CodeDataCacheUtil.clear_cache(cls.__cancel_real_order_index_cache, code) - RedisUtils.delete_async(cls.__db, f"d_cancel_real_order_index-{code}") + def __del_real_order_index(self, code): + CodeDataCacheUtil.clear_cache(self.__cancel_real_order_index_cache, code) + RedisUtils.delete_async(self.__db, f"d_cancel_real_order_index-{code}") - @classmethod - def __get_real_order_index(cls, code): - val = RedisUtils.get(cls.__db, f"d_cancel_real_order_index-{code}") + def __get_real_order_index(self, code): + val = RedisUtils.get(self.__db, f"d_cancel_real_order_index-{code}") if val: return int(val) return None - @classmethod - def __get_real_order_index_cache(cls, code): - cache_result = CodeDataCacheUtil.get_cache(cls.__cancel_real_order_index_cache, code) + def __get_real_order_index_cache(self, code): + cache_result = CodeDataCacheUtil.get_cache(self.__cancel_real_order_index_cache, code) if cache_result[0]: return cache_result[1] - val = cls.__get_real_order_index(code) - CodeDataCacheUtil.set_cache(cls.__cancel_real_order_index_cache, code, val) + val = self.__get_real_order_index(code) + CodeDataCacheUtil.set_cache(self.__cancel_real_order_index_cache, code, val) return val - @classmethod - def clear(cls, code=None): + def clear(self, code=None): if code: - cls.__del_real_order_index(code) + self.__del_real_order_index(code) else: - keys = RedisUtils.keys(cls.__get_redis(), "d_cancel_real_order_index-*") + keys = RedisUtils.keys(self.__get_redis(), "d_cancel_real_order_index-*") if keys: for k in keys: code = k.replace("d_cancel_real_order_index-", "") - cls.__del_real_order_index(code) + self.__del_real_order_index(code) # 璁剧疆鎴愪氦浣� - @classmethod - def set_trade_progress(cls, code, index, buy_exec_index, total_data, local_today_num_operate_map, m_value, + def set_trade_progress(self, code, index, buy_exec_index, total_data, local_today_num_operate_map, m_value, limit_up_price): # 绂讳笅鍗曟墽琛屼綅2鍒嗛挓鍐呯殑鏈夋晥 if tool.trade_time_sub(total_data[-1]['val']['time'], total_data[buy_exec_index]['val']['time']) > constant.D_CANCEL_EXPIRE_TIME: return False, "瓒呰繃D鎾ゅ畧鎶ゆ椂闂�" - real_order_index = cls.__get_real_order_index_cache(code) + real_order_index = self.__get_real_order_index_cache(code) if not real_order_index: return False, "灏氭湭鑾峰彇鍒扮湡瀹炰笅鍗曚綅缃�" @@ -837,18 +888,15 @@ return False, "" # 璁剧疆鐪熷疄鐨勪笅鍗曚綅缃� - @classmethod - def set_real_order_index(cls, code, index): - cls.__set_real_order_index(code, index) + def set_real_order_index(self, code, index): + self.__set_real_order_index(code, index) logger_l2_d_cancel.info(f"{code}涓嬪崟浣嶇疆璁剧疆锛歿index}") - @classmethod - def place_order_success(cls, code): - cls.clear(code) + def place_order_success(self, code): + self.clear(code) - @classmethod - def cancel_success(cls, code): - cls.clear(code) + def cancel_success(self, code): + self.clear(code) # ---------------------------------L鎾�------------------------------- @@ -859,64 +907,78 @@ __last_trade_progress_dict = {} __cancel_watch_index_cache = {} + __instance = None + + def __new__(cls, *args, **kwargs): + if not cls.__instance: + cls.__instance = super(LCancelBigNumComputer, cls).__new__(cls, *args, **kwargs) + cls.__load_datas() + return cls.__instance + + @classmethod + def __load_datas(cls): + __redis = cls.__get_redis() + try: + keys = RedisUtils.keys(__redis, "l_cancel_watch_index-*") + for k in keys: + code = k.split("-")[-1] + val = RedisUtils.get(__redis, k) + CodeDataCacheUtil.set_cache(cls.__cancel_watch_index_cache, code, int(val)) + finally: + RedisUtils.realse(__redis) + @classmethod def __get_redis(cls): return cls.__redis_manager.getRedis() - @classmethod - def __add_watch_indexes(cls, code, indexes): + def __add_watch_indexes(self, code, indexes): if not indexes: return - if code not in cls.__cancel_watch_index_cache: - cls.__cancel_watch_index_cache[code] = set() + if code not in self.__cancel_watch_index_cache: + self.__cancel_watch_index_cache[code] = set() for index in indexes: - cls.__cancel_watch_index_cache[code].add(index) - RedisUtils.sadd_async(cls.__db, f"l_cancel_watch_index-{code}", index) - RedisUtils.expire_async(cls.__db, f"l_cancel_watch_index-{code}", tool.get_expire()) + self.__cancel_watch_index_cache[code].add(index) + RedisUtils.sadd_async(self.__db, f"l_cancel_watch_index-{code}", index) + RedisUtils.expire_async(self.__db, f"l_cancel_watch_index-{code}", tool.get_expire()) - @classmethod - def __del_watch_indexes(cls, code, indexes): + def __del_watch_indexes(self, code, indexes): if not indexes: return for index in indexes: - if code in cls.__cancel_watch_index_cache: - cls.__cancel_watch_index_cache[code].discard(index) - RedisUtils.srem_async(cls.__db, f"l_cancel_watch_index-{code}", index) + if code in self.__cancel_watch_index_cache: + self.__cancel_watch_index_cache[code].discard(index) + RedisUtils.srem_async(self.__db, f"l_cancel_watch_index-{code}", index) - @classmethod - def __get_watch_indexes(cls, code): - return RedisUtils.smembers(cls.__get_redis(), f"l_cancel_watch_index-{code}") + def __get_watch_indexes(self, code): + return RedisUtils.smembers(self.__get_redis(), f"l_cancel_watch_index-{code}") - @classmethod - def __get_watch_indexes_cache(cls, code): - cache_result = CodeDataCacheUtil.get_cache(cls.__cancel_watch_index_cache, code) + def __get_watch_indexes_cache(self, code): + cache_result = CodeDataCacheUtil.get_cache(self.__cancel_watch_index_cache, code) if cache_result[0]: return cache_result[1] - val = cls.__get_watch_indexes(code) - cls.__cancel_watch_index_cache[code] = val - CodeDataCacheUtil.set_cache(cls.__cancel_watch_index_cache, code, val) + val = self.__get_watch_indexes(code) + self.__cancel_watch_index_cache[code] = val + CodeDataCacheUtil.set_cache(self.__cancel_watch_index_cache, code, val) return val - @classmethod - def del_watch_index(cls, code): - CodeDataCacheUtil.clear_cache(cls.__cancel_watch_index_cache, code) - RedisUtils.delete_async(cls.__db, f"l_cancel_watch_index-{code}") + def del_watch_index(self, code): + CodeDataCacheUtil.clear_cache(self.__cancel_watch_index_cache, code) + RedisUtils.delete_async(self.__db, f"l_cancel_watch_index-{code}") - @classmethod - def clear(cls, code=None): + def clear(self, code=None): if code: - cls.del_watch_index(code) + self.del_watch_index(code) else: - keys = RedisUtils.keys(cls.__get_redis(), f"l_cancel_watch_index-*") + keys = RedisUtils.keys(self.__get_redis(), f"l_cancel_watch_index-*") for k in keys: code = k.replace("l_cancel_watch_index-", "") - cls.del_watch_index(code) + self.del_watch_index(code) # 璁剧疆鎴愪氦浣嶇疆,鎴愪氦浣嶇疆鍙樺寲涔嬪悗鐩稿簲鐨勭洃鍚暟鎹篃浼氬彂鐢熷彉鍖� - @classmethod - def set_trade_progress(cls, code, index, total_data): - old_watch_indexes = cls.__get_watch_indexes_cache(code) - if cls.__last_trade_progress_dict.get(code) == index and len( + + def set_trade_progress(self, code, index, total_data): + old_watch_indexes = self.__get_watch_indexes_cache(code) + if self.__last_trade_progress_dict.get(code) == index and len( old_watch_indexes) >= constant.L_CANCEL_MAX_WATCH_COUNT: # 鎴愪氦杩涘害灏氭湭鍙戠敓鍙樺寲涓斿凡缁忕洃鍚埌浜嗚冻澶熺殑鏁版嵁 return @@ -947,18 +1009,17 @@ # 鏁版嵁缁存姢 add_indexes = watch_indexes - old_watch_indexes delete_indexes = old_watch_indexes - watch_indexes - cls.__add_watch_indexes(code, add_indexes) - cls.__del_watch_indexes(code, delete_indexes) + self.__add_watch_indexes(code, add_indexes) + self.__del_watch_indexes(code, delete_indexes) - @classmethod - def need_cancel(cls, code, buy_exec_index, start_index, end_index, total_data, local_today_num_operate_map, + def need_cancel(self, code, buy_exec_index, start_index, end_index, total_data, local_today_num_operate_map, is_first_code): time_space = tool.trade_time_sub(total_data[start_index]["val"]["time"], total_data[buy_exec_index]["val"]["time"]) # 瀹堟姢S鎾や互澶栫殑鏁版嵁 if time_space <= constant.S_CANCEL_EXPIRE_TIME or int(tool.get_now_time_str().replace(":", "")) > int("145000"): return False, None - watch_indexes = cls.__get_watch_indexes_cache(code) + watch_indexes = self.__get_watch_indexes_cache(code) if not watch_indexes: return False, None watch_indexes = set([int(i) for i in watch_indexes]) @@ -994,13 +1055,11 @@ return False, None - @classmethod - def place_order_success(cls, code): - cls.clear(code) + def place_order_success(self, code): + self.clear(code) - @classmethod - def cancel_success(cls, code): - cls.clear(code) + def cancel_success(self, code): + self.clear(code) # --------------------------------灏佸崟棰濆彉鍖栨挙------------------------ @@ -1032,9 +1091,7 @@ else: old_num += num old_to = to_index - key = "l2_limit_up_second_money-{}-{}".format(code, time.replace(":", "")) - RedisUtils.setex(self.__get_redis(), key, tool.get_expire(), json.dumps((old_num, old_from, old_to))) def __get_l2_second_money_record(self, code, time): diff --git a/l2/huaxin/huaxin_delegate_postion_manager.py b/l2/huaxin/huaxin_delegate_postion_manager.py index e45245f..6a318f3 100644 --- a/l2/huaxin/huaxin_delegate_postion_manager.py +++ b/l2/huaxin/huaxin_delegate_postion_manager.py @@ -3,20 +3,24 @@ """ import time -from log_module.log import hx_logger_trade_debug +from log_module.log import hx_logger_trade_debug, logger_real_place_order_position _place_order_info_dict = {} # 涓嬪崟 def place_order(code, price, volume, exec_index): + logger_real_place_order_position.info("涓嬪崟锛歝ode-{} price-{} volume-{} exec-index-{}", code, price, volume, + exec_index) _place_order_info_dict[code] = (price, volume, exec_index, time.time()) # 鑾峰彇涓嬪崟淇℃伅 def get_order_info(code): info = _place_order_info_dict.get(code) + logger_real_place_order_position.info("get_order_info锛歞ata-{}", info) if info and time.time() - info[3] > 3: + logger_real_place_order_position.info("get_order_info 闂撮殧3s浠ヤ笂锛歝ode-{}", code) # 闂撮殧3s浠ヤ笂灏辨棤鏁堜簡 info = None _place_order_info_dict.pop(code) @@ -43,5 +47,6 @@ continue # 鑾峰彇鍒颁簡涓嬪崟浣嶇疆 hx_logger_trade_debug.info(f"鐪熷疄涓嬪崟浣嶇疆锛歿code}-{d['index']}") + logger_real_place_order_position.info(f"鐪熷疄涓嬪崟浣嶇疆锛歿code}-{d['index']}") return d["index"] return None diff --git a/l2/l2_data_manager_new.py b/l2/l2_data_manager_new.py index 9889e06..fcf9098 100644 --- a/l2/l2_data_manager_new.py +++ b/l2/l2_data_manager_new.py @@ -214,6 +214,7 @@ __l2PlaceOrderParamsManagerDict = {} __last_buy_single_dict = {} __TradeBuyQueue = transaction_progress.TradeBuyQueue() + __latest_process_unique_keys = {} # 鑾峰彇浠g爜璇勫垎 @classmethod @@ -290,7 +291,7 @@ place_order_index = huaxin_delegate_postion_manager.get_l2_place_order_position(code, datas) if place_order_index: logger_l2_process.info("code:{} 鑾峰彇鍒颁笅鍗曠湡瀹炰綅缃細{}", code, place_order_index) - DCancelBigNumComputer.set_real_order_index(code, place_order_index) + DCancelBigNumComputer().set_real_order_index(code, place_order_index) __start_time = round(t.time() * 1000) if len(datas) > 0: cls.process_add_datas(code, datas, 0, __start_time) @@ -431,6 +432,13 @@ # 澶勭悊宸叉寕鍗� @classmethod def __process_order(cls, code, start_index, end_index, capture_time, is_first_code, new_add=True): + # 澧炲姞鎺ㄥ嚭鏈哄埗 + unique_key = f"{start_index}-{end_index}" + if cls.__latest_process_unique_keys.get(code) == unique_key: + logger_l2_error.error(f"閲嶅澶勭悊鏁版嵁锛歝ode-{code} start_index-{start_index} end_index-{end_index}") + return + cls.__latest_process_unique_keys[code] = unique_key + # 璁$畻瀹夊叏绗旀暟 @dask.delayed def compute_safe_count(): @@ -475,13 +483,13 @@ _start_time = round(t.time() * 1000) # S鎾ゅ崟璁$畻锛岀湅绉掔骇澶у崟鎾ゅ崟 try: - b_need_cancel, b_cancel_data = SecondCancelBigNumComputer.need_cancel(code, buy_single_index, - buy_exec_index, start_index, - end_index, total_data, - code_volumn_manager.get_volume_rate_index( - buy_volume_rate), - cls.volume_rate_info[code][1], - is_first_code) + b_need_cancel, b_cancel_data = SecondCancelBigNumComputer().need_cancel(code, buy_single_index, + buy_exec_index, start_index, + end_index, total_data, + code_volumn_manager.get_volume_rate_index( + buy_volume_rate), + cls.volume_rate_info[code][1], + is_first_code) if b_need_cancel: return b_cancel_data, "S澶у崟鎾ら攢姣斾緥瑙﹀彂闃堝��" except Exception as e: @@ -496,7 +504,7 @@ def h_cancel(): _start_time = round(t.time() * 1000) try: - b_need_cancel, b_cancel_data = HourCancelBigNumComputer.need_cancel(code, buy_single_index, + b_need_cancel, b_cancel_data = HourCancelBigNumComputer().need_cancel(code, buy_single_index, buy_exec_index, start_index, end_index, total_data, local_today_num_operate_map.get( @@ -518,7 +526,7 @@ def l_cancel(): _start_time = round(t.time() * 1000) try: - b_need_cancel, b_cancel_data = LCancelBigNumComputer.need_cancel(code, + b_need_cancel, b_cancel_data = LCancelBigNumComputer().need_cancel(code, buy_exec_index, start_index, end_index, total_data, local_today_num_operate_map.get( @@ -840,8 +848,8 @@ trade_price = current_price_process_manager.get_trade_price(code) if trade_price is None: return False, True, f"灏氭湭鑾峰彇鍒板綋鍓嶆垚浜や环" - if float(limit_up_price) - float(trade_price) > 0.02001: - return False, False, f"褰撳墠鎴愪氦浠凤紙{trade_price}锛夊皻鏈湪2妗e強浠ュ唴" + if float(limit_up_price) - float(trade_price) > 0.04001: + return False, False, f"褰撳墠鎴愪氦浠凤紙{trade_price}锛夊皻鏈湪4妗e強浠ュ唴" # 鍒ゆ柇鎴愪氦杩涘害鏄惁璺濈鎴戜滑鐨勪綅缃緢杩� total_data = local_today_datas.get(code) @@ -892,13 +900,13 @@ # 涔嬪墠娌℃湁娑ㄥ仠杩� # 缁熻涔板叆淇″彿浣嶅埌褰撳墠浣嶇疆娌℃湁鎾ょ殑澶у崟閲戦 min_money_w = l2_data_util.get_big_money_val(float(total_data[buy_single_index]["val"]["price"])) // 10000 - left_big_num = l2.cancel_buy_strategy.SecondCancelBigNumComputer.compute_left_big_num(code, - buy_single_index, - buy_exec_index, - total_data[-1][ - "index"], - total_data, - 0, min_money_w) + left_big_num = l2.cancel_buy_strategy.SecondCancelBigNumComputer().compute_left_big_num(code, + buy_single_index, + buy_exec_index, + total_data[-1][ + "index"], + total_data, + 0, min_money_w) if left_big_num > 0: # 閲嶆柊鑾峰彇鍒嗘暟涓庡垎鏁扮储寮� limit_up_time = limit_up_time_manager.LimitUpTimeManager().get_limit_up_time_cache(code) @@ -1150,13 +1158,13 @@ # 鏁版嵁鏄惁澶勭悊瀹屾瘯 if compute_index >= compute_end_index: - need_cancel, cancel_data = SecondCancelBigNumComputer.need_cancel(code, buy_single_index, - compute_index, - buy_single_index, compute_index, - total_datas, is_first_code, - cls.volume_rate_info[code][1], - cls.volume_rate_info[code][1], - True) + need_cancel, cancel_data = SecondCancelBigNumComputer().need_cancel(code, buy_single_index, + compute_index, + buy_single_index, compute_index, + total_datas, is_first_code, + cls.volume_rate_info[code][1], + cls.volume_rate_info[code][1], + True) _start_time = l2_data_log.l2_time(code, tool.get_now_timestamp() - _start_time, "S绾уぇ鍗曞鐞嗚�楁椂", force=True) l2_log.debug(code, "鏁版嵁澶勭悊瀹屾瘯锛屼笅鍗�, 鏁版嵁鎴浘鏃堕棿-{}", capture_time) @@ -1168,10 +1176,10 @@ else: cls.__buy(code, capture_time, total_datas[compute_index], compute_index, is_first_code) else: - SecondCancelBigNumComputer.need_cancel(code, buy_single_index, compute_index, buy_single_index, - compute_index, total_datas, is_first_code, - cls.volume_rate_info[code][1], - cls.volume_rate_info[code][1], False) + SecondCancelBigNumComputer().need_cancel(code, buy_single_index, compute_index, buy_single_index, + compute_index, total_datas, is_first_code, + cls.volume_rate_info[code][1], + cls.volume_rate_info[code][1], False) _start_time = l2_data_log.l2_time(code, tool.get_now_timestamp() - _start_time, "S绾уぇ鍗曞鐞嗚�楁椂", force=True) diff --git a/log_module/log.py b/log_module/log.py index 6ca928a..8b115bb 100644 --- a/log_module/log.py +++ b/log_module/log.py @@ -82,6 +82,10 @@ filter=lambda record: record["extra"].get("name") == "l2_trade_buy_progress", rotation="00:00", compression="zip", enqueue=True) + logger.add(self.get_path("l2", "l2_real_place_order_position"), + filter=lambda record: record["extra"].get("name") == "l2_real_place_order_position", + rotation="00:00", compression="zip", enqueue=True) + logger.add(self.get_path("juejin", "juejin_tick"), filter=lambda record: record["extra"].get("name") == "juejin_tick", rotation="00:00", compression="zip", enqueue=True) @@ -232,6 +236,8 @@ logger_l2_trade_queue = __mylogger.get_logger("l2_trade_queue") logger_l2_trade_buy_queue = __mylogger.get_logger("l2_trade_buy_queue") logger_l2_trade_buy_progress = __mylogger.get_logger("l2_trade_buy_progress") +logger_real_place_order_position = __mylogger.get_logger("l2_real_place_order_position") + logger_l2_big_data = __mylogger.get_logger("l2_big_data") logger_juejin_tick = __mylogger.get_logger("juejin_tick") diff --git a/output/code_info_output.py b/output/code_info_output.py index 422db4f..dc2e1ba 100644 --- a/output/code_info_output.py +++ b/output/code_info_output.py @@ -365,7 +365,7 @@ # H鎾ょ洃鍚寖鍥� if trade_state == trade_manager.TRADE_STATE_BUY_PLACE_ORDER or trade_state == trade_manager.TRADE_STATE_BUY_DELEGATED or trade_state == trade_manager.TRADE_STATE_BUY_SUCCESS: - hcancel_datas_dict, cancel_indexes_set = HourCancelBigNumComputer.get_watch_index_dict(code) + hcancel_datas_dict, cancel_indexes_set = HourCancelBigNumComputer().get_watch_index_dict(code) # 鏍规嵁鏃ュ織璇诲彇瀹炴椂鐨勮绠楁暟鎹� h_cancel_latest_compute_info = log_export.get_h_cancel_compute_info(code) if hcancel_datas_dict: diff --git a/server.py b/server.py index 8eb7c7a..eb8eff7 100644 --- a/server.py +++ b/server.py @@ -390,7 +390,7 @@ if limit_up_price is not None: code_price_manager.Buy1PriceManager().process(code, buy_one_price, buy_time, limit_up_price, - sell_one_price, sell_one_volumn) + sell_one_price, sell_one_volumn) _start_time = time.time() msg += "涔�1浠锋牸澶勭悊锛�" + f"{_start_time - __start_time} " @@ -425,15 +425,16 @@ buy_queue_result_list, exec_time) if buy_progress_index is not None: - HourCancelBigNumComputer.set_trade_progress(code, buy_time, buy_exec_index, - buy_progress_index, - l2.l2_data_util.local_today_datas.get( - code), - l2.l2_data_util.local_today_num_operate_map.get( - code)) - LCancelBigNumComputer.set_trade_progress(code, buy_progress_index, - l2.l2_data_util.local_today_datas.get( - code)) + HourCancelBigNumComputer().set_trade_progress(code, buy_time, + buy_exec_index, + buy_progress_index, + l2.l2_data_util.local_today_datas.get( + code), + l2.l2_data_util.local_today_num_operate_map.get( + code)) + LCancelBigNumComputer().set_trade_progress(code, buy_progress_index, + l2.l2_data_util.local_today_datas.get( + code)) logger_l2_trade_buy_queue.info("鑾峰彇鎴愪氦浣嶇疆鎴愬姛锛� code-{} index-{} 鏁版嵁-{}", code, buy_progress_index, diff --git a/test/l2_trade_test.py b/test/l2_trade_test.py index 1026ecb..ac02c7b 100644 --- a/test/l2_trade_test.py +++ b/test/l2_trade_test.py @@ -32,7 +32,7 @@ RedisUtils.realse(redis_l2) l2.l2_data_manager.TradePointManager().delete_buy_point(code) big_money_num_manager.reset(code) - RedisUtils.delete( redis_manager.RedisManager(2).getRedis(), "trade-state-{}".format(code)) + RedisUtils.delete(redis_manager.RedisManager(2).getRedis(), "trade-state-{}".format(code)) trade_data_manager.PlaceOrderCountManager().clear_place_order_count(code) redis_info = redis_manager.RedisManager(0).getRedis() keys = RedisUtils.keys(redis_info, "*{}*".format(code), auto_free=False) @@ -46,7 +46,6 @@ BuyL2SafeCountManager().clear_data(code) transaction_progress.TradeBuyQueue().set_traded_index(code, 0) - class VirtualTrade(unittest.TestCase): @@ -76,12 +75,13 @@ buy_progress_index = TradeBuyQueue().compute_traded_index(code, buy_one_price_, buy_queue_result_list, exec_time) if buy_progress_index is not None: - l2.cancel_buy_strategy.HourCancelBigNumComputer.set_trade_progress(code, time_, buy_exec_index, - buy_progress_index, - l2.l2_data_util.local_today_datas.get( - code), - l2.l2_data_util.local_today_num_operate_map.get( - code)) + l2.cancel_buy_strategy.HourCancelBigNumComputer().set_trade_progress(code, time_, + buy_exec_index, + buy_progress_index, + l2.l2_data_util.local_today_datas.get( + code), + l2.l2_data_util.local_today_num_operate_map.get( + code)) log.logger_l2_trade_buy_queue.info("鑾峰彇鎴愪氦浣嶇疆鎴愬姛锛� code-{} index-{} 鏁版嵁-{}", code, buy_progress_index, json.dumps(buy_queue_result_list)) @@ -137,7 +137,8 @@ RealTimeKplMarketData().set_top_5_reasons(jingxuan_ranks) RealTimeKplMarketData().set_top_5_industry(industry_ranks) - LimitUpCodesPlateKeyManager().set_today_limit_up(KPLDataManager().get_from_file(kpl_util.KPLDataType.LIMIT_UP, tool.get_now_date_str())) + LimitUpCodesPlateKeyManager().set_today_limit_up( + KPLDataManager().get_from_file(kpl_util.KPLDataType.LIMIT_UP, tool.get_now_date_str())) for indexs in pos_list: l2_log.threadIds[code] = mock.Mock( @@ -148,7 +149,8 @@ time_s = tool.get_time_as_second(time_) - i - 1 volumn = THSBuy1VolumnManager().get_buy_1_volumn(code, tool.time_seconds_format(time_s)) if volumn is not None: - l2.cancel_buy_strategy.L2LimitUpMoneyStatisticUtil().verify_num(code, int(volumn), tool.time_seconds_format(time_s)) + l2.cancel_buy_strategy.L2LimitUpMoneyStatisticUtil().verify_num(code, int(volumn), + tool.time_seconds_format(time_s)) break # 璁剧疆濮斾拱闃熷垪 for i in range(0, len(buy_queues)): @@ -176,11 +178,11 @@ l2.l2_data_util.load_num_operate_map(l2.l2_data_util.local_today_num_operate_map, code, total_datas, True) buy_progress_index = 523 - l2.cancel_buy_strategy.HourCancelBigNumComputer.set_trade_progress(code, buy_progress_index, - l2.l2_data_util.local_today_datas.get( - code), - l2.l2_data_util.local_today_num_operate_map.get( - code)) + l2.cancel_buy_strategy.HourCancelBigNumComputer().set_trade_progress(code, buy_progress_index, + l2.l2_data_util.local_today_datas.get( + code), + l2.l2_data_util.local_today_num_operate_map.get( + code)) # class TestTrade(unittest.TestCase): diff --git a/third_data/block_info.py b/third_data/block_info.py index 6d8c787..7682066 100644 --- a/third_data/block_info.py +++ b/third_data/block_info.py @@ -204,12 +204,13 @@ code_ = data[0] break_codes.add(code_) # 缁熻鍥炲皝 - for data in latest_datas: - if data[5] != target_block: - continue - # 鍥炲皝 - if data[2] != data[3]: - re_limit_codes.add(data[0]) + if latest_datas: + for data in latest_datas: + if data[5] != target_block: + continue + # 鍥炲皝 + if data[2] != data[3]: + re_limit_codes.add(data[0]) # 鎺掗櫎鑷繁 break_codes.discard(code) diff --git a/third_data/data_server.py b/third_data/data_server.py index 997abef..210f7a7 100644 --- a/third_data/data_server.py +++ b/third_data/data_server.py @@ -393,7 +393,7 @@ trade_state = trade_manager.CodesTradeStateManager().get_trade_state_cache(code) if trade_state == trade_manager.TRADE_STATE_BUY_PLACE_ORDER or trade_state == trade_manager.TRADE_STATE_BUY_DELEGATED or trade_state == trade_manager.TRADE_STATE_BUY_SUCCESS: - hcancel_datas_dict, cancel_indexes_set = HourCancelBigNumComputer.get_watch_index_dict(code) + hcancel_datas_dict, cancel_indexes_set = HourCancelBigNumComputer().get_watch_index_dict(code) # 鏍规嵁鏃ュ織璇诲彇瀹炴椂鐨勮绠楁暟鎹� h_cancel_latest_compute_info = log_export.get_h_cancel_compute_info(code) if hcancel_datas_dict: diff --git a/trade/huaxin/trade_server.py b/trade/huaxin/trade_server.py index 7b3c85c..df52064 100644 --- a/trade/huaxin/trade_server.py +++ b/trade/huaxin/trade_server.py @@ -211,7 +211,7 @@ "time"] limit_up_price = gpcode_manager.get_limit_up_price(code) if buy_exec_index: - need_cancel, msg = DCancelBigNumComputer.set_trade_progress(code, + need_cancel, msg = DCancelBigNumComputer().set_trade_progress(code, buy_progress_index, buy_exec_index, total_datas, @@ -222,12 +222,12 @@ if need_cancel: L2TradeDataProcessor.cancel_buy(code, f"D鎾�:{msg}", source="d_cancel") - f1 = dask.delayed(HourCancelBigNumComputer.set_trade_progress)(code, buy_time, - buy_exec_index, - buy_progress_index, - total_datas, - num_operate_map) - f2 = dask.delayed(LCancelBigNumComputer.set_trade_progress)(code, + f1 = dask.delayed(HourCancelBigNumComputer().set_trade_progress)(code, buy_time, + buy_exec_index, + buy_progress_index, + total_datas, + num_operate_map) + f2 = dask.delayed(LCancelBigNumComputer().set_trade_progress)(code, buy_progress_index, total_datas) f3 = dask.delayed(deal_big_money_manager.set_trade_progress)(code, @@ -255,8 +255,9 @@ if limit_up_price is not None: # 澶勭悊涔�1,鍗�1淇℃伅 - code_price_manager.Buy1PriceManager().process(code, buy_1_price, time_str, limit_up_price, - sell_1_price, sell_1_volume // 100) + code_price_manager.Buy1PriceManager().process(code, buy_1_price, time_str, + limit_up_price, + sell_1_price, sell_1_volume // 100) pre_close_price = round(float(limit_up_price) / 1.1, 2) # 濡傛灉娑ㄥ箙澶т簬8%灏辫鍙栨澘鍧� if (buy_1_price - pre_close_price) / pre_close_price > 0.08: diff --git a/trade/trade_result_manager.py b/trade/trade_result_manager.py index e3f98e3..16709ce 100644 --- a/trade/trade_result_manager.py +++ b/trade/trade_result_manager.py @@ -30,9 +30,9 @@ # 瀹夊叏绗旀暟璁$畻 f5 = dask.delayed(__buyL2SafeCountManager.save_place_order_info)(code, buy_single_index, buy_exec_index, total_datas[-1]["index"]) - f6 = dask.delayed(SecondCancelBigNumComputer.cancel_success)(code) - f7 = dask.delayed(DCancelBigNumComputer.cancel_success)(code) - f8 = dask.delayed(LCancelBigNumComputer.cancel_success)(code) + f6 = dask.delayed(SecondCancelBigNumComputer().cancel_success)(code) + f7 = dask.delayed(DCancelBigNumComputer().cancel_success)(code) + f8 = dask.delayed(LCancelBigNumComputer().cancel_success)(code) dask.compute(f1, f2, f5, f6, f7, f8) @@ -54,9 +54,9 @@ @dask.delayed def h_cancel(code, buy_single_index, buy_exec_index): try: - HourCancelBigNumComputer.place_order_success(code, buy_single_index, buy_exec_index, - local_today_datas.get(code), - local_today_num_operate_map.get(code)) + HourCancelBigNumComputer().place_order_success(code, buy_single_index, buy_exec_index, + local_today_datas.get(code), + local_today_num_operate_map.get(code)) except Exception as e: logging.exception(e) logger_l2_error.exception(e) @@ -64,7 +64,7 @@ @dask.delayed def l_cancel(code): try: - LCancelBigNumComputer.del_watch_index(code) + LCancelBigNumComputer().del_watch_index(code) except Exception as e: logging.exception(e) logger_l2_error.exception(e) @@ -88,9 +88,9 @@ # 鍙栨秷涔板叆鏍囪瘑 f2 = dask.delayed(l2_data_manager.TradePointManager().delete_buy_point)(code) f3 = dask.delayed(l2_data_manager.TradePointManager().delete_buy_cancel_point)(code) - f6 = dask.delayed(SecondCancelBigNumComputer.cancel_success)(code) - f7 = dask.delayed(DCancelBigNumComputer.cancel_success)(code) - f8 = dask.delayed(LCancelBigNumComputer.cancel_success)(code) + f6 = dask.delayed(SecondCancelBigNumComputer().cancel_success)(code) + f7 = dask.delayed(DCancelBigNumComputer().cancel_success)(code) + f8 = dask.delayed(LCancelBigNumComputer().cancel_success)(code) dask.compute(f1, f2, f3, f6, f7, f8) @@ -99,7 +99,7 @@ code = "600246" f2 = dask.delayed(l2_data_manager.TradePointManager().delete_buy_point)(code) f3 = dask.delayed(l2_data_manager.TradePointManager().delete_buy_cancel_point)(code) - f6 = dask.delayed(SecondCancelBigNumComputer.cancel_success)(code) - f7 = dask.delayed(DCancelBigNumComputer.cancel_success)(code) - f8 = dask.delayed(LCancelBigNumComputer.cancel_success)(code) + f6 = dask.delayed(SecondCancelBigNumComputer().cancel_success)(code) + f7 = dask.delayed(DCancelBigNumComputer().cancel_success)(code) + f8 = dask.delayed(LCancelBigNumComputer().cancel_success)(code) dask.compute(f2, f3, f6, f7, f8) -- Gitblit v1.8.0