"""
|
股性分析
|
"""
|
|
# 是否有涨停
|
import copy
|
import json
|
import random
|
import time
|
|
from code_attribute import gpcode_manager
|
|
# 代码股性记录管理
|
from db import redis_manager
|
from utils import tool
|
from db.redis_manager_delegate import RedisManager, RedisUtils
|
from utils.tool import CodeDataCacheUtil
|
|
|
class CodeNatureRecordManager:
    """Singleton store for per-code K-line pattern and stock-nature records.

    Records are held in class-level in-memory caches and written to Redis
    under the keys ``k_format-{code}`` and ``code_nature-{code}``.
    """

    # In-memory caches keyed by stock code.
    __k_format_cache = {}
    __nature_cache = {}

    __db = 0
    __instance = None
    __redis_manager = redis_manager.RedisManager(0)

    def __new__(cls, *args, **kwargs):
        # Build the singleton on first use and warm both caches from Redis.
        if not cls.__instance:
            cls.__instance = super(CodeNatureRecordManager, cls).__new__(cls, *args, **kwargs)
            cls.__load_datas()
        return cls.__instance

    @classmethod
    def __get_redis(cls):
        return cls.__redis_manager.getRedis()

    @classmethod
    def __load_datas(cls):
        """Fill both caches from the ``k_format-*`` and ``code_nature-*`` keys in Redis."""
        redis_conn = cls.__get_redis()
        try:
            for key in RedisUtils.keys(redis_conn, "k_format-*"):
                code = key.split("-")[1]
                val = RedisUtils.get(redis_conn, key)
                cls.__k_format_cache[code] = json.loads(val)
            for key in RedisUtils.keys(redis_conn, "code_nature-*"):
                code = key.split("-")[1]
                val = RedisUtils.get(redis_conn, key)
                cls.__nature_cache[code] = json.loads(val)
        except Exception:
            # Warm-up stays best-effort (a failure must not prevent the
            # singleton from being created), but it is no longer silent.
            import logging
            logging.getLogger(__name__).exception("failed to load code nature records from redis")
        finally:
            RedisUtils.realse(redis_conn)

    def save_k_format(self, code, k_format):
        """Cache the K-line pattern for ``code`` and write it to Redis asynchronously."""
        self.__k_format_cache[code] = k_format
        RedisUtils.setex_async(self.__db, f"k_format-{code}", tool.get_expire(), json.dumps(k_format))

    def get_k_format(self, code):
        """Read the K-line pattern for ``code`` from Redis; None when absent."""
        val = RedisUtils.get(self.__get_redis(), f"k_format-{code}")
        if val:
            return json.loads(val)
        return None

    def get_k_format_cache(self, code):
        """Return a deep copy of the cached K-line pattern, or None when absent/empty."""
        val = self.__k_format_cache.get(code)
        # Copy so callers cannot mutate the cached value.
        return copy.deepcopy(val) if val else None

    def clear(self):
        """Empty both in-memory caches and delete the matching Redis keys."""
        self.__k_format_cache.clear()
        self.__nature_cache.clear()
        keys = RedisUtils.keys(self.__get_redis(), "k_format-*")
        for k in keys:
            RedisUtils.delete(self.__get_redis(), k)
        keys = RedisUtils.keys(self.__get_redis(), "code_nature-*")
        for k in keys:
            RedisUtils.delete(self.__get_redis(), k)

    def save_nature(self, code, natures):
        """Cache the stock nature for ``code`` and write it to Redis asynchronously."""
        # Keep the in-memory cache in step with Redis, as save_k_format does;
        # otherwise get_nature_cache only sees values loaded at start-up.
        self.__nature_cache[code] = natures
        RedisUtils.setex_async(self.__db, f"code_nature-{code}", tool.get_expire(), json.dumps(natures))

    def get_nature(self, code):
        """Read the stock nature for ``code`` from Redis; None when absent."""
        val = RedisUtils.get(self.__get_redis(), f"code_nature-{code}")
        if val:
            return json.loads(val)
        return None

    def get_nature_cache(self, code):
        """Return the cached stock nature for ``code`` (not copied), or None."""
        return self.__nature_cache.get(code)
