Administrator
2023-10-16 5ed30c03ddfedd4cf79cd8fea9fc45b05821d898
code_attribute/code_nature_analyse.py
@@ -10,11 +10,13 @@
# 代码股性记录管理
from utils import tool
from db.redis_manager import RedisManager
from db.redis_manager_delegate import RedisManager, RedisUtils
class CodeNatureRecordManager:
    __redisManager = RedisManager(0)
    __k_format_cache = {}
    __nature_cache = {}
    @classmethod
    def __get_redis(cls):
@@ -23,26 +25,47 @@
    # 保存K线形态
    @classmethod
    def save_k_format(cls, code, k_format):
        cls.__get_redis().setex(f"k_format-{code}", tool.get_expire(), json.dumps(k_format))
        RedisUtils.setex(cls.__get_redis(), f"k_format-{code}", tool.get_expire(), json.dumps(k_format))
    @classmethod
    def get_k_format(cls, code):
        val = cls.__get_redis().get(f"k_format-{code}")
        val = RedisUtils.get(cls.__get_redis(), f"k_format-{code}")
        if val:
            return json.loads(val)
        return None
    @classmethod
    def get_k_format_cache(cls, code):
        val = None
        if code in cls.__k_format_cache:
            val = cls.__k_format_cache[code]
        if not val:
            val = cls.get_k_format(code)
            if val:
                cls.__k_format_cache[code] = val
        # 复制
        return copy.deepcopy(val) if val else None
    # 保存股性
    @classmethod
    def save_nature(cls, code, natures):
        cls.__get_redis().setex(f"code_nature-{code}", tool.get_expire(), json.dumps(natures))
        RedisUtils.setex(cls.__get_redis(), f"code_nature-{code}", tool.get_expire(), json.dumps(natures))
    @classmethod
    def get_nature(cls, code):
        val = cls.__get_redis().get(f"code_nature-{code}")
        val = RedisUtils.get(cls.__get_redis(), f"code_nature-{code}")
        if val:
            return json.loads(val)
        return None
    @classmethod
    def get_nature_cache(cls, code):
        if code in cls.__nature_cache:
            return cls.__nature_cache[code]
        val = cls.get_nature(code)
        if val:
            cls.__nature_cache[code] = val
        return val
# 设置历史K线
@@ -54,7 +77,7 @@
# 获取K线形态
# 返回 (15个交易日涨幅是否大于24.9%,是否破前高,是否超跌,是否接近前高,是否N,是否V)
# 返回 (15个交易日涨幅是否大于24.9%,是否破前高,是否超跌,是否接近前高,是否N,是否V,是否有形态,天量大阳信息,是否具有辨识度)
def get_k_format(limit_up_price, record_datas):
    p1_data = get_lowest_price_rate(record_datas)
    p1 = p1_data[0] >= 0.249, p1_data[1]
@@ -71,14 +94,16 @@
    p7 = (p1[0] or p2[0] or p3[0] or p4[0] or p5[0] or p6[0], '')
    return p1, p2, p3, p4, p5, p6, p7, p8
    # 是否具有辨识度
    p9 = is_special(record_datas)
    return p1, p2, p3, p4, p5, p6, p7, p8, p9
# 是否具有K线形态
def is_has_k_format(limit_up_price, record_datas):
    is_too_high, is_new_top, is_lowest, is_near_new_top, is_n, is_v, has_format, volume_info = get_k_format(
        float(limit_up_price),
        record_datas)
    is_too_high, is_new_top, is_lowest, is_near_new_top, is_n, is_v, has_format, volume_info, is_special = get_k_format(
        float(limit_up_price), record_datas)
    if not has_format:
        return False, "不满足K线形态"
    return True, "有形态"
@@ -107,6 +132,64 @@
    return 0, ''
# 是否涨得太高
def is_up_too_high_in_10d(record_datas):
    datas = copy.deepcopy(record_datas)
    datas.sort(key=lambda x: x["bob"])
    datas = datas[-10:]
    limit_ups = []
    limit_up_count = 0
    for data in datas:
        limit_up_price = float(gpcode_manager.get_limit_up_price_by_preprice(data["pre_close"]))
        date = data['bob'].strftime("%Y-%m-%d")
        if abs(limit_up_price - data["high"]) < 0.01:
            limit_ups.append((date, True))
            limit_up_count += 1
        else:
            limit_ups.append((date, False))
    if limit_up_count < 3:
        return False
    # 连续5天有3天涨停
    for i in range(len(limit_ups)):
        if i + 5 > len(limit_ups):
            break
        temp_datas = limit_ups[i:i + 5]
        t_count = 0
        for t in temp_datas:
            if t[1]:
                t_count += 1
        if t_count >= 3:
            return True
    return False
# 120 天内是否长得太高
def is_up_too_high_in_120d(record_datas):
    datas = copy.deepcopy(record_datas)
    datas.sort(key=lambda x: x["bob"])
    datas = datas[-120:]
    today_limit_up_price = round(float(gpcode_manager.get_limit_up_price_by_preprice(datas[-1]["close"])), 2)
    max_price = 0
    for data in datas:
        if data["high"] > max_price:
            max_price = data["high"]
    if today_limit_up_price <= max_price:
        return False
    # 计算120天的均价
    total_amount = 0
    total_volume = 0
    for data in datas:
        total_amount += data["amount"]
        total_volume += data["volume"]
    average_price = round(total_amount / total_volume, 2)
    if (today_limit_up_price - average_price) / average_price > 0.3:
        return True
    else:
        return False
# 是否有涨停
def get_first_limit_up_count(datas):
    datas = copy.deepcopy(datas)
@@ -114,9 +197,9 @@
    for i in range(len(datas)):
        item = datas[i]
        # 获取首板涨停次数
        if __is_limit_up(item) and i>0 and not __is_limit_up(datas[i-1]):
        if __is_limit_up(item) and i > 0 and not __is_limit_up(datas[i - 1]):
            # 首板涨停
            count+=1
            count += 1
    return count
@@ -264,6 +347,12 @@
    return abs(limit_up_price - data["close"]) < 0.001
# 是否涨停过
def __is_limited_up(data):
    limit_up_price = float(gpcode_manager.get_limit_up_price_by_preprice(data["pre_close"]))
    return abs(limit_up_price - data["high"]) < 0.001
# 首板涨停溢价率
def get_limit_up_premium_rate(datas):
    datas = copy.deepcopy(datas)
@@ -311,5 +400,25 @@
    return count / len(first_rate_list)
# 是否具有辨识度
def is_special(datas):
    # 30个交易日内有≥5天曾涨停且连续涨停数或曾涨停≥2天
    if len(datas) > 30:
        datas = datas[-30:]
    count = 0
    continue_count = 0
    last_index = -1
    for i in range(len(datas)):
        if __is_limited_up(datas[i]):
            if last_index >= 0 and i - last_index == 1:
                continue_count += 1
            count += 1
            last_index = i
    if count >= 5 and continue_count > 0:
        return True, ''
    return False, ''
if __name__ == "__main__":
    print(CodeNatureRecordManager.get_k_format("603717"))