| | |
| | | from datetime import datetime |
| | | |
| | | import data_process |
| | | import l2_data_util |
| | | import mysql |
| | | |
| | | import gpcode_manager |
| | |
| | | local_latest_datas = {} |
| | | # 本地今日数据 |
| | | local_today_datas = {} |
| | | # 本地手数+操作那类型组成的临时变量 |
| | | # 用于加快数据处理,用空换时间 |
| | | local_today_num_operate_map = {} |
| | | |
| | | |
| | | class L2DataException(Exception): |
| | |
| | | datas = [] |
| | | keys = redis.keys("l2-{}-*".format(code)) |
| | | for k in keys: |
| | | key = k.replace("l2-", "") |
| | | split_data = key.split("-") |
| | | code = split_data[0] |
| | | operateType = split_data[1] |
| | | time = split_data[2] |
| | | num = split_data[3] |
| | | price = split_data[4] |
| | | limitPrice = split_data[5] |
| | | cancelTime = split_data[6] |
| | | cancelTimeUnit = split_data[7] |
| | | item = {"operateType": operateType, "time": time, "num": num, "price": price, "limitPrice": limitPrice, |
| | | "cancelTime": cancelTime, "cancelTimeUnit": cancelTimeUnit} |
| | | value = redis.get(k) |
| | | json_value = json.loads(value) |
| | | _data = {"key": key, "val": item, "re": json_value["re"], "index": int(json_value["index"])} |
| | | _data = l2_data_util.l2_data_key_2_obj(k, value) |
| | | datas.append(_data) |
| | | # 排序 |
| | | new_datas = sorted(datas, |
| | | key=lambda e: (int(e.__getitem__('val')["time"].replace(":", "")), e.__getitem__('index'))) |
| | | local_today_datas.setdefault(code, new_datas) |
| | | local_today_datas[code] = new_datas |
| | | # 根据今日数据加载 |
| | | l2_data_util.load_num_operate_map(local_today_num_operate_map, code, local_today_datas.get(code), force) |
| | | |
| | | |
| | | def saveL2Data(code, datas): |
| | |
| | | dataIndexs.setdefault(key, len(datas) - 1) |
| | | for key in same_time_num: |
| | | if same_time_num[key] > 50: |
| | | # TODO 保存数据 |
| | | redis = _redisManager.getRedis() |
| | | redis.set("big_data-{}-{}".format(code, int(round(t.time() * 1000))), str) |
| | | # 只能保存近3s的数据 |
| | | ts1 = l2_data_util.get_time_as_seconds(datas[-1]["val"]["time"]) |
| | | ts_now = l2_data_util.get_time_as_seconds(datetime.now().strftime("%H:%M:%S")) |
| | | if abs(ts1 - ts_now) <= 3: |
| | | # TODO 保存数据 |
| | | redis = _redisManager.getRedis() |
| | | redis.set("big_data-{}-{}".format(code, int(round(t.time() * 1000))), str) |
| | | |
| | | return day, client, channel, code, datas |
| | | |
| | |
| | | if len(datas) > 0: |
| | | # 判断价格区间是否正确 |
| | | if not data_process.is_same_code_with_price(code, float(datas[0]["val"]["price"])): |
| | | raise L2DataException(L2DataException.CODE_PRICE_ERROR, "股价不匹配") |
| | | |
| | | raise L2DataException(L2DataException.CODE_PRICE_ERROR, "股价不匹配 code-{} price-{}".format(code,datas[0]["val"]["price"])) |
| | | # 加载历史数据 |
| | | load_l2_data(code) |
| | | # 纠正数据 |
| | |
| | | if len(add_datas) > 0: |
| | | # 拼接数据 |
| | | local_today_datas[code].extend(add_datas) |
| | | l2_data_util.load_num_operate_map(local_today_num_operate_map, code, add_datas) |
| | | |
| | | latest_time = add_datas[len(add_datas) - 1]["val"]["time"] |
| | | # 时间差不能太大才能处理 |
| | | if __is_same_time(now_time_str, latest_time): |
| | |
| | | |
| | | |
| | | if __name__ == "__main__": |
| | | print("big_data-{}-{}".format("123", int(round(t.time() * 1000)))) |
| | | pass |
| | | # 删除大数据 |
| | | redis = redis_manager.RedisManager(1).getRedis() |
| | | keys = redis.keys("big_data*") |
| | | for key in keys: |
| | | redis.delete(key) |
| | | # print("big_data-{}-{}".format("123", int(round(t.time() * 1000)))) |
| | | # load_l2_data("002868") |
| | | # keys= local_today_num_operate_map["002868"] |
| | | # for k in keys: |
| | | # print(len( local_today_num_operate_map["002868"][k])) |
| | | # pass |
| | | # __set_buy_compute_start_data("000000", 100, 1) |
| | | # __set_buy_compute_start_data("000000", 100) |
| | | # __set_l2_data_latest_count("000333", 20) |