Administrator
4 天以前 48fb7a00951f91bdc707e5dd2d196e5bccb752c3
code_attribute/code_nature_analyse.py
@@ -5,15 +5,13 @@
# 是否有涨停
import copy
import json
import random
import time
from code_attribute import gpcode_manager
# 代码股性记录管理
from db import redis_manager
from db import redis_manager_delegate as redis_manager
from utils import tool
from db.redis_manager_delegate import RedisManager, RedisUtils
from db.redis_manager_delegate import RedisUtils
from utils.tool import CodeDataCacheUtil
@@ -191,23 +189,23 @@
# 设置历史K线
def set_record_datas(code, limit_up_price, record_datas):
    k_format = get_k_format(float(limit_up_price), record_datas)
    k_format = get_k_format(code, float(limit_up_price), record_datas)
    CodeNatureRecordManager().save_k_format(code, k_format)
    natures = get_nature(record_datas)
    natures = get_nature(code, record_datas)
    CodeNatureRecordManager().save_nature(code, natures)
# 获取K线形态
# 返回 (15个交易日涨幅是否大于24.9%,是否破前高,是否超跌,是否接近前高,是否N,是否V,是否有形态,天量大阳信息,是否具有辨识度,近2天有10天内最大量,上个交易日是否炸板)
def get_k_format(limit_up_price, record_datas):
    p1_data = get_lowest_price_rate(record_datas)
# 返回 (15个交易日涨幅是否大于24.9%,是否破前高,是否超跌,是否接近前高,是否N,是否V,是否有形态,天量大阳信息,是否具有辨识度,近2天有10天内最大量,上个交易日是否炸板, 上个交易日是否跌停)
def get_k_format(code, limit_up_price, record_datas):
    p1_data = get_lowest_price_rate(code, record_datas)
    p1 = p1_data[0] >= 0.249, p1_data[1]
    p2 = __is_new_top(limit_up_price, record_datas)
    p3 = __is_lowest(record_datas)
    p4 = __is_near_new_top(limit_up_price, record_datas)
    p5 = __is_n_model(record_datas)
    p6 = __is_v_model(record_datas)
    p8 = __get_big_volumn_info(record_datas)
    p2 = __is_new_top(code, limit_up_price, record_datas)
    p3 = __is_lowest(code, record_datas)
    p4 = __is_near_new_top(code, limit_up_price, record_datas)
    p5 = __is_n_model(code, record_datas)
    p6 = __is_v_model(code, record_datas)
    p8 = __get_big_volumn_info(code, record_datas)
    # # N字型包含了N字型
    # if p5:
@@ -216,21 +214,28 @@
    p7 = (p1[0] or p2[0] or p3[0] or p4[0] or p5[0] or p6[0], '')
    # 是否具有辨识度
    p9 = is_special(record_datas)
    p10 = is_latest_10d_max_volume_at_latest_2d(record_datas)
    # 最近5天是否炸板
    p11 = __is_latest_open_limit_up(record_datas, 5)
    # 30天内是否有涨停
    p12 = __has_limit_up(record_datas, 30)
    # 最近5天是否跌停
    p13 = __is_latest_limit_down(record_datas, 5)
    p9 = is_special(code, record_datas)
    p10 = is_latest_10d_max_volume_at_latest_2d(code, record_datas)
    # 最近5天是否有炸板/涨停/跌停
    p11 = __has_latest_throwing_pressure(code, record_datas, 5)
    # 90天内是否有涨停
    p12 = __has_limit_up(code, record_datas, 90)
    # 上个交易日是否振幅过大
    p13 = __is_pre_day_limit_rate_too_low(code, record_datas)
    # 60个交易日是否曾涨停
    p14 = __has_limited_up(code, record_datas, 60)
    # 昨日是否涨停过
    p15 = __has_limited_up(code, record_datas, 1)
    # 昨日是否跌停
    p16 = __is_latest_limit_down(code, record_datas, 1)
    return p1, p2, p3, p4, p5, p6, p7, p8, p9, p10, p11, p12, p13
    return p1, p2, p3, p4, p5, p6, p7, p8, p9, p10, p11, p12, p13, p14, p15, p16
