Administrator
2025-05-30 a8002450d41c721d2bac7e9bd8d5c9014d9b2391
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
import copy
import json
import logging
import os
import threading
import time
 
import requests
 
import constant
from db.redis_manager_delegate import RedisUtils
from log_module import async_log_util, log
from third_data.kpl_data_constant import LimitUpDataConstant, TodayLimitUpReasonChangeManager
from utils import tool
 
# 开盘啦历史涨停数据管理
from db import mysql_data_delegate as mysql_data, redis_manager_delegate as redis_manager
from log_module.log import logger_kpl_limit_up_reason_change, logger_debug, logger_kpl_limit_up, \
    logger_kpl_open_limit_up
from third_data import kpl_util, kpl_api
from third_data.code_plate_key_manager import LimitUpCodesPlateKeyManager, CodesHisReasonAndBlocksManager
 
# 代码对应的涨停原因保存
from third_data.kpl_util import KPLPlatManager, KPLDataType
 
 
class KPLCodeLimitUpReasonManager:
    __redisManager = redis_manager.RedisManager(3)
 
    def __get_redis(self):
        return self.__redisManager.getRedis()
 
    def save_reason(self, code, reason):
        RedisUtils.setex(self.__get_redis(), f"kpl_limitup_reason-{code}", tool.get_expire(), reason)
 
    def list_all(self):
        keys = RedisUtils.keys(self.__get_redis(), "kpl_limitup_reason-*")
        dict_ = {}
        for k in keys:
            val = RedisUtils.get(self.__get_redis(), k)
            dict_[k.split("-")[1]] = val
        return dict_
 
 
