Administrator
2022-09-08 e7f8c6013d777dd5ba10b8d548d2d3db6158d37a
l2_data_manager.py
@@ -4,6 +4,7 @@
from datetime import datetime
import data_process
import l2_data_util
import mysql
import gpcode_manager
@@ -19,6 +20,9 @@
local_latest_datas = {}
# 本地今日数据
local_today_datas = {}
# 本地手数+操作那类型组成的临时变量
# 用于加快数据处理,用空换时间
local_today_num_operate_map = {}
class L2DataException(Exception):
@@ -125,26 +129,15 @@
        datas = []
        keys = redis.keys("l2-{}-*".format(code))
        for k in keys:
            key = k.replace("l2-", "")
            split_data = key.split("-")
            code = split_data[0]
            operateType = split_data[1]
            time = split_data[2]
            num = split_data[3]
            price = split_data[4]
            limitPrice = split_data[5]
            cancelTime = split_data[6]
            cancelTimeUnit = split_data[7]
            item = {"operateType": operateType, "time": time, "num": num, "price": price, "limitPrice": limitPrice,
                    "cancelTime": cancelTime, "cancelTimeUnit": cancelTimeUnit}
            value = redis.get(k)
            json_value = json.loads(value)
            _data = {"key": key, "val": item, "re": json_value["re"], "index": int(json_value["index"])}
            _data = l2_data_util.l2_data_key_2_obj(k, value)
            datas.append(_data)
        # 排序
        new_datas = sorted(datas,
                           key=lambda e: (int(e.__getitem__('val')["time"].replace(":", "")), e.__getitem__('index')))
        local_today_datas.setdefault(code, new_datas)
        local_today_datas[code] = new_datas
    # 根据今日数据加载
    l2_data_util.load_num_operate_map(local_today_num_operate_map, code, local_today_datas.get(code), force)
def saveL2Data(code, datas):
@@ -233,9 +226,13 @@
            dataIndexs.setdefault(key, len(datas) - 1)
    for key in same_time_num:
        if same_time_num[key] > 50:
            # TODO 保存数据
            redis = _redisManager.getRedis()
            redis.set("big_data-{}-{}".format(code, int(round(t.time() * 1000))), str)
            # 只能保存近3s的数据
            ts1 = l2_data_util.get_time_as_seconds(datas[-1]["val"]["time"])
            ts_now = l2_data_util.get_time_as_seconds(datetime.now().strftime("%H:%M:%S"))
            if abs(ts1 - ts_now) <= 3:
                # TODO 保存数据
                redis = _redisManager.getRedis()
                redis.set("big_data-{}-{}".format(code, int(round(t.time() * 1000))), str)
    return day, client, channel, code, datas
@@ -317,7 +314,8 @@
        if len(datas) > 0:
            # 判断价格区间是否正确
            if not data_process.is_same_code_with_price(code, float(datas[0]["val"]["price"])):
                raise L2DataException(L2DataException.CODE_PRICE_ERROR, "股价不匹配")
                raise L2DataException(L2DataException.CODE_PRICE_ERROR, "股价不匹配 code-{} price-{}".format(code,datas[0]["val"]["price"]))
            # 加载历史数据
            load_l2_data(code)
            # 纠正数据
@@ -326,6 +324,8 @@
            if len(add_datas) > 0:
                # 拼接数据
                local_today_datas[code].extend(add_datas)
                l2_data_util.load_num_operate_map(local_today_num_operate_map, code, add_datas)
                latest_time = add_datas[len(add_datas) - 1]["val"]["time"]
                # 时间差不能太大才能处理
                if __is_same_time(now_time_str, latest_time):
@@ -697,8 +697,17 @@
if __name__ == "__main__":
    print("big_data-{}-{}".format("123", int(round(t.time() * 1000))))
    pass
    # 删除大数据
    redis = redis_manager.RedisManager(1).getRedis()
    keys = redis.keys("big_data*")
    for key in keys:
        redis.delete(key)
    # print("big_data-{}-{}".format("123", int(round(t.time() * 1000))))
    # load_l2_data("002868")
    # keys= local_today_num_operate_map["002868"]
    # for k in keys:
    #     print(len( local_today_num_operate_map["002868"][k]))
    # pass
    # __set_buy_compute_start_data("000000", 100, 1)
    # __set_buy_compute_start_data("000000", 100)
    # __set_l2_data_latest_count("000333", 20)