Administrator
2025-05-26 b51b2ae184fad5aaf37a78903987e064f192d430
third_data/code_plate_key_manager.py
@@ -5,22 +5,27 @@
# 涨停代码关键词板块管理
import copy
import datetime
import itertools
import json
import time
import constant
from code_attribute import gpcode_manager
from db.redis_manager_delegate import RedisUtils
from third_data import kpl_block_util, kpl_api, kpl_util
from settings.trade_setting import MarketSituationManager
from third_data.history_k_data_manager import HistoryKDataManager
from third_data.history_k_data_util import HistoryKDatasUtils
from third_data.kpl_data_constant import LimitUpCodesBlockRecordManager, ContainsLimitupCodesBlocksManager
from third_data.third_blocks_manager import BlockMapManager
from utils import global_util, tool, buy_condition_util
from log_module import async_log_util
from db import redis_manager_delegate as redis_manager
from db import redis_manager_delegate as redis_manager, mysql_data_delegate as mysql_data
from log_module.log import logger_kpl_block_can_buy, logger_kpl_jx_out, logger_kpl_jx_in, logger_debug
from log_module.log import logger_kpl_block_can_buy, logger_kpl_jx_out, logger_kpl_jx_in, logger_debug, \
    logger_kpl_latest_gaobiao
from third_data.kpl_util import KPLPlatManager
from trade import trade_manager, l2_trade_util, trade_constant
from trade import l2_trade_util, trade_constant
# 代码精选板块管理
from utils.kpl_data_db_util import KPLLimitUpDataUtil
@@ -222,8 +227,18 @@
# 开盘啦禁止交易板块管理
class KPLPlateForbiddenManager:
    __redisManager = redis_manager.RedisManager(3)
    """
    不能买的板块管理
    """
    __redis_manager = redis_manager.RedisManager(3)
    __kpl_forbidden_plates_cache = set()
    # 已经删除了的板块
    __deleted_kpl_forbidden_plates_cache = set()
    # 监控的高标板块代码字典:{"板块":{"代码1","代码2"}}
    __watch_block_high_codes = {}
    # 高标代码
    __watch_high_codes = set()
    __instance = None
