"""
L2 data processing toolkit.

Helpers for classifying big orders, doing "HH:MM:SS" time arithmetic,
decoding L2 redis keys, caching L2 snapshots in redis and (experimentally)
concatenating consecutive L2 snapshots.
"""

import functools
import json
import time

from db.redis_manager_delegate import RedisUtils
from utils.tool import async_call
from l2 import l2_data_manager
from utils import tool

# Seconds represented by one step of each cancel-time unit code
# (0 = second, 1 = minute, 2 = hour).
_SECONDS_PER_CANCEL_UNIT = {0: 1, 1: 60, 2: 3600}


def run_time():
    """
    Build a decorator that prints the wall-clock run time of the wrapped call.

    Usage: ``@run_time()``.
    @return: a decorator; the decorated function returns the wrapped result
    """

    def decorator(func):
        @functools.wraps(func)
        def infunc(*args, **kwargs):
            start = round(time.time() * 1000)
            # Positional arguments must be unpacked; forwarding the bare tuple
            # would hand the wrapped function a single tuple argument.
            result = func(*args, **kwargs)
            print("执行时间", round(time.time() * 1000) - start)
            return result

        return infunc

    return decorator


def is_big_money(val, is_ge=False):
    """
    Tell whether an L2 record counts as a big order.

    @param val: L2 data; ``val["price"]`` is converted with float() and
                ``val["num"]`` is used as a number directly
    @param is_ge: whether the security is on the ChiNext board
    @return: True when the record reaches the big-order threshold
    """
    price = float(val["price"])
    num = val["num"]
    money = price * num
    if is_ge:
        return money >= 29900 or num >= 2999
    if price > 3.0:
        return money >= 29900 or num >= 7999
    # For prices of 3.0 or less the threshold is 95% of price * 10000.
    return money >= price * 10000 * 0.95


def get_big_money_val(limit_up_price, is_ge=False):
    """
    Compute the big-money threshold amount for a limit-up price.

    The branches mirror those of is_big_money, scaled by 100
    (presumably converting lots to shares - TODO confirm units).

    @param limit_up_price: limit-up price, numeric
    @param is_ge: selects the 2999 quantity cap instead of 7999
    @return: the threshold amount as an int
    """
    if is_ge:
        return min(299 * 10000, round(limit_up_price * 2999 * 100))
    if limit_up_price > 3.0:
        return min(299 * 10000, round(limit_up_price * 7999 * 100))
    max_money = limit_up_price * 10000 * 100
    return int(max_money * 0.95)


# Legacy threshold check that was already disabled in the original source,
# kept for reference:
# if int(val["num"]) >= constant.BIG_MONEY_NUM:
#     return True
# if int(val["num"]) * limit_up_price >= constant.BIG_MONEY_AMOUNT:
#     return True
# return False


def compare_time(time1, time2):
    """
    Compare two "HH:MM:SS" time strings.

    Each string has its first two colons removed and is parsed as an
    integer, so only the sign (or zero) of the result is meaningful.

    @param time1: first time string
    @param time2: second time string
    @return: negative, zero or positive when time1 is before, equal to or
             after time2
    """
    return int(time1.replace(":", "", 2)) - int(time2.replace(":", "", 2))


def l2_data_key_2_obj(k, value):
    """
    Convert a redis key/value pair into an L2 data object.

    @param k: key made of at least eight "-"-separated fields, optionally
              carrying the "l2-" marker (every "l2-" occurrence is removed)
    @param value: JSON text containing the "re" and "index" entries
    @return: dict with "key", "val", "re" and "index" (index as int)
    """
    key = k.replace("l2-", "")
    fields = key.split("-")
    # fields[0] (presumably the security code) is not part of the item.
    item = {
        "operateType": fields[1],
        "time": fields[2],
        "num": fields[3],
        "price": fields[4],
        "limitPrice": fields[5],
        "cancelTime": fields[6],
        "cancelTimeUnit": fields[7],
    }
    json_value = json.loads(value)
    return {"key": key, "val": item, "re": json_value["re"], "index": int(json_value["index"])}


def __sub_time(time_str, seconds):
    """
    Subtract a number of seconds from an "HH:MM:SS" time string.

    @param time_str: time string in "HH:MM:SS" form
    @param seconds: seconds to subtract
    @return: the resulting time, each part padded to two characters
    """
    total = get_time_as_seconds(time_str) - seconds
    hours, remainder = divmod(total, 3600)
    minutes, secs = divmod(remainder, 60)
    return "{0:0>2}:{1:0>2}:{2:0>2}".format(hours, minutes, secs)


def get_time_as_seconds(time_str):
    """
    Convert an "HH:MM:SS" time string into seconds since midnight.

    @param time_str: time string with ":"-separated hour, minute, second
    @return: the number of seconds as an int
    """
    parts = time_str.split(":")
    return int(parts[0]) * 3600 + int(parts[1]) * 60 + int(parts[2])


