Administrator
2023-12-21 81f328532e366eef171b71810b221a9294dda78f
code_attribute/code_nature_analyse.py
@@ -5,56 +5,200 @@
# 是否有涨停
import copy
import json
import random
import time
from code_attribute import gpcode_manager
# 代码股性记录管理
from db import redis_manager
from utils import tool
from db.redis_manager import RedisManager
from db.redis_manager_delegate import RedisManager, RedisUtils
from utils.tool import CodeDataCacheUtil
class CodeNatureRecordManager:
    __redisManager = RedisManager(0)
    __k_format_cache = {}
    __nature_cache = {}
    __db = 0
    __instance = None
    __redis_manager = redis_manager.RedisManager(0)
    def __new__(cls, *args, **kwargs):
        if not cls.__instance:
            cls.__instance = super(CodeNatureRecordManager, cls).__new__(cls, *args, **kwargs)
            cls.__load_datas()
        return cls.__instance
    @classmethod
    def __get_redis(cls):
        return cls.__redisManager.getRedis()
        return cls.__redis_manager.getRedis()
    @classmethod
    def __load_datas(cls):
        __redis = cls.__get_redis()
        try:
            keys = RedisUtils.keys(__redis, "k_format-*")
            for k in keys:
                code = k.split("-")[1]
                val = RedisUtils.get(__redis, k)
                val = json.loads(val)
                cls.__k_format_cache[code] = val
            keys = RedisUtils.keys(__redis, "code_nature-*")
            for k in keys:
                code = k.split("-")[1]
                val = RedisUtils.get(__redis, k)
                val = json.loads(val)
                cls.__nature_cache[code] = val
        except:
            pass
        finally:
            RedisUtils.realse(__redis)
    # 保存K线形态
    @classmethod
    def save_k_format(cls, code, k_format):
        cls.__get_redis().setex(f"k_format-{code}", tool.get_expire(), json.dumps(k_format))
    def save_k_format(self, code, k_format):
        self.__k_format_cache[code] = k_format
        RedisUtils.setex_async(self.__db, f"k_format-{code}", tool.get_expire(), json.dumps(k_format))
    @classmethod
    def get_k_format(cls, code):
        val = cls.__get_redis().get(f"k_format-{code}")
    def get_k_format(self, code):
        val = RedisUtils.get(self.__get_redis(), f"k_format-{code}")
        if val:
            return json.loads(val)
        return None
    def get_k_format_cache(self, code):
        val = self.__k_format_cache.get(code)
        # 复制
        return copy.deepcopy(val) if val else None
    def clear(self):
        self.__k_format_cache.clear()
        self.__nature_cache.clear()
        keys = RedisUtils.keys(self.__get_redis(), "k_format-*")
        for k in keys:
            RedisUtils.delete(self.__get_redis(), k)
        keys = RedisUtils.keys(self.__get_redis(), "code_nature-*")
        for k in keys:
            RedisUtils.delete(self.__get_redis(), k)
    # 保存股性
    @classmethod
    def save_nature(cls, code, natures):
        cls.__get_redis().setex(f"code_nature-{code}", tool.get_expire(), json.dumps(natures))
    @classmethod
    def get_nature(cls, code):
        val = cls.__get_redis().get(f"code_nature-{code}")
    def save_nature(self, code, natures):
        RedisUtils.setex_async(self.__db, f"code_nature-{code}", tool.get_expire(), json.dumps(natures))
    def get_nature(self, code):
        val = RedisUtils.get(self.__get_redis(), f"code_nature-{code}")
        if val:
            return json.loads(val)
        return None
    def get_nature_cache(self, code):
        return self.__nature_cache.get(code)
