Administrator
2025-06-03 f82aab768e4bc361eac69e1ae69b36a9bec2970c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
"""
涨停数据过滤器
"""
 
# 判断是龙几,判断是否涨停,判断是否炸板,加载分数
from db.redis_manager_delegate import RedisUtils
from utils import tool
from db import redis_manager_delegate as redis_manager
from third_data.kpl_data_manager import KPLDataManager
from third_data.kpl_util import KPLDataType
 
__kplDataManager = KPLDataManager()
 
 
# 忽略代码管理器
class IgnoreCodeManager:
    __redisManager = redis_manager.RedisManager(3)
 
    def __get_redis(self):
        return self.__redisManager.getRedis()
 
    def ignore_code(self, type, code):
        RedisUtils.sadd(self.__get_redis(), f"kp_ignore_codes_{type}", code)
        RedisUtils.expire(self.__get_redis(), f"kp_ignore_codes_{type}", tool.get_expire())
 
    def list_ignore_codes(self, type):
        return RedisUtils.smembers(self.__get_redis(), f"kp_ignore_codes_{type}")
 
 
# 获取涨停顺序(按涨停原因分组)
def get_limit_up_time_rank_dict(datas):
    datas.sort(key=lambda x: int(x[5]))
    max_record = {}
    rank_dict = {}
    for d in datas:
        if d[2] not in max_record:
            max_record[d[2]] = 0
        max_record[d[2]] = max_record[d[2]] + 1
        rank_dict[d[3]] = max_record[d[2]]
    return rank_dict
 
 
# 获取涨停信息
def get_limit_up_info(codes):
    limit_up_data = __kplDataManager.get_data(KPLDataType.LIMIT_UP)
    limit_up_codes = []
    if limit_up_data:
        limit_up_codes = set([val[0] for val in limit_up_data])
    open_limit_up_data = __kplDataManager.get_data(KPLDataType.OPEN_LIMIT_UP)
    open_limit_up_codes = set()
    if open_limit_up_data:
        open_limit_up_codes = set([val[0] for val in open_limit_up_data])
    dict_ = {}
    for code in codes:
        dict_[code] = (code in limit_up_codes, code in open_limit_up_codes)
    return dict_, limit_up_codes, open_limit_up_codes
 
if __name__ == "__main__":
    get_limit_up_info()