Administrator
4 天以前 48fb7a00951f91bdc707e5dd2d196e5bccb752c3
code_attribute/gpcode_first_screen_manager.py
@@ -8,76 +8,106 @@
from db.redis_manager_delegate import RedisUtils
from utils import tool
from db import redis_manager_delegate as redis_manager
from third_data import block_info
__redisManager = redis_manager.RedisManager(0)
def __get_redis():
    return __redisManager.getRedis()
class FirstCodeDataManager:
    __instance = None
    __redisManager = redis_manager.RedisManager(0)
    __db = 0
    __first_code_data_cache = {}
    def __new__(cls, *args, **kwargs):
        if not cls.__instance:
            cls.__instance = super(FirstCodeDataManager, cls).__new__(cls, *args, **kwargs)
            cls.__load_data()
        return cls.__instance
__first_code_data_cache = {}
    # 加载数据
    @classmethod
    def __load_data(cls):
        keys = RedisUtils.keys(cls.__get_redis(), "first_code_data-*")
        for k in keys:
            val = RedisUtils.get(cls.__get_redis(), k)
            code = k.split("-")[-1]
            if val:
                cls.__first_code_data_cache[code] = json.loads(val)
    @classmethod
    def __get_redis(cls):
        return cls.__redisManager.getRedis()
# 保存首板票的数据
# 1.首次涨停时间
# 2.最近涨停时间
# 3.首次炸开时间
# 4.最近炸开时间
# 5.是否已经涨停
def __save_first_code_data(code, data):
    tool.CodeDataCacheUtil.set_cache(__first_code_data_cache, code, data)
    RedisUtils.setex(__redisManager.getRedis(), f"first_code_data-{code}", tool.get_expire(), json.dumps(data))
    # 保存首板票的数据
    # 1.首次涨停时间
    # 2.最近涨停时间
    # 3.首次炸开时间
    # 4.最近炸开时间
    # 5.是否已经涨停
    def save_first_code_data(self, code, data):
        tool.CodeDataCacheUtil.set_cache(self.__first_code_data_cache, code, data)
        RedisUtils.setex_async(self.__db, f"first_code_data-{code}", tool.get_expire(), json.dumps(data))
    def get_first_code_data(self, code):
        val = RedisUtils.get(self.__get_redis(), f"first_code_data-{code}")
        if val is None:
            return None
        return json.loads(val)
def __get_first_code_data(code):
    val = RedisUtils.get(__get_redis(), f"first_code_data-{code}")
    if val is None:
    def get_first_code_data_cache(self, code):
        cache_result = tool.CodeDataCacheUtil.get_cache(self.__first_code_data_cache, code)
        if cache_result[0]:
            return cache_result[1]
        return None
    return json.loads(val)
def __get_first_code_data_cache(code):
    cache_result = tool.CodeDataCacheUtil.get_cache(__first_code_data_cache, code)
    if cache_result[0]:
        return cache_result[1]
    val = __get_first_code_data(code)
    tool.CodeDataCacheUtil.set_cache(__first_code_data_cache, code, val)
    return val
class FirstNoScreenCodesManager:
    __instance = None
    __redisManager = redis_manager.RedisManager(0)
    __db = 0
    __first_no_screen_codes_cache = set()
    def __new__(cls, *args, **kwargs):
        if not cls.__instance:
            cls.__instance = super(FirstNoScreenCodesManager, cls).__new__(cls, *args, **kwargs)
            cls.__load_data()
        return cls.__instance
# 添加进首板未筛选票
def __add_first_no_screen_codes(codes):
    redis = __redisManager.getRedis()
    try:
    # 加载数据
    @classmethod
    def __load_data(cls):
        codes = RedisUtils.smembers(cls.__get_redis(), "first_no_screen_codes")
        cls.__first_no_screen_codes_cache = codes
    @classmethod
    def __get_redis(cls):
        return cls.__redisManager.getRedis()
    # 添加进首板未筛选票
    def add_first_no_screen_codes(self, codes):
        redis = self.__get_redis()
        if codes:
            for code in codes:
                RedisUtils.sadd(redis, "first_no_screen_codes", code, auto_free=False)
            RedisUtils.expire(redis, "first_no_screen_codes", tool.get_expire(), auto_free=False)
    finally:
        RedisUtils.realse(redis)
                self.__first_no_screen_codes_cache.add(code)
                RedisUtils.sadd_async(self.__db, "first_no_screen_codes", code, auto_free=False)
            RedisUtils.expire_async(self.__db, "first_no_screen_codes", tool.get_expire(), auto_free=False)
    def clear_first_no_screen_codes(self):
        self.__first_no_screen_codes_cache.clear()
        RedisUtils.delete(self.__get_redis(), "first_no_screen_codes")
def clear_first_no_screen_codes():
    RedisUtils.delete(__redisManager.getRedis(), "first_no_screen_codes")
def __remove_first_no_screen_codes(codes):
    redis = __redisManager.getRedis()
    try:
    def remove_first_no_screen_codes(self, codes):
        if codes:
            for code in codes:
                RedisUtils.srem(redis, "first_no_screen_codes", code, auto_free= False)
    finally:
        RedisUtils.realse(redis)
                self.__first_no_screen_codes_cache.discard(code)
                RedisUtils.srem_async(self.__db, "first_no_screen_codes", code, auto_free=False)
    def __get_first_no_screen_codes(self):
        codes = RedisUtils.smembers(self.__get_redis(), "first_no_screen_codes")
        if not codes:
            return set()
        return codes
def __get_first_no_screen_codes():
    codes = RedisUtils.smembers(__get_redis(), "first_no_screen_codes")
    if not codes:
        return set()
    return codes
    def get_first_no_screen_codes_cache(self):
        return self.__first_no_screen_codes_cache
# 处理ticks数据
@@ -85,7 +115,7 @@
    for price in prices:
        code = price["code"]
        time_ = price["time"]
        old_data = __get_first_code_data_cache(code)
        old_data = FirstCodeDataManager().get_first_code_data_cache(code)
        if old_data is None:
            continue
        limit_up = price["limit_up"]
@@ -99,7 +129,7 @@
                old_data[2] = time_
            old_data[3] = time_
        old_data[4] = limit_up
        __save_first_code_data(code, old_data)
        FirstCodeDataManager().save_first_code_data(code, old_data)
def set_target_no_screen_codes(codes):
@@ -112,16 +142,16 @@
    del_codes = old_codes - codes_set
    add_codes = codes_set - old_codes
    if del_codes:
        __remove_first_no_screen_codes(del_codes)
        FirstNoScreenCodesManager().remove_first_no_screen_codes(del_codes)
    if add_codes:
        # 添加进首板未选票
        __add_first_no_screen_codes(add_codes)
        FirstNoScreenCodesManager().add_first_no_screen_codes(add_codes)
    return add_codes
# 获取首板未筛选的目标票
def get_target_no_screen_codes():
    return __get_first_no_screen_codes()
    return FirstNoScreenCodesManager().get_first_no_screen_codes_cache()
# 是否需要加入首板
@@ -152,11 +182,5 @@
                return True, "炸板后,60分钟内都未回封"
        if now_rate <= 6:
            return True, "炸板后,涨幅小于6%"
    blocks = block_info.get_code_blocks(code)
    if blocks and len(blocks) == 1:
        codes = block_info.get_block_codes(blocks[0])
        if codes:
            pass
    return False, "首板炸开后,涨幅≤6%"