Administrator
2025-04-11 f09823c8aa1ac777d634d37ab29c0ad85386a06e
新题材重新定义
6个文件已修改
377 ■■■■■ 已修改文件
servers/data_server.py 50 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
third_data/kpl_data_constant.py 116 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/buy_radical/block_special_codes_manager.py 62 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/buy_radical/new_block_processor.py 136 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/buy_radical/radical_buy_data_manager.py 10 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/current_price_process_manager.py 3 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
servers/data_server.py
@@ -990,6 +990,13 @@
            for k in code_blocks_dict:
                fdata[k] = list(code_blocks_dict[k])
            response_data = json.dumps({"code": 0, "data": fdata})
        elif url.path == "/get_new_blocks_special_codes":
            # 获取所有辨识度的代码
            code_blocks_dict = BlockSpecialCodesManager().get_temp_code_blocks_dict()
            fdata = {}
            for k in code_blocks_dict:
                fdata[k] = list(code_blocks_dict[k])
            response_data = json.dumps({"code": 0, "data": fdata})
        async_log_util.info(logger_request_api, f"结束请求{tool.get_thread_id()}-{url}")
        self.send_response(200)
@@ -1023,7 +1030,7 @@
        def do_limit_up(result_list_):
            def request_new_blocks_codes(blocks_info):
            def request_new_blocks_codes(blocks_info, all_new_blocks):
                """
                请求新板块的代码
                @param blocks_info:[(板块名称,板块代码)]
@@ -1045,8 +1052,7 @@
                        code_info_list.append((d[0], d[6]))
                    if code_info_list:
                        # 将代码加入新题材
                        for x in code_info_list:
                            new_block_processor.process_new_block(x[0], bi[0])
                        new_block_processor.process_new_block_by_component_codes(bi[0], set([x[0] for x in code_info_list]), all_new_blocks)
            try:
@@ -1122,39 +1128,27 @@
                        pass
                    try:
                        records = KPLLimitUpDataRecordManager.total_datas
                        # 计算今日新增的题材概念
                        block_codes = {}
                        # 统计板块的代码
                        block_plate_code_dict = {}
                        for x in records:
                            bs = {kpl_util.filter_block(x[2])}
                            block_plate_code_dict[kpl_util.filter_block(x[2])] = x[15]
                            # if x[6]:
                            #     bs |= set(x[6].split("、"))
                            for b in bs:
                                if b not in block_codes:
                                    block_codes[b] = set()
                                block_codes[b].add(x[3])
                        # 所有代码的涨停原因
                        reasons = set(block_codes.keys())
                        reasons -= constant.KPL_INVALID_BLOCKS
                        # 原来的老题材
                        reasons -= BeforeBlocksComputer().get_old_blocks()
                        if reasons:
                        # 新题材
                        new_block_codes = new_block_processor.screen_new_blocks_with_limit_up_datas([(x[0], x[5]) for x in result_list_])
                        if new_block_codes:
                            # 统计板块的代码
                            records = KPLLimitUpDataRecordManager.total_datas
                            block_plate_code_dict = {}
                            for x in records:
                                block_plate_code_dict[kpl_util.filter_block(x[2])] = x[15]
                            # 新板块
                            update_new_block_plates = []
                            for r in reasons:
                                for c in block_codes[r]:
                                    new_block_processor.process_new_block(c, r)
                            for b in new_block_codes:
                                for c in new_block_codes[b]:
                                    new_block_processor.process_new_block_by_limit_up_list(c, b)
                            for r in reasons:
                            for r in new_block_codes:
                                if r in block_plate_code_dict:
                                    update_new_block_plates.append((r, block_plate_code_dict[r]))
                            if update_new_block_plates:
                                # 需要获取板块下的代码
                                self.__new_blocks_codes_request_thread_pool.submit(
                                    lambda: request_new_blocks_codes(update_new_block_plates))
                                    lambda: request_new_blocks_codes(update_new_block_plates, new_block_codes.keys()))
                    except:
                        pass
                    self.__kplDataManager.save_data(type_, result_list_)
third_data/kpl_data_constant.py
@@ -2,11 +2,9 @@
import constant
from db import redis_manager_delegate as redis_manager
from db.redis_manager_delegate import RedisUtils
from log_module import async_log_util
from log_module.log import logger_debug
from third_data import kpl_util
from third_data.third_blocks_manager import BlockMapManager
from trade import trade_record_log_util
from utils import tool, global_util
from utils.kpl_data_db_util import KPLLimitUpDataUtil
@@ -15,6 +13,49 @@
from utils.tool import singleton
open_limit_up_code_dict_for_radical_buy = None
# Names of the themes (blocks) currently treated as new themes
new_blocks = set()
# New theme taken from the limit-up list, one per code: {"code": "theme"}
limit_up_code_new_block_dict = {}
# New themes of component stocks (not limit-up): {"code": {"theme", ...}}
limit_up_component_code_new_blocks_dict = {}
def get_new_blocks(code):
    """
    Collect every new theme currently attached to a stock code.

    Merges the themes stored for the code in
    limit_up_component_code_new_blocks_dict with the single theme stored for
    it in limit_up_code_new_block_dict.
    @param code: stock code to look up
    @return: a freshly built set of theme names (empty when none are stored)
    """
    merged = set()
    # In-place union into the fresh set, so the stored set is never modified
    merged |= limit_up_component_code_new_blocks_dict.get(code, set())
    if code in limit_up_code_new_block_dict:
        merged.add(limit_up_code_new_block_dict[code])
    return merged
def is_new_block(block):
    """
    Tell whether a theme is one of the current new themes.
    @param block: theme (block) name
    @return: True when the name is contained in the module-level new_blocks
    """
    return block in new_blocks
def has_new_block(code):
    """
    Tell whether a stock code carries at least one new theme.
    @param code: stock code
    @return: True when get_new_blocks(code) yields a non-empty set
    """
    return bool(get_new_blocks(code))
@tool.singleton
@@ -70,12 +111,6 @@
    __radical_buy_reasons_dict = {}
    # 原始数据
    __radical_buy_reasons_origin_data_dict = {}
    # 今日的新概念
    __new_blocks = set()
    # 代码的新题材
    __code_new_blocks = {}
    __instance = None
@@ -204,64 +239,11 @@
        @param code:
        @return:
        """
        if code in self.__radical_buy_reasons_dict:
            return set(self.__radical_buy_reasons_dict.get(code))
        return None
    def add_new_blocks(self, code, block):
        """
        添加新题材的板块
        @param code:
        @param block:
        @return: 返回增加新题材是否成功
        """
        # 自由流通股本要大于50亿
        zyltgb = global_util.zyltgb_map.get(code)
        if not zyltgb or zyltgb < 10e8:
            return False
        if block in constant.KPL_INVALID_BLOCKS:
            return False
        old_blocks = self.__radical_buy_reasons_dict.get(code)
        if old_blocks is None:
            old_blocks = set()
        if block in old_blocks:
            return False
        self.__new_blocks.add(block)
        if code not in self.__code_new_blocks:
            self.__code_new_blocks[code] = set()
        self.__code_new_blocks[code].add(block)
        old_blocks.add(block)
        async_log_util.info(logger_debug, f"今日新增概念:{code} - {block}")
        self.__radical_buy_reasons_dict[code] = old_blocks
        return True
    def get_new_blocks(self):
        """
        获取新概念
        @return:
        """
        return self.__new_blocks
    def is_new_block(self, block):
        """
        是否是新题材
        @param block:
        @return:
        """
        if self.__new_blocks and block in self.__new_blocks:
            return True
        return False
    def has_new_block(self, code):
        """
        是否有新题材
        @param code:
        @return:
        """
        if self.__code_new_blocks.get(code):
            return True
        return False
        blocks = set(self.__radical_buy_reasons_dict.get(code, set()))
        # 获取代码的板块
        # 获取代码的新题材
        new_blocks = get_new_blocks(code)
        return blocks | new_blocks
