From ba52d7ac92a36f413eacaa686f8535e859664ec6 Mon Sep 17 00:00:00 2001
From: Administrator <admin@example.com>
Date: Mon, 28 Aug 2023 09:45:11 +0800
Subject: [PATCH] Bug fix

---
 l2/l2_data_util.py | 267 +++++++++++++++++++++++++++++++++++++++-------------
 1 files changed, 199 insertions(+), 68 deletions(-)

diff --git a/l2/l2_data_util.py b/l2/l2_data_util.py
index 8360263..633de90 100644
--- a/l2/l2_data_util.py
+++ b/l2/l2_data_util.py
@@ -9,12 +9,15 @@
 import logging
 import time
 
+import numpy
+
 import constant
-import gpcode_manager
-import l2_data_log
-import log
-import redis_manager
-import tool
+from code_attribute import gpcode_manager
+from db.redis_manager_delegate import RedisUtils
+from l2 import l2_data_log, l2_data_source_util
+from log_module import log, log_export
+from db import redis_manager_delegate as redis_manager
+from utils import tool
 
 _redisManager = redis_manager.RedisManager(1)
 # L2 data management
@@ -26,26 +29,32 @@
 # Used to speed up data processing by trading space for time
 local_today_num_operate_map = {}
 
+# Buy order-number mapping; only native L2 data has it
+local_today_buyno_map = {}
 
-def load_l2_data(code, force=False):
-    redis = _redisManager.getRedis()
+
+def load_l2_data(code, load_latest=True, force=False):
     # Load the most recent L2 data
-    if local_latest_datas.get(code) is None or force:
-        # Get the most recent data
-        _data = redis.get("l2-data-latest-{}".format(code))
-        if _data is not None:
-            if code in local_latest_datas:
-                local_latest_datas[code] = json.loads(_data)
-            else:
-                local_latest_datas.setdefault(code, json.loads(_data))
+    if load_latest:
+        if local_latest_datas.get(code) is None or force:
+            # Get the most recent data
+            _data = RedisUtils.get(_redisManager.getRedis(), "l2-data-latest-{}".format(code))
+            if _data is not None:
+                if code in local_latest_datas:
+                    local_latest_datas[code] = json.loads(_data)
+                else:
+                    local_latest_datas.setdefault(code, json.loads(_data))
 
     # Get today's data
     if local_today_datas.get(code) is None or force:
-        datas = log.load_l2_from_log()
+        datas = log_export.load_l2_from_log()
         datas = datas.get(code)
         if datas is None:
             datas = []
         local_today_datas[code] = datas
+        data_normal = True
+        if datas and len(datas) < datas[-1]["index"] + 1:
+            data_normal = False
 
         # Load from the database
         # datas = []
@@ -60,6 +69,9 @@
         # local_today_datas[code] = new_datas
         # Load based on today's data
         load_num_operate_map(local_today_num_operate_map, code, local_today_datas.get(code), force)
+        load_buy_no_map(local_today_buyno_map, code, local_today_datas.get(code), force)
+        return data_normal
+    return True
 
 
 # Classify the data by num-operate
@@ -76,6 +88,25 @@
             local_today_num_operate_map[code].get(key).append(data)
 
 
+# Classify the data by orderNo; only native data has it
+def load_buy_no_map(local_today_buyno_map, code, source_datas, clear=False):
+    # Only native L2 data carries this information
+    if constant.L2_SOURCE_TYPE != constant.L2_SOURCE_TYPE_HUAXIN:
+        return
+    if local_today_buyno_map.get(code) is None:
+        local_today_buyno_map[code] = {}
+    if clear:
+        local_today_buyno_map[code] = {}
+
+    for data in source_datas:
+        if data["val"]["operateType"] != 0:
+            continue
+        # Only fill in buy data
+        key = "{}".format(data["val"]["orderNo"])
+        if local_today_buyno_map[code].get(key) is None:
+            local_today_buyno_map[code].setdefault(key, data)
+
+
 @tool.async_call
 def saveL2Data(code, datas, msg=""):
     start_time = round(time.time() * 1000)
@@ -86,58 +117,58 @@
     redis_instance = _redisManager.getRedis()
     try:
-        if redis_instance.setnx("l2-save-{}".format(code), "1") > 0:
-
+        if RedisUtils.setnx(redis_instance, "l2-save-{}".format(code), "1", auto_free=False) > 0:
             # Compute the retention time
             expire = tool.get_expire()
             i = 0
             for _data in datas:
                 i += 1
                 key = "l2-" + _data["key"]
-                value = redis_instance.get(key)
+                value = RedisUtils.get(redis_instance, key, auto_free=False)
                 if value is None:
                     # New entry
                     try:
                         value = {"index": _data["index"], "re": _data["re"]}
-                        redis_instance.setex(key, expire, json.dumps(value))
+                        RedisUtils.setex(redis_instance, key, expire, json.dumps(value), auto_free=False)
                     except:
                         logging.error("Error updating L2 data: {} key:{}".format(code, key))
                 else:
                     json_value = json.loads(value)
                     if json_value["re"] != _data["re"]:
                         json_value["re"] = _data["re"]
-                        redis_instance.setex(key, expire, json.dumps(json_value))
+                        RedisUtils.setex(redis_instance, key, expire, json.dumps(json_value), auto_free=False)
     finally:
-        redis_instance.delete("l2-save-{}".format(code))
+        RedisUtils.delete(redis_instance, "l2-save-{}".format(code), auto_free=False)
+        RedisUtils.realse(redis_instance)
     print("Time to save new data:", msg, "elapsed: {}".format(round(time.time() * 1000) - start_time))
     return datas
 
 
 # Save L2 data
-def save_l2_data(code, datas, add_datas, randomKey=None):
-    redis = _redisManager.getRedis()
+def save_l2_data(code, datas, add_datas):
     # Only need to save when there is newly added data
     if len(add_datas) > 0:
         # Save the most recent data
         __start_time = round(time.time() * 1000)
-        redis.setex("l2-data-latest-{}".format(code), tool.get_expire(), json.dumps(datas))
-        l2_data_log.l2_time(code, randomKey, round(time.time() * 1000) - __start_time, "time to save latest L2 data")
-        # Put it into memory
-        local_latest_datas[code] = datas
-        __set_l2_data_latest_count(code, len(datas))
+        if datas:
+            RedisUtils.setex_async(_redisManager.getRedis(), "l2-data-latest-{}".format(code), tool.get_expire(), json.dumps(datas))
+            # l2_data_log.l2_time(code, round(time.time() * 1000) - __start_time, "time to save latest L2 data")
+            # Put it into memory
+            local_latest_datas[code] = datas
+            set_l2_data_latest_count(code, len(datas))
         try:
             log.logger_l2_data.info("{}-{}", code, add_datas)
         except Exception as e:
            logging.exception(e)
-        saveL2Data(code, add_datas)
+        # Temporarily do not save the data to redis
+        # saveL2Data(code, add_datas)
 
 
 # Set the count of the latest collected L2 data
-def __set_l2_data_latest_count(code, count):
-    redis = _redisManager.getRedis()
+def set_l2_data_latest_count(code, count):
     key = "latest-l2-count-{}".format(code)
-    redis.setex(key, 2, count)
+    RedisUtils.setex(_redisManager.getRedis(), key, 2, count)
     pass
@@ -145,10 +176,9 @@
 def get_l2_data_latest_count(code):
     if code is None or len(code) < 1:
         return 0
-    redis = _redisManager.getRedis()
     key = "latest-l2-count-{}".format(code)
-    result = redis.get(key)
+    result = RedisUtils.get(_redisManager.getRedis(), key)
     if result is None:
         return 0
     else:
@@ -164,12 +194,42 @@
     channel = data["channel"]
     capture_time = data["captureTime"]
     process_time = data["processTime"]
+    count = data["count"]
     data = data["data"]
-    limit_up_price = gpcode_manager.get_limit_up_price(code)
-
-    datas = L2DataUtil.format_l2_data(data, code, limit_up_price)  # Get the limit-up price
-    return day, client, channel, code, capture_time, process_time, datas, data
+    return day, client, channel, code, capture_time, process_time, data, count
+
+
+# Whether the source data differs
+def is_origin_data_diffrent(data1, data2):
+    if data1 is None or data2 is None:
+        return True
+    if len(data1) != len(data2):
+        return True
+    # Compare by sampling
+    data_length = len(data1)
+    step = len(data1) // 10
+    for i in range(0, data_length, step):
+        if json.dumps(data1[i]) != json.dumps(data2[i]):
+            return True
+    return False
+
+
+# Whether it is a big order
+def is_big_money(val):
+    price = float(val["price"])
+    money = price * int(val["num"])
+    if price > 3.0:
+        if money >= 30000:
+            return True
+        else:
+            return False
+    else:
+        max_money = price * 10000
+        if money >= max_money * 0.95:
+            return True
+        else:
+            return False
 
 
 class L2DataUtil:
@@ -242,7 +302,8 @@
                     # Save to the database and update the "re" data
                     save_list.append(_ldata)
         if len(save_list) > 0:
-            saveL2Data(code, save_list, "save corrected data")
+            # Temporarily do not save the data to redis
+            # saveL2Data(code, save_list, "save corrected data")
             local_latest_datas[code] = latest_data
         return _datas
@@ -361,45 +422,115 @@
 class L2TradeQueueUtils(object):
 
+    # Whether the buy data has already been canceled
+    @classmethod
+    def __is_cancel(cls, code, data, total_datas, local_today_num_operate_map):
+        val = data["val"]
+        cancel_datas = local_today_num_operate_map.get(
+            "{}-{}-{}".format(val["num"], "1", val["price"]))
+        # Is there buy-cancel data
+        if cancel_datas:
+            for cancel_data in cancel_datas:
+                buy_index = l2_data_source_util.L2DataSourceUtils.get_buy_index_with_cancel_data(code, cancel_data,
+                                                                                                 local_today_num_operate_map)
+                if buy_index == data["index"]:
+                    return True
+        return False
+
     # Get the traded progress index
     @classmethod
-    def find_traded_progress_index(cls, buy_1_price, total_datas, local_today_num_operate_map, queueList,
+    def find_traded_progress_index(cls, code, buy_1_price, total_datas, local_today_num_operate_map, queueList,
+                                   last_index,
                                    latest_not_limit_up_time=None):
+
+        def find_traded_progress_index_simple(queues):
+            index_set = set()
+            for num in queues:
+                buy_datas = local_today_num_operate_map.get(
+                    "{}-{}-{}".format(num, "0", buy_1_price_format))
+                if buy_datas is not None and len(buy_datas) > 0:
+                    for data in buy_datas:
+                        # Only valid after the time of the most recent non-limit-up buy-1 update
+                        if latest_not_limit_up_time is None or tool.trade_time_sub(data["val"]["time"],
+                                                                                   latest_not_limit_up_time) >= 0:
+                            if data["index"] >= last_index:
+                                index_set.add(data["index"])
+            index_list = list(index_set)
+            index_list.sort()
+            num_list = []
+            new_index_list = []
+            for index in index_list:
+                for i in range(0, total_datas[index]["re"]):
+                    num_list.append(total_datas[index]["val"]["num"])
+                    new_index_list.append(index)
+            index_list_str = ",".join(list(map(str, num_list)))
+            queue_list_str = ",".join(list(map(str, queues)))
+            find_index = index_list_str.find(queue_list_str)
+            if find_index >= 0:
+                temp_str = index_list_str[0:find_index]
+                if temp_str.endswith(","):
+                    temp_str = temp_str[:-1]
+                if temp_str == "":
+                    return new_index_list[0], new_index_list[0:len(queues)]
+                start_index = len(temp_str.split(","))
+                return new_index_list[start_index], new_index_list[start_index:start_index + len(queues)]
+            return None, None
+
+        # With 3 or more items in the queue there is no need to check the most recent non-limit-up time
+        if len(queueList) >= 3:
+            latest_not_limit_up_time = None
+
+        # Determine whether the matched positions are trustworthy
+        def is_trust(indexes):
+            cha = []
+            for i in range(1, len(indexes)):
+                cha.append(indexes[i] - indexes[i - 1] - 1)
+            if len(cha) <= 1:
+                return True
+            # Small standard deviation of the gaps
+            std_result = numpy.std(cha)
+            if std_result < 10:
+                # Absolutely trustworthy
+                return True
+
+            for i in range(0, len(cha)):
+                if abs(cha[i]) > 10:
+                    # For gaps over 10, check the count of uncanceled buys between the two adjacent items
+                    buy_count = 0
+                    for index in range(indexes[i] + 1, indexes[i + 1] - 1):
+                        if L2DataUtil.is_limit_up_price_buy(total_datas[index]["val"]):
+                            if not cls.__is_cancel(code, total_datas[index], total_datas, local_today_num_operate_map):
+                                buy_count += total_datas[index]["re"]
+                    # Tentatively allow a tolerance of 3
+                    if buy_count >= 3:
+                        return False
+            return True
+
         if len(queueList) == 0:
             return None
+        # last_index must not be canceled; if it has been canceled, reset it to 0
+        if cls.__is_cancel(code, total_datas[last_index], total_datas, local_today_num_operate_map):
+            last_index = 0
         # Pad the integer part to 5 digits
         buy_1_price_format = f"{buy_1_price}"
         while buy_1_price_format.find(".") < 4:
             buy_1_price_format = "0" + buy_1_price_format
-        index_set = set()
-        for num in queueList:
-            buy_datas = local_today_num_operate_map.get(
-                "{}-{}-{}".format(num, "0", buy_1_price_format))
-            if buy_datas is not None and len(buy_datas) > 0:
-                for data in buy_datas:
-                    # Only valid after the time of the most recent non-limit-up buy-1 update
-                    if latest_not_limit_up_time is None or tool.trade_time_sub(data["val"]["time"],
-                                                                               latest_not_limit_up_time) >= 0:
-                        index_set.add(data["index"])
-        index_list = list(index_set)
-        index_list.sort()
-        num_list = []
-        new_index_list = []
-        for index in index_list:
-            for i in range(0, total_datas[index]["re"]):
-                num_list.append(total_datas[index]["val"]["num"])
-                new_index_list.append(index)
-        index_list_str = ",".join(list(map(str, num_list)))
-        queue_list_str = ",".join(list(map(str, queueList)))
-        find_index = index_list_str.find(queue_list_str)
-        if find_index >= 0:
-            temp_str = index_list_str[0:find_index]
-            if temp_str.endswith(","):
-                temp_str = temp_str[:-1]
-            return new_index_list[len(temp_str.split(","))]
+        # -------- Factor search (the factor window is at most len(queueList) and at least len(queueList)/2) --------
+        max_win_len = len(queueList)
+        min_win_len = len(queueList) // 2
+        if max_win_len == min_win_len:
+            min_win_len = max_win_len - 1
+        for win_len in range(max_win_len, min_win_len, -1):
+            # Slide the window
+            for i in range(0, max_win_len - win_len + 1):
+                queues = queueList[i:i + win_len]
+                f_start_index, f_indexs = find_traded_progress_index_simple(queues)
+                if f_start_index and is_trust(f_indexs):
+                    return f_start_index
+        raise Exception("Traded progress not found yet")
 
 
 if __name__ == "__main__":
-    pass
+    print(load_l2_data("002235"))

--
Gitblit v1.8.0
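
Note on the "factor search" added above: find_traded_progress_index now locates the exchange buy-queue snapshot inside today's ordered limit-up buy records, shrinking the match window from len(queueList) down to roughly half and keeping the longest trustworthy match. The sketch below is a standalone illustration of that matching idea on plain Python lists; match_queue and find_subsequence are hypothetical names and are not part of the patch, and matching lists directly is a simplification of the patch's comma-joined-string str.find() approach (which can match a volume inside a longer volume, e.g. "45" inside "145", so the index arithmetic there deserves a careful read).

    # Standalone sketch; names are hypothetical and not taken from the patch.

    def find_subsequence(haystack, needle):
        # Index of the first contiguous occurrence of `needle` in `haystack`, else None.
        n, m = len(haystack), len(needle)
        for i in range(n - m + 1):
            if haystack[i:i + m] == needle:
                return i
        return None


    def match_queue(pending_volumes, queue_volumes):
        # Shrink the window from the full queue down to roughly half of it,
        # preferring the longest match, mirroring the patched "factor search" loop.
        max_win = len(queue_volumes)
        min_win = max(1, max_win // 2)
        for win in range(max_win, min_win - 1, -1):
            for start in range(0, max_win - win + 1):
                pos = find_subsequence(pending_volumes, queue_volumes[start:start + win])
                if pos is not None:
                    return pos
        return None


    if __name__ == "__main__":
        pending = [120, 88, 300, 45, 45, 900, 17]   # volumes of today's limit-up buys, in order
        snapshot = [45, 45, 900]                    # volumes reported in the buy-queue snapshot
        print(match_queue(pending, snapshot))       # -> 3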
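The is_trust helper in the same hunk accepts a candidate match only when the matched records sit close together: it takes the gaps between consecutive matched indexes and, if their standard deviation (numpy.std) is small, trusts the match outright; for larger gaps it falls back to counting uncanceled limit-up buys between the neighbours. A rough sketch of just the gap-uniformity part, with made-up index values and a made-up threshold name:

    import numpy


    def gaps_look_uniform(indexes, std_threshold=10):
        # Gaps = number of unrelated records between consecutive matched indexes.
        gaps = [indexes[i] - indexes[i - 1] - 1 for i in range(1, len(indexes))]
        if len(gaps) <= 1:
            return True
        return float(numpy.std(gaps)) < std_threshold


    print(gaps_look_uniform([120, 121, 123, 124]))  # tight cluster -> True
    print(gaps_look_uniform([120, 180, 181, 400]))  # erratic gaps -> False; the patch then applies the buy-count check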