Administrator
2022-09-18 b946684114d097e937b766f986d12c7eea8edce8
l2_data_manager.py
@@ -4,10 +4,13 @@
import time as t
from datetime import datetime
import big_money_num_manager
import data_process
import global_util
import l2_data_util
import gpcode_manager
import l2_trade_factor
import redis_manager
import tool
@@ -54,41 +57,34 @@
    @staticmethod
    def delete_buy_point(code):
        redis = TradePointManager.__get_redis()
        redis.delete("buy_compute_index-{}".format(code))
        redis.delete("buy_compute_num-{}".format(code))
    # 删除买撤点数据
    @staticmethod
    def delete_buy_cancel_point(code):
        redis = TradePointManager.__get_redis()
        redis.delete("buy_cancel_compute_index-{}".format(code))
        redis.delete("buy_cancel_compute_num-{}".format(code))
        redis.delete("buy_compute_index_info-{}".format(code))
    # 获取买入点信息
    # 返回数据为:买入点 累计纯买额 已经计算的数据索引
    @staticmethod
    def get_buy_compute_start_data(code):
        redis = TradePointManager.__get_redis()
        index = redis.get("buy_compute_index-{}".format(code))
        total_num = redis.get("buy_compute_num-{}".format(code))
        if index is None:
            return None, 0
        else:
            return int(index), int(total_num)
        _key = "buy_compute_index_info-{}".format(code)
        _data_json = redis.get(_key)
        if _data_json is None:
            return None, 0, None
        _data = json.loads(_data_json)
        return _data[0], _data[1], _data[2]
    # 设置买入点的值
    @staticmethod
    def set_buy_compute_start_data(code, num_add, index=None):
    def set_buy_compute_start_data(code, nums, compute_index, buy_index):
        redis = TradePointManager.__get_redis()
        expire = tool.get_expire()
        if index is not None:
            redis.setex("buy_compute_index-{}".format(code), expire, index)
        key = "buy_compute_num-{}".format(code)
        if redis.get(key) is None:
            redis.setex(key, expire, num_add)
        _key = "buy_compute_index_info-{}".format(code)
        if buy_index is not None:
            redis.setex(_key, expire, json.dumps((buy_index, nums, compute_index)))
        else:
            redis.incrby(key, num_add)
            _buy_index, _nums, _compute_index = TradePointManager.get_buy_compute_start_data(code)
            redis.setex(_key, expire, json.dumps((_buy_index, nums, compute_index)))
    # 获取撤买入开始计算的信息
    # 返回数据的内容为:撤销点索引 撤买纯买额 计算的数据索引
    @staticmethod
    def get_buy_cancel_compute_start_data(code):
        redis = TradePointManager.__get_redis()
@@ -116,6 +112,12 @@
            raise Exception("无撤买信号记录")
        nums+=num_add
        cls.set_buy_cancel_compute_start_data(code,nums,computed_index)
    # 删除买撤点数据
    @staticmethod
    def delete_buy_cancel_point(code):
        redis = TradePointManager.__get_redis()
        redis.delete("buy_cancel_compute_info-{}".format(code))
def load_l2_data(code, force=False):
@@ -360,6 +362,13 @@
            return False
        return True
    @staticmethod
    def is_index_end(code, index):
        if index >= len(local_today_datas[code]) - 1:
            return True
        else:
            return False
# L2交易数据处理器
class L2TradeDataProcessor:
@@ -396,6 +405,9 @@
                                                                   total_datas[-1],
                                                                   add_datas)
                if len(add_datas) > 0:
                    # 计算大单数量
                    cls.__compute_big_money_data(code, add_datas)
                    latest_time = add_datas[len(add_datas) - 1]["val"]["time"]
                    # 时间差不能太大才能处理
                    if L2DataUtil.is_same_time(now_time_str, latest_time):
