From 736e61b89e87f7e3c224feca25e94cda459b9ae6 Mon Sep 17 00:00:00 2001
From: Administrator <admin@example.com>
Date: 星期一, 06 二月 2023 15:26:05 +0800
Subject: [PATCH] H撤完善,修改代码文件目录

---
 l2/l2_data_util.py |  376 +++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 files changed, 372 insertions(+), 4 deletions(-)

diff --git a/l2/l2_data_util.py b/l2/l2_data_util.py
index 5495d40..8360263 100644
--- a/l2/l2_data_util.py
+++ b/l2/l2_data_util.py
@@ -2,20 +2,385 @@
 L2鐩稿叧鏁版嵁澶勭悊
 """
 
-
 # L2浜ゆ槗闃熷垪
+import datetime
+import decimal
+import json
+import logging
+import time
+
+import constant
+import gpcode_manager
+import l2_data_log
+import log
+import redis_manager
+import tool
+
+_redisManager = redis_manager.RedisManager(1)
+# l2鏁版嵁绠$悊
+# 鏈湴鏈�鏂颁竴娆′笂浼犵殑鏁版嵁
+local_latest_datas = {}
+# 鏈湴浠婃棩鏁版嵁
+local_today_datas = {}
+# 鏈湴鎵嬫暟+鎿嶄綔閭g被鍨嬬粍鎴愮殑涓存椂鍙橀噺
+# 鐢ㄤ簬鍔犲揩鏁版嵁澶勭悊锛岀敤绌烘崲鏃堕棿
+local_today_num_operate_map = {}
+
+
+def load_l2_data(code, force=False):
+    redis = _redisManager.getRedis()
+    # 鍔犺浇鏈�杩戠殑l2鏁版嵁
+    if local_latest_datas.get(code) is None or force:
+        # 鑾峰彇鏈�杩戠殑鏁版嵁
+        _data = redis.get("l2-data-latest-{}".format(code))
+        if _data is not None:
+            if code in local_latest_datas:
+                local_latest_datas[code] = json.loads(_data)
+            else:
+                local_latest_datas.setdefault(code, json.loads(_data))
+        # 鑾峰彇浠婃棩鐨勬暟鎹�
+
+    if local_today_datas.get(code) is None or force:
+        datas = log.load_l2_from_log()
+        datas = datas.get(code)
+        if datas is None:
+            datas = []
+        local_today_datas[code] = datas
+
+        # 浠庢暟鎹簱鍔犺浇
+        # datas = []
+        # keys = redis.keys("l2-{}-*".format(code))
+        # for k in keys:
+        #     value = redis.get(k)
+        #     _data = l2_data_util.l2_data_key_2_obj(k, value)
+        #     datas.append(_data)
+        # # 鎺掑簭
+        # new_datas = sorted(datas,
+        #                    key=lambda e: (int(e.__getitem__('val')["time"].replace(":", "")), e.__getitem__('index')))
+        # local_today_datas[code] = new_datas
+        # 鏍规嵁浠婃棩鏁版嵁鍔犺浇
+        load_num_operate_map(local_today_num_operate_map, code, local_today_datas.get(code), force)
+
+
+# 灏嗘暟鎹牴鎹畁um-operate鍒嗙被
+def load_num_operate_map(local_today_num_operate_map, code, source_datas, clear=False):
+    if local_today_num_operate_map.get(code) is None:
+        local_today_num_operate_map[code] = {}
+    if clear:
+        local_today_num_operate_map[code] = {}
+
+    for data in source_datas:
+        key = "{}-{}-{}".format(data["val"]["num"], data["val"]["operateType"], data["val"]["price"])
+        if local_today_num_operate_map[code].get(key) is None:
+            local_today_num_operate_map[code].setdefault(key, [])
+        local_today_num_operate_map[code].get(key).append(data)
+
+
+@tool.async_call
+def saveL2Data(code, datas, msg=""):
+    start_time = round(time.time() * 1000)
+    # 鏌ヨ绁ㄦ槸鍚﹀湪寰呯洃鍚殑绁ㄩ噷闈�
+    if not gpcode_manager.is_in_gp_pool(code):
+        return None
+    # 楠岃瘉鑲′环鐨勬纭��
+    redis_instance = _redisManager.getRedis()
+
+    try:
+        if redis_instance.setnx("l2-save-{}".format(code), "1") > 0:
+
+            # 璁$畻淇濈暀鐨勬椂闂�
+            expire = tool.get_expire()
+            i = 0
+            for _data in datas:
+                i += 1
+                key = "l2-" + _data["key"]
+                value = redis_instance.get(key)
+                if value is None:
+                    # 鏂板
+                    try:
+                        value = {"index": _data["index"], "re": _data["re"]}
+                        redis_instance.setex(key, expire, json.dumps(value))
+                    except:
+                        logging.error("鏇存L2鏁版嵁鍑洪敊锛歿} key:{}".format(code, key))
+                else:
+                    json_value = json.loads(value)
+                    if json_value["re"] != _data["re"]:
+                        json_value["re"] = _data["re"]
+                        redis_instance.setex(key, expire, json.dumps(json_value))
+    finally:
+        redis_instance.delete("l2-save-{}".format(code))
+
+    print("淇濆瓨鏂版暟鎹敤鏃讹細", msg, "鑰楁椂锛歿}".format(round(time.time() * 1000) - start_time))
+    return datas
+
+
+# 淇濆瓨l2鏁版嵁
+def save_l2_data(code, datas, add_datas, randomKey=None):
+    redis = _redisManager.getRedis()
+    # 鍙湁鏈夋柊鏇炬暟鎹墠闇�瑕佷繚瀛�
+    if len(add_datas) > 0:
+        # 淇濆瓨鏈�杩戠殑鏁版嵁
+        __start_time = round(time.time() * 1000)
+        redis.setex("l2-data-latest-{}".format(code), tool.get_expire(), json.dumps(datas))
+        l2_data_log.l2_time(code, randomKey, round(time.time() * 1000) - __start_time, "淇濆瓨鏈�杩憀2鏁版嵁鐢ㄦ椂")
+        # 璁剧疆杩涘唴瀛�
+        local_latest_datas[code] = datas
+        __set_l2_data_latest_count(code, len(datas))
+        try:
+            log.logger_l2_data.info("{}-{}", code, add_datas)
+        except Exception as e:
+            logging.exception(e)
+        saveL2Data(code, add_datas)
+
+
+# 璁剧疆鏈�鏂扮殑l2鏁版嵁閲囬泦鐨勬暟閲�
+def __set_l2_data_latest_count(code, count):
+    redis = _redisManager.getRedis()
+    key = "latest-l2-count-{}".format(code)
+    redis.setex(key, 2, count)
+    pass
+
+
+# 鑾峰彇浠g爜鏈�杩戠殑l2鏁版嵁鏁伴噺
+def get_l2_data_latest_count(code):
+    if code is None or len(code) < 1:
+        return 0
+    redis = _redisManager.getRedis()
+    key = "latest-l2-count-{}".format(code)
+
+    result = redis.get(key)
+    if result is None:
+        return 0
+    else:
+        return int(result)
+
+
+def parseL2Data(str):
+    day = datetime.datetime.now().strftime("%Y%m%d")
+    dict = json.loads(str)
+    data = dict["data"]
+    client = dict["client"]
+    code = data["code"]
+    channel = data["channel"]
+    capture_time = data["captureTime"]
+    process_time = data["processTime"]
+    data = data["data"]
+    limit_up_price = gpcode_manager.get_limit_up_price(code)
+
+    datas = L2DataUtil.format_l2_data(data, code, limit_up_price)
+    # 鑾峰彇娑ㄥ仠浠�
+    return day, client, channel, code, capture_time, process_time, datas, data
+
+
+class L2DataUtil:
+    @classmethod
+    def is_same_time(cls, time1, time2):
+        if constant.TEST:
+            return True
+        time1_s = time1.split(":")
+        time1_second = int(time1_s[0]) * 3600 + int(time1_s[1]) * 60 + int(time1_s[2])
+        time2_s = time2.split(":")
+        time2_second = int(time2_s[0]) * 3600 + int(time2_s[1]) * 60 + int(time2_s[2])
+        if abs(time2_second - time1_second) < 3:
+            return True
+        else:
+            return False
+
+    # 鑾峰彇澧為噺鏁版嵁
+    @classmethod
+    def get_add_data(cls, code, latest_datas, datas, _start_index):
+        if datas is not None and len(datas) < 1:
+            return []
+        last_data = None
+        latest_datas_ = latest_datas
+        if latest_datas_ is not None and len(latest_datas_) > 0:
+            last_data = latest_datas_[-1]
+
+        count = 0
+        start_index = -1
+        # 濡傛灉鍘熸潵娌℃湁鏁版嵁
+        # 璁剧疆add_data鐨勫簭鍙�
+        for n in reversed(datas):
+            count += 1
+            if n["key"] == (last_data["key"] if last_data is not None else ""):
+                start_index = len(datas) - count
+                break
+
+        _add_datas = []
+        if last_data is not None:
+            if start_index < 0:
+                if L2DataUtil.get_time_as_second(datas[0]["val"]["time"]) >= L2DataUtil.get_time_as_second(
+                        last_data["val"]["time"]):
+                    _add_datas = datas
+                else:
+                    _add_datas = []
+            elif start_index + 1 >= len(datas):
+                _add_datas = []
+            else:
+                _add_datas = datas[start_index + 1:]
+        else:
+            _add_datas = datas[start_index + 1:]
+        for i in range(0, len(_add_datas)):
+            _add_datas[i]["index"] = _start_index + i
+
+        return _add_datas
+
+    # 绾犳鏁版嵁锛屽皢re瀛楁鏇挎崲涓鸿緝澶у��
+    @classmethod
+    def correct_data(cls, code, latest_datas, _datas):
+        latest_data = latest_datas
+        if latest_data is None:
+            latest_data = []
+        save_list = []
+        for data in _datas:
+            for _ldata in latest_data:
+                # 鏂版暟鎹潯鏁版瘮鏃ф暟鎹鎵嶄繚瀛�
+                if _ldata["key"] == data["key"] and _ldata["re"] < data["re"]:
+                    max_re = max(_ldata["re"], data["re"])
+                    _ldata["re"] = max_re
+                    data["re"] = max_re
+                    # 淇濆瓨鍒版暟鎹簱锛屾洿鏂皉e鐨勬暟鎹�
+                    save_list.append(_ldata)
+        if len(save_list) > 0:
+            saveL2Data(code, save_list, "淇濆瓨绾犳鏁版嵁")
+            local_latest_datas[code] = latest_data
+        return _datas
+
+    # 澶勭悊l2鏁版嵁
+    @classmethod
+    def format_l2_data(cls, data, code, limit_up_price):
+        datas = []
+        dataIndexs = {}
+        same_time_num = {}
+        for item in data:
+            # 瑙f瀽鏁版嵁
+            time = item["time"]
+            if time in same_time_num:
+                same_time_num[time] = same_time_num[time] + 1
+            else:
+                same_time_num[time] = 1
+
+            price = float(item["price"])
+            num = item["num"]
+            limitPrice = item["limitPrice"]
+            # 娑ㄥ仠浠�
+            if limit_up_price is not None:
+                if limit_up_price == tool.to_price(decimal.Decimal(price)):
+                    limitPrice = 1
+                else:
+                    limitPrice = 0
+                item["limitPrice"] = "{}".format(limitPrice)
+            operateType = item["operateType"]
+            # 涓嶉渶瑕侀潪娑ㄥ仠涔颁笌涔版挙
+            if int(item["limitPrice"]) != 1 and (int(operateType) == 0 or int(operateType) == 1):
+                continue
+
+            cancelTime = item["cancelTime"]
+            cancelTimeUnit = item["cancelTimeUnit"]
+            key = "{}-{}-{}-{}-{}-{}-{}-{}".format(code, operateType, time, num, price, limitPrice, cancelTime,
+                                                   cancelTimeUnit)
+            if key in dataIndexs:
+                # 鏁版嵁閲嶅娆℃暟+1
+                datas[dataIndexs[key]]['re'] = datas[dataIndexs[key]]['re'] + 1
+            else:
+                # 鏁版嵁閲嶅娆℃暟榛樿涓�1
+                datas.append({"key": key, "val": item, "re": 1})
+                dataIndexs.setdefault(key, len(datas) - 1)
+        # TODO 娴嬭瘯鐨勬椂鍊欏紑鍚紝鏂逛究璁板綍澶у崟鏁版嵁
+        # l2_data_util.save_big_data(code, same_time_num, data)
+        return datas
+
+    @classmethod
+    def get_time_as_second(cls, time_str):
+        ts = time_str.split(":")
+        return int(ts[0]) * 3600 + int(ts[1]) * 60 + int(ts[2])
+
+    # @classmethod
+    # def get_time_as_str(cls, time_seconds):
+    #     ts = time_str.split(":")
+    #     return int(ts[0]) * 3600 + int(ts[1]) * 60 + int(ts[2])
+
+    # 鏄惁鏄定鍋滀环涔�
+    @classmethod
+    def is_limit_up_price_buy(cls, val):
+        if int(val["limitPrice"]) != 1:
+            return False
+
+        if int(val["operateType"]) != 0:
+            return False
+
+        price = float(val["price"])
+        num = int(val["num"])
+        # if price * num * 100 < 50 * 10000:
+        #     return False
+        return True
+
+    # 鏄惁涓烘定鍋滃崠
+    @classmethod
+    def is_limit_up_price_sell(cls, val):
+        if int(val["limitPrice"]) != 1:
+            return False
+
+        if int(val["operateType"]) != 2:
+            return False
+
+        price = float(val["price"])
+        num = int(val["num"])
+        # if price * num * 100 < 50 * 10000:
+        #     return False
+        return True
+
+    # 鏄惁娑ㄥ仠涔版挙
+    @classmethod
+    def is_limit_up_price_buy_cancel(cls, val):
+        if int(val["limitPrice"]) != 1:
+            return False
+
+        if int(val["operateType"]) != 1:
+            return False
+
+        price = float(val["price"])
+        num = int(val["num"])
+        # if price * num * 100 < 50 * 10000:
+        #     return False
+        return True
+
+    # 鏄惁鍗栨挙
+    @classmethod
+    def is_sell_cancel(cls, val):
+        if int(val["operateType"]) == 3:
+            return True
+        return False
+
+    # 鏄惁涓哄崠
+    @classmethod
+    def is_sell(cls, val):
+        if int(val["operateType"]) == 2:
+            return True
+        return False
+
+
 class L2TradeQueueUtils(object):
     # 鑾峰彇鎴愪氦杩涘害绱㈠紩
-    def find_traded_progress_index(cls, buy_1_price, total_datas, local_today_num_operate_map, queueList):
+    @classmethod
+    def find_traded_progress_index(cls, buy_1_price, total_datas, local_today_num_operate_map, queueList,
+                                   latest_not_limit_up_time=None):
         if len(queueList) == 0:
             return None
+        # 琛ラ綈鏁存暟浣�5浣�
+        buy_1_price_format = f"{buy_1_price}"
+        while buy_1_price_format.find(".") < 4:
+            buy_1_price_format = "0" + buy_1_price_format
         index_set = set()
         for num in queueList:
             buy_datas = local_today_num_operate_map.get(
-                "{}-{}-{}".format(num, "0", buy_1_price))
+                "{}-{}-{}".format(num, "0", buy_1_price_format))
             if buy_datas is not None and len(buy_datas) > 0:
                 for data in buy_datas:
-                    index_set.add(data["index"])
+                    # 鍦ㄦ渶杩戜竴娆¢潪娑ㄥ仠涔�1鏇存柊鐨勬椂闂翠箣鍚庢墠鏈夋晥
+                    if latest_not_limit_up_time is None or tool.trade_time_sub(data["val"]["time"],
+                                                                               latest_not_limit_up_time) >= 0:
+                        index_set.add(data["index"])
         index_list = list(index_set)
         index_list.sort()
         num_list = []
@@ -29,6 +394,9 @@
         find_index = index_list_str.find(queue_list_str)
         if find_index >= 0:
             temp_str = index_list_str[0:find_index]
+            if temp_str.endswith(","):
+                temp_str = temp_str[:-1]
+
             return new_index_list[len(temp_str.split(","))]
         raise Exception("灏氭湭鎵惧埌鎴愪氦杩涘害")
 

--
Gitblit v1.8.0