"""
|
涨停数据过滤器
|
"""
|
|
# 判断是龙几,判断是否涨停,判断是否炸板,加载分数
|
import redis
|
|
import code_volumn_manager
|
import gpcode_manager
|
import limit_up_time_manager
|
import tool
|
from db import redis_manager
|
from third_data.kpl_data_manager import KPLDataManager
|
from third_data.kpl_util import KPLDataType
|
from trade import first_code_score_manager
|
|
# Module-wide KPLDataManager instance; get_limit_up_info() reads the
# LIMIT_UP and OPEN_LIMIT_UP data sets through it.
# NOTE: a double-underscore name is name-mangled when referenced from inside
# a class body, so only reference it from module-level functions.
__kplDataManager = KPLDataManager()
|
|
|
# 忽略代码管理器
|
class IgnoreCodeManager:
    """Keep per-type sets of ignored codes in Redis.

    Every type gets its own Redis set stored under the key
    ``kp_ignore_codes_{type}``.
    """

    # Shared by all instances. NOTE(review): the argument 3 is presumably the
    # Redis database index -- confirm against redis_manager.RedisManager.
    __redisManager = redis_manager.RedisManager(3)

    def __get_redis(self):
        """Return the Redis client handed out by the class-level manager."""
        manager = self.__redisManager
        return manager.getRedis()

    def ignore_code(self, type, code):
        """Add *code* to the ignore set of *type* and (re)set the key's expiry.

        The expiry value is whatever ``tool.get_expire()`` returns.
        """
        key = f"kp_ignore_codes_{type}"
        self.__get_redis().sadd(key, code)
        self.__get_redis().expire(key, tool.get_expire())

    def list_ignore_codes(self, type):
        """Return the members of the ignore set of *type*."""
        key = f"kp_ignore_codes_{type}"
        return self.__get_redis().smembers(key)
|
|
|
# 获取涨停顺序(按涨停原因分组)
|
def get_limit_up_time_rank_dict(datas):
    """Rank rows inside their group by ascending ``int(row[5])``.

    Args:
        datas: List of indexable rows. ``row[5]`` is the sort key (converted
            with ``int``), ``row[2]`` is the grouping key and ``row[3]`` is
            the key used in the returned dict. The list is sorted IN PLACE.

    Returns:
        Dict mapping ``row[3]`` to the 1-based position of that row within
        its ``row[2]`` group, following the sorted order.
    """
    # In-place sort: the caller's list is reordered as a side effect.
    datas.sort(key=lambda row: int(row[5]))
    group_counts = {}
    ranks = {}
    for row in datas:
        group = row[2]
        position = group_counts.get(group, 0) + 1
        group_counts[group] = position
        ranks[row[3]] = position
    return ranks
|
|
|
# 获取涨停信息
|
def get_limit_up_info(codes):
    """Report for each code whether it appears in the limit-up data sets.

    Args:
        codes: Iterable of codes to look up.

    Returns:
        A 3-tuple ``(info, limit_up_codes, open_limit_up_codes)``:
        ``info`` maps every code to ``(in_limit_up, in_open_limit_up)``;
        the two sets hold the first field of every row of the LIMIT_UP and
        OPEN_LIMIT_UP data respectively (empty when no data is available).
    """
    limit_up_data = __kplDataManager.get_data(KPLDataType.LIMIT_UP)
    # Both lookups are guarded the same way: a missing/empty result yields an
    # empty set instead of raising TypeError while iterating None.
    limit_up_codes = {row[0] for row in limit_up_data} if limit_up_data else set()

    open_limit_up_data = __kplDataManager.get_data(KPLDataType.OPEN_LIMIT_UP)
    open_limit_up_codes = (
        {row[0] for row in open_limit_up_data} if open_limit_up_data else set()
    )

    info = {
        code: (code in limit_up_codes, code in open_limit_up_codes)
        for code in codes
    }
    return info, limit_up_codes, open_limit_up_codes
|
|
|
# 获取代码分数字典
|
def get_codes_scores_dict(codes):
    """Build a ``{code: score}`` dict for the given codes.

    Only codes contained in ``gpcode_manager.get_first_gp_codes()`` are
    scored, and a code that already has a score is not scored again.
    Scoring is best-effort: a code whose score cannot be computed is left
    out of the result instead of aborting the whole batch.

    Args:
        codes: Iterable of codes to score.

    Returns:
        Dict mapping each successfully scored code to its score.
    """
    import logging  # local import keeps this block self-contained

    # All codes currently being monitored.
    first_codes = gpcode_manager.get_first_gp_codes()
    scores = {}
    for code in codes:
        if code not in first_codes or code in scores:
            continue
        try:
            limit_up_time = limit_up_time_manager.get_limit_up_time(code)
            volume_rate, _ = code_volumn_manager.get_volume_rate(code, True)
            (score, _), _ = first_code_score_manager.get_score(
                code, volume_rate, limit_up_time, True
            )
        except Exception:
            # Narrowed from a bare ``except`` so KeyboardInterrupt/SystemExit
            # still propagate; the failure is recorded rather than vanishing.
            logging.getLogger(__name__).debug(
                "failed to compute score for %s", code, exc_info=True
            )
        else:
            scores[code] = score
    return scores
|
|
|
if __name__ == "__main__":
    # Manual smoke run. get_limit_up_info() requires a ``codes`` argument;
    # calling it with none raised TypeError, so pass an empty list to
    # exercise the data loading without looking up any specific code.
    get_limit_up_info([])
|