"""
|
redis管理器
|
"""
|
import logging
|
import queue
|
import time
|
|
import redis
|
|
import constant
|
from log_module.log import logger_redis_debug
|
|
config = constant.REDIS_CONFIG

# One shared connection pool per database index 0-15. Each pool is capped at
# 50 connections and is created with decode_responses enabled. The list index
# equals the database number, so pool_caches[n] serves database n.
pool_caches = [
    redis.ConnectionPool(
        db=db_index,
        host=config["host"],
        port=config["port"],
        password=config["pwd"],
        max_connections=50,
        decode_responses=True,
    )
    for db_index in range(16)
]
|
|
|
class RedisManager:
    """Hands out redis-py clients bound to one database index.

    The index selects an entry of the module-level ``pool_caches`` list and
    is also used when building a client that bypasses that shared pool.
    """

    def __init__(self, db=config["db"]):
        """Remember ``db`` and look up its pre-built connection pool.

        Args:
            db: Database index; defaults to ``config["db"]`` (evaluated once,
                when the class is defined).
        """
        self.db = db
        self.pool = pool_caches[db]

    def getRedis(self):
        """Return a client that borrows connections from the shared pool."""
        pooled_client = redis.Redis(connection_pool=self.pool)
        return pooled_client

    def getRedisNoPool(self):
        """Return a client built from the raw config, not the shared pool."""
        connection_settings = {
            "host": config["host"],
            "port": config["port"],
            "password": config["pwd"],
            "db": self.db,
            "decode_responses": True,
        }
        return redis.Redis(**connection_settings)
