"""Redis manager: per-database connection pools and thin command wrappers."""
import logging
import queue
import time

import redis

import constant
from log_module.log import logger_redis_debug

config = constant.REDIS_CONFIG

# One connection pool per database index 0..15, created at import time.
pool_caches = [
    redis.ConnectionPool(host=config["host"], port=config["port"], password=config["pwd"], db=db,
                         decode_responses=True, max_connections=50)
    for db in range(16)
]


class RedisManager:
    """Factory for Redis clients bound to one database index."""

    def __init__(self, db=config["db"]):
        """Select the pre-built pool for ``db`` (defaults to the configured db)."""
        self.pool = pool_caches[db]
        self.db = db

    def getRedis(self):
        """Return a client backed by the shared pool for this database."""
        return redis.Redis(connection_pool=self.pool)

    def getRedisNoPool(self):
        """Return a client built from the connection settings, bypassing ``pool_caches``."""
        return redis.Redis(host=config["host"], port=config["port"], password=config["pwd"], db=self.db,
                           decode_responses=True)


class RedisUtils:
    """Class-level helpers that wrap Redis commands.

    Synchronous helpers take a client (``redis_``) and run the command
    immediately through ``exec``. The ``*_async`` helpers take a database
    index instead and only enqueue the command; ``run_loop`` must be running
    somewhere to drain the queue.

    The ``auto_free`` and ``_async`` parameters are accepted for interface
    compatibility; nothing in this module reads them.
    """

    # Pending (db, method_name, args) tasks consumed by run_loop.
    __async_task_queue = queue.Queue(maxsize=1024)

    @classmethod
    def exec(cls, method_name, key, lamada_method):
        """Invoke ``lamada_method`` and return its result.

        ``method_name`` and ``key`` identify the command for tracing; the
        per-call timing log that used them is currently disabled, so they are
        not read here.
        """
        return lamada_method()

    @classmethod
    def get(cls, redis_, key, auto_free=True):
        """Run GET on ``key``."""
        return cls.exec("get", key, lambda: redis_.get(key))

    @classmethod
    def scard(cls, redis_, key, auto_free=True):
        """Run SCARD on ``key``."""
        return cls.exec("scard", key, lambda: redis_.scard(key))

    @classmethod
    def delete(cls, redis_, key, auto_free=True, _async=False):
        """Run DEL on ``key``."""
        return cls.exec("delete", key, lambda: redis_.delete(key))

    @classmethod
    def delete_async(cls, db, key, auto_free=True):
        """Enqueue a DEL of ``key`` against database ``db``."""
        # The bare key (not a 1-tuple) is queued; run_loop handles non-tuple args.
        cls.add_async_task(db, "delete", key)
        logger_redis_debug.info("delete_async({}):{}", 0, key)

    @classmethod
    def keys(cls, redis_, key, auto_free=True):
        """Run KEYS with ``key`` as the pattern."""
        return cls.exec("keys", key, lambda: redis_.keys(key))

    @classmethod
    def set(cls, redis_, key, val, auto_free=True):
        """Run SET ``key`` to ``val``."""
        return cls.exec("set", key, lambda: redis_.set(key, val))

    @classmethod
    def setex(cls, redis_, key, expire, val, auto_free=True, _async=False):
        """Run SETEX: set ``key`` to ``val`` with expiry ``expire``."""
        return cls.exec("setex", key, lambda: redis_.setex(key, expire, val))

    @classmethod
    def setex_async(cls, db, key, expire, val, auto_free=True):
        """Enqueue a SETEX of ``key`` against database ``db``."""
        cls.add_async_task(db, "setex", (key, expire, val))
        logger_redis_debug.info("setex_async({}):{}", 0, key)

    @classmethod
    def setnx(cls, redis_, key, val, auto_free=True):
        """Run SETNX on ``key`` with ``val``."""
        return cls.exec("setnx", key, lambda: redis_.setnx(key, val))

    @classmethod
    def expire(cls, redis_, key, expire, auto_free=True):
        """Run EXPIRE on ``key`` with timeout ``expire``."""
        return cls.exec("expire", key, lambda: redis_.expire(key, expire))

    @classmethod
    def expire_async(cls, db, key, expire, auto_free=True):
        """Enqueue an EXPIRE of ``key`` against database ``db``."""
        cls.add_async_task(db, "expire", (key, expire))
        logger_redis_debug.info("expire_async({}):{}", 0, key)

    @classmethod
    def sadd(cls, redis_, key, val, auto_free=True):
        """Run SADD of ``val`` into the set at ``key``."""
        return cls.exec("sadd", key, lambda: redis_.sadd(key, val))

    @classmethod
    def sadd_async(cls, db, key, val, auto_free=True):
        """Enqueue an SADD of ``val`` into ``key`` against database ``db``."""
        cls.add_async_task(db, "sadd", (key, val))
        logger_redis_debug.info("sadd_async({}):{}", 0, key)

    @classmethod
    def sismember(cls, redis_, key, val, auto_free=True):
        """Run SISMEMBER for ``val`` in the set at ``key``."""
        return cls.exec("sismember", key, lambda: redis_.sismember(key, val))

    @classmethod
    def smembers(cls, redis_, key, auto_free=True):
        """Run SMEMBERS on ``key``."""
        return cls.exec("smembers", key, lambda: redis_.smembers(key))

    @classmethod
    def srem(cls, redis_, key, val, auto_free=True):
        """Run SREM of ``val`` from the set at ``key``."""
        return cls.exec("srem", key, lambda: redis_.srem(key, val))

    @classmethod
    def srem_async(cls, db, key, val, auto_free=True):
        """Enqueue an SREM of ``val`` from ``key`` against database ``db``."""
        cls.add_async_task(db, "srem", (key, val))
        logger_redis_debug.info("srem_async({}):{}", 0, key)

    @classmethod
    def incrby(cls, redis_, key, num, auto_free=True, _async=False):
        """Run INCRBY on ``key`` by ``num``."""
        return cls.exec("incrby", key, lambda: redis_.incrby(key, num))

    @classmethod
    def incrby_async(cls, db, key, num, auto_free=True):
        """Enqueue an INCRBY of ``key`` by ``num`` against database ``db``."""
        cls.add_async_task(db, "incrby", (key, num))
        logger_redis_debug.info("incrby_async({}):{}", 0, key)

    @classmethod
    def lpush(cls, redis_, key, val, auto_free=True):
        """Run LPUSH of ``val`` onto the list at ``key``."""
        return cls.exec("lpush", key, lambda: redis_.lpush(key, val))

    @classmethod
    def lpop(cls, redis_, key, auto_free=True):
        """Run LPOP on the list at ``key``."""
        return cls.exec("lpop", key, lambda: redis_.lpop(key))

    @classmethod
    def rpush(cls, redis_, key, val, auto_free=True):
        """Run RPUSH of ``val`` onto the list at ``key``."""
        return cls.exec("rpush", key, lambda: redis_.rpush(key, val))

    @classmethod
    def realse(cls, redis_):
        """No-op; accepts a client and does nothing."""

    @classmethod
    def add_async_task(cls, db, method, args):
        """Queue ``(db, method, args)`` without blocking.

        Raises:
            queue.Full: if the queue already holds 1024 tasks.
        """
        cls.__async_task_queue.put_nowait((db, method, args))

    @classmethod
    def get_async_task_count(cls):
        """Return the approximate number of queued tasks."""
        return cls.__async_task_queue.qsize()

    # Run asynchronous tasks
    @classmethod
    def run_loop(cls):
        """Drain the async task queue forever; never returns.

        Blocks until a task is available, builds a non-pooled client for the
        task's database, looks the command up by name on that client and calls
        it. Any exception is logged with its traceback, then the loop pauses
        for one second before continuing.
        """
        while True:
            try:
                task = cls.__async_task_queue.get()
                if not task:
                    continue
                db, method_name, args = task
                # NOTE(review): a fresh non-pooled client is built per task and
                # never explicitly closed here - confirm this is intentional.
                client = RedisManager(db).getRedisNoPool()
                command = getattr(client, method_name)
                if isinstance(args, tuple):
                    command(*args)
                else:
                    # Single-argument tasks may be queued as a bare value.
                    command(args)
            except Exception as e1:
                logging.exception(e1)
                time.sleep(1)


if __name__ == "__main__":
    # Named ``client`` rather than ``redis`` so the imported module is not rebound.
    client = RedisManager(1).getRedis()
    db = client.connection_pool.connection_kwargs['db']
    print(db)