From 48fb7a00951f91bdc707e5dd2d196e5bccb752c3 Mon Sep 17 00:00:00 2001
From: Administrator <admin@example.com>
Date: 星期三, 18 六月 2025 18:41:30 +0800
Subject: [PATCH] 异常保护

---
 l2/l2_data_util.py |  194 ++++++++++++++++++++++++++++++++++-------------
 1 files changed, 139 insertions(+), 55 deletions(-)

diff --git a/l2/l2_data_util.py b/l2/l2_data_util.py
index 6a412dc..e12f024 100644
--- a/l2/l2_data_util.py
+++ b/l2/l2_data_util.py
@@ -13,12 +13,13 @@
 
 import constant
 from code_attribute import gpcode_manager
-from db.redis_manager import RedisUtils
+from db.redis_manager_delegate import RedisUtils
 from l2 import l2_data_log, l2_data_source_util
-from log_module import log, log_export
-from db import redis_manager
+from log_module import log, log_export, async_log_util
+from db import redis_manager_delegate as redis_manager
 from utils import tool
 
+__db = 1
 _redisManager = redis_manager.RedisManager(1)
 # l2鏁版嵁绠$悊
 # 鏈湴鏈�鏂颁竴娆′笂浼犵殑鏁版嵁
@@ -32,14 +33,19 @@
 # 涔板叆璁㈠崟鍙锋槧灏�,鍙湁鍘熺敓鐨凩2鏁版嵁鎵嶆湁
 local_today_buyno_map = {}
 
+# 鍗栧嚭璁㈠崟鍙锋槧灏勶紝鍙湁鍘熺敓鐨凩2鏁版嵁鎵嶆湁
+local_today_sellno_map = {}
+
+# 宸茬粡鎾ゅ崟鐨勮鍗曞彿
+local_today_canceled_buyno_map = {}
+
 
 def load_l2_data(code, load_latest=True, force=False):
-    redis = _redisManager.getRedis()
     # 鍔犺浇鏈�杩戠殑l2鏁版嵁
     if load_latest:
         if local_latest_datas.get(code) is None or force:
             # 鑾峰彇鏈�杩戠殑鏁版嵁
-            _data = RedisUtils.get(redis, "l2-data-latest-{}".format(code))
+            _data = RedisUtils.get(_redisManager.getRedis(), "l2-data-latest-{}".format(code))
             if _data is not None:
                 if code in local_latest_datas:
                     local_latest_datas[code] = json.loads(_data)
@@ -71,8 +77,43 @@
         # 鏍规嵁浠婃棩鏁版嵁鍔犺浇
         load_num_operate_map(local_today_num_operate_map, code, local_today_datas.get(code), force)
         load_buy_no_map(local_today_buyno_map, code, local_today_datas.get(code), force)
+        load_sell_no_map(local_today_sellno_map, code, local_today_datas.get(code), force)
+        load_canceled_buy_no_map(local_today_canceled_buyno_map, code, local_today_datas.get(code), force)
         return data_normal
     return True
+
+
+# L2鏁版嵁鏄惁姝e父
+def l2_data_is_normal(code):
+    datas = local_today_datas.get(code)
+    if not datas:
+        # 鍒濆鍖�
+        local_today_datas[code] = []
+        load_num_operate_map(local_today_num_operate_map, code, local_today_datas.get(code))
+        load_buy_no_map(local_today_buyno_map, code, local_today_datas.get(code))
+        load_sell_no_map(local_today_sellno_map, code, local_today_datas.get(code))
+        load_canceled_buy_no_map(local_today_canceled_buyno_map, code, local_today_datas.get(code))
+
+    if datas and len(datas) != datas[-1]["index"] + 1:
+        return False
+    return True
+
+
+# 鍔犺浇鎵�鏈夌殑l2鏁版嵁
+def load_l2_data_all(force=False):
+    datas = log_export.load_l2_from_log()
+    for code in datas:
+        if force:
+            local_today_datas[code] = datas[code]
+        else:
+            if code not in local_today_datas:
+                local_today_datas[code] = datas[code]
+        load_num_operate_map(local_today_num_operate_map, code, local_today_datas.get(code), force)
+        load_buy_no_map(local_today_buyno_map, code, local_today_datas.get(code), force)
+        load_sell_no_map(local_today_sellno_map, code, local_today_datas.get(code), force)
+        load_canceled_buy_no_map(local_today_canceled_buyno_map, code, local_today_datas.get(code), force)
+
+    constant.L2_DATA_IS_LOADED = True
 
 
 # 灏嗘暟鎹牴鎹畁um-operate鍒嗙被
