Administrator
2023-05-15 045a5aa6434da6e83c3d850b17e7e58cd7b55ef5
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
"""
涨停数据过滤器
"""
 
# 判断是龙几,判断是否涨停,判断是否炸板,加载分数
import redis
 
import code_volumn_manager
import gpcode_manager
import limit_up_time_manager
import tool
from db import redis_manager
from third_data.kpl_data_manager import KPLDataManager
from third_data.kpl_util import KPLDataType
from trade import first_code_score_manager
 
__kplDataManager = KPLDataManager()
 
 
# 忽略代码管理器
class IgnoreCodeManager:
    __redisManager = redis_manager.RedisManager(3)
 
    def __get_redis(self):
        return self.__redisManager.getRedis()
 
    def ignore_code(self, type, code):
        self.__get_redis().sadd(f"kp_ignore_codes_{type}", code)
        self.__get_redis().expire(f"kp_ignore_codes_{type}", tool.get_expire())
 
    def list_ignore_codes(self, type):
        return self.__get_redis().smembers(f"kp_ignore_codes_{type}")
 
 
# 获取涨停顺序(按涨停原因分组)
def get_limit_up_time_rank_dict(datas):
    datas.sort(key=lambda x: int(x[5]))
    max_record = {}
    rank_dict = {}
    for d in datas:
        if d[2] not in max_record:
            max_record[d[2]] = 0
        max_record[d[2]] = max_record[d[2]] + 1
        rank_dict[d[3]] = max_record[d[2]]
    return rank_dict
 
 
# 获取涨停信息
def get_limit_up_info(codes):
    limit_up_data = __kplDataManager.get_data(KPLDataType.LIMIT_UP)
    limit_up_codes = set([val[0] for val in limit_up_data])
    open_limit_up_data = __kplDataManager.get_data(KPLDataType.OPEN_LIMIT_UP)
    open_limit_up_codes = set()
    if open_limit_up_data:
        open_limit_up_codes = set([val[0] for val in open_limit_up_data])
    dict_ = {}
    for code in codes:
        dict_[code] = (code in limit_up_codes, code in open_limit_up_codes)
    return dict_, limit_up_codes, open_limit_up_codes
 
 
# 获取代码分数字典
def get_codes_scores_dict(codes):
    # 获取所有监听中的代码
    first_codes = gpcode_manager.get_first_gp_codes()
    scores = {}
    for code in codes:
        if code not in first_codes:
            continue
        if code not in scores:
            # 获取分数
            try:
                limit_up_time = limit_up_time_manager.get_limit_up_time(code)
                volume_rate, volume_info = code_volumn_manager.get_volume_rate(code, True)
                (score, score_list), score_source_list = first_code_score_manager.get_score(code, volume_rate,
                                                                                            limit_up_time,
                                                                                            True)
                scores[code] = score
            except:
                pass
    return scores
 
 
if __name__ == "__main__":
    get_limit_up_info()