|
|
|
class LatestMaxVolumeManager:
    """Singleton flag store marking codes that recently had their maximum volume.

    Flags live in a class-level cache and in Redis under
    ``is_latest_max_volume-{code}``.
    """

    __db = 0
    __instance = None
    __redis_manager = redis_manager.RedisManager(0)
    __max_volume_cache = {}

    def __new__(cls, *args, **kwargs):
        # Reuse the existing singleton; otherwise create it and load the flags.
        if cls.__instance:
            return cls.__instance
        cls.__instance = super().__new__(cls, *args, **kwargs)
        cls.__load_datas()
        return cls.__instance

    @classmethod
    def __get_redis(cls):
        return cls.__redis_manager.getRedis()

    @classmethod
    def __load_datas(cls):
        """Copy every ``is_latest_max_volume-*`` value from Redis into the cache."""
        conn = cls.__get_redis()
        try:
            for redis_key in RedisUtils.keys(conn, "is_latest_max_volume-*"):
                stock_code = redis_key.split("-")[-1]
                stored = RedisUtils.get(conn, redis_key)
                CodeDataCacheUtil.set_cache(cls.__max_volume_cache, stock_code, stored)
        finally:
            RedisUtils.realse(conn)

    def __save_has_latest_max_volume(self, code):
        RedisUtils.setex_async(self.__db, f"is_latest_max_volume-{code}", tool.get_expire(), "1")

    def set_has_latest_max_volume(self, code):
        """Flag ``code`` in the cache and write the flag to Redis asynchronously."""
        CodeDataCacheUtil.set_cache(self.__max_volume_cache, code, 1)
        self.__save_has_latest_max_volume(code)

    def is_latest_max_volume(self, code):
        """Return True when ``code`` is present in the in-memory cache."""
        return code in self.__max_volume_cache

    def clear(self):
        """Empty the cache and asynchronously delete the matching Redis keys."""
        self.__max_volume_cache.clear()
        for redis_key in RedisUtils.keys(self.__get_redis(), "is_latest_max_volume-*"):
            RedisUtils.delete_async(self.__db, redis_key)
|
|
|
class HighIncreaseCodeManager:
    """Singleton set of codes whose rise is considered too high.

    Codes are held in a class-level set and mirrored to the Redis set
    ``high_increase_codes``.
    """

    __db = 0
    __instance = None
    __redis_manager = redis_manager.RedisManager(0)
    __high_increase_codes = set()

    def __new__(cls, *args, **kwargs):
        # Build the singleton on first use and load the stored codes.
        if not cls.__instance:
            cls.__instance = super(HighIncreaseCodeManager, cls).__new__(cls, *args, **kwargs)
            cls.__load_datas()
        return cls.__instance

    @classmethod
    def __load_datas(cls):
        """Load the ``high_increase_codes`` set from Redis into memory."""
        redis_conn = cls.__get_redis()
        try:
            members = RedisUtils.smembers(redis_conn, "high_increase_codes")
            # Normalise a missing/empty result to an empty set so that
            # is_in / list_all / add_code never operate on None.
            cls.__high_increase_codes = set(members) if members else set()
        finally:
            RedisUtils.realse(redis_conn)

    @classmethod
    def __get_redis(cls):
        return cls.__redis_manager.getRedis()

    def add_code(self, code):
        """Add ``code`` to the set and to Redis, refreshing the Redis key's expiry."""
        self.__high_increase_codes.add(code)
        RedisUtils.sadd_async(self.__db, "high_increase_codes", code)
        RedisUtils.expire_async(self.__db, "high_increase_codes", tool.get_expire())

    def is_in(self, code):
        """Return True when ``code`` is in the in-memory set."""
        return code in self.__high_increase_codes

    def list_all(self):
        """Return the in-memory set itself (not a copy)."""
        return self.__high_increase_codes

    def clear(self):
        """Empty the in-memory set and asynchronously delete the Redis key."""
        if self.__high_increase_codes:
            self.__high_increase_codes.clear()
        RedisUtils.delete_async(self.__db, "high_increase_codes")