|
|
|
class RedisUtils:
    """Class-level wrappers around redis-py commands plus a deferred-task queue.

    Synchronous wrappers take an already-built client (``redis_``) and forward
    to the method of the same name through :meth:`exec`.  The ``*_async``
    variants only enqueue a ``(db, method_name, args)`` task; the command is
    executed later by :meth:`run_loop`.

    The ``auto_free`` and ``_async`` parameters are accepted by the wrappers
    but are not read by any method in this class.
    """

    # Pending (db, method_name, args) tasks consumed by run_loop().
    # Bounded: put_nowait() raises queue.Full once 1024 tasks are waiting.
    __async_task_queue = queue.Queue(maxsize=1024)

    @classmethod
    def exec(cls, method_name, key, lamada_method):
        """Invoke ``lamada_method`` and return whatever it returns.

        Args:
            method_name: Name of the wrapped command; currently unused because
                the timing log that consumed it is disabled.
            key: Key the command targets; currently unused for the same reason.
            lamada_method: Zero-argument callable performing the Redis call.

        Returns:
            The callable's return value; exceptions propagate unchanged.
        """
        return lamada_method()

    @classmethod
    def get(cls, redis_, key, auto_free=True):
        """Return ``redis_.get(key)``."""
        return cls.exec("get", key, lambda: redis_.get(key))

    @classmethod
    def scard(cls, redis_, key, auto_free=True):
        """Return ``redis_.scard(key)``."""
        return cls.exec("scard", key, lambda: redis_.scard(key))

    @classmethod
    def delete(cls, redis_, key, auto_free=True, _async=False):
        """Return ``redis_.delete(key)``."""
        return cls.exec("delete", key, lambda: redis_.delete(key))

    @classmethod
    def delete_async(cls, db, key, auto_free=True):
        """Queue a ``delete`` of ``key`` on database ``db``.

        Raises:
            queue.Full: If the task queue is already at capacity.
        """
        # A real 1-tuple, so every queued task carries its arguments as a tuple.
        cls.add_async_task(db, "delete", (key,))
        logger_redis_debug.info("delete_async({}):{}", 0, key)

    @classmethod
    def keys(cls, redis_, key, auto_free=True):
        """Return ``redis_.keys(key)``; ``key`` is passed as the pattern."""
        return cls.exec("keys", key, lambda: redis_.keys(key))

    @classmethod
    def set(cls, redis_, key, val, auto_free=True):
        """Return ``redis_.set(key, val)``."""
        return cls.exec("set", key, lambda: redis_.set(key, val))

    @classmethod
    def setex(cls, redis_, key, expire, val, auto_free=True, _async=False):
        """Return ``redis_.setex(key, expire, val)``."""
        return cls.exec("setex", key, lambda: redis_.setex(key, expire, val))

    @classmethod
    def setex_async(cls, db, key, expire, val, auto_free=True):
        """Queue a ``setex(key, expire, val)`` on database ``db``.

        Raises:
            queue.Full: If the task queue is already at capacity.
        """
        cls.add_async_task(db, "setex", (key, expire, val))
        logger_redis_debug.info("setex_async({}):{}", 0, key)

    @classmethod
    def setnx(cls, redis_, key, val, auto_free=True):
        """Return ``redis_.setnx(key, val)``."""
        return cls.exec("setnx", key, lambda: redis_.setnx(key, val))

    @classmethod
    def expire(cls, redis_, key, expire, auto_free=True):
        """Return ``redis_.expire(key, expire)``."""
        return cls.exec("expire", key, lambda: redis_.expire(key, expire))

    @classmethod
    def expire_async(cls, db, key, expire, auto_free=True):
        """Queue an ``expire(key, expire)`` on database ``db``.

        Raises:
            queue.Full: If the task queue is already at capacity.
        """
        cls.add_async_task(db, "expire", (key, expire))
        logger_redis_debug.info("expire_async({}):{}", 0, key)

    @classmethod
    def sadd(cls, redis_, key, val, auto_free=True):
        """Return ``redis_.sadd(key, val)``."""
        return cls.exec("sadd", key, lambda: redis_.sadd(key, val))

    @classmethod
    def sadd_async(cls, db, key, val, auto_free=True):
        """Queue a ``sadd(key, val)`` on database ``db``.

        Raises:
            queue.Full: If the task queue is already at capacity.
        """
        cls.add_async_task(db, "sadd", (key, val))
        logger_redis_debug.info("sadd_async({}):{}", 0, key)

    @classmethod
    def sismember(cls, redis_, key, val, auto_free=True):
        """Return ``redis_.sismember(key, val)``."""
        return cls.exec("sismember", key, lambda: redis_.sismember(key, val))

    @classmethod
    def smembers(cls, redis_, key, auto_free=True):
        """Return ``redis_.smembers(key)``."""
        return cls.exec("smembers", key, lambda: redis_.smembers(key))

    @classmethod
    def srem(cls, redis_, key, val, auto_free=True):
        """Return ``redis_.srem(key, val)``."""
        return cls.exec("srem", key, lambda: redis_.srem(key, val))

    @classmethod
    def srem_async(cls, db, key, val, auto_free=True):
        """Queue a ``srem(key, val)`` on database ``db``.

        Raises:
            queue.Full: If the task queue is already at capacity.
        """
        cls.add_async_task(db, "srem", (key, val))
        logger_redis_debug.info("srem_async({}):{}", 0, key)

    @classmethod
    def incrby(cls, redis_, key, num, auto_free=True, _async=False):
        """Return ``redis_.incrby(key, num)``."""
        return cls.exec("incrby", key, lambda: redis_.incrby(key, num))

    @classmethod
    def incrby_async(cls, db, key, num, auto_free=True):
        """Queue an ``incrby(key, num)`` on database ``db``.

        Raises:
            queue.Full: If the task queue is already at capacity.
        """
        cls.add_async_task(db, "incrby", (key, num))
        logger_redis_debug.info("incrby_async({}):{}", 0, key)

    @classmethod
    def lpush(cls, redis_, key, val, auto_free=True):
        """Return ``redis_.lpush(key, val)``."""
        return cls.exec("lpush", key, lambda: redis_.lpush(key, val))

    @classmethod
    def lpop(cls, redis_, key, auto_free=True):
        """Return ``redis_.lpop(key)``."""
        return cls.exec("lpop", key, lambda: redis_.lpop(key))

    @classmethod
    def rpush(cls, redis_, key, val, auto_free=True):
        """Return ``redis_.rpush(key, val)``."""
        return cls.exec("rpush", key, lambda: redis_.rpush(key, val))

    @classmethod
    def realse(cls, redis_):
        """Do nothing.

        NOTE(review): the name looks like a misspelling of "release"; it is
        left unchanged because it is part of the public interface.
        """
        pass

    @classmethod
    def add_async_task(cls, db, method, args):
        """Enqueue one deferred command without blocking.

        Args:
            db: Database index the command should run against.
            method: Name of the client method to call, e.g. ``"setex"``.
            args: Tuple of positional arguments, or a single bare argument.

        Raises:
            queue.Full: If 1024 tasks are already waiting.
        """
        cls.__async_task_queue.put_nowait((db, method, args))

    @classmethod
    def get_async_task_count(cls):
        """Return the approximate number of tasks waiting in the queue."""
        return cls.__async_task_queue.qsize()

    # Run the queued asynchronous tasks.
    @classmethod
    def run_loop(cls):
        """Consume queued tasks forever; this call never returns.

        Each task is executed on a fresh unpooled client for its database.
        The client's private connection pool is disconnected as soon as the
        task finishes, so sockets are released deterministically instead of
        waiting for garbage collection.  Any exception is logged and followed
        by a one-second pause before the next task is taken.

        NOTE(review): presumably meant to run in a dedicated worker thread --
        confirm against the caller.
        """
        while True:
            try:
                data = cls.__async_task_queue.get()  # blocks until a task exists
                if not data:
                    continue
                db, method_name, args = data
                client = RedisManager(db).getRedisNoPool()
                try:
                    method = getattr(client, method_name)
                    # add_async_task() is public, so a bare (non-tuple)
                    # argument is still accepted and passed through as-is.
                    if isinstance(args, tuple):
                        method(*args)
                    else:
                        method(args)
                finally:
                    client.connection_pool.disconnect()
            except Exception as e1:
                logging.exception(e1)
                # NOTE(review): the pause is assumed to belong to the error
                # path only; the original indentation was lost -- confirm.
                time.sleep(1)
|
|
|
if __name__ == "__main__":
    # Manual check: build a pooled client for database 1 and print the
    # database index recorded in its pool's connection kwargs.
    # Local names are chosen so the imported `redis` package is not rebound
    # to a client instance at module level.
    client = RedisManager(1).getRedis()
    selected_db = client.connection_pool.connection_kwargs['db']
    print(selected_db)
|