class TodayLimitUpReasonChangeManager:
trade/buy_radical/block_special_codes_manager.py
@@ -8,6 +8,7 @@
from third_data import kpl_util
from third_data.history_k_data_util import HistoryKDatasUtils
from third_data.kpl_data_constant import LimitUpCodesBlockRecordManager
from trade import trade_record_log_util
from utils import tool
@@ -15,6 +16,9 @@
class BlockSpecialCodesManager:
    __block_codes_dict = {}
    __code_blocks_dict = {}
    __temp_block_codes_dict = {}
    __temp_code_blocks_dict = {}
    def __init__(self):
        self.mysql = mysql_data.Mysqldb()
@@ -34,30 +38,56 @@
        self.__block_codes_dict = temp_dict
    def get_block_codes(self, block):
        # Look up the codes recorded for a block.
        # NOTE(review): two return statements follow each other; as written, the second
        # one (which also merges in the temporary block -> codes mapping and never
        # yields None) is unreachable. This looks like a removed and an added diff line
        # shown together - confirm which one is live.
        return self.__block_codes_dict.get(block)
        return self.__block_codes_dict.get(block, set()) | self.__temp_block_codes_dict.get(block, set())
    def get_code_blocks(self, code):
        # Look up the blocks recorded for a code.
        # NOTE(review): two return statements follow each other; as written, the second
        # one (which also merges in the temporary code -> blocks mapping and never
        # yields None) is unreachable. This looks like a removed and an added diff line
        # shown together - confirm which one is live.
        return self.__code_blocks_dict.get(code)
        return self.__code_blocks_dict.get(code, set()) | self.__temp_code_blocks_dict.get(code, set())
    def get_code_blocks_dict(self):
        # NOTE(review): the immediate return below makes the rest of this body
        # unreachable as written; it looks like a removed diff line shown next to the
        # added lines that build a merged copy (fdict) instead - confirm.
        return self.__code_blocks_dict
        fdict = {}
        # Copy every permanent entry into a new set
        for code in self.__code_blocks_dict:
            fdict[code] = set(self.__code_blocks_dict.get(code))
    # NOTE(review): this def line appears in the middle of the loop that merges the
    # temporary mapping into fdict; fdict is not defined inside this method as shown.
    # Presumably a removed diff line interleaved with added ones - confirm.
    def add_code_block_for_temp(self, code, block):
        # Merge the temporary code -> blocks entries into fdict
        for code in self.__temp_code_blocks_dict:
            if code not in fdict:
                fdict[code] = set()
            fdict[code] |= self.__temp_code_blocks_dict.get(code)
        return fdict
    def get_temp_code_blocks_dict(self):
        fdict = {}
        for code in self.__temp_code_blocks_dict:
            if code not in fdict:
                fdict[code] = set()
            fdict[code] |= self.__temp_code_blocks_dict.get(code)
        return fdict
    def set_code_blocks_for_temp(self, code, blocks):
        """
        Replace the temporary blocks of a code and keep the temporary
        block -> codes mapping in step with the change.
        @param code: stock code
        @param blocks: blocks to store for the code; the set differences below
                       assume this is a set - TODO confirm with callers
        @return:
        """
        # NOTE(review): the statements below that use `block` (singular) reference a
        # name that is not a parameter of this signature, and they write to the
        # permanent mappings. They look like removed diff lines interleaved with the
        # added ones - confirm before relying on them.
        if code not in self.__code_blocks_dict:
            self.__code_blocks_dict[code] = set()
        self.__code_blocks_dict[code].add(block)
        # Remember the previous temporary blocks, then store a copy of the new ones
        old_blocks = self.__temp_code_blocks_dict.get(code, set())
        self.__temp_code_blocks_dict[code] = set(blocks)
        if block not in self.__block_codes_dict:
            self.__block_codes_dict[block] = set()
        self.__block_codes_dict[block].add(code)
        # Blocks gained and lost by this code compared with the previous call
        add_blocks = blocks - old_blocks
        del_blocks = old_blocks - blocks
        if add_blocks:
            for b in add_blocks:
                if b not in self.__temp_block_codes_dict:
                    self.__temp_block_codes_dict[b] = set()
                self.__temp_block_codes_dict[b].add(code)
        if del_blocks:
            for b in del_blocks:
                if b not in self.__temp_block_codes_dict:
                    self.__temp_block_codes_dict[b] = set()
                self.__temp_block_codes_dict[b].discard(code)
        # Only write a trade-record entry when something actually changed
        if add_blocks or del_blocks:
            trade_record_log_util.add_temp_special_codes(code, f"新题材辨识度:{blocks}")
    def set_block_codes_list(self, datas):
        """
@@ -69,8 +99,6 @@
        for d in datas:
            sql = f"insert into block_special_codes(_block,_code,_code_name,_limit_up_count,_zyltgb,_create_time) values('{d[0]}', '{d[2]}', '{d[1]}', {d[3]}, {d[4]}, now())"
            self.mysql.execute(sql)
class AnalysisBlockSpecialCodesManager:
@@ -316,8 +344,6 @@
            zylt_list.sort(key=lambda x: x[1], reverse=True)
            zylt_list = [x[0] for x in zylt_list]
            index = 0
            # 获取股价,是否是ST
            if not zylt_list:
trade/buy_radical/new_block_processor.py
@@ -3,13 +3,14 @@
"""
import constant
from code_attribute import code_nature_analyse
from code_attribute.gpcode_manager import HumanRemoveForbiddenManager
from third_data import kpl_util
from third_data.kpl_data_constant import LimitUpCodesBlockRecordManager
from third_data import kpl_util, kpl_data_constant
from trade import l2_trade_util, trade_record_log_util
from trade.buy_radical.block_special_codes_manager import BlockSpecialCodesManager
from utils import tool
from utils import tool, global_util
from utils.kpl_data_db_util import KPLLimitUpDataUtil
# Minimum number of limit-up codes a theme needs in order to count as a new theme
__MIN_LIMIT_UP_COUNT = 1
@tool.singleton
@@ -17,6 +18,7 @@
    """
    往日题材计算器
    """
    def __init__(self):
        self.__before_blocks = set()
        self.__load_data()
