Administrator
2023-08-15 68237432c42b70b8f27dc07c37f85414ee5234bf
code_attribute/gpcode_first_screen_manager.py
@@ -5,16 +5,19 @@
# 设置首板未筛选的目标票
import json
from db.redis_manager import RedisUtils
from db.redis_manager_delegate import RedisUtils
from utils import tool
from db import redis_manager
from third_data import  block_info
from db import redis_manager_delegate as redis_manager
from third_data import block_info
__redisManager = redis_manager.RedisManager(0)
def __get_redis():
    return __redisManager.getRedis()
__first_code_data_cache = {}
# 保存首板票的数据
@@ -24,7 +27,8 @@
# 4.最近炸开时间
# 5.是否已经涨停
def __save_first_code_data(code, data):
    RedisUtils.setex(__redisManager.getRedis(),f"first_code_data-{code}", tool.get_expire(), json.dumps(data))
    tool.CodeDataCacheUtil.set_cache(__first_code_data_cache, code, data)
    RedisUtils.setex(__redisManager.getRedis(), f"first_code_data-{code}", tool.get_expire(), json.dumps(data))
def __get_first_code_data(code):
@@ -34,25 +38,39 @@
    return json.loads(val)
def __get_first_code_data_cache(code):
    cache_result = tool.CodeDataCacheUtil.get_cache(__first_code_data_cache, code)
    if cache_result[0]:
        return cache_result[1]
    val = __get_first_code_data(code)
    tool.CodeDataCacheUtil.set_cache(__first_code_data_cache, code, val)
    return val
# 添加进首板未筛选票
def __add_first_no_screen_codes(codes):
    redis = __redisManager.getRedis()
    if codes:
        for code in codes:
            RedisUtils.sadd(redis, "first_no_screen_codes", code)
        RedisUtils.expire(redis, "first_no_screen_codes", tool.get_expire())
    try:
        if codes:
            for code in codes:
                RedisUtils.sadd(redis, "first_no_screen_codes", code, auto_free=False)
            RedisUtils.expire(redis, "first_no_screen_codes", tool.get_expire(), auto_free=False)
    finally:
        RedisUtils.realse(redis)
def clear_first_no_screen_codes():
    redis = __redisManager.getRedis()
    RedisUtils.delete(redis, "first_no_screen_codes")
    RedisUtils.delete(__redisManager.getRedis(), "first_no_screen_codes")
def __remove_first_no_screen_codes(codes):
    redis = __redisManager.getRedis()
    if codes:
        for code in codes:
            redis.srem("first_no_screen_codes", code)
    try:
        if codes:
            for code in codes:
                RedisUtils.srem(redis, "first_no_screen_codes", code, auto_free= False)
    finally:
        RedisUtils.realse(redis)
def __get_first_no_screen_codes():
@@ -67,7 +85,7 @@
    for price in prices:
        code = price["code"]
        time_ = price["time"]
        old_data = __get_first_code_data(code)
        old_data = __get_first_code_data_cache(code)
        if old_data is None:
            continue
        limit_up = price["limit_up"]