From 48fb7a00951f91bdc707e5dd2d196e5bccb752c3 Mon Sep 17 00:00:00 2001 From: Administrator <admin@example.com> Date: 星期三, 18 六月 2025 18:41:30 +0800 Subject: [PATCH] 异常保护 --- l2_data_util.py | 299 +++++++++++++++++++++++++++++++++++++++++++++++++---------- 1 files changed, 245 insertions(+), 54 deletions(-) diff --git a/l2_data_util.py b/l2_data_util.py index 65ac266..24c66ac 100644 --- a/l2_data_util.py +++ b/l2_data_util.py @@ -1,13 +1,77 @@ # l2鏁版嵁宸ュ叿 +""" +L2鏁版嵁澶勭悊宸ュ叿鍖� +""" + # 姣旇緝鏃堕棿鐨勫ぇ灏� -import hashlib import json import time -import l2_data_manager -import tool -from log import logger_l2_trade, logger_l2_big_data -from trade_gui import async_call +from db.redis_manager_delegate import RedisUtils +from utils.tool import async_call + +from l2 import l2_data_manager +from utils import tool + + +def run_time(): + def decorator(func): + def infunc(*args, **kwargs): + start = round(time.time() * 1000) + result = func(args, **kwargs) + print("鎵ц鏃堕棿", round(time.time() * 1000) - start) + return result + + return infunc + + return decorator + + +# 鏄惁涓哄ぇ鍗� +def is_big_money(val, is_ge=False): + """ + 鍒ゆ柇鏄惁涓哄ぇ鍗� + @param val: l2鏁版嵁 + @param is_ge: 鏄惁涓哄垱涓氭澘 + @return: + """ + price = float(val["price"]) + money = round(price * val["num"], 2) + if is_ge: + if money >= 29900 or val["num"] >= 2999: + return True + else: + return False + else: + if price > 3.0: + if money >= 29900 or val["num"] >= 7999: + return True + else: + return False + else: + max_money = price * 10000 + if money >= max_money * 0.95: + return True + else: + return False + + +# 鑾峰彇澶ц祫閲戠殑閲戦 +def get_big_money_val(limit_up_price, is_ge=False): + if is_ge: + return min(299 * 10000, round(limit_up_price * 2900 * 100)) + else: + if limit_up_price > 3.0: + return min(299 * 10000, round(limit_up_price * 7999 * 100)) + else: + max_money = limit_up_price * 10000 * 100 + return int(max_money * 0.95) + +# if int(val["num"]) >= constant.BIG_MONEY_NUM: +# return True +# if int(val["num"]) * limit_up_price >= constant.BIG_MONEY_AMOUNT: +# return True +# return False_ def compare_time(time1, time2): @@ -34,20 +98,6 @@ return _data -# 灏嗘暟鎹牴鎹畁um-operate鍒嗙被 -def load_num_operate_map(local_today_num_operate_map, code, source_datas, clear=False): - if local_today_num_operate_map.get(code) is None: - local_today_num_operate_map[code] = {} - if clear: - local_today_num_operate_map[code] = {} - - for data in source_datas: - key = "{}-{}-{}".format(data["val"]["num"], data["val"]["operateType"],data["val"]["price"]) - if local_today_num_operate_map[code].get(key) is None: - local_today_num_operate_map[code].setdefault(key, []) - local_today_num_operate_map[code].get(key).append(data) - - # 鍑忓幓鏃堕棿 def __sub_time(time_str, seconds): time_seconds = get_time_as_seconds(time_str) - seconds @@ -64,7 +114,7 @@ # 璁$畻鏃堕棿鐨勫尯闂� -def __compute_time_space_as_second(cancel_time, cancel_time_unit): +def compute_time_space_as_second(cancel_time, cancel_time_unit): __time = int(cancel_time) if int(cancel_time) == 0: return 0, 0 @@ -80,30 +130,42 @@ return __time * 3600, (__time + 1) * 3600 -# 鏍规嵁涔版挙鏁版嵁(涓庝粖鏃ユ�荤殑鏁版嵁)璁$畻涔板叆鏁版嵁 -def get_buy_data_with_cancel_data(cancel_data, local_today_num_operate_map): +# 鑾峰彇涔板叆鏃堕棿鑼冨洿 +def get_buy_time_range(cancel_data): # 璁$畻鏃堕棿鍖洪棿 - min_space, max_space = __compute_time_space_as_second(cancel_data["val"]["cancelTime"], - cancel_data["val"]["cancelTimeUnit"]) + min_space, max_space = compute_time_space_as_second(cancel_data["val"]["cancelTime"], + cancel_data["val"]["cancelTimeUnit"]) max_time = __sub_time(cancel_data["val"]["time"], min_space) min_time = __sub_time(cancel_data["val"]["time"], max_space) - buy_datas = local_today_num_operate_map.get("{}-{}-{}".format(cancel_data["val"]["num"], "0",cancel_data["val"]["price"])) - if buy_datas is None: - # 鏃犳暟鎹� - return None, None - for i in range(0, len(buy_datas)): - data = buy_datas[i] - if int(data["val"]["operateType"]) != 0: - continue - if int(data["val"]["num"]) != int(cancel_data["val"]["num"]): - continue - if min_space == 0 and max_space == 0: - if compare_time(data["val"]["time"], min_time) == 0: - return data["index"], data + return min_time, max_time - elif compare_time(data["val"]["time"], min_time) > 0 and compare_time(data["val"]["time"], max_time) <= 0: - return data["index"], data - return None, None + +# 鍒ゆ柇鍗栨挙鐨勫崠淇″彿鏄惁鍦ㄧ洰鏍囦俊鍙蜂箣鍓� +def is_sell_index_before_target(sell_cancel_data, target_data, local_today_num_operate_map): + min_space, max_space = compute_time_space_as_second(sell_cancel_data["val"]["cancelTime"], + sell_cancel_data["val"]["cancelTimeUnit"]) + max_time = __sub_time(sell_cancel_data["val"]["time"], min_space) + min_time = __sub_time(sell_cancel_data["val"]["time"], max_space) + # 濡傛灉鏈�澶у�奸兘鍦ㄧ洰鏍囦俊鍙蜂箣鍓嶅垯淇″彿鑲畾鍦ㄧ洰鏍囦俊鍙蜂箣鍓� + if int(target_data["val"]["time"].replace(":", "")) > int(max_time.replace(":", "")): + return True + sell_datas = local_today_num_operate_map.get( + "{}-{}-{}".format(sell_cancel_data["val"]["num"], "2", sell_cancel_data["val"]["price"])) + if sell_datas: + for i in range(0, len(sell_datas)): + data = sell_datas[i] + if int(data["val"]["operateType"]) != 2: + continue + if int(data["val"]["num"]) != int(sell_cancel_data["val"]["num"]): + continue + if min_space == 0 and max_space == 0: + # 鏈鍐� + if compare_time(data["val"]["time"], min_time) == 0: + return data["index"] < target_data["index"] + # 鏁版嵁鍦ㄦ纭殑鍖洪棿 + elif compare_time(data["val"]["time"], min_time) > 0 and compare_time(data["val"]["time"], max_time) <= 0: + return data["index"] < target_data["index"] + return False __last_big_data = {} @@ -111,7 +173,6 @@ @async_call def save_big_data(code, same_time_nums, datas): - return None latest_datas = __last_big_data.get(code) d1 = json.dumps(datas) d2 = json.dumps(latest_datas) @@ -125,12 +186,150 @@ # 淇濆瓨蹇収 # logger_l2_big_data.debug("code:{} d1:{} d2:{}", code, d1[i - 60: i + 30], d2[i - 60: i + 30]) break + time_str = tool.get_now_time_str() - for key in same_time_nums: - if same_time_nums[key] > 20: - redis = l2_data_manager._redisManager.getRedis() - redis.setex("big_data-{}-{}".format(code, int(round(time.time() * 1000))), tool.get_expire(), d1) + for time_ in same_time_nums: + # 鍙繚鐣欐渶杩�3s鍐呯殑澶ф暟鎹� + if abs(get_time_as_seconds(time_str) - get_time_as_seconds(time_)) > 3: + continue + if same_time_nums[time_] > 20: + RedisUtils.setex(l2_data_manager._redisManager.getRedis(), + "big_data-{}-{}".format(code, int(round(time.time() * 1000))), tool.get_expire(), + d1) break + + +# 淇濆瓨l2鏈�鏂版暟鎹殑澶у皬 +# @async_call +def save_l2_latest_data_number(code, num): + RedisUtils.setex(l2_data_manager._redisManager.getRedis(), "l2_latest_data_num-{}".format(code), 3, num) + + +# 鑾峰彇鏈�鏂版暟鎹潯鏁� +def get_l2_latest_data_number(code): + num = RedisUtils.get(l2_data_manager._redisManager.getRedis(), "l2_latest_data_num-{}".format(code)) + if num is not None: + return int(num) + return None + + +# l2鏁版嵁鎷兼帴宸ュ叿 鏆傛椂杩樻湭鍚敤 +class L2DataConcatUtil: + + # 鍒濆鍖� + def __init__(self, code, last_datas, datas): + self.last_datas = last_datas + self.datas = datas + self.code = code + + def __get_data_identity(self, data_): + data = data_["val"] + return "{}-{}-{}-{}-{}-{}".format(data.get("time"), data.get("num"), data.get("price"), data.get("operateType"), + data.get("cancelTime"), data.get("cancelTimeUnit")) + + # 鑾峰彇鎷兼帴鐨勭壒寰�,鑾峰彇鏈�鍚�3绗� + def __get_concat_feature(self): + # 鏈�灏戦渶瑕�3鏉℃暟鎹�+2鏉¢渶瑕佹湁鐗瑰緛鐐圭殑鏁版嵁 + min_identity = 2 + min_count = 3 + + identity_set = set() + count = 0 + start_index = -1 + for i in range(len(self.last_datas) - 1, -1, -1): + identity_set.add(self.__get_data_identity(self.last_datas[i])) + count += 1 + start_index = i + if count >= min_count and len(identity_set) >= min_identity: + break + return start_index, len(self.last_datas) - 1 + + # 鑾峰彇鏂板鏁版嵁 + def get_add_datas(self): + # 鏌ヨ褰撳墠鏁版嵁鏄惁鍦ㄦ渶杩戜竴娆℃暟鎹箣鍚� + if self.last_datas and self.datas: + if int(self.datas[-1]["val"]["time"].replace(":", "")) - int( + self.last_datas[-1]["val"]["time"].replace(":", "")) < 0: + return [] + + # 鑾峰彇鎷兼帴鐐� + start_index, end_index = self.__get_concat_feature() + if start_index < 0: + return self.datas + print("鐗瑰緛浣嶇疆锛�", start_index, end_index) + # 鎻愬彇鐗瑰緛鐐圭殑鏍囪瘑鏁版嵁 + identity_list = [] + for i in range(start_index, end_index + 1): + identity_list.append(self.__get_data_identity(self.last_datas[i])) + + # 鏌ユ壘瀹屾暣鐨勭壒寰� + identity_count = len(identity_list) + for n in range(0, identity_count): + # 姣忔閬嶅巻鍑忓皯鏈�鍓嶉潰涓�涓壒寰侀噺 + for i in range(0, len(self.datas) - len(identity_list) + n): + if self.__get_data_identity(self.datas[i]) == identity_list[n]: + # n==0 琛ㄧず瀹屽叏鍖归厤 锛� i=0 琛ㄧず鍗充娇涓嶆槸瀹屽叏鍖归厤锛屼絾蹇呴』鏂版暟鎹涓�涓厓绱犲尮閰� + if n == 0 or i == 0: + find_identity = True + for j in range(n + 1, len(identity_list)): + if identity_list[j] != self.__get_data_identity(self.datas[i + j - n]): + find_identity = False + break + + if find_identity: + return self.datas[i + len(identity_list) - n:] + else: + continue + print("鏂版暟鎹腑鏈壘鍒扮壒寰佹爣璇�") + return self.datas + + +def test_add_datas(): + def load_data(datas): + data_list = [] + for data in datas: + data_list.append({"val": {"time": data}}) + return data_list + + # 涓嶅尮閰� + latest_datas = [] + datas = ["10:00:02", "10:00:02", "10:00:03", "10:00:04", "10:00:05"] + latest_datas = load_data(latest_datas) + datas = load_data(datas) + print(L2DataConcatUtil("000333", latest_datas, datas).get_add_datas()) + + # 涓嶅尮閰� + latest_datas = ["10:00:02"] + datas = ["10:00:02", "10:00:02", "10:00:03", "10:00:04", "10:00:05"] + latest_datas = load_data(latest_datas) + datas = load_data(datas) + print(L2DataConcatUtil("000333", latest_datas, datas).get_add_datas()) + + # 涓嶅尮閰� + latest_datas = ["10:00:00", "10:00:01", "10:00:02", "10:00:03"] + datas = ["10:00:02", "10:00:02", "10:00:03", "10:00:04", "10:00:05"] + latest_datas = load_data(latest_datas) + datas = load_data(datas) + print(L2DataConcatUtil("000333", latest_datas, datas).get_add_datas()) + + # 鍖归厤 + latest_datas = ["10:00:00", "10:00:01", "10:00:02", "10:00:03"] + datas = ["10:00:01", "10:00:02", "10:00:03", "10:00:04", "10:00:05"] + latest_datas = load_data(latest_datas) + datas = load_data(datas) + print(L2DataConcatUtil("000333", latest_datas, datas).get_add_datas()) + + latest_datas = ["10:00:00", "10:00:01", "10:00:02", "10:00:02"] + datas = ["10:00:02", "10:00:02", "10:00:03", "10:00:04", "10:00:05"] + latest_datas = load_data(latest_datas) + datas = load_data(datas) + print(L2DataConcatUtil("000333", latest_datas, datas).get_add_datas()) + + latest_datas = ["10:00:00", "10:00:01", "10:00:02", "10:00:02"] + datas = ["10:00:02", "10:00:02", "10:00:00", "10:00:01", "10:00:02", "10:00:02", "10:00:04", "10:00:05"] + latest_datas = load_data(latest_datas) + datas = load_data(datas) + print(L2DataConcatUtil("000333", latest_datas, datas).get_add_datas()) def test(datas): @@ -138,12 +337,4 @@ if __name__ == "__main__": - # cancel_data = {"val": {"operateType": 1, "num": 1520, "cancelTime": 1, "cancelTimeUnit": 1, "time": "09:32:30"}} - # today_datas=[{"val": {"operateType": 1, "num": 1520, "cancelTime": 1, "cancelTimeUnit": 0, "time": "09:32:30"}},{"val": {"operateType": 0, "num": 1520, "cancelTime": 0, "cancelTimeUnit": 0, "time": "09:31:31"}}] - # result= get_buy_data_with_cancel_data(cancel_data,today_datas) - # print(result) - code = "001209" - l2_data_manager.load_l2_data(code) - total_datas = l2_data_manager.local_today_datas[code] - index, data = get_buy_data_with_cancel_data(total_datas[118], l2_data_manager.local_today_num_operate_map.get(code)) - print(index, data) + test_add_datas() -- Gitblit v1.8.0