def compute_time_space_as_second(cancel_time, cancel_time_unit):
    """
    Compute the elapsed-time interval, in seconds, that a cancel time covers.

    @param cancel_time: elapsed amount, convertible with int()
    @param cancel_time_unit: unit code: 0 second, 1 minute, 2 hour
    @return: (min_seconds, max_seconds); (0, 0) when cancel_time is 0;
             None when the unit code is not 0, 1 or 2
    """
    amount = int(cancel_time)
    if amount == 0:
        # The unit is deliberately not parsed in this case.
        return 0, 0
    seconds_per_unit = _SECONDS_PER_CANCEL_UNIT.get(int(cancel_time_unit))
    if seconds_per_unit is None:
        # Unknown unit: keep the original contract of returning None.
        return None
    return amount * seconds_per_unit, (amount + 1) * seconds_per_unit


def get_buy_time_range(cancel_data):
    """
    Compute the time range in which the order of a cancel record was placed.

    @param cancel_data: L2 record whose "val" holds "time", "cancelTime"
                        and "cancelTimeUnit"
    @return: (min_time, max_time) as "HH:MM:SS" strings
    """
    record = cancel_data["val"]
    min_space, max_space = compute_time_space_as_second(record["cancelTime"], record["cancelTimeUnit"])
    # The smallest elapsed time gives the latest possible order time.
    max_time = __sub_time(record["time"], min_space)
    min_time = __sub_time(record["time"], max_space)
    return min_time, max_time


def is_sell_index_before_target(sell_cancel_data, target_data, local_today_num_operate_map):
    """
    Tell whether the sell matching a sell-cancel record precedes a target record.

    @param sell_cancel_data: the sell-cancel L2 record
    @param target_data: the target L2 record ("val"/"time" and "index" are read)
    @param local_today_num_operate_map: mapping from "num-operateType-price"
                                        keys to lists of L2 records
    @return: True when the sell is before the target, otherwise False
    """
    cancel_val = sell_cancel_data["val"]
    min_space, max_space = compute_time_space_as_second(cancel_val["cancelTime"], cancel_val["cancelTimeUnit"])
    max_time = __sub_time(cancel_val["time"], min_space)
    min_time = __sub_time(cancel_val["time"], max_space)
    # If even the latest possible sell time is before the target, the sell
    # is certainly before the target.
    if int(target_data["val"]["time"].replace(":", "")) > int(max_time.replace(":", "")):
        return True
    sell_datas = local_today_num_operate_map.get(
        "{}-{}-{}".format(cancel_val["num"], "2", cancel_val["price"]))
    if not sell_datas:
        return False
    same_second = min_space == 0 and max_space == 0
    for data in sell_datas:
        data_val = data["val"]
        if int(data_val["operateType"]) != 2:
            continue
        if int(data_val["num"]) != int(cancel_val["num"]):
            continue
        if same_second:
            # Cancelled within the same second as the sell.
            if compare_time(data_val["time"], min_time) == 0:
                return data["index"] < target_data["index"]
        elif compare_time(data_val["time"], min_time) > 0 and compare_time(data_val["time"], max_time) <= 0:
            # The candidate lies inside the computed interval.
            return data["index"] < target_data["index"]
    return False


# Most recent snapshot passed to save_big_data, per code.
__last_big_data = {}


@async_call
def save_big_data(code, same_time_nums, datas):
    """
    Store an L2 snapshot in redis when it changed and is a recent burst.

    The snapshot is written once if some time within 3 seconds of now has
    more than 20 entries in same_time_nums.

    @param code: key under which the last snapshot is remembered
    @param same_time_nums: mapping from "HH:MM:SS" time to a count
    @param datas: JSON-serialisable snapshot
    @return: None
    """
    latest_datas = __last_big_data.get(code)
    d1 = json.dumps(datas)
    # json.dumps output has no surrounding whitespace, so a plain equality
    # check is enough to detect an unchanged snapshot.
    if latest_datas is not None and d1 == json.dumps(latest_datas):
        return None
    __last_big_data[code] = datas
    # NOTE: the original scanned for the first differing character only to
    # feed a debug log call that is disabled, so that scan is omitted:
    # logger_l2_big_data.debug("code:{} d1:{} d2:{}", code, d1[i - 60: i + 30], d2[i - 60: i + 30])
    now_seconds = get_time_as_seconds(tool.get_now_time_str())
    for time_, count in same_time_nums.items():
        # Only keep big data from the last 3 seconds.
        if abs(now_seconds - get_time_as_seconds(time_)) > 3:
            continue
        if count > 20:
            RedisUtils.setex(l2_data_manager._redisManager.getRedis(),
                             "big_data-{}-{}".format(code, int(round(time.time() * 1000))),
                             tool.get_expire(), d1)
            break
    return None


# @async_call
def save_l2_latest_data_number(code, num):
    """
    Save the size of the latest L2 data for a code in redis.

    @param code: code used in the redis key
    @param num: value to store
    """
    # The literal 3 sits in the position where save_big_data passes
    # tool.get_expire() - presumably an expiry of 3 seconds; TODO confirm.
    RedisUtils.setex(l2_data_manager._redisManager.getRedis(), "l2_latest_data_num-{}".format(code), 3, num)