@@ -80,26 +82,117 @@
        return self.__before_blocks
def process_new_block(code, block):
def screen_new_blocks_with_limit_up_datas(limit_up_datas):
    """
    Pick out the new themes from the limit-up list.

    Codes are grouped by their filtered limit-up reason. A reason is kept when
    it is not in constant.KPL_INVALID_BLOCKS and has at least
    __MIN_LIMIT_UP_COUNT codes; themes already returned by
    BeforeBlocksComputer().get_old_blocks() are then removed.
    @param limit_up_datas: [(code, limit-up reason)]
    @return: {new theme: {codes of that theme}}
    """
    codes_by_block = {}
    for item in limit_up_datas:
        codes_by_block.setdefault(kpl_util.filter_block(item[1]), set()).add(item[0])
    candidates = {
        name
        for name, codes in codes_by_block.items()
        if name not in constant.KPL_INVALID_BLOCKS and len(codes) >= __MIN_LIMIT_UP_COUNT
    }
    # Whatever remains after removing the already-known themes is new
    fresh = candidates - BeforeBlocksComputer().get_old_blocks()
    return {name: codes_by_block[name] for name in fresh}
def __is_can_add_new_block(code):
    """
    Decide whether a code may be attached to a new theme.

    A code with a known free-float value below 10e8 (i.e. 1e9) is rejected; a
    missing or zero value does not reject it. Outside of constant.TEST the
    cached K-line format must exist and its [1][0] entry must be truthy
    (presumably the "broke the previous high" flag - TODO confirm).
    @param code: stock code
    @return: True when the code may be added
    """
    free_float = global_util.zyltgb_map.get(code)
    if free_float and free_float < 10e8:
        return False
    k_format = code_nature_analyse.CodeNatureRecordManager().get_k_format_cache(code)
    if constant.TEST:
        # The K-line requirement is not enforced in test mode
        return True
    return bool(k_format and k_format[1][0])