# 是否具有K线形态
def is_has_k_format(limit_up_price, record_datas):
def is_has_k_format(code, limit_up_price, record_datas):
    is_too_high, is_new_top, is_lowest, is_near_new_top, is_n, is_v, has_format, volume_info, is_special, has_max_volume, open_limit_up, is_limit_up_in_30days, is_latest_limit_down = get_k_format(
        code,
        float(limit_up_price), record_datas)
    if not has_format:
        return False, "不满足K线形态"
@@ -239,29 +244,29 @@
# 获取股性
# 返回(是否涨停,首板溢价率,首板炸板溢价率)
def get_nature(record_datas):
    limit_up_count = get_first_limit_up_count(record_datas)
    premium_rate = get_limit_up_premium_rate(record_datas)
    open_premium_rate = get_open_limit_up_premium_rate(record_datas)
def get_nature(code, record_datas):
    limit_up_count = get_first_limit_up_count(code, record_datas)
    premium_rate = get_limit_up_premium_rate(code, record_datas)
    open_premium_rate = get_open_limit_up_premium_rate(code, record_datas)
    result = (limit_up_count, premium_rate, open_premium_rate)
    return result
# 获取涨幅
def get_lowest_price_rate(record_datas):
def get_lowest_price_rate(code, record_datas):
    datas = copy.deepcopy(record_datas)
    datas.sort(key=lambda x: x["bob"])
    datas = datas[-10:]
    for data in datas:
        limit_up_price = float(gpcode_manager.get_limit_up_price_by_preprice(data["pre_close"]))
        if abs(limit_up_price - data["high"]) < 0.01:
        limit_up_price = float(gpcode_manager.get_limit_up_price_by_preprice(code, data["pre_close"]))
        if abs(limit_up_price - data["high"]) < 0.001:
            date = data['bob'].strftime("%Y-%m-%d")
            return round((datas[-1]["close"] - data["close"]) / data["close"], 4), date
    return 0, ''
# 是否涨得太高
def is_up_too_high_in_10d_with_limit_up(record_datas):
def is_up_too_high_in_10d_with_limit_up(code, record_datas):
    datas = copy.deepcopy(record_datas)
    datas.sort(key=lambda x: x["bob"])
    datas = datas[-10:]
@@ -269,11 +274,11 @@
    limit_up_count = 0
    max_price = datas[0]["high"]
    for data in datas:
        limit_up_price = float(gpcode_manager.get_limit_up_price_by_preprice(data["pre_close"]))
        limit_up_price = float(gpcode_manager.get_limit_up_price_by_preprice(code, data["pre_close"]))
        date = data['bob'].strftime("%Y-%m-%d")
        if data["high"] > max_price:
            max_price = data["high"]
        if abs(limit_up_price - data["high"]) < 0.01:
        if abs(limit_up_price - data["high"]) < 0.001:
            limit_ups.append((date, True))
            limit_up_count += 1
        else:
@@ -300,7 +305,7 @@
# 10天内的最高量是否集中在最近两天
def is_latest_10d_max_volume_at_latest_2d(record_datas):
def is_latest_10d_max_volume_at_latest_2d(code, record_datas):
    datas = copy.deepcopy(record_datas)
    datas.sort(key=lambda x: x["bob"])
    datas = datas[-10:]
@@ -315,11 +320,12 @@
# 120 天内是否长得太高
def is_up_too_high_in_120d(record_datas):
def is_up_too_high_in_120d(code, record_datas):
    datas = copy.deepcopy(record_datas)
    datas.sort(key=lambda x: x["bob"])
    datas = datas[-120:]
    today_limit_up_price = round(float(gpcode_manager.get_limit_up_price_by_preprice(datas[-1]["close"])), 2)
    today_limit_up_price = round(
        float(gpcode_manager.get_limit_up_price_by_preprice(code, datas[-1]["close"])), 2)
    max_price = 0
    for data in datas:
        if data["high"] > max_price:
