Administrator
2022-10-21 892b50e242e3c59a738b92dfdfee1bf1ff8932f2
l2_data_manager.py
@@ -73,9 +73,9 @@
        _key = "buy_compute_index_info-{}".format(code)
        _data_json = redis.get(_key)
        if _data_json is None:
            return None, None, None, 0
            return None, None, None, 0, 0
        _data = json.loads(_data_json)
        return _data[0], _data[1], _data[2], _data[3]
        return _data[0], _data[1], _data[2], _data[3], _data[4]
    # 设置买入点的值
    # buy_single_index 买入信号位
@@ -83,16 +83,16 @@
    # compute_index 计算位置
    # nums 累计纯买额
    # Store the buy-computation state for ``code`` as a JSON tuple under the key
    # "buy_compute_index_info-<code>", with the expiry returned by tool.get_expire().
    # When buy_single_index is None, the previously stored buy_single_index is
    # read back through get_buy_compute_start_data() and kept, while the other
    # fields are overwritten with the arguments passed in.
    # NOTE(review): every changed statement appears twice below (a 4-field form
    # and a 5-field form that adds ``count``). This looks like diff residue in
    # which the second line of each pair is presumably the newer version --
    # confirm against the real file before relying on either form.
    @staticmethod
    def set_buy_compute_start_data(code, buy_single_index, buy_exec_index, compute_index, nums):
    def set_buy_compute_start_data(code, buy_single_index, buy_exec_index, compute_index, nums, count):
        redis = TradePointManager.__get_redis()
        expire = tool.get_expire()
        _key = "buy_compute_index_info-{}".format(code)
        if buy_single_index is not None:
            redis.setex(_key, expire, json.dumps((buy_single_index, buy_exec_index, compute_index, nums)))
            redis.setex(_key, expire, json.dumps((buy_single_index, buy_exec_index, compute_index, nums, count)))
        else:
            _buy_single_index, _buy_exec_index, _compute_index, _nums = TradePointManager.get_buy_compute_start_data(
            _buy_single_index, _buy_exec_index, _compute_index, _nums, _count = TradePointManager.get_buy_compute_start_data(
                code)
            redis.setex(_key, expire, json.dumps((_buy_single_index, buy_exec_index, compute_index, nums)))
            redis.setex(_key, expire, json.dumps((_buy_single_index, buy_exec_index, compute_index, nums, count)))
    # 获取撤买入开始计算的信息
    # 返回数据的内容为:撤销点索引 撤买纯买额 计算的数据索引
@@ -265,6 +265,16 @@
        saveL2Data(code, add_datas)
# 清除l2数据
def clear_l2_data(code):
    """Delete all stored L2 entries belonging to ``code``.

    Uses the client returned by ``redis_manager.RedisManager(1).getRedis()``.
    Every key matching ``l2-<code>-*`` is deleted individually, followed by
    the ``l2-data-latest-<code>`` key.

    :param code: value substituted into the key names.
    """
    client = redis_manager.RedisManager(1).getRedis()
    pattern = "l2-{}-*".format(code)
    latest_key = "l2-data-latest-{}".format(code)
    for matched in client.keys(pattern):
        client.delete(matched)
    client.delete(latest_key)
class L2DataUtil:
    @classmethod
    def is_same_time(cls, time1, time2):
@@ -363,10 +373,11 @@
                else:
                    limitPrice = 0
                item["limitPrice"] = "{}".format(limitPrice)
            # 不需要非涨停数据/非跌停数据
            if int(item["limitPrice"]) == 0:
                continue
            operateType = item["operateType"]
            # 不需要非涨停买与买撤
            if int(item["limitPrice"]) != 1 and (int(operateType) == 0 or int(operateType) == 1):
                continue
            cancelTime = item["cancelTime"]
            cancelTimeUnit = item["cancelTimeUnit"]
            key = "{}-{}-{}-{}-{}-{}-{}-{}".format(code, operateType, time, num, price, limitPrice, cancelTime,
@@ -380,6 +391,8 @@
                dataIndexs.setdefault(key, len(datas) - 1)
        l2_data_util.save_big_data(code, same_time_num, data)
        return datas
    @classmethod
    def get_time_as_second(cls, time_str):
@@ -406,6 +419,20 @@
            return False
        return True
    @classmethod
    def is_limit_up_price_sell(cls, val):
        if int(val["limitPrice"]) != 1:
            return False
        if int(val["operateType"]) != 2:
            return False
        price = float(val["price"])
        num = int(val["num"])
        if price * num * 100 < 50 * 10000:
            return False
        return True
    # 是否涨停买撤
    @classmethod
    def is_limit_up_price_buy_cancel(cls, val):
@@ -420,6 +447,20 @@
        if price * num * 100 < 50 * 10000:
            return False
        return True
    # 是否卖撤
    @classmethod
    def is_sell_cancel(cls, val):
        if int(val["operateType"]) == 3:
            return True
        return False
    # 是否为卖
    @classmethod
    def is_sell(cls, val):
        if int(val["operateType"]) == 2:
            return True
        return False
# L2交易数据处理器
@@ -484,17 +525,17 @@
                    latest_time = add_datas[len(add_datas) - 1]["val"]["time"]
                    # 时间差不能太大才能处理
                    # TODO 暂时关闭处理
                    if L2DataUtil.is_same_time(now_time_str, latest_time):
                        # 判断是否已经挂单
                        state = trade_manager.get_trade_state(code)
                        start_index = len(total_datas) - len(add_datas)
                        end_index = len(total_datas) - 1
                        if state == trade_manager.TRADE_STATE_BUY_DELEGATED or state == trade_manager.TRADE_STATE_BUY_PLACE_ORDER:
                            # 已挂单
                            cls.__process_order(code, start_index, end_index, capture_timestamp)
                        else:
                            # 未挂单
                            cls.__process_not_order(code, start_index, end_index, capture_timestamp)
                    # if L2DataUtil.is_same_time(now_time_str, latest_time):
                    #     # 判断是否已经挂单
                    #     state = trade_manager.get_trade_state(code)
                    #     start_index = len(total_datas) - len(add_datas)
                    #     end_index = len(total_datas) - 1
                    #     if state == trade_manager.TRADE_STATE_BUY_DELEGATED or state == trade_manager.TRADE_STATE_BUY_PLACE_ORDER:
                    #         # 已挂单
                    #         cls.__process_order(code, start_index, end_index, capture_timestamp)
                    #     else:
                    #         # 未挂单
                    #         cls.__process_not_order(code, start_index, end_index, capture_timestamp)
                    logger_l2_process.info("code:{} 处理数据范围: {}-{} 处理时间:{}", code, add_datas[0]["index"],
                                           add_datas[-1]["index"], round(t.time() * 1000) - __start_time)
                # 保存数据
@@ -722,6 +763,8 @@
        except Exception as e:
            cls.debug(code, "执行买入异常:{}", str(e))
            pass
        finally:
            cls.debug(code, "m值影响因子:", l2_trade_factor.L2TradeFactorUtil.factors_to_string(code))
    # 是否可以买
    @classmethod
@@ -774,10 +817,19 @@
        # 删除大群撤事件的大单
        L2BetchCancelBigNumProcessor.del_recod(code)
        L2ContinueLimitUpCountManager.del_data(code)
        if code in cls.unreal_buy_dict:
            cls.unreal_buy_dict.pop(code)
            # 取消买入标识
            TradePointManager.delete_buy_point(code)
            TradePointManager.delete_buy_cancel_point(code)
            TradePointManager.delete_compute_info_for_cancel_buy(code)
            TradePointManager.delete_count_info_for_cancel_buy(code)
            # 删除大群撤事件的大单
            L2BetchCancelBigNumProcessor.del_recod(code)
        else:
            cls.__cancel_buy(code)
        L2BigNumProcessor.del_big_num_pos(code)
    @classmethod
@@ -905,7 +957,7 @@
                count += datas[i]["re"]
                if count >= continue_count:
                    return True, start
            else:
            elif not L2DataUtil.is_limit_up_price_sell(_val):
                last_index = None
                count = 0
                start = None
@@ -931,7 +983,7 @@
                    start = i
                    start_time = L2DataUtil.get_time_as_second(_val["time"])
                count += datas[i]["re"]
            else:
            elif not L2DataUtil.is_limit_up_price_sell(_val):
                if count >= continue_count:
                    return start, i - 1
                start = -1
@@ -967,7 +1019,7 @@
                    start = i
                    start_time = L2DataUtil.get_time_as_second(_val["time"])
                count += int(datas[i]["re"])
            else:
            elif not L2DataUtil.is_limit_up_price_sell(_val):
                if count >= continue_count:
                    return start, i - 1
                start = -1
@@ -1323,7 +1375,7 @@
    @classmethod
    def test_can_order(cls):
        code = "002393"
        code = "000948"
        global_util.load_industry()
        limit_up_time_manager.load_limit_up_time()
@@ -1618,8 +1670,9 @@
                if need_cancel:
                    # 需要撤单
                    # 撤单
                    cls.__cancel_buy(code, max_num_data["index"])
                    L2TradeDataProcessor.cancel_debug(code, "跟踪到大单无撤买信号-{},新跟踪的大单需要撤买-{}", index, max_num_data["index"])
                    cls.__cancel_buy(code, max_num_data["index"] if cancel_data is None else cancel_data)
                    L2TradeDataProcessor.cancel_debug(code, "原来跟踪到大单无撤买信号-{},新跟踪的大单需要撤买-{}", index,
                                                      max_num_data["index"])
                    return True, cancel_data
                else:
                    # 无需撤单
@@ -1695,8 +1748,8 @@
            if i <= latest_buy_index:
                total_count += total_datas[i]["re"]
        L2TradeDataProcessor.debug(code, "大群撤大单数量:{}/{}", count, total_count)
        # 大单小于5笔无脑撤
        if total_count <= 5:
        # 大单小于5笔无脑撤,后修改为无大单无脑撤
        if total_count <= 0:
            return True
        # 大单撤单笔数大于总大单笔数的1/5就撤单
@@ -1788,6 +1841,287 @@
                    index_set.add(d[1])
                    big_nums_info_new.append(d)
            cls.__save_recod(code, max_big_num_info, big_nums_info_new)
# 卖跟踪
class L2SellProcessor:
    """Accumulate the volume of limit-up-price sell records per code.

    The running state is the JSON pair ``(process_index, count)`` stored under
    the key ``sell_num-<code>`` through the module-level ``_redisManager``:

    * ``process_index`` -- highest data index already folded into ``count``;
    * ``count`` -- sum of the ``num`` field of the counted sell records
      (may be stored as ``null`` while no sell record has been seen).
    """

    @classmethod
    def __get_recod(cls, code):
        """Load the stored pair for ``code``.

        :return: ``(process_index, count)``, or ``(None, None)`` when no
            record exists.
        """
        redis = _redisManager.getRedis()
        _val = redis.get("sell_num-{}".format(code))
        if _val is None:
            return None, None
        datas = json.loads(_val)
        return datas[0], datas[1]

    @classmethod
    def del_recod(cls, code):
        """Delete the stored sell record of ``code``."""
        redis = _redisManager.getRedis()
        key = "sell_num-{}".format(code)
        redis.delete(key)

    @classmethod
    def __save_recod(cls, code, process_index, count):
        """Store ``(process_index, count)`` with the expiry from tool.get_expire()."""
        redis = _redisManager.getRedis()
        key = "sell_num-{}".format(code)
        redis.setex(key, tool.get_expire(), json.dumps((process_index, count)))

    # Temporarily deprecated
    @classmethod
    def need_cancel(cls, code, start_index, end_index):
        """Decide whether the accumulated sell amount calls for a cancel.

        ``start_index`` and ``end_index`` are accepted but not used.

        :return: False when no record or no limit-up price is available;
            otherwise True when ``limit_up_price * count * 100`` reaches the
            value of ``L2TradeFactorUtil.get_base_safe_val`` for the code.
        """
        process_index, count = cls.__get_recod(code)
        if process_index is None:
            # No sell information has been recorded yet.
            return False
        if count is None:
            count = 0
        limit_up_price = gpcode_manager.get_limit_up_price(code)
        if limit_up_price is None:
            return False
        if float(limit_up_price) * count * 100 >= l2_trade_factor.L2TradeFactorUtil.get_base_safe_val(
                global_util.zyltgb_map[code]):
            return True
        return False

    @classmethod
    def process(cls, code, start_index, end_index):
        """Fold the sell records of ``[start_index, end_index]`` into the total.

        Only records accepted by ``L2DataUtil.is_limit_up_price_sell`` whose
        index is greater than the stored ``process_index`` are counted.

        :param code: key into ``local_today_datas`` and the Redis record.
        :param start_index: first data index to inspect (inclusive).
        :param end_index: last data index to inspect (inclusive).
        """
        total_datas = local_today_datas[code]
        process_index, count = cls.__get_recod(code)
        for index in range(start_index, end_index + 1):
            val = total_datas[index]["val"]
            # Only limit-up-price sells are relevant.
            if not L2DataUtil.is_limit_up_price_sell(val):
                continue
            # Skip data that an earlier call has already counted.
            if process_index is not None and process_index >= index:
                continue
            if count is None:
                count = 0
            # NOTE(review): the record's "re" field is not applied here, unlike
            # L2LimitUpMoneyStatisticUtil.__compute_num -- confirm that is intended.
            count += int(val["num"])
        # Advance the watermark on every call. Leaving it at the first
        # end_index would let a later overlapping range re-add records
        # that are already part of ``count``.
        if process_index is None or end_index > process_index:
            process_index = end_index
        cls.__save_recod(code, process_index, count)
# 涨停封单额统计
class L2LimitUpMoneyStatisticUtil:
    """Bookkeeping for the limit-up sealing-order volume of a code.

    Two kinds of JSON records are kept through ``_redisManager``:

    * ``l2_limit_up_second_money-<code>-<time without colons>``:
      ``(num, from_index, to_index)`` accumulated for one time value;
    * ``l2_limit_up_money-<code>``: ``(num, index)``, the latest corrected
      total and the data index it corresponds to.
    """

    _redisManager = redis_manager.RedisManager(1)

    @classmethod
    def __get_redis(cls):
        """Return the Redis client of the class-level manager."""
        return cls._redisManager.getRedis()

    # Store the per-time-value sealing volume record
    @classmethod
    def __set_l2_second_money_record(cls, code, time, num, from_index, to_index):
        """Add ``num`` to the record of ``time`` and extend its index range.

        A missing record is created as ``(num, from_index, to_index)``; an
        existing one keeps its ``from_index`` and takes the new ``to_index``.
        """
        old_num, old_from, old_to = cls.__get_l2_second_money_record(code, time)
        if old_num is None:
            old_num = num
            old_from = from_index
            old_to = to_index
        else:
            old_num += num
            old_to = to_index
        key = "l2_limit_up_second_money-{}-{}".format(code, time.replace(":", ""))
        cls.__get_redis().setex(key, tool.get_expire(), json.dumps((old_num, old_from, old_to)))

    @classmethod
    def __get_l2_second_money_record(cls, code, time):
        """Return ``(num, from_index, to_index)`` for ``time`` or three Nones."""
        key = "l2_limit_up_second_money-{}-{}".format(code, time.replace(":", ""))
        val = cls.__get_redis().get(key)
        return cls.__format_second_money_record_val(val)

    @classmethod
    def __format_second_money_record_val(cls, val):
        """Decode a stored per-time record; ``None`` yields ``(None, None, None)``."""
        if val is None:
            return None, None, None
        val = json.loads(val)
        return val[0], val[1], val[2]

    @classmethod
    def __get_l2_second_money_record_keys(cls, code, time_regex):
        """Return the per-time record keys whose time part matches ``time_regex``."""
        key = "l2_limit_up_second_money-{}-{}".format(code, time_regex)
        keys = cls.__get_redis().keys(key)
        return keys

    # Store the latest sealing volume record
    @classmethod
    def __set_l2_latest_money_record(cls, code, index, num):
        """Store ``(num, index)`` as the latest record of ``code``."""
        key = "l2_limit_up_money-{}".format(code)
        cls.__get_redis().setex(key, tool.get_expire(), json.dumps((num, index)))

    # Returns (num, index)
    @classmethod
    def __get_l2_latest_money_record(cls, code):
        """Return the latest ``(num, index)``, or ``(0, -1)`` when none is stored."""
        key = "l2_limit_up_money-{}".format(code)
        result = cls.__get_redis().get(key)
        if result:
            result = json.loads(result)
            return result[0], result[1]
        else:
            return 0, -1

    # Correct the stored data.
    # The per-time records on both sides of the correction time are used to
    # determine the data index at which the corrected number applies.
    @classmethod
    def verify_num(cls, code, num, time_str):
        """Record ``num`` as the corrected total observed at ``time_str``.

        Per-time keys are searched with a progressively wider prefix of the
        time (first four characters, first two, then none). Within the first
        scope that holds a neighbouring pair of keys enclosing ``time_str``,
        the ``to_index`` of the chosen key is saved with ``num`` as the latest
        record. Nothing is saved when no enclosing pair is found.

        :param code: code whose records are corrected.
        :param num: corrected total to store.
        :param time_str: time such as "09:42:00"; colons are removed first.
        """
        time_ = time_str.replace(":", "")
        key = None
        for i in range(4, -2, -2):
            # Per-time records within the current prefix scope.
            time_regex = "{}*".format(time_[:i])
            keys_ = cls.__get_l2_second_money_record_keys(code, time_regex)
            if keys_ and len(keys_) > 1:
                # Order the keys by their numeric time suffix.
                keys = sorted(keys_, key=lambda tup: int(tup.split("-")[-1]))
                for earlier, later in zip(keys, keys[1:]):
                    time_1 = earlier.split("-")[-1]
                    time_2 = later.split("-")[-1]
                    if int(time_1) <= int(time_) <= int(time_2):
                        # Use the later key only on an exact match with it.
                        key = later if time_ == time_2 else earlier
                        break
            if key:
                val = cls.__get_redis().get(key)
                _, _, end_index = cls.__format_second_money_record_val(val)
                # Save the corrected total with the index it applies to.
                cls.__set_l2_latest_money_record(code, end_index, num)
                break

    # Signed volume contribution of one record
    @classmethod
    def __compute_num(cls, code, data, buy_single_data):
        """Return the signed contribution of ``data`` to the sealing volume.

        * limit-up buy cancel or sell: ``-(num * re)``;
        * sell cancel for which ``l2_data_util.is_sell_index_before_target``
          is truthy: ``0``;
        * anything else: ``num * re``.
        """
        if L2DataUtil.is_limit_up_price_buy_cancel(data["val"]) or L2DataUtil.is_sell(data["val"]):
            # Limit-up buy cancel, or sell: reduces the volume.
            return 0 - int(data["val"]["num"]) * data["re"]
        else:
            if L2DataUtil.is_sell_cancel(data["val"]):
                # A sell cancel is ignored when its sell lies before the
                # buy-signal record, and counted otherwise.
                if l2_data_util.is_sell_index_before_target(data, buy_single_data,
                                                            local_today_num_operate_map.get(code)):
                    return 0
            return int(data["val"]["num"]) * data["re"]

    @classmethod
    def clear(cls, code):
        """Delete the latest record of ``code``."""
        key = "l2_limit_up_money-{}".format(code)
        cls.__get_redis().delete(key)

    # Returns the record that triggers a cancel.
    # with_cancel: whether a cancel may be reported at all.
    @classmethod
    def process_data(cls, code, start_index, end_index, buy_single_begin_index, with_cancel=True):
        """Update the per-time records and look for a cancel trigger.

        :param code: key into ``local_today_datas``.
        :param start_index: first newly arrived data index (inclusive).
        :param end_index: last newly arrived data index (inclusive).
        :param buy_single_begin_index: data index of the buy-signal start
            record; accumulation starts here when no latest record exists.
        :param with_cancel: when False, a detected trigger is discarded.
        :return: ``(record, message)`` for the triggering record, or
            ``(None, None)`` when no cancel is reported.
        """
        start_time = round(t.time() * 1000)
        total_datas = local_today_datas[code]
        # Volume per time value, the index range it covers, and the
        # contribution of each index (reused by the accumulation below).
        time_dict_num = {}
        time_dict_num_index = {}
        num_dict = {}
        for i in range(start_index, end_index + 1):
            data = total_datas[i]
            time_ = data["val"]["time"]
            if time_ not in time_dict_num:
                time_dict_num[time_] = 0
                time_dict_num_index[time_] = {"s": i, "e": i}
            time_dict_num_index[time_]["e"] = i
            num = cls.__compute_num(code, data, total_datas[buy_single_begin_index])
            num_dict[i] = num
            time_dict_num[time_] = time_dict_num[time_] + num
        for t_ in time_dict_num:
            cls.__set_l2_second_money_record(code, t_, time_dict_num[t_], time_dict_num_index[t_]["s"],
                                             time_dict_num_index[t_]["e"])

        print("保存涨停封单额时间:", round(t.time() * 1000) - start_time)
        # Resume from the latest corrected total.
        total_num, index = cls.__get_l2_latest_money_record(code)
        if index == -1:
            # No corrected total yet: accumulate from the buy-signal start.
            index = buy_single_begin_index - 1
            total_num = 0
        # TODO: optimise this computation
        cancel_index = None
        cancel_msg = None
        limit_up_price = gpcode_manager.get_limit_up_price(code)
        # Volume threshold derived from an amount of 10,000,000. float()
        # mirrors L2SellProcessor.need_cancel, which converts the same getter's
        # result before doing arithmetic on it.
        min_volumn = round(10000000 / (float(limit_up_price) * 100))
        # First data index of each time value.
        time_start_index_dict = {}
        # Time values in order of first appearance.
        time_list = []
        # Total accumulated before each time value started.
        time_total_num_dict = {}
        for i in range(index + 1, end_index + 1):
            data = total_datas[i]
            time_ = data["val"]["time"]
            if time_ not in time_start_index_dict:
                time_start_index_dict[time_] = i
                time_list.append(time_)
                time_total_num_dict[time_] = total_num
            val = num_dict.get(i)
            if val is None:
                val = cls.__compute_num(code, data, total_datas[buy_single_begin_index])
            total_num += val
            # Only a decrease inside the newly arrived range can trigger a cancel.
            if val < 0 and start_index <= i <= end_index:
                if total_num < min_volumn:
                    cancel_index = i
                    cancel_msg = "封单金额小于1000万"
                    break
                # Compare with the total held when the most recently seen
                # time value started.
                last_second_total_volumn = time_total_num_dict.get(time_list[-1])
                if last_second_total_volumn > 0 and (
                        last_second_total_volumn - total_num) / last_second_total_volumn >= 0.5:
                    cancel_index = i
                    cancel_msg = "相邻2s({})内的封单量减小50%({}->{})".format(time_, last_second_total_volumn,
                                                                     total_num)
                    break
        if not with_cancel:
            cancel_index = None

        print("封单额计算时间:", round(t.time() * 1000) - start_time)
        # NOTE(review): the original had the call that persists the accumulated
        # total via __set_l2_latest_money_record commented out here, so within
        # this class only verify_num writes the latest record. Confirm intent.
        # Compare against None: data index 0 is a valid cancel position.
        if cancel_index is not None:
            return total_datas[cancel_index], cancel_msg
        return None, None
def __get_time_second(time_str):
@@ -2035,4 +2369,7 @@
# Script entry point: runs L2TradeDataProcessor.test_can_order(), then loads the
# L2 data of one hard-coded code and calls verify_num with fixed sample values.
# NOTE(review): presumably an ad-hoc debugging harness -- confirm before relying on it.
if __name__ == "__main__":
    L2TradeDataProcessor.test_can_order()
    # Process data
    code = "002898"
    load_l2_data(code)
    L2LimitUpMoneyStatisticUtil.verify_num(code, 70582, "09:42:00")