"""
L2-related data processing.

Loads and caches per-code L2 data, persists it to redis, formats raw L2
records, and locates the trade-progress position inside the limit-up
buy queue.
"""

# L2 trade queue

import datetime
import decimal
import json
import logging
import time

import numpy

import constant
import gpcode_manager
import l2_data_util
from l2 import l2_data_log, l2_data_source_util
import log
from db import redis_manager
import tool

_redisManager = redis_manager.RedisManager(1)

# ---- L2 data management ----

# Per-code cache of the most recently uploaded L2 batch
# (loaded from / written to redis key "l2-data-latest-{code}").
local_latest_datas = {}
# Per-code list of today's L2 records (loaded from log.load_l2_from_log()).
local_today_datas = {}
# Per-code index "{num}-{operateType}-{price}" -> list of records with those
# values; trades memory for faster lookups during processing.
local_today_num_operate_map = {}


def load_l2_data(code, force=False):
    """Load L2 data for ``code`` into the module-level caches.

    The latest uploaded batch is read from redis and today's records from
    ``log.load_l2_from_log()``. Each is only (re)loaded when it is not cached
    yet or when ``force`` is true. Loading today's records also (re)builds
    the num/operate index for the code.

    :param code: stock code.
    :param force: reload even if cached; also clears the code's num/operate index.
    :return: False if today's freshly loaded records look incomplete
        (fewer records than the last record's ``index`` + 1), else True.
    """
    redis = _redisManager.getRedis()
    # Latest uploaded batch.
    if local_latest_datas.get(code) is None or force:
        _data = redis.get("l2-data-latest-{}".format(code))
        if _data is not None:
            local_latest_datas[code] = json.loads(_data)

    # Today's records.
    if local_today_datas.get(code) is None or force:
        datas = log.load_l2_from_log().get(code)
        if datas is None:
            datas = []
        local_today_datas[code] = datas
        # Records are expected to carry consecutive indexes from 0 (see
        # L2DataUtil.get_add_data); fewer records than last index + 1 means
        # some are missing.
        data_normal = not datas or len(datas) >= datas[-1]["index"] + 1
        load_num_operate_map(local_today_num_operate_map, code, datas, force)
        return data_normal
    return True


def load_num_operate_map(local_today_num_operate_map, code, source_datas, clear=False):
    """Group ``source_datas`` by "{num}-{operateType}-{price}" for ``code``.

    :param local_today_num_operate_map: dict of code -> {key: [records]}, updated in place.
    :param code: stock code.
    :param source_datas: iterable of records shaped like ``{"val": {...}, ...}``.
    :param clear: discard the code's existing groups before adding.
    """
    if clear or local_today_num_operate_map.get(code) is None:
        local_today_num_operate_map[code] = {}
    code_map = local_today_num_operate_map[code]
    for data in source_datas:
        val = data["val"]
        key = "{}-{}-{}".format(val["num"], val["operateType"], val["price"])
        code_map.setdefault(key, []).append(data)