def process_new_block_by_limit_up_list(code, block):
    """
    Handle a new theme that comes from the limit-up list.
    @param code: stock code
    @param block: new theme (block) of the code
    @return:
    """
    # NOTE(review): the section from here down to the "no need to remove" comment calls
    # add_new_blocks / add_code_block_for_temp and duplicates the un-blacklisting that
    # __sync_data_to_special_codes performs. It looks like removed diff lines shown
    # together with the added ones - confirm which part is live.
    add_result = LimitUpCodesBlockRecordManager().add_new_blocks(code, block)
    if add_result:
        # A new theme that broke the previous high is removed from the blacklist
        k_format = code_nature_analyse.CodeNatureRecordManager().get_k_format_cache(code)
        if k_format and k_format[1][0]:
            # Only codes that broke the previous high are added to the special codes
            # The theme was added successfully: temporarily add the code to the special codes
            trade_record_log_util.add_temp_special_codes(code, f"新题材辨识度:{block}")
            BlockSpecialCodesManager().add_code_block_for_temp(code, block)
            if l2_trade_util.is_in_forbidden_trade_codes(code):
                l2_trade_util.remove_from_forbidden_trade_codes(code)
                # Adding to the want-buy list requires removal from the blacklist
                trade_record_log_util.add_common_msg(code, "新题材移黑", block)
    # No need to remove codes whose limit-up was broken
    if __is_can_add_new_block(code):
        if kpl_data_constant.limit_up_code_new_block_dict.get(code, "") != block:
            # The theme of this code changed
            kpl_data_constant.limit_up_code_new_block_dict[code] = block
            __sync_data_to_special_codes(code)
            # Rebuild the set of new themes from every limit-up code's theme
            blocks = set()
            for k in kpl_data_constant.limit_up_code_new_block_dict:
                blocks.add(kpl_data_constant.limit_up_code_new_block_dict[k])
            kpl_data_constant.new_blocks = blocks
