Administrator
2023-08-28 ba52d7ac92a36f413eacaa686f8535e859664ec6
l2/l2_data_manager_new.py
@@ -7,6 +7,7 @@
import constant
from db.redis_manager_delegate import RedisUtils
from l2.huaxin import l2_huaxin_util, huaxin_delegate_postion_manager
from log_module import async_log_util
from third_data import kpl_data_manager, block_info
from trade.deal_big_money_manager import DealComputeProgressManager
from utils import global_util, ths_industry_util, tool
@@ -207,7 +208,6 @@
class L2TradeDataProcessor:
    unreal_buy_dict = {}
    volume_rate_info = {}
    l2BigNumForMProcessor = L2BigNumForMProcessor()
    __codeActualPriceProcessor = CodeActualPriceProcessor()
    __ths_l2_trade_queue_manager = trade_queue_manager.thsl2tradequeuemanager()
    __thsBuy1VolumnManager = trade_queue_manager.THSBuy1VolumnManager()
@@ -215,7 +215,8 @@
    __l2PlaceOrderParamsManagerDict = {}
    __last_buy_single_dict = {}
    __TradeBuyQueue = transaction_progress.TradeBuyQueue()
    __latest_process_unique_keys = {}
    __latest_process_order_unique_keys = {}
    __latest_process_not_order_unique_keys = {}
    # 初始化
    __TradePointManager = l2_data_manager.TradePointManager()
    __SecondCancelBigNumComputer = SecondCancelBigNumComputer()
@@ -311,8 +312,9 @@
            # 获取下单位置
            place_order_index = huaxin_delegate_postion_manager.get_l2_place_order_position(code, datas)
            if place_order_index:
                logger_l2_process.info("code:{} 获取到下单真实位置:{}", code, place_order_index)
                cls.__DCancelBigNumComputer.set_real_order_index(code, place_order_index)
                cls.__SecondCancelBigNumComputer.set_real_place_order_index(code, place_order_index)
                async_log_util.info(logger_l2_process, "code:{} 获取到下单真实位置:{}", code, place_order_index)
            __start_time = round(t.time() * 1000)
            if len(datas) > 0:
                cls.process_add_datas(code, datas, 0, __start_time)
@@ -416,9 +418,10 @@
                    if l2.l2_data_util.L2DataUtil.is_same_time(now_time_str, latest_time):
                        cls.__process_not_order(code, start_index, end_index, capture_timestamp, is_first_code)
            logger_l2_process.info("code:{} 处理数据范围: {}-{} 处理时间:{} 截图时间戳:{}", code, add_datas[0]["index"],
                                   add_datas[-1]["index"], round(t.time() * 1000) - __start_time,
                                   capture_timestamp)
            async_log_util.info(logger_l2_process, "code:{} 处理数据范围: {}-{} 处理时间:{} 截图时间戳:{}", code,
                                add_datas[0]["index"],
                                add_datas[-1]["index"], round(t.time() * 1000) - __start_time,
                                capture_timestamp)
            # __start_time = l2_data_log.l2_time(code, round(t.time() * 1000) - __start_time,
            #                                    "l2数据处理时间")
@@ -454,10 +457,10 @@
    def __process_order(cls, code, start_index, end_index, capture_time, is_first_code, new_add=True):
        # 增加推出机制
        unique_key = f"{start_index}-{end_index}"
        if cls.__latest_process_unique_keys.get(code) == unique_key:
        if cls.__latest_process_order_unique_keys.get(code) == unique_key:
            logger_l2_error.error(f"重复处理数据:code-{code} start_index-{start_index} end_index-{end_index}")
            return
        cls.__latest_process_unique_keys[code] = unique_key
        cls.__latest_process_order_unique_keys[code] = unique_key
        # S撤
        def s_cancel(_buy_single_index, _buy_exec_index):
@@ -539,14 +542,17 @@
        # 获取买入信号起始点
        buy_single_index, buy_exec_index, buy_compute_index, num, count, max_num_set, buy_volume_rate = cls.__get_order_begin_pos(
            code)
        # 默认量为0.2
        if buy_volume_rate is None:
            buy_volume_rate = 0.2
        # 依次处理
        cancel_data, cancel_msg = s_cancel(buy_single_index, buy_exec_index)
        if not cancel_data:
            cancel_data, cancel_msg = h_cancel(buy_single_index, buy_exec_index)
        # if not cancel_data:
        #     cancel_data, cancel_msg = h_cancel(buy_single_index, buy_exec_index)
        if not cancel_data:
            cancel_data, cancel_msg = l_cancel(buy_single_index, buy_exec_index)
        l2_log.debug(code, "撤单计算结束")
        # l2_log.debug(code, "撤单计算结束")
        # _start_time = l2_data_log.l2_time(code, tool.get_now_timestamp() - _start_time,
        #                                   "已下单-撤单 判断是否需要撤单")
        if cancel_data:
