"""Stock-character ("股性") analysis: K-line pattern flags and limit-up statistics."""
import copy  # kept for backward compatibility; the functions below no longer need it
import json

import gpcode_manager
import tool
from db.redis_manager import RedisManager


class CodeNatureRecordManager:
    """Redis-backed store for the per-code K-line flags and stock-nature flags.

    Values are stored as JSON under ``k_format-<code>`` and
    ``code_nature-<code>`` with the expiry returned by ``tool.get_expire()``.
    """

    __redisManager = RedisManager(0)

    @classmethod
    def __get_redis(cls):
        """Return the redis client handed out by the shared manager."""
        return cls.__redisManager.getRedis()

    @classmethod
    def save_k_format(cls, code, k_format):
        """Persist the K-line pattern flags of ``code`` as JSON."""
        cls.__get_redis().setex(f"k_format-{code}", tool.get_expire(), json.dumps(k_format))

    @classmethod
    def get_k_format(cls, code):
        """Return the stored K-line pattern flags of ``code``, or None if absent."""
        val = cls.__get_redis().get(f"k_format-{code}")
        if val:
            return json.loads(val)
        return None

    @classmethod
    def save_nature(cls, code, natures):
        """Persist the stock-nature flags of ``code`` as JSON."""
        cls.__get_redis().setex(f"code_nature-{code}", tool.get_expire(), json.dumps(natures))

    @classmethod
    def get_nature(cls, code):
        """Return the stored stock-nature flags of ``code``, or None if absent."""
        val = cls.__get_redis().get(f"code_nature-{code}")
        if val:
            return json.loads(val)
        return None


def _latest_bars(datas, count=None):
    """Return the bars ordered by ascending ``bob``, optionally only the last ``count``.

    ``sorted`` builds a new list, so the caller's list is never reordered, and
    the bars themselves are only read, so no deep copy is required.
    """
    ordered = sorted(datas, key=lambda bar: bar["bob"])
    if count is None:
        return ordered
    return ordered[-count:]


def _closed_at_limit_up(bar, tolerance):
    """Return True when the bar's close is within ``tolerance`` of its limit-up price.

    The limit-up price is derived from the bar's ``pre_close`` through
    ``gpcode_manager.get_limit_up_price_by_preprice``.
    """
    limit_up_price = float(gpcode_manager.get_limit_up_price_by_preprice(bar["pre_close"]))
    return abs(limit_up_price - bar["close"]) < tolerance


def set_record_datas(code, limit_up_price, record_datas):
    """Compute the K-line flags and nature flags from the history bars and store both."""
    k_format = get_k_format(limit_up_price, record_datas)
    CodeNatureRecordManager.save_k_format(code, k_format)
    natures = get_nature(record_datas)
    CodeNatureRecordManager.save_nature(code, natures)


def get_k_format(limit_up_price, record_datas):
    """Return the six K-line pattern flags for the given history bars.

    Returns:
        A tuple ``(rose_too_much, breaks_previous_high, oversold,
        near_previous_high, is_n, is_v)`` where ``rose_too_much`` means the
        latest close is at least 24.9% above the lowest close of the last 15
        bars.
    """
    p1 = get_lowest_price_rate(record_datas) >= 0.249
    p2 = __is_new_top(limit_up_price, record_datas)
    p3 = __is_lowest(record_datas)
    p4 = __is_near_new_top(limit_up_price, record_datas)
    p5 = __is_n_model(record_datas)
    p6 = __is_v_model(record_datas)
    # When the N pattern matches, the V flag is suppressed so only N is reported
    # (the original note reads as "the N shape already contains the V shape").
    if p5:
        p6 = False
    return (p1, p2, p3, p4, p5, p6)


def is_has_k_format(limit_up_price, record_datas):
    """Tell whether any accepted K-line pattern is present.

    Returns:
        ``(matched, description)``; the description names the first pattern
        that matched, checked in the order: breaks previous high, oversold,
        N shape, V shape.
    """
    _is_too_high, is_new_top, is_lowest, _is_near_new_top, is_n, is_v = get_k_format(
        limit_up_price, record_datas)
    # The "rose more than 24.9% in 15 bars" and "near previous high" flags are
    # computed but not used to reject here; those rejections are disabled.
    if is_new_top:
        return True, "破前高"
    if is_lowest:
        return True, "超跌补涨"
    if is_n:
        return True, "N字型"
    if is_v:
        return True, "V字形"
    return False, "不满足K线形态"


def get_nature(record_datas):
    """Return ``(has_limit_up, first_board_premium_ok)`` for the history bars.

    ``first_board_premium_ok`` is True when ``get_limit_up_premium_rate``
    yields at least 0.6.
    """
    limit_up = is_have_limit_up(record_datas)
    premium_rate = get_limit_up_premium_rate(record_datas)
    return (limit_up, premium_rate >= 0.6)


