Administrator
2025-06-11 6dbf7d8320b03533a8a7c70cb3cc309426eac94e
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
# 涨停时间管理器
"""
涨停时间管理器
"""
from db import redis_manager_delegate as redis_manager
from db.redis_manager_delegate import RedisUtils
from utils import global_util, tool
 
 
class LimitUpTimeManager:
    __limit_up_time_cache = {}
    __db = 0
    _redisManager = redis_manager.RedisManager(0)
    __instance = None
 
    def __new__(cls, *args, **kwargs):
        if not cls.__instance:
            cls.__instance = super(LimitUpTimeManager, cls).__new__(cls, *args, **kwargs)
            cls.load_limit_up_time()
        return cls.__instance
 
    @classmethod
    def load_limit_up_time(cls):
        redis = cls._redisManager.getRedis()
        try:
            keys = RedisUtils.keys(redis, "limit_up_time-*", auto_free=False)
            for key in keys:
                code = key.replace("limit_up_time-", "")
                time_ = RedisUtils.get(redis, key, auto_free=False)
                global_util.limit_up_time[code] = time_
                tool.CodeDataCacheUtil.set_cache(cls.__limit_up_time_cache, code, time_)
        finally:
            RedisUtils.realse(redis)
 
    def save_limit_up_time(self, code, time):
        cache_result = tool.CodeDataCacheUtil.get_cache(self.__limit_up_time_cache, code)
        if not cache_result[0] or cache_result[1] is None:
            global_util.limit_up_time[code] = time
            tool.CodeDataCacheUtil.set_cache(self.__limit_up_time_cache, code, time)
            RedisUtils.setex_async(
                self.__db, "limit_up_time-{}".format(code), tool.get_expire(), time)
 
    def get_limit_up_time(self, code):
        time = global_util.limit_up_time.get(code)
        if time is None:
            time = RedisUtils.get(self._redisManager.getRedis(), "limit_up_time-{}".format(code))
            if time is not None:
                global_util.limit_up_time[code] = time
        return time
 
    def get_limit_up_time_cache(self, code):
        cache_result = tool.CodeDataCacheUtil.get_cache(self.__limit_up_time_cache, code)
        if cache_result[0]:
            return cache_result[1]
        return None
 
 
if __name__ == "__main__":
    list = [("1234578", "09:00:03", None), ("12345", "09:00:01", True), ("123456", "09:00:00", True),
            ("123457", "09:00:04", False)]