Administrator
2025-04-10 23d160b75ebb0b9883f84c735e71539e8e10f6b7
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
"""
新题材处理器
"""
import constant
from code_attribute import code_nature_analyse
from code_attribute.gpcode_manager import HumanRemoveForbiddenManager
from third_data import kpl_util
from third_data.kpl_data_constant import LimitUpCodesBlockRecordManager
from trade import l2_trade_util, trade_record_log_util
from trade.buy_radical.block_special_codes_manager import BlockSpecialCodesManager
from utils import tool
from utils.kpl_data_db_util import KPLLimitUpDataUtil
 
 
@tool.singleton
class BeforeBlocksComputer:
    """
    往日题材计算器
    """
    def __init__(self):
        self.__before_blocks = set()
        self.__load_data()
 
    def __load_data(self):
        # kpl_results: [( r.`_code`, r.`_day`, r.`_hot_block_name`, r.`_blocks`,  r.`_open`)]
        kpl_results = KPLLimitUpDataUtil.get_latest_block_infos()
        days = set()
        for r in kpl_results:
            # 取60个交易日之内的题材
            days.add(r[1])
        days = list(days)
        days.sort(key=lambda x: int(x.replace("-", "")), reverse=True)
        days = days[:60]
        # {"日期":{ "题材":{代码} }}
        day_block_codes_dict = {}
        for r in kpl_results:
            if r[1] not in days:
                continue
            if r[4]:
                # 不算炸板
                continue
            # 统计每一天的题材
            day = r[1]
            block = kpl_util.filter_block(r[2])
            if block in constant.KPL_INVALID_BLOCKS:
                continue
            if day not in day_block_codes_dict:
                day_block_codes_dict[day] = {}
            if block not in day_block_codes_dict[day]:
                day_block_codes_dict[day][block] = set()
            day_block_codes_dict[day][block].add(r[0])
 
        # 每一天走的板块
        day_block_dict = {}
        for day in day_block_codes_dict:
            for b in day_block_codes_dict[day]:
                if len(day_block_codes_dict[day][b]) >= 3:
                    if day not in day_block_dict:
                        day_block_dict[day] = set()
                    day_block_dict[day].add(b)
        day_block_list = [(d, day_block_dict[d]) for d in day_block_dict]
        day_block_list.sort(key=lambda x: x[0])
        # 过去59天出现的题材
        old_blocks = set()
        for d in day_block_list:
            if d[0] == days[0]:
                continue
            old_blocks |= d[1]
        self.__before_blocks |= old_blocks
 
        # 昨天出现的题材
        # if days[0] == day_block_list[-1][0]:
        #     # 昨天出现的题材
        #     yesterday_blocks = day_block_list[-1][1]
 
    def is_new_block(self, block):
        return block not in self.__before_blocks
 
    def get_old_blocks(self):
        return self.__before_blocks
 
 
def process_new_block(code, block):
    """
    处理新题材
    @param code:
    @param block:
    @return:
    """
    add_result = LimitUpCodesBlockRecordManager().add_new_blocks(code, block)
    if add_result:
        # 新题材破前高就移黑
        k_format = code_nature_analyse.CodeNatureRecordManager().get_k_format_cache(code)
        if k_format and k_format[1]:
            # 破前高的票才会加入辨识度
            # 增加新题材是否成功, 临时将票加入辨识度
            BlockSpecialCodesManager().add_code_block_for_temp(code, block)
            if l2_trade_util.is_in_forbidden_trade_codes(code):
                l2_trade_util.remove_from_forbidden_trade_codes(code)
                # 加想买单要从黑名单移除
                trade_record_log_util.add_common_msg(code, "新题材移黑", block)
 
 
def is_can_forbidden(code):
    """
    是否可以拉黑
    @param code:
    @return:
    """
    if LimitUpCodesBlockRecordManager().has_new_block(code):
        # 有新题材
        k_format = code_nature_analyse.CodeNatureRecordManager().get_k_format_cache(code)
        if k_format and k_format[1]:
            # 突破形态
            return False
    return True