"""
Redis manager.

Provides ``RedisManager`` (a client factory over per-db connection pools) and
``RedisUtils`` (command helpers that forward each command through
``middle_api_protocol``, plus a queue-backed worker loop for deferred commands).
"""
import logging
import queue
import time

import redis

import constant
from log_module.log import logger_redis_debug, logger_system
from utils import tool, middle_api_protocol

config = constant.REDIS_CONFIG

# One shared connection pool per db index 0-15; RedisManager indexes this list by db.
pool_caches = [
    redis.ConnectionPool(host=config["host"], port=config["port"], password=config["pwd"], db=db,
                         decode_responses=True, max_connections=50)
    for db in range(16)
]


class RedisManager:
    """Factory for ``redis.Redis`` clients bound to a single db index."""

    def __init__(self, db=config["db"]):
        """
        Bind this manager to a db index.

        :param db: index into ``pool_caches`` (0-15); defaults to ``config["db"]``.
        :raises IndexError: if ``db`` is outside the range of ``pool_caches``.
        """
        self.pool = pool_caches[db]
        self.db = db

    def getRedis(self):
        """Return a client backed by the shared connection pool for this db."""
        return redis.Redis(connection_pool=self.pool)

    def getRedisNoPool(self):
        """Return a client built directly from the connection settings instead of ``pool_caches``."""
        return redis.Redis(host=config["host"], port=config["port"], password=config["pwd"], db=self.db,
                           decode_responses=True)


