From 72149b3076983701b17f3dc55fd3ca60243c1f58 Mon Sep 17 00:00:00 2001
From: Administrator <admin@example.com>
Date: 星期二, 08 八月 2023 10:42:42 +0800
Subject: [PATCH] redis异步数据提交

---
 l2/l2_data_manager_new.py |   77 +++++++++++++++++++++-----------------
 1 files changed, 42 insertions(+), 35 deletions(-)

diff --git a/l2/l2_data_manager_new.py b/l2/l2_data_manager_new.py
index f5462bd..9a29fee 100644
--- a/l2/l2_data_manager_new.py
+++ b/l2/l2_data_manager_new.py
@@ -57,25 +57,31 @@
 
 
 # m鍊煎ぇ鍗曞鐞�
-m_big_money_begin_cache = {}
-m_big_money_process_index_cache = {}
 
 
 class L2BigNumForMProcessor:
+    _db = 1
+    _redis_manager = redis_manager.RedisManager(1)
+    m_big_money_begin_cache = {}
+    m_big_money_process_index_cache = {}
+    __instance = None
 
-    def __init__(self):
-        self._redis_manager = redis_manager.RedisManager(1)
+    def __new__(cls, *args, **kwargs):
+        if not cls.__instance:
+            cls.__instance = super(L2BigNumForMProcessor, cls).__new__(cls, *args, **kwargs)
+        return cls.__instance
 
-    def __get_redis(self):
-        return self._redis_manager.getRedis()
+    @classmethod
+    def __get_redis(cls):
+        return cls._redis_manager.getRedis()
 
     # 淇濆瓨璁$畻寮�濮嬩綅缃�
     def set_begin_pos(self, code, index):
         if self.__get_begin_pos_cache(code) is None:
-            tool.CodeDataCacheUtil.set_cache(m_big_money_begin_cache, code, index)
+            tool.CodeDataCacheUtil.set_cache(self.m_big_money_begin_cache, code, index)
             # 淇濆瓨浣嶇疆
             key = "m_big_money_begin-{}".format(code)
-            RedisUtils.setex_async(self.__get_redis(), key, tool.get_expire(), index)
+            RedisUtils.setex_async(self._db, key, tool.get_expire(), index)
 
     # 鑾峰彇璁$畻寮�濮嬩綅缃�
     def __get_begin_pos(self, code):
@@ -86,24 +92,24 @@
         return int(val)
 
     def __get_begin_pos_cache(self, code):
-        cache_result = tool.CodeDataCacheUtil.get_cache(m_big_money_begin_cache, code)
+        cache_result = tool.CodeDataCacheUtil.get_cache(self.m_big_money_begin_cache, code)
         if cache_result[0]:
             return cache_result[1]
         val = self.__get_begin_pos(code)
-        tool.CodeDataCacheUtil.set_cache(m_big_money_begin_cache, code, val)
+        tool.CodeDataCacheUtil.set_cache(self.m_big_money_begin_cache, code, val)
         return val
 
     # 娓呴櫎宸茬粡澶勭悊鐨勬暟鎹�
     def clear_processed_end_index(self, code):
-        tool.CodeDataCacheUtil.clear_cache(m_big_money_process_index_cache, code)
+        tool.CodeDataCacheUtil.clear_cache(self.m_big_money_process_index_cache, code)
         key = "m_big_money_process_index-{}".format(code)
-        RedisUtils.delete(self.__get_redis(), key)
+        RedisUtils.delete_async(self._db, key)
 
     # 娣诲姞宸茬粡澶勭悊杩囩殑鍗�
     def __set_processed_end_index(self, code, index):
-        tool.CodeDataCacheUtil.set_cache(m_big_money_process_index_cache, code, index)
+        tool.CodeDataCacheUtil.set_cache(self.m_big_money_process_index_cache, code, index)
         key = "m_big_money_process_index-{}".format(code)
-        RedisUtils.setex_async(self.__get_redis(), key, tool.get_expire(), index)
+        RedisUtils.setex_async(self._db, key, tool.get_expire(), index)
 
     # 鏄惁宸茬粡澶勭悊杩�
     def __get_processed_end_index(self, code):
@@ -114,11 +120,11 @@
         return int(val)
 
     def __get_processed_end_index_cache(self, code):
-        cache_result = tool.CodeDataCacheUtil.get_cache(m_big_money_process_index_cache, code)
+        cache_result = tool.CodeDataCacheUtil.get_cache(self.m_big_money_process_index_cache, code)
         if cache_result[0]:
             return cache_result[1]
         val = self.__get_processed_end_index(code)
-        tool.CodeDataCacheUtil.set_cache(m_big_money_process_index_cache, code, val)
+        tool.CodeDataCacheUtil.set_cache(self.m_big_money_process_index_cache, code, val)
         return val
 
     # 澶勭悊澶у崟
@@ -310,7 +316,7 @@
                         # 褰撳墠娑ㄥ仠浠凤紝璁剧疆娑ㄥ仠鏃堕棿
                         logger_l2_process.info("寮�鐩樻定鍋滐細{}", code)
                         # 淇濆瓨娑ㄥ仠鏃堕棿