|
|
|
def set_record_datas(code, limit_up_price, record_datas):
    """Derive the K-line pattern and stock nature for ``code`` and save both.

    ``limit_up_price`` is converted with ``float`` before use; ``record_datas``
    is the historical K-line list passed on to get_k_format / get_nature.
    """
    pattern = get_k_format(float(limit_up_price), record_datas)
    manager = CodeNatureRecordManager()
    manager.save_k_format(code, pattern)
    manager.save_nature(code, get_nature(record_datas))
|
|
|
def get_k_format(limit_up_price, record_datas):
    """Compute the K-line pattern tuple for one code.

    Returns an 11-tuple, in this order:
      0. (rise from get_lowest_price_rate >= 24.9%, date)
      1. broke the previous high
      2. oversold (result of __is_lowest)
      3. near the previous high
      4. N pattern
      5. V pattern
      6. (any of items 0-5 is set, '')
      7. (max volume, average volume) from __get_big_volumn_info
      8. has distinctiveness (is_special)
      9. the 10-bar max volume falls in the latest 2 bars
      10. the latest bar touched limit-up but did not close there
    """
    rise_rate, rise_date = get_lowest_price_rate(record_datas)
    too_high = (rise_rate >= 0.249, rise_date)
    new_top = __is_new_top(limit_up_price, record_datas)
    lowest = __is_lowest(record_datas)
    near_top = __is_near_new_top(limit_up_price, record_datas)
    n_model = __is_n_model(record_datas)
    v_model = __is_v_model(record_datas)
    volume_info = __get_big_volumn_info(record_datas)

    # "Has a pattern" is simply the OR of the six pattern flags above.
    has_format = (
        too_high[0] or new_top[0] or lowest[0] or near_top[0] or n_model[0] or v_model[0],
        '',
    )

    special = is_special(record_datas)
    max_volume_recent = is_latest_10d_max_volume_at_latest_2d(record_datas)
    open_limit_up = __is_yesterday_open_limit_up(record_datas)

    return (too_high, new_top, lowest, near_top, n_model, v_model, has_format,
            volume_info, special, max_volume_recent, open_limit_up)
|
|
|
def is_has_k_format(limit_up_price, record_datas):
    """Tell whether any K-line pattern applies.

    Returns ``(True, "有形态")`` when the "has pattern" flag from get_k_format
    is set, otherwise ``(False, "不满足K线形态")``.
    """
    k_format = get_k_format(float(limit_up_price), record_datas)
    # Item 6 is the tuple (flag, ''); a non-empty tuple is always truthy,
    # so the flag itself must be tested, not the tuple.
    has_format = k_format[6][0]
    if not has_format:
        return False, "不满足K线形态"
    return True, "有形态"
|
|
|
def get_nature(record_datas):
    """Return the stock nature as a 3-tuple.

    The tuple is (first-board limit-up count, first-board premium rate,
    first-board broken-limit-up premium rate).
    """
    return (
        get_first_limit_up_count(record_datas),
        get_limit_up_premium_rate(record_datas),
        get_open_limit_up_premium_rate(record_datas),
    )