@@ -342,19 +348,20 @@
# 暂时不使用
# 从最近一次涨停开始,是否涨幅过高
def is_up_too_high_from_latest_limit_up(record_datas):
def is_up_too_high_from_latest_limit_up(code, record_datas):
    datas = copy.deepcopy(record_datas)
    datas.sort(key=lambda x: x["bob"])
    datas = datas[-20:]
    datas.reverse()
    today_limit_up_price = round(float(gpcode_manager.get_limit_up_price_by_preprice(datas[0]["close"])), 2)
    today_limit_up_price = round(
        float(gpcode_manager.get_limit_up_price_by_preprice(code, datas[0]["close"])), 2)
    max_price = 0
    limit_up_price = None
    for i in range(0, len(datas)):
        item = datas[i]
        if item['high'] > max_price:
            max_price = item['high']
        if __is_limited_up(item):
        if __is_limited_up(code, item):
            limit_up_price = item['high']
            break
    if not limit_up_price:
@@ -367,7 +374,7 @@
# 最近几天是否有最大量
def is_have_latest_max_volume(record_datas, day_count):
def is_have_latest_max_volume(code, record_datas, day_count):
    datas = copy.deepcopy(record_datas)
    datas.sort(key=lambda x: x["bob"])
    datas = datas[-120:]
@@ -381,32 +388,32 @@
# 在最近几天内股价是否长得太高
def is_price_too_high_in_days(record_datas, limit_up_price, day_count=5):
def is_price_too_high_in_days(code, record_datas, limit_up_price, day_count=5):
    datas = copy.deepcopy(record_datas)
    datas.sort(key=lambda x: x["bob"])
    datas = datas[0 - day_count:]
    min_price = None
    max_price = None
    min_price_info = None
    max_price_info = None
    for d in datas:
        if min_price is None:
            min_price = d["low"]
        if max_price is None:
            max_price = d["high"]
        if min_price > d["low"]:
            min_price = d["low"]
        if max_price < d["high"]:
            max_price = d["high"]
        if min_price_info is None:
            min_price_info = d["low"], d
        if max_price_info is None:
            max_price_info = d["high"], d
        if min_price_info[0] > d["low"]:
            min_price_info = d["low"], d
        if max_price_info[0] < d["high"]:
            max_price_info = d["high"], d
    # if max_price > float(limit_up_price):
    #     return False
    rate = (float(limit_up_price) - min_price) / min_price
    # print(rate)
    if rate >= 0.319:
        return True
    return False
    rate = (max_price_info[1]["high"] - min_price_info[1]["low"]) / min_price_info[1]["low"]
    THRESHOLD_RATE = 0.319 * 2 if tool.is_ge_code(code) else 0.319
    if rate >= THRESHOLD_RATE:
        return True, rate
    return False, rate
# 连续涨停后是否回调不足够
def is_continue_limit_up_not_enough_fall_dwon(record_datas):
def is_continue_limit_up_not_enough_fall_dwon(code, record_datas):
    # 10 天内是否有连续3板
    datas = copy.deepcopy(record_datas)
    datas.sort(key=lambda x: x["bob"])
@@ -416,7 +423,7 @@
    for i in range(len(datas)):
        item = datas[i]
        if __is_limit_up(item):
        if __is_limit_up(code, item):
            if not limit_up_continue_count_info:
                limit_up_continue_count_info = [1, i]
            else:
@@ -429,11 +436,10 @@
    for x in max_limit_up_continue_count_info_list:
        if max_limit_up_info is None:
            max_limit_up_info = x
        if max_limit_up_info[0] <=  x[0]:
        if max_limit_up_info[0] <= x[0]:
            max_limit_up_info = x
    if not max_limit_up_info or max_limit_up_info[0] < 3:
        print("无3连板")
        return False
    start_index = max_limit_up_info[1]
    max_price_info = [0, None]