@@ -108,6 +149,45 @@
             local_today_buyno_map[code].setdefault(key, data)
 
 
+# 灏嗘暟鎹牴鎹畂rderNo鍒嗙被,鍘熺敓鏁版嵁鎵嶆湁
+def load_sell_no_map(local_today_sellno_map, code, source_datas, clear=False):
+    # 鍙湁鍘熺敓L2鏁版嵁鎵嶄細鏈夋鎿嶄綔
+    if constant.L2_SOURCE_TYPE != constant.L2_SOURCE_TYPE_HUAXIN:
+        return
+    if local_today_sellno_map.get(code) is None:
+        local_today_sellno_map[code] = {}
+    if clear:
+        local_today_sellno_map[code] = {}
+
+    for data in source_datas:
+        if data["val"]["operateType"] != 2:
+            continue
+        # 鍙~鍏呬拱鍏ユ暟鎹�
+        key = "{}".format(data["val"]["orderNo"])
+        if local_today_sellno_map[code].get(key) is None:
+            local_today_sellno_map[code].setdefault(key, data)
+
+
+# 灏嗘暟鎹牴鎹畂rderNo鍒嗙被宸叉挙璁㈠崟,鍘熺敓鏁版嵁鎵嶆湁
+def load_canceled_buy_no_map(local_today_canceled_buyno_map, code, source_datas, clear=False):
+    # 鍙湁鍘熺敓L2鏁版嵁鎵嶄細鏈夋鎿嶄綔
+    if constant.L2_SOURCE_TYPE != constant.L2_SOURCE_TYPE_HUAXIN:
+        return
+    if local_today_canceled_buyno_map.get(code) is None:
+        local_today_canceled_buyno_map[code] = {}
+    if clear:
+        local_today_canceled_buyno_map[code] = {}
+
+    for data in source_datas:
+        # 鍙暀涓嬩拱鎾�
+        if data["val"]["operateType"] != 1:
+            continue
+        # 鍙~鍏呬拱鍏ユ暟鎹�
+        key = "{}".format(data["val"]["orderNo"])
+        if local_today_canceled_buyno_map[code].get(key) is None:
+            local_today_canceled_buyno_map[code].setdefault(key, data)
+
+
 @tool.async_call
 def saveL2Data(code, datas, msg=""):
     start_time = round(time.time() * 1000)
@@ -118,28 +198,29 @@
     redis_instance = _redisManager.getRedis()
 
     try:
-        if RedisUtils.setnx(redis_instance, "l2-save-{}".format(code), "1") > 0:
+        if RedisUtils.setnx(redis_instance, "l2-save-{}".format(code), "1", auto_free=False) > 0:
             # 璁$畻淇濈暀鐨勬椂闂�
             expire = tool.get_expire()
             i = 0
             for _data in datas:
                 i += 1
                 key = "l2-" + _data["key"]
-                value = RedisUtils.get(redis_instance, key)
+                value = RedisUtils.get(redis_instance, key, auto_free=False)
                 if value is None:
                     # 鏂板
                     try:
                         value = {"index": _data["index"], "re": _data["re"]}
