Administrator
2023-11-26 0deaa8930569222d8f4787d97a70a0a89496b44a
code_attribute/code_nature_analyse.py
@@ -18,58 +18,83 @@
class CodeNatureRecordManager:
    __redisManager = RedisManager(0)
    __k_format_cache = {}
    __nature_cache = {}
    __db = 0
    __instance = None
    __redis_manager = redis_manager.RedisManager(0)
    def __new__(cls, *args, **kwargs):
        if not cls.__instance:
            cls.__instance = super(CodeNatureRecordManager, cls).__new__(cls, *args, **kwargs)
            cls.__load_datas()
        return cls.__instance
    @classmethod
    def __get_redis(cls):
        return cls.__redisManager.getRedis()
        return cls.__redis_manager.getRedis()
    @classmethod
    def __load_datas(cls):
        __redis = cls.__get_redis()
        try:
            keys = RedisUtils.keys(__redis, "k_format-*")
            for k in keys:
                code = k.split("-")[1]
                val = RedisUtils.get(__redis, k)
                val = json.loads(val)
                cls.__k_format_cache[code] = val
            keys = RedisUtils.keys(__redis, "code_nature-*")
            for k in keys:
                code = k.split("-")[1]
                val = RedisUtils.get(__redis, k)
                val = json.loads(val)
                cls.__nature_cache[code] = val
        except:
            pass
        finally:
            RedisUtils.realse(__redis)
    # 保存K线形态
    @classmethod
    def save_k_format(cls, code, k_format):
        RedisUtils.setex(cls.__get_redis(), f"k_format-{code}", tool.get_expire(), json.dumps(k_format))
    def save_k_format(self, code, k_format):
        self.__k_format_cache[code] = k_format
        RedisUtils.setex_async(self.__db, f"k_format-{code}", tool.get_expire(), json.dumps(k_format))
    @classmethod
    def get_k_format(cls, code):
        val = RedisUtils.get(cls.__get_redis(), f"k_format-{code}")
    def get_k_format(self, code):
        val = RedisUtils.get(self.__get_redis(), f"k_format-{code}")
        if val:
            return json.loads(val)
        return None
    @classmethod
    def get_k_format_cache(cls, code):
        val = None
        if code in cls.__k_format_cache:
            val = cls.__k_format_cache[code]
        if not val:
            val = cls.get_k_format(code)
            if val:
                cls.__k_format_cache[code] = val
    def get_k_format_cache(self, code):
        val = self.__k_format_cache.get(code)
        # 复制
        return copy.deepcopy(val) if val else None
    # 保存股性
    @classmethod
    def save_nature(cls, code, natures):
        RedisUtils.setex(cls.__get_redis(), f"code_nature-{code}", tool.get_expire(), json.dumps(natures))
    def clear(self):
        self.__k_format_cache.clear()
        self.__nature_cache.clear()
        keys = RedisUtils.keys(self.__get_redis(), "k_format-*")
        for k in keys:
            RedisUtils.delete(self.__get_redis(), k)
        keys = RedisUtils.keys(self.__get_redis(), "code_nature-*")
        for k in keys:
            RedisUtils.delete(self.__get_redis(), k)
    @classmethod
    def get_nature(cls, code):
        val = RedisUtils.get(cls.__get_redis(), f"code_nature-{code}")
    # 保存股性
    def save_nature(self, code, natures):
        RedisUtils.setex_async(self.__db, f"code_nature-{code}", tool.get_expire(), json.dumps(natures))
    def get_nature(self, code):
        val = RedisUtils.get(self.__get_redis(), f"code_nature-{code}")
        if val:
            return json.loads(val)
        return None
    @classmethod
    def get_nature_cache(cls, code):
        if code in cls.__nature_cache:
            return cls.__nature_cache[code]
        val = cls.get_nature(code)
        if val:
            cls.__nature_cache[code] = val
        return val
    def get_nature_cache(self, code):
        return self.__nature_cache.get(code)