@@ -442,23 +448,22 @@
        if item["high"] > max_price_info[0]:
            max_price_info = [item["high"], i]
    # 计算回踩价格
    lowest_price_threhhold = round((1-0.28) * max_price_info[0], 2)
    lowest_price_threhhold = round((1 - 0.28) * max_price_info[0], 2)
    for i in range(max_price_info[1] + 1, len(datas)):
        item = datas[i]
        if item["low"] < lowest_price_threhhold:
            print("回踩足够")
            return False
    return True
# 是否有涨停
def get_first_limit_up_count(datas):
def get_first_limit_up_count(code, datas):
    datas = copy.deepcopy(datas)
    count = 0
    for i in range(len(datas)):
        item = datas[i]
        # 获取首板涨停次数
        if __is_limit_up(item) and i > 0 and not __is_limit_up(datas[i - 1]):
        if __is_limit_up(code, item) and i > 0 and not __is_limit_up(code, datas[i - 1]):
            # 首板涨停
            count += 1
@@ -466,10 +471,10 @@
# 是否破前高
def __is_new_top(limit_up_price, datas):
def __is_new_top(code, limit_up_price, datas):
    datas = copy.deepcopy(datas)
    datas.sort(key=lambda x: x["bob"])
    datas = datas[-80:]
    datas = datas[-60:]
    max_price = 0
    for data in datas:
        if max_price < data["high"]:
@@ -479,16 +484,16 @@
    return False, ''
def is_new_top(limit_up_price, datas):
    return __is_new_top(float(limit_up_price), datas)[0]
def is_new_top(code, limit_up_price, datas):
    return __is_new_top(code, float(limit_up_price), datas)[0]
def is_near_top(limit_up_price, datas):
    return __is_near_new_top(float(limit_up_price), datas)[0]
def is_near_top(code, limit_up_price, datas):
    return __is_near_new_top(code, float(limit_up_price), datas)[0]
# 接近新高
def __is_near_new_top(limit_up_price, datas):
def __is_near_new_top(code, limit_up_price, datas):
    datas = copy.deepcopy(datas)
    datas.sort(key=lambda x: x["bob"])
    datas = datas[-80:]
@@ -507,7 +512,7 @@
# 是否跌破箱体
def __is_lowest(datas):
def __is_lowest(code, datas):
    datas = copy.deepcopy(datas)
    datas.sort(key=lambda x: x["bob"])
    datas = datas[-80:]
@@ -528,7 +533,7 @@
# N字形
def __is_n_model(datas):
def __is_n_model(code, datas):
    datas = copy.deepcopy(datas)
    datas.sort(key=lambda x: x["bob"])
    datas = datas[-80:]
@@ -537,8 +542,7 @@
        min_price = 1000000
        for i in range(len(datas) - 5, len(datas)):
            item = datas[i]
            print(item)
            limit_up_price = float(gpcode_manager.get_limit_up_price_by_preprice(item["pre_close"]))
            limit_up_price = float(gpcode_manager.get_limit_up_price_by_preprice(code, item["pre_close"]))
            if abs(limit_up_price - item["high"]) < 0.001 and abs(
                    limit_up_price - datas[i - 1]["high"]) >= 0.001:
                # 涨停,前一天非涨停
@@ -551,34 +555,79 @@
    return False, ''
# 最近几天是否有炸板或跌停
def __is_latest_open_limit_up(datas, day_count):
def __has_latest_throwing_pressure(code, datas, day_count):
    """
    最近释放有抛压
    @param code:
    @param datas:
    @param day_count:
    @return: 是否有抛压, None/(p高价数据, t高价数据)
    """
    datas = copy.deepcopy(datas)
    datas.sort(key=lambda x: x["bob"])
    items = datas[0 - day_count:]
    for item in items:
        limit_up_price = float(gpcode_manager.get_limit_up_price_by_preprice(item["pre_close"]))
        if abs(limit_up_price - item["high"]) < 0.001 and abs(limit_up_price - item["close"]) > 0.001:
            # 炸板
            return True
    return False
    target_item = None
    for i in range(len(items) - 1, -1, -1):
        item = items[i]
        limit_up_price = float(gpcode_manager.get_limit_up_price_by_preprice(code, item["pre_close"]))
        limit_down_price = float(gpcode_manager.get_limit_down_price_by_preprice(code, item["pre_close"]))
        if abs(limit_up_price - item["high"]) < 0.001 or abs(limit_down_price - item["close"]) < 0.001:
            # 炸板 # 或涨停 # 或者跌停
            target_item = item
            break
    if not target_item:
        return False, None
    p_price, p_volume = target_item["high"], target_item["volume"]
    t_price, t_volume = 0, 0
    for i in range(len(items) - 1, -1, -1):
        item = items[i]
        if item["bob"] == target_item["bob"]:
            break
        if item["high"] >= p_price * 1.03:
            t_price, t_volume = item["high"], item["volume"]
            break
    if t_price > 0:
        return True, ((p_price, p_volume), (t_price, t_volume))
    else:
        return True, ((p_price, p_volume), None)
