Administrator
2025-01-02 77f196d3c0c352e2d1129b82c3b0f533e1132a87
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
"""
开盘啦涨停数据管理
"""
from third_data import kpl_util, kpl_data_manager
from third_data.history_k_data_manager import HistoryKDataManager
from third_data.history_k_data_util import HistoryKDatasUtils
from third_data.kpl_data_constant import LimitUpCodesBlockRecordManager
from utils import tool, init_data_util
 
 
def get_current_limit_up_datas(day):
    """
    获取当时涨停的数据
    @param day:
    @return:
    """
    datas = kpl_data_manager.KPLDataManager.get_from_file_cache(kpl_util.KPLDataType.LIMIT_UP, day)
    return datas
 
 
def get_history_limit_up_datas(day):
    """
    获取曾涨停的代码数据
    @param day:
    @return:
    """
    limit_up_records = kpl_data_manager.KPLLimitUpDataRecordManager.list_all(day)
    return limit_up_records
 
 
def get_today_history_limit_up_datas_cache():
    """
    获取今天的历史涨停数据
    @return:
    """
    return kpl_data_manager.KPLLimitUpDataRecordManager.total_datas
 
 
 
 
class CodeLimitUpSequenceManager:
    """
    代码身位管理
    """
    # 首板身位
    __first_block_sequence_dict = {}
 
    @classmethod
    def __get_code_blocks(cls, code):
        blocks = LimitUpCodesBlockRecordManager().get_radical_buy_blocks(code)
        if not blocks:
            blocks = set()
        return blocks
 
    @classmethod
    def set_current_limit_up_datas(cls, current_limit_up_datas):
        """
        设置目前的涨停代码
        @param current_limit_up_datas:
        @return:
        """
        records = get_today_history_limit_up_datas_cache()
        # 按代码排序
        # {"代码":(代码,涨停原因, 涨停时间, 几版)}
        current_code_block_dict = {x[0]: (x[0], x[5], x[2], x[4]) for x in current_limit_up_datas}
        record_code_block_dict = {x[3]: (x[3], x[2], x[5], x[12]) for x in records}
        # 根据涨停原因统计
        # {"板块":{代码}}
        block_codes = {}
        limit_up_codes = set()
        for code in current_code_block_dict:
            bs = cls.__get_code_blocks(code)
            for b in bs:
                if b not in block_codes:
                    block_codes[b] = set()
                block_codes[b].add(code)
            limit_up_codes.add(code)
        for code in record_code_block_dict:
            bs = cls.__get_code_blocks(code)
            for b in bs:
                if b not in block_codes:
                    block_codes[b] = set()
                block_codes[b].add(code)
        # 获取上个交易日涨停的代码
        yesterday_codes = kpl_data_manager.get_yesterday_limit_up_codes()
        if yesterday_codes is None:
            yesterday_codes = set()
        temp_block_sequence_dict = {}
        for code in limit_up_codes:
            bs = cls.__get_code_blocks(code)
            for b in bs:
                # 计算身位
                codes = block_codes[b]
                total_count = len(codes)
                # 统计真正涨停数
                limit_up_count = 0
                limit_up_codes_list = []
                for c in codes:
                    if c in limit_up_codes:
                        limit_up_count += 1
                        if c not in yesterday_codes:
                            limit_up_codes_list.append((c, current_code_block_dict[c][2]))
                # 获取首板代码的排位
                limit_up_codes_list.sort(key=lambda x: x[1])
                index = 1
                for i in range(0, len(limit_up_codes_list)):
                    if limit_up_codes_list[i][0] == code:
                        index = i + 1
                        break
                if code not in temp_block_sequence_dict:
                    temp_block_sequence_dict[code] = []
                temp_block_sequence_dict[code].append((b, index, total_count, limit_up_count))
        cls.__first_block_sequence_dict = temp_block_sequence_dict
 
    @classmethod
    def get_current_limit_up_sequence(cls, code):
        """
        获取代码当前的板块身位
        @param code:
        @return:[(板块名称,身位,总涨停数量,目前涨停数量)]
        """
        return cls.__first_block_sequence_dict.get(code)
 
 