@@ -403,45 +415,63 @@
                        state = trade_manager.get_trade_state(code)
                        if state == trade_manager.TRADE_STATE_BUY_DELEGATED or state == trade_manager.TRADE_STATE_BUY_PLACE_ORDER:
                            # 已挂单
                            cls.process_order(code, add_datas)
                            cls.__process_order(code, len(total_datas) - len(add_datas) - 3)
                        else:
                            # 未挂单
                            cls.process_not_order(code, add_datas)
                            cls.__process_not_order(code, add_datas, capture_timestamp)
                # 保存数据
                save_l2_data(code, datas, add_datas)
        finally:
            if code in cls.unreal_buy_dict:
                cls.unreal_buy_dict.pop(code)
    @classmethod
    def __compute_big_money_data(cls, code, add_datas):
        # 计算大单
        num = 0
        for data in add_datas:
            if l2_trade_factor.L2TradeFactorSourceDataUtil.is_big_money(data):
                if int(data["val"]["operateType"]) == 0:
                    num += data["re"]
                elif int(data["val"]["operateType"]) == 1:
                    num -= data["re"]
        big_money_num_manager.add_num(code, num)
    # 处理未挂单
    @classmethod
    def process_not_order(cls, code, add_datas):
    def __process_not_order(cls, code, add_datas, capture_time):
        # 获取阈值
        threshold_money = cls.__get_threshmoney(code)
        cls.__start_compute_buy(code, len(local_today_datas[code]) - len(add_datas), threshold_money, capture_time)
    # 处理已挂单
    @classmethod
    def process_order(cls, code, add_datas):
    def __process_order(cls, code, start_index, capture_time):
        if start_index < 0:
            start_index = 0
        # 获取之前是否有记录的撤买信号
        cancel_index, buy_num_for_cancel,computed_index= cls.has_order_cancel_begin_pos(code)
        buy_index, buy_num = cls.get_order_begin_pos(code)
        cancel_index, buy_num_for_cancel, computed_index = cls.__has_order_cancel_begin_pos(code)
        buy_index, buy_num = cls.__get_order_begin_pos(code)
        if cancel_index is None:
            # 无撤单信号起始点记录
            cancel_index = cls.compute_order_cancel_begin_single(code, len(add_datas) + 3, 3)
            cancel_index = cls.__compute_order_cancel_begin_single(code, start_index, 3)
            buy_num_for_cancel = 0
            computed_index=buy_index
        if cancel_index is not None:
            # 获取阈值 有买撤信号,统计撤买纯买额
            threshold_money=10000000
            cls.start_compute_cancel(code,cancel_index,computed_index,buy_num_for_cancel,threshold_money)
            threshold_money = cls.__get_threshmoney(code)
            cls.__start_compute_cancel(code, cancel_index, computed_index, buy_num_for_cancel, threshold_money,
                                       capture_time)
        else:
            # 无买撤信号,终止执行
            pass
    #开始计算撤的信号
    @classmethod
    def start_compute_cancel(cls,code,cancel_index, compute_start_index,origin_num,threshold_money):
    def __start_compute_cancel(cls, code, cancel_index, compute_start_index, origin_num, threshold_money, capture_time):
        # sure_type 0-虚拟挂买位  1-真实挂买位
        computed_index , buy_num_for_cancel,sure_type = cls.sum_buy_num_for_cancel_order(code,compute_start_index,origin_num,threshold_money)
        computed_index, buy_num_for_cancel, sure_type = cls.__sum_buy_num_for_cancel_order(code, compute_start_index,
                                                                                           origin_num, threshold_money)
        total_datas = local_today_datas[code]
        if computed_index is not None:
            # 发出撤买信号,需要撤买
