| | |
| | | |
| | | |
| | | def load_l2_data(code, load_latest=True, force=False): |
| | | redis = _redisManager.getRedis() |
| | | # 加载最近的l2数据 |
| | | if load_latest: |
| | | if local_latest_datas.get(code) is None or force: |
| | | # 获取最近的数据 |
| | | _data = RedisUtils.get(redis, "l2-data-latest-{}".format(code)) |
| | | _data = RedisUtils.get(_redisManager.getRedis(), "l2-data-latest-{}".format(code)) |
| | | if _data is not None: |
| | | if code in local_latest_datas: |
| | | local_latest_datas[code] = json.loads(_data) |
| | |
| | | redis_instance = _redisManager.getRedis() |
| | | |
| | | try: |
| | | if RedisUtils.setnx(redis_instance, "l2-save-{}".format(code), "1") > 0: |
| | | if RedisUtils.setnx(redis_instance, "l2-save-{}".format(code), "1", auto_free=False) > 0: |
| | | # 计算保留的时间 |
| | | expire = tool.get_expire() |
| | | i = 0 |
| | | for _data in datas: |
| | | i += 1 |
| | | key = "l2-" + _data["key"] |
| | | value = RedisUtils.get(redis_instance, key) |
| | | value = RedisUtils.get(redis_instance, key, auto_free=False) |
| | | if value is None: |
| | | # 新增 |
| | | try: |
| | | value = {"index": _data["index"], "re": _data["re"]} |
| | | RedisUtils.setex(redis_instance, key, expire, json.dumps(value)) |
| | | RedisUtils.setex(redis_instance, key, expire, json.dumps(value), auto_free=False) |
| | | except: |
| | | logging.error("更正L2数据出错:{} key:{}".format(code, key)) |
| | | else: |
| | | json_value = json.loads(value) |
| | | if json_value["re"] != _data["re"]: |
| | | json_value["re"] = _data["re"] |
| | | RedisUtils.setex(redis_instance, key, expire, json.dumps(json_value)) |
| | | RedisUtils.setex(redis_instance, key, expire, json.dumps(json_value), auto_free=False) |
| | | finally: |
| | | RedisUtils.delete(redis_instance, "l2-save-{}".format(code)) |
| | | RedisUtils.delete(redis_instance, "l2-save-{}".format(code), auto_free=False) |
| | | redis_instance.connection_pool.disconnect() |
| | | |
| | | print("保存新数据用时:", msg, "耗时:{}".format(round(time.time() * 1000) - start_time)) |
| | | return datas |
| | |
| | | |
| | | # 保存l2数据 |
def save_l2_data(code, datas, add_datas):
    """Persist the latest L2 data for ``code`` to Redis and the in-memory cache.

    Nothing is written unless ``add_datas`` is non-empty.

    Args:
        code: Identifier interpolated into the Redis key
            ``l2-data-latest-{code}`` and used as the key of
            ``local_latest_datas``.
        datas: The data set to store; JSON-serialised for Redis and kept
            as-is in ``local_latest_datas``.
        add_datas: The newly added records; only its length is inspected.

    NOTE(review): the indentation of this block was not recoverable from the
    source it was restored from. The nesting below (timing log and cache
    update run whenever there is new data; only the Redis write is guarded by
    ``if datas``) is a reconstruction -- confirm against the repository.
    """
    # Only persist when there is newly added data.
    if len(add_datas) > 0:
        start_ms = round(time.time() * 1000)
        if datas:
            # Written exactly once. The connection is requested at the point
            # of use instead of being held in an otherwise unused local.
            RedisUtils.setex(
                _redisManager.getRedis(),
                "l2-data-latest-{}".format(code),
                tool.get_expire(),
                json.dumps(datas),
            )
        l2_data_log.l2_time(code, round(time.time() * 1000) - start_ms, "保存最近l2数据用时")
        # Mirror the data into the in-process cache.
        local_latest_datas[code] = datas
| | |
| | | |
| | | # 设置最新的l2数据采集的数量 |
def set_l2_data_latest_count(code, count):
    """Record the latest collected L2 data count for ``code`` in Redis.

    The value is stored under ``latest-l2-count-{code}`` through
    ``RedisUtils.setex`` with ``2`` passed as the expiry argument
    (presumably seconds -- confirm against ``RedisUtils``).

    Args:
        code: Identifier interpolated into the Redis key.
        count: The value to store.
    """
    key = "latest-l2-count-{}".format(code)
    # Single write; the connection is requested at the point of use.
    RedisUtils.setex(_redisManager.getRedis(), key, 2, count)
| | | |
| | | |
| | |
| | | def get_l2_data_latest_count(code): |
| | | if code is None or len(code) < 1: |
| | | return 0 |
| | | redis = _redisManager.getRedis() |
| | | key = "latest-l2-count-{}".format(code) |
| | | |
| | | result = RedisUtils.get(redis, key) |
| | | result = RedisUtils.get(_redisManager.getRedis(), key) |
| | | if result is None: |
| | | return 0 |
| | | else: |