|
|
|
def get_lowest_price_rate(record_datas):
    """Measure the rise since the first limit-up touch in the last 10 bars.

    Looks, in chronological order, for the first bar whose high is within 0.01
    of its limit-up price and returns ``(rate, date)``, where rate is the
    change from that bar's close to the latest close (rounded to 4 places) and
    date is that bar's ``bob`` as ``YYYY-MM-DD``. Returns ``(0, '')`` when no
    such bar exists.
    """
    recent = sorted(copy.deepcopy(record_datas), key=lambda bar: bar["bob"])[-10:]
    for bar in recent:
        limit_price = float(gpcode_manager.get_limit_up_price_by_preprice(bar["pre_close"]))
        if abs(limit_price - bar["high"]) >= 0.01:
            continue
        day = bar['bob'].strftime("%Y-%m-%d")
        latest_close = recent[-1]["close"]
        return round((latest_close - bar["close"]) / bar["close"], 4), day
    return 0, ''
|
|
|
def is_up_too_high_in_10d_with_limit_up(record_datas):
    """Tell whether the price ran up too far within the last 10 bars.

    True when some window of 5 consecutive bars contains at least 3 bars whose
    high touched limit-up (within 0.01) AND the latest close has fallen less
    than 15% from the 10-bar high. Returns False for an empty history.
    """
    datas = sorted(record_datas, key=lambda x: x["bob"])[-10:]
    if not datas:
        # Nothing to analyse; the original raised IndexError here.
        return False

    touched = []
    for data in datas:
        limit_up_price = float(gpcode_manager.get_limit_up_price_by_preprice(data["pre_close"]))
        touched.append(abs(limit_up_price - data["high"]) < 0.01)
    if sum(touched) < 3:
        return False

    # Pull-back of the latest close from the highest high of the window.
    max_price = max(data["high"] for data in datas)
    limit_down_rate = round((max_price - datas[-1]["close"]) / max_price, 3)
    if limit_down_rate >= 0.15:
        return False

    # Any 5 consecutive bars with at least 3 limit-up touches.
    return any(sum(touched[i:i + 5]) >= 3 for i in range(len(touched) - 4))
|
|
|
def is_latest_10d_max_volume_at_latest_2d(record_datas):
    """Tell whether the largest volume of the last 10 bars is in the latest 2 bars.

    Bars are ordered by ``bob``; on equal volumes the earliest bar counts as
    the maximum. Returns False for an empty history.
    """
    # sorted() builds a new list, so the caller's list is left untouched
    # without needing a deep copy.
    datas = sorted(record_datas, key=lambda x: x["bob"])[-10:]
    if not datas:
        # The original raised TypeError here (indexing None).
        return False
    max_index = max(range(len(datas)), key=lambda i: datas[i]["volume"])
    return len(datas) - max_index <= 2
|
|
|
# 120 天内是否长得太高
|
def is_up_too_high_in_120d(record_datas):
|
datas = copy.deepcopy(record_datas)
|
datas.sort(key=lambda x: x["bob"])
|
datas = datas[-120:]
|
today_limit_up_price = round(float(gpcode_manager.get_limit_up_price_by_preprice(datas[-1]["close"])), 2)
|
max_price = 0
|
for data in datas:
|
if data["high"] > max_price:
|
max_price = data["high"]
|
if today_limit_up_price <= max_price:
|
return False
|
|
# 计算120天的均价
|
total_amount = 0
|
total_volume = 0
|
for data in datas:
|
total_amount += data["amount"]
|
total_volume += data["volume"]
|
average_price = round(total_amount / total_volume, 2)
|
if (today_limit_up_price - average_price) / average_price > 0.3:
|
return True
|
else:
|
return False
|
|
|
def is_have_latest_max_volume(record_datas, day_count):
    """Tell whether the largest volume of the last 120 bars is in the latest ``day_count`` bars.

    Bars are ordered by ``bob``; on equal volumes the earliest bar counts as
    the maximum. Returns False for an empty history.
    """
    datas = sorted(record_datas, key=lambda x: x["bob"])[-120:]
    if not datas:
        # The original raised IndexError on datas[0].
        return False
    max_index = max(range(len(datas)), key=lambda i: datas[i]["volume"])
    return len(datas) - max_index <= day_count
