Administrator
4 天以前 48fb7a00951f91bdc707e5dd2d196e5bccb752c3
db/redis_manager.py
@@ -1,87 +1,176 @@
"""
redis管理器
"""
import logging
import queue
import time
import redis
import constant
from log_module.log import logger_redis_debug
config = constant.REDIS_CONFIG
pool_cache = {}
pool_caches = [redis.ConnectionPool(host=config["host"], port=config["port"], password=config["pwd"],
                                    db=db, decode_responses=True, max_connections=50) for db in range(16)]
class RedisManager:
    def __init__(self, db=config["db"]):
        pool = None
        if db in pool_cache:
            pool = pool_cache[db]
        else:
            pool = redis.ConnectionPool(host=config["host"], port=config["port"], password=config["pwd"],
                                        db=db, decode_responses=True, max_connections=200)
            pool_cache[db] = pool
        self.pool = pool
        self.pool = pool_caches[db]
        self.db = db
    def getRedis(self):
        return redis.Redis(connection_pool=self.pool)
    def getRedisNoPool(self):
        return redis.Redis(host=config["host"], port=config["port"], password=config["pwd"], db=self.db,
                           decode_responses=True)
class RedisUtils:
    __async_task_queue = queue.Queue(maxsize=1024)
    @classmethod
    def get(cls, redis_, key):
        return redis_.get(key)
    def exec(cls, method_name, key, lamada_method):
        __start_time = time.time()
        try:
            return lamada_method()
        finally:
            # logger_redis_debug.info("{}({}):{}", method_name, round((time.time() - __start_time) * 1000, 3), key)
            pass
    @classmethod
    def delete(cls, redis_, key):
        return redis_.delete(key)
    def get(cls, redis_, key, auto_free=True):
        return cls.exec("get", key, lambda: redis_.get(key))
    @classmethod
    def keys(cls, redis_, key):
        return redis_.keys(key)
    def scard(cls, redis_, key, auto_free=True):
        return cls.exec("scard", key, lambda: redis_.scard(key))
    @classmethod
    def set(cls, redis_, key, val):
        return redis_.set(key, val)
    def delete(cls, redis_, key, auto_free=True, _async=False):
        return cls.exec("delete", key, lambda: redis_.delete(key))
    @classmethod
    def setex(cls, redis_, key, expire, val):
        return redis_.setex(key, expire, val)
    def delete_async(cls, db, key, auto_free=True):
        cls.add_async_task(db, "delete", (key))
        logger_redis_debug.info("delete_async({}):{}", 0, key)
    @classmethod
    def expire(cls, redis_, key, expire):
        return redis_.expire(key, expire)
    def keys(cls, redis_, key, auto_free=True):
        return cls.exec("keys", key, lambda: redis_.keys(key))
    @classmethod
    def sadd(cls, redis_, key, val):
        return redis_.sadd(key, val)
    def set(cls, redis_, key, val, auto_free=True):
        return cls.exec("set", key, lambda: redis_.set(key, val))
    @classmethod
    def sismember(cls, redis_, key, val):
        return redis_.sismember(key, val)
    def setex(cls, redis_, key, expire, val, auto_free=True, _async=False):
        return cls.exec("setex", key, lambda: redis_.setex(key, expire, val))
    @classmethod
    def smembers(cls, redis_, key):
        return redis_.smembers(key)
    def setex_async(cls, db, key, expire, val, auto_free=True):
        cls.add_async_task(db, "setex", (key, expire, val))
        logger_redis_debug.info("setex_async({}):{}", 0, key)
    @classmethod
    def srem(cls, redis_, key, val):
        return redis_.srem(key, val)
    def setnx(cls, redis_, key, val, auto_free=True):
        return cls.exec("setnx", key, lambda: redis_.setnx(key, val))
    @classmethod
    def incrby(cls, redis_, key, num):
        return redis_.incrby(key, num)
    def expire(cls, redis_, key, expire, auto_free=True):
        return cls.exec("expire", key, lambda: redis_.expire(key, expire))
    @classmethod
    def lpush(cls, redis_, key, val):
        return redis_.lpush(key, val)
    def expire_async(cls, db, key, expire, auto_free=True):
        cls.add_async_task(db, "expire", (key, expire))
        logger_redis_debug.info("expire_async({}):{}", 0, key)
    @classmethod
    def lpop(cls, redis_, key):
        return redis_.lpop(key)
    def sadd(cls, redis_, key, val, auto_free=True):
        return cls.exec("sadd", key, lambda: redis_.sadd(key, val))
    @classmethod
    def sadd_async(cls, db, key, val, auto_free=True):
        cls.add_async_task(db, "sadd", (key, val))
        logger_redis_debug.info("sadd_async({}):{}", 0, key)
    @classmethod
    def sismember(cls, redis_, key, val, auto_free=True):
        return cls.exec("sismember", key, lambda: redis_.sismember(key, val))
    @classmethod
    def smembers(cls, redis_, key, auto_free=True):
        return cls.exec("smembers", key, lambda: redis_.smembers(key))
    @classmethod
    def srem(cls, redis_, key, val, auto_free=True):
        return cls.exec("srem", key, lambda: redis_.srem(key, val))
    @classmethod
    def srem_async(cls, db, key, val, auto_free=True):
        cls.add_async_task(db, "srem", (key, val))
        logger_redis_debug.info("srem_async({}):{}", 0, key)
    @classmethod
    def incrby(cls, redis_, key, num, auto_free=True, _async=False):
        return cls.exec("incrby", key, lambda: redis_.incrby(key, num))
    @classmethod
    def incrby_async(cls, db, key, num, auto_free=True):
        cls.add_async_task(db, "incrby", (key, num))
        logger_redis_debug.info("incrby_async({}):{}", 0, key)
    @classmethod
    def lpush(cls, redis_, key, val, auto_free=True):
        return cls.exec("lpush", key, lambda: redis_.lpush(key, val))
    @classmethod
    def lpop(cls, redis_, key, auto_free=True):
        return cls.exec("lpop", key, lambda: redis_.lpop(key))
    @classmethod
    def rpush(cls, redis_, key, val, auto_free=True):
        return cls.exec("rpush", key, lambda: redis_.rpush(key, val))
    @classmethod
    def realse(cls, redis_):
        pass
    @classmethod
    def add_async_task(cls, db, method, args):
        cls.__async_task_queue.put_nowait((db, method, args))
    @classmethod
    def get_async_task_count(cls):
        return cls.__async_task_queue.qsize()
    # 运行异步任务
    @classmethod
    def run_loop(cls):
        while True:
            try:
                data = cls.__async_task_queue.get()
                if data:
                    db = data[0]
                    method_name = data[1]
                    args = data[2]
                    _redis = RedisManager(db).getRedisNoPool()
                    method = getattr(_redis, method_name)
                    if type(args) == tuple:
                        result = method(*args)
                    else:
                        result = method(args)
            except Exception as e1:
                logging.exception(e1)
                time.sleep(1)
if __name__ == "__main__":
    pass
    redis = RedisManager(1).getRedis()
    db = redis.connection_pool.connection_kwargs['db']
    print(db)