def __is_latest_limit_down(datas, day_count):
def __is_latest_limit_down(code, datas, day_count):
    datas = copy.deepcopy(datas)
    datas.sort(key=lambda x: x["bob"])
    items = datas[0 - day_count:]
    for item in items:
        # 是否有跌停
        limit_down_price = float(gpcode_manager.get_limit_down_price_by_preprice(item["pre_close"]))
        limit_down_price = float(gpcode_manager.get_limit_down_price_by_preprice(code, item["pre_close"]))
        if abs(limit_down_price - item["close"]) < 0.001:
            # 跌停
            return True
    return False
def __is_pre_day_limit_rate_too_low(code, datas):
    """
    上个交易日是否跌幅过大
    @param code:
    @param datas:
    @return:
    """
    datas = copy.deepcopy(datas)
    datas.sort(key=lambda x: x["bob"])
    items = datas[-1:]
    for item in items:
        # 是否有跌停
        # 获取当日涨幅
        rate_open = (item["open"] - item["pre_close"]) / item["pre_close"]
        rate_close = (item["close"] - item["pre_close"]) / item["pre_close"]
        threshold_rate_ = 0.15
        if abs(rate_open - rate_close) >= threshold_rate_:
            return True
    return False
# V字形
def __is_v_model(datas):
def __is_v_model(code, datas):
    datas = copy.deepcopy(datas)
    datas.sort(key=lambda x: x["bob"])
    datas = datas[-30:]
@@ -589,11 +638,9 @@
            max_price = datas[i]["close"]
            max_price_index = i
    min_price = max_price
    min_price_index = max_price_index
    for i in range(max_price_index, len(datas)):
        if min_price > datas[i]["close"]:
            min_price = datas[i]["close"]
            min_price_index = i
    if (max_price - min_price) / max_price > 0.249:
        return True, ''
@@ -602,7 +649,7 @@
# 是否天量大阳
def __get_big_volumn_info(datas):
def __get_big_volumn_info(code, datas):
    datas = copy.deepcopy(datas)
    datas.sort(key=lambda x: x["bob"])
    datas = datas[-30:]
@@ -617,40 +664,55 @@
# 是否涨停
def __is_limit_up(data):
    limit_up_price = float(gpcode_manager.get_limit_up_price_by_preprice(data["pre_close"]))
    return abs(limit_up_price - data["close"]) < 0.001
def __is_limit_up(code, data):
    limit_up_price = float(gpcode_manager.get_limit_up_price_by_preprice(code, data["pre_close"]))
    return abs(limit_up_price - data["close"]) < 0.009
# 是否涨停过
def __is_limited_up(data):
    limit_up_price = float(gpcode_manager.get_limit_up_price_by_preprice(data["pre_close"]))
    return abs(limit_up_price - data["high"]) < 0.001
def __is_limited_up(code, data):
    limit_up_price = float(gpcode_manager.get_limit_up_price_by_preprice(code, data["pre_close"]))
    return abs(limit_up_price - data["high"]) < 0.009
