| | |
| | | import numpy |
| | | |
| | | import constant |
| | | import gpcode_manager |
| | | import l2_data_util |
| | | from code_attribute import gpcode_manager |
| | | from db.redis_manager_delegate import RedisUtils |
| | | from l2 import l2_data_log, l2_data_source_util |
| | | import log |
| | | from db import redis_manager |
| | | import tool |
| | | from log_module import log, log_export |
| | | from db import redis_manager_delegate as redis_manager |
| | | from utils import tool |
| | | |
| | | _redisManager = redis_manager.RedisManager(1) |
| | | # l2数据管理 |
| | |
| | | # 用于加快数据处理,用空换时间 |
| | | local_today_num_operate_map = {} |
| | | |
| | | # 买入订单号映射,只有原生的L2数据才有 |
| | | local_today_buyno_map = {} |
| | | |
| | | def load_l2_data(code, force=False): |
| | | redis = _redisManager.getRedis() |
| | | |
| | | def load_l2_data(code, load_latest=True, force=False): |
| | | # 加载最近的l2数据 |
| | | if local_latest_datas.get(code) is None or force: |
| | | # 获取最近的数据 |
| | | _data = redis.get("l2-data-latest-{}".format(code)) |
| | | if _data is not None: |
| | | if code in local_latest_datas: |
| | | local_latest_datas[code] = json.loads(_data) |
| | | else: |
| | | local_latest_datas.setdefault(code, json.loads(_data)) |
| | | if load_latest: |
| | | if local_latest_datas.get(code) is None or force: |
| | | # 获取最近的数据 |
| | | _data = RedisUtils.get(_redisManager.getRedis(), "l2-data-latest-{}".format(code)) |
| | | if _data is not None: |
| | | if code in local_latest_datas: |
| | | local_latest_datas[code] = json.loads(_data) |
| | | else: |
| | | local_latest_datas.setdefault(code, json.loads(_data)) |
| | | # 获取今日的数据 |
| | | |
| | | if local_today_datas.get(code) is None or force: |
| | | datas = log.load_l2_from_log() |
| | | datas = log_export.load_l2_from_log() |
| | | datas = datas.get(code) |
| | | if datas is None: |
| | | datas = [] |
| | | local_today_datas[code] = datas |
| | | data_normal = True |
| | | if datas and len(datas) < datas[-1]["index"] + 1: |
| | | data_normal = False |
| | | |
| | | # 从数据库加载 |
| | | # datas = [] |
| | |
| | | # local_today_datas[code] = new_datas |
| | | # 根据今日数据加载 |
| | | load_num_operate_map(local_today_num_operate_map, code, local_today_datas.get(code), force) |
| | | load_buy_no_map(local_today_buyno_map, code, local_today_datas.get(code), force) |
| | | return data_normal |
| | | return True |
| | | |
| | | |
| | | # 将数据根据num-operate分类 |
| | |
| | | local_today_num_operate_map[code].get(key).append(data) |
| | | |
| | | |
| | | # 将数据根据orderNo分类,原生数据才有 |
| | | def load_buy_no_map(local_today_buyno_map, code, source_datas, clear=False): |
| | | # 只有原生L2数据才会有此操作 |
| | | if constant.L2_SOURCE_TYPE != constant.L2_SOURCE_TYPE_HUAXIN: |
| | | return |
| | | if local_today_buyno_map.get(code) is None: |
| | | local_today_buyno_map[code] = {} |
| | | if clear: |
| | | local_today_buyno_map[code] = {} |
| | | |
| | | for data in source_datas: |
| | | if data["val"]["operateType"] != 0: |
| | | continue |
| | | # 只填充买入数据 |
| | | key = "{}".format(data["val"]["orderNo"]) |
| | | if local_today_buyno_map[code].get(key) is None: |
| | | local_today_buyno_map[code].setdefault(key, data) |
| | | |
| | | |
| | | @tool.async_call |
| | | def saveL2Data(code, datas, msg=""): |
| | | start_time = round(time.time() * 1000) |
| | |
| | | redis_instance = _redisManager.getRedis() |
| | | |
| | | try: |
| | | if redis_instance.setnx("l2-save-{}".format(code), "1") > 0: |
| | | |
| | | if RedisUtils.setnx(redis_instance, "l2-save-{}".format(code), "1", auto_free=False) > 0: |
| | | # 计算保留的时间 |
| | | expire = tool.get_expire() |
| | | i = 0 |
| | | for _data in datas: |
| | | i += 1 |
| | | key = "l2-" + _data["key"] |
| | | value = redis_instance.get(key) |
| | | value = RedisUtils.get(redis_instance, key, auto_free=False) |
| | | if value is None: |
| | | # 新增 |
| | | try: |
| | | value = {"index": _data["index"], "re": _data["re"]} |
| | | redis_instance.setex(key, expire, json.dumps(value)) |
| | | RedisUtils.setex(redis_instance, key, expire, json.dumps(value), auto_free=False) |
| | | except: |
| | | logging.error("更正L2数据出错:{} key:{}".format(code, key)) |
| | | else: |
| | | json_value = json.loads(value) |
| | | if json_value["re"] != _data["re"]: |
| | | json_value["re"] = _data["re"] |
| | | redis_instance.setex(key, expire, json.dumps(json_value)) |
| | | RedisUtils.setex(redis_instance, key, expire, json.dumps(json_value), auto_free=False) |
| | | finally: |
| | | redis_instance.delete("l2-save-{}".format(code)) |
| | | RedisUtils.delete(redis_instance, "l2-save-{}".format(code), auto_free=False) |
| | | RedisUtils.realse(redis_instance) |
| | | |
| | | print("保存新数据用时:", msg, "耗时:{}".format(round(time.time() * 1000) - start_time)) |
| | | return datas |
| | |
| | | |
| | | # 保存l2数据 |
| | | def save_l2_data(code, datas, add_datas): |
| | | redis = _redisManager.getRedis() |
| | | # 只有有新曾数据才需要保存 |
| | | if len(add_datas) > 0: |
| | | # 保存最近的数据 |
| | | __start_time = round(time.time() * 1000) |
| | | redis.setex("l2-data-latest-{}".format(code), tool.get_expire(), json.dumps(datas)) |
| | | l2_data_log.l2_time(code, round(time.time() * 1000) - __start_time, "保存最近l2数据用时") |
| | | # 设置进内存 |
| | | local_latest_datas[code] = datas |
| | | __set_l2_data_latest_count(code, len(datas)) |
| | | if datas: |
| | | RedisUtils.setex_async(_redisManager.getRedis(), "l2-data-latest-{}".format(code), tool.get_expire(), json.dumps(datas)) |
| | | # l2_data_log.l2_time(code, round(time.time() * 1000) - __start_time, "保存最近l2数据用时") |
| | | # 设置进内存 |
| | | local_latest_datas[code] = datas |
| | | set_l2_data_latest_count(code, len(datas)) |
| | | try: |
| | | log.logger_l2_data.info("{}-{}", code, add_datas) |
| | | except Exception as e: |
| | | logging.exception(e) |
| | | saveL2Data(code, add_datas) |
| | | # 暂时不将数据保存到redis |
| | | # saveL2Data(code, add_datas) |
| | | |
| | | |
| | | # 设置最新的l2数据采集的数量 |
| | | def __set_l2_data_latest_count(code, count): |
| | | redis = _redisManager.getRedis() |
| | | def set_l2_data_latest_count(code, count): |
| | | key = "latest-l2-count-{}".format(code) |
| | | redis.setex(key, 2, count) |
| | | RedisUtils.setex(_redisManager.getRedis(), key, 2, count) |
| | | pass |
| | | |
| | | |
| | |
| | | def get_l2_data_latest_count(code): |
| | | if code is None or len(code) < 1: |
| | | return 0 |
| | | redis = _redisManager.getRedis() |
| | | key = "latest-l2-count-{}".format(code) |
| | | |
| | | result = redis.get(key) |
| | | result = RedisUtils.get(_redisManager.getRedis(), key) |
| | | if result is None: |
| | | return 0 |
| | | else: |
| | |
| | | channel = data["channel"] |
| | | capture_time = data["captureTime"] |
| | | process_time = data["processTime"] |
| | | count = data["count"] |
| | | data = data["data"] |
| | | limit_up_price = gpcode_manager.get_limit_up_price(code) |
| | | |
| | | datas = L2DataUtil.format_l2_data(data, code, limit_up_price) |
| | | # 获取涨停价 |
| | | return day, client, channel, code, capture_time, process_time, datas, data |
| | | return day, client, channel, code, capture_time, process_time, data, count |
| | | |
| | | |
| | | # 元数据是否有差异 |
| | | def is_origin_data_diffrent(data1, data2): |
| | | if data1 is None or data2 is None: |
| | | return True |
| | | if len(data1) != len(data2): |
| | | return True |
| | | # 比较 |
| | | data_length = len(data1) |
| | | step = len(data1) // 10 |
| | | for i in range(0, data_length, step): |
| | | if json.dumps(data1[i]) != json.dumps(data2[i]): |
| | | return True |
| | | return False |
| | | |
| | | |
| | | # 是否为大单 |
| | |
| | | # 保存到数据库,更新re的数据 |
| | | save_list.append(_ldata) |
| | | if len(save_list) > 0: |
| | | saveL2Data(code, save_list, "保存纠正数据") |
| | | # 暂时不将数据保存到redis |
| | | # saveL2Data(code, save_list, "保存纠正数据") |
| | | local_latest_datas[code] = latest_data |
| | | return _datas |
| | | |
| | |
| | | return new_index_list[start_index], new_index_list[start_index:start_index + len(queues)] |
| | | return None, None |
| | | |
| | | # 3个数据以上的不需要判断最近的一次未涨停时间 |
| | | if len(queueList) >= 3: |
| | | latest_not_limit_up_time = None |
| | | |
| | | # 判断匹配的位置是否可信 |
| | | def is_trust(indexes): |
| | | cha = [] |
| | |
| | | |
| | | |
| | | if __name__ == "__main__": |
| | | cha = [0, 2, 4] |
| | | std_result = numpy.std(cha) |
| | | print(std_result) |
| | | print(load_l2_data("002235")) |