Administrator
2025-02-07 7eb1a8ed1a007d80de41d131071ee38f5872700c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
"""
板块辨识度票管理
"""
import constant
from db import mysql_data_delegate as mysql_data
from third_data import kpl_util
from third_data.history_k_data_util import HistoryKDatasUtils
from utils import tool
 
 
@tool.singleton
class BlockSpecialCodesManager:
    __block_codes_dict = {}
    __code_blocks_dict = {}
 
    def __init__(self):
        self.mysql = mysql_data.Mysqldb()
        self.__load_data()
 
    def __load_data(self):
        results = self.mysql.select_all("select _block,_code from block_special_codes")
        temp_dict = {}
        for row in results:
            block, code = row[0], row[1]
            if block not in temp_dict:
                temp_dict[block] = set()
            if code not in self.__code_blocks_dict:
                self.__code_blocks_dict[code] = set()
            self.__code_blocks_dict[code].add(block)
            temp_dict[block].add(code)
        self.__block_codes_dict = temp_dict
 
    def get_block_codes(self, block):
        return self.__block_codes_dict.get(block)
 
    def get_code_blocks(self, code):
        return self.__code_blocks_dict.get(code)
 
    def get_code_blocks_dict(self):
        return self.__code_blocks_dict
 
    def set_block_codes_list(self, datas):
        """
        设置数据
        @param datas:[(板块,代码,代码名称,涨停次数,自由市值)]
        @return:
        """
        self.mysql.execute("delete from block_special_codes")
        for d in datas:
            sql = f"insert into block_special_codes(_block,_code,_code_name,_limit_up_count,_zyltgb,_create_time) values('{d[0]}', '{d[2]}', '{d[1]}', {d[3]}, {d[4]}, now())"
            self.mysql.execute(sql)
 
 