# 多少天内是否有涨停/曾涨停
def __has_limit_up(datas, day_count):
def __has_limit_up(code, datas, day_count):
    datas = copy.deepcopy(datas)
    datas.sort(key=lambda x: x["bob"])
    datas = datas[0 - day_count:]
    if len(datas) > day_count:
        datas = datas[0 - day_count:]
    if len(datas) >= 1:
        for i in range(0, len(datas)):
            item = datas[i]
            if __is_limit_up(item):
            if __is_limit_up(code, item):
                return True
    return False
# 多少天内是否曾涨停
def __has_limited_up(code, datas, day_count):
    datas = copy.deepcopy(datas)
    datas.sort(key=lambda x: x["bob"])
    if len(datas) > day_count:
        datas = datas[0 - day_count:]
    if len(datas) >= 1:
        for i in range(0, len(datas)):
            item = datas[i]
            if __is_limited_up(code, item):
                return True
    return False
# 首板涨停溢价率
def get_limit_up_premium_rate(datas):
def get_limit_up_premium_rate(code, datas):
    datas = copy.deepcopy(datas)
    datas.sort(key=lambda x: x["bob"])
    first_rate_list = []
    for i in range(0, len(datas)):
        item = datas[i]
        limit_up_price = float(gpcode_manager.get_limit_up_price_by_preprice(item["pre_close"]))
        limit_up_price = float(gpcode_manager.get_limit_up_price_by_preprice(code, item["pre_close"]))
        if abs(limit_up_price - item["close"]) < 0.001:
            if 0 < i < len(datas) - 1 and not __is_limit_up(datas[i - 1]):
            if 0 < i < len(datas) - 1 and not __is_limit_up(code, datas[i - 1]):
                # 首板涨停
                rate = (datas[i + 1]["high"] - datas[i + 1]["pre_close"]) / datas[i + 1]["pre_close"]
                first_rate_list.append(rate)
@@ -664,18 +726,16 @@
# 首板炸板溢价率
def get_open_limit_up_premium_rate(datas):
def get_open_limit_up_premium_rate(code, datas):
    datas = copy.deepcopy(datas)
    datas.sort(key=lambda x: x["bob"])
    first_rate_list = []
    for i in range(0, len(datas)):
        item = datas[i]
        limit_up_price = float(gpcode_manager.get_limit_up_price_by_preprice(item["pre_close"]))
        limit_up_price = float(gpcode_manager.get_limit_up_price_by_preprice(code, item["pre_close"]))
        if abs(limit_up_price - item["high"]) < 0.001 and abs(limit_up_price - item["close"]) > 0.001:
            #
            limit_up_price = float(gpcode_manager.get_limit_up_price_by_preprice(datas[i - 1]["pre_close"]))
            if 0 < i < len(datas) - 1 and not __is_limit_up(datas[i - 1]):
            if 0 < i < len(datas) - 1 and not __is_limit_up(code, datas[i - 1]):
                # 前一天未涨停
                rate = (datas[i + 1]["high"] - item["high"]) / item["high"]
                first_rate_list.append(rate)
@@ -689,7 +749,7 @@
# 是否具有辨识度
def is_special(datas):
def is_special(code, datas):
    datas = copy.deepcopy(datas)
    datas.sort(key=lambda x: x["bob"])
    datas_30 = datas[-30:]
@@ -699,7 +759,7 @@
    continue_count = 0
    has_continue = False
    for item in datas_30:
        if __is_limit_up(item):
        if __is_limit_up(code, item):
            continue_count += 1
            count += 1
            if continue_count >= 4:
@@ -713,7 +773,7 @@
    has_continue = False
    # 90个交易日内涨停次数≥6次
    for item in datas_90:
        if __is_limit_up(item):
        if __is_limit_up(code, item):
            continue_count += 1
            count += 1
            if continue_count >= 4:
@@ -726,6 +786,6 @@
if __name__ == "__main__":
    HighIncreaseCodeManager().add_code("000333")
    print(HighIncreaseCodeManager().is_in("000333"))
    print(HighIncreaseCodeManager().is_in("000222"))
    code = "000333"
    threshold_rate = 0 - ((1 - tool.get_limit_down_rate(code)) * 0.9)
    print(threshold_rate)