def process_new_block_by_component_codes(block, codes, all_new_blocks):
    """
    Refresh the component-stock view of a new theme.

    Works on kpl_data_constant.limit_up_component_code_new_blocks_dict in three
    passes: drop the theme from codes that are no longer among its component
    stocks, drop every theme that is not in all_new_blocks, then attach the
    theme to each eligible component code and sync that code to the special
    codes.
    @param block: theme (block) name
    @param codes: component stock codes of the theme
    @param all_new_blocks: all current new themes
    @return:
    """
    per_code_blocks = kpl_data_constant.limit_up_component_code_new_blocks_dict
    # Pass 1: the code used to carry this theme but is no longer one of its components
    for c, owned in per_code_blocks.items():
        if block in owned and c not in codes:
            owned.discard(block)
    # Pass 2: remove themes that are no longer new themes
    for owned in per_code_blocks.values():
        stale = owned - all_new_blocks
        if stale:
            owned.difference_update(stale)
    # Pass 3: attach the theme to every component code that qualifies
    for c in codes:
        if not __is_can_add_new_block(c):
            continue
        per_code_blocks.setdefault(c, set()).add(block)
        __sync_data_to_special_codes(c)
def __sync_data_to_special_codes(code):
    """
    Push a code's current new themes to the special codes manager.

    The themes are the union of the code's component-stock themes and its
    limit-up-list theme. When that union is non-empty and the code is in the
    forbidden-trade list, the code is removed from it and the removal is
    logged.
    @param code: stock code
    @return:
    """
    component_map = kpl_data_constant.limit_up_component_code_new_blocks_dict
    limit_up_map = kpl_data_constant.limit_up_code_new_block_dict
    merged = set()
    merged |= component_map.get(code, set())
    if code in limit_up_map:
        merged.add(limit_up_map[code])
    if merged and l2_trade_util.is_in_forbidden_trade_codes(code):
        # A code with a new theme must not stay in the blacklist
        l2_trade_util.remove_from_forbidden_trade_codes(code)
        trade_record_log_util.add_common_msg(code, "新题材移黑", f"{merged}")
    if constant.TEST:
        print(code, limit_up_map.get(code), component_map.get(code))
    BlockSpecialCodesManager().set_code_blocks_for_temp(code, merged)