@@ -584,7 +590,7 @@
            if need_clear_data:
                trade_result_manager.real_cancel_success(code, buy_single_index, buy_exec_index,
                                                         local_today_datas.get(code))
            return
            return False
        else:
            l2_log.debug(code, "可以下单,原因:{}", reason)
@@ -607,7 +613,9 @@
                l2_log.debug(code, "执行买入异常:{}", str(e))
                pass
            finally:
                l2_log.debug(code, "m值影响因子:{}", l2_trade_factor.L2TradeFactorUtil.factors_to_string(code))
                # l2_log.debug(code, "m值影响因子:{}", l2_trade_factor.L2TradeFactorUtil.factors_to_string(code))
                pass
            return True
    # 是否可以取消
    @classmethod
@@ -835,29 +843,29 @@
                float(open_limit_up_lowest_price) - price_pre_close) / price_pre_close < 0.05:
            return False, True, f"炸板后最低价跌至5%以下"
        limit_up_info = cls.__Buy1PriceManager.get_limit_up_info(code)
        if limit_up_info[0] is None and False:
            total_data = local_today_datas.get(code)
            buy_single_index, buy_exec_index, buy_compute_index, num, count, max_num_set, buy_volume_rate = cls.__get_order_begin_pos(
                code)
            # 之前没有涨停过
            # 统计买入信号位到当前位置没有撤的大单金额
            min_money_w = l2_data_util.get_big_money_val(float(total_data[buy_single_index]["val"]["price"])) // 10000
            left_big_num = cls.__SecondCancelBigNumComputer.compute_left_big_num(code,
                                                                                 buy_single_index,
                                                                                 buy_exec_index,
                                                                                 total_data[-1][
                                                                                     "index"],
                                                                                 total_data,
                                                                                 0, min_money_w)
            if left_big_num > 0:
                # 重新获取分数与分数索引
                limit_up_time = cls.__LimitUpTimeManager.get_limit_up_time_cache(code)
                if limit_up_time is None:
                    limit_up_time = tool.get_now_time_str()
                score = first_code_score_manager.get_score(code, cls.volume_rate_info[code][0], limit_up_time, True,
                                                           left_big_num)
                cls.__l2PlaceOrderParamsManagerDict[code].set_score(score)
        # limit_up_info = cls.__Buy1PriceManager.get_limit_up_info(code)
        # if limit_up_info[0] is None and False:
        #     total_data = local_today_datas.get(code)
        #     buy_single_index, buy_exec_index, buy_compute_index, num, count, max_num_set, buy_volume_rate = cls.__get_order_begin_pos(
        #         code)
        #     # 之前没有涨停过
        #     # 统计买入信号位到当前位置没有撤的大单金额
        #     min_money_w = l2_data_util.get_big_money_val(float(total_data[buy_single_index]["val"]["price"])) // 10000
        #     left_big_num = cls.__SecondCancelBigNumComputer.compute_left_big_num(code,
        #                                                                          buy_single_index,
        #                                                                          buy_exec_index,
        #                                                                          total_data[-1][
        #                                                                              "index"],
        #                                                                          total_data,
        #                                                                          0, min_money_w)
        #     if left_big_num > 0:
        #         # 重新获取分数与分数索引
        #         limit_up_time = cls.__LimitUpTimeManager.get_limit_up_time_cache(code)
        #         if limit_up_time is None:
        #             limit_up_time = tool.get_now_time_str()
        #         score = first_code_score_manager.get_score(code, cls.volume_rate_info[code][0], limit_up_time, True,
        #                                                    left_big_num)
        #         cls.__l2PlaceOrderParamsManagerDict[code].set_score(score)
        # logger_place_order_score.info("code={},data='score_index':{},'score_info':{}", code,
        #                               cls.__l2PlaceOrderParamsManagerDict[code].score_index,
