| | |
| | | L2相关数据处理 |
| | | """ |
| | | |
| | | |
| | | # L2交易队列 |
| | | import datetime |
| | | import decimal |
| | | import json |
| | | import logging |
| | | import time |
| | | |
| | | import constant |
| | | import gpcode_manager |
| | | import l2_data_log |
| | | import log |
| | | import redis_manager |
| | | import tool |
| | | |
| | | _redisManager = redis_manager.RedisManager(1) |
| | | # l2数据管理 |
| | | # 本地最新一次上传的数据 |
| | | local_latest_datas = {} |
| | | # 本地今日数据 |
| | | local_today_datas = {} |
| | | # 本地手数+操作那类型组成的临时变量 |
| | | # 用于加快数据处理,用空换时间 |
| | | local_today_num_operate_map = {} |
| | | |
| | | |
| | | def load_l2_data(code, force=False): |
| | | redis = _redisManager.getRedis() |
| | | # 加载最近的l2数据 |
| | | if local_latest_datas.get(code) is None or force: |
| | | # 获取最近的数据 |
| | | _data = redis.get("l2-data-latest-{}".format(code)) |
| | | if _data is not None: |
| | | if code in local_latest_datas: |
| | | local_latest_datas[code] = json.loads(_data) |
| | | else: |
| | | local_latest_datas.setdefault(code, json.loads(_data)) |
| | | # 获取今日的数据 |
| | | |
| | | if local_today_datas.get(code) is None or force: |
| | | datas = log.load_l2_from_log() |
| | | datas = datas.get(code) |
| | | if datas is None: |
| | | datas = [] |
| | | local_today_datas[code] = datas |
| | | |
| | | # 从数据库加载 |
| | | # datas = [] |
| | | # keys = redis.keys("l2-{}-*".format(code)) |
| | | # for k in keys: |
| | | # value = redis.get(k) |
| | | # _data = l2_data_util.l2_data_key_2_obj(k, value) |
| | | # datas.append(_data) |
| | | # # 排序 |
| | | # new_datas = sorted(datas, |
| | | # key=lambda e: (int(e.__getitem__('val')["time"].replace(":", "")), e.__getitem__('index'))) |
| | | # local_today_datas[code] = new_datas |
| | | # 根据今日数据加载 |
| | | load_num_operate_map(local_today_num_operate_map, code, local_today_datas.get(code), force) |
| | | |
| | | |
| | | # 将数据根据num-operate分类 |
| | | def load_num_operate_map(local_today_num_operate_map, code, source_datas, clear=False): |
| | | if local_today_num_operate_map.get(code) is None: |
| | | local_today_num_operate_map[code] = {} |
| | | if clear: |
| | | local_today_num_operate_map[code] = {} |
| | | |
| | | for data in source_datas: |
| | | key = "{}-{}-{}".format(data["val"]["num"], data["val"]["operateType"], data["val"]["price"]) |
| | | if local_today_num_operate_map[code].get(key) is None: |
| | | local_today_num_operate_map[code].setdefault(key, []) |
| | | local_today_num_operate_map[code].get(key).append(data) |
| | | |
| | | |
| | | @tool.async_call |
| | | def saveL2Data(code, datas, msg=""): |
| | | start_time = round(time.time() * 1000) |
| | | # 查询票是否在待监听的票里面 |
| | | if not gpcode_manager.is_in_gp_pool(code): |
| | | return None |
| | | # 验证股价的正确性 |
| | | redis_instance = _redisManager.getRedis() |
| | | |
| | | try: |
| | | if redis_instance.setnx("l2-save-{}".format(code), "1") > 0: |
| | | |
| | | # 计算保留的时间 |
| | | expire = tool.get_expire() |
| | | i = 0 |
| | | for _data in datas: |
| | | i += 1 |
| | | key = "l2-" + _data["key"] |
| | | value = redis_instance.get(key) |
| | | if value is None: |
| | | # 新增 |
| | | try: |
| | | value = {"index": _data["index"], "re": _data["re"]} |
| | | redis_instance.setex(key, expire, json.dumps(value)) |
| | | except: |
| | | logging.error("更正L2数据出错:{} key:{}".format(code, key)) |
| | | else: |
| | | json_value = json.loads(value) |
| | | if json_value["re"] != _data["re"]: |
| | | json_value["re"] = _data["re"] |
| | | redis_instance.setex(key, expire, json.dumps(json_value)) |
| | | finally: |
| | | redis_instance.delete("l2-save-{}".format(code)) |
| | | |
| | | print("保存新数据用时:", msg, "耗时:{}".format(round(time.time() * 1000) - start_time)) |
| | | return datas |
| | | |
| | | |
| | | # 保存l2数据 |
| | | def save_l2_data(code, datas, add_datas, randomKey=None): |
| | | redis = _redisManager.getRedis() |
| | | # 只有有新曾数据才需要保存 |
| | | if len(add_datas) > 0: |
| | | # 保存最近的数据 |
| | | __start_time = round(time.time() * 1000) |
| | | redis.setex("l2-data-latest-{}".format(code), tool.get_expire(), json.dumps(datas)) |
| | | l2_data_log.l2_time(code, randomKey, round(time.time() * 1000) - __start_time, "保存最近l2数据用时") |
| | | # 设置进内存 |
| | | local_latest_datas[code] = datas |
| | | __set_l2_data_latest_count(code, len(datas)) |
| | | try: |
| | | log.logger_l2_data.info("{}-{}", code, add_datas) |
| | | except Exception as e: |
| | | logging.exception(e) |
| | | saveL2Data(code, add_datas) |
| | | |
| | | |
| | | # 设置最新的l2数据采集的数量 |
| | | def __set_l2_data_latest_count(code, count): |
| | | redis = _redisManager.getRedis() |
| | | key = "latest-l2-count-{}".format(code) |
| | | redis.setex(key, 2, count) |
| | | pass |
| | | |
| | | |
| | | # 获取代码最近的l2数据数量 |
| | | def get_l2_data_latest_count(code): |
| | | if code is None or len(code) < 1: |
| | | return 0 |
| | | redis = _redisManager.getRedis() |
| | | key = "latest-l2-count-{}".format(code) |
| | | |
| | | result = redis.get(key) |
| | | if result is None: |
| | | return 0 |
| | | else: |
| | | return int(result) |
| | | |
| | | |
| | | def parseL2Data(str): |
| | | day = datetime.datetime.now().strftime("%Y%m%d") |
| | | dict = json.loads(str) |
| | | data = dict["data"] |
| | | client = dict["client"] |
| | | code = data["code"] |
| | | channel = data["channel"] |
| | | capture_time = data["captureTime"] |
| | | process_time = data["processTime"] |
| | | data = data["data"] |
| | | limit_up_price = gpcode_manager.get_limit_up_price(code) |
| | | |
| | | datas = L2DataUtil.format_l2_data(data, code, limit_up_price) |
| | | # 获取涨停价 |
| | | return day, client, channel, code, capture_time, process_time, datas, data |
| | | |
| | | |
| | | class L2DataUtil: |
| | | @classmethod |
| | | def is_same_time(cls, time1, time2): |
| | | if constant.TEST: |
| | | return True |
| | | time1_s = time1.split(":") |
| | | time1_second = int(time1_s[0]) * 3600 + int(time1_s[1]) * 60 + int(time1_s[2]) |
| | | time2_s = time2.split(":") |
| | | time2_second = int(time2_s[0]) * 3600 + int(time2_s[1]) * 60 + int(time2_s[2]) |
| | | if abs(time2_second - time1_second) < 3: |
| | | return True |
| | | else: |
| | | return False |
| | | |
| | | # 获取增量数据 |
| | | @classmethod |
| | | def get_add_data(cls, code, latest_datas, datas, _start_index): |
| | | if datas is not None and len(datas) < 1: |
| | | return [] |
| | | last_data = None |
| | | latest_datas_ = latest_datas |
| | | if latest_datas_ is not None and len(latest_datas_) > 0: |
| | | last_data = latest_datas_[-1] |
| | | |
| | | count = 0 |
| | | start_index = -1 |
| | | # 如果原来没有数据 |
| | | # 设置add_data的序号 |
| | | for n in reversed(datas): |
| | | count += 1 |
| | | if n["key"] == (last_data["key"] if last_data is not None else ""): |
| | | start_index = len(datas) - count |
| | | break |
| | | |
| | | _add_datas = [] |
| | | if last_data is not None: |
| | | if start_index < 0: |
| | | if L2DataUtil.get_time_as_second(datas[0]["val"]["time"]) >= L2DataUtil.get_time_as_second( |
| | | last_data["val"]["time"]): |
| | | _add_datas = datas |
| | | else: |
| | | _add_datas = [] |
| | | elif start_index + 1 >= len(datas): |
| | | _add_datas = [] |
| | | else: |
| | | _add_datas = datas[start_index + 1:] |
| | | else: |
| | | _add_datas = datas[start_index + 1:] |
| | | for i in range(0, len(_add_datas)): |
| | | _add_datas[i]["index"] = _start_index + i |
| | | |
| | | return _add_datas |
| | | |
| | | # 纠正数据,将re字段替换为较大值 |
| | | @classmethod |
| | | def correct_data(cls, code, latest_datas, _datas): |
| | | latest_data = latest_datas |
| | | if latest_data is None: |
| | | latest_data = [] |
| | | save_list = [] |
| | | for data in _datas: |
| | | for _ldata in latest_data: |
| | | # 新数据条数比旧数据多才保存 |
| | | if _ldata["key"] == data["key"] and _ldata["re"] < data["re"]: |
| | | max_re = max(_ldata["re"], data["re"]) |
| | | _ldata["re"] = max_re |
| | | data["re"] = max_re |
| | | # 保存到数据库,更新re的数据 |
| | | save_list.append(_ldata) |
| | | if len(save_list) > 0: |
| | | saveL2Data(code, save_list, "保存纠正数据") |
| | | local_latest_datas[code] = latest_data |
| | | return _datas |
| | | |
| | | # 处理l2数据 |
| | | @classmethod |
| | | def format_l2_data(cls, data, code, limit_up_price): |
| | | datas = [] |
| | | dataIndexs = {} |
| | | same_time_num = {} |
| | | for item in data: |
| | | # 解析数据 |
| | | time = item["time"] |
| | | if time in same_time_num: |
| | | same_time_num[time] = same_time_num[time] + 1 |
| | | else: |
| | | same_time_num[time] = 1 |
| | | |
| | | price = float(item["price"]) |
| | | num = item["num"] |
| | | limitPrice = item["limitPrice"] |
| | | # 涨停价 |
| | | if limit_up_price is not None: |
| | | if limit_up_price == tool.to_price(decimal.Decimal(price)): |
| | | limitPrice = 1 |
| | | else: |
| | | limitPrice = 0 |
| | | item["limitPrice"] = "{}".format(limitPrice) |
| | | operateType = item["operateType"] |
| | | # 不需要非涨停买与买撤 |
| | | if int(item["limitPrice"]) != 1 and (int(operateType) == 0 or int(operateType) == 1): |
| | | continue |
| | | |
| | | cancelTime = item["cancelTime"] |
| | | cancelTimeUnit = item["cancelTimeUnit"] |
| | | key = "{}-{}-{}-{}-{}-{}-{}-{}".format(code, operateType, time, num, price, limitPrice, cancelTime, |
| | | cancelTimeUnit) |
| | | if key in dataIndexs: |
| | | # 数据重复次数+1 |
| | | datas[dataIndexs[key]]['re'] = datas[dataIndexs[key]]['re'] + 1 |
| | | else: |
| | | # 数据重复次数默认为1 |
| | | datas.append({"key": key, "val": item, "re": 1}) |
| | | dataIndexs.setdefault(key, len(datas) - 1) |
| | | # TODO 测试的时候开启,方便记录大单数据 |
| | | # l2_data_util.save_big_data(code, same_time_num, data) |
| | | return datas |
| | | |
| | | @classmethod |
| | | def get_time_as_second(cls, time_str): |
| | | ts = time_str.split(":") |
| | | return int(ts[0]) * 3600 + int(ts[1]) * 60 + int(ts[2]) |
| | | |
| | | # @classmethod |
| | | # def get_time_as_str(cls, time_seconds): |
| | | # ts = time_str.split(":") |
| | | # return int(ts[0]) * 3600 + int(ts[1]) * 60 + int(ts[2]) |
| | | |
| | | # 是否是涨停价买 |
| | | @classmethod |
| | | def is_limit_up_price_buy(cls, val): |
| | | if int(val["limitPrice"]) != 1: |
| | | return False |
| | | |
| | | if int(val["operateType"]) != 0: |
| | | return False |
| | | |
| | | price = float(val["price"]) |
| | | num = int(val["num"]) |
| | | # if price * num * 100 < 50 * 10000: |
| | | # return False |
| | | return True |
| | | |
| | | # 是否为涨停卖 |
| | | @classmethod |
| | | def is_limit_up_price_sell(cls, val): |
| | | if int(val["limitPrice"]) != 1: |
| | | return False |
| | | |
| | | if int(val["operateType"]) != 2: |
| | | return False |
| | | |
| | | price = float(val["price"]) |
| | | num = int(val["num"]) |
| | | # if price * num * 100 < 50 * 10000: |
| | | # return False |
| | | return True |
| | | |
| | | # 是否涨停买撤 |
| | | @classmethod |
| | | def is_limit_up_price_buy_cancel(cls, val): |
| | | if int(val["limitPrice"]) != 1: |
| | | return False |
| | | |
| | | if int(val["operateType"]) != 1: |
| | | return False |
| | | |
| | | price = float(val["price"]) |
| | | num = int(val["num"]) |
| | | # if price * num * 100 < 50 * 10000: |
| | | # return False |
| | | return True |
| | | |
| | | # 是否卖撤 |
| | | @classmethod |
| | | def is_sell_cancel(cls, val): |
| | | if int(val["operateType"]) == 3: |
| | | return True |
| | | return False |
| | | |
| | | # 是否为卖 |
| | | @classmethod |
| | | def is_sell(cls, val): |
| | | if int(val["operateType"]) == 2: |
| | | return True |
| | | return False |
| | | |
| | | |
| | | class L2TradeQueueUtils(object): |
| | | # 获取成交进度索引 |
| | | def find_traded_progress_index(cls, buy_1_price, total_datas, local_today_num_operate_map, queueList): |
| | | @classmethod |
| | | def find_traded_progress_index(cls, buy_1_price, total_datas, local_today_num_operate_map, queueList, |
| | | latest_not_limit_up_time=None): |
| | | if len(queueList) == 0: |
| | | return None |
| | | # 补齐整数位5位 |
| | | buy_1_price_format = f"{buy_1_price}" |
| | | while buy_1_price_format.find(".") < 4: |
| | | buy_1_price_format = "0" + buy_1_price_format |
| | | index_set = set() |
| | | for num in queueList: |
| | | buy_datas = local_today_num_operate_map.get( |
| | | "{}-{}-{}".format(num, "0", buy_1_price)) |
| | | "{}-{}-{}".format(num, "0", buy_1_price_format)) |
| | | if buy_datas is not None and len(buy_datas) > 0: |
| | | for data in buy_datas: |
| | | index_set.add(data["index"]) |
| | | # 在最近一次非涨停买1更新的时间之后才有效 |
| | | if latest_not_limit_up_time is None or tool.trade_time_sub(data["val"]["time"], |
| | | latest_not_limit_up_time) >= 0: |
| | | index_set.add(data["index"]) |
| | | index_list = list(index_set) |
| | | index_list.sort() |
| | | num_list = [] |
| | |
| | | find_index = index_list_str.find(queue_list_str) |
| | | if find_index >= 0: |
| | | temp_str = index_list_str[0:find_index] |
| | | if temp_str.endswith(","): |
| | | temp_str = temp_str[:-1] |
| | | |
| | | return new_index_list[len(temp_str.split(","))] |
| | | raise Exception("尚未找到成交进度") |
| | | |