From 0e68e24f54db11d340785b17570fff2bc5fc7ac6 Mon Sep 17 00:00:00 2001
From: Administrator <admin@example.com>
Date: Mon, 24 Jul 2023 13:05:16 +0800
Subject: [PATCH] Bug fix

---
 l2/l2_data_util.py |  229 +++++++++++++++++++++++++++++++++++++++++++--------------
 1 file changed, 172 insertions(+), 57 deletions(-)

diff --git a/l2/l2_data_util.py b/l2/l2_data_util.py
index 7c60dea..c7c25d4 100644
--- a/l2/l2_data_util.py
+++ b/l2/l2_data_util.py
@@ -9,12 +9,14 @@
 import logging
 import time
 
+import numpy
+
 import constant
-import gpcode_manager
-from l2 import l2_data_log
-import log
+from code_attribute import gpcode_manager
+from l2 import l2_data_log, l2_data_source_util
+from log_module import log
 from db import redis_manager
-import tool
+from utils import tool
 
 _redisManager = redis_manager.RedisManager(1)
 # L2 data management
@@ -26,18 +28,22 @@
 # Used to speed up data processing, trading memory for time
 local_today_num_operate_map = {}
 
+# Mapping of buy order numbers; only present for native L2 data
+local_today_buyno_map = {}
 
-def load_l2_data(code, force=False):
+
+def load_l2_data(code, load_latest=True, force=False):
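+    # load_latest: whether to also refresh the in-memory cache of the most recent L2 data.
+    # force: reload from redis even if the data is already cached.
+    # Returns False when today's data is missing entries (length shorter than the last index + 1), otherwise True.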
     redis = _redisManager.getRedis()
     # Load the most recent L2 data
-    if local_latest_datas.get(code) is None or force:
-        # Get the most recent data
-        _data = redis.get("l2-data-latest-{}".format(code))
-        if _data is not None:
-            if code in local_latest_datas:
-                local_latest_datas[code] = json.loads(_data)
-            else:
-                local_latest_datas.setdefault(code, json.loads(_data))
+    if load_latest:
+        if local_latest_datas.get(code) is None or force:
+            # Get the most recent data
+            _data = redis.get("l2-data-latest-{}".format(code))
+            if _data is not None:
+                if code in local_latest_datas:
+                    local_latest_datas[code] = json.loads(_data)
+                else:
+                    local_latest_datas.setdefault(code, json.loads(_data))
         # Get today's data
 
     if local_today_datas.get(code) is None or force:
@@ -46,6 +52,9 @@
         if datas is None:
             datas = []
         local_today_datas[code] = datas
+        data_normal = True
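+        # Indexes are expected to be contiguous from 0, so a list shorter than the last index + 1 means entries are missing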
+        if datas and len(datas) < datas[-1]["index"] + 1:
+            data_normal = False
 
         # Load from the database
         # datas = []
@@ -60,6 +69,9 @@
         # local_today_datas[code] = new_datas
         # Load based on today's data
         load_num_operate_map(local_today_num_operate_map, code, local_today_datas.get(code), force)
+        load_buy_no_map(local_today_buyno_map, code, local_today_datas.get(code), force)
+        return data_normal
+    return True
 
 
 # Classify data by num-operate
@@ -74,6 +86,25 @@
         if local_today_num_operate_map[code].get(key) is None:
             local_today_num_operate_map[code].setdefault(key, [])
         local_today_num_operate_map[code].get(key).append(data)
+
+
+# Classify data by orderNo; only native data has this
+def load_buy_no_map(local_today_buyno_map, code, source_datas, clear=False):
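+    # Build a map from orderNo to the first buy record (operateType == 0) seen for that order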
+    # Only native L2 data requires this operation
+    if constant.L2_SOURCE_TYPE != constant.L2_SOURCE_TYPE_HUAXIN:
+        return
+    if local_today_buyno_map.get(code) is None:
+        local_today_buyno_map[code] = {}
+    if clear:
+        local_today_buyno_map[code] = {}
+
+    for data in source_datas:
+        if data["val"]["operateType"] != 0:
+            continue
+        # Only fill in buy data
+        key = "{}".format(data["val"]["orderNo"])
+        if local_today_buyno_map[code].get(key) is None:
+            local_today_buyno_map[code].setdefault(key, data)
 
 
 @tool.async_call
@@ -115,26 +146,28 @@
 
 
 # Save L2 data
-def save_l2_data(code, datas, add_datas, randomKey=None):
+def save_l2_data(code, datas, add_datas):
     redis = _redisManager.getRedis()
     # Only need to save when there is newly added data
     if len(add_datas) > 0:
         # Save the most recent data
         __start_time = round(time.time() * 1000)
-        redis.setex("l2-data-latest-{}".format(code), tool.get_expire(), json.dumps(datas))
-        l2_data_log.l2_time(code, round(time.time() * 1000) - __start_time, "Time spent saving the latest L2 data")
-        # Store into memory
-        local_latest_datas[code] = datas
-        __set_l2_data_latest_count(code, len(datas))
+        if datas:
+            redis.setex("l2-data-latest-{}".format(code), tool.get_expire(), json.dumps(datas))
+            l2_data_log.l2_time(code, round(time.time() * 1000) - __start_time, "Time spent saving the latest L2 data")
+            # Store into memory
+            local_latest_datas[code] = datas
+            set_l2_data_latest_count(code, len(datas))
         try:
             log.logger_l2_data.info("{}-{}", code, add_datas)
         except Exception as e:
             logging.exception(e)
-        saveL2Data(code, add_datas)
+        # Temporarily do not save the data to redis
+        # saveL2Data(code, add_datas)
 
 
 # Set the count of the latest collected L2 data
-def __set_l2_data_latest_count(code, count):
+def set_l2_data_latest_count(code, count):
     redis = _redisManager.getRedis()
     key = "latest-l2-count-{}".format(code)
     redis.setex(key, 2, count)
@@ -164,12 +197,25 @@
     channel = data["channel"]
     capture_time = data["captureTime"]
     process_time = data["processTime"]
+    count = data["count"]
     data = data["data"]
-    limit_up_price = gpcode_manager.get_limit_up_price(code)
-
-    datas = L2DataUtil.format_l2_data(data, code, limit_up_price)
     # Get the limit-up price
-    return day, client, channel, code, capture_time, process_time, datas, data
+    return day, client, channel, code, capture_time, process_time, data, count
+
+
+# Whether the original data differs
+def is_origin_data_diffrent(data1, data2):
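+    # Compare roughly 10 evenly spaced samples instead of every entry to keep the check cheap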
+    if data1 is None or data2 is None:
+        return True
+    if len(data1) != len(data2):
+        return True
+    # Compare sampled entries
+    data_length = len(data1)
+    # Use max(..., 1) so the step is never 0 when there are fewer than 10 entries
+    step = max(len(data1) // 10, 1)
+    for i in range(0, data_length, step):
+        if json.dumps(data1[i]) != json.dumps(data2[i]):
+            return True
+    return False
 
 
 # Whether it is a large order
@@ -259,7 +305,8 @@
                     # Save to the database and update the re field
                     save_list.append(_ldata)
         if len(save_list) > 0:
-            saveL2Data(code, save_list, "Save corrected data")
+            # Temporarily do not save the data to redis
+            # saveL2Data(code, save_list, "Save corrected data")
             local_latest_datas[code] = latest_data
         return _datas
 
@@ -378,47 +425,115 @@
 
 
 class L2TradeQueueUtils(object):
+    # Whether the buy order has been cancelled
+    @classmethod
+    def __is_cancel(cls, code, data, total_datas, local_today_num_operate_map):
+        val = data["val"]
+        cancel_datas = local_today_num_operate_map.get(
+            "{}-{}-{}".format(val["num"], "1", val["price"]))
+        # Whether there are buy-cancel records
+        if cancel_datas:
+            for cancel_data in cancel_datas:
+                buy_index = l2_data_source_util.L2DataSourceUtils.get_buy_index_with_cancel_data(code, cancel_data,
+                                                                                                 local_today_num_operate_map)
+                if buy_index == data["index"]:
+                    return True
+        return False
+
     # Get the traded progress index
     @classmethod
-    def find_traded_progress_index(cls, buy_1_price, total_datas, local_today_num_operate_map, queueList, last_index,
+    def find_traded_progress_index(cls, code, buy_1_price, total_datas, local_today_num_operate_map, queueList,
+                                   last_index,
                                    latest_not_limit_up_time=None):
+
+        def find_traded_progress_index_simple(queues):
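+            # Collect the indexes of buys at buy_1_price whose volume appears in the queue, expand each index
+            # by its repeat count ("re"), then search for the queue volumes as a contiguous sub-sequence;
+            # returns (start index, matched index list), or (None, None) when no match is found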
+            index_set = set()
+            for num in queues:
+                buy_datas = local_today_num_operate_map.get(
+                    "{}-{}-{}".format(num, "0", buy_1_price_format))
+                if buy_datas is not None and len(buy_datas) > 0:
+                    for data in buy_datas:
+                        # Only valid after the time of the most recent non-limit-up buy-1 update
+                        if latest_not_limit_up_time is None or tool.trade_time_sub(data["val"]["time"],
+                                                                                   latest_not_limit_up_time) >= 0:
+                            if data["index"] >= last_index:
+                                index_set.add(data["index"])
+            index_list = list(index_set)
+            index_list.sort()
+            num_list = []
+            new_index_list = []
+            for index in index_list:
+                for i in range(0, total_datas[index]["re"]):
+                    num_list.append(total_datas[index]["val"]["num"])
+                    new_index_list.append(index)
+            index_list_str = ",".join(list(map(str, num_list)))
+            queue_list_str = ",".join(list(map(str, queues)))
+            find_index = index_list_str.find(queue_list_str)
+            if find_index >= 0:
+                temp_str = index_list_str[0:find_index]
+                if temp_str.endswith(","):
+                    temp_str = temp_str[:-1]
+                if temp_str == "":
+                    return new_index_list[0], new_index_list[0:len(queues)]
+                start_index = len(temp_str.split(","))
+                return new_index_list[start_index], new_index_list[start_index:start_index + len(queues)]
+            return None, None
+
+        # With 3 or more queue entries there is no need to check the most recent non-limit-up time
+        if len(queueList) >= 3:
+            latest_not_limit_up_time = None
+
+        # Determine whether the matched positions are trustworthy
+        def is_trust(indexes):
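+            # Gaps between consecutive matched indexes; a large gap containing several uncancelled
+            # limit-up buys suggests the match is unreliable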
+            cha = []
+            for i in range(1, len(indexes)):
+                cha.append(indexes[i] - indexes[i - 1] - 1)
+            if len(cha) <= 1:
+                return True
+            # A small standard deviation of the gaps means the matches are evenly spaced
+            std_result = numpy.std(cha)
+            if std_result < 10:
+                # Absolutely trustworthy
+                return True
+
+            for i in range(0, len(cha)):
+                if abs(cha[i]) > 10:
+                    # For a gap larger than 10, check how many uncancelled limit-up buys sit between the two adjacent matches
+                    buy_count = 0
+                    for index in range(indexes[i] + 1, indexes[i + 1] - 1):
+                        if L2DataUtil.is_limit_up_price_buy(total_datas[index]["val"]):
+                            if not cls.__is_cancel(code, total_datas[index], total_datas, local_today_num_operate_map):
+                                buy_count += total_datas[index]["re"]
+                    # Tentatively allow an error margin of 3
+                    if buy_count >= 3:
+                        return False
+            return True
+
         if len(queueList) == 0:
             return None
+        # last_index must not be cancelled; if it has already been cancelled, reset it to zero
+        if cls.__is_cancel(code, total_datas[last_index], total_datas, local_today_num_operate_map):
+            last_index = 0
         # Pad the integer part with leading zeros
         buy_1_price_format = f"{buy_1_price}"
         while buy_1_price_format.find(".") < 4:
             buy_1_price_format = "0" + buy_1_price_format
-        index_set = set()
-        for num in queueList:
-            buy_datas = local_today_num_operate_map.get(
-                "{}-{}-{}".format(num, "0", buy_1_price_format))
-            if buy_datas is not None and len(buy_datas) > 0:
-                for data in buy_datas:
-                    # Only valid after the time of the most recent non-limit-up buy-1 update
-                    if latest_not_limit_up_time is None or tool.trade_time_sub(data["val"]["time"],
-                                                                               latest_not_limit_up_time) >= 0:
-                        if data["index"] >= last_index:
-                            index_set.add(data["index"])
-        index_list = list(index_set)
-        index_list.sort()
-        num_list = []
-        new_index_list = []
-        for index in index_list:
-            for i in range(0, total_datas[index]["re"]):
-                num_list.append(total_datas[index]["val"]["num"])
-                new_index_list.append(index)
-        index_list_str = ",".join(list(map(str, num_list)))
-        queue_list_str = ",".join(list(map(str, queueList)))
-        find_index = index_list_str.find(queue_list_str)
-        if find_index >= 0:
-            temp_str = index_list_str[0:find_index]
-            if temp_str.endswith(","):
-                temp_str = temp_str[:-1]
-            if temp_str == "":
-                return new_index_list[0]
-            return new_index_list[len(temp_str.split(","))]
+
+        # -------- Window search: slide a window over queueList, from len(queueList) at most down to len(queueList)/2 at least --------
+        max_win_len = len(queueList)
+        min_win_len = len(queueList) // 2
+        if max_win_len == min_win_len:
+            min_win_len = max_win_len - 1
+        for win_len in range(max_win_len, min_win_len, -1):
+            # Slide the window
+            for i in range(0, max_win_len - win_len + 1):
+                queues = queueList[i:i + win_len]
+                f_start_index, f_indexs = find_traded_progress_index_simple(queues)
+                if f_start_index is not None and is_trust(f_indexs):
+                    return f_start_index
+
         raise Exception("Traded progress not found yet")
 
 
 if __name__ == "__main__":
-    pass
+    print(load_l2_data("002235"))

--
Gitblit v1.8.0