@@ -237,24 +252,164 @@
    def __load_datas(cls):
        __redis = cls.__get_redis()
        try:
            __kpl_forbidden_plates_cache = RedisUtils.smembers(__redis, "kpl_forbidden_plates")
            cls.__kpl_forbidden_plates_cache = RedisUtils.smembers(__redis, "kpl_forbidden_plates")
            cls.__deleted_kpl_forbidden_plates_cache = RedisUtils.smembers(__redis, "deleted_kpl_forbidden_plates")
        finally:
            RedisUtils.realse(__redis)
        cls.__load_latest_gb()
    @classmethod
    def __get_redis(cls):
        return cls.__redisManager.getRedis()
        return cls.__redis_manager.getRedis()
    def save_plate(self, plate):
        self.__kpl_forbidden_plates_cache.add(plate)
        RedisUtils.sadd(self.__get_redis(), "kpl_forbidden_plates", plate)
        RedisUtils.expire(self.__get_redis(), "kpl_forbidden_plates", tool.get_expire())
        self.__deleted_kpl_forbidden_plates_cache.discard(plate)
        RedisUtils.srem(self.__get_redis(), "deleted_kpl_forbidden_plates", plate)
        RedisUtils.expire(self.__get_redis(), "deleted_kpl_forbidden_plates", tool.get_expire())
    def delete_plate(self, plate):
        self.__kpl_forbidden_plates_cache.discard(plate)
        RedisUtils.srem(self.__get_redis(), "kpl_forbidden_plates", plate)
        RedisUtils.expire(self.__get_redis(), "kpl_forbidden_plates", tool.get_expire())
        self.__deleted_kpl_forbidden_plates_cache.add(plate)
        RedisUtils.sadd(self.__get_redis(), "deleted_kpl_forbidden_plates", plate)
        RedisUtils.expire(self.__get_redis(), "deleted_kpl_forbidden_plates", tool.get_expire())
    def list_all(self):
        return RedisUtils.smembers(self.__get_redis(), "kpl_forbidden_plates")
    def list_all_cache(self):
        return self.__kpl_forbidden_plates_cache
    def list_all_deleted_cache(self):
        return self.__deleted_kpl_forbidden_plates_cache
    def is_in_cache(self, plate):
        if self.__kpl_forbidden_plates_cache and plate in self.__kpl_forbidden_plates_cache:
            return True
        return False
    @classmethod
    def __load_latest_gb(cls):
        """
        加载最近的市场高标
        @return:
        """
        # 获取最近10个交易日涨停的涨停数据
        dates = HistoryKDatasUtils.get_latest_trading_date_cache(10)
        if not dates:
            return
        min_date = dates[-1]
        sql = f"SELECT r.`_code`, r.`_hot_block_name`, r.`_day`, r.`_open` FROM `kpl_limit_up_record` r WHERE r.`_day`>='{min_date}'"
        mysqldb = mysql_data.Mysqldb()
        results = mysqldb.select_all(sql)
        code_days_map = {}
        # 每炸板
        f_code_days_map = {}
        for r in results:
            if r[0] not in code_days_map:
                code_days_map[r[0]] = set()
            code_days_map[r[0]].add(r[2])
            if not r[3]:
                if r[0] not in f_code_days_map:
                    f_code_days_map[r[0]] = set()
                f_code_days_map[r[0]].add(r[2])
        # 过滤涨停次数>=3次的数据
        target_codes = set()
        for code in code_days_map:
            if f_code_days_map.get(code) and (len(f_code_days_map.get(code)) >= 4 or (
                    tool.is_ge_code(code) and len(f_code_days_map.get(code)) >= 2)):
                # 且有3天属于连续涨停
                day_list = list(code_days_map[code])
                day_list.sort(reverse=True)
                step = 3
                has_continue = False
                for i in range(0, len(day_list) - step + 1):
                    item_list = day_list[i:i + step]
                    # 是否属于连续涨停
                    is_sub = False
                    for j in range(0, len(dates) - step):
                        if f"{dates[j:j + step]}" == f"{item_list}":
                            is_sub = True
                            break
                    if is_sub:
                        has_continue = True
                        break
                if not has_continue:
                    continue
                has_big_deal = False
                # 最近10个交易日的成交额要大于10亿
                volumes_data = HistoryKDataManager().get_history_bars(code, dates[0])
                if volumes_data:
                    for d in volumes_data[:10]:
                        if d["amount"] > 10e8:
                            has_big_deal = True
                            break
                if not has_big_deal:
                    continue
                target_codes.add(code)
        # 代码对应的板块
        code_blocks = {}
        for r in results:
            if r[0] not in target_codes:
                continue
            if r[0] not in code_blocks:
                code_blocks[r[0]] = set()
            code_blocks[r[0]].add(kpl_util.filter_block(r[1]))
        # 所有板块对应的代码集合
        block_codes = {}
        for code in code_blocks:
            for b in code_blocks[code]:
                if b in constant.KPL_INVALID_BLOCKS:
                    continue
                if b not in block_codes:
                    block_codes[b] = set()
                block_codes[b].add(code)
        print(block_codes)
        cls.__watch_block_high_codes = block_codes
        logger_kpl_latest_gaobiao.info(f"{block_codes}")
        cls.__watch_high_codes.clear()
        for b in block_codes:
            cls.__watch_high_codes |= block_codes[b]
        for k in block_codes:
            print(k, [(x, gpcode_manager.get_code_name(x)) for x in block_codes[k]])
    def get_watch_high_codes(self):
        return self.__watch_high_codes
    def get_watch_high_codes_by_block(self, b):
        return self.__watch_block_high_codes.get(b)
    def compute(self, code_rate_dict: dict):
        """
        根据比例计算需要拉黑的代码
        @param code_rate_dict: 涨幅百分数
        @return:
        """
        try:
            if self.__watch_block_high_codes:
                forbidden_blocks = set()
                for b in self.__watch_block_high_codes:
                    total_rate = 0
                    for code in self.__watch_block_high_codes[b]:
                        if code in code_rate_dict:
                            total_rate += code_rate_dict.get(code)
                    average_rate = total_rate / len(self.__watch_block_high_codes[b])
                    if average_rate < 1:
                        forbidden_blocks.add(b)
                    # async_log_util.info(logger_debug, f"板块平均涨幅 {b}-{average_rate}")
                self.__kpl_forbidden_plates_cache = forbidden_blocks
                async_log_util.info(logger_debug, f"拉黑板块:{forbidden_blocks}")
        except Exception as e:
            logger_debug.exception(e)