def get_lowest_price_rate(record_datas):
    """Return the gain of the latest close over the lowest close of the last 15 bars.

    Returns 0.0 when there are no bars, so callers comparing against a
    threshold get False instead of an IndexError.
    """
    bars = _latest_bars(record_datas, 15)
    if not bars:
        return 0.0
    low_price = min(bar["close"] for bar in bars)
    return (bars[-1]["close"] - low_price) / low_price


def is_have_limit_up(datas):
    """Return True when any bar closed at its limit-up price (0.01 tolerance)."""
    return any(_closed_at_limit_up(bar, 0.01) for bar in datas)


def is_have_limit_up_by_code(code):
    """Placeholder: always returns False regardless of ``code``."""
    return False


def __is_new_top(limit_up_price, datas):
    """Return True when ``limit_up_price`` exceeds the highest high of the last 80 bars."""
    recent = _latest_bars(datas, 80)
    # default=0 keeps the original result for empty history (any positive price breaks it).
    max_price = max((bar["high"] for bar in recent), default=0)
    return limit_up_price > max_price


def __is_near_new_top(limit_up_price, datas):
    """Return True when ``limit_up_price`` is less than 3% below a reference high.

    The reference is the high of the bar with the largest volume among the
    last 80 bars (the earliest such bar when volumes tie).
    """
    recent = _latest_bars(datas, 80)
    max_volume = 0
    price = 0
    for bar in recent:
        if max_volume < bar["volume"]:
            max_volume = bar["volume"]
            price = bar["high"]
    return limit_up_price < price and (price - limit_up_price) / limit_up_price < 0.03


def __is_lowest(datas):
    """Return True when the 5-bar low is within 1.5% of the 80-bar low."""
    recent = _latest_bars(datas, 80)
    if not recent:
        return False
    # min() replaces two hand-rolled scans that used different sentinel values.
    min_price = min(bar["low"] for bar in recent)
    min_price_5 = min(bar["low"] for bar in recent[-5:])
    return abs(min_price_5 - min_price) / min_price < 0.015


def __is_n_model(datas):
    """Detect the N shape within the last five bars.

    A bar that closes at limit-up while the previous bar did not sets the
    peak; the lows of the later bars are tracked, and the pattern matches
    when the peak close ends above the lowest of those lows. At least six
    bars are needed so that the first inspected bar has a predecessor.
    """
    recent = _latest_bars(datas, 80)
    if len(recent) < 6:
        return False
    max_price = 0
    min_price = 1000000
    for i in range(len(recent) - 5, len(recent)):
        item = recent[i]
        # NOTE(review): the previous-day test now checks whether the previous
        # bar itself closed at its own limit-up price. Comparing today's
        # limit-up price with yesterday's close, as before, was always true.
        if _closed_at_limit_up(item, 0.001) and not _closed_at_limit_up(recent[i - 1], 0.001):
            max_price = item["close"]
        elif max_price > 0:
            if min_price > item["low"]:
                min_price = item["low"]
    return max_price > min_price


def __is_v_model(datas):
    """Detect the V shape within the last 30 bars.

    Matches when the lowest close at or after the highest close lies more
    than 24.9% below that highest close. Returns False when there are no bars
    or no positive close.
    """
    recent = _latest_bars(datas, 30)
    max_price = 0
    max_price_index = -1
    for i, bar in enumerate(recent):
        if max_price < bar["close"]:
            max_price = bar["close"]
            max_price_index = i
    if max_price_index < 0:
        # No peak found: avoids indexing an empty list and dividing by zero.
        return False
    min_price = min(bar["close"] for bar in recent[max_price_index:])
    return (max_price - min_price) / max_price > 0.249


def get_limit_up_premium_rate(datas):
    """Return the share of first-board limit-ups that were followed by a premium.

    A first board is a bar that closed at limit-up while the previous bar did
    not. Its premium is the next bar's ``(high - pre_close) / pre_close``, and
    the result is the fraction of first boards whose premium is at least 1%.
    Returns 1 when no first board with a following bar exists.
    """
    bars = _latest_bars(datas)
    first_rate_list = []
    # Only bars having both a predecessor and a successor can qualify, so the
    # bounds are enforced by the range instead of after indexing.
    for i in range(1, len(bars) - 1):
        if _closed_at_limit_up(bars[i], 0.001) and not _closed_at_limit_up(bars[i - 1], 0.001):
            following = bars[i + 1]
            rate = (following["high"] - following["pre_close"]) / following["pre_close"]
            first_rate_list.append(rate)
    if not first_rate_list:
        return 1
    count = sum(1 for rate in first_rate_list if rate >= 0.01)
    return count / len(first_rate_list)