Administrator
2024-11-15 b53b0f632cca75df8f39a17fab3d26caeecb2caf
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
"""
板块辨识度票管理
"""
import constant
from db import mysql_data_delegate as mysql_data
from third_data.history_k_data_util import HistoryKDatasUtils
from utils import tool
 
 
@tool.singleton
class BlockSpecialCodesManager:
    """
    In-memory view of the block_special_codes table.

    The table is read when the instance is created and grouped into
    {block: set(codes)}; lookups are then served from that dict.
    """
    __block_codes_dict = {}

    def __init__(self):
        self.mysql = mysql_data.Mysqldb()
        self.__load_data()

    def __load_data(self):
        """
        Rebuild the {block: set(codes)} cache from the block_special_codes table.
        """
        results = self.mysql.select_all("select _block,_code from block_special_codes")
        temp_dict = {}
        # NOTE(review): `or ()` only guards against select_all returning None
        # for an empty table - confirm what the helper actually returns.
        for row in results or ():
            temp_dict.setdefault(row[0], set()).add(row[1])
        # Swap in the finished dict in one assignment so readers never see a
        # half-built cache.
        self.__block_codes_dict = temp_dict

    def get_block_codes(self, block):
        """
        Return the cached codes of a block.

        @param block: block name
        @return: set of codes, or None when the block is not in the cache
        """
        return self.__block_codes_dict.get(block)

    def set_block_codes_list(self, datas):
        """
        Replace the whole table with datas, then refresh the in-memory cache.

        @param datas: [(block, code name, code, limit-up count, free-float market value)]
                      - index 1 is written to _code_name and index 2 to _code,
                      which is the layout produced by
                      AnalysisBlockSpecialCodesManager.get_block_special_codes
        @return:
        """
        self.mysql.execute("delete from block_special_codes")
        for d in datas:
            # NOTE(review): values are interpolated straight into the SQL text;
            # a quote inside a block or code name would break the statement.
            # Switch to bound parameters if Mysqldb.execute supports them.
            sql = f"insert into block_special_codes(_block,_code,_code_name,_limit_up_count,_zyltgb,_create_time) values('{d[0]}', '{d[2]}', '{d[1]}', {d[3]}, {d[4]}, now())"
            self.mysql.execute(sql)
        # Without this reload get_block_codes would keep serving the rows that
        # were just deleted.
        self.__load_data()
 
 