class LatestMaxVolumeManager:
    __db = 0
    __instance = None
    __redis_manager = redis_manager.RedisManager(0)
    __max_volume_cache = {}
    def __new__(cls, *args, **kwargs):
        if not cls.__instance:
            cls.__instance = super(LatestMaxVolumeManager, cls).__new__(cls, *args, **kwargs)
            cls.__load_datas()
        return cls.__instance
    @classmethod
    def __load_datas(cls):
        __redis = cls.__get_redis()
        try:
            keys = RedisUtils.keys(__redis, "is_latest_max_volume-*")
            for k in keys:
                code = k.split("-")[-1]
                val = RedisUtils.get(__redis, k)
                CodeDataCacheUtil.set_cache(cls.__max_volume_cache, code, val)
        finally:
            RedisUtils.realse(__redis)
    @classmethod
    def __get_redis(cls):
        return cls.__redis_manager.getRedis()
    def __save_has_latest_max_volume(self, code):
        RedisUtils.setex_async(self.__db, f"is_latest_max_volume-{code}", tool.get_expire(), "1")
    # 设置最近有最大量
    def set_has_latest_max_volume(self, code):
        CodeDataCacheUtil.set_cache(self.__max_volume_cache, code, 1)
        self.__save_has_latest_max_volume(code)
    # 最近是否有最大量
    def is_latest_max_volume(self, code):
        return code in self.__max_volume_cache
    def clear(self):
        self.__max_volume_cache.clear()
        keys = RedisUtils.keys(self.__get_redis(), "is_latest_max_volume-*")
        for k in keys:
            RedisUtils.delete_async(self.__db, k)
# 涨幅过高的票管理
class HighIncreaseCodeManager:
    __db = 0
    __instance = None
    __redis_manager = redis_manager.RedisManager(0)
    __high_increase_codes = set()
    def __new__(cls, *args, **kwargs):
        if not cls.__instance:
            cls.__instance = super(HighIncreaseCodeManager, cls).__new__(cls, *args, **kwargs)
            cls.__load_datas()
        return cls.__instance
    @classmethod
    def __load_datas(cls):
        __redis = cls.__get_redis()
        try:
            cls.__high_increase_codes = RedisUtils.smembers(__redis, "high_increase_codes")
        finally:
            RedisUtils.realse(__redis)
    @classmethod
    def __get_redis(cls):
        return cls.__redis_manager.getRedis()
    def add_code(self, code):
        if self.__high_increase_codes is None:
            self.__high_increase_codes = set()
        self.__high_increase_codes.add(code)
        RedisUtils.sadd_async(self.__db, "high_increase_codes", code)
        RedisUtils.expire_async(self.__db, "high_increase_codes", tool.get_expire())
    def is_in(self, code):
        return code in self.__high_increase_codes
    # 加载全部
    def list_all(self):
        return self.__high_increase_codes
    def clear(self):
        if self.__high_increase_codes:
            self.__high_increase_codes.clear()
        RedisUtils.delete_async(self.__db, "high_increase_codes")
# 设置历史K线
def set_record_datas(code, limit_up_price, record_datas):
    k_format = get_k_format(float(limit_up_price), record_datas)
    CodeNatureRecordManager.save_k_format(code, k_format)
    CodeNatureRecordManager().save_k_format(code, k_format)
    natures = get_nature(record_datas)
    CodeNatureRecordManager.save_nature(code, natures)
    CodeNatureRecordManager().save_nature(code, natures)
# 获取K线形态
# 返回 (15个交易日涨幅是否大于24.9%,是否破前高,是否超跌,是否接近前高,是否N,是否V,是否有形态,天量大阳信息,是否具有辨识度)
# 返回 (15个交易日涨幅是否大于24.9%,是否破前高,是否超跌,是否接近前高,是否N,是否V,是否有形态,天量大阳信息,是否具有辨识度,近2天有10天内最大量,上个交易日是否炸板)
def get_k_format(limit_up_price, record_datas):
    p1_data = get_lowest_price_rate(record_datas)
    p1 = p1_data[0] >= 0.249, p1_data[1]
@@ -73,13 +217,19 @@
    # 是否具有辨识度
    p9 = is_special(record_datas)
    p10 = is_latest_10d_max_volume_at_latest_2d(record_datas)
    # 最近5天是否跌停/炸板
    p11 = __is_latest_open_limit_up_or_limit_down(record_datas, 5)
    # 30天内是否有涨停
    p12 = __has_limit_up(record_datas, 30)
    return p1, p2, p3, p4, p5, p6, p7, p8, p9
    return p1, p2, p3, p4, p5, p6, p7, p8, p9, p10, p11, p12
