Administrator
4 天以前 48fb7a00951f91bdc707e5dd2d196e5bccb752c3
code_attribute/code_nature_analyse.py
@@ -5,15 +5,13 @@
# 是否有涨停
import copy
import json
import random
import time
from code_attribute import gpcode_manager
# 代码股性记录管理
from db import redis_manager
from db import redis_manager_delegate as redis_manager
from utils import tool
from db.redis_manager_delegate import RedisManager, RedisUtils
from db.redis_manager_delegate import RedisUtils
from utils.tool import CodeDataCacheUtil
@@ -191,23 +189,23 @@
# 设置历史K线
def set_record_datas(code, limit_up_price, record_datas):
    k_format = get_k_format(float(limit_up_price), record_datas)
    k_format = get_k_format(code, float(limit_up_price), record_datas)
    CodeNatureRecordManager().save_k_format(code, k_format)
    natures = get_nature(record_datas)
    natures = get_nature(code, record_datas)
    CodeNatureRecordManager().save_nature(code, natures)
# 获取K线形态
# 返回 (15个交易日涨幅是否大于24.9%,是否破前高,是否超跌,是否接近前高,是否N,是否V,是否有形态,天量大阳信息,是否具有辨识度,近2天有10天内最大量,上个交易日是否炸板)
def get_k_format(limit_up_price, record_datas):
    p1_data = get_lowest_price_rate(record_datas)
# 返回 (15个交易日涨幅是否大于24.9%,是否破前高,是否超跌,是否接近前高,是否N,是否V,是否有形态,天量大阳信息,是否具有辨识度,近2天有10天内最大量,上个交易日是否炸板, 上个交易日是否跌停)
def get_k_format(code, limit_up_price, record_datas):
    p1_data = get_lowest_price_rate(code, record_datas)
    p1 = p1_data[0] >= 0.249, p1_data[1]
    p2 = __is_new_top(limit_up_price, record_datas)
    p3 = __is_lowest(record_datas)
    p4 = __is_near_new_top(limit_up_price, record_datas)
    p5 = __is_n_model(record_datas)
    p6 = __is_v_model(record_datas)
    p8 = __get_big_volumn_info(record_datas)
    p2 = __is_new_top(code, limit_up_price, record_datas)
    p3 = __is_lowest(code, record_datas)
    p4 = __is_near_new_top(code, limit_up_price, record_datas)
    p5 = __is_n_model(code, record_datas)
    p6 = __is_v_model(code, record_datas)
    p8 = __get_big_volumn_info(code, record_datas)
    # # N字型包含了N字型
    # if p5:
@@ -216,19 +214,28 @@
    p7 = (p1[0] or p2[0] or p3[0] or p4[0] or p5[0] or p6[0], '')
    # 是否具有辨识度
    p9 = is_special(record_datas)
    p10 = is_latest_10d_max_volume_at_latest_2d(record_datas)
    # 最近5天是否跌停/炸板
    p11 = __is_latest_open_limit_up_or_limit_down(record_datas, 5)
    # 30天内是否有涨停
    p12 = __has_limit_up(record_datas, 30)
    p9 = is_special(code, record_datas)
    p10 = is_latest_10d_max_volume_at_latest_2d(code, record_datas)
    # 最近5天是否有炸板/涨停/跌停
    p11 = __has_latest_throwing_pressure(code, record_datas, 5)
    # 90天内是否有涨停
    p12 = __has_limit_up(code, record_datas, 90)
    # 上个交易日是否振幅过大
    p13 = __is_pre_day_limit_rate_too_low(code, record_datas)
    # 60个交易日是否曾涨停
    p14 = __has_limited_up(code, record_datas, 60)
    # 昨日是否涨停过
    p15 = __has_limited_up(code, record_datas, 1)
    # 昨日是否跌停
    p16 = __is_latest_limit_down(code, record_datas, 1)
    return p1, p2, p3, p4, p5, p6, p7, p8, p9, p10, p11, p12
    return p1, p2, p3, p4, p5, p6, p7, p8, p9, p10, p11, p12, p13, p14, p15, p16