class LatestLimitUpBlockManager:
    """
    最近涨停的板块管理
    """
    # 看最近7天
    __LATEST_DAY_COUNT = 7
 
    __days = []
    # 目前涨停
    __current_limit_up_day_datas = {}
    # 曾涨停
    __history_limit_up_day_datas = {}
 
    # K线数据
    __k_datas = {}
 
    # 代码的最高涨幅
    __k_max_rate = {}
 
    __code_name_dict = {}
 
    # 统计板块数据:{"day":{"板块":[(涨停数,破板数, 代码集合)]}}
    __block_day_datas = {}
 
    __instance = None
 
    def __new__(cls, *args, **kwargs):
        if not cls.__instance:
            cls.__instance = super(LatestLimitUpBlockManager, cls).__new__(cls, *args, **kwargs)
            cls.__load_datas()
        return cls.__instance
 
    @classmethod
    def __load_datas(cls):
        # 加载最近7天的数据
        __days = HistoryKDatasUtils.get_latest_trading_date_cache(cls.__LATEST_DAY_COUNT - 1)
        now_day = tool.get_now_date_str()
        if __days[0] != now_day:
            __days.insert(0, now_day)
        cls.__days = __days
        # 加载之前6天的涨停,曾涨停,曾涨停代码的最近6天的K线
        for day in __days:
            if day == now_day:
                continue
            limit_up_records = get_history_limit_up_datas(day)
            cls.__history_limit_up_day_datas[day] = limit_up_records
            current_limit_up_datas = get_current_limit_up_datas(day)
            cls.__current_limit_up_day_datas[day] = current_limit_up_datas
        # 获取代码的k线
        __total_codes = set()
        for d in cls.__current_limit_up_day_datas:
            __total_codes |= set([dd[3] for dd in cls.__history_limit_up_day_datas[d]])
        # 获取最近7天的k线情况
        for code in __total_codes:
            cls.__get_bars(code)
        # 统计前6天的板块信息
        for day in __days:
            if day == now_day:
                continue
            cls.__block_day_datas[day] = cls.__statistics_limit_up_block_infos_by_day(day)
 
    def set_current_limit_up_data(self, day, datas):
        self.__current_limit_up_day_datas[day] = datas
        self.__history_limit_up_day_datas[day] = get_today_history_limit_up_datas_cache()
        # 加载代码K线数据
        __total_codes = set([d[0] for d in datas])
        __total_codes |= set([d[3] for d in self.__history_limit_up_day_datas[day]])
        for code in __total_codes:
            self.__get_bars(code)
 
    @classmethod
    def __statistics_limit_up_block_infos_by_day(cls, day):
        """
        统计涨停代码信息
        @return:
        """
        # 统计板块的
        current_code_dict = {d[0]: d for d in cls.__current_limit_up_day_datas[day]}
        # history_code_dict = {d[3]: d for d in self.__history_limit_up_day_datas[day]}
        block_codes_dict = {}
        for h in cls.__history_limit_up_day_datas[day]:
            cls.__code_name_dict[h[3]] = h[4]
            if h[2] not in block_codes_dict:
                block_codes_dict[h[2]] = set()
            block_codes_dict[h[2]].add(h[3])
        fdata = {}
        for b in block_codes_dict:
            limit_up_count = 0
            open_limit_up_count = 0
            for code in block_codes_dict[b]:
                if code in current_code_dict:
                    limit_up_count += 1
                else:
                    open_limit_up_count += 1
 
            fdata[b] = (limit_up_count, open_limit_up_count, block_codes_dict[b])
        return fdata
 
    def statistics_limit_up_block_infos(self):
        """
        统计涨停板块数据
        @return:
        """
        # 只统计今天的数据
        now_day = tool.get_now_date_str()
        try:
            block_dict = self.__statistics_limit_up_block_infos_by_day(now_day)
            self.__block_day_datas[now_day] = block_dict
        except:
            pass
        # 板块出现的天数
        block_count_dict = {}
        # 板块出现的代码
        block_codes_dict = {}
        for day in self.__block_day_datas:
            for b in self.__block_day_datas[day]:
                if b not in block_count_dict:
                    block_count_dict[b] = set()
                if b not in block_codes_dict:
                    block_codes_dict[b] = set()
                block_count_dict[b].add(day)
                block_codes_dict[b] |= self.__block_day_datas[day][b][2]
 
        block_count_list = [(k, block_count_dict[k]) for k in block_count_dict]
        block_count_list.sort(key=lambda x: x[1], reverse=True)
        block_count_list = block_count_list[:50]
        # [(涨停原因,累计涨停次数,连续次数)]
        fdatas = []
        if self.__history_limit_up_day_datas.get(now_day):
            today_records_code_dict = {d[3]: d for d in self.__history_limit_up_day_datas.get(now_day)}
        else:
            today_records_code_dict = {}
        for d in block_count_list:
            b = d[0]
            fdata = [d[0], len(d[1])]
            temp = []
            max_continue_count = 0
            for day in self.__days:
                if day not in self.__block_day_datas:
                    continue
                if d[0] in self.__block_day_datas[day]:
                    temp.append(day)
                else:
                    c = len(temp)
                    if c > max_continue_count:
                        max_continue_count = c
                    temp.clear()
            c = len(temp)
            if c > max_continue_count:
                max_continue_count = c
            temp.clear()
            # 最大连续次数
            fdata.append(max_continue_count)
            # 最高板
            max_rate_info = None
            for code in block_codes_dict[d[0]]:
                if max_rate_info is None:
                    max_rate_info = (code, self.__k_max_rate.get(code), self.__code_name_dict.get(code))
                if max_rate_info[1] < self.__k_max_rate.get(code):
                    max_rate_info = (code, self.__k_max_rate.get(code), self.__code_name_dict.get(code))
            fdata.append(max_rate_info)
 
            # 统计今天这个板块中大于二板的代码数量
            limit_up_counts = 0
            for code in block_codes_dict[d[0]]:
                if code in today_records_code_dict and today_records_code_dict[code][12] != '首板':
                    limit_up_counts += 1
            fdata.append(limit_up_counts)
            # 获取每天的数量
            days_datas = []
            for day in self.__days:
                if day not in self.__block_day_datas:
                    continue
                binfo = self.__block_day_datas[day].get(b)
                if not binfo:
                    days_datas.append((0, 0))
                else:
                    days_datas.append((binfo[0], binfo[1]))
            fdata.append(days_datas)
            fdatas.append(fdata)
        return fdatas
 
    @classmethod
    def __get_bars(cls, code):
        """
        获取K线
        @param code:
        @return:
        """
        if code in cls.__k_datas:
            return cls.__k_datas[code]
        volumes_data = None
        if cls.__days:
            volumes_data = HistoryKDataManager().get_history_bars(code, cls.__days[1])
            if volumes_data:
                volumes_data = volumes_data[:cls.__LATEST_DAY_COUNT - 1]
                cls.__k_datas[code] = volumes_data
        if not volumes_data:
            volumes_data = init_data_util.get_volumns_by_code(code, cls.__LATEST_DAY_COUNT - 1)
            if volumes_data:
                cls.__k_datas[code] = volumes_data
        # 获取最大涨幅
        min_price = volumes_data[-1]["low"]
        for d in volumes_data:
            if min_price > d["low"]:
                min_price = d["low"]
        rate = int((volumes_data[0]["close"] - min_price) * 100 / min_price)
        cls.__k_max_rate[code] = rate
        return cls.__k_datas.get(code)