# 是否具有K线形态
def is_has_k_format(limit_up_price, record_datas):
    is_too_high, is_new_top, is_lowest, is_near_new_top, is_n, is_v, has_format, volume_info,is_special = get_k_format(float(limit_up_price),record_datas)
    is_too_high, is_new_top, is_lowest, is_near_new_top, is_n, is_v, has_format, volume_info, is_special, has_max_volume, open_limit_up, is_limit_up_in_30days = get_k_format(
        float(limit_up_price), record_datas)
    if not has_format:
        return False, "不满足K线形态"
    return True, "有形态"
@@ -106,6 +256,125 @@
            date = data['bob'].strftime("%Y-%m-%d")
            return round((datas[-1]["close"] - data["close"]) / data["close"], 4), date
    return 0, ''
# 是否涨得太高
def is_up_too_high_in_10d_with_limit_up(record_datas):
    datas = copy.deepcopy(record_datas)
    datas.sort(key=lambda x: x["bob"])
    datas = datas[-10:]
    limit_ups = []
    limit_up_count = 0
    max_price = datas[0]["high"]
    for data in datas:
        limit_up_price = float(gpcode_manager.get_limit_up_price_by_preprice(data["pre_close"]))
        date = data['bob'].strftime("%Y-%m-%d")
        if data["high"] > max_price:
            max_price = data["high"]
        if abs(limit_up_price - data["high"]) < 0.01:
            limit_ups.append((date, True))
            limit_up_count += 1
        else:
            limit_ups.append((date, False))
    # 下降幅度
    limit_down_rate = round((max_price - datas[-1]["close"]) / max_price, 3)
    if limit_up_count < 3:
        return False
    # 连续5天有3天涨停
    for i in range(len(limit_ups)):
        if i + 5 > len(limit_ups):
            break
        temp_datas = limit_ups[i:i + 5]
        t_count = 0
        for t in temp_datas:
            if t[1]:
                t_count += 1
        if t_count >= 3 and limit_down_rate < 0.15:
            # 降幅小于20%
            return True
    return False
# 10天内的最高量是否集中在最近两天
def is_latest_10d_max_volume_at_latest_2d(record_datas):
    datas = copy.deepcopy(record_datas)
    datas.sort(key=lambda x: x["bob"])
    datas = datas[-10:]
    max_volume_info = None
    for i in range(0, len(datas)):
        if not max_volume_info:
            max_volume_info = (i, datas[i]["volume"])
        else:
            if max_volume_info[1] < datas[i]["volume"]:
                max_volume_info = (i, datas[i]["volume"])
    return len(datas) - max_volume_info[0] <= 2
# 120 天内是否长得太高
def is_up_too_high_in_120d(record_datas):
    datas = copy.deepcopy(record_datas)
    datas.sort(key=lambda x: x["bob"])
    datas = datas[-120:]
    today_limit_up_price = round(float(gpcode_manager.get_limit_up_price_by_preprice(datas[-1]["close"])), 2)
    max_price = 0
    for data in datas:
        if data["high"] > max_price:
            max_price = data["high"]
    if today_limit_up_price <= max_price:
        return False
    # 计算120天的均价
    total_amount = 0
    total_volume = 0
    for data in datas:
        total_amount += data["amount"]
        total_volume += data["volume"]
    average_price = round(total_amount / total_volume, 2)
    if (today_limit_up_price - average_price) / average_price > 0.3:
        return True
    else:
        return False
# 最近几天是否有最大量
def is_have_latest_max_volume(record_datas, day_count):
    datas = copy.deepcopy(record_datas)
    datas.sort(key=lambda x: x["bob"])
    datas = datas[-120:]
    max_volume = (0, datas[0]["volume"])
    for i in range(0, len(datas)):
        if max_volume[1] < datas[i]["volume"]:
            max_volume = (i, datas[i]["volume"])
    if len(datas) - max_volume[0] <= day_count:
        return True
    return False