class KPLLimitUpDataRecordManager:
    total_datas = None
    latest_datas = {}
    latest_origin_datas = []
    __kplPlatManager = KPLPlatManager()
    __LimitUpCodesPlateKeyManager = LimitUpCodesPlateKeyManager()
    __CodesPlateKeysManager = CodesHisReasonAndBlocksManager()
    __current_code_reasons_dict = {}
    # 当前涨停原因+推荐原因的代码集合
    __current_reason_codes_dict = {}
    # 当前涨停原因的代码集合
    __current_limit_up_reason_codes_dict = {}
    __records_cache = {}
    record_code_dict = {}
 
    @classmethod
    def __load_hist_and_blocks(cls, code):
        # 有数据新增,加载历史原因与板块
        his_reasons = cls.get_latest_infos(code, 10, False)
        his_reasons = set([r[0] for r in his_reasons])
        cls.__CodesPlateKeysManager.set_history_limit_up_reason(code, his_reasons)
        try:
            if not cls.__CodesPlateKeysManager.get_blocks(code):
                results = kpl_api.getStockIDPlate(code)
                bs = [r[1] for r in results]
                cls.__CodesPlateKeysManager.set_blocks(code, bs)
        except Exception as e:
            pass
 
    @classmethod
    def save_record(cls, day, records, set_not_open=False):
        """
        @param day:
        @param records:
        @param set_not_open: 是否需要设置炸板与否
        @return:
        """
        # 统计炸板
        try:
            last_codes = set()
            if cls.latest_origin_datas:
                last_codes = set([x[0] for x in cls.latest_origin_datas])
            now_codes = set()
            if records:
                now_codes = set([x[0] for x in records])
            open_limit_up_codes = last_codes - now_codes
            if open_limit_up_codes:
                logger_kpl_open_limit_up.info(f"炸板代码:{open_limit_up_codes}")
        except Exception as e:
            pass
 
        # 统计代码所属板块
        code_block_dict = {}
        for data in records:
            cls.record_code_dict[data[0]] = data
            blocks = set(data[5].split("、"))
            code = data[0]
            for b in blocks:
                if not code_block_dict.get(code):
                    code_block_dict[code] = set()
                code_block_dict[code].add(b)
                # 设置涨停数据
        if records:
            cls.latest_origin_datas = records
            cls.__LimitUpCodesPlateKeyManager.set_today_limit_up(
                [(r[0], r[5], r[6].split('、') if r[6] else []) for r in records])
            LimitUpDataConstant.set_current_limit_up_datas(records)
 
        code_reasons_dict = {}
        reason_codes_dict = {}
        limit_up_reason_codes_dict = {}
        for d in records:
            if d[5] not in limit_up_reason_codes_dict:
                limit_up_reason_codes_dict[d[5]] = set()
            limit_up_reason_codes_dict[d[5]].add(d[0])
 
            # 涨停原因 + 推荐原因
            bs = {d[5]}
            if d[6]:
                bs |= set(d[6].split("、"))
            code_reasons_dict[d[0]] = bs
            for b in bs:
                if b not in reason_codes_dict:
                    reason_codes_dict[b] = set()
                reason_codes_dict[b].add(d[0])
        cls.__current_code_reasons_dict = code_reasons_dict
        cls.__current_reason_codes_dict = reason_codes_dict
        cls.__current_limit_up_reason_codes_dict = limit_up_reason_codes_dict
 
        # 涨停数据记录
        mysqldb = mysql_data.Mysqldb()
        # 统计涨停原因和概念代码
        plats = {}
        for d in records:
            plats[d[5]] = d[9]
        for p in plats:
            cls.__kplPlatManager.save_plat(plats[p], p)
 
        for d in records:
            # (代码, 名称, 首次涨停时间, 最近涨停时间, 几板, 涨停原因, 板块, 实际流通, 主力净额,涨停原因代码,涨停原因代码数量)
            code = d[0]
            _id = f"{day}_{code}_{d[5]}"
 
            result = mysqldb.select_one("select * from kpl_limit_up_record where _id='{}'".format(_id))
            if not result:
                mysqldb.execute(
                    f"insert into kpl_limit_up_record(_id,_day,_hot_block_name,_code,_code_name,_limit_up_time,_blocks,_latest_limit_up_time,_update_time,_create_time,_hot_block_code_count,_limit_up_high_info,_zylt_val,_hot_block_code) values('{_id}','{day}','{d[5]}','{d[0]}','{d[1]}','{d[2]}','{d[6]}','{d[3]}',now(),now(),{d[10]},'{d[4]}',{d[7]},{d[9]})")
                cls.__load_hist_and_blocks(code)
            else:
                if _id in cls.latest_datas and json.dumps(cls.latest_datas.get(_id)) != json.dumps(d):
                    mysqldb.execute(
                        f"update kpl_limit_up_record set _latest_limit_up_time='{d[3]}',_limit_up_time='{d[2]}',_hot_block_code_count={d[10]},_limit_up_high_info='{d[4]}' ,_update_time=now() where _id='{_id}'")
                    cls.latest_datas[_id] = d
            if set_not_open:
                # 需要设置不炸板
                mysqldb.execute(f"update kpl_limit_up_record set _open = 0, _update_time = now() where _id='{_id}'")
 
            cls.latest_datas[_id] = d
 
            # 获取原来的代码所属板块,删除之前错误的板块
            old_datas = KPLLimitUpDataRecordManager.list_by_code(code, day)
            if old_datas:
                for dd in old_datas:
                    if dd[2] not in code_block_dict[code]:
                        mysqldb.execute(f"delete from kpl_limit_up_record where _id='{dd[0]}'")
                        logger_kpl_limit_up_reason_change.info(f"code-{dd[3]}:{dd[2]}-{code_block_dict[code]}")
                        # 板块更改过
                        mysqldb.execute(
                            f"update kpl_limit_up_record set _hot_block_change = f'{dd[2]}' where _day='{dd[1]}' and _code='{code}'")
                        TodayLimitUpReasonChangeManager().set_today_limit_up_reason_change(code, dd[2],
                                                                                           code_block_dict[code])
 
                        if dd[0] in cls.latest_datas:
                            cls.latest_datas.pop(dd[0])
        cls.total_datas = KPLLimitUpDataRecordManager.list_all(tool.get_now_date_str())
        LimitUpDataConstant.set_history_limit_up_datas(cls.total_datas)
 
    @classmethod
    def load_total_datas(cls):
        cls.total_datas = KPLLimitUpDataRecordManager.list_all(tool.get_now_date_str())
        cls.__LimitUpCodesPlateKeyManager.set_today_total_limit_up(
            [(r[3], r[2], r[6].split("、") if r[6] else []) for r in cls.total_datas])
        for d in cls.total_datas:
            cls.__load_hist_and_blocks(d[3])
 
    @staticmethod
    def list_all(day, max_limit_up_time=None):
        mysqldb = mysql_data.Mysqldb()
        sql = f"select * from kpl_limit_up_record where _day='{day}'"
        if max_limit_up_time:
            sql += f" and cast(_limit_up_time as unsigned)<={max_limit_up_time}"
        return mysqldb.select_all(sql)
 
    @classmethod
    def list_all_cache(cls, day):
        if day in cls.__records_cache:
            return cls.__records_cache[day]
        fdata = cls.list_all(day)
        if fdata:
            cls.__records_cache[day] = fdata
        return fdata
 
    @staticmethod
    def list_by_code(code, day):
        mysqldb = mysql_data.Mysqldb()
        return mysqldb.select_all(f"select * from kpl_limit_up_record where _code='{code}' and _day='{day}'")
 
    @staticmethod
    def list_by_block(block_name, day):
        mysqldb = mysql_data.Mysqldb()
        return mysqldb.select_all(
            f"select * from kpl_limit_up_record where _hot_block_name='{block_name}' and _day='{day}'")
 
    @staticmethod
    def list_blocks_with_day(days):
        mysqldb = mysql_data.Mysqldb()
        sql = "select _hot_block_name,_day from kpl_limit_up_record where "
        wheres = []
        for day in days:
            wheres.append(f"_day = '{day}'")
        sql += " or ".join(wheres)
        sql += " group by _hot_block_name,_day"
 
        results = mysqldb.select_all(sql)
        return results
 
    @staticmethod
    def get_latest_blocks(code):
        wheres = []
        for b in constant.KPL_INVALID_BLOCKS:
            wheres.append(f"hb.`_hot_block_name` != '{b}'")
        wheres = " and ".join(wheres)
        sql = f"SELECT GROUP_CONCAT(_hot_block_name) FROM (SELECT hb.`_hot_block_name`,hb.`_day` FROM `kpl_limit_up_record` hb WHERE hb.`_code`='{code}' AND {wheres} ORDER BY hb.`_day` DESC LIMIT 2) a  GROUP BY a._day ORDER BY a._day DESC LIMIT 1"
        mysqldb = mysql_data.Mysqldb()
        return mysqldb.select_one(sql)
 
    # 获取代码最近的板块,返回[(板块,日期)]
    @classmethod
    def get_latest_infos(cls, code, count, contains_today=True):
        wheres = []
        for b in constant.KPL_INVALID_BLOCKS:
            wheres.append(f"hb.`_hot_block_name` != '{b}'")
        wheres = " and ".join(wheres)
        # 只获取最近180天的数据
        min_day = tool.date_sub(tool.get_now_date_str(), 180)
        sql = f"SELECT GROUP_CONCAT(_hot_block_name),`_day`,_blocks FROM (SELECT hb.`_hot_block_name`,hb.`_day`,hb._blocks FROM `kpl_limit_up_record` hb WHERE hb.`_code`='{code}' and {wheres} and hb.`_day` > '{min_day}' ORDER BY hb.`_day` DESC LIMIT 10) a  GROUP BY a._day ORDER BY a._day DESC LIMIT {count}"
        mysqldb = mysql_data.Mysqldb()
        results = mysqldb.select_all(sql)
        if results and not contains_today and results[0][1] == tool.get_now_date_str():
            return results[1:]
        return results
 
    @classmethod
    def get_latest_block_infos(cls, min_day=tool.date_sub(tool.get_now_date_str(), 180), code=None):
        """
 
        @param min_day: 默认获取180天之前的
        @param code: 代码
        @return: 最近的涨停板块信息
        """
        sql = f"SELECT r.`_code`, r.`_day`, r.`_hot_block_name`, r.`_blocks` FROM `kpl_limit_up_record` r WHERE r.`_day`>'{min_day}'"
        if code:
            sql += f" AND _code='{code}'"
        sql += " order by _create_time"
        mysqldb = mysql_data.Mysqldb()
        results = mysqldb.select_all(sql)
        return results
 
    @classmethod
    def get_latest_blocks_set(cls, code):
        results = cls.get_latest_infos(code, 2, False)
        bs = set([b[0] for b in results])
        return bs
 
    @classmethod
    def get_current_blocks(cls, code):
        return cls.__current_code_reasons_dict.get(code)
 
    @classmethod
    def get_current_codes_by_block(cls, block):
        return cls.__current_reason_codes_dict.get(block)
 
    @classmethod
    def get_current_reason_codes_dict(cls):
        return copy.deepcopy(cls.__current_reason_codes_dict)
 
    @classmethod
    def get_current_limit_up_reason_codes_dict(cls):
        return copy.deepcopy(cls.__current_limit_up_reason_codes_dict)
 
    @classmethod
    def get_current_reasons(cls):
        if cls.__current_reason_codes_dict:
            return cls.__current_reason_codes_dict.keys()
        return set()
 
    @classmethod
    def get_new_blocks(cls, day):
        """
        获取某一天新出现的板块(新板块)
        @param day:
        @return:
        """
        sql = f"SELECT k.`_hot_block_name`, k.`_day` FROM `kpl_limit_up_record` k GROUP BY k.`_hot_block_name` HAVING k.`_day`='{day}' ORDER BY  k.`_day` DESC"
        mysqldb = mysql_data.Mysqldb()
        results = mysqldb.select_all(sql)
        return [x[0] for x in results]
 
 