@@ -449,8 +479,8 @@
                # 有虚拟下单
                # 删除虚拟下单标记
                cls.unreal_buy_dict.pop(code)
                # TODO 删除下单标记位置
                pass
                # 删除下单标记位置
                TradePointManager.delete_buy_point(code)
            else:
                # 无虚拟下单,需要执行撤单
                logger_l2_trade.info(
@@ -464,76 +494,91 @@
                    pass
            if computed_index < len(local_today_datas[code])-1:
                # TODO数据尚未处理完,重新进入下单计算流程
                cls.start_compute_buy(code,computed_index+1,0,threshold_money)
                # 数据尚未处理完,重新进入下单计算流程
                cls.__start_compute_buy(code, computed_index + 1, 0, threshold_money, capture_time)
                pass
        else:
            #无需撤买,记录撤买信号
            TradePointManager.set_buy_cancel_compute_start_data(code,buy_num_for_cancel,len(total_datas)-1,cancel_index)
            TradePointManager.set_buy_cancel_compute_start_data(code, buy_num_for_cancel, len(total_datas) - 1,
                                                                cancel_index)
            # 判断是否有虚拟下单
            unreal_buy_info=cls.unreal_buy_dict.get(code)
            if unreal_buy_info is not None:
                # unreal_buy_info 的内容格式为:(触法买操作下标,截图时间)
                # 真实下单
                logger_l2_trade.info(
                    "执行买入:{} ".format(code))
                try:
                    trade_manager.start_buy(code, unreal_buy_info[1], total_datas[unreal_buy_info[0]],
                cls.__buy(code, unreal_buy_info[1], total_datas[unreal_buy_info[0]],
                                            unreal_buy_info[0])
                    TradePointManager.delete_buy_cancel_point(code)
                except Exception as e:
                    pass
                pass
            else:
                #终止执行
                pass
    @classmethod
    def __buy(cls, code, capture_timestamp, last_data, last_data_index):
        logger_l2_trade.info(
            "执行买入:{} ".format(code))
        try:
            trade_manager.start_buy(code, capture_timestamp, last_data,
                                    last_data_index)
            TradePointManager.delete_buy_cancel_point(code)
        except Exception as e:
            pass
    @classmethod
    def start_compute_buy(cls,code,compute_start_index,origin_num,threshold_money):
    def __start_compute_buy(cls, code, compute_start_index, threshold_money, capture_time):
        total_datas=local_today_datas[code]
        # 获取买入信号计算起始位置
        index, num = cls.get_order_begin_pos(code)
        index, num = cls.__get_order_begin_pos(code)
        # 是否为新获取到的位置
        new_get_pos = False
        if index is None:
            # 有买入信号
            has_single, index = cls.compute_order_begin_pos(code, len(total_datas) - compute_start_index , 3)
            has_single, _index = cls.__compute_order_begin_pos(code, len(total_datas) - compute_start_index, 3)
            index = _index
            if has_single:
                num = 0
                new_get_pos = True
                # TODO 记录买入信号位置
        if index is None:
            # 未获取到买入信号,终止程序
            return None
        # 买入纯买额统计
        # TODO 获取阈值
        threshold_money=10000000
        compute_index,buy_nums = cls.sum_buy_num_for_order(code,compute_start_index,num,threshold_money)
        if compute_index is not None:
            # 达到下单条件
            # 记录买入信号位置
            cls.__save_order_begin_data(code, compute_index, buy_nums, index)
            # 虚拟下单
            cls.unreal_buy_dict[code]=(compute_index,capture_time)
            # 删除之前的所有撤单信号
            TradePointManager.delete_buy_cancel_point(code)
            # 数据是否处理完毕
            if L2DataUtil.is_index_end(code, compute_index):
                # 数据已经处理完毕,下单
                cls.__buy(code, capture_time, total_datas[compute_index], compute_index)
        else:
            # TODO 未达到下单条件,保存纯买额,设置纯买额
                # 数据尚未处理完毕,进行下一步处理
                cls.__process_order(code, compute_index + 1, capture_time)
        else:
            # 未达到下单条件,保存纯买额,设置纯买额
            # 记录买入信号位置
            cls.__save_order_begin_data(code, len(total_datas) - 1, buy_nums, index)
        pass
    # 获取下单起始信号
    @classmethod
    def get_order_begin_pos(cls, code):
        index, num = TradePointManager.get_buy_compute_start_data(code)
    def __get_order_begin_pos(cls, code):
        index, num, compute_index = TradePointManager.get_buy_compute_start_data(code)
        return index, num
    @classmethod
    def __save_order_begin_data(self, code, compute_index, num, buy_index=None):
        TradePointManager.set_buy_compute_start_data(code, num, compute_index, buy_index)
    # 获取撤单起始位置
    @classmethod
    def has_order_cancel_begin_pos(cls):
    def __has_order_cancel_begin_pos(cls, code):
        # cancel_index:撤单信号起点
        # buy_num_for_cancel:从挂入点计算的纯买额
        # computed_index 计算的最后位置
@@ -542,7 +587,7 @@
    # 计算下单起始信号
    # compute_data_count 用于计算的l2数据数量
    def compute_order_begin_pos(self, code, compute_data_count, continue_count):
    def __compute_order_begin_pos(self, code, compute_data_count, continue_count):
        # 倒数100条数据查询
        datas = local_today_datas[code]
        __len = len(datas)
@@ -617,17 +662,11 @@
    # 是否有撤销信号
    @classmethod
    def compute_order_cancel_begin_single(cls, code, compute_data_count, continue_count):
    def __compute_order_cancel_begin_single(cls, code, start_index, continue_count):
        datas = local_today_datas[code]
        __len = len(datas)
        if __len < continue_count:
        if len(datas) - start_index < continue_count:
            return None
        start_index = 0
        if compute_data_count > __len:
            compute_data_count = __len
        if __len > compute_data_count:
            start_index = __len - compute_data_count
        for i in range(start_index, __len - (continue_count - 1)):
            _val = datas[i]["val"]
            if L2DataUtil.get_time_as_second(_val["time"]) < second_930:
@@ -654,25 +693,20 @@
                    return i
        return None
    # 保存下单位置
    def save_order_pos(self):
        pass
    # 是否可以下单
    def is_can_order(self):
    def __is_can_order(self):
        pass
    # 虚拟下单
    def unreal_order(self):
    def __unreal_order(self):
        pass
    # 设置虚拟挂买位
    def set_unreal_sure_order_pos(self):
        pass
    def __get_threshmoney(cls, code):
        l2_trade_factor.L2TradeFactorUtil.compute_m_value(code)
    # 获取预估挂买位
    @classmethod
    def get_sure_order_pos(cls, code):
    def __get_sure_order_pos(cls, code):
        index, data = TradeBuyDataManager.get_buy_sure_position(code)
        if index is None:
            return 0, len(local_today_datas[code]) - 1, local_today_datas[code][-1]
@@ -681,7 +715,7 @@
    # 统计买入净买量
    @classmethod
    def sum_buy_num_for_order(cls, code, compute_start_index, origin_num, threshold_money):
    def __sum_buy_num_for_order(cls, code, compute_start_index, origin_num, threshold_money):
        total_datas = local_today_datas[code]
        buy_nums = origin_num
        limit_up_price = gpcode_manager.get_limit_up_price(code)
@@ -703,13 +737,18 @@
    # 同一时间买入的概率计算
    @classmethod
    def get_same_time_property(cls, code):
        # TODO 与板块热度有关
    def __get_same_time_property(cls, code):
        # 计算板块热度
        industry = global_util.code_industry_map.get(code)
        if industry is not None:
            hot_num = global_util.industry_hot_num.get(industry)
            if hot_num is not None:
                return 1 - l2_trade_factor.L2TradeFactorUtil.get_industry_rate(hot_num)
        return 0.5
    # 统计买撤净买量
    @classmethod
    def sum_buy_num_for_cancel_order(cls, code, start_index, origin_num, threshold_money):
    def __sum_buy_num_for_cancel_order(cls, code, start_index, origin_num, threshold_money):
        buy_nums = origin_num
        total_datas = local_today_datas[code]
        limit_up_price = gpcode_manager.get_limit_up_price(code)
@@ -717,8 +756,8 @@
            raise Exception("涨停价无法获取")
        threshold_num = threshold_money / (limit_up_price * 100)
        # 获取预估挂买位 sure_type:0 虚拟挂买 1 实际挂买
        sure_type, sure_pos, sure_data = cls.get_sure_order_pos(code)
        same_time_property = cls.get_same_time_property(code)
        sure_type, sure_pos, sure_data = cls.__get_sure_order_pos(code)
        same_time_property = cls.__get_same_time_property(code)
        # 同一秒,在预估买入位之后的数据之和
        property_buy_num_count = 0
        for i in range(start_index, len(total_datas)):
@@ -751,167 +790,6 @@
            if buy_nums + property_buy_num <= threshold_num:
                return i, buy_nums + property_buy_num,sure_type
        return None, buy_nums + round(property_buy_num_count * same_time_property),sure_type
def process_data(code, datas, capture_timestamp):
    now_time_str = datetime.now().strftime("%H:%M:%S")
    __start_time = round(t.time() * 1000)
    try:
        if len(datas) > 0:
            # 判断价格区间是否正确
            if not data_process.is_same_code_with_price(code, float(datas[0]["val"]["price"])):
                raise L2DataException(L2DataException.CODE_PRICE_ERROR,
                                      "股价不匹配 code-{} price-{}".format(code, datas[0]["val"]["price"]))
            # 加载历史数据
            load_l2_data(code)
            # 纠正数据
            datas = correct_data(code, datas)
            add_datas = get_add_data(code, datas)
            if len(add_datas) > 0:
                # 拼接数据
                local_today_datas[code].extend(add_datas)
                l2_data_util.load_num_operate_map(local_today_num_operate_map, code, add_datas)
            total_datas = local_today_datas[code]
            # 买入确认点处理
            TradeBuyDataManager.process_buy_sure_position_info(code, capture_timestamp, total_datas, total_datas[-1],
                                                               add_datas)
            if len(add_datas) > 0:
                latest_time = add_datas[len(add_datas) - 1]["val"]["time"]
                # 时间差不能太大才能处理
                if __is_same_time(now_time_str, latest_time):
                    # logger.info("及时的数据,新增数据数量{}".format(len(add_datas)))
                    # 是否已经有买入开始计算点
                    c_index, c_num = TradePointManager.get_buy_compute_start_data(code)
                    if c_index is None:
                        # 判断是否出现禁止交易信号
                        forbidden = __is_have_forbidden_feature(code, len(add_datas) + 6, 6)
                        if forbidden:
                            trade_manager.forbidden_trade(code)
                        # 没有计算开始点
                        c_index = __get_limit_up_buy_start(code, len(add_datas) + 3, 3)
                        if c_index is not None:
                            logger_l2_trade.info("找到买点:{} - {}".format(code, json.dumps(total_datas[c_index])))
                            # 触发数据分析 ,获取连续涨停标记数据
                            buy_nums = 0
                            # 获取涨停价
                            limit_up_price = gpcode_manager.get_limit_up_price(code)
                            last_data_index = -1
                            for i in range(c_index, len(total_datas)):
                                _val = total_datas[i]["val"]
                                # 有连续4个涨停买就标记计算起始点
                                if int(_val["limitPrice"]) == 1 and int(_val["operateType"]) == 0:
                                    # 涨停买
                                    buy_nums += int(_val["num"]) * int(total_datas[i]["re"])
                                elif int(_val["limitPrice"]) == 1 and int(_val["operateType"]) == 1:
                                    # 涨停买撤
                                    buy_nums -= int(_val["num"]) * int(total_datas[i]["re"])
                                if buy_nums * limit_up_price * 100 > 1000 * 10000:
                                    last_data_index = i
                                    break
                            TradePointManager.set_buy_compute_start_data(code, buy_nums, c_index)
                            if limit_up_price is not None:
                                if last_data_index > -1:
                                    # 大于1000w就买
                                    logger_l2_trade.info(
                                        "执行买入:{} - 计算结束点: {}".format(code, json.dumps(total_datas[-1])))
                                    try:
                                        trade_manager.start_buy(code, capture_timestamp, total_datas[last_data_index],
                                                                last_data_index)
                                        TradePointManager.delete_buy_cancel_point(code)
                                    except Exception as e:
                                        pass
                    else:
                        # 有计算开始点,计算新增的数据
                        buy_nums = c_num
                        last_data = None
                        last_data_index = len(total_datas) - len(add_datas) - 1
                        # 获取涨停价
                        limit_up_price = gpcode_manager.get_limit_up_price(code)
                        for data in add_datas:
                            last_data_index += 1
                            _val = data["val"]
                            if int(_val["limitPrice"]) == 1 and int(_val["operateType"]) == 0:
                                # 涨停买
                                buy_nums += int(_val["num"]) * int(data["re"])
                            elif int(_val["limitPrice"]) == 1 and int(_val["operateType"]) == 1:
                                # 涨停买撤
                                buy_nums -= int(_val["num"]) * int(data["re"])
                            if buy_nums * limit_up_price * 100 > 1000 * 10000:
                                last_data = data
                                break
                        TradePointManager.set_buy_compute_start_data(code, buy_nums)
                        if limit_up_price is not None:
                            if last_data is not None:
                                # 大于1000w就买
                                logger_l2_trade.info("执行买入:{} - 计算结束点: {}".format(code, json.dumps(add_datas[-1])))
                                try:
                                    trade_manager.start_buy(code, capture_timestamp, last_data, last_data_index)
                                    TradePointManager.delete_buy_cancel_point(code)
                                except Exception as e:
                                    pass
                    if c_index is not None:
                        # 是否处于委托待成交
                        state = trade_manager.get_trade_state(code)
                        if state == trade_manager.TRADE_STATE_BUY_DELEGATED or state == trade_manager.TRADE_STATE_BUY_PLACE_ORDER:
                            # 已经委托,检测取消接口
                            cancel_index, cancel_num = TradePointManager.get_buy_cancel_compute_start_data(code)
                            if cancel_index is None:
                                # 之前尚未监测到买撤起点
                                cancel_index = __get_limit_up_buy_cancel_start(code, len(add_datas) + 3, 3)
                                if cancel_index is not None:
                                    total_datas = local_today_datas[code]
                                    # print("找到买撤点", cancel_index, total_datas[cancel_index])
                                    logger_l2_trade.info(
                                        "找到买撤点:{} - {}".format(code, json.dumps(total_datas[cancel_index])))
                                    # 触发数据分析 ,获取连续涨停标记数据
                                    nums = 0
                                    for i in range(c_index, len(total_datas)):
                                        _val = total_datas[i]["val"]
                                        if int(_val["limitPrice"]) == 1 and int(_val["operateType"]) == 1:
                                            # 涨停买撤
                                            nums += int(_val["num"]) * int(total_datas[i]["re"])
                                    TradePointManager.set_buy_cancel_compute_start_data(code, nums, cancel_index)
                            else:
                                # 之前监测到了买撤销起点
                                cancel_nums_add = 0
                                for data in add_datas:
                                    _val = data["val"]
                                    if int(_val["limitPrice"]) == 1 and int(_val["operateType"]) == 1:
                                        # 涨停买撤
                                        cancel_nums_add += int(_val["num"]) * int(data["re"])
                                TradePointManager.set_buy_cancel_compute_start_data(code, cancel_nums_add)
                                latest_num = cancel_num + cancel_nums_add
                                # 获取涨停价
                                limit_up_price = gpcode_manager.get_limit_up_price(code)
                                if limit_up_price is not None:
                                    if latest_num * limit_up_price * 100 > 1000 * 10000:
                                        # 大于1000w就买
                                        # print("执行撤销")
                                        logger_l2_trade.info(
                                            "执行撤销:{} - {}".format(code, json.dumps(add_datas[-1])))
                                        try:
                                            trade_manager.start_cancel_buy(code)
                                            # 取消买入标识
                                            TradePointManager.delete_buy_point(code)
                                            TradePointManager.delete_buy_cancel_point(code)
                                        except Exception as e:
                                            pass
                            pass
            # 保存数据
            save_l2_data(code, datas, add_datas)
    finally:
        pass
def __get_time_second(time_str):
@@ -1168,8 +1046,8 @@
        for line in f.readlines():  # 依次读取每行
            line = line.strip()
            data = json.loads(line)
            result = __format_l2_data(data, code, 10.00)
            add_datas = get_add_data(code, result)
            result = L2DataUtil.format_l2_data(data, code, 10.00)
            add_datas = L2DataUtil.get_add_data(code, result)
            print("增加的数量:", len(add_datas))
            if len(add_datas) > 0:
                # 拼接数据