-                        limit_up_time_manager.save_limit_up_time(code, "09:30:00")
+                        limit_up_time_manager.LimitUpTimeManager().save_limit_up_time(code, "09:30:00")
 
         total_datas = local_today_datas[code]
         __start_time = l2_data_log.l2_time(code, round(t.time() * 1000) - __start_time,
@@ -323,7 +329,7 @@
             volume_rate = code_volumn_manager.get_volume_rate(code)
             volume_rate_index = code_volumn_manager.get_volume_rate_index(volume_rate)
             # 璁$畻鍒嗗��
-            limit_up_time = limit_up_time_manager.get_limit_up_time(code)
+            limit_up_time = limit_up_time_manager.LimitUpTimeManager().get_limit_up_time_cache(code)
             if limit_up_time is None:
                 limit_up_time = tool.get_now_time_str()
             score = first_code_score_manager.get_score(code, volume_rate, limit_up_time, True)
@@ -681,10 +687,10 @@
         # is_limited_up = gpcode_manager.FirstCodeManager().is_limited_up(code)
         # if not is_limited_up:
         #     gpcode_manager.FirstCodeManager().add_limited_up_record([code])
-        #     place_order_count = trade_data_manager.placeordercountmanager.get_place_order_count(
+        #     place_order_count = trade_data_manager.PlaceOrderCountManager.get_place_order_count(
         #         code)
         #     if place_order_count == 0:
-        #         trade_data_manager.placeordercountmanager.place_order(code)
+        #         trade_data_manager.PlaceOrderCountManager.place_order(code)
         #     return False, True, "棣栨澘浠g爜锛屼笖灏氭湭娑ㄥ仠杩�"
 
         try:
@@ -729,7 +735,7 @@
             if volumn_rate >= 1.3:
                 return False, False, "鏈�澶ч噺姣旇秴杩�1.3涓嶈兘涔�"
 
-            limit_up_time = limit_up_time_manager.get_limit_up_time(code)
+            limit_up_time = limit_up_time_manager.LimitUpTimeManager().get_limit_up_time_cache(code)
             if limit_up_time is not None:
                 limit_up_time_seconds = l2.l2_data_util.L2DataUtil.get_time_as_second(
                     limit_up_time)
@@ -877,7 +883,7 @@
                                                                                                   0, min_money_w)
             if left_big_num > 0:
                 # 閲嶆柊鑾峰彇鍒嗘暟涓庡垎鏁扮储寮�
-                limit_up_time = limit_up_time_manager.get_limit_up_time(code)
+                limit_up_time = limit_up_time_manager.LimitUpTimeManager().get_limit_up_time_cache(code)
                 if limit_up_time is None:
                     limit_up_time = tool.get_now_time_str()
                 score = first_code_score_manager.get_score(code, cls.volume_rate_info[code][0], limit_up_time, True,
@@ -895,17 +901,18 @@
             score = cls.__l2PlaceOrderParamsManagerDict[code].score
             score_info = cls.__l2PlaceOrderParamsManagerDict[code].score_info
 
-            lp = LineProfiler()
-            lp.enable()
-            lp_wrap = lp(cls.can_buy_first)
-            results = lp_wrap(code, limit_up_price, score_index, score, score_info, cls.volume_rate_info[code])
-            output = io.StringIO()
-            lp.print_stats(stream=output)
-            lp.disable()
-            with open(f"{constant.get_path_prefix()}/logs/profile/{code}_can_buy_first.txt", 'w') as f:
-                f.write(output.getvalue())
-            # return cls.can_buy_first(code, limit_up_price, score_index, score, score_info, cls.volume_rate_info[code])
-            return results
+            # lp = LineProfiler()
+            # lp.enable()
+            # lp_wrap = lp(cls.can_buy_first)
+            # results = lp_wrap(code, limit_up_price, score_index, score, score_info, cls.volume_rate_info[code])
+            # output = io.StringIO()
+            # lp.print_stats(stream=output)
+            # lp.disable()
+            # with open(f"{constant.get_path_prefix()}/logs/profile/{code}_can_buy_first.txt", 'w') as f:
+            #     f.write(output.getvalue())
+            # return results
+            return cls.can_buy_first(code, limit_up_price, score_index, score, score_info, cls.volume_rate_info[code])
+
         else:
             return True, False, "鍦ㄦ兂涔板悕鍗曚腑"
 
@@ -1092,7 +1099,7 @@
             f1 = dask.delayed(cls.__save_order_begin_data)(code, buy_single_index, compute_index, compute_index,
                                                            buy_nums, buy_count, max_num_set_new,
                                                            cls.volume_rate_info[code][0])
-            f2 = dask.delayed(limit_up_time_manager.save_limit_up_time)(code, total_datas[compute_index]["val"]["time"])
+            f2 = dask.delayed(limit_up_time_manager.LimitUpTimeManager().save_limit_up_time)(code, total_datas[compute_index]["val"]["time"])
             f3 = dask.delayed(cls.__virtual_buy)(code, buy_single_index, compute_index, capture_time)
             f4 = dask.delayed(l2_data_manager.TradePointManager.delete_buy_cancel_point)(code)
             f5 = dask.delayed(L2LimitUpMoneyStatisticUtil.process_data)(code, buy_single_index,
@@ -1262,7 +1269,7 @@
         # 鐩爣鎵嬫暟
         threshold_num = round(threshold_money / (limit_up_price * 100))
 
-        # place_order_count = trade_data_manager.placeordercountmanager.get_place_order_count(code)
+        # place_order_count = trade_data_manager.PlaceOrderCountManager.get_place_order_count(code)
         # 鐩爣璁㈠崟鏁伴噺
         threshold_count = cls.__l2PlaceOrderParamsManagerDict[code].get_safe_count()
 

--
Gitblit v1.8.0