class LatestMaxVolumeManager:
@@ -119,12 +144,57 @@
            RedisUtils.delete_async(self.__db, k)
# 涨幅过高的票管理
class HighIncreaseCodeManager:
    __db = 0
    __instance = None
    __redis_manager = redis_manager.RedisManager(0)
    __high_increase_codes = set()
    def __new__(cls, *args, **kwargs):
        if not cls.__instance:
            cls.__instance = super(HighIncreaseCodeManager, cls).__new__(cls, *args, **kwargs)
            cls.__load_datas()
        return cls.__instance
    @classmethod
    def __load_datas(cls):
        __redis = cls.__get_redis()
        try:
            cls.__high_increase_codes = RedisUtils.smembers(__redis, "high_increase_codes")
        finally:
            RedisUtils.realse(__redis)
    @classmethod
    def __get_redis(cls):
        return cls.__redis_manager.getRedis()
    def add_code(self, code):
        if self.__high_increase_codes is None:
            self.__high_increase_codes = set()
        self.__high_increase_codes.add(code)
        RedisUtils.sadd_async(self.__db, "high_increase_codes", code)
        RedisUtils.expire_async(self.__db, "high_increase_codes", tool.get_expire())
    def is_in(self, code):
        return code in self.__high_increase_codes
    # 加载全部
    def list_all(self):
        return self.__high_increase_codes
    def clear(self):
        if self.__high_increase_codes:
            self.__high_increase_codes.clear()
        RedisUtils.delete_async(self.__db, "high_increase_codes")
# 设置历史K线
def set_record_datas(code, limit_up_price, record_datas):
    k_format = get_k_format(float(limit_up_price), record_datas)
    CodeNatureRecordManager.save_k_format(code, k_format)
    CodeNatureRecordManager().save_k_format(code, k_format)
    natures = get_nature(record_datas)
    CodeNatureRecordManager.save_nature(code, natures)
    CodeNatureRecordManager().save_nature(code, natures)
# 获取K线形态
@@ -215,7 +285,7 @@
        for t in temp_datas:
            if t[1]:
                t_count += 1
        if t_count >= 3 and limit_down_rate < 0.2:
        if t_count >= 3 and limit_down_rate < 0.15:
            # 降幅小于20%
            return True
@@ -299,33 +369,17 @@
    datas = copy.deepcopy(datas)
    datas.sort(key=lambda x: x["bob"])
    datas = datas[-80:]
    max_volume = 0
    max_volume_index = 0
    max_price = 0
    max_price_index = 0
    for index in range(0, len(datas)):
        data = datas[index]
        if max_volume < data["volume"]:
            max_volume = data["volume"]
            max_volume_index = index
    price = 0
    price_index = 0
    for index in range(max_volume_index, len(datas)):
        data = datas[index]
        if data["high"] > price:
            price = data["high"]
            price_index = index
    index = price_index
    # 最大量当日最高价比当日之后的最高价涨幅在15%以内
    if (price - datas[max_volume_index]["high"]) / datas[max_volume_index]["high"] < 0.15:
        price = datas[max_volume_index]["high"]
        index = max_volume_index
    print(max_volume)
    rate = (price - limit_up_price) / limit_up_price
    if 0 < rate < 0.03:
        return True, datas[index]['bob'].strftime("%Y-%m-%d")
        if data["high"] > max_price:
            max_price = data["high"]
            max_price_index = index
    rate = (max_price - float(limit_up_price)) / max_price
    if 0 < rate < 0.02:
        return True, datas[max_price_index]['bob'].strftime("%Y-%m-%d")
    return False, ''
@@ -493,4 +547,6 @@
if __name__ == "__main__":
    print(CodeNatureRecordManager.get_k_format("603717"))
    HighIncreaseCodeManager().add_code("000333")
    print(HighIncreaseCodeManager().is_in("000333"))
    print(HighIncreaseCodeManager().is_in("000222"))