@tool.async_call
def saveL2Data(code, datas, msg=""):
    """Persist the ``index``/``re`` fields of L2 records to redis.

    Each record is stored under key ``"l2-" + record["key"]`` with a TTL from
    ``tool.get_expire()``. Missing keys are created. Existing keys are
    rewritten only when ``re`` changed. A per-code redis lock
    (``"l2-save-{code}"``) guards the write. If the lock is already held, the
    batch is skipped. The decorator presumably runs this in the background —
    see ``tool.async_call``.

    :return: ``datas``, or None when ``code`` is not in the monitored pool.
    """
    start_time = round(time.time() * 1000)
    # Only persist codes that are currently monitored.
    if not gpcode_manager.is_in_gp_pool(code):
        return None
    redis_instance = _redisManager.getRedis()
    lock_key = "l2-save-{}".format(code)
    # NOTE(review): the lock has no TTL, so a crash between SETNX and DELETE
    # leaves it set and blocks later saves for this code; consider SET NX EX.
    if redis_instance.setnx(lock_key, "1"):
        try:
            expire = tool.get_expire()
            for _data in datas:
                key = "l2-" + _data["key"]
                value = redis_instance.get(key)
                if value is None:
                    # New record.
                    try:
                        value = {"index": _data["index"], "re": _data["re"]}
                        redis_instance.setex(key, expire, json.dumps(value))
                    except Exception:
                        logging.exception("更正L2数据出错:{} key:{}".format(code, key))
                else:
                    json_value = json.loads(value)
                    if json_value["re"] != _data["re"]:
                        json_value["re"] = _data["re"]
                        redis_instance.setex(key, expire, json.dumps(json_value))
        finally:
            # Release the lock only when this call acquired it; deleting it
            # unconditionally would drop a lock owned by another writer.
            redis_instance.delete(lock_key)
    print("保存新数据用时:", msg, "耗时:{}".format(round(time.time() * 1000) - start_time))
    return datas


def save_l2_data(code, datas, add_datas):
    """Save the latest L2 batch for ``code`` when it contains new records.

    Writes ``datas`` to redis key "l2-data-latest-{code}", caches it in
    ``local_latest_datas``, and records its length via
    :func:`set_l2_data_latest_count`. It then logs ``add_datas`` and persists
    them with :func:`saveL2Data`. Does nothing when ``add_datas`` is empty.
    """
    redis = _redisManager.getRedis()
    # Only save when there are newly added records.
    if add_datas:
        start_time = round(time.time() * 1000)
        redis.setex("l2-data-latest-{}".format(code), tool.get_expire(), json.dumps(datas))
        l2_data_log.l2_time(code, round(time.time() * 1000) - start_time, "保存最近l2数据用时")
        # Keep the in-memory cache in sync with redis.
        local_latest_datas[code] = datas
        set_l2_data_latest_count(code, len(datas))
        try:
            log.logger_l2_data.info("{}-{}", code, add_datas)
        except Exception as e:
            logging.exception(e)
        saveL2Data(code, add_datas)


def set_l2_data_latest_count(code, count):
    """Store the size of the latest collected L2 batch (2-second TTL)."""
    redis = _redisManager.getRedis()
    key = "latest-l2-count-{}".format(code)
    redis.setex(key, 2, count)


def get_l2_data_latest_count(code):
    """Return the stored size of the latest L2 batch, or 0 if unknown/expired."""
    if not code:
        return 0
    redis = _redisManager.getRedis()
    result = redis.get("latest-l2-count-{}".format(code))
    return 0 if result is None else int(result)


def parseL2Data(str):
    """Parse an uploaded L2 JSON payload.

    The parameter is named ``str`` (shadowing the builtin) for compatibility
    with existing callers.

    :return: tuple ``(day, client, channel, code, capture_time, process_time,
        data, count)`` where ``day`` is today's date as "%Y%m%d".
    """
    day = datetime.datetime.now().strftime("%Y%m%d")
    payload = json.loads(str)
    body = payload["data"]
    client = payload["client"]
    code = body["code"]
    channel = body["channel"]
    capture_time = body["captureTime"]
    process_time = body["processTime"]
    count = body["count"]
    data = body["data"]
    return day, client, channel, code, capture_time, process_time, data, count