|
|
|
def is_price_too_high_in_days(record_datas, limit_up_price, day_count=6):
    """Tell whether the limit-up price is too far above the recent low.

    Considers the latest ``day_count`` bars (ordered by ``bob``). Returns False
    when any high in that window is above ``limit_up_price``; otherwise True
    when ``limit_up_price`` is at least 28% above the window's lowest low.
    Returns False for an empty history.
    """
    datas = sorted(record_datas, key=lambda x: x["bob"])[0 - day_count:]
    if not datas:
        # The original compared None with a float here and raised TypeError.
        return False
    limit_up_price = float(limit_up_price)
    min_price = min(d["low"] for d in datas)
    max_price = max(d["high"] for d in datas)
    if max_price > limit_up_price:
        return False
    return (limit_up_price - min_price) / min_price >= 0.28
|
|
|
def get_first_limit_up_count(datas):
    """Count first-board limit-ups in the history.

    A bar is counted when it is a limit-up (per __is_limit_up) and the bar
    before it is not. The earliest bar is never counted because it has no
    predecessor to compare with.
    """
    # Order chronologically, as the sibling analysers do, so that "previous
    # bar" really is the previous trading day whatever order the caller used.
    datas = sorted(datas, key=lambda x: x["bob"])
    count = 0
    for i in range(1, len(datas)):
        if __is_limit_up(datas[i]) and not __is_limit_up(datas[i - 1]):
            count += 1
    return count
|
|
|
def __is_new_top(limit_up_price, datas):
    """Tell whether ``limit_up_price`` reaches the highest high of the last 80 bars.

    Returns ``(True, '')`` when it is greater than or equal to that high
    (an empty history counts as a high of 0), else ``(False, '')``.
    """
    window = sorted(copy.deepcopy(datas), key=lambda bar: bar["bob"])[-80:]
    highest = max([0] + [bar["high"] for bar in window])
    return (True, '') if limit_up_price >= highest else (False, '')
|
|
|
def is_new_top(limit_up_price, datas):
    """Return the boolean part of __is_new_top, converting the price with ``float`` first."""
    broke_top, _ = __is_new_top(float(limit_up_price), datas)
    return broke_top
|
|
|
def __is_near_new_top(limit_up_price, datas):
    """Tell whether ``limit_up_price`` sits just below the 80-bar high.

    Returns ``(True, date)`` when the price is below the highest high of the
    last 80 bars by less than 2% (strictly between 0 and 0.02), where date is
    the ``bob`` of the first bar holding that high as ``YYYY-MM-DD``.
    Otherwise, including for an empty history, returns ``(False, '')``.
    """
    datas = sorted(datas, key=lambda x: x["bob"])[-80:]

    max_price = 0
    max_price_index = 0
    for index, data in enumerate(datas):
        if data["high"] > max_price:
            max_price = data["high"]
            max_price_index = index

    if max_price <= 0:
        # Empty history (or no positive high): the original divided by zero.
        return False, ''

    rate = (max_price - float(limit_up_price)) / max_price
    if 0 < rate < 0.02:
        return True, datas[max_price_index]['bob'].strftime("%Y-%m-%d")
    return False, ''
|
|
|
def __is_lowest(datas):
    """Tell whether the last 5 bars are at (or near) the 80-bar low.

    Returns ``(True, date)`` when the lowest low of the latest 5 bars is within
    1.5% of the lowest low of the last 80 bars; date is the ``bob`` of the
    first of those 5 bars holding that low, as ``YYYY-MM-DD``. Otherwise,
    including for an empty history, returns ``(False, '')``.
    """
    datas = sorted(datas, key=lambda x: x["bob"])[-80:]
    if not datas:
        return False, ''

    # Real minima instead of magic starting values (100000 / 10000): with
    # those, a price above the smaller sentinel left the date unset and
    # crashed on ''.strftime.
    min_price = min(data["low"] for data in datas)
    recent_low = min(datas[-5:], key=lambda data: data["low"])

    if abs(recent_low["low"] - min_price) / min_price < 0.015:
        return True, recent_low['bob'].strftime("%Y-%m-%d")
    return False, ''