def is_can_forbidden(code):
@@ -108,11 +201,10 @@
    @param code:
    @return:
    """
    if LimitUpCodesBlockRecordManager().has_new_block(code):
    if kpl_data_constant.has_new_block(code):
        # 有新题材
        k_format = code_nature_analyse.CodeNatureRecordManager().get_k_format_cache(code)
        if k_format and k_format[1][0]:
            # 突破形态
            return False
    return True
    return True
trade/buy_radical/radical_buy_data_manager.py
@@ -806,7 +806,7 @@
        @return:
        """
        if not LimitUpCodesBlockRecordManager().is_new_block(block):
        if not kpl_data_constant.is_new_block(block):
            return False, "非新题材"
        # 9:45点之前涨停的才能买入
@@ -992,7 +992,7 @@
        @return:
        """
        if not LimitUpCodesBlockRecordManager().is_new_block(block):
        if not kpl_data_constant.is_new_block(block):
            return False, "非新题材"
        # 开始买的身位 2:从老三开始买  1: 从老二开始买
@@ -1123,7 +1123,7 @@
        @return:
        """
        if not LimitUpCodesBlockRecordManager().is_new_block(block):
        if not kpl_data_constant.is_new_block(block):
            return False, "非新题材"
        # 获取身位
@@ -1290,7 +1290,7 @@
        if jx_in_blocks and block in jx_in_blocks:
            return True, False
        if LimitUpCodesBlockRecordManager().is_new_block(block):
        if kpl_data_constant.is_new_block(block):
            # 今日新板块不考虑净流入
            return True, False
        # 10点之前不考虑净流入
@@ -1452,7 +1452,7 @@
            delete_blocks = set()
            for b in can_buy_blocks:
                # 板块
                if LimitUpCodesBlockRecordManager().is_new_block(b):
                if kpl_data_constant.is_new_block(b):
                    if not is_new_high:
                        msges.append(f"【{b}】:新题材没破前高")
                        delete_blocks.add(b)
trade/current_price_process_manager.py
@@ -10,6 +10,7 @@
from log_module.log import logger_l2_codes_subscript, logger_debug
import constant
from code_attribute import gpcode_manager
from third_data import kpl_data_constant
from third_data.code_plate_key_manager import KPLPlateForbiddenManager
from third_data.kpl_data_constant import LimitUpCodesBlockRecordManager, LimitUpDataConstant
from trade.buy_money_count_setting import BuyMoneyUtil
@@ -101,7 +102,7 @@
                        return 200
            else:
                # 没有辨识度,新板块订阅前3
                new_blocks = LimitUpCodesBlockRecordManager().get_new_blocks()
                new_blocks = kpl_data_constant.get_new_blocks(code)
                if new_blocks and b in new_blocks:
                    info = RadicalBuyBlockManager().get_history_index(code, b, yesterday_limit_up_codes)
                    if info[0] > 0: