From 72149b3076983701b17f3dc55fd3ca60243c1f58 Mon Sep 17 00:00:00 2001 From: Administrator <admin@example.com> Date: 星期二, 08 八月 2023 10:42:42 +0800 Subject: [PATCH] redis异步数据提交 --- l2/l2_data_manager_new.py | 77 +++++++++++++++++++++----------------- 1 files changed, 42 insertions(+), 35 deletions(-) diff --git a/l2/l2_data_manager_new.py b/l2/l2_data_manager_new.py index f5462bd..9a29fee 100644 --- a/l2/l2_data_manager_new.py +++ b/l2/l2_data_manager_new.py @@ -57,25 +57,31 @@ # m鍊煎ぇ鍗曞鐞� -m_big_money_begin_cache = {} -m_big_money_process_index_cache = {} class L2BigNumForMProcessor: + _db = 1 + _redis_manager = redis_manager.RedisManager(1) + m_big_money_begin_cache = {} + m_big_money_process_index_cache = {} + __instance = None - def __init__(self): - self._redis_manager = redis_manager.RedisManager(1) + def __new__(cls, *args, **kwargs): + if not cls.__instance: + cls.__instance = super(L2BigNumForMProcessor, cls).__new__(cls, *args, **kwargs) + return cls.__instance - def __get_redis(self): - return self._redis_manager.getRedis() + @classmethod + def __get_redis(cls): + return cls._redis_manager.getRedis() # 淇濆瓨璁$畻寮�濮嬩綅缃� def set_begin_pos(self, code, index): if self.__get_begin_pos_cache(code) is None: - tool.CodeDataCacheUtil.set_cache(m_big_money_begin_cache, code, index) + tool.CodeDataCacheUtil.set_cache(self.m_big_money_begin_cache, code, index) # 淇濆瓨浣嶇疆 key = "m_big_money_begin-{}".format(code) - RedisUtils.setex_async(self.__get_redis(), key, tool.get_expire(), index) + RedisUtils.setex_async(self._db, key, tool.get_expire(), index) # 鑾峰彇璁$畻寮�濮嬩綅缃� def __get_begin_pos(self, code): @@ -86,24 +92,24 @@ return int(val) def __get_begin_pos_cache(self, code): - cache_result = tool.CodeDataCacheUtil.get_cache(m_big_money_begin_cache, code) + cache_result = tool.CodeDataCacheUtil.get_cache(self.m_big_money_begin_cache, code) if cache_result[0]: return cache_result[1] val = self.__get_begin_pos(code) - tool.CodeDataCacheUtil.set_cache(m_big_money_begin_cache, code, val) + tool.CodeDataCacheUtil.set_cache(self.m_big_money_begin_cache, code, val) return val # 娓呴櫎宸茬粡澶勭悊鐨勬暟鎹� def clear_processed_end_index(self, code): - tool.CodeDataCacheUtil.clear_cache(m_big_money_process_index_cache, code) + tool.CodeDataCacheUtil.clear_cache(self.m_big_money_process_index_cache, code) key = "m_big_money_process_index-{}".format(code) - RedisUtils.delete(self.__get_redis(), key) + RedisUtils.delete_async(self._db, key) # 娣诲姞宸茬粡澶勭悊杩囩殑鍗� def __set_processed_end_index(self, code, index): - tool.CodeDataCacheUtil.set_cache(m_big_money_process_index_cache, code, index) + tool.CodeDataCacheUtil.set_cache(self.m_big_money_process_index_cache, code, index) key = "m_big_money_process_index-{}".format(code) - RedisUtils.setex_async(self.__get_redis(), key, tool.get_expire(), index) + RedisUtils.setex_async(self._db, key, tool.get_expire(), index) # 鏄惁宸茬粡澶勭悊杩� def __get_processed_end_index(self, code): @@ -114,11 +120,11 @@ return int(val) def __get_processed_end_index_cache(self, code): - cache_result = tool.CodeDataCacheUtil.get_cache(m_big_money_process_index_cache, code) + cache_result = tool.CodeDataCacheUtil.get_cache(self.m_big_money_process_index_cache, code) if cache_result[0]: return cache_result[1] val = self.__get_processed_end_index(code) - tool.CodeDataCacheUtil.set_cache(m_big_money_process_index_cache, code, val) + tool.CodeDataCacheUtil.set_cache(self.m_big_money_process_index_cache, code, val) return val # 澶勭悊澶у崟 @@ -310,7 +316,7 @@ # 褰撳墠娑ㄥ仠浠凤紝璁剧疆娑ㄥ仠鏃堕棿 logger_l2_process.info("寮�鐩樻定鍋滐細{}", code) # 淇濆瓨娑ㄥ仠鏃堕棿 - limit_up_time_manager.save_limit_up_time(code, "09:30:00") + limit_up_time_manager.LimitUpTimeManager().save_limit_up_time(code, "09:30:00") total_datas = local_today_datas[code] __start_time = l2_data_log.l2_time(code, round(t.time() * 1000) - __start_time, @@ -323,7 +329,7 @@ volume_rate = code_volumn_manager.get_volume_rate(code) volume_rate_index = code_volumn_manager.get_volume_rate_index(volume_rate) # 璁$畻鍒嗗�� - limit_up_time = limit_up_time_manager.get_limit_up_time(code) + limit_up_time = limit_up_time_manager.LimitUpTimeManager().get_limit_up_time_cache(code) if limit_up_time is None: limit_up_time = tool.get_now_time_str() score = first_code_score_manager.get_score(code, volume_rate, limit_up_time, True) @@ -681,10 +687,10 @@ # is_limited_up = gpcode_manager.FirstCodeManager().is_limited_up(code) # if not is_limited_up: # gpcode_manager.FirstCodeManager().add_limited_up_record([code]) - # place_order_count = trade_data_manager.placeordercountmanager.get_place_order_count( + # place_order_count = trade_data_manager.PlaceOrderCountManager.get_place_order_count( # code) # if place_order_count == 0: - # trade_data_manager.placeordercountmanager.place_order(code) + # trade_data_manager.PlaceOrderCountManager.place_order(code) # return False, True, "棣栨澘浠g爜锛屼笖灏氭湭娑ㄥ仠杩�" try: @@ -729,7 +735,7 @@ if volumn_rate >= 1.3: return False, False, "鏈�澶ч噺姣旇秴杩�1.3涓嶈兘涔�" - limit_up_time = limit_up_time_manager.get_limit_up_time(code) + limit_up_time = limit_up_time_manager.LimitUpTimeManager().get_limit_up_time_cache(code) if limit_up_time is not None: limit_up_time_seconds = l2.l2_data_util.L2DataUtil.get_time_as_second( limit_up_time) @@ -877,7 +883,7 @@ 0, min_money_w) if left_big_num > 0: # 閲嶆柊鑾峰彇鍒嗘暟涓庡垎鏁扮储寮� - limit_up_time = limit_up_time_manager.get_limit_up_time(code) + limit_up_time = limit_up_time_manager.LimitUpTimeManager().get_limit_up_time_cache(code) if limit_up_time is None: limit_up_time = tool.get_now_time_str() score = first_code_score_manager.get_score(code, cls.volume_rate_info[code][0], limit_up_time, True, @@ -895,17 +901,18 @@ score = cls.__l2PlaceOrderParamsManagerDict[code].score score_info = cls.__l2PlaceOrderParamsManagerDict[code].score_info - lp = LineProfiler() - lp.enable() - lp_wrap = lp(cls.can_buy_first) - results = lp_wrap(code, limit_up_price, score_index, score, score_info, cls.volume_rate_info[code]) - output = io.StringIO() - lp.print_stats(stream=output) - lp.disable() - with open(f"{constant.get_path_prefix()}/logs/profile/{code}_can_buy_first.txt", 'w') as f: - f.write(output.getvalue()) - # return cls.can_buy_first(code, limit_up_price, score_index, score, score_info, cls.volume_rate_info[code]) - return results + # lp = LineProfiler() + # lp.enable() + # lp_wrap = lp(cls.can_buy_first) + # results = lp_wrap(code, limit_up_price, score_index, score, score_info, cls.volume_rate_info[code]) + # output = io.StringIO() + # lp.print_stats(stream=output) + # lp.disable() + # with open(f"{constant.get_path_prefix()}/logs/profile/{code}_can_buy_first.txt", 'w') as f: + # f.write(output.getvalue()) + # return results + return cls.can_buy_first(code, limit_up_price, score_index, score, score_info, cls.volume_rate_info[code]) + else: return True, False, "鍦ㄦ兂涔板悕鍗曚腑" @@ -1092,7 +1099,7 @@ f1 = dask.delayed(cls.__save_order_begin_data)(code, buy_single_index, compute_index, compute_index, buy_nums, buy_count, max_num_set_new, cls.volume_rate_info[code][0]) - f2 = dask.delayed(limit_up_time_manager.save_limit_up_time)(code, total_datas[compute_index]["val"]["time"]) + f2 = dask.delayed(limit_up_time_manager.LimitUpTimeManager().save_limit_up_time)(code, total_datas[compute_index]["val"]["time"]) f3 = dask.delayed(cls.__virtual_buy)(code, buy_single_index, compute_index, capture_time) f4 = dask.delayed(l2_data_manager.TradePointManager.delete_buy_cancel_point)(code) f5 = dask.delayed(L2LimitUpMoneyStatisticUtil.process_data)(code, buy_single_index, @@ -1262,7 +1269,7 @@ # 鐩爣鎵嬫暟 threshold_num = round(threshold_money / (limit_up_price * 100)) - # place_order_count = trade_data_manager.placeordercountmanager.get_place_order_count(code) + # place_order_count = trade_data_manager.PlaceOrderCountManager.get_place_order_count(code) # 鐩爣璁㈠崟鏁伴噺 threshold_count = cls.__l2PlaceOrderParamsManagerDict[code].get_safe_count() -- Gitblit v1.8.0