@@ -880,56 +888,29 @@
            # with open(f"{constant.get_path_prefix()}/logs/profile/{code}_can_buy_first.txt", 'w') as f:
            #     f.write(output.getvalue())
            # return results
            return cls.can_buy_first(code, limit_up_price, score_index, score, score_info, cls.volume_rate_info[code])
            return cls.can_buy_first(code, limit_up_price)
        else:
            return True, False, "在想买名单中"
    @classmethod
    def can_buy_first(cls, code, limit_up_price, score_index, score, score_info, volume_rate_info):
        def is_has_k_format(score_info):
            # (15个交易日涨幅是否大于24.9%,是否破前高,是否超跌,是否接近前高,是否N,是否V,是否有形态,天量大阳信息,是否具有辨识度)
            if score_info[1][3][6][0] and not score_info[1][3][3][0]:
                return True
            if score_info[1][3][7][0]:
                return True
            return False
        if float(limit_up_price) >= constant.MAX_CODE_PRICE:
            return False, True, f"股价大于{constant.MAX_CODE_PRICE}块"
        # 9:35之前买大市值(>=80亿)票
        if int(tool.get_now_date_str("%Y%m%d")) < int("093500"):
            zyltgb = global_util.zyltgb_map.get(code)
            if zyltgb is None:
                global_data_loader.load_zyltgb()
                zyltgb = global_util.zyltgb_map.get(code)
            if zyltgb >= 80 * 100000000:
                return True, False, "{9:30:00-9:35:00}自由市值≥80亿"
    def can_buy_first(cls, code, limit_up_price):
        # 判断板块
        yesterday_codes = kpl_data_manager.get_yesterday_limit_up_codes()
        plate_can_buy, msg = CodePlateKeyBuyManager.can_buy(code,
                                                            kpl_data_manager.KPLLimitUpDataRecordManager.latest_origin_datas,
                                                            kpl_data_manager.KPLLimitUpDataRecordManager.total_datas,
                                                            yesterday_codes,
                                                            block_info.get_before_blocks_dict())
        if not plate_can_buy:
            return False, True, msg
        return True, False, msg
        can_buy_result = CodePlateKeyBuyManager.can_buy(code)
        if can_buy_result is None:
            logger_debug.warning("没有获取到板块缓存,将获取板块")
            yesterday_codes = kpl_data_manager.get_yesterday_limit_up_codes()
            CodePlateKeyBuyManager.update_can_buy_blocks(code,
                                                         kpl_data_manager.KPLLimitUpDataRecordManager.latest_origin_datas,
                                                         kpl_data_manager.KPLLimitUpDataRecordManager.total_datas,
                                                         yesterday_codes,
                                                         block_info.get_before_blocks_dict())
            can_buy_result = CodePlateKeyBuyManager.can_buy(code)
        # if volume_rate_info[0] < 0.4:
        #     return False, True, f"量大于40%才下单,量比:{volume_rate_info[0]}"
        # 是否有K线形态(有K线形态或者天量大阳),10点后才需要判断是否有K线形态与分值
        # if int(tool.get_now_time_str().replace(":", "")) > int("100000"):
        #     has_k_format = score_info[1][3][6][0] or score_info[1][3][7][0]
        #     if not has_k_format:
        #         return False, True, f"无K线形态"
        #
        #     if score_index < 0:
        #         return False, True, f"分值:{score}未达到需要买入的分数线"
        # return True, False, ""
        if can_buy_result is None:
            return False, True, "尚未获取到板块信息"
        if not can_buy_result[0]:
            return False, True, can_buy_result[1]
        return True, False, can_buy_result[1]
    @classmethod
    def __cancel_buy(cls, code):