class LimitUpCodesPlateKeyManager:
@@ -352,19 +507,38 @@
    __top_jx_out_blocks = []
    # 精选板块流入金额
    __jx_blocks_in_money_dict = {}
    # 市场行情热度,默认为60
    __market_strong = 60
    @classmethod
    def get_jingxuan_in_block_threshold_count(cls):
        """
        获取买精选流入前几
        @return:
        """
        score = 60
        if cls.__market_strong is not None:
            score = int(cls.__market_strong)
        for info in constant.RADICAL_BUY_TOP_IN_COUNT_BY_MARKET_STRONG:
            if info[0] <= score < info[1]:
                return info[2]
        return 10
    @classmethod
    def set_market_jingxuan_blocks(cls, datas):
        """
        设置精选流入数据
        @param datas:
        @param datas:[(板块编号,板块名称,涨幅, 板块流入金额)]
        @return:
        """
        # 流入阈值
        THRESHOLD_MONEY = 100 * (tool.trade_time_sub(tool.get_now_time_str(), "09:30:00") // 60) + 1000
        THRESHOLD_MONEY = min(THRESHOLD_MONEY, 10000)
        THRESHOLD_MONEY = max(THRESHOLD_MONEY, 1000)
        THRESHOLD_MONEY = THRESHOLD_MONEY * 10000
        # THRESHOLD_MONEY = 50 * (tool.trade_time_sub(tool.get_now_time_str(), "09:30:00") // 60) + 1000
        # THRESHOLD_MONEY = min(THRESHOLD_MONEY, 10000)
        # THRESHOLD_MONEY = max(THRESHOLD_MONEY, 1000)
        # THRESHOLD_MONEY = THRESHOLD_MONEY * 10000
        THRESHOLD_MONEY = 0
        # 最大数量
        # MAX_COUNT = cls.get_jingxuan_in_block_threshold_count()
        cls.top_in_list_cache = datas
        blocks = set()
@@ -381,7 +555,6 @@
            if blocks & fb:
                continue
            for b in fb:
                fblock_money[b] = data[3]
            blocks |= fb
@@ -394,11 +567,19 @@
                    break
            if has_code:
                count += 1
            if count >= 10:
                break
                if count == 10:
                    strong = cls.get_market_strong()
                    if strong is None:
                        strong = 60
                    if data[3] > 3e7:
                        # 大于3千万
                        THRESHOLD_MONEY = int((1 - strong / 200) * data[3])
                    else:
                        THRESHOLD_MONEY = data[3]
            # if count >= MAX_COUNT:
            #     break
        # 记录精选流出日志
        async_log_util.info(logger_kpl_jx_in, f"原数据:{datas[:20]} 板块:{blocks}")
        async_log_util.info(logger_kpl_jx_in, f"原数据:{datas[:50]} 板块:{blocks}")
        blocks = list(blocks)
        blocks.sort(key=lambda x: fblock_money.get(x), reverse=True)
        cls.__top_jx_blocks = blocks
@@ -433,12 +614,43 @@
        cls.__top_jx_out_blocks = list(blocks)
    @classmethod
    def set_market_strong(cls, strong):
        """
        设置市场行情强度
        @param strong:
        @return:
        """
        cls.__market_strong = strong
    @classmethod
    def is_ignore_block_in_money(cls):
        if cls.__market_strong and cls.__market_strong >= constant.IGNORE_BLOCK_IN_MONEY_MARKET_STRONG:
            return True
        return False
    @classmethod
    def get_market_strong(cls):
        return cls.__market_strong
    @classmethod
    def get_top_market_jingxuan_blocks(cls):
        return cls.__top_jx_blocks
    @classmethod
    def get_top_market_jingxuan_out_blocks(cls):
        return cls.__top_jx_out_blocks
    @classmethod
    def get_block_info_at_block_in(cls, b):
        """
        获取板块的净流入情况
        @param b:
        @return: (板块名称,身位,流入金额)
        """
        for i in range(0, len(cls.top_in_list_cache)):
            if cls.top_in_list_cache[i][1] == b:
                return b, i, cls.top_in_list_cache[i][3]
        return b, -1, 0
    @classmethod
    def set_top_5_industry(cls, datas):
@@ -670,19 +882,7 @@
    __TargetCodePlateKeyManager = TargetCodePlateKeyManager()
    __LimitUpCodesPlateKeyManager = LimitUpCodesPlateKeyManager()
    __CodesHisReasonAndBlocksManager = CodesHisReasonAndBlocksManager()
    __CodesTradeStateManager = trade_manager.CodesTradeStateManager()
    __can_buy_compute_result_dict = {}
    @classmethod
    def __remove_from_l2(cls, code, msg):
        # 根据身位移除代码
        # return
        # 下过单的代码不移除
        if trade_manager.CodesTradeStateManager().get_trade_state_cache(code) != trade_constant.TRADE_STATE_NOT_TRADE:
            # 只要下过单的就不移除
            return
        l2_trade_util.forbidden_trade(code, msg=msg)
        logger_kpl_block_can_buy.info(msg)
    # 是否需要积极买
    @classmethod
@@ -1020,7 +1220,8 @@
    @classmethod
    def __compute_can_buy_blocks(cls, code, current_limit_up_datas, limit_up_record_datas,
                                 yesterday_current_limit_up_codes, before_blocks_dict,
                                 current_limit_up_block_codes_dict, high_level_general_code_blocks):
                                 current_limit_up_block_codes_dict, high_level_general_code_blocks, codes_delegate,
                                 codes_success):
        # 根据代码泛化板块获取泛化板块的代码集合
        high_level_general_block_codes = {}
        for c in high_level_general_code_blocks:
@@ -1036,10 +1237,7 @@
                                                             high_level_general_block_codes)
        if not blocks_compute_results:
            return False, True, f"没有找到板块", [], keys, []
        codes_delegate = set(cls.__CodesTradeStateManager.get_codes_by_trade_states_cache(
            {trade_constant.TRADE_STATE_BUY_DELEGATED, trade_constant.TRADE_STATE_BUY_PLACE_ORDER}))
        codes_success = set(cls.__CodesTradeStateManager.get_codes_by_trade_states_cache(
            {trade_constant.TRADE_STATE_BUY_SUCCESS}))
        codes = codes_delegate | codes_success
        # 统计成交代码的板块
        trade_codes_blocks_dict = {}
@@ -1118,7 +1316,7 @@
    @classmethod
    def update_can_buy_blocks(cls, code, current_limit_up_datas, limit_up_record_datas,
                              latest_current_limit_up_records,
                              before_blocks_dict, current_limit_up_block_codes_dict):
                              before_blocks_dict, current_limit_up_block_codes_dict, delegate_codes, deal_codes):
        yesterday_current_limit_up_codes = set()
        yesterday_current_limit_up_records_dict = {}
        yesterday_current_limit_up_records = latest_current_limit_up_records[0][1]
@@ -1152,11 +1350,13 @@
                                                                                                                   yesterday_current_limit_up_codes,
                                                                                                                   before_blocks_dict,
                                                                                                                   current_limit_up_block_codes_dict,
                                                                                                                   high_level_general_code_blocks)
                                                                                                                   high_level_general_code_blocks,
                                                                                                                   delegate_codes,
                                                                                                                   deal_codes)
        # 保存板块计算结果
        cls.__can_buy_compute_result_dict[code] = (
            can_buy_blocks, unique, msg, can_buy_strong_blocks, keys, active_buy_blocks)
if __name__ == "__main__":
    pass
    KPLPlateForbiddenManager().compute()