def is_origin_data_diffrent(data1, data2):
    """Cheaply check whether two raw L2 lists differ.

    Lists of different length (or a None input) are considered different.
    Otherwise about 10 evenly spaced entries are compared by their JSON
    serialisation.
    """
    if data1 is None or data2 is None:
        return True
    if len(data1) != len(data2):
        return True
    # step must be >= 1: for lists shorter than 10, len // 10 is 0 and
    # range(..., 0) would raise ValueError.
    step = max(1, len(data1) // 10)
    for i in range(0, len(data1), step):
        if json.dumps(data1[i]) != json.dumps(data2[i]):
            return True
    return False


def is_big_money(val):
    """Return whether an L2 record counts as a big order.

    money = price * num. For price > 3.0 the threshold is 30000. Otherwise
    it is 95% of price * 10000.
    """
    price = float(val["price"])
    money = price * int(val["num"])
    if price > 3.0:
        return money >= 30000
    return money >= price * 10000 * 0.95


class L2DataUtil:
    """Stateless helpers for L2 records.

    A formatted record looks like ``{"key": str, "val": raw_item, "re": int,
    "index": int}``. ``raw_item`` carries time ("HH:MM:SS"), price, num,
    limitPrice, operateType, cancelTime and cancelTimeUnit. The operateType
    codes used here are: 0 buy, 1 buy cancel, 2 sell, 3 sell cancel.
    """

    @classmethod
    def is_same_time(cls, time1, time2):
        """Return whether two "HH:MM:SS" times are less than 3 s apart (always True in test mode)."""
        if constant.TEST:
            return True
        return abs(cls.get_time_as_second(time2) - cls.get_time_as_second(time1)) < 3

    @classmethod
    def get_add_data(cls, code, latest_datas, datas, _start_index):
        """Return the records of ``datas`` that are new relative to ``latest_datas``.

        The last record of ``latest_datas`` is looked up (by ``key``, newest
        first) in ``datas``. Everything after it is new. If it is not found,
        the whole batch is new only when the batch does not start earlier
        than that record's time. The returned records get consecutive
        ``index`` values starting at ``_start_index``. This mutates the
        record dicts in place.
        """
        if not datas:
            return []
        last_data = latest_datas[-1] if latest_datas else None
        if last_data is None:
            # Nothing seen before: every record is new (returned as a copy).
            _add_datas = list(datas)
        else:
            start_index = -1
            for pos in range(len(datas) - 1, -1, -1):
                if datas[pos]["key"] == last_data["key"]:
                    start_index = pos
                    break
            if start_index >= 0:
                _add_datas = datas[start_index + 1:]
            elif cls.get_time_as_second(datas[0]["val"]["time"]) >= cls.get_time_as_second(
                    last_data["val"]["time"]):
                _add_datas = datas
            else:
                _add_datas = []
        for offset, item in enumerate(_add_datas):
            item["index"] = _start_index + offset
        return _add_datas

    @classmethod
    def correct_data(cls, code, latest_datas, _datas):
        """Raise ``re`` (repeat count) of cached records to the newer, larger values.

        For each record in ``_datas`` whose key matches a record in
        ``latest_datas`` with a smaller ``re``, the cached record's ``re`` is
        updated. Updated cached records are persisted via ``saveL2Data`` and
        the list is stored back into ``local_latest_datas``.

        :return: ``_datas`` unchanged.
        """
        latest_data = latest_datas if latest_datas is not None else []
        # Index cached records by key so each new record is matched in O(1).
        latest_by_key = {}
        for _ldata in latest_data:
            latest_by_key.setdefault(_ldata["key"], []).append(_ldata)
        save_list = []
        for data in _datas:
            for _ldata in latest_by_key.get(data["key"], ()):
                # Only a larger repeat count from the new batch is kept.
                if _ldata["re"] < data["re"]:
                    _ldata["re"] = data["re"]
                    save_list.append(_ldata)
        if save_list:
            saveL2Data(code, save_list, "保存纠正数据")
            local_latest_datas[code] = latest_data
        return _datas

    @classmethod
    def format_l2_data(cls, data, code, limit_up_price):
        """Convert raw L2 items into deduplicated ``{"key", "val", "re"}`` records.

        When ``limit_up_price`` is given, each item's ``limitPrice`` is
        recomputed ("1" if its price equals the limit-up price, else "0").
        Buys and buy-cancels that are not at the limit-up price are dropped.
        Identical items (same key) collapse into one record whose ``re``
        counts the repetitions.
        """
        datas = []
        data_indexes = {}
        same_time_num = {}
        for item in data:
            item_time = item["time"]
            same_time_num[item_time] = same_time_num.get(item_time, 0) + 1
            price = float(item["price"])
            num = item["num"]
            limitPrice = item["limitPrice"]
            # Recompute the limit-up flag from the known limit-up price.
            if limit_up_price is not None:
                if limit_up_price == tool.to_price(decimal.Decimal(price)):
                    limitPrice = 1
                else:
                    limitPrice = 0
                item["limitPrice"] = "{}".format(limitPrice)
            operateType = item["operateType"]
            # Non-limit-up buys and buy-cancels are not needed.
            if int(item["limitPrice"]) != 1 and int(operateType) in (0, 1):
                continue
            cancelTime = item["cancelTime"]
            cancelTimeUnit = item["cancelTimeUnit"]
            key = "{}-{}-{}-{}-{}-{}-{}-{}".format(code, operateType, item_time, num, price, limitPrice, cancelTime,
                                                   cancelTimeUnit)
            if key in data_indexes:
                # Duplicate item: bump its repeat count.
                datas[data_indexes[key]]["re"] += 1
            else:
                datas.append({"key": key, "val": item, "re": 1})
                data_indexes[key] = len(datas) - 1
        # TODO enable while testing to record big-order data:
        # l2_data_util.save_big_data(code, same_time_num, data)
        return datas

    @classmethod
    def get_time_as_second(cls, time_str):
        """Convert "HH:MM:SS" to seconds since midnight."""
        hours, minutes, seconds = time_str.split(":")[:3]
        return int(hours) * 3600 + int(minutes) * 60 + int(seconds)

    @classmethod
    def is_limit_up_price_buy(cls, val):
        """Return whether the record is a buy (operateType 0) at the limit-up price."""
        return int(val["limitPrice"]) == 1 and int(val["operateType"]) == 0

    @classmethod
    def is_limit_up_price_sell(cls, val):
        """Return whether the record is a sell (operateType 2) at the limit-up price."""
        return int(val["limitPrice"]) == 1 and int(val["operateType"]) == 2

    @classmethod
    def is_limit_up_price_buy_cancel(cls, val):
        """Return whether the record is a buy cancel (operateType 1) at the limit-up price."""
        return int(val["limitPrice"]) == 1 and int(val["operateType"]) == 1

    @classmethod
    def is_sell_cancel(cls, val):
        """Return whether the record is a sell cancel (operateType 3)."""
        return int(val["operateType"]) == 3

    @classmethod
    def is_sell(cls, val):
        """Return whether the record is a sell (operateType 2)."""
        return int(val["operateType"]) == 2


class L2TradeQueueUtils:
    """Helpers for matching the observed buy queue against today's L2 records."""

    @classmethod
    def __is_cancel(cls, code, data, total_datas, local_today_num_operate_map):
        """Return whether the buy record ``data`` has a matching buy-cancel record.

        Candidate cancels are the records grouped under
        "{num}-1-{price}". A candidate matches when
        ``get_buy_index_with_cancel_data`` maps it back to ``data["index"]``.
        ``total_datas`` is unused but kept for interface compatibility.
        """
        val = data["val"]
        cancel_datas = local_today_num_operate_map.get("{}-{}-{}".format(val["num"], "1", val["price"]))
        for cancel_data in cancel_datas or ():
            buy_index = l2_data_source_util.L2DataSourceUtils.get_buy_index_with_cancel_data(
                code, cancel_data, local_today_num_operate_map)
            if buy_index == data["index"]:
                return True
        return False

    @classmethod
    def find_traded_progress_index(cls, code, buy_1_price, total_datas, local_today_num_operate_map, queueList,
                                   last_index, latest_not_limit_up_time=None):
        """Find the trade-progress index for the observed buy-1 queue.

        Windows of ``queueList`` (a sequence of order sizes) are matched
        against the limit-up buy records at ``buy_1_price`` with index >=
        ``last_index``. Each record is repeated ``re`` times. Windows are
        tried from full length down to just above half length. The first
        match whose index spacing passes the trust check is returned.

        :param latest_not_limit_up_time: when given (and ``queueList`` has
            fewer than 3 entries), only buys at or after this time count.
        :return: index in ``total_datas`` of the first matched record, or
            None when ``queueList`` is empty.
        :raises Exception: "尚未找到成交进度" when no trustworthy match exists.
        """
        if not queueList:
            return None

        # With 3+ queue entries the match is distinctive enough; drop the time filter.
        if len(queueList) >= 3:
            latest_not_limit_up_time = None

        # last_index must not point at a cancelled buy; if it does, restart from 0.
        if cls.__is_cancel(code, total_datas[last_index], total_datas, local_today_num_operate_map):
            last_index = 0

        # Left-pad the integer part of the price with zeros to 4 digits to
        # match the stored price format. partition() also handles prices
        # without a "." (the old find-based loop never terminated on those).
        int_part, dot, frac_part = f"{buy_1_price}".partition(".")
        buy_1_price_format = int_part.rjust(4, "0") + dot + frac_part

        def find_traded_progress_index_simple(queues):
            """Return (first index, matched indexes) for ``queues``, or (None, None)."""
            index_set = set()
            for num in queues:
                buy_datas = local_today_num_operate_map.get("{}-{}-{}".format(num, "0", buy_1_price_format))
                for data in buy_datas or ():
                    # Only buys after the latest non-limit-up buy-1 update are valid.
                    if latest_not_limit_up_time is not None and tool.trade_time_sub(
                            data["val"]["time"], latest_not_limit_up_time) < 0:
                        continue
                    if data["index"] >= last_index:
                        index_set.add(data["index"])
            num_list = []
            new_index_list = []
            for index in sorted(index_set):
                repeat = total_datas[index]["re"]
                num_list.extend([str(total_datas[index]["val"]["num"])] * repeat)
                new_index_list.extend([index] * repeat)
            # Compare whole elements; a joined-string search could match
            # across number boundaries (e.g. "10,20" inside "110,20").
            target = [str(num) for num in queues]
            win = len(target)
            for start in range(len(num_list) - win + 1):
                if num_list[start:start + win] == target:
                    return new_index_list[start], new_index_list[start:start + win]
            return None, None

        def is_trust(indexes):
            """Return whether the matched ``indexes`` are spaced plausibly."""
            gaps = [cur - prev - 1 for prev, cur in zip(indexes, indexes[1:])]
            if len(gaps) <= 1:
                return True
            # Small spread of the gaps: trusted outright.
            if numpy.std(gaps) < 10:
                return True
            for i, gap in enumerate(gaps):
                if abs(gap) > 10:
                    # Large gap: count un-cancelled limit-up buys strictly
                    # between the two matched records (both ends excluded).
                    buy_count = 0
                    for index in range(indexes[i] + 1, indexes[i + 1]):
                        if L2DataUtil.is_limit_up_price_buy(total_datas[index]["val"]):
                            if not cls.__is_cancel(code, total_datas[index], total_datas,
                                                   local_today_num_operate_map):
                                buy_count += total_datas[index]["re"]
                    # Tentative tolerance of 3 buys.
                    if buy_count >= 3:
                        return False
            return True

        # Factor search: window length from len(queueList) down to len(queueList) // 2 + 1.
        max_win_len = len(queueList)
        min_win_len = len(queueList) // 2
        for win_len in range(max_win_len, min_win_len, -1):
            # Slide the window across the queue.
            for i in range(0, max_win_len - win_len + 1):
                queues = queueList[i:i + win_len]
                f_start_index, f_indexs = find_traded_progress_index_simple(queues)
                # Index 0 is a valid position, so test against None, not truthiness.
                if f_start_index is not None and is_trust(f_indexs):
                    return f_start_index
        raise Exception("尚未找到成交进度")


if __name__ == "__main__":
    print(load_l2_data("002235"))