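# Extracted from a commit diff view of db/redis_manager.py
# Author: Administrator (committed 4 days before this view was captured)
# Commit: 48fb7a00951f91bdc707e5dd2d196e5bccb752c3
# Diff hunk: @@ -1,14 +1,14 @@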
"""
redis管理器
"""
import logging
import queue
import time
from threading import Thread
import redis
import constant
from log_module.log import logger_redis_debug
from utils import tool
config = constant.REDIS_CONFIG
# Diff hunk: @@ -31,197 +31,146 @@  (the file lines between the previous hunk and this one are not shown here)
class RedisUtils:
    """Thin class-level wrappers around Redis client commands.

    Synchronous wrappers take a client object (``redis_``) and forward one
    command to it through :meth:`exec`.  The ``*_async`` variants take a
    database index (``db``) instead and only enqueue the command; the queued
    commands are executed by :meth:`run_loop`.

    The ``auto_free`` and ``_async`` parameters are accepted by the wrappers
    but are not read anywhere in this class; they are kept so existing call
    sites keep working.
    """

    # Pending (db, method_name, args) tuples for run_loop. Bounded to 1024
    # entries; add_async_task raises queue.Full beyond that.
    __async_task_queue = queue.Queue(maxsize=1024)

    @classmethod
    def exec(cls, method_name, key, lamada_method):
        """Run one Redis command and return its result.

        Args:
            method_name: Name of the command being run. Currently unused; it
                belongs to the per-call timing log, which is disabled.
            key: Redis key the command targets. Currently unused, same reason.
            lamada_method: Zero-argument callable that performs the command.

        Returns:
            Whatever ``lamada_method()`` returns.

        Raises:
            Any exception raised by ``lamada_method`` propagates unchanged.
        """
        return lamada_method()

    @classmethod
    def get(cls, redis_, key, auto_free=True):
        """Forward ``get(key)`` to ``redis_`` and return its result."""
        return cls.exec("get", key, lambda: redis_.get(key))

    @classmethod
    def scard(cls, redis_, key, auto_free=True):
        """Forward ``scard(key)`` to ``redis_`` and return its result."""
        return cls.exec("scard", key, lambda: redis_.scard(key))

    @classmethod
    def delete(cls, redis_, key, auto_free=True, _async=False):
        """Forward ``delete(key)`` to ``redis_`` and return its result."""
        return cls.exec("delete", key, lambda: redis_.delete(key))

    @classmethod
    def delete_async(cls, db, key, auto_free=True):
        """Queue a ``delete(key)`` against database ``db`` for run_loop."""
        # (key,) is a real 1-tuple, so every queued task carries its arguments
        # in the same shape; a bare (key) is just the key itself.
        cls.add_async_task(db, "delete", (key,))
        logger_redis_debug.info("delete_async({}):{}", 0, key)

    @classmethod
    def keys(cls, redis_, key, auto_free=True):
        """Forward ``keys(key)`` to ``redis_`` and return its result."""
        return cls.exec("keys", key, lambda: redis_.keys(key))

    @classmethod
    def set(cls, redis_, key, val, auto_free=True):
        """Forward ``set(key, val)`` to ``redis_`` and return its result."""
        return cls.exec("set", key, lambda: redis_.set(key, val))

    @classmethod
    def setex(cls, redis_, key, expire, val, auto_free=True, _async=False):
        """Forward ``setex(key, expire, val)`` to ``redis_`` and return its result."""
        return cls.exec("setex", key, lambda: redis_.setex(key, expire, val))

    @classmethod
    def setex_async(cls, db, key, expire, val, auto_free=True):
        """Queue a ``setex(key, expire, val)`` against database ``db`` for run_loop."""
        cls.add_async_task(db, "setex", (key, expire, val))
        logger_redis_debug.info("setex_async({}):{}", 0, key)

    @classmethod
    def setnx(cls, redis_, key, val, auto_free=True):
        """Forward ``setnx(key, val)`` to ``redis_`` and return its result."""
        return cls.exec("setnx", key, lambda: redis_.setnx(key, val))

    @classmethod
    def expire(cls, redis_, key, expire, auto_free=True):
        """Forward ``expire(key, expire)`` to ``redis_`` and return its result."""
        return cls.exec("expire", key, lambda: redis_.expire(key, expire))

    @classmethod
    def expire_async(cls, db, key, expire, auto_free=True):
        """Queue an ``expire(key, expire)`` against database ``db`` for run_loop."""
        cls.add_async_task(db, "expire", (key, expire))
        logger_redis_debug.info("expire_async({}):{}", 0, key)

    @classmethod
    def sadd(cls, redis_, key, val, auto_free=True):
        """Forward ``sadd(key, val)`` to ``redis_`` and return its result."""
        return cls.exec("sadd", key, lambda: redis_.sadd(key, val))

    @classmethod
    def sadd_async(cls, db, key, val, auto_free=True):
        """Queue a ``sadd(key, val)`` against database ``db`` for run_loop."""
        cls.add_async_task(db, "sadd", (key, val))
        logger_redis_debug.info("sadd_async({}):{}", 0, key)

    @classmethod
    def sismember(cls, redis_, key, val, auto_free=True):
        """Forward ``sismember(key, val)`` to ``redis_`` and return its result."""
        return cls.exec("sismember", key, lambda: redis_.sismember(key, val))

    @classmethod
    def smembers(cls, redis_, key, auto_free=True):
        """Forward ``smembers(key)`` to ``redis_`` and return its result."""
        return cls.exec("smembers", key, lambda: redis_.smembers(key))

    @classmethod
    def srem(cls, redis_, key, val, auto_free=True):
        """Forward ``srem(key, val)`` to ``redis_`` and return its result."""
        return cls.exec("srem", key, lambda: redis_.srem(key, val))

    @classmethod
    def srem_async(cls, db, key, val, auto_free=True):
        """Queue a ``srem(key, val)`` against database ``db`` for run_loop."""
        cls.add_async_task(db, "srem", (key, val))
        logger_redis_debug.info("srem_async({}):{}", 0, key)

    @classmethod
    def incrby(cls, redis_, key, num, auto_free=True, _async=False):
        """Forward ``incrby(key, num)`` to ``redis_`` and return its result."""
        return cls.exec("incrby", key, lambda: redis_.incrby(key, num))

    @classmethod
    def incrby_async(cls, db, key, num, auto_free=True):
        """Queue an ``incrby(key, num)`` against database ``db`` for run_loop."""
        cls.add_async_task(db, "incrby", (key, num))
        logger_redis_debug.info("incrby_async({}):{}", 0, key)

    @classmethod
    def lpush(cls, redis_, key, val, auto_free=True):
        """Forward ``lpush(key, val)`` to ``redis_`` and return its result."""
        return cls.exec("lpush", key, lambda: redis_.lpush(key, val))

    @classmethod
    def lpop(cls, redis_, key, auto_free=True):
        """Forward ``lpop(key)`` to ``redis_`` and return its result."""
        return cls.exec("lpop", key, lambda: redis_.lpop(key))

    @classmethod
    def rpush(cls, redis_, key, val, auto_free=True):
        """Forward ``rpush(key, val)`` to ``redis_`` and return its result."""
        return cls.exec("rpush", key, lambda: redis_.rpush(key, val))

    @classmethod
    def realse(cls, redis_):
        """Do nothing; kept as a no-op so existing callers keep working."""
        pass

    @classmethod
    def add_async_task(cls, db, method, args):
        """Enqueue one command for run_loop without blocking.

        Args:
            db: Database index; run_loop passes it to ``RedisManager(db)``.
            method: Name of the client method to call.
            args: Tuple of positional arguments for that method (a single
                non-tuple value is also accepted by run_loop).

        Raises:
            queue.Full: If 1024 tasks are already pending.
        """
        cls.__async_task_queue.put_nowait((db, method, args))

    @classmethod
    def get_async_task_count(cls):
        """Return the approximate number of queued async tasks."""
        return cls.__async_task_queue.qsize()

    # Run the queued async tasks
    @classmethod
    def run_loop(cls):
        """Consume and execute queued tasks forever; never returns.

        Blocks until a task is available, obtains a client through
        ``RedisManager(db).getRedisNoPool()`` (RedisManager is defined outside
        this class), and calls the named method on it with the queued
        arguments.  Any exception is logged with ``logging.exception`` and the
        loop pauses for one second before continuing.
        """
        while True:
            try:
                data = cls.__async_task_queue.get()
                if not data:
                    continue
                db, method_name, args = data[0], data[1], data[2]
                _redis = RedisManager(db).getRedisNoPool()
                method = getattr(_redis, method_name)
                if isinstance(args, tuple):
                    method(*args)
                else:
                    # Tolerate a single bare argument queued without a tuple.
                    method(args)
            except Exception as e1:
                logging.exception(e1)
                time.sleep(1)
if __name__ == "__main__":
    # Manual smoke test; it talks to Redis through RedisManager.
    time1 = time.time()
    # setex_async takes a database index (RedisUtils.run_loop later passes it
    # to RedisManager(db)), not a client object.
    # NOTE(review): nothing in this script starts RedisUtils.run_loop, so the
    # task only sits in the queue here — confirm whether a consumer should run.
    RedisUtils.setex_async(0, "test123123", tool.get_expire(), "123213")
    print(time.time() - time1)
    input()
    # Named redis_client so the imported `redis` module is not rebound.
    redis_client = RedisManager(1).getRedis()
    db = redis_client.connection_pool.connection_kwargs['db']
    print(db)