# 在最近几天内股价是否长得太高
def is_price_too_high_in_days(record_datas, limit_up_price, day_count=5):
    datas = copy.deepcopy(record_datas)
    datas.sort(key=lambda x: x["bob"])
    datas = datas[0 - day_count:]
    min_price = None
    max_price = None
    for d in datas:
        if min_price is None:
            min_price = d["low"]
        if max_price is None:
            max_price = d["high"]
        if min_price > d["low"]:
            min_price = d["low"]
        if max_price < d["high"]:
            max_price = d["high"]
    # if max_price > float(limit_up_price):
    #     return False
    rate = (float(limit_up_price) - min_price) / min_price
    # print(rate)
    if rate >= 0.28:
        return True
    return False
# 是否有涨停
@@ -140,38 +409,26 @@
    return __is_new_top(float(limit_up_price), datas)[0]
def is_near_top(limit_up_price, datas):
    return __is_near_new_top(float(limit_up_price), datas)[0]
# 接近新高
def __is_near_new_top(limit_up_price, datas):
    datas = copy.deepcopy(datas)
    datas.sort(key=lambda x: x["bob"])
    datas = datas[-80:]
    max_volume = 0
    max_volume_index = 0
    max_price = 0
    max_price_index = 0
    for index in range(0, len(datas)):
        data = datas[index]
        if max_volume < data["volume"]:
            max_volume = data["volume"]
            max_volume_index = index
    price = 0
    price_index = 0
    for index in range(max_volume_index, len(datas)):
        data = datas[index]
        if data["high"] > price:
            price = data["high"]
            price_index = index
    index = price_index
    # 最大量当日最高价比当日之后的最高价涨幅在15%以内
    if (price - datas[max_volume_index]["high"]) / datas[max_volume_index]["high"] < 0.15:
        price = datas[max_volume_index]["high"]
        index = max_volume_index
    print(max_volume)
    rate = (price - limit_up_price) / limit_up_price
    if 0 < rate < 0.03:
        return True, datas[index]['bob'].strftime("%Y-%m-%d")
        if data["high"] > max_price:
            max_price = data["high"]
            max_price_index = index
    rate = (max_price - float(limit_up_price)) / max_price
    if 0 < rate < 0.02:
        return True, datas[max_price_index]['bob'].strftime("%Y-%m-%d")
    return False, ''
@@ -218,6 +475,24 @@
        if max_price > min_price:
            return True, ''
    return False, ''
# 最近几天是否有炸板或跌停
def __is_latest_open_limit_up_or_limit_down(datas, day_count):
    datas = copy.deepcopy(datas)
    datas.sort(key=lambda x: x["bob"])
    items = datas[0-day_count]
    for item in items:
        limit_up_price = float(gpcode_manager.get_limit_up_price_by_preprice(item["pre_close"]))
        if abs(limit_up_price - item["high"]) < 0.001 and abs(limit_up_price - item["close"]) > 0.001:
            # 炸板
            return True
        # 是否有跌停
        limit_down_price = float(gpcode_manager.get_limit_down_price_by_preprice(item["pre_close"]))
        if abs(limit_down_price - item["close"]) < 0.001:
            # 跌停
            return True
    return False
# V字形
@@ -269,6 +544,19 @@
def __is_limited_up(data):
    limit_up_price = float(gpcode_manager.get_limit_up_price_by_preprice(data["pre_close"]))
    return abs(limit_up_price - data["high"]) < 0.001
# 多少天内是否有涨停/曾涨停
def __has_limit_up(datas, day_count):
    datas = copy.deepcopy(datas)
    datas.sort(key=lambda x: x["bob"])
    datas = datas[0 - day_count:]
    if len(datas) >= 1:
        for i in range(0, len(datas)):
            item = datas[i]
            if __is_limit_up(item):
                return True
    return False
# 首板涨停溢价率
@@ -339,4 +627,6 @@
if __name__ == "__main__":
    print(CodeNatureRecordManager.get_k_format("603717"))
    HighIncreaseCodeManager().add_code("000333")
    print(HighIncreaseCodeManager().is_in("000333"))
    print(HighIncreaseCodeManager().is_in("000222"))