class AnalysisBlockSpecialCodesManager:
    """
    Derives, from the kpl_limit_up_record table, the codes that stand out
    within each block.
    """
    __mysql = mysql_data.Mysqldb()

    def __get_code_blocks(self, min_day, max_day):
        """
        Count records per (block, code) for min_day < _day < max_day.

        Codes starting with '68' are excluded by the query.
        @param min_day: exclusive lower bound for _day
        @param max_day: exclusive upper bound for _day
        @return: rows of (block name, code, record count)
        """
        results = self.__mysql.select_all(
            f"SELECT r.`_hot_block_name`, r.`_code`, COUNT(*) FROM kpl_limit_up_record  r WHERE r.`_day`>'{min_day}' and r.`_day`<'{max_day}' and r._code not like '68%' group by  r.`_hot_block_name`, r.`_code`")
        return results

    def __get_limit_up_info(self, min_day):
        """
        Collect per-code totals for all records with _day > min_day.

        @param min_day: exclusive lower bound for _day
        @return: {code: (record count, code name, _zylt_val / 1e8, or 1 when _zylt_val is NULL)}
        """
        # NOTE(review): _code_name and _zylt_val are selected without being
        # aggregated; the ORDER BY inside the sub-query presumably aims at
        # picking the newest record per code - verify against the server's
        # GROUP BY settings.
        sql = f"SELECT r.`_code`, COUNT(r.`_code`),r.`_code_name`,IF( r.`_zylt_val` is null, 1, r.`_zylt_val`/100000000 )  FROM (SELECT * FROM kpl_limit_up_record  r ORDER BY r.`_create_time` DESC)  r WHERE r.`_day`>'{min_day}' GROUP BY r.`_code`"
        results = self.__mysql.select_all(sql)
        return {x[0]: (x[1], x[2], x[3]) for x in results}

    def __get_top(self, block, min_day, max_day):
        """
        Return the codes of a block that rank near the top both by record
        count and by the market-value column.

        @param block: block name
        @param min_day: exclusive lower bound for _day
        @param max_day: exclusive upper bound for _day
        @return: set of codes present in both top lists
        """
        # NOTE(review): `block` is interpolated straight into the SQL text.
        sql = f"SELECT r.`_code`,r.`_code_name`, COUNT(*), IF( r.`_zylt_val` is null, 1, r.`_zylt_val`/100000000 )  FROM (SELECT * FROM kpl_limit_up_record  r ORDER BY r.`_create_time` DESC)  r WHERE r.`_hot_block_name`='{block}' AND r.`_day`>'{min_day}' and r.`_day`<'{max_day}' and r._code not like '68%' GROUP BY r.`_code`"
        results = list(self.__mysql.select_all(sql))
        # Each ranking keeps at most 10 entries and never more than half of
        # the rows.
        max_count = min(10, len(results) // 2)
        # The two sorts stay in place and in this order: sort() is stable, so
        # ties in the second ranking follow the order left by the first one.
        results.sort(key=lambda x: x[2], reverse=True)
        top_by_count = {x[0] for x in results[:max_count]}
        results.sort(key=lambda x: x[3], reverse=True)
        top_by_value = {x[0] for x in results[:max_count]}
        return top_by_count & top_by_value

    def __get_block_map(self):
        """
        Read the block mapping file and build {block: set(parent blocks)}.

        Every non-empty line of the file has one of two forms:
            parent1,parent2|child1,child2   children map to the listed parents
            name1,name2,...                 every name maps to the first name
        When a line lists more than one parent, each parent is additionally
        mapped to itself.
        @return: {block name: set of parent block names}
        """
        block_map = {}
        with open(f"{constant.get_path_prefix()}/板块对应.txt", encoding="utf-8") as f:
            for line in f:
                line = line.strip()
                if not line:
                    continue
                # Normalise the full-width comma (U+FF0C) and the CJK stroke
                # "丨" so the file may be typed with either punctuation.
                line = line.replace("\uff0c", ",").replace("丨", "|")
                parent_blocks = set()
                if "|" in line:
                    sts = line.split("|")
                    # Strip parents the same way children are stripped, and
                    # drop empty names left behind by stray commas.
                    parent_blocks = {p.strip() for p in sts[0].split(",")} - {""}
                    line = sts[1]
                blocks = [b for b in (part.strip() for part in line.split(",")) if b]
                if not blocks and not parent_blocks:
                    continue
                if not parent_blocks:
                    parent_blocks.add(blocks[0])
                for b in blocks:
                    block_map.setdefault(b, set()).update(parent_blocks)
                if len(parent_blocks) > 1:
                    # Several parents: each parent also belongs to itself.
                    for b in parent_blocks:
                        block_map.setdefault(b, set()).add(b)
        return block_map

    def test_block_map(self):
        """
        Print the parsed block mapping (manual debugging aid).
        """
        print(self.__get_block_map())

    def get_block_special_codes(self):
        """
        Pick, for every block, the codes that qualify as distinctive for it.

        @return: [(block, code name, code, record count of the code within the block,
                   market-value figure truncated to int)]
        """
        trading_dates = HistoryKDatasUtils.get_latest_trading_date(15)
        max_day = trading_dates[-1]
        # presumably a 365-day look-back from max_day - confirm tool.date_sub semantics
        min_day = tool.date_sub(max_day, 365)

        block_map = self.__get_block_map()
        # rows of (block name, code, record count inside that block)
        code_block_infos = self.__get_code_blocks(min_day, max_day)

        # {code: {block: record count}}; a raw block name is replaced by its
        # mapped parent blocks when the mapping file knows it.
        code_block_dict = {}
        for info in code_block_infos:
            per_block = code_block_dict.setdefault(info[1], {})
            targets = block_map.get(info[0]) or {info[0]}
            for target in targets:
                per_block[target] = per_block.get(target, 0) + info[2]

        # {block: [(code, record count)]}
        block_codes_dict = {}
        for code, per_block in code_block_dict.items():
            for block, count in per_block.items():
                block_codes_dict.setdefault(block, []).append((code, count))

        limit_up_info_map = self.__get_limit_up_info(min_day)
        fdatas = []
        for b, code_info_list in block_codes_dict.items():
            if b in constant.KPL_INVALID_BLOCKS:
                continue

            # Only the first third of the block's codes is examined.
            # NOTE(review): the list is not sorted here (the ranking sorts
            # were disabled in the original), so "first third" follows
            # query/insertion order - confirm this is intended.
            max_count = len(code_info_list) // 3
            zylt_list = [x[0] for x in code_info_list[:max_count]]
            if not zylt_list:
                continue

            # Fetch the latest info used below to filter the candidates.
            juejin_results = HistoryKDatasUtils.get_gp_latest_info(zylt_list, fields="sec_id,sec_level,upper_limit")
            if juejin_results is None:
                continue
            juejin_result_dict = {x['sec_id']: (x['sec_id'], x['sec_level'], x['upper_limit']) for x in juejin_results}

            accepted = 0
            for code in zylt_list:
                if code_block_dict[code][b] <= 3:
                    # Skips codes with 3 or fewer records in this block.
                    # NOTE(review): the original comment said "fewer than 3";
                    # the comparison is what is enforced - confirm threshold.
                    continue

                if code not in juejin_result_dict:
                    # No info returned for this code.
                    continue

                if juejin_result_dict[code][1] != 1:
                    # sec_level other than 1: not a normal stock.
                    continue

                if juejin_result_dict[code][2] < 3:
                    # upper_limit below 3 (the original note: price under 3 yuan).
                    continue

                accepted += 1
                fdatas.append(
                    (b, limit_up_info_map[code][1], code, code_block_dict[code][b],
                     int(float(limit_up_info_map[code][2]))))
                # At most 10 codes are kept per block.
                if accepted >= 10:
                    break
        # The result is only returned; writing it to the table is left to the caller.
        return fdatas
 
 
if __name__ == "__main__":
    # Manual run: compute the per-block special codes, print them one per
    # line, then print the complete list once more.
    # Nothing is written to the database from here.
    datas = AnalysisBlockSpecialCodesManager().get_block_special_codes()
    for row in datas:
        print(row)
    print(datas)