""" 股性分析 """ # 是否有涨停 import copy import json import gpcode_manager # 代码股性记录管理 import tool from db.redis_manager import RedisManager class CodeNatureRecordManager: __redisManager = RedisManager(0) @classmethod def __get_redis(cls): return cls.__redisManager.getRedis() # 保存K线形态 @classmethod def save_k_format(cls, code, k_format): cls.__get_redis().setex(f"k_format-{code}", tool.get_expire(), json.dumps(k_format)) @classmethod def get_k_format(cls, code): val = cls.__get_redis().get(f"k_format-{code}") if val: return json.loads(val) return None # 保存股性 @classmethod def save_nature(cls, code, natures): cls.__get_redis().setex(f"code_nature-{code}", tool.get_expire(), json.dumps(natures)) @classmethod def get_nature(cls, code): val = cls.__get_redis().get(f"code_nature-{code}") if val: return json.loads(val) return None # 设置历史K线 def set_record_datas(code, limit_up_price, record_datas): k_format = get_k_format(float(limit_up_price), record_datas) CodeNatureRecordManager.save_k_format(code, k_format) natures = get_nature(record_datas) CodeNatureRecordManager.save_nature(code, natures) # 获取K线形态 # 返回 (15个交易日涨幅是否大于24.9%,是否破前高,是否超跌,是否接近前高,是否N,是否V) def get_k_format(limit_up_price, record_datas): p1_data = get_lowest_price_rate(record_datas) p1 = p1_data[0] >= 0.249, p1_data[1] p2 = __is_new_top(limit_up_price, record_datas) p3 = __is_lowest(record_datas) p4 = __is_near_new_top(limit_up_price, record_datas) p5 = __is_n_model(record_datas) p6 = __is_v_model(record_datas) # N字型包含了N字型 if p5: p6 = False, '' return (p1, p2, p3, p4, p5, p6) # 是否具有K线形态 def is_has_k_format(limit_up_price, record_datas): is_too_high, is_new_top, is_lowest, is_near_new_top, is_n, is_v = get_k_format(float(limit_up_price), record_datas) # if is_too_high: # return False, "15个交易日涨幅大于24.9%" # if is_near_new_top: # return False, "逼近前高" if is_new_top: return True, "破前高" if is_lowest: return True, "超跌补涨" if is_n: return True, "N字型" if is_v: return True, "V字形" return False, "不满足K线形态" # 获取股性 # 返回(是否涨停,首板溢价率是否大于0.6) def get_nature(record_datas): limit_up = is_have_limit_up(record_datas) premium_rate = get_limit_up_premium_rate(record_datas) result = (limit_up, premium_rate >= 0.6,premium_rate) return result def get_lowest_price_rate(record_datas): datas = copy.deepcopy(record_datas) datas.sort(key=lambda x: x["bob"]) datas = datas[-15:] low_price = datas[0]["close"] date = None for data in datas: if low_price > data["close"]: low_price = data["close"] date = data['bob'].strftime("%Y-%m-%d") return (datas[-1]["close"] - low_price) / low_price, date # 是否有涨停 def is_have_limit_up(datas): datas = copy.deepcopy(datas) for i in range(len(datas)): item = datas[i] limit_up_price = float(gpcode_manager.get_limit_up_price_by_preprice(item["pre_close"])) if abs(limit_up_price - item["close"]) < 0.01: return True, item['bob'].strftime("%Y-%m-%d") return False, '' # 是否破前高 def __is_new_top(limit_up_price, datas): datas = copy.deepcopy(datas) datas.sort(key=lambda x: x["bob"]) datas = datas[-80:] max_price = 0 for data in datas: if max_price < data["high"]: max_price = data["high"] if limit_up_price >= max_price: return True, '' return False, '' def is_new_top(limit_up_price, datas): return __is_new_top(float(limit_up_price), datas)[0] # 接近新高 def __is_near_new_top(limit_up_price, datas): datas = copy.deepcopy(datas) datas.sort(key=lambda x: x["bob"]) datas = datas[-80:] max_volume = 0 max_volume_index = 0 for index in range(0, len(datas)): data = datas[index] if max_volume < data["volume"]: max_volume = data["volume"] max_volume_index = index price = 0 price_index = 0 for index in range(max_volume_index, len(datas)): data = datas[index] if data["high"] > price: price = data["high"] price_index = index index = price_index # 最大量当日最高价比当日之后的最高价涨幅在15%以内 if (price - datas[max_volume_index]["high"]) / datas[max_volume_index]["high"] < 0.15: price = datas[max_volume_index]["high"] index = max_volume_index print(max_volume) rate = (price - limit_up_price) / limit_up_price if 0 < rate < 0.03: return True, datas[index]['bob'].strftime("%Y-%m-%d") return False, '' # 是否跌破箱体 def __is_lowest(datas): datas = copy.deepcopy(datas) datas.sort(key=lambda x: x["bob"]) datas = datas[-80:] min_price = 100000 for data in datas: if min_price > data["low"]: min_price = data["low"] # 近5天内的最低价 date = '' min_price_5 = 10000 for data in datas[-5:]: if min_price_5 > data["low"]: min_price_5 = data["low"] date = data['bob'] if abs(min_price_5 - min_price) / min_price < 0.015: return True, date.strftime("%Y-%m-%d") return False, '' # N字形 def __is_n_model(datas): datas = copy.deepcopy(datas) datas.sort(key=lambda x: x["bob"]) datas = datas[-80:] if len(datas) >= 6: max_price = 0 min_price = 1000000 for i in range(len(datas) - 5, len(datas)): item = datas[i] print(item) limit_up_price = float(gpcode_manager.get_limit_up_price_by_preprice(item["pre_close"])) if abs(limit_up_price - item["close"]) < 0.001 and abs( limit_up_price - datas[i - 1]["close"]) >= 0.001: # 涨停,前一天非涨停 max_price = item["close"] elif max_price > 0: if min_price > item["low"]: min_price = item["low"] if max_price > min_price: return True, '' return False, '' # V字形 def __is_v_model(datas): datas = copy.deepcopy(datas) datas.sort(key=lambda x: x["bob"]) datas = datas[-30:] max_price = 0 max_price_index = -1 for i in range(0, len(datas)): if max_price < datas[i]["close"]: max_price = datas[i]["close"] max_price_index = i min_price = max_price min_price_index = max_price_index for i in range(max_price_index, len(datas)): if min_price > datas[i]["close"]: min_price = datas[i]["close"] min_price_index = i if (max_price - min_price) / max_price > 0.249: return True, '' return False, '' # 首板涨停溢价率 def get_limit_up_premium_rate(datas): datas = copy.deepcopy(datas) datas.sort(key=lambda x: x["bob"]) first_rate_list = [] for i in range(0, len(datas)): item = datas[i] limit_up_price = float(gpcode_manager.get_limit_up_price_by_preprice(item["pre_close"])) if abs(limit_up_price - item["close"]) < 0.001 and abs( limit_up_price - datas[i - 1]["close"]) >= 0.001 and 0 < i < len(datas) - 1: # 首板涨停 rate = (datas[i + 1]["high"] - datas[i + 1]["pre_close"]) / datas[i + 1]["pre_close"] first_rate_list.append(rate) if not first_rate_list: return 0 count = 0 for rate in first_rate_list: if rate >= 0.01: count += 1 return count / len(first_rate_list) if __name__ == "__main__": print(CodeNatureRecordManager.get_k_format("603717"))