# 是否具有K线形态
def is_has_k_format(limit_up_price, record_datas):
    is_too_high, is_new_top, is_lowest, is_near_new_top, is_n, is_v, has_format, volume_info, is_special, has_max_volume, open_limit_up, is_limit_up_in_30days = get_k_format(
def is_has_k_format(code, limit_up_price, record_datas):
    is_too_high, is_new_top, is_lowest, is_near_new_top, is_n, is_v, has_format, volume_info, is_special, has_max_volume, open_limit_up, is_limit_up_in_30days, is_latest_limit_down = get_k_format(
        code,
        float(limit_up_price), record_datas)
    if not has_format:
        return False, "不满足K线形态"
@@ -237,29 +244,29 @@
# 获取股性
# 返回(是否涨停,首板溢价率,首板炸板溢价率)
def get_nature(record_datas):
    limit_up_count = get_first_limit_up_count(record_datas)
    premium_rate = get_limit_up_premium_rate(record_datas)
    open_premium_rate = get_open_limit_up_premium_rate(record_datas)
def get_nature(code, record_datas):
    limit_up_count = get_first_limit_up_count(code, record_datas)
    premium_rate = get_limit_up_premium_rate(code, record_datas)
    open_premium_rate = get_open_limit_up_premium_rate(code, record_datas)
    result = (limit_up_count, premium_rate, open_premium_rate)
    return result
# 获取涨幅
def get_lowest_price_rate(record_datas):
def get_lowest_price_rate(code, record_datas):
    datas = copy.deepcopy(record_datas)
    datas.sort(key=lambda x: x["bob"])
    datas = datas[-10:]
    for data in datas:
        limit_up_price = float(gpcode_manager.get_limit_up_price_by_preprice(data["pre_close"]))
        if abs(limit_up_price - data["high"]) < 0.01:
        limit_up_price = float(gpcode_manager.get_limit_up_price_by_preprice(code, data["pre_close"]))
        if abs(limit_up_price - data["high"]) < 0.001:
            date = data['bob'].strftime("%Y-%m-%d")
            return round((datas[-1]["close"] - data["close"]) / data["close"], 4), date
    return 0, ''
# 是否涨得太高
def is_up_too_high_in_10d_with_limit_up(record_datas):
def is_up_too_high_in_10d_with_limit_up(code, record_datas):
    datas = copy.deepcopy(record_datas)
    datas.sort(key=lambda x: x["bob"])
    datas = datas[-10:]
@@ -267,11 +274,11 @@
    limit_up_count = 0
    max_price = datas[0]["high"]
    for data in datas:
        limit_up_price = float(gpcode_manager.get_limit_up_price_by_preprice(data["pre_close"]))
        limit_up_price = float(gpcode_manager.get_limit_up_price_by_preprice(code, data["pre_close"]))
        date = data['bob'].strftime("%Y-%m-%d")
        if data["high"] > max_price:
            max_price = data["high"]
        if abs(limit_up_price - data["high"]) < 0.01:
        if abs(limit_up_price - data["high"]) < 0.001:
            limit_ups.append((date, True))
            limit_up_count += 1
        else:
@@ -298,7 +305,7 @@
# 10天内的最高量是否集中在最近两天
def is_latest_10d_max_volume_at_latest_2d(record_datas):
def is_latest_10d_max_volume_at_latest_2d(code, record_datas):
    datas = copy.deepcopy(record_datas)
    datas.sort(key=lambda x: x["bob"])
    datas = datas[-10:]
@@ -313,11 +320,12 @@
# 120 天内是否长得太高
def is_up_too_high_in_120d(record_datas):
def is_up_too_high_in_120d(code, record_datas):
    datas = copy.deepcopy(record_datas)
    datas.sort(key=lambda x: x["bob"])
    datas = datas[-120:]
    today_limit_up_price = round(float(gpcode_manager.get_limit_up_price_by_preprice(datas[-1]["close"])), 2)
    today_limit_up_price = round(
        float(gpcode_manager.get_limit_up_price_by_preprice(code, datas[-1]["close"])), 2)
    max_price = 0
    for data in datas:
        if data["high"] > max_price:
@@ -338,8 +346,35 @@
        return False
# 暂时不使用
# 从最近一次涨停开始,是否涨幅过高
def is_up_too_high_from_latest_limit_up(code, record_datas):
    datas = copy.deepcopy(record_datas)
    datas.sort(key=lambda x: x["bob"])
    datas = datas[-20:]
    datas.reverse()
    today_limit_up_price = round(
        float(gpcode_manager.get_limit_up_price_by_preprice(code, datas[0]["close"])), 2)
    max_price = 0
    limit_up_price = None
    for i in range(0, len(datas)):
        item = datas[i]
        if item['high'] > max_price:
            max_price = item['high']
        if __is_limited_up(code, item):
            limit_up_price = item['high']
            break
    if not limit_up_price:
        return False
    if today_limit_up_price < max_price:
        return False
    if (today_limit_up_price - limit_up_price) / limit_up_price > 0.25:
        return True
    return False
# 最近几天是否有最大量
def is_have_latest_max_volume(record_datas, day_count):
def is_have_latest_max_volume(code, record_datas, day_count):
    datas = copy.deepcopy(record_datas)
    datas.sort(key=lambda x: x["bob"])
    datas = datas[-120:]
@@ -353,38 +388,82 @@
# 在最近几天内股价是否长得太高
def is_price_too_high_in_days(record_datas, limit_up_price, day_count=5):
def is_price_too_high_in_days(code, record_datas, limit_up_price, day_count=5):
    datas = copy.deepcopy(record_datas)
    datas.sort(key=lambda x: x["bob"])
    datas = datas[0 - day_count:]
    min_price = None
    max_price = None
    min_price_info = None
    max_price_info = None
    for d in datas:
        if min_price is None:
            min_price = d["low"]
        if max_price is None:
            max_price = d["high"]
        if min_price > d["low"]:
            min_price = d["low"]
        if max_price < d["high"]:
            max_price = d["high"]
        if min_price_info is None:
            min_price_info = d["low"], d
        if max_price_info is None:
            max_price_info = d["high"], d
        if min_price_info[0] > d["low"]:
            min_price_info = d["low"], d
        if max_price_info[0] < d["high"]:
            max_price_info = d["high"], d
    # if max_price > float(limit_up_price):
    #     return False
    rate = (float(limit_up_price) - min_price) / min_price
    # print(rate)
    if rate >= 0.28:
        return True
    return False
    rate = (max_price_info[1]["high"] - min_price_info[1]["low"]) / min_price_info[1]["low"]
    THRESHOLD_RATE = 0.319 * 2 if tool.is_ge_code(code) else 0.319
    if rate >= THRESHOLD_RATE:
        return True, rate
    return False, rate
# 连续涨停后是否回调不足够
def is_continue_limit_up_not_enough_fall_dwon(code, record_datas):
    # 10 天内是否有连续3板
    datas = copy.deepcopy(record_datas)
    datas.sort(key=lambda x: x["bob"])
    datas = datas[0 - 10:]
    limit_up_continue_count_info = None
    max_limit_up_continue_count_info_list = []  # [连续涨停次数,涨停起点]
    for i in range(len(datas)):
        item = datas[i]
        if __is_limit_up(code, item):
            if not limit_up_continue_count_info:
                limit_up_continue_count_info = [1, i]
            else:
                limit_up_continue_count_info[0] += 1
        else:
            if limit_up_continue_count_info:
                max_limit_up_continue_count_info_list.append(limit_up_continue_count_info)
                limit_up_continue_count_info = None
    max_limit_up_info = None
    for x in max_limit_up_continue_count_info_list:
        if max_limit_up_info is None:
            max_limit_up_info = x
        if max_limit_up_info[0] <= x[0]:
            max_limit_up_info = x
    if not max_limit_up_info or max_limit_up_info[0] < 3:
        return False
    start_index = max_limit_up_info[1]
    max_price_info = [0, None]
    for i in range(start_index, len(datas)):
        item = datas[i]
        if item["high"] > max_price_info[0]:
            max_price_info = [item["high"], i]
    # 计算回踩价格
    lowest_price_threhhold = round((1 - 0.28) * max_price_info[0], 2)
    for i in range(max_price_info[1] + 1, len(datas)):
        item = datas[i]
        if item["low"] < lowest_price_threhhold:
            return False
    return True
# 是否有涨停
def get_first_limit_up_count(datas):
def get_first_limit_up_count(code, datas):
    datas = copy.deepcopy(datas)
    count = 0
    for i in range(len(datas)):
        item = datas[i]
        # 获取首板涨停次数
        if __is_limit_up(item) and i > 0 and not __is_limit_up(datas[i - 1]):
        if __is_limit_up(code, item) and i > 0 and not __is_limit_up(code, datas[i - 1]):
            # 首板涨停
            count += 1
@@ -392,10 +471,10 @@
# 是否破前高
def __is_new_top(limit_up_price, datas):
def __is_new_top(code, limit_up_price, datas):
    datas = copy.deepcopy(datas)
    datas.sort(key=lambda x: x["bob"])
    datas = datas[-80:]
    datas = datas[-60:]
    max_price = 0
    for data in datas:
        if max_price < data["high"]:
@@ -405,16 +484,16 @@
    return False, ''
def is_new_top(limit_up_price, datas):
    return __is_new_top(float(limit_up_price), datas)[0]
def is_new_top(code, limit_up_price, datas):
    return __is_new_top(code, float(limit_up_price), datas)[0]
def is_near_top(limit_up_price, datas):
    return __is_near_new_top(float(limit_up_price), datas)[0]
def is_near_top(code, limit_up_price, datas):
    return __is_near_new_top(code, float(limit_up_price), datas)[0]
# 接近新高
def __is_near_new_top(limit_up_price, datas):
def __is_near_new_top(code, limit_up_price, datas):
    datas = copy.deepcopy(datas)
    datas.sort(key=lambda x: x["bob"])
    datas = datas[-80:]
@@ -433,7 +512,7 @@
# 是否跌破箱体
def __is_lowest(datas):
def __is_lowest(code, datas):
    datas = copy.deepcopy(datas)
    datas.sort(key=lambda x: x["bob"])
    datas = datas[-80:]
@@ -454,7 +533,7 @@
# N字形
def __is_n_model(datas):
def __is_n_model(code, datas):
    datas = copy.deepcopy(datas)
    datas.sort(key=lambda x: x["bob"])
    datas = datas[-80:]
@@ -463,8 +542,7 @@
        min_price = 1000000
        for i in range(len(datas) - 5, len(datas)):
            item = datas[i]
            print(item)
            limit_up_price = float(gpcode_manager.get_limit_up_price_by_preprice(item["pre_close"]))
            limit_up_price = float(gpcode_manager.get_limit_up_price_by_preprice(code, item["pre_close"]))
            if abs(limit_up_price - item["high"]) < 0.001 and abs(
                    limit_up_price - datas[i - 1]["high"]) >= 0.001:
                # 涨停,前一天非涨停
@@ -477,26 +555,79 @@
    return False, ''
# 最近几天是否有炸板或跌停
def __is_latest_open_limit_up_or_limit_down(datas, day_count):
def __has_latest_throwing_pressure(code, datas, day_count):
    """
    最近释放有抛压
    @param code:
    @param datas:
    @param day_count:
    @return: 是否有抛压, None/(p高价数据, t高价数据)
    """
    datas = copy.deepcopy(datas)
    datas.sort(key=lambda x: x["bob"])
    items = datas[0-day_count:]
    items = datas[0 - day_count:]
    target_item = None
    for i in range(len(items) - 1, -1, -1):
        item = items[i]
        limit_up_price = float(gpcode_manager.get_limit_up_price_by_preprice(code, item["pre_close"]))
        limit_down_price = float(gpcode_manager.get_limit_down_price_by_preprice(code, item["pre_close"]))
        if abs(limit_up_price - item["high"]) < 0.001 or abs(limit_down_price - item["close"]) < 0.001:
            # 炸板 # 或涨停 # 或者跌停
            target_item = item
            break
    if not target_item:
        return False, None
    p_price, p_volume = target_item["high"], target_item["volume"]
    t_price, t_volume = 0, 0
    for i in range(len(items) - 1, -1, -1):
        item = items[i]
        if item["bob"] == target_item["bob"]:
            break
        if item["high"] >= p_price * 1.03:
            t_price, t_volume = item["high"], item["volume"]
            break
    if t_price > 0:
        return True, ((p_price, p_volume), (t_price, t_volume))
    else:
        return True, ((p_price, p_volume), None)
def __is_latest_limit_down(code, datas, day_count):
    datas = copy.deepcopy(datas)
    datas.sort(key=lambda x: x["bob"])
    items = datas[0 - day_count:]
    for item in items:
        limit_up_price = float(gpcode_manager.get_limit_up_price_by_preprice(item["pre_close"]))
        if abs(limit_up_price - item["high"]) < 0.001 and abs(limit_up_price - item["close"]) > 0.001:
            # 炸板
            return True
        # 是否有跌停
        limit_down_price = float(gpcode_manager.get_limit_down_price_by_preprice(item["pre_close"]))
        limit_down_price = float(gpcode_manager.get_limit_down_price_by_preprice(code, item["pre_close"]))
        if abs(limit_down_price - item["close"]) < 0.001:
            # 跌停
            return True
    return False
def __is_pre_day_limit_rate_too_low(code, datas):
    """
    上个交易日是否跌幅过大
    @param code:
    @param datas:
    @return:
    """
    datas = copy.deepcopy(datas)
    datas.sort(key=lambda x: x["bob"])
    items = datas[-1:]
    for item in items:
        # 是否有跌停
        # 获取当日涨幅
        rate_open = (item["open"] - item["pre_close"]) / item["pre_close"]
        rate_close = (item["close"] - item["pre_close"]) / item["pre_close"]
        threshold_rate_ = 0.15
        if abs(rate_open - rate_close) >= threshold_rate_:
            return True
    return False
# V字形
def __is_v_model(datas):
def __is_v_model(code, datas):
    datas = copy.deepcopy(datas)
    datas.sort(key=lambda x: x["bob"])
    datas = datas[-30:]
@@ -507,11 +638,9 @@
            max_price = datas[i]["close"]
            max_price_index = i
    min_price = max_price
    min_price_index = max_price_index
    for i in range(max_price_index, len(datas)):
        if min_price > datas[i]["close"]:
            min_price = datas[i]["close"]
            min_price_index = i
    if (max_price - min_price) / max_price > 0.249:
        return True, ''
@@ -520,7 +649,7 @@
# 是否天量大阳
def __get_big_volumn_info(datas):
def __get_big_volumn_info(code, datas):
    datas = copy.deepcopy(datas)
    datas.sort(key=lambda x: x["bob"])
    datas = datas[-30:]
@@ -535,40 +664,55 @@
# 是否涨停
def __is_limit_up(data):
    limit_up_price = float(gpcode_manager.get_limit_up_price_by_preprice(data["pre_close"]))
    return abs(limit_up_price - data["close"]) < 0.001
def __is_limit_up(code, data):
    limit_up_price = float(gpcode_manager.get_limit_up_price_by_preprice(code, data["pre_close"]))
    return abs(limit_up_price - data["close"]) < 0.009
# 是否涨停过
def __is_limited_up(data):
    limit_up_price = float(gpcode_manager.get_limit_up_price_by_preprice(data["pre_close"]))
    return abs(limit_up_price - data["high"]) < 0.001
def __is_limited_up(code, data):
    limit_up_price = float(gpcode_manager.get_limit_up_price_by_preprice(code, data["pre_close"]))
    return abs(limit_up_price - data["high"]) < 0.009
# 多少天内是否有涨停/曾涨停
def __has_limit_up(datas, day_count):
def __has_limit_up(code, datas, day_count):
    datas = copy.deepcopy(datas)
    datas.sort(key=lambda x: x["bob"])
    datas = datas[0 - day_count:]
    if len(datas) > day_count:
        datas = datas[0 - day_count:]
    if len(datas) >= 1:
        for i in range(0, len(datas)):
            item = datas[i]
            if __is_limit_up(item):
            if __is_limit_up(code, item):
                return True
    return False
# 多少天内是否曾涨停
def __has_limited_up(code, datas, day_count):
    datas = copy.deepcopy(datas)
    datas.sort(key=lambda x: x["bob"])
    if len(datas) > day_count:
        datas = datas[0 - day_count:]
    if len(datas) >= 1:
        for i in range(0, len(datas)):
            item = datas[i]
            if __is_limited_up(code, item):
                return True
    return False
# 首板涨停溢价率
def get_limit_up_premium_rate(datas):
def get_limit_up_premium_rate(code, datas):
    datas = copy.deepcopy(datas)
    datas.sort(key=lambda x: x["bob"])
    first_rate_list = []
    for i in range(0, len(datas)):
        item = datas[i]
        limit_up_price = float(gpcode_manager.get_limit_up_price_by_preprice(item["pre_close"]))
        limit_up_price = float(gpcode_manager.get_limit_up_price_by_preprice(code, item["pre_close"]))
        if abs(limit_up_price - item["close"]) < 0.001:
            if 0 < i < len(datas) - 1 and not __is_limit_up(datas[i - 1]):
            if 0 < i < len(datas) - 1 and not __is_limit_up(code, datas[i - 1]):
                # 首板涨停
                rate = (datas[i + 1]["high"] - datas[i + 1]["pre_close"]) / datas[i + 1]["pre_close"]
                first_rate_list.append(rate)
@@ -582,18 +726,16 @@
# 首板炸板溢价率
def get_open_limit_up_premium_rate(datas):
def get_open_limit_up_premium_rate(code, datas):
    datas = copy.deepcopy(datas)
    datas.sort(key=lambda x: x["bob"])
    first_rate_list = []
    for i in range(0, len(datas)):
        item = datas[i]
        limit_up_price = float(gpcode_manager.get_limit_up_price_by_preprice(item["pre_close"]))
        limit_up_price = float(gpcode_manager.get_limit_up_price_by_preprice(code, item["pre_close"]))
        if abs(limit_up_price - item["high"]) < 0.001 and abs(limit_up_price - item["close"]) > 0.001:
            #
            limit_up_price = float(gpcode_manager.get_limit_up_price_by_preprice(datas[i - 1]["pre_close"]))
            if 0 < i < len(datas) - 1 and not __is_limit_up(datas[i - 1]):
            if 0 < i < len(datas) - 1 and not __is_limit_up(code, datas[i - 1]):
                # 前一天未涨停
                rate = (datas[i + 1]["high"] - item["high"]) / item["high"]
                first_rate_list.append(rate)
@@ -607,26 +749,43 @@
# 是否具有辨识度
def is_special(datas):
    # 30个交易日内有≥5天曾涨停且连续涨停数或曾涨停≥2天
    if len(datas) > 30:
        datas = datas[-30:]
def is_special(code, datas):
    datas = copy.deepcopy(datas)
    datas.sort(key=lambda x: x["bob"])
    datas_30 = datas[-30:]
    datas_90 = datas[-90:]
    count = 0
    # 30个交易日内累计涨停次数≥4次
    continue_count = 0
    has_continue = False
    for item in datas_30:
        if __is_limit_up(code, item):
            continue_count += 1
            count += 1
            if continue_count >= 4:
                has_continue = True
        else:
            continue_count = 0
    if count >= 5 and has_continue:
        return True, "短期辨识度"
    count = 0
    continue_count = 0
    last_index = -1
    for i in range(len(datas)):
        if __is_limited_up(datas[i]):
            if last_index >= 0 and i - last_index == 1:
                continue_count += 1
    has_continue = False
    # 90个交易日内涨停次数≥6次
    for item in datas_90:
        if __is_limit_up(code, item):
            continue_count += 1
            count += 1
            last_index = i
    if count >= 5 and continue_count > 0:
        return True, ''
    return False, ''
            if continue_count >= 4:
                has_continue = True
        else:
            continue_count = 0
    if count >= 6 and has_continue:
        return True, "长期辨识度"
    return False, ""
if __name__ == "__main__":
    HighIncreaseCodeManager().add_code("000333")
    print(HighIncreaseCodeManager().is_in("000333"))
    print(HighIncreaseCodeManager().is_in("000222"))
    code = "000333"
    threshold_rate = 0 - ((1 - tool.get_limit_down_rate(code)) * 0.9)
    print(threshold_rate)