""" 股性分析 """ # 是否有涨停 import copy import json from code_attribute import gpcode_manager # 代码股性记录管理 from db import redis_manager_delegate as redis_manager from utils import tool from db.redis_manager_delegate import RedisUtils from utils.tool import CodeDataCacheUtil class CodeNatureRecordManager: __k_format_cache = {} __nature_cache = {} __db = 0 __instance = None __redis_manager = redis_manager.RedisManager(0) def __new__(cls, *args, **kwargs): if not cls.__instance: cls.__instance = super(CodeNatureRecordManager, cls).__new__(cls, *args, **kwargs) cls.__load_datas() return cls.__instance @classmethod def __get_redis(cls): return cls.__redis_manager.getRedis() @classmethod def __load_datas(cls): __redis = cls.__get_redis() try: keys = RedisUtils.keys(__redis, "k_format-*") for k in keys: code = k.split("-")[1] val = RedisUtils.get(__redis, k) val = json.loads(val) cls.__k_format_cache[code] = val keys = RedisUtils.keys(__redis, "code_nature-*") for k in keys: code = k.split("-")[1] val = RedisUtils.get(__redis, k) val = json.loads(val) cls.__nature_cache[code] = val except: pass finally: RedisUtils.realse(__redis) # 保存K线形态 def save_k_format(self, code, k_format): self.__k_format_cache[code] = k_format RedisUtils.setex_async(self.__db, f"k_format-{code}", tool.get_expire(), json.dumps(k_format)) def get_k_format(self, code): val = RedisUtils.get(self.__get_redis(), f"k_format-{code}") if val: return json.loads(val) return None def get_k_format_cache(self, code): val = self.__k_format_cache.get(code) # 复制 return copy.deepcopy(val) if val else None def clear(self): self.__k_format_cache.clear() self.__nature_cache.clear() keys = RedisUtils.keys(self.__get_redis(), "k_format-*") for k in keys: RedisUtils.delete(self.__get_redis(), k) keys = RedisUtils.keys(self.__get_redis(), "code_nature-*") for k in keys: RedisUtils.delete(self.__get_redis(), k) # 保存股性 def save_nature(self, code, natures): RedisUtils.setex_async(self.__db, f"code_nature-{code}", tool.get_expire(), json.dumps(natures)) def get_nature(self, code): val = RedisUtils.get(self.__get_redis(), f"code_nature-{code}") if val: return json.loads(val) return None def get_nature_cache(self, code): return self.__nature_cache.get(code) class LatestMaxVolumeManager: __db = 0 __instance = None __redis_manager = redis_manager.RedisManager(0) __max_volume_cache = {} def __new__(cls, *args, **kwargs): if not cls.__instance: cls.__instance = super(LatestMaxVolumeManager, cls).__new__(cls, *args, **kwargs) cls.__load_datas() return cls.__instance @classmethod def __load_datas(cls): __redis = cls.__get_redis() try: keys = RedisUtils.keys(__redis, "is_latest_max_volume-*") for k in keys: code = k.split("-")[-1] val = RedisUtils.get(__redis, k) CodeDataCacheUtil.set_cache(cls.__max_volume_cache, code, val) finally: RedisUtils.realse(__redis) @classmethod def __get_redis(cls): return cls.__redis_manager.getRedis() def __save_has_latest_max_volume(self, code): RedisUtils.setex_async(self.__db, f"is_latest_max_volume-{code}", tool.get_expire(), "1") # 设置最近有最大量 def set_has_latest_max_volume(self, code): CodeDataCacheUtil.set_cache(self.__max_volume_cache, code, 1) self.__save_has_latest_max_volume(code) # 最近是否有最大量 def is_latest_max_volume(self, code): return code in self.__max_volume_cache def clear(self): self.__max_volume_cache.clear() keys = RedisUtils.keys(self.__get_redis(), "is_latest_max_volume-*") for k in keys: RedisUtils.delete_async(self.__db, k) # 涨幅过高的票管理 class HighIncreaseCodeManager: __db = 0 __instance = None __redis_manager = redis_manager.RedisManager(0) __high_increase_codes = set() def __new__(cls, *args, **kwargs): if not cls.__instance: cls.__instance = super(HighIncreaseCodeManager, cls).__new__(cls, *args, **kwargs) cls.__load_datas() return cls.__instance @classmethod def __load_datas(cls): __redis = cls.__get_redis() try: cls.__high_increase_codes = RedisUtils.smembers(__redis, "high_increase_codes") finally: RedisUtils.realse(__redis) @classmethod def __get_redis(cls): return cls.__redis_manager.getRedis() def add_code(self, code): if self.__high_increase_codes is None: self.__high_increase_codes = set() self.__high_increase_codes.add(code) RedisUtils.sadd_async(self.__db, "high_increase_codes", code) RedisUtils.expire_async(self.__db, "high_increase_codes", tool.get_expire()) def is_in(self, code): return code in self.__high_increase_codes # 加载全部 def list_all(self): return self.__high_increase_codes def clear(self): if self.__high_increase_codes: self.__high_increase_codes.clear() RedisUtils.delete_async(self.__db, "high_increase_codes") # 设置历史K线 def set_record_datas(code, limit_up_price, record_datas): k_format = get_k_format(code, float(limit_up_price), record_datas) CodeNatureRecordManager().save_k_format(code, k_format) natures = get_nature(code, record_datas) CodeNatureRecordManager().save_nature(code, natures) # 获取K线形态 # 返回 (15个交易日涨幅是否大于24.9%,是否破前高,是否超跌,是否接近前高,是否N,是否V,是否有形态,天量大阳信息,是否具有辨识度,近2天有10天内最大量,上个交易日是否炸板, 上个交易日是否跌停) def get_k_format(code, limit_up_price, record_datas): p1_data = get_lowest_price_rate(code, record_datas) p1 = p1_data[0] >= 0.249, p1_data[1] p2 = __is_new_top(code, limit_up_price, record_datas) p3 = __is_lowest(code, record_datas) p4 = __is_near_new_top(code, limit_up_price, record_datas) p5 = __is_n_model(code, record_datas) p6 = __is_v_model(code, record_datas) p8 = __get_big_volumn_info(code, record_datas) # # N字型包含了N字型 # if p5: # p6 = False, '' p7 = (p1[0] or p2[0] or p3[0] or p4[0] or p5[0] or p6[0], '') # 是否具有辨识度 p9 = is_special(code, record_datas) p10 = is_latest_10d_max_volume_at_latest_2d(code, record_datas) # 最近5天是否有炸板/涨停/跌停 p11 = __has_latest_throwing_pressure(code, record_datas, 5) # 90天内是否有涨停 p12 = __has_limit_up(code, record_datas, 90) # 上个交易日是否振幅过大 p13 = __is_pre_day_limit_rate_too_low(code, record_datas) # 60个交易日是否曾涨停 p14 = __has_limited_up(code, record_datas, 60) # 昨日是否涨停过 p15 = __has_limited_up(code, record_datas, 1) # 昨日是否跌停 p16 = __is_latest_limit_down(code, record_datas, 1) return p1, p2, p3, p4, p5, p6, p7, p8, p9, p10, p11, p12, p13, p14, p15, p16 # 是否具有K线形态 def is_has_k_format(code, limit_up_price, record_datas): is_too_high, is_new_top, is_lowest, is_near_new_top, is_n, is_v, has_format, volume_info, is_special, has_max_volume, open_limit_up, is_limit_up_in_30days, is_latest_limit_down = get_k_format( code, float(limit_up_price), record_datas) if not has_format: return False, "不满足K线形态" return True, "有形态" # 获取股性 # 返回(是否涨停,首板溢价率,首板炸板溢价率) def get_nature(code, record_datas): limit_up_count = get_first_limit_up_count(code, record_datas) premium_rate = get_limit_up_premium_rate(code, record_datas) open_premium_rate = get_open_limit_up_premium_rate(code, record_datas) result = (limit_up_count, premium_rate, open_premium_rate) return result # 获取涨幅 def get_lowest_price_rate(code, record_datas): datas = copy.deepcopy(record_datas) datas.sort(key=lambda x: x["bob"]) datas = datas[-10:] for data in datas: limit_up_price = float(gpcode_manager.get_limit_up_price_by_preprice(code, data["pre_close"])) if abs(limit_up_price - data["high"]) < 0.001: date = data['bob'].strftime("%Y-%m-%d") return round((datas[-1]["close"] - data["close"]) / data["close"], 4), date return 0, '' # 是否涨得太高 def is_up_too_high_in_10d_with_limit_up(code, record_datas): datas = copy.deepcopy(record_datas) datas.sort(key=lambda x: x["bob"]) datas = datas[-10:] limit_ups = [] limit_up_count = 0 max_price = datas[0]["high"] for data in datas: limit_up_price = float(gpcode_manager.get_limit_up_price_by_preprice(code, data["pre_close"])) date = data['bob'].strftime("%Y-%m-%d") if data["high"] > max_price: max_price = data["high"] if abs(limit_up_price - data["high"]) < 0.001: limit_ups.append((date, True)) limit_up_count += 1 else: limit_ups.append((date, False)) # 下降幅度 limit_down_rate = round((max_price - datas[-1]["close"]) / max_price, 3) if limit_up_count < 3: return False # 连续5天有3天涨停 for i in range(len(limit_ups)): if i + 5 > len(limit_ups): break temp_datas = limit_ups[i:i + 5] t_count = 0 for t in temp_datas: if t[1]: t_count += 1 if t_count >= 3 and limit_down_rate < 0.15: # 降幅小于20% return True return False # 10天内的最高量是否集中在最近两天 def is_latest_10d_max_volume_at_latest_2d(code, record_datas): datas = copy.deepcopy(record_datas) datas.sort(key=lambda x: x["bob"]) datas = datas[-10:] max_volume_info = None for i in range(0, len(datas)): if not max_volume_info: max_volume_info = (i, datas[i]["volume"]) else: if max_volume_info[1] < datas[i]["volume"]: max_volume_info = (i, datas[i]["volume"]) return len(datas) - max_volume_info[0] <= 2 # 120 天内是否长得太高 def is_up_too_high_in_120d(code, record_datas): datas = copy.deepcopy(record_datas) datas.sort(key=lambda x: x["bob"]) datas = datas[-120:] today_limit_up_price = round( float(gpcode_manager.get_limit_up_price_by_preprice(code, datas[-1]["close"])), 2) max_price = 0 for data in datas: if data["high"] > max_price: max_price = data["high"] if today_limit_up_price <= max_price: return False # 计算120天的均价 total_amount = 0 total_volume = 0 for data in datas: total_amount += data["amount"] total_volume += data["volume"] average_price = round(total_amount / total_volume, 2) if (today_limit_up_price - average_price) / average_price > 0.3: return True else: return False # 暂时不使用 # 从最近一次涨停开始,是否涨幅过高 def is_up_too_high_from_latest_limit_up(code, record_datas): datas = copy.deepcopy(record_datas) datas.sort(key=lambda x: x["bob"]) datas = datas[-20:] datas.reverse() today_limit_up_price = round( float(gpcode_manager.get_limit_up_price_by_preprice(code, datas[0]["close"])), 2) max_price = 0 limit_up_price = None for i in range(0, len(datas)): item = datas[i] if item['high'] > max_price: max_price = item['high'] if __is_limited_up(code, item): limit_up_price = item['high'] break if not limit_up_price: return False if today_limit_up_price < max_price: return False if (today_limit_up_price - limit_up_price) / limit_up_price > 0.25: return True return False # 最近几天是否有最大量 def is_have_latest_max_volume(code, record_datas, day_count): datas = copy.deepcopy(record_datas) datas.sort(key=lambda x: x["bob"]) datas = datas[-120:] max_volume = (0, datas[0]["volume"]) for i in range(0, len(datas)): if max_volume[1] < datas[i]["volume"]: max_volume = (i, datas[i]["volume"]) if len(datas) - max_volume[0] <= day_count: return True return False # 在最近几天内股价是否长得太高 def is_price_too_high_in_days(code, record_datas, limit_up_price, day_count=5): datas = copy.deepcopy(record_datas) datas.sort(key=lambda x: x["bob"]) datas = datas[0 - day_count:] min_price_info = None max_price_info = None for d in datas: if min_price_info is None: min_price_info = d["low"], d if max_price_info is None: max_price_info = d["high"], d if min_price_info[0] > d["low"]: min_price_info = d["low"], d if max_price_info[0] < d["high"]: max_price_info = d["high"], d # if max_price > float(limit_up_price): # return False rate = (max_price_info[1]["high"] - min_price_info[1]["low"]) / min_price_info[1]["low"] THRESHOLD_RATE = 0.319 * 2 if tool.is_ge_code(code) else 0.319 if rate >= THRESHOLD_RATE: return True, rate return False, rate # 连续涨停后是否回调不足够 def is_continue_limit_up_not_enough_fall_dwon(code, record_datas): # 10 天内是否有连续3板 datas = copy.deepcopy(record_datas) datas.sort(key=lambda x: x["bob"]) datas = datas[0 - 10:] limit_up_continue_count_info = None max_limit_up_continue_count_info_list = [] # [连续涨停次数,涨停起点] for i in range(len(datas)): item = datas[i] if __is_limit_up(code, item): if not limit_up_continue_count_info: limit_up_continue_count_info = [1, i] else: limit_up_continue_count_info[0] += 1 else: if limit_up_continue_count_info: max_limit_up_continue_count_info_list.append(limit_up_continue_count_info) limit_up_continue_count_info = None max_limit_up_info = None for x in max_limit_up_continue_count_info_list: if max_limit_up_info is None: max_limit_up_info = x if max_limit_up_info[0] <= x[0]: max_limit_up_info = x if not max_limit_up_info or max_limit_up_info[0] < 3: return False start_index = max_limit_up_info[1] max_price_info = [0, None] for i in range(start_index, len(datas)): item = datas[i] if item["high"] > max_price_info[0]: max_price_info = [item["high"], i] # 计算回踩价格 lowest_price_threhhold = round((1 - 0.28) * max_price_info[0], 2) for i in range(max_price_info[1] + 1, len(datas)): item = datas[i] if item["low"] < lowest_price_threhhold: return False return True # 是否有涨停 def get_first_limit_up_count(code, datas): datas = copy.deepcopy(datas) count = 0 for i in range(len(datas)): item = datas[i] # 获取首板涨停次数 if __is_limit_up(code, item) and i > 0 and not __is_limit_up(code, datas[i - 1]): # 首板涨停 count += 1 return count # 是否破前高 def __is_new_top(code, limit_up_price, datas): datas = copy.deepcopy(datas) datas.sort(key=lambda x: x["bob"]) datas = datas[-60:] max_price = 0 for data in datas: if max_price < data["high"]: max_price = data["high"] if limit_up_price >= max_price: return True, '' return False, '' def is_new_top(code, limit_up_price, datas): return __is_new_top(code, float(limit_up_price), datas)[0] def is_near_top(code, limit_up_price, datas): return __is_near_new_top(code, float(limit_up_price), datas)[0] # 接近新高 def __is_near_new_top(code, limit_up_price, datas): datas = copy.deepcopy(datas) datas.sort(key=lambda x: x["bob"]) datas = datas[-80:] max_price = 0 max_price_index = 0 for index in range(0, len(datas)): data = datas[index] if data["high"] > max_price: max_price = data["high"] max_price_index = index rate = (max_price - float(limit_up_price)) / max_price if 0 < rate < 0.02: return True, datas[max_price_index]['bob'].strftime("%Y-%m-%d") return False, '' # 是否跌破箱体 def __is_lowest(code, datas): datas = copy.deepcopy(datas) datas.sort(key=lambda x: x["bob"]) datas = datas[-80:] min_price = 100000 for data in datas: if min_price > data["low"]: min_price = data["low"] # 近5天内的最低价 date = '' min_price_5 = 10000 for data in datas[-5:]: if min_price_5 > data["low"]: min_price_5 = data["low"] date = data['bob'] if abs(min_price_5 - min_price) / min_price < 0.015: return True, date.strftime("%Y-%m-%d") return False, '' # N字形 def __is_n_model(code, datas): datas = copy.deepcopy(datas) datas.sort(key=lambda x: x["bob"]) datas = datas[-80:] if len(datas) >= 6: max_price = 0 min_price = 1000000 for i in range(len(datas) - 5, len(datas)): item = datas[i] limit_up_price = float(gpcode_manager.get_limit_up_price_by_preprice(code, item["pre_close"])) if abs(limit_up_price - item["high"]) < 0.001 and abs( limit_up_price - datas[i - 1]["high"]) >= 0.001: # 涨停,前一天非涨停 max_price = item["close"] elif max_price > 0: if min_price > item["low"]: min_price = item["low"] if max_price > min_price: return True, '' return False, '' def __has_latest_throwing_pressure(code, datas, day_count): """ 最近释放有抛压 @param code: @param datas: @param day_count: @return: 是否有抛压, None/(p高价数据, t高价数据) """ datas = copy.deepcopy(datas) datas.sort(key=lambda x: x["bob"]) items = datas[0 - day_count:] target_item = None for i in range(len(items) - 1, -1, -1): item = items[i] limit_up_price = float(gpcode_manager.get_limit_up_price_by_preprice(code, item["pre_close"])) limit_down_price = float(gpcode_manager.get_limit_down_price_by_preprice(code, item["pre_close"])) if abs(limit_up_price - item["high"]) < 0.001 or abs(limit_down_price - item["close"]) < 0.001: # 炸板 # 或涨停 # 或者跌停 target_item = item break if not target_item: return False, None p_price, p_volume = target_item["high"], target_item["volume"] t_price, t_volume = 0, 0 for i in range(len(items) - 1, -1, -1): item = items[i] if item["bob"] == target_item["bob"]: break if item["high"] >= p_price * 1.03: t_price, t_volume = item["high"], item["volume"] break if t_price > 0: return True, ((p_price, p_volume), (t_price, t_volume)) else: return True, ((p_price, p_volume), None) def __is_latest_limit_down(code, datas, day_count): datas = copy.deepcopy(datas) datas.sort(key=lambda x: x["bob"]) items = datas[0 - day_count:] for item in items: # 是否有跌停 limit_down_price = float(gpcode_manager.get_limit_down_price_by_preprice(code, item["pre_close"])) if abs(limit_down_price - item["close"]) < 0.001: # 跌停 return True return False def __is_pre_day_limit_rate_too_low(code, datas): """ 上个交易日是否跌幅过大 @param code: @param datas: @return: """ datas = copy.deepcopy(datas) datas.sort(key=lambda x: x["bob"]) items = datas[-1:] for item in items: # 是否有跌停 # 获取当日涨幅 rate_open = (item["open"] - item["pre_close"]) / item["pre_close"] rate_close = (item["close"] - item["pre_close"]) / item["pre_close"] threshold_rate_ = 0.15 if abs(rate_open - rate_close) >= threshold_rate_: return True return False # V字形 def __is_v_model(code, datas): datas = copy.deepcopy(datas) datas.sort(key=lambda x: x["bob"]) datas = datas[-30:] max_price = 0 max_price_index = -1 for i in range(0, len(datas)): if max_price < datas[i]["close"]: max_price = datas[i]["close"] max_price_index = i min_price = max_price for i in range(max_price_index, len(datas)): if min_price > datas[i]["close"]: min_price = datas[i]["close"] if (max_price - min_price) / max_price > 0.249: return True, '' return False, '' # 是否天量大阳 def __get_big_volumn_info(code, datas): datas = copy.deepcopy(datas) datas.sort(key=lambda x: x["bob"]) datas = datas[-30:] max_volume = 0 total_volume = 0 for data in datas: if max_volume < data["volume"]: max_volume = data["volume"] total_volume += data["volume"] average_volume = total_volume // len(datas) return max_volume, average_volume # 是否涨停 def __is_limit_up(code, data): limit_up_price = float(gpcode_manager.get_limit_up_price_by_preprice(code, data["pre_close"])) return abs(limit_up_price - data["close"]) < 0.009 # 是否涨停过 def __is_limited_up(code, data): limit_up_price = float(gpcode_manager.get_limit_up_price_by_preprice(code, data["pre_close"])) return abs(limit_up_price - data["high"]) < 0.009 # 多少天内是否有涨停/曾涨停 def __has_limit_up(code, datas, day_count): datas = copy.deepcopy(datas) datas.sort(key=lambda x: x["bob"]) if len(datas) > day_count: datas = datas[0 - day_count:] if len(datas) >= 1: for i in range(0, len(datas)): item = datas[i] if __is_limit_up(code, item): return True return False # 多少天内是否曾涨停 def __has_limited_up(code, datas, day_count): datas = copy.deepcopy(datas) datas.sort(key=lambda x: x["bob"]) if len(datas) > day_count: datas = datas[0 - day_count:] if len(datas) >= 1: for i in range(0, len(datas)): item = datas[i] if __is_limited_up(code, item): return True return False # 首板涨停溢价率 def get_limit_up_premium_rate(code, datas): datas = copy.deepcopy(datas) datas.sort(key=lambda x: x["bob"]) first_rate_list = [] for i in range(0, len(datas)): item = datas[i] limit_up_price = float(gpcode_manager.get_limit_up_price_by_preprice(code, item["pre_close"])) if abs(limit_up_price - item["close"]) < 0.001: if 0 < i < len(datas) - 1 and not __is_limit_up(code, datas[i - 1]): # 首板涨停 rate = (datas[i + 1]["high"] - datas[i + 1]["pre_close"]) / datas[i + 1]["pre_close"] first_rate_list.append(rate) if not first_rate_list: return None count = 0 for rate in first_rate_list: if rate >= 0.01: count += 1 return count / len(first_rate_list) # 首板炸板溢价率 def get_open_limit_up_premium_rate(code, datas): datas = copy.deepcopy(datas) datas.sort(key=lambda x: x["bob"]) first_rate_list = [] for i in range(0, len(datas)): item = datas[i] limit_up_price = float(gpcode_manager.get_limit_up_price_by_preprice(code, item["pre_close"])) if abs(limit_up_price - item["high"]) < 0.001 and abs(limit_up_price - item["close"]) > 0.001: if 0 < i < len(datas) - 1 and not __is_limit_up(code, datas[i - 1]): # 前一天未涨停 rate = (datas[i + 1]["high"] - item["high"]) / item["high"] first_rate_list.append(rate) if not first_rate_list: return None count = 0 for rate in first_rate_list: if rate >= 0.01: count += 1 return count / len(first_rate_list) # 是否具有辨识度 def is_special(code, datas): datas = copy.deepcopy(datas) datas.sort(key=lambda x: x["bob"]) datas_30 = datas[-30:] datas_90 = datas[-90:] count = 0 # 30个交易日内累计涨停次数≥4次 continue_count = 0 has_continue = False for item in datas_30: if __is_limit_up(code, item): continue_count += 1 count += 1 if continue_count >= 4: has_continue = True else: continue_count = 0 if count >= 5 and has_continue: return True, "短期辨识度" count = 0 has_continue = False # 90个交易日内涨停次数≥6次 for item in datas_90: if __is_limit_up(code, item): continue_count += 1 count += 1 if continue_count >= 4: has_continue = True else: continue_count = 0 if count >= 6 and has_continue: return True, "长期辨识度" return False, "" if __name__ == "__main__": code = "000333" threshold_rate = 0 - ((1 - tool.get_limit_down_rate(code)) * 0.9) print(threshold_rate)