def get_l2_latest_data_number(code):
    """
    Read the size of the latest L2 data for a code from redis.

    @param code: code used in the redis key
    @return: the stored value as an int, or None when nothing is stored
    """
    num = RedisUtils.get(l2_data_manager._redisManager.getRedis(), "l2_latest_data_num-{}".format(code))
    return None if num is None else int(num)


class L2DataConcatUtil:
    """L2 data concatenation helper (per the original note: not enabled yet)."""

    def __init__(self, code, last_datas, datas):
        """
        @param code: code the data belongs to
        @param last_datas: previously received L2 records
        @param datas: newly received L2 records
        """
        self.last_datas = last_datas
        self.datas = datas
        self.code = code

    def __get_data_identity(self, data_):
        """Build the identity string of a record from the fields of its "val"."""
        data = data_["val"]
        return "{}-{}-{}-{}-{}-{}".format(data.get("time"), data.get("num"), data.get("price"),
                                          data.get("operateType"), data.get("cancelTime"),
                                          data.get("cancelTimeUnit"))

    def __get_concat_feature(self):
        """
        Locate the concatenation feature at the tail of last_datas.

        Walks backwards until at least 3 records containing at least 2
        distinct identities are collected, or the data is exhausted.

        @return: (start_index, end_index); start_index is -1 when
                 last_datas is empty
        """
        min_identity = 2
        min_count = 3

        identity_set = set()
        count = 0
        start_index = -1
        for i in range(len(self.last_datas) - 1, -1, -1):
            identity_set.add(self.__get_data_identity(self.last_datas[i]))
            count += 1
            start_index = i
            if count >= min_count and len(identity_set) >= min_identity:
                break
        return start_index, len(self.last_datas) - 1

    def get_add_datas(self):
        """
        Return the records of datas that come after the known last_datas.

        @return: [] when datas ends earlier than last_datas; the records
                 following the matched feature when it is found; otherwise
                 all of datas
        """
        # Reject new data whose last record is older than the previous one.
        if self.last_datas and self.datas:
            if int(self.datas[-1]["val"]["time"].replace(":", "")) < int(
                    self.last_datas[-1]["val"]["time"].replace(":", "")):
                return []

        # Locate the concatenation point.
        start_index, end_index = self.__get_concat_feature()
        if start_index < 0:
            return self.datas
        print("特征位置:", start_index, end_index)
        # Identities of the feature records.
        identity_list = [self.__get_data_identity(self.last_datas[i])
                         for i in range(start_index, end_index + 1)]

        # Search for the feature, dropping one leading identity per pass.
        # NOTE(review): the scan range stops before the position where the
        # feature would end exactly at the end of self.datas, so data with
        # nothing new after the feature is returned whole - confirm intent.
        for n in range(0, len(identity_list)):
            for i in range(0, len(self.datas) - len(identity_list) + n):
                if self.__get_data_identity(self.datas[i]) != identity_list[n]:
                    continue
                # n == 0 means a complete match; otherwise the partial match
                # must start at the first element of the new data.
                if n == 0 or i == 0:
                    find_identity = True
                    for j in range(n + 1, len(identity_list)):
                        if identity_list[j] != self.__get_data_identity(self.datas[i + j - n]):
                            find_identity = False
                            break

                    if find_identity:
                        return self.datas[i + len(identity_list) - n:]
        print("新数据中未找到特征标识")
        return self.datas


def test_add_datas():
    """Print the result of L2DataConcatUtil.get_add_datas for sample inputs."""

    def load_data(datas):
        return [{"val": {"time": data}} for data in datas]

    new_times = ["10:00:02", "10:00:02", "10:00:03", "10:00:04", "10:00:05"]
    cases = [
        # no match
        ([], new_times),
        # no match
        (["10:00:02"], new_times),
        # no match
        (["10:00:00", "10:00:01", "10:00:02", "10:00:03"], new_times),
        # match
        (["10:00:00", "10:00:01", "10:00:02", "10:00:03"],
         ["10:00:01", "10:00:02", "10:00:03", "10:00:04", "10:00:05"]),
        (["10:00:00", "10:00:01", "10:00:02", "10:00:02"], new_times),
        (["10:00:00", "10:00:01", "10:00:02", "10:00:02"],
         ["10:00:02", "10:00:02", "10:00:00", "10:00:01", "10:00:02", "10:00:02", "10:00:04", "10:00:05"]),
    ]
    for latest_times, times in cases:
        latest_datas = load_data(latest_times)
        datas = load_data(times)
        print(L2DataConcatUtil("000333", latest_datas, datas).get_add_datas())


def test(datas):
    """Set the "code" entry of the given mapping to "test" (mutates it in place)."""
    datas["code"] = "test"


if __name__ == "__main__":
    test_add_datas()