"""
Redis manager
"""
import builtins
import json
import logging
import queue
import time

import redis

import constant
from log_module import async_log_util
from log_module.log import logger_redis_debug, logger_system
from utils import tool, middle_api_protocol

from db import redis_manager

config = constant.REDIS_CONFIG

pool_caches = [redis.ConnectionPool(host=config["host"], port=config["port"], password=config["pwd"],
                                    db=db, decode_responses=True, max_connections=50) for db in range(16)]
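# pool_caches holds one connection pool per logical Redis database (0-15, the server default);
# all RedisManager instances share these pools instead of opening a new connection per command.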


class RedisManager:

    def __init__(self, db=config["db"]):
        self.pool = pool_caches[db]
        self.db = db

    def getRedis(self):
        return redis.Redis(connection_pool=self.pool)

    def getRedisNoPool(self):
        return redis.Redis(host=config["host"], port=config["port"], password=config["pwd"], db=self.db,
                           decode_responses=True)
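
# Usage sketch for RedisManager (illustrative key name; assumes the Redis server
# configured in constant.REDIS_CONFIG is reachable):
#   r = RedisManager(db=0).getRedis()   # pooled client
#   r.set("demo:key", "1")
#   value = r.get("demo:key")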


class RedisUtils:
    # Execute redis commands locally; when False, commands are forwarded to the
    # middle API service through middle_api_protocol instead.
    __LOCAL_REQUEST = True

    __async_task_queue = queue.Queue(maxsize=4096)

    # Timing/debug wrapper around a single redis call.
    @classmethod
    def exec(cls, method_name, key, lambda_method):
        __start_time = time.time()
        try:
            return lambda_method()
        finally:
            # logger_redis_debug.info("{}({}):{}", method_name, round((time.time() - __start_time) * 1000, 3), key)
            pass

    @classmethod
    def __request(cls, db, cmd, key, args=None):
        if cls.__LOCAL_REQUEST:
            # Execute directly against the local Redis through db.redis_manager.RedisUtils.
            redis_ = RedisManager(db).getRedis()
            method = getattr(redis_manager.RedisUtils, cmd)
            args_ = [redis_, key]
            if args is not None:
                if builtins.type(args) == tuple or builtins.type(args) == list:
                    args = list(args)
                    if cmd == "setex":
                        args_.append(args[0])
                        if type(args[1]) == list:
                            args_.append(json.dumps(args[1]))
                        else:
                            args_.append(args[1])
                    else:
                        for a in args:
                            args_.append(a)
                else:
                    args_.append(args)
            args_ = tuple(args_)
            result = method(*args_)
            if builtins.type(result) == set:
                result = list(result)
            return result
        else:
            # Forward the command to the middle API service.
            data = {
                "db": db,
                "cmd": cmd,
                "key": key,
            }
            if args is not None:
                data["args"] = args
            fdata = middle_api_protocol.load_redis_cmd(data)
            result = middle_api_protocol.request(fdata)
            return result
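
    # Example of the argument packing above (illustrative key and values):
    #   RedisUtils.setex(r, "demo:k", 60, [1, 2]) becomes __request(db, "setex", "demo:k", [60, [1, 2]]),
    #   which dispatches redis_manager.RedisUtils.setex(redis_, "demo:k", 60, "[1, 2]").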

    # odatas: a list of (db, cmd, key, args) tuples
    @classmethod
    def __batch__request(cls, odatas):
        if cls.__LOCAL_REQUEST:
            result_list = []
            for d in odatas:
                db = d[0]
                cmd = d[1]
                key = d[2]
                args = None
                if d[3] is not None:
                    args = d[3]
                result = cls.__request(db, cmd, key, args)
                result_list.append(result)
            return result_list
        else:
            _datas = []
            for d in odatas:
                data = {
                    "db": d[0],
                    "cmd": d[1],
                    "key": d[2]
                }
                if d[3] is not None:
                    data["args"] = d[3]
                _datas.append(data)
            fdata = middle_api_protocol.load_redis_cmds(_datas)
            results = middle_api_protocol.request(fdata)
            return results
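
    # Example batch (sketch; key names are illustrative):
    #   cls.__batch__request([(0, "setex", "demo:k1", (60, "v1")),
    #                         (0, "delete", "demo:k2", None)])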

    @classmethod
    def __get_db(cls, redis_):
        return redis_.connection_pool.connection_kwargs['db']

    @classmethod
    def get(cls, redis_, key, auto_free=True):
        return cls.exec("get", key, lambda: cls.__request(cls.__get_db(redis_), "get", key))

    @classmethod
    def scard(cls, redis_, key, auto_free=True):
        return cls.exec("scard", key, lambda: cls.__request(cls.__get_db(redis_), "scard", key))

    @classmethod
    def delete(cls, redis_, key, auto_free=True, _async=False):
        return cls.exec("delete", key, lambda: cls.__request(cls.__get_db(redis_), "delete", key))

    @classmethod
    def delete_async(cls, db, key, auto_free=True):
        cls.add_async_task(db, "delete", (key,))
        # logger_redis_debug.info("delete_async({}):{}", 0, key)

    @classmethod
    def keys(cls, redis_, key, auto_free=True):
        async_log_util.info(logger_redis_debug, f"keys:{key}")
        return cls.exec("keys", key, lambda: cls.__request(cls.__get_db(redis_), "keys", key))

    @classmethod
    def set(cls, redis_, key, val, auto_free=True):
        return cls.exec("set", key, lambda: cls.__request(cls.__get_db(redis_), "set", key, val))

    @classmethod
    def set_async(cls, db, key, val, auto_free=True):
        cls.add_async_task(db, "set", (key, val))
        # logger_redis_debug.info("set_async({}):{}", 0, key)

    @classmethod
    def setex(cls, redis_, key, expire, val, auto_free=True, _async=False):
        return cls.exec("setex", key, lambda: cls.__request(cls.__get_db(redis_), "setex", key, [expire, val]))

    @classmethod
    def setex_async(cls, db, key, expire, val, auto_free=True):
        cls.add_async_task(db, "setex", (key, expire, val))
        # logger_redis_debug.info("setex_async({}):{}", 0, key)

    @classmethod
    def setnx(cls, redis_, key, val, auto_free=True):
        return cls.exec("setnx", key, lambda: cls.__request(cls.__get_db(redis_), "setnx", key, val))

    @classmethod
    def expire(cls, redis_, key, expire, auto_free=True):
        return cls.exec("expire", key, lambda: cls.__request(cls.__get_db(redis_), "expire", key, expire))

    @classmethod
    def expire_async(cls, db, key, expire, auto_free=True):
        cls.add_async_task(db, "expire", (key, expire))
        # logger_redis_debug.info("expire_async({}):{}", 0, key)

    @classmethod
    def sadd(cls, redis_, key, val, auto_free=True):
        return cls.exec("sadd", key, lambda: cls.__request(cls.__get_db(redis_), "sadd", key, val))

    @classmethod
    def sadd_async(cls, db, key, val, auto_free=True):
        cls.add_async_task(db, "sadd", (key, val))
        # logger_redis_debug.info("sadd_async({}):{}", 0, key)

    @classmethod
    def sismember(cls, redis_, key, val, auto_free=True):
        return cls.exec("sismember", key, lambda: cls.__request(cls.__get_db(redis_), "sismember", key, val))

    @classmethod
    def smembers(cls, redis_, key, auto_free=True):
        result = cls.exec("smembers", key, lambda: cls.__request(cls.__get_db(redis_), "smembers", key))
        if type(result) == list:
            result = set(result)
        return result

    @classmethod
    def srem(cls, redis_, key, val, auto_free=True):
        return cls.exec("srem", key, lambda: cls.__request(cls.__get_db(redis_), "srem", key, val))

    @classmethod
    def srem_async(cls, db, key, val, auto_free=True):
        cls.add_async_task(db, "srem", (key, val))
        # logger_redis_debug.info("srem_async({}):{}", 0, key)

    @classmethod
    def incrby(cls, redis_, key, num, auto_free=True, _async=False):
        return cls.exec("incrby", key, lambda: cls.__request(cls.__get_db(redis_), "incrby", key, num))

    @classmethod
    def incrby_async(cls, db, key, num, auto_free=True):
        cls.add_async_task(db, "incrby", (key, num))
        # logger_redis_debug.info("incrby_async({}):{}", 0, key)

    @classmethod
    def lpush(cls, redis_, key, val, auto_free=True):
        return cls.exec("lpush", key, lambda: cls.__request(cls.__get_db(redis_), "lpush", key, val))

    @classmethod
    def lpop(cls, redis_, key, auto_free=True):
        return cls.exec("lpop", key, lambda: cls.__request(cls.__get_db(redis_), "lpop", key))

    @classmethod
    def rpush(cls, redis_, key, val, auto_free=True):
        return cls.exec("rpush", key, lambda: cls.__request(cls.__get_db(redis_), "rpush", key, val))

    @classmethod
    def realse(cls, redis_):
        # No-op: connections come from a shared pool, so there is nothing to release here.
        pass
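
    # Usage sketch for the synchronous wrappers (illustrative key and value;
    # assumes the Redis server from constant.REDIS_CONFIG is reachable):
    #   r = RedisManager(0).getRedis()
    #   RedisUtils.setex(r, "demo:key", 60, "1")
    #   print(RedisUtils.get(r, "demo:key"))
    #   RedisUtils.delete(r, "demo:key")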

    @classmethod
    def add_async_task(cls, db: int, method, args):
        try:
            cls.__async_task_queue.put_nowait((db, method, args))
        except Exception as e:
            async_log_util.error(logger_redis_debug, f"Error adding to queue: {str(e)}")

    @classmethod
    def get_async_task_count(cls):
        return cls.__async_task_queue.qsize()

    # Run the queued async tasks
    @classmethod
    def run_loop(cls):
        logger_system.info("Starting the Redis data sync service")
        logger_system.info(f"redis thread ID: {tool.get_thread_id()}")
        dataList = []
        last_upload_time = time.time()
        while True:
            try:
                data = cls.__async_task_queue.get()
                if data:
                    # Normalize the queued (db, method, args) task into the
                    # (db, cmd, key, args) tuple expected by __batch__request.
                    temp_data = [data[0], data[1]]
                    if type(data[2]) == tuple or type(data[2]) == list:
                        temp_data.append(data[2][0])
                        if len(data[2]) > 1:
                            temp_data.append(data[2][1:])
                        else:
                            temp_data.append(None)
                    else:
                        temp_data.append(data[2])
                        temp_data.append(None)
                    dataList.append(tuple(temp_data))
                    # Flush once 20 tasks have accumulated.
                    if len(dataList) >= 20:
                        try:
                            results = cls.__batch__request(dataList)
                            last_upload_time = time.time()
                        finally:
                            dataList.clear()
                    # Also flush pending tasks if more than 5 seconds have passed since the last flush.
                    if dataList and time.time() - last_upload_time > 5:
                        results = cls.__batch__request(dataList)
                        last_upload_time = time.time()
                        dataList.clear()
                    # try:
                    #     db = data[0]
                    #     method_name = data[1]
                    #     args = data[2]
                    #     _redis = RedisManager(db).getRedisNoPool()
                    #     method = getattr(RedisUtils, method_name)
                    #     if type(args) == tuple:
                    #         args = list(args)
                    #         args.insert(0, _redis)
                    #         args = tuple(args)
                    #         result = method(*args)
                    #     else:
                    #         args = tuple([_redis, args])
                    #         result = method(*args)
                    # except Exception as e2:
                    #     logging.exception(e2)
                    #     logging.error(data)
            except Exception as e1:
                logging.exception(e1)
                pass
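
# Note (assumption): run_loop() blocks forever, so in the application it is presumably
# started on a dedicated worker thread, for example:
#   import threading
#   threading.Thread(target=RedisUtils.run_loop, daemon=True, name="redis_sync").start()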


if __name__ == "__main__":
    for i in range(30):
        RedisUtils.setex_async(0, 'buy_position_info-002547', tool.get_expire(), 1011)
    RedisUtils.run_loop()