| | |
| | | # l2数据工具 |
| | | """ |
| | | L2数据处理工具包 |
| | | """ |
| | | |
| | | # 比较时间的大小 |
| | | import hashlib |
| | | import json |
| | | import time |
| | | |
| | | import l2_data_manager |
| | | import tool |
| | | from log import logger_l2_trade, logger_l2_big_data |
| | | from trade_gui import async_call |
| | | from db.redis_manager_delegate import RedisUtils |
| | | from utils.tool import async_call |
| | | |
| | | from l2 import l2_data_manager |
| | | from utils import tool |
| | | |
| | | |
| | | def run_time(): |
| | | def decorator(func): |
| | | def infunc(*args, **kwargs): |
| | | start = round(time.time() * 1000) |
| | | result = func(args, **kwargs) |
| | | print("执行时间", round(time.time() * 1000) - start) |
| | | return result |
| | | |
| | | return infunc |
| | | |
| | | return decorator |
| | | |
| | | |
| | | # 是否为大单 |
| | | def is_big_money(val, is_ge=False): |
| | | """ |
| | | 判断是否为大单 |
| | | @param val: l2数据 |
| | | @param is_ge: 是否为创业板 |
| | | @return: |
| | | """ |
| | | price = float(val["price"]) |
| | | money = round(price * val["num"], 2) |
| | | if is_ge: |
| | | if money >= 29900 or val["num"] >= 2999: |
| | | return True |
| | | else: |
| | | return False |
| | | else: |
| | | if price > 3.0: |
| | | if money >= 29900 or val["num"] >= 7999: |
| | | return True |
| | | else: |
| | | return False |
| | | else: |
| | | max_money = price * 10000 |
| | | if money >= max_money * 0.95: |
| | | return True |
| | | else: |
| | | return False |
| | | |
| | | |
| | | # 获取大资金的金额 |
| | | def get_big_money_val(limit_up_price, is_ge=False): |
| | | if is_ge: |
| | | return min(299 * 10000, round(limit_up_price * 2900 * 100)) |
| | | else: |
| | | if limit_up_price > 3.0: |
| | | return min(299 * 10000, round(limit_up_price * 7999 * 100)) |
| | | else: |
| | | max_money = limit_up_price * 10000 * 100 |
| | | return int(max_money * 0.95) |
| | | |
| | | # if int(val["num"]) >= constant.BIG_MONEY_NUM: |
| | | # return True |
| | | # if int(val["num"]) * limit_up_price >= constant.BIG_MONEY_AMOUNT: |
| | | # return True |
| | | # return False_ |
| | | |
| | | |
| | | def compare_time(time1, time2): |
| | |
| | | return _data |
| | | |
| | | |
| | | # 将数据根据num-operate分类 |
| | | def load_num_operate_map(local_today_num_operate_map, code, source_datas, clear=False): |
| | | if local_today_num_operate_map.get(code) is None: |
| | | local_today_num_operate_map[code] = {} |
| | | if clear: |
| | | local_today_num_operate_map[code] = {} |
| | | |
| | | for data in source_datas: |
| | | key = "{}-{}-{}".format(data["val"]["num"], data["val"]["operateType"],data["val"]["price"]) |
| | | if local_today_num_operate_map[code].get(key) is None: |
| | | local_today_num_operate_map[code].setdefault(key, []) |
| | | local_today_num_operate_map[code].get(key).append(data) |
| | | |
| | | |
| | | # 减去时间 |
| | | def __sub_time(time_str, seconds): |
| | | time_seconds = get_time_as_seconds(time_str) - seconds |
| | |
| | | |
| | | |
| | | # 计算时间的区间 |
| | | def __compute_time_space_as_second(cancel_time, cancel_time_unit): |
| | | def compute_time_space_as_second(cancel_time, cancel_time_unit): |
| | | __time = int(cancel_time) |
| | | if int(cancel_time) == 0: |
| | | return 0, 0 |
| | |
| | | return __time * 3600, (__time + 1) * 3600 |
| | | |
| | | |
| | | # 根据买撤数据(与今日总的数据)计算买入数据 |
| | | def get_buy_data_with_cancel_data(cancel_data, local_today_num_operate_map): |
| | | # 获取买入时间范围 |
| | | def get_buy_time_range(cancel_data): |
| | | # 计算时间区间 |
| | | min_space, max_space = __compute_time_space_as_second(cancel_data["val"]["cancelTime"], |
| | | cancel_data["val"]["cancelTimeUnit"]) |
| | | min_space, max_space = compute_time_space_as_second(cancel_data["val"]["cancelTime"], |
| | | cancel_data["val"]["cancelTimeUnit"]) |
| | | max_time = __sub_time(cancel_data["val"]["time"], min_space) |
| | | min_time = __sub_time(cancel_data["val"]["time"], max_space) |
| | | buy_datas = local_today_num_operate_map.get("{}-{}-{}".format(cancel_data["val"]["num"], "0",cancel_data["val"]["price"])) |
| | | if buy_datas is None: |
| | | # 无数据 |
| | | return None, None |
| | | for i in range(0, len(buy_datas)): |
| | | data = buy_datas[i] |
| | | if int(data["val"]["operateType"]) != 0: |
| | | continue |
| | | if int(data["val"]["num"]) != int(cancel_data["val"]["num"]): |
| | | continue |
| | | if min_space == 0 and max_space == 0: |
| | | if compare_time(data["val"]["time"], min_time) == 0: |
| | | return data["index"], data |
| | | return min_time, max_time |
| | | |
| | | elif compare_time(data["val"]["time"], min_time) > 0 and compare_time(data["val"]["time"], max_time) <= 0: |
| | | return data["index"], data |
| | | return None, None |
| | | |
| | | # 判断卖撤的卖信号是否在目标信号之前 |
| | | def is_sell_index_before_target(sell_cancel_data, target_data, local_today_num_operate_map): |
| | | min_space, max_space = compute_time_space_as_second(sell_cancel_data["val"]["cancelTime"], |
| | | sell_cancel_data["val"]["cancelTimeUnit"]) |
| | | max_time = __sub_time(sell_cancel_data["val"]["time"], min_space) |
| | | min_time = __sub_time(sell_cancel_data["val"]["time"], max_space) |
| | | # 如果最大值都在目标信号之前则信号肯定在目标信号之前 |
| | | if int(target_data["val"]["time"].replace(":", "")) > int(max_time.replace(":", "")): |
| | | return True |
| | | sell_datas = local_today_num_operate_map.get( |
| | | "{}-{}-{}".format(sell_cancel_data["val"]["num"], "2", sell_cancel_data["val"]["price"])) |
| | | if sell_datas: |
| | | for i in range(0, len(sell_datas)): |
| | | data = sell_datas[i] |
| | | if int(data["val"]["operateType"]) != 2: |
| | | continue |
| | | if int(data["val"]["num"]) != int(sell_cancel_data["val"]["num"]): |
| | | continue |
| | | if min_space == 0 and max_space == 0: |
| | | # 本秒内 |
| | | if compare_time(data["val"]["time"], min_time) == 0: |
| | | return data["index"] < target_data["index"] |
| | | # 数据在正确的区间 |
| | | elif compare_time(data["val"]["time"], min_time) > 0 and compare_time(data["val"]["time"], max_time) <= 0: |
| | | return data["index"] < target_data["index"] |
| | | return False |
| | | |
| | | |
| | | __last_big_data = {} |
| | |
| | | |
| | | @async_call |
| | | def save_big_data(code, same_time_nums, datas): |
| | | return None |
| | | latest_datas = __last_big_data.get(code) |
| | | d1 = json.dumps(datas) |
| | | d2 = json.dumps(latest_datas) |
| | |
| | | # 保存快照 |
| | | # logger_l2_big_data.debug("code:{} d1:{} d2:{}", code, d1[i - 60: i + 30], d2[i - 60: i + 30]) |
| | | break |
| | | time_str = tool.get_now_time_str() |
| | | |
| | | for key in same_time_nums: |
| | | if same_time_nums[key] > 20: |
| | | redis = l2_data_manager._redisManager.getRedis() |
| | | redis.setex("big_data-{}-{}".format(code, int(round(time.time() * 1000))), tool.get_expire(), d1) |
| | | for time_ in same_time_nums: |
| | | # 只保留最近3s内的大数据 |
| | | if abs(get_time_as_seconds(time_str) - get_time_as_seconds(time_)) > 3: |
| | | continue |
| | | if same_time_nums[time_] > 20: |
| | | RedisUtils.setex(l2_data_manager._redisManager.getRedis(), |
| | | "big_data-{}-{}".format(code, int(round(time.time() * 1000))), tool.get_expire(), |
| | | d1) |
| | | break |
| | | |
| | | |
| | | # 保存l2最新数据的大小 |
| | | # @async_call |
| | | def save_l2_latest_data_number(code, num): |
| | | RedisUtils.setex(l2_data_manager._redisManager.getRedis(), "l2_latest_data_num-{}".format(code), 3, num) |
| | | |
| | | |
| | | # 获取最新数据条数 |
| | | def get_l2_latest_data_number(code): |
| | | num = RedisUtils.get(l2_data_manager._redisManager.getRedis(), "l2_latest_data_num-{}".format(code)) |
| | | if num is not None: |
| | | return int(num) |
| | | return None |
| | | |
| | | |
| | | # l2数据拼接工具 暂时还未启用 |
| | | class L2DataConcatUtil: |
| | | |
| | | # 初始化 |
| | | def __init__(self, code, last_datas, datas): |
| | | self.last_datas = last_datas |
| | | self.datas = datas |
| | | self.code = code |
| | | |
| | | def __get_data_identity(self, data_): |
| | | data = data_["val"] |
| | | return "{}-{}-{}-{}-{}-{}".format(data.get("time"), data.get("num"), data.get("price"), data.get("operateType"), |
| | | data.get("cancelTime"), data.get("cancelTimeUnit")) |
| | | |
| | | # 获取拼接的特征,获取最后3笔 |
| | | def __get_concat_feature(self): |
| | | # 最少需要3条数据+2条需要有特征点的数据 |
| | | min_identity = 2 |
| | | min_count = 3 |
| | | |
| | | identity_set = set() |
| | | count = 0 |
| | | start_index = -1 |
| | | for i in range(len(self.last_datas) - 1, -1, -1): |
| | | identity_set.add(self.__get_data_identity(self.last_datas[i])) |
| | | count += 1 |
| | | start_index = i |
| | | if count >= min_count and len(identity_set) >= min_identity: |
| | | break |
| | | return start_index, len(self.last_datas) - 1 |
| | | |
| | | # 获取新增数据 |
| | | def get_add_datas(self): |
| | | # 查询当前数据是否在最近一次数据之后 |
| | | if self.last_datas and self.datas: |
| | | if int(self.datas[-1]["val"]["time"].replace(":", "")) - int( |
| | | self.last_datas[-1]["val"]["time"].replace(":", "")) < 0: |
| | | return [] |
| | | |
| | | # 获取拼接点 |
| | | start_index, end_index = self.__get_concat_feature() |
| | | if start_index < 0: |
| | | return self.datas |
| | | print("特征位置:", start_index, end_index) |
| | | # 提取特征点的标识数据 |
| | | identity_list = [] |
| | | for i in range(start_index, end_index + 1): |
| | | identity_list.append(self.__get_data_identity(self.last_datas[i])) |
| | | |
| | | # 查找完整的特征 |
| | | identity_count = len(identity_list) |
| | | for n in range(0, identity_count): |
| | | # 每次遍历减少最前面一个特征量 |
| | | for i in range(0, len(self.datas) - len(identity_list) + n): |
| | | if self.__get_data_identity(self.datas[i]) == identity_list[n]: |
| | | # n==0 表示完全匹配 , i=0 表示即使不是完全匹配,但必须新数据第一个元素匹配 |
| | | if n == 0 or i == 0: |
| | | find_identity = True |
| | | for j in range(n + 1, len(identity_list)): |
| | | if identity_list[j] != self.__get_data_identity(self.datas[i + j - n]): |
| | | find_identity = False |
| | | break |
| | | |
| | | if find_identity: |
| | | return self.datas[i + len(identity_list) - n:] |
| | | else: |
| | | continue |
| | | print("新数据中未找到特征标识") |
| | | return self.datas |
| | | |
| | | |
| | | def test_add_datas(): |
| | | def load_data(datas): |
| | | data_list = [] |
| | | for data in datas: |
| | | data_list.append({"val": {"time": data}}) |
| | | return data_list |
| | | |
| | | # 不匹配 |
| | | latest_datas = [] |
| | | datas = ["10:00:02", "10:00:02", "10:00:03", "10:00:04", "10:00:05"] |
| | | latest_datas = load_data(latest_datas) |
| | | datas = load_data(datas) |
| | | print(L2DataConcatUtil("000333", latest_datas, datas).get_add_datas()) |
| | | |
| | | # 不匹配 |
| | | latest_datas = ["10:00:02"] |
| | | datas = ["10:00:02", "10:00:02", "10:00:03", "10:00:04", "10:00:05"] |
| | | latest_datas = load_data(latest_datas) |
| | | datas = load_data(datas) |
| | | print(L2DataConcatUtil("000333", latest_datas, datas).get_add_datas()) |
| | | |
| | | # 不匹配 |
| | | latest_datas = ["10:00:00", "10:00:01", "10:00:02", "10:00:03"] |
| | | datas = ["10:00:02", "10:00:02", "10:00:03", "10:00:04", "10:00:05"] |
| | | latest_datas = load_data(latest_datas) |
| | | datas = load_data(datas) |
| | | print(L2DataConcatUtil("000333", latest_datas, datas).get_add_datas()) |
| | | |
| | | # 匹配 |
| | | latest_datas = ["10:00:00", "10:00:01", "10:00:02", "10:00:03"] |
| | | datas = ["10:00:01", "10:00:02", "10:00:03", "10:00:04", "10:00:05"] |
| | | latest_datas = load_data(latest_datas) |
| | | datas = load_data(datas) |
| | | print(L2DataConcatUtil("000333", latest_datas, datas).get_add_datas()) |
| | | |
| | | latest_datas = ["10:00:00", "10:00:01", "10:00:02", "10:00:02"] |
| | | datas = ["10:00:02", "10:00:02", "10:00:03", "10:00:04", "10:00:05"] |
| | | latest_datas = load_data(latest_datas) |
| | | datas = load_data(datas) |
| | | print(L2DataConcatUtil("000333", latest_datas, datas).get_add_datas()) |
| | | |
| | | latest_datas = ["10:00:00", "10:00:01", "10:00:02", "10:00:02"] |
| | | datas = ["10:00:02", "10:00:02", "10:00:00", "10:00:01", "10:00:02", "10:00:02", "10:00:04", "10:00:05"] |
| | | latest_datas = load_data(latest_datas) |
| | | datas = load_data(datas) |
| | | print(L2DataConcatUtil("000333", latest_datas, datas).get_add_datas()) |
| | | |
| | | |
| | | def test(datas): |
| | |
| | | |
| | | |
| | | if __name__ == "__main__": |
| | | # cancel_data = {"val": {"operateType": 1, "num": 1520, "cancelTime": 1, "cancelTimeUnit": 1, "time": "09:32:30"}} |
| | | # today_datas=[{"val": {"operateType": 1, "num": 1520, "cancelTime": 1, "cancelTimeUnit": 0, "time": "09:32:30"}},{"val": {"operateType": 0, "num": 1520, "cancelTime": 0, "cancelTimeUnit": 0, "time": "09:31:31"}}] |
| | | # result= get_buy_data_with_cancel_data(cancel_data,today_datas) |
| | | # print(result) |
| | | code = "001209" |
| | | l2_data_manager.load_l2_data(code) |
| | | total_datas = l2_data_manager.local_today_datas[code] |
| | | index, data = get_buy_data_with_cancel_data(total_datas[118], l2_data_manager.local_today_num_operate_map.get(code)) |
| | | print(index, data) |
| | | test_add_datas() |