|
|
|
def __is_n_model(datas):
    """Detect an N pattern in the latest 5 bars.

    Needs at least 6 bars (of the last 80). Within the latest 5 bars, a bar
    whose high touches its limit-up price sets the reference close; the lows
    of the bars after it are tracked. Returns ``(True, '')`` when the
    reference close ends up above the lowest tracked low, else ``(False, '')``.
    """
    datas = sorted(datas, key=lambda x: x["bob"])[-80:]
    if len(datas) < 6:
        return False, ''

    max_price = 0
    min_price = 1000000
    for i in range(len(datas) - 5, len(datas)):
        item = datas[i]
        limit_up_price = float(gpcode_manager.get_limit_up_price_by_preprice(item["pre_close"]))
        # NOTE(review): the second test compares TODAY's limit-up price with
        # the previous bar's high; if the intent is "previous day was not a
        # limit-up", it should use the previous bar's own limit-up price.
        # Left unchanged pending confirmation.
        if abs(limit_up_price - item["high"]) < 0.001 and abs(
                limit_up_price - datas[i - 1]["high"]) >= 0.001:
            # Limit-up touched today (and the previous-bar test passed).
            max_price = item["close"]
        elif max_price > 0:
            if min_price > item["low"]:
                min_price = item["low"]

    if max_price > min_price:
        return True, ''
    return False, ''
|
|
|
def __is_yesterday_open_limit_up(datas):
    """Tell whether the latest bar broke its limit-up board.

    True when the latest bar (by ``bob``) has a high at its limit-up price but
    a close away from it (both compared with a 0.001 tolerance). Returns False
    for an empty history.
    """
    datas = sorted(datas, key=lambda x: x["bob"])
    if not datas:
        # The original raised IndexError on datas[-1].
        return False
    item = datas[-1]
    limit_up_price = float(gpcode_manager.get_limit_up_price_by_preprice(item["pre_close"]))
    touched_limit = abs(limit_up_price - item["high"]) < 0.001
    closed_off_limit = abs(limit_up_price - item["close"]) > 0.001
    return touched_limit and closed_off_limit
|
|
|
def __is_v_model(datas):
    """Detect a V pattern in the last 30 bars.

    Finds the highest close (earliest bar on ties) and the lowest close from
    that bar onwards; returns ``(True, '')`` when the drop between them
    exceeds 24.9% of the highest close, else ``(False, '')``. An empty history
    or a non-positive highest close gives ``(False, '')``.
    """
    datas = sorted(datas, key=lambda x: x["bob"])[-30:]
    if not datas:
        # The original raised IndexError here.
        return False, ''

    closes = [data["close"] for data in datas]
    max_price = max(closes)
    if max_price <= 0:
        # Avoid dividing by zero on degenerate data.
        return False, ''

    max_price_index = closes.index(max_price)
    min_price = min(closes[max_price_index:])
    if (max_price - min_price) / max_price > 0.249:
        return True, ''
    return False, ''
|
|
|
def __get_big_volumn_info(datas):
    """Return ``(max volume, average volume)`` over the last 30 bars.

    The average uses floor division by the number of bars. Returns ``(0, 0)``
    for an empty history.
    """
    datas = sorted(datas, key=lambda x: x["bob"])[-30:]
    if not datas:
        # The original divided by len(datas) == 0 here.
        return 0, 0
    volumes = [data["volume"] for data in datas]
    max_volume = max(max(volumes), 0)
    average_volume = sum(volumes) // len(volumes)
    return max_volume, average_volume
|
|
|
def __is_limit_up(data):
    """Tell whether the bar closed at its limit-up price (0.001 tolerance)."""
    target = float(gpcode_manager.get_limit_up_price_by_preprice(data["pre_close"]))
    gap = abs(target - data["close"])
    return gap < 0.001
