""" 股性分析 """ # 是否有涨停 import copy import json import gpcode_manager # 代码股性记录管理 import tool from db.redis_manager import RedisManager class CodeNatureRecordManager: __redisManager = RedisManager(0) @classmethod def __get_redis(cls): return cls.__redisManager.getRedis() # 保存K线形态 @classmethod def save_k_format(cls, code, k_format): cls.__get_redis().setex(f"k_format-{code}", tool.get_expire(), json.dumps(k_format)) @classmethod def get_k_format(cls, code): val = cls.__get_redis().get(f"k_format-{code}") if val: return json.loads(val) return None # 保存股性 @classmethod def save_nature(cls, code, natures): cls.__get_redis().setex(f"code_nature-{code}", tool.get_expire(), json.dumps(natures)) @classmethod def get_nature(cls, code): val = cls.__get_redis().get(f"code_nature-{code}") if val: return json.loads(val) return None # 设置历史K线 def set_record_datas(code, limit_up_price, record_datas): k_format = get_k_format(float(limit_up_price), record_datas) CodeNatureRecordManager.save_k_format(code, k_format) natures = get_nature(record_datas) CodeNatureRecordManager.save_nature(code, natures) # 获取K线形态 # 返回 (15个交易日涨幅是否大于24.9%,是否破前高,是否超跌,是否接近前高,是否N,是否V) def get_k_format(limit_up_price, record_datas): p1_data = get_lowest_price_rate(record_datas) p1 = p1_data[0] >= 0.249, p1_data[1] p2 = __is_new_top(limit_up_price, record_datas) p3 = __is_lowest(record_datas) p4 = __is_near_new_top(limit_up_price, record_datas) p5 = __is_n_model(record_datas) p6 = __is_v_model(record_datas) p8 = __get_big_volumn_info(record_datas) # # N字型包含了N字型 # if p5: # p6 = False, '' p7 = (p1[0] or p2[0] or p3[0] or p4[0] or p5[0] or p6[0], '') return p1, p2, p3, p4, p5, p6, p7, p8 # 是否具有K线形态 def is_has_k_format(limit_up_price, record_datas): is_too_high, is_new_top, is_lowest, is_near_new_top, is_n, is_v, has_format, volume_info = get_k_format( float(limit_up_price), record_datas) if not has_format: return False, "不满足K线形态" return True, "有形态" # 获取股性 # 返回(是否涨停,首板溢价率,首板炸板溢价率) def get_nature(record_datas): limit_up_count = get_first_limit_up_count(record_datas) premium_rate = get_limit_up_premium_rate(record_datas) open_premium_rate = get_open_limit_up_premium_rate(record_datas) result = (limit_up_count, premium_rate, open_premium_rate) return result # 获取涨幅 def get_lowest_price_rate(record_datas): datas = copy.deepcopy(record_datas) datas.sort(key=lambda x: x["bob"]) datas = datas[-10:] for data in datas: limit_up_price = float(gpcode_manager.get_limit_up_price_by_preprice(data["pre_close"])) if abs(limit_up_price - data["high"]) < 0.01: date = data['bob'].strftime("%Y-%m-%d") return round((datas[-1]["close"] - data["close"]) / data["close"], 4), date return 0, '' # 是否有涨停 def get_first_limit_up_count(datas): datas = copy.deepcopy(datas) count = 0 for i in range(len(datas)): item = datas[i] # 获取首板涨停次数 if __is_limit_up(item) and i>0 and not __is_limit_up(datas[i-1]): # 首板涨停 count+=1 return count # 是否破前高 def __is_new_top(limit_up_price, datas): datas = copy.deepcopy(datas) datas.sort(key=lambda x: x["bob"]) datas = datas[-80:] max_price = 0 for data in datas: if max_price < data["high"]: max_price = data["high"] if limit_up_price >= max_price: return True, '' return False, '' def is_new_top(limit_up_price, datas): return __is_new_top(float(limit_up_price), datas)[0] # 接近新高 def __is_near_new_top(limit_up_price, datas): datas = copy.deepcopy(datas) datas.sort(key=lambda x: x["bob"]) datas = datas[-80:] max_volume = 0 max_volume_index = 0 for index in range(0, len(datas)): data = datas[index] if max_volume < data["volume"]: max_volume = data["volume"] max_volume_index = index price = 0 price_index = 0 for index in range(max_volume_index, len(datas)): data = datas[index] if data["high"] > price: price = data["high"] price_index = index index = price_index # 最大量当日最高价比当日之后的最高价涨幅在15%以内 if (price - datas[max_volume_index]["high"]) / datas[max_volume_index]["high"] < 0.15: price = datas[max_volume_index]["high"] index = max_volume_index print(max_volume) rate = (price - limit_up_price) / limit_up_price if 0 < rate < 0.03: return True, datas[index]['bob'].strftime("%Y-%m-%d") return False, '' # 是否跌破箱体 def __is_lowest(datas): datas = copy.deepcopy(datas) datas.sort(key=lambda x: x["bob"]) datas = datas[-80:] min_price = 100000 for data in datas: if min_price > data["low"]: min_price = data["low"] # 近5天内的最低价 date = '' min_price_5 = 10000 for data in datas[-5:]: if min_price_5 > data["low"]: min_price_5 = data["low"] date = data['bob'] if abs(min_price_5 - min_price) / min_price < 0.015: return True, date.strftime("%Y-%m-%d") return False, '' # N字形 def __is_n_model(datas): datas = copy.deepcopy(datas) datas.sort(key=lambda x: x["bob"]) datas = datas[-80:] if len(datas) >= 6: max_price = 0 min_price = 1000000 for i in range(len(datas) - 5, len(datas)): item = datas[i] print(item) limit_up_price = float(gpcode_manager.get_limit_up_price_by_preprice(item["pre_close"])) if abs(limit_up_price - item["high"]) < 0.001 and abs( limit_up_price - datas[i - 1]["high"]) >= 0.001: # 涨停,前一天非涨停 max_price = item["close"] elif max_price > 0: if min_price > item["low"]: min_price = item["low"] if max_price > min_price: return True, '' return False, '' # V字形 def __is_v_model(datas): datas = copy.deepcopy(datas) datas.sort(key=lambda x: x["bob"]) datas = datas[-30:] max_price = 0 max_price_index = -1 for i in range(0, len(datas)): if max_price < datas[i]["close"]: max_price = datas[i]["close"] max_price_index = i min_price = max_price min_price_index = max_price_index for i in range(max_price_index, len(datas)): if min_price > datas[i]["close"]: min_price = datas[i]["close"] min_price_index = i if (max_price - min_price) / max_price > 0.249: return True, '' return False, '' # 是否天量大阳 def __get_big_volumn_info(datas): datas = copy.deepcopy(datas) datas.sort(key=lambda x: x["bob"]) datas = datas[-30:] max_volume = 0 total_volume = 0 for data in datas: if max_volume < data["volume"]: max_volume = data["volume"] total_volume += data["volume"] average_volume = total_volume // len(datas) return max_volume, average_volume # 是否涨停 def __is_limit_up(data): limit_up_price = float(gpcode_manager.get_limit_up_price_by_preprice(data["pre_close"])) return abs(limit_up_price - data["close"]) < 0.001 # 首板涨停溢价率 def get_limit_up_premium_rate(datas): datas = copy.deepcopy(datas) datas.sort(key=lambda x: x["bob"]) first_rate_list = [] for i in range(0, len(datas)): item = datas[i] limit_up_price = float(gpcode_manager.get_limit_up_price_by_preprice(item["pre_close"])) if abs(limit_up_price - item["close"]) < 0.001: if 0 < i < len(datas) - 1 and not __is_limit_up(datas[i - 1]): # 首板涨停 rate = (datas[i + 1]["high"] - datas[i + 1]["pre_close"]) / datas[i + 1]["pre_close"] first_rate_list.append(rate) if not first_rate_list: return None count = 0 for rate in first_rate_list: if rate >= 0.01: count += 1 return count / len(first_rate_list) # 首板炸板溢价率 def get_open_limit_up_premium_rate(datas): datas = copy.deepcopy(datas) datas.sort(key=lambda x: x["bob"]) first_rate_list = [] for i in range(0, len(datas)): item = datas[i] limit_up_price = float(gpcode_manager.get_limit_up_price_by_preprice(item["pre_close"])) if abs(limit_up_price - item["high"]) < 0.001 and abs(limit_up_price - item["close"]) > 0.001: # limit_up_price = float(gpcode_manager.get_limit_up_price_by_preprice(datas[i - 1]["pre_close"])) if 0 < i < len(datas) - 1 and not __is_limit_up(datas[i - 1]): # 前一天未涨停 rate = (datas[i + 1]["high"] - item["high"]) / item["high"] first_rate_list.append(rate) if not first_rate_list: return None count = 0 for rate in first_rate_list: if rate >= 0.01: count += 1 return count / len(first_rate_list) if __name__ == "__main__": print(CodeNatureRecordManager.get_k_format("603717"))