@@ -985,11 +966,18 @@
                            new_add=True):
        if compute_end_index < compute_start_index:
            return
        unique_key = f"{compute_start_index}-{compute_end_index}"
        if cls.__latest_process_not_order_unique_keys.get(code) == unique_key:
            logger_l2_error.error(f"重复处理数据:code-{code} start_index-{compute_start_index} end_index-{compute_end_index}")
            return
        cls.__latest_process_not_order_unique_keys[code] = unique_key
        _start_time = tool.get_now_timestamp()
        total_datas = local_today_datas[code]
        # 处理安全笔数
        cls.__buyL2SafeCountManager.compute_left_rate(code, compute_start_index, compute_end_index, total_datas,
                                                      local_today_num_operate_map.get(code))
        # cls.__buyL2SafeCountManager.compute_left_rate(code, compute_start_index, compute_end_index, total_datas,
        #                                               local_today_num_operate_map.get(code))
        # 获取买入信号计算起始位置
        buy_single_index, buy_exec_index, buy_compute_index, num, count, max_num_set, buy_volume_rate = cls.__get_order_begin_pos(
@@ -1016,8 +1004,6 @@
                count = 0
                l2_log.debug(code, "获取到买入信号起始点:{} ,计算范围:{}-{} ,量比:{},数据:{}", buy_single_index, compute_start_index,
                             compute_end_index, cls.volume_rate_info[code], total_datas[buy_single_index])
                # 如果是今天第一次有下单开始信号,需要设置大单起始点
                cls.l2BigNumForMProcessor.set_begin_pos(code, buy_single_index)
        # _start_time = l2_data_log.l2_time(code, tool.get_now_timestamp() - _start_time, "下单信号计算时间")
@@ -1029,11 +1015,6 @@
        start_process_index = max(buy_single_index, compute_start_index)
        if new_get_single:
            start_process_index = buy_single_index
        # 计算m值大单
        cls.l2BigNumForMProcessor.process(code, start_process_index,
                                          compute_end_index,
                                          gpcode_manager.get_limit_up_price(code))
        # _start_time = l2_data_log.l2_time(code, tool.get_now_timestamp() - _start_time, "计算m值大单")
@@ -1068,17 +1049,19 @@
            cls.__save_order_begin_data(code, buy_single_index, compute_index, compute_index,
                                        buy_nums, buy_count, max_num_set_new,
                                        cls.volume_rate_info[code][0])
            l2_log.debug(code, "__save_order_begin_data")
            cls.__LimitUpTimeManager.save_limit_up_time(code, total_datas[compute_index]["val"]["time"])
            l2_log.debug(code, "save_limit_up_time")
            cls.__TradePointManager.delete_buy_cancel_point(code)
            l2_log.debug(code, "delete_buy_cancel_point")
            # 直接下单
            cls.__buy(code, capture_time, total_datas[compute_index], compute_index, is_first_code)
            ordered = cls.__buy(code, capture_time, total_datas[compute_index], compute_index, is_first_code)
            # 数据是否处理完毕
            if compute_index < compute_end_index:
                cls.__process_order(code, compute_index + 1, compute_end_index, capture_time, is_first_code, False)
                if ordered:
                    cls.__process_order(code, compute_index + 1, compute_end_index, capture_time, is_first_code, False)
                else:
                    cls.__start_compute_buy(code, compute_index + 1, compute_end_index, threshold_money, capture_time,
                                            is_first_code)
        else:
            # 未达到下单条件,保存纯买额,设置纯买额
            # 记录买入信号位置
@@ -1093,7 +1076,6 @@
    def __get_order_begin_pos(cls, code):
        buy_single_index, buy_exec_index, compute_index, num, count, max_num_set, volume_rate = cls.__TradePointManager.get_buy_compute_start_data_cache(
            code)
        logger_debug.info(f"获取买入执行位置信息{code}:{buy_single_index}-{buy_exec_index}-{compute_index}-{num}-{count}")
        return buy_single_index, buy_exec_index, compute_index, num, count, max_num_set, volume_rate
    # 保存下单起始信号
@@ -1173,10 +1155,6 @@
    @classmethod
    def __sum_buy_num_for_order_3(cls, code, compute_start_index, compute_end_index, origin_num, origin_count,
                                  threshold_money, buy_single_index, max_num_set):
        def get_threshold_count():
            count = threshold_count
            return count
        _start_time = t.time()
        total_datas = local_today_datas[code]
        # is_first_code = gpcode_manager.FirstCodeManager().is_in_first_record_cache(code)
@@ -1234,9 +1212,9 @@
                    # 只统计59万以上的金额
                    buy_nums += int(_val["num"]) * int(total_datas[i]["re"])
                    buy_count += int(total_datas[i]["re"])
                    if buy_nums >= threshold_num and buy_count >= get_threshold_count():
                        logger_l2_trade_buy.info(
                            f"{code}获取到买入执行点:{i} 统计纯买手数:{buy_nums} 目标纯买手数:{threshold_num} 统计纯买单数:{buy_count} 目标纯买单数:{get_threshold_count()}, 大单数量:{len(max_buy_num_set)}")
                    if buy_nums >= threshold_num and buy_count >= threshold_count:
                        async_log_util.info(logger_l2_trade_buy,
                                            f"{code}获取到买入执行点:{i} 统计纯买手数:{buy_nums} 目标纯买手数:{threshold_num} 统计纯买单数:{buy_count} 目标纯买单数:{threshold_count}, 大单数量:{len(max_buy_num_set)}")
            elif L2DataUtil.is_limit_up_price_buy_cancel(_val):
                if _val["num"] >= bigger_num:
                    # 只统计59万以上的金额
@@ -1275,13 +1253,13 @@
            for i in max_buy_num_set:
                max_buy_num_set_count += total_datas[i]["re"]
            # 有撤单信号,且小于阈值
            if buy_nums >= threshold_num and buy_count >= get_threshold_count() and trigger_buy and max_buy_num_set_count >= big_num_count:
            if buy_nums >= threshold_num and buy_count >= threshold_count and trigger_buy and max_buy_num_set_count >= big_num_count:
                return i, buy_nums, buy_count, None, max_buy_num_set
        l2_log.buy_debug(code, "尚未获取到买入执行点,起始计算位置:{} 统计纯买手数:{} 目标纯买手数:{}  统计纯买单数:{} 目标纯买单数:{} 大单数量:{} 目标大单数量:{}",
                         compute_start_index,
                         buy_nums,
                         threshold_num, buy_count, get_threshold_count(), max_buy_num_set_count, big_num_count)
                         threshold_num, buy_count, threshold_count, max_buy_num_set_count, big_num_count)
        return None, buy_nums, buy_count, None, max_buy_num_set