class RedisUtils:
    """
    Redis command helpers.

    Every synchronous helper takes a ``redis_`` client but only reads the db index
    from its connection pool; the command itself is sent through
    ``middle_api_protocol.request``. The ``auto_free`` and ``_async`` parameters are
    accepted for call-site compatibility and are not consulted by this implementation.

    The ``*_async`` helpers enqueue ``(db, method_name, args)`` tuples that
    ``run_loop`` later replays against the synchronous helper of the same name.
    """

    # Pending deferred commands as (db, method_name, args) tuples, consumed by run_loop.
    __async_task_queue = queue.Queue()

    @classmethod
    def exec(cls, method_name, key, lamada_method):
        """
        Invoke ``lamada_method`` and return its result.

        ``method_name`` and ``key`` are not used here; they identify the command for
        any instrumentation added around the call.
        """
        return lamada_method()

    @classmethod
    def __request(cls, db, cmd, key, args=None):
        """
        Send one command through ``middle_api_protocol`` and return its result.

        :param db: db index the command targets.
        :param cmd: command name, e.g. ``"get"``.
        :param key: key the command operates on.
        :param args: optional extra payload; included in the request only when not None.
        """
        data = {
            "db": db,
            "cmd": cmd,
            "key": key,
        }
        if args is not None:
            data["args"] = args
        fdata = middle_api_protocol.load_redis_cmd(data)
        return middle_api_protocol.request(fdata)

    @classmethod
    def __get_db(cls, redis_):
        """Return the db index the given client's connection pool was configured with."""
        return redis_.connection_pool.connection_kwargs['db']

    @classmethod
    def get(cls, redis_, key, auto_free=True):
        """Issue ``get`` for ``key`` on the db of ``redis_`` and return the result."""
        return cls.exec("get", key, lambda: cls.__request(cls.__get_db(redis_), "get", key))

    @classmethod
    def scard(cls, redis_, key, auto_free=True):
        """Issue ``scard`` for ``key`` on the db of ``redis_`` and return the result."""
        return cls.exec("scard", key, lambda: cls.__request(cls.__get_db(redis_), "scard", key))

    @classmethod
    def delete(cls, redis_, key, auto_free=True, _async=False):
        """Issue ``delete`` for ``key`` on the db of ``redis_`` and return the result."""
        return cls.exec("delete", key, lambda: cls.__request(cls.__get_db(redis_), "delete", key))

    @classmethod
    def delete_async(cls, db, key, auto_free=True):
        """Queue a ``delete`` of ``key`` on ``db`` for ``run_loop`` to execute."""
        # One-element tuple, so the task has the same shape as the other *_async tasks.
        cls.add_async_task(db, "delete", (key,))

    @classmethod
    def keys(cls, redis_, key, auto_free=True):
        """Issue ``keys`` with ``key`` as the argument on the db of ``redis_`` and return the result."""
        return cls.exec("keys", key, lambda: cls.__request(cls.__get_db(redis_), "keys", key))

    @classmethod
    def set(cls, redis_, key, val, auto_free=True):
        """Issue ``set`` of ``key`` to ``val`` on the db of ``redis_`` and return the result."""
        return cls.exec("set", key, lambda: cls.__request(cls.__get_db(redis_), "set", key, val))

    @classmethod
    def setex(cls, redis_, key, expire, val, auto_free=True, _async=False):
        """Issue ``setex`` for ``key``; ``expire`` and ``val`` are sent together as ``[expire, val]``."""
        return cls.exec("setex", key, lambda: cls.__request(cls.__get_db(redis_), "setex", key, [expire, val]))

    @classmethod
    def setex_async(cls, db, key, expire, val, auto_free=True):
        """Queue a ``setex`` of ``key`` on ``db`` for ``run_loop`` to execute."""
        cls.add_async_task(db, "setex", (key, expire, val))

    @classmethod
    def setnx(cls, redis_, key, val, auto_free=True):
        """Issue ``setnx`` of ``key`` to ``val`` on the db of ``redis_`` and return the result."""
        return cls.exec("setnx", key, lambda: cls.__request(cls.__get_db(redis_), "setnx", key, val))

    @classmethod
    def expire(cls, redis_, key, expire, auto_free=True):
        """Issue ``expire`` for ``key`` with ``expire`` as the argument and return the result."""
        return cls.exec("expire", key, lambda: cls.__request(cls.__get_db(redis_), "expire", key, expire))

    @classmethod
    def expire_async(cls, db, key, expire, auto_free=True):
        """Queue an ``expire`` of ``key`` on ``db`` for ``run_loop`` to execute."""
        cls.add_async_task(db, "expire", (key, expire))

    @classmethod
    def sadd(cls, redis_, key, val, auto_free=True):
        """Issue ``sadd`` of ``val`` to ``key`` on the db of ``redis_`` and return the result."""
        return cls.exec("sadd", key, lambda: cls.__request(cls.__get_db(redis_), "sadd", key, val))

    @classmethod
    def sadd_async(cls, db, key, val, auto_free=True):
        """Queue an ``sadd`` of ``val`` to ``key`` on ``db`` for ``run_loop`` to execute."""
        cls.add_async_task(db, "sadd", (key, val))

    @classmethod
    def sismember(cls, redis_, key, val, auto_free=True):
        """Issue ``sismember`` for ``val`` in ``key`` on the db of ``redis_`` and return the result."""
        return cls.exec("sismember", key, lambda: cls.__request(cls.__get_db(redis_), "sismember", key, val))

    @classmethod
    def smembers(cls, redis_, key, auto_free=True):
        """
        Issue ``smembers`` for ``key`` on the db of ``redis_``.

        A list result is converted to a ``set``; any other result is returned unchanged.
        """
        result = cls.exec("smembers", key, lambda: cls.__request(cls.__get_db(redis_), "smembers", key))
        if isinstance(result, list):
            return set(result)
        return result

    @classmethod
    def srem(cls, redis_, key, val, auto_free=True):
        """Issue ``srem`` of ``val`` from ``key`` on the db of ``redis_`` and return the result."""
        return cls.exec("srem", key, lambda: cls.__request(cls.__get_db(redis_), "srem", key, val))

    @classmethod
    def srem_async(cls, db, key, val, auto_free=True):
        """Queue an ``srem`` of ``val`` from ``key`` on ``db`` for ``run_loop`` to execute."""
        cls.add_async_task(db, "srem", (key, val))

    @classmethod
    def incrby(cls, redis_, key, num, auto_free=True, _async=False):
        """Issue ``incrby`` of ``key`` by ``num`` on the db of ``redis_`` and return the result."""
        return cls.exec("incrby", key, lambda: cls.__request(cls.__get_db(redis_), "incrby", key, num))

    @classmethod
    def incrby_async(cls, db, key, num, auto_free=True):
        """Queue an ``incrby`` of ``key`` by ``num`` on ``db`` for ``run_loop`` to execute."""
        cls.add_async_task(db, "incrby", (key, num))

    @classmethod
    def lpush(cls, redis_, key, val, auto_free=True):
        """Issue ``lpush`` of ``val`` onto ``key`` on the db of ``redis_`` and return the result."""
        return cls.exec("lpush", key, lambda: cls.__request(cls.__get_db(redis_), "lpush", key, val))

    @classmethod
    def lpop(cls, redis_, key, auto_free=True):
        """Issue ``lpop`` for ``key`` on the db of ``redis_`` and return the result."""
        return cls.exec("lpop", key, lambda: cls.__request(cls.__get_db(redis_), "lpop", key))

    @classmethod
    def rpush(cls, redis_, key, val, auto_free=True):
        """Issue ``rpush`` of ``val`` onto ``key`` on the db of ``redis_`` and return the result."""
        return cls.exec("rpush", key, lambda: cls.__request(cls.__get_db(redis_), "rpush", key, val))

    @classmethod
    def realse(cls, redis_):
        """No-op; kept (with its original spelling) so existing callers keep working."""
        pass

    @classmethod
    def add_async_task(cls, db, method, args):
        """
        Enqueue a deferred command for ``run_loop``.

        :param db: db index the command targets.
        :param method: name of the ``RedisUtils`` method to invoke.
        :param args: tuple of positional arguments following the client, or a single
            non-tuple value that is passed as the only extra argument.
        """
        cls.__async_task_queue.put_nowait((db, method, args))

    @classmethod
    def get_async_task_count(cls):
        """Return the number of deferred commands currently waiting in the queue."""
        return cls.__async_task_queue.qsize()

    @classmethod
    def __run_async_task(cls, data, clients):
        """
        Execute one queued ``(db, method_name, args)`` task.

        Any exception is logged together with the task payload and then suppressed,
        so a single bad task cannot stop the worker loop.

        :param data: the queued task tuple.
        :param clients: per-db client cache owned by the calling loop.
        """
        try:
            db, method_name, args = data[0], data[1], data[2]
            # The helpers only read the db index from the client, so one client per
            # db can be reused instead of building a new one for every task.
            _redis = clients.get(db)
            if _redis is None:
                _redis = RedisManager(db).getRedisNoPool()
                clients[db] = _redis
            method = getattr(RedisUtils, method_name)
            if isinstance(args, tuple):
                method(_redis, *args)
            else:
                # A bare (non-tuple) value is passed as the single extra argument.
                method(_redis, args)
        except Exception as e2:
            logging.exception(e2)
            logging.error(data)

    # Run the queued asynchronous tasks.
    @classmethod
    def run_loop(cls):
        """
        Consume deferred commands forever.

        Blocks on the task queue and replays each task through the matching
        ``RedisUtils`` method. This call never returns; failures are logged and the
        loop continues with the next task.
        """
        logger_system.info("启动Redis数据同步服务")
        clients = {}
        while True:
            try:
                data = cls.__async_task_queue.get()
                if data:
                    cls.__run_async_task(data, clients)
            except Exception as e1:
                logging.exception(e1)


if __name__ == "__main__":
    pass