class AnalysisBlockSpecialCodesManager:
    __mysql = mysql_data.Mysqldb()
 
    def __get_code_blocks(self, min_day, max_day):
        results = self.__mysql.select_all(
            f"SELECT r.`_hot_block_name`, r.`_code`, COUNT(*) FROM kpl_limit_up_record  r WHERE r.`_day`>'{min_day}' and r.`_day`<'{max_day}' and r._code not like '68%' group by  r.`_hot_block_name`, r.`_code`")
        return results
 
    def __list_code_blocks(self, min_day, max_day):
        results = self.__mysql.select_all(
            f"SELECT r.`_hot_block_name`, r.`_code`, r.`_day` FROM kpl_limit_up_record  r WHERE r.`_day`>'{min_day}' and r.`_day`<'{max_day}' and r._code not like '68%'")
        return results
 
    def __get_limit_up_info(self, min_day):
        sql = f"SELECT r.`_code`, COUNT(r.`_code`),r.`_code_name`,IF( r.`_zylt_val` is null, 1, r.`_zylt_val`/100000000 )  FROM (SELECT * FROM kpl_limit_up_record  r ORDER BY r.`_create_time` DESC)  r WHERE r.`_day`>'{min_day}' GROUP BY r.`_code`"
        results = self.__mysql.select_all(sql)
        # {"代码":(涨停次数, 名称, 自由流通市值)}
        return {x[0]: (x[1], x[2], x[3]) for x in results}
 
    def __get_top(self, block, min_day, max_day):
        sql = f"SELECT r.`_code`,r.`_code_name`, COUNT(*), IF( r.`_zylt_val` is null, 1, r.`_zylt_val`/100000000 )  FROM (SELECT * FROM kpl_limit_up_record  r ORDER BY r.`_create_time` DESC)  r WHERE r.`_hot_block_name`='{block}' AND r.`_day`>'{min_day}' and r.`_day`<'{max_day}' and r._code not like '68%' GROUP BY r.`_code`"
        results = self.__mysql.select_all(sql)
        results = list(results)
        info_map = {x[1]: x for x in results}
        max_count = min(10, len(results) // 2)
        results.sort(key=lambda x: x[2], reverse=True)
        names1 = set([x[0] for x in results[:max_count]])
        results.sort(key=lambda x: x[3], reverse=True)
        names2 = set([x[0] for x in results[:max_count]])
        fnames = names1 & names2
        return fnames
 
    def __get_block_map(self):
        """
        获取板块裂变
        @return:
        """
        constant.get_path_prefix()
        with open(f"{constant.get_path_prefix()}/板块对应.txt", encoding="utf-8") as f:
            lines = f.readlines()
            block_map = {}
            for line in lines:
                line = line.strip()
                if line:
                    line = line.replace(",", ",").replace("丨", "|")
                    parent_blocks = set()
                    if line.find("|") >= 0:
                        sts = line.split("|")
                        parent_blocks |= set(sts[0].split(","))
                        line = sts[1]
                    blocks = line.split(",")
                    if not blocks:
                        continue
                    if not parent_blocks:
                        parent_blocks.add(blocks[0])
                    for b in blocks:
                        b = b.strip()
                        if b not in block_map:
                            block_map[b] = set()
                        block_map[b] |= parent_blocks
                    if len(parent_blocks) > 1:
                        # parent加入
                        for b in parent_blocks:
                            if b not in block_map:
                                block_map[b] = set()
                            block_map[b].add(b)
            return block_map
 
    def test_block_map(self):
        print(self.__get_block_map())
 
    def get_block_special_codes(self):
        """
        获取板块有辨识度的代码
        @return:
        """
        trading_dates = HistoryKDatasUtils.get_latest_trading_date(9)
        max_day = trading_dates[0]
        min_day = tool.date_sub(max_day, 180)
 
        block_map = self.__get_block_map()
        # 统计最近180天涨停数据
        # [(板块名称,代码, 日期)]
        code_block_infos = self.__list_code_blocks(min_day, max_day)
 
        # 统计最近8天的涨停数据
        min_day = trading_dates[-1]
        code_block_infos_8 = self.__get_code_blocks(min_day, max_day)
        # 统计涨停次数大于3次的涨停代码
        # 格式:{"代码":涨停次数}
        count_dict = {}
        filter_codes = set()
        for d in code_block_infos_8:
            if d[1] not in count_dict:
                count_dict[d[1]] = 0
            count_dict[d[1]] += d[2]
            if count_dict[d[1]] > 3:
                filter_codes.add(d[1])
        count_dict.clear()
        code_block_infos_8.clear()
 
        # 删除数据
        temp_data_dict = {}
        for d in code_block_infos:
            if d[1] in filter_codes and int(d[2].replace("-", "")) > int(min_day.replace("-", "")):
                # 需要过滤且最近8天涨停
                continue
            block, code = d[0], d[1]
            k = f"{block}#{code}"
            if k not in temp_data_dict:
                temp_data_dict[k] = [block, code, 0]
            temp_data_dict[k][2] += 1
        code_block_infos = [temp_data_dict[k] for k in temp_data_dict]
 
        min_day = tool.date_sub(max_day, 180)
 
        code_block_dict = {}  # {"代码":{"板块": 涨停次数}}
        for b in code_block_infos:
            if b[1] not in code_block_dict:
                code_block_dict[b[1]] = {}
            bs = block_map.get(b[0])
            if not bs:
                bs = {b[0]}
            for bb in bs:
                if bb not in code_block_dict[b[1]]:
                    code_block_dict[b[1]][bb] = 0
                if tool.is_ge_code(b[1]):
                    # 创业板1次涨停算2次涨停
                    code_block_dict[b[1]][bb] += b[2] * 2
                else:
                    code_block_dict[b[1]][bb] += b[2]
        block_codes_dict = {}  # {"板块":[(代码,涨停次数)]}
        for code in code_block_dict:
            for b in code_block_dict[code]:
                if b not in block_codes_dict:
                    block_codes_dict[b] = []
                block_codes_dict[b].append((code, code_block_dict[code][b]))
 
        limit_up_info_map = self.__get_limit_up_info(min_day)
        fdatas = []
        for b in block_codes_dict:
            # if b != '机器人':
            #     continue
 
            if b in constant.KPL_INVALID_BLOCKS:
                continue
 
            code_info_list = block_codes_dict[b]
            # 取自由市值的前1/3
            code_info_list.sort(key=lambda x: float(limit_up_info_map[x[0]][2]), reverse=True)
            max_count = len(code_info_list) // 3
            zylt_list = code_info_list[:max_count]
            # 按照涨停次数排序
            zylt_list.sort(key=lambda x: x[1], reverse=True)
            zylt_list = [x[0] for x in zylt_list]
            index = 0
            # 获取股价,是否是ST
            if not zylt_list:
                continue
            juejin_results = HistoryKDatasUtils.get_gp_latest_info(zylt_list, fields="sec_id,sec_level,upper_limit")
            if juejin_results is None:
                continue
 
            juejin_result_dict = {x['sec_id']: (x['sec_id'], x['sec_level'], x['upper_limit']) for x in juejin_results}
            for code in zylt_list:
                if code_block_dict[code][b] <= 3:
                    # 累计涨停次数小于3次
                    continue
 
                if code not in juejin_result_dict:
                    # 查询不到信息
                    continue
 
                if juejin_result_dict[code][1] != 1:
                    # 非正常票
                    continue
 
                if juejin_result_dict[code][2] < 3 or juejin_result_dict[code][2] > 50:
                    # 小于3块/大于50块
                    continue
 
                index += 1
                if int(float(limit_up_info_map[code][2])) < 50:
                    continue
                # [(板块, 代码名称, 代码, 涨停次数, 自由市值)]
                fdatas.append(
                    (kpl_util.filter_block(b), limit_up_info_map[code][1], code, code_block_dict[code][b],
                     int(float(limit_up_info_map[code][2]))))
                if index >= 10:
                    break
        # BlockSpecialCodesManager().set_block_codes_list(fdatas)
        return fdatas
 
 
def update_block_special_codes():
    datas = AnalysisBlockSpecialCodesManager().get_block_special_codes()
    for d in datas:
        print(d)
    BlockSpecialCodesManager().set_block_codes_list(datas)
    return len(datas)
 
 
if __name__ == "__main__":
    # print(datas)
    datas = AnalysisBlockSpecialCodesManager().get_block_special_codes()
    print(datas)
    # print(BlockSpecialCodesManager().get_code_blocks("002582"))