-                        RedisUtils.setex(redis_instance, key, expire, json.dumps(value))
+                        RedisUtils.setex(redis_instance, key, expire, json.dumps(value), auto_free=False)
                     except:
                         logging.error("鏇存L2鏁版嵁鍑洪敊锛歿} key:{}".format(code, key))
                 else:
                     json_value = json.loads(value)
                     if json_value["re"] != _data["re"]:
                         json_value["re"] = _data["re"]
-                        RedisUtils.setex(redis_instance, key, expire, json.dumps(json_value))
+                        RedisUtils.setex(redis_instance, key, expire, json.dumps(json_value), auto_free=False)
     finally:
-        RedisUtils.delete(redis_instance, "l2-save-{}".format(code))
+        RedisUtils.delete(redis_instance, "l2-save-{}".format(code), auto_free=False)
+        RedisUtils.realse(redis_instance)
 
     print("淇濆瓨鏂版暟鎹敤鏃讹細", msg, "鑰楁椂锛歿}".format(round(time.time() * 1000) - start_time))
     return datas
@@ -147,30 +228,27 @@
 
 # 淇濆瓨l2鏁版嵁
 def save_l2_data(code, datas, add_datas):
-    redis = _redisManager.getRedis()
     # 鍙湁鏈夋柊鏇炬暟鎹墠闇�瑕佷繚瀛�
-    if len(add_datas) > 0:
+    if add_datas:
         # 淇濆瓨鏈�杩戠殑鏁版嵁
         __start_time = round(time.time() * 1000)
         if datas:
-            RedisUtils.setex(redis, "l2-data-latest-{}".format(code), tool.get_expire(), json.dumps(datas))
-            l2_data_log.l2_time(code, round(time.time() * 1000) - __start_time, "淇濆瓨鏈�杩憀2鏁版嵁鐢ㄦ椂")
+            RedisUtils.setex_async(__db, "l2-data-latest-{}".format(code), tool.get_expire(),
+                                   json.dumps(datas))
+            # l2_data_log.l2_time(code, round(time.time() * 1000) - __start_time, "淇濆瓨鏈�杩憀2鏁版嵁鐢ㄦ椂")
             # 璁剧疆杩涘唴瀛�
             local_latest_datas[code] = datas
             set_l2_data_latest_count(code, len(datas))
         try:
-            log.logger_l2_data.info("{}-{}", code, add_datas)
+            async_log_util.l2_data_log.info(log.logger_l2_data, f"{code}-{add_datas}")
         except Exception as e:
             logging.exception(e)
-        # 鏆傛椂涓嶅皢鏁版嵁淇濆瓨鍒皉edis
-        # saveL2Data(code, add_datas)
 
 
 # 璁剧疆鏈�鏂扮殑l2鏁版嵁閲囬泦鐨勬暟閲�
 def set_l2_data_latest_count(code, count):
-    redis = _redisManager.getRedis()
     key = "latest-l2-count-{}".format(code)
-    RedisUtils.setex(redis, key, 2, count)
+    RedisUtils.setex(_redisManager.getRedis(), key, 2, count)
     pass
 
 
@@ -178,10 +256,9 @@
 def get_l2_data_latest_count(code):
     if code is None or len(code) < 1:
         return 0
-    redis = _redisManager.getRedis()
     key = "latest-l2-count-{}".format(code)
 
-    result = RedisUtils.get(redis, key)
+    result = RedisUtils.get(_redisManager.getRedis(), key)
     if result is None:
         return 0
     else:
@@ -218,33 +295,12 @@
     return False
 
 
-# 鏄惁涓哄ぇ鍗�
-def is_big_money(val):
-    price = float(val["price"])
-    money = price * int(val["num"])
-    if price > 3.0:
-        if money >= 30000:
-            return True
-        else:
-            return False
-    else:
-        max_money = price * 10000
-        if money >= max_money * 0.95:
-            return True
-        else:
-            return False
-
-
 class L2DataUtil:
     @classmethod
     def is_same_time(cls, time1, time2):
         if constant.TEST:
             return True
-        time1_s = time1.split(":")
-        time1_second = int(time1_s[0]) * 3600 + int(time1_s[1]) * 60 + int(time1_s[2])
-        time2_s = time2.split(":")
-        time2_second = int(time2_s[0]) * 3600 + int(time2_s[1]) * 60 + int(time2_s[2])
-        if abs(time2_second - time1_second) < 3:
+        if abs(tool.trade_time_sub(time1, time2)) < 3:
             return True
         else:
             return False
@@ -350,7 +406,7 @@
                 # 鏁版嵁閲嶅娆℃暟榛樿涓�1
                 datas.append({"key": key, "val": item, "re": 1})
                 dataIndexs.setdefault(key, len(datas) - 1)
-        # TODO 娴嬭瘯鐨勬椂鍊欏紑鍚紝鏂逛究璁板綍澶у崟鏁版嵁
+        # 娴嬭瘯鐨勬椂鍊欏紑鍚紝鏂逛究璁板綍澶у崟鏁版嵁
         # l2_data_util.save_big_data(code, same_time_num, data)
         return datas
 
@@ -387,11 +443,16 @@
 
         if int(val["operateType"]) != 2:
             return False
+        return True
 
-        price = float(val["price"])
-        num = int(val["num"])
-        # if price * num * 100 < 50 * 10000:
-        #     return False
+    # 娑ㄥ仠鍗栨挙
+    @classmethod
+    def is_limit_up_price_sell_cancel(cls, val):
+        if int(val["limitPrice"]) != 1:
+            return False
+
+        if int(val["operateType"]) != 3:
+            return False
         return True
 
     # 鏄惁娑ㄥ仠涔版挙
@@ -403,10 +464,12 @@
         if int(val["operateType"]) != 1:
             return False
 
-        price = float(val["price"])
-        num = int(val["num"])
-        # if price * num * 100 < 50 * 10000:
-        #     return False
+        return True
+
+    @classmethod
+    def is_buy_cancel(cls, val):
+        if int(val["operateType"]) != 1:
+            return False
         return True
 
     # 鏄惁鍗栨挙
@@ -423,6 +486,26 @@
             return True
         return False
 
+    # 鏄惁涓轰拱
+    @classmethod
+    def is_buy(cls, val):
+        if int(val["operateType"]) == 0:
+            return True
+        return False
+
+    # l2鏃堕棿宸��
+    @classmethod
+    def time_sub_as_ms(cls, val1, val2):
+        # 璁$畻鏃堕棿宸��
+        sub_s = tool.trade_time_sub(val1["time"], val2["time"])
+        sub_ms = int(val1["tms"]) - int(val2["tms"])
+        fs = sub_s * 1000 + sub_ms
+        return fs
+
+    @classmethod
+    def get_time_with_ms(cls, val):
+        return val["time"] + "." + "{:0>3}".format(int(val["tms"]))
+
 
 class L2TradeQueueUtils(object):
     # 涔板叆鏁版嵁鏄惁宸叉挙
@@ -434,8 +517,9 @@
         # 鏄惁鏈変拱鎾ゆ暟鎹�
         if cancel_datas:
             for cancel_data in cancel_datas:
-                buy_index = l2_data_source_util.L2DataSourceUtils.get_buy_index_with_cancel_data(code, cancel_data,
-                                                                                                 local_today_num_operate_map)
+                buy_index = l2_data_source_util.L2DataSourceUtils.get_buy_index_with_cancel_data_v2(cancel_data,
+                                                                                                    local_today_buyno_map.get(
+                                                                                                        code))
                 if buy_index == data["index"]:
                     return True
         return False
@@ -536,4 +620,4 @@
 
 
 if __name__ == "__main__":
-    print(load_l2_data("002235"))
+    print(L2DataUtil.time_sub_as_ms({"time": "09:46:05", "tms": 480}, {"time": "09:46:04", "tms": 90}))

--
Gitblit v1.8.0