|
|
|
def __is_limited_up(data):
    """Tell whether the bar's high touched its limit-up price (0.001 tolerance)."""
    target = float(gpcode_manager.get_limit_up_price_by_preprice(data["pre_close"]))
    gap = abs(target - data["high"])
    return gap < 0.001
|
|
|
def get_limit_up_premium_rate(datas):
    """Compute the first-board limit-up premium rate.

    For every bar that closes at limit-up, is neither the first nor the last
    bar, and whose previous bar is not a limit-up, the next bar's
    (high - pre_close) / pre_close is collected. Returns the share of those
    values that are >= 1%, or None when there are none.
    """
    bars = sorted(copy.deepcopy(datas), key=lambda bar: bar["bob"])
    last_index = len(bars) - 1
    next_day_rates = []
    for idx, bar in enumerate(bars):
        limit_price = float(gpcode_manager.get_limit_up_price_by_preprice(bar["pre_close"]))
        if abs(limit_price - bar["close"]) >= 0.001:
            continue
        if not 0 < idx < last_index:
            continue
        if __is_limit_up(bars[idx - 1]):
            continue
        # First-board limit-up: measure the next bar's premium.
        following = bars[idx + 1]
        next_day_rates.append((following["high"] - following["pre_close"]) / following["pre_close"])

    if not next_day_rates:
        return None
    hits = [rate for rate in next_day_rates if rate >= 0.01]
    return len(hits) / len(next_day_rates)
|
|
|
def get_open_limit_up_premium_rate(datas):
    """Compute the first-board broken-limit-up premium rate.

    For every bar (neither first nor last) whose high touched limit-up but
    whose close did not, and whose previous bar is not a limit-up, the next
    bar's (high - this bar's high) / this bar's high is collected. Returns the
    share of those values that are >= 1%, or None when there are none.
    """
    datas = sorted(datas, key=lambda x: x["bob"])
    first_rate_list = []
    # Only interior bars can qualify: each needs a previous and a next bar.
    for i in range(1, len(datas) - 1):
        item = datas[i]
        limit_up_price = float(gpcode_manager.get_limit_up_price_by_preprice(item["pre_close"]))
        broke_board = (abs(limit_up_price - item["high"]) < 0.001
                       and abs(limit_up_price - item["close"]) > 0.001)
        # The original also recomputed limit_up_price from the previous bar
        # here and never used it; that dead call is dropped.
        if broke_board and not __is_limit_up(datas[i - 1]):
            rate = (datas[i + 1]["high"] - item["high"]) / item["high"]
            first_rate_list.append(rate)

    if not first_rate_list:
        return None
    count = sum(1 for rate in first_rate_list if rate >= 0.01)
    return count / len(first_rate_list)
|
|
|
def is_special(datas):
    """Tell whether the code is "distinctive" over the latest 30 bars.

    Returns ``(True, '')`` when at least 5 of those bars touched limit-up
    (per __is_limited_up) and at least two of them are adjacent bars;
    otherwise ``(False, '')``.
    """
    # Order chronologically before taking the tail, as the sibling analysers
    # do, so "latest 30" and "adjacent" hold whatever order the caller used.
    datas = sorted(datas, key=lambda x: x["bob"])[-30:]

    count = 0
    continue_count = 0
    previous_touched = False
    for data in datas:
        touched = __is_limited_up(data)
        if touched:
            count += 1
            if previous_touched:
                continue_count += 1
        previous_touched = touched

    if count >= 5 and continue_count > 0:
        return True, ''
    return False, ''
|
|
|
if __name__ == "__main__":
    # Manual smoke check: add one code, then probe a present and an absent code.
    manager = HighIncreaseCodeManager()
    manager.add_code("000333")
    for sample_code in ("000333", "000222"):
        print(manager.is_in(sample_code))
|