From 736e61b89e87f7e3c224feca25e94cda459b9ae6 Mon Sep 17 00:00:00 2001 From: Administrator <admin@example.com> Date: 星期一, 06 二月 2023 15:26:05 +0800 Subject: [PATCH] H撤完善,修改代码文件目录 --- l2/l2_data_util.py | 376 +++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 files changed, 372 insertions(+), 4 deletions(-) diff --git a/l2/l2_data_util.py b/l2/l2_data_util.py index 5495d40..8360263 100644 --- a/l2/l2_data_util.py +++ b/l2/l2_data_util.py @@ -2,20 +2,385 @@ L2鐩稿叧鏁版嵁澶勭悊 """ - # L2浜ゆ槗闃熷垪 +import datetime +import decimal +import json +import logging +import time + +import constant +import gpcode_manager +import l2_data_log +import log +import redis_manager +import tool + +_redisManager = redis_manager.RedisManager(1) +# l2鏁版嵁绠$悊 +# 鏈湴鏈�鏂颁竴娆′笂浼犵殑鏁版嵁 +local_latest_datas = {} +# 鏈湴浠婃棩鏁版嵁 +local_today_datas = {} +# 鏈湴鎵嬫暟+鎿嶄綔閭g被鍨嬬粍鎴愮殑涓存椂鍙橀噺 +# 鐢ㄤ簬鍔犲揩鏁版嵁澶勭悊锛岀敤绌烘崲鏃堕棿 +local_today_num_operate_map = {} + + +def load_l2_data(code, force=False): + redis = _redisManager.getRedis() + # 鍔犺浇鏈�杩戠殑l2鏁版嵁 + if local_latest_datas.get(code) is None or force: + # 鑾峰彇鏈�杩戠殑鏁版嵁 + _data = redis.get("l2-data-latest-{}".format(code)) + if _data is not None: + if code in local_latest_datas: + local_latest_datas[code] = json.loads(_data) + else: + local_latest_datas.setdefault(code, json.loads(_data)) + # 鑾峰彇浠婃棩鐨勬暟鎹� + + if local_today_datas.get(code) is None or force: + datas = log.load_l2_from_log() + datas = datas.get(code) + if datas is None: + datas = [] + local_today_datas[code] = datas + + # 浠庢暟鎹簱鍔犺浇 + # datas = [] + # keys = redis.keys("l2-{}-*".format(code)) + # for k in keys: + # value = redis.get(k) + # _data = l2_data_util.l2_data_key_2_obj(k, value) + # datas.append(_data) + # # 鎺掑簭 + # new_datas = sorted(datas, + # key=lambda e: (int(e.__getitem__('val')["time"].replace(":", "")), e.__getitem__('index'))) + # local_today_datas[code] = new_datas + # 鏍规嵁浠婃棩鏁版嵁鍔犺浇 + load_num_operate_map(local_today_num_operate_map, code, local_today_datas.get(code), force) + + +# 灏嗘暟鎹牴鎹畁um-operate鍒嗙被 +def load_num_operate_map(local_today_num_operate_map, code, source_datas, clear=False): + if local_today_num_operate_map.get(code) is None: + local_today_num_operate_map[code] = {} + if clear: + local_today_num_operate_map[code] = {} + + for data in source_datas: + key = "{}-{}-{}".format(data["val"]["num"], data["val"]["operateType"], data["val"]["price"]) + if local_today_num_operate_map[code].get(key) is None: + local_today_num_operate_map[code].setdefault(key, []) + local_today_num_operate_map[code].get(key).append(data) + + +@tool.async_call +def saveL2Data(code, datas, msg=""): + start_time = round(time.time() * 1000) + # 鏌ヨ绁ㄦ槸鍚﹀湪寰呯洃鍚殑绁ㄩ噷闈� + if not gpcode_manager.is_in_gp_pool(code): + return None + # 楠岃瘉鑲′环鐨勬纭�� + redis_instance = _redisManager.getRedis() + + try: + if redis_instance.setnx("l2-save-{}".format(code), "1") > 0: + + # 璁$畻淇濈暀鐨勬椂闂� + expire = tool.get_expire() + i = 0 + for _data in datas: + i += 1 + key = "l2-" + _data["key"] + value = redis_instance.get(key) + if value is None: + # 鏂板 + try: + value = {"index": _data["index"], "re": _data["re"]} + redis_instance.setex(key, expire, json.dumps(value)) + except: + logging.error("鏇存L2鏁版嵁鍑洪敊锛歿} key:{}".format(code, key)) + else: + json_value = json.loads(value) + if json_value["re"] != _data["re"]: + json_value["re"] = _data["re"] + redis_instance.setex(key, expire, json.dumps(json_value)) + finally: + redis_instance.delete("l2-save-{}".format(code)) + + print("淇濆瓨鏂版暟鎹敤鏃讹細", msg, "鑰楁椂锛歿}".format(round(time.time() * 1000) - start_time)) + return datas + + +# 淇濆瓨l2鏁版嵁 +def save_l2_data(code, datas, add_datas, randomKey=None): + redis = _redisManager.getRedis() + # 鍙湁鏈夋柊鏇炬暟鎹墠闇�瑕佷繚瀛� + if len(add_datas) > 0: + # 淇濆瓨鏈�杩戠殑鏁版嵁 + __start_time = round(time.time() * 1000) + redis.setex("l2-data-latest-{}".format(code), tool.get_expire(), json.dumps(datas)) + l2_data_log.l2_time(code, randomKey, round(time.time() * 1000) - __start_time, "淇濆瓨鏈�杩憀2鏁版嵁鐢ㄦ椂") + # 璁剧疆杩涘唴瀛� + local_latest_datas[code] = datas + __set_l2_data_latest_count(code, len(datas)) + try: + log.logger_l2_data.info("{}-{}", code, add_datas) + except Exception as e: + logging.exception(e) + saveL2Data(code, add_datas) + + +# 璁剧疆鏈�鏂扮殑l2鏁版嵁閲囬泦鐨勬暟閲� +def __set_l2_data_latest_count(code, count): + redis = _redisManager.getRedis() + key = "latest-l2-count-{}".format(code) + redis.setex(key, 2, count) + pass + + +# 鑾峰彇浠g爜鏈�杩戠殑l2鏁版嵁鏁伴噺 +def get_l2_data_latest_count(code): + if code is None or len(code) < 1: + return 0 + redis = _redisManager.getRedis() + key = "latest-l2-count-{}".format(code) + + result = redis.get(key) + if result is None: + return 0 + else: + return int(result) + + +def parseL2Data(str): + day = datetime.datetime.now().strftime("%Y%m%d") + dict = json.loads(str) + data = dict["data"] + client = dict["client"] + code = data["code"] + channel = data["channel"] + capture_time = data["captureTime"] + process_time = data["processTime"] + data = data["data"] + limit_up_price = gpcode_manager.get_limit_up_price(code) + + datas = L2DataUtil.format_l2_data(data, code, limit_up_price) + # 鑾峰彇娑ㄥ仠浠� + return day, client, channel, code, capture_time, process_time, datas, data + + +class L2DataUtil: + @classmethod + def is_same_time(cls, time1, time2): + if constant.TEST: + return True + time1_s = time1.split(":") + time1_second = int(time1_s[0]) * 3600 + int(time1_s[1]) * 60 + int(time1_s[2]) + time2_s = time2.split(":") + time2_second = int(time2_s[0]) * 3600 + int(time2_s[1]) * 60 + int(time2_s[2]) + if abs(time2_second - time1_second) < 3: + return True + else: + return False + + # 鑾峰彇澧為噺鏁版嵁 + @classmethod + def get_add_data(cls, code, latest_datas, datas, _start_index): + if datas is not None and len(datas) < 1: + return [] + last_data = None + latest_datas_ = latest_datas + if latest_datas_ is not None and len(latest_datas_) > 0: + last_data = latest_datas_[-1] + + count = 0 + start_index = -1 + # 濡傛灉鍘熸潵娌℃湁鏁版嵁 + # 璁剧疆add_data鐨勫簭鍙� + for n in reversed(datas): + count += 1 + if n["key"] == (last_data["key"] if last_data is not None else ""): + start_index = len(datas) - count + break + + _add_datas = [] + if last_data is not None: + if start_index < 0: + if L2DataUtil.get_time_as_second(datas[0]["val"]["time"]) >= L2DataUtil.get_time_as_second( + last_data["val"]["time"]): + _add_datas = datas + else: + _add_datas = [] + elif start_index + 1 >= len(datas): + _add_datas = [] + else: + _add_datas = datas[start_index + 1:] + else: + _add_datas = datas[start_index + 1:] + for i in range(0, len(_add_datas)): + _add_datas[i]["index"] = _start_index + i + + return _add_datas + + # 绾犳鏁版嵁锛屽皢re瀛楁鏇挎崲涓鸿緝澶у�� + @classmethod + def correct_data(cls, code, latest_datas, _datas): + latest_data = latest_datas + if latest_data is None: + latest_data = [] + save_list = [] + for data in _datas: + for _ldata in latest_data: + # 鏂版暟鎹潯鏁版瘮鏃ф暟鎹鎵嶄繚瀛� + if _ldata["key"] == data["key"] and _ldata["re"] < data["re"]: + max_re = max(_ldata["re"], data["re"]) + _ldata["re"] = max_re + data["re"] = max_re + # 淇濆瓨鍒版暟鎹簱锛屾洿鏂皉e鐨勬暟鎹� + save_list.append(_ldata) + if len(save_list) > 0: + saveL2Data(code, save_list, "淇濆瓨绾犳鏁版嵁") + local_latest_datas[code] = latest_data + return _datas + + # 澶勭悊l2鏁版嵁 + @classmethod + def format_l2_data(cls, data, code, limit_up_price): + datas = [] + dataIndexs = {} + same_time_num = {} + for item in data: + # 瑙f瀽鏁版嵁 + time = item["time"] + if time in same_time_num: + same_time_num[time] = same_time_num[time] + 1 + else: + same_time_num[time] = 1 + + price = float(item["price"]) + num = item["num"] + limitPrice = item["limitPrice"] + # 娑ㄥ仠浠� + if limit_up_price is not None: + if limit_up_price == tool.to_price(decimal.Decimal(price)): + limitPrice = 1 + else: + limitPrice = 0 + item["limitPrice"] = "{}".format(limitPrice) + operateType = item["operateType"] + # 涓嶉渶瑕侀潪娑ㄥ仠涔颁笌涔版挙 + if int(item["limitPrice"]) != 1 and (int(operateType) == 0 or int(operateType) == 1): + continue + + cancelTime = item["cancelTime"] + cancelTimeUnit = item["cancelTimeUnit"] + key = "{}-{}-{}-{}-{}-{}-{}-{}".format(code, operateType, time, num, price, limitPrice, cancelTime, + cancelTimeUnit) + if key in dataIndexs: + # 鏁版嵁閲嶅娆℃暟+1 + datas[dataIndexs[key]]['re'] = datas[dataIndexs[key]]['re'] + 1 + else: + # 鏁版嵁閲嶅娆℃暟榛樿涓�1 + datas.append({"key": key, "val": item, "re": 1}) + dataIndexs.setdefault(key, len(datas) - 1) + # TODO 娴嬭瘯鐨勬椂鍊欏紑鍚紝鏂逛究璁板綍澶у崟鏁版嵁 + # l2_data_util.save_big_data(code, same_time_num, data) + return datas + + @classmethod + def get_time_as_second(cls, time_str): + ts = time_str.split(":") + return int(ts[0]) * 3600 + int(ts[1]) * 60 + int(ts[2]) + + # @classmethod + # def get_time_as_str(cls, time_seconds): + # ts = time_str.split(":") + # return int(ts[0]) * 3600 + int(ts[1]) * 60 + int(ts[2]) + + # 鏄惁鏄定鍋滀环涔� + @classmethod + def is_limit_up_price_buy(cls, val): + if int(val["limitPrice"]) != 1: + return False + + if int(val["operateType"]) != 0: + return False + + price = float(val["price"]) + num = int(val["num"]) + # if price * num * 100 < 50 * 10000: + # return False + return True + + # 鏄惁涓烘定鍋滃崠 + @classmethod + def is_limit_up_price_sell(cls, val): + if int(val["limitPrice"]) != 1: + return False + + if int(val["operateType"]) != 2: + return False + + price = float(val["price"]) + num = int(val["num"]) + # if price * num * 100 < 50 * 10000: + # return False + return True + + # 鏄惁娑ㄥ仠涔版挙 + @classmethod + def is_limit_up_price_buy_cancel(cls, val): + if int(val["limitPrice"]) != 1: + return False + + if int(val["operateType"]) != 1: + return False + + price = float(val["price"]) + num = int(val["num"]) + # if price * num * 100 < 50 * 10000: + # return False + return True + + # 鏄惁鍗栨挙 + @classmethod + def is_sell_cancel(cls, val): + if int(val["operateType"]) == 3: + return True + return False + + # 鏄惁涓哄崠 + @classmethod + def is_sell(cls, val): + if int(val["operateType"]) == 2: + return True + return False + + class L2TradeQueueUtils(object): # 鑾峰彇鎴愪氦杩涘害绱㈠紩 - def find_traded_progress_index(cls, buy_1_price, total_datas, local_today_num_operate_map, queueList): + @classmethod + def find_traded_progress_index(cls, buy_1_price, total_datas, local_today_num_operate_map, queueList, + latest_not_limit_up_time=None): if len(queueList) == 0: return None + # 琛ラ綈鏁存暟浣�5浣� + buy_1_price_format = f"{buy_1_price}" + while buy_1_price_format.find(".") < 4: + buy_1_price_format = "0" + buy_1_price_format index_set = set() for num in queueList: buy_datas = local_today_num_operate_map.get( - "{}-{}-{}".format(num, "0", buy_1_price)) + "{}-{}-{}".format(num, "0", buy_1_price_format)) if buy_datas is not None and len(buy_datas) > 0: for data in buy_datas: - index_set.add(data["index"]) + # 鍦ㄦ渶杩戜竴娆¢潪娑ㄥ仠涔�1鏇存柊鐨勬椂闂翠箣鍚庢墠鏈夋晥 + if latest_not_limit_up_time is None or tool.trade_time_sub(data["val"]["time"], + latest_not_limit_up_time) >= 0: + index_set.add(data["index"]) index_list = list(index_set) index_list.sort() num_list = [] @@ -29,6 +394,9 @@ find_index = index_list_str.find(queue_list_str) if find_index >= 0: temp_str = index_list_str[0:find_index] + if temp_str.endswith(","): + temp_str = temp_str[:-1] + return new_index_list[len(temp_str.split(","))] raise Exception("灏氭湭鎵惧埌鎴愪氦杩涘害") -- Gitblit v1.8.0