class KPLDataManager:
    __latest_datas = {}
    kpl_data_update_info = {}
    __file_content_cache = {}
 
    @classmethod
    def __save_in_file(cls, key, datas):
        name = f"{tool.get_now_date_str()}_{key}.log"
        path = f"{constant.CACHE_PATH}/{name}"
        with open(path, 'w') as f:
            f.write(json.dumps(datas))
 
    @classmethod
    def __get_from_file(cls, key, day=tool.get_now_date_str()):
        name = f"{day}_{key}.log"
        path = f"{constant.CACHE_PATH}/{name}"
        if not os.path.exists(path):
            return None
        with open(path, 'r') as f:
            lines = f.readlines()
            if lines:
                return json.loads(lines[0])
        return None
 
    @classmethod
    def get_from_file(cls, type, day):
        name = f"{day}_{type.value}.log"
        path = f"{constant.CACHE_PATH}/{name}"
        if not os.path.exists(path):
            return None
        with open(path, 'r') as f:
            lines = f.readlines()
            if lines:
                return json.loads(lines[0])
        return None
 
    @classmethod
    def get_from_file_cache(cls, type, day):
        key = f"{type}-{day}"
        if key in cls.__file_content_cache:
            return cls.__file_content_cache.get(key)
        fdata = cls.get_from_file(type, day)
        if fdata:
            cls.__file_content_cache[key] = fdata
        return fdata
 
    @classmethod
    # 获取最近几天的数据,根据日期倒序返回
    def get_latest_from_file(cls, type, count, max_day=tool.get_now_date_str()):
        files = os.listdir(constant.CACHE_PATH)
        file_name_list = []
        for f in files:
            if f[10:] == f"_{type.value}.log":
                file_name_list.append((f.split("_")[0], f))
 
        file_name_list.sort(key=lambda x: x[0], reverse=True)
        fresults = []
        for file in file_name_list:
            path = f"{constant.CACHE_PATH}/{file[1]}"
            if not os.path.exists(path):
                continue
            with open(path, 'r') as f:
                lines = f.readlines()
                if lines:
                    if int(file[0].replace("-", "")) <= int(max_day.replace("-", "")):
                        fresults.append((file[0], json.loads(lines[0])))
                if len(fresults) >= count:
                    break
 
        return fresults
 
    @classmethod
    def save_data(cls, type, datas):
        cls.kpl_data_update_info[type] = (tool.get_now_time_str(), len(datas))
        cls.__latest_datas[type] = datas
        cls.__save_in_file(type, datas)
 
    @classmethod
    def get_data(cls, type):
        type = type.value
        if type in cls.__latest_datas:
            return cls.__latest_datas[type]
        result = cls.__get_from_file(type)
        if result is not None:
            cls.__latest_datas[type] = result
        return result
 
 
def load_history_limit_up():
    for file_name in os.listdir(f"{constant.get_path_prefix()}/kpl/his"):
        if file_name.find("HisDaBanList_1.log") < 0:
            continue
        day = file_name[:10]
        with open(f"{constant.get_path_prefix()}/kpl/his/{file_name}", 'r', encoding="utf-16") as f:
            lines = f.readlines()
            line = lines[0]
            result = json.loads(line)
            list_ = kpl_util.parseDaBanData(result, kpl_util.DABAN_TYPE_LIMIT_UP)
            # KPLLimitUpDataRecordManager.save_record(day, list_)
            for r in list_:
                print(r[-1], r[5])
                KPLPlatManager().save_plat(r[-1], r[5])
 
            # print(day, list_)
 
 
# 历史涨停列表
__limit_up_list_records_dict = {}
 
 
# 获取最近几天的实时涨停信息
# 返回格式([日期,数据])
def get_current_limit_up_data_records(count, day=tool.get_now_date_str()):
    fresults = []
    datas = []
    if day in __limit_up_list_records_dict:
        datas = __limit_up_list_records_dict[day]
    else:
        logger_debug.info("从文件中获取前几天的实时涨停数据")
        datas = KPLDataManager().get_latest_from_file(KPLDataType.LIMIT_UP, count + 2, max_day=day)
        # 移除比今天还大的数据
        fdatas = []
        for d in datas:
            if int(d[0].replace("-", "")) > int(day.replace("-", "")):
                continue
            fdatas.append(d)
        if fdatas:
            # 保存数据
            __limit_up_list_records_dict[day] = fdatas
    datas = __limit_up_list_records_dict[day]
    for i in range(len(datas)):
        if datas[i][0] == day:
            continue
        fresults.append(datas[i])
        if len(fresults) >= count:
            break
    return fresults
 
 
def get_yesterday_limit_up_codes():
    yesterday_limit_up_data_records = get_yesterday_current_limit_up_records()
    yesterday_codes = set([x[0] for x in yesterday_limit_up_data_records])
    return yesterday_codes
 
 
def get_yesterday_current_limit_up_records():
    yesterday_limit_up_data_records = get_current_limit_up_data_records(1)[0][1]
    return yesterday_limit_up_data_records
 
 
# 获取最近几天涨停原因
__latest_current_limit_up_records = {}
 
 
def get_latest_current_limit_up_records(day=tool.get_now_date_str(), max_day_count=15):
    if day not in __latest_current_limit_up_records:
        fdatas = get_current_limit_up_data_records(max_day_count)
        __latest_current_limit_up_records[day] = fdatas
    return __latest_current_limit_up_records.get(day)
 
 
class PullTask:
    # 最近更新时间
    __latest_update_time_dict = {}
 
    @classmethod
    def __upload_data(cls, type, datas):
        root_data = {
            "type": type,
            "data": datas
        }
        requests.post("http://127.0.0.1:9004/upload_kpl_data", json.dumps(root_data))
 
    @classmethod
    def repaire_pull_task(cls):
        """
        修复拉取任务
        @return:
        """
        # 修复涨停
        logger_debug.info("任务修复-开盘啦:启动修复")
        key = "limit_up"
        if key not in cls.__latest_update_time_dict or time.time() - cls.__latest_update_time_dict[key] > 20:
            logger_debug.info("任务修复-开盘啦:涨停列表")
            # 大于20s就需要更新
            threading.Thread(target=cls.run_limit_up_task, daemon=True).start()
        # key = "jingxuan_rank"
        # if key not in cls.__latest_update_time_dict or time.time() - cls.__latest_update_time_dict[key] > 20:
        #     logger_debug.info("任务修复-开盘啦:精选流入列表")
        #     # 大于20s就需要更新
        #     threading.Thread(target=cls.run_market_jingxuan_in, daemon=True).start()
        #
        # key = "jingxuan_rank_out"
        # if key not in cls.__latest_update_time_dict or time.time() - cls.__latest_update_time_dict[key] > 20:
        #     logger_debug.info("任务修复-开盘啦:精选流出列表")
        #     # 大于20s就需要更新
        #     threading.Thread(target=cls.run_market_jingxuan_out, daemon=True).start()
 
        key = "market_strong"
        if key not in cls.__latest_update_time_dict or time.time() - cls.__latest_update_time_dict[key] > 20:
            logger_debug.info("任务修复-开盘啦:市场强度")
            # 大于20s就需要更新
            threading.Thread(target=cls.run_market_strong, daemon=True).start()
 
    @classmethod
    def run_limit_up_task(cls):
        # 关闭log
        log.close_print()
        while True:
            try:
                if (tool.is_trade_time() and int(tool.get_now_time_str().replace(':', '')) > int("092530")):
                    results = kpl_api.getLimitUpInfoNew()
                    result = json.loads(results)
                    start_time = time.time()
                    cls.__upload_data("limit_up", result)
            except Exception as e:
                try:
                    logging.exception(e)
                    logger_debug.exception(e)
                except:
                    pass
            except:
                pass
            finally:
                cls.__latest_update_time_dict["limit_up"] = time.time()
                time.sleep(3)
 
    @classmethod
    def run_market_jingxuan_in(cls):
        """
        精选流入
        @return:
        """
        while True:
            try:
                if tool.is_trade_time():
                    results = kpl_api.getMarketJingXuanRealRankingInfo()
                    result = json.loads(results)
                    cls.__upload_data("jingxuan_rank", result)
            except:
                pass
            finally:
                cls.__latest_update_time_dict["jingxuan_rank"] = time.time()
                time.sleep(3)
 
    @classmethod
    def run_market_jingxuan_out(cls):
        """
        精选流出
        @return:
        """
        while True:
            try:
                if tool.is_trade_time():
                    results = kpl_api.getMarketJingXuanRealRankingInfo(False)
                    result = json.loads(results)
                    cls.__upload_data("jingxuan_rank_out", result)
            except:
                pass
            finally:
                cls.__latest_update_time_dict["jingxuan_rank_out"] = time.time()
                time.sleep(3)
 
    @classmethod
    def run_market_strong(cls):
        """
        精选流出
        @return:
        """
        while True:
            try:
                if tool.is_trade_time():
                    strong_value = kpl_api.getMarketStrong()
                    cls.__upload_data("market_strong", strong_value)
            except:
                pass
            finally:
                cls.__latest_update_time_dict["market_strong"] = time.time()
                time.sleep(3)
 
    @classmethod
    # 运行拉取任务
    def run_pull_task(cls):
        def get_bidding_money():
            # 竞价数据上传
            while True:
                if int("092600") < int(tool.get_now_time_str().replace(":", "")) < int("092700"):
                    try:
                        results = kpl_api.daBanList(kpl_api.DABAN_TYPE_BIDDING)
                        result = json.loads(results)
                        cls.__upload_data("biddings", result)
                    except Exception as e:
                        pass
                time.sleep(3)
 
        def get_market_industry():
            while True:
                if tool.is_trade_time():
                    try:
                        results = kpl_api.getMarketIndustryRealRankingInfo()
                        result = json.loads(results)
                        cls.__upload_data("industry_rank", result)
                    except:
                        pass
                time.sleep(3)
 
        def get_market_jingxuan():
            while True:
                if tool.is_trade_time():
                    try:
                        results = kpl_api.getMarketJingXuanRealRankingInfo()
                        result = json.loads(results)
                        cls.__upload_data("jingxuan_rank", result)
                    except:
                        pass
                    finally:
                        cls.__latest_update_time_dict["jingxuan_rank"] = time.time()
                        time.sleep(3)
                else:
                    time.sleep(3)
 
        threading.Thread(target=cls.run_limit_up_task, daemon=True).start()
        threading.Thread(target=cls.run_market_strong, daemon=True).start()
        # threading.Thread(target=get_bidding_money, daemon=True).start()
        # threading.Thread(target=get_market_industry, daemon=True).start()
        # threading.Thread(target=cls.run_market_jingxuan_in, daemon=True).start()
        # threading.Thread(target=cls.run_market_jingxuan_out, daemon=True).start()
 
 
@tool.singleton
class CodeHighLevel:
    """
    代码高度管理
    """
    __instance = None
    # 下单板块的代码记录
    __code_level_dict = {}
    __codes = set()
 
    def __init__(self, day=tool.get_now_date_str()):
        self.__day = day
        self.__load_data(day)
 
    @classmethod
    def __load_data(cls, day):
        fdatas = get_current_limit_up_data_records(15, day=day)
        temp_dict = {d[0]: 2 for d in fdatas[0][1]}
        break_codes = set()
        for i in range(1, len(fdatas)):
            codes = [d[0] for d in fdatas[i][1]]
            for k in temp_dict:
                if k in break_codes:
                    continue
                if k in codes:
                    temp_dict[k] += 1
                else:
                    break_codes.add(k)
        cls.__code_level_dict = temp_dict
 
    def get_high_level(self, code):
        """
        获取涨停高度,默认1板
        @param code:
        @return:
        """
        if code in self.__code_level_dict:
            return self.__code_level_dict[code]
        return 1
 
 
if __name__ == "__main__":
    print(CodeHighLevel("2024-11-11").get_high_level("000833"))
    input()