"""
Redis manager.

Provides a small connection-pool holder (``RedisManager``) and a set of
class-level helpers (``RedisUtils``) that forward Redis commands through
``middle_api_protocol`` either synchronously or via a background batching
loop.
"""
import logging
import queue
import threading
import time

import redis

import constant
from log_module import async_log_util
from log_module.log import logger_redis_debug, logger_system
from trade import middle_api_protocol
from utils import tool

config = constant.REDIS_CONFIG

# One connection pool per Redis logical database index (0-15).
pool_caches = [
    redis.ConnectionPool(host=config["host"], port=config["port"], password=config["pwd"],
                         db=db, decode_responses=True, max_connections=50)
    for db in range(16)
]


class RedisManager:
    """Hands out ``redis.Redis`` clients bound to one database index."""

    def __init__(self, db=config["db"]):
        """Select the pre-built pool for ``db`` (an index into ``pool_caches``)."""
        self.pool = pool_caches[db]
        self.db = db

    def getRedis(self):
        """Return a client backed by this manager's shared connection pool."""
        return redis.Redis(connection_pool=self.pool)

    def getRedisNoPool(self):
        """Return a client built directly from the config, bypassing ``pool_caches``."""
        return redis.Redis(host=config["host"], port=config["port"], password=config["pwd"],
                           db=self.db, decode_responses=True)


class RedisUtils:
    """Class-level Redis command helpers routed through ``middle_api_protocol``.

    Synchronous helpers take a ``redis_`` client, but only read the database
    index from its connection pool; the command itself is sent with
    ``middle_api_protocol.request``. The ``auto_free`` and ``_async``
    parameters are accepted by the signatures but not used by these methods.

    ``*_async`` helpers enqueue the command on an in-process queue; nothing is
    sent until ``run_loop`` drains that queue.
    """

    # Pending (db, method, args) tuples waiting to be sent by run_loop.
    __async_task_queue = queue.Queue()

    @classmethod
    def exec(cls, method_name, key, lamada_method):
        """Invoke ``lamada_method`` and return its result.

        ``method_name`` and ``key`` are accepted for call-site compatibility
        (they were only used by a now-removed debug log) and are not read.
        """
        return lamada_method()

    @classmethod
    def __request(cls, db, cmd, key, args=None):
        """Send one command and return whatever ``middle_api_protocol.request`` returns.

        ``args`` is included in the payload only when it is not ``None``.
        """
        data = {
            "db": db,
            "cmd": cmd,
            "key": key,
        }
        if args is not None:
            data["args"] = args
        fdata = middle_api_protocol.load_redis_cmd(data)
        return middle_api_protocol.request(fdata)

    @classmethod
    def __batch__request(cls, odatas):
        """Send several commands in one request.

        ``odatas`` is an iterable of ``(db, cmd, key, args)`` tuples; an
        ``args`` of ``None`` is omitted from that command's payload.
        """
        _datas = []
        for d in odatas:
            data = {
                "db": d[0],
                "cmd": d[1],
                "key": d[2]
            }
            if d[3] is not None:
                data["args"] = d[3]
            _datas.append(data)
        fdata = middle_api_protocol.load_redis_cmds(_datas)
        return middle_api_protocol.request(fdata)

    @classmethod
    def __get_db(cls, redis_):
        """Return the database index configured on ``redis_``'s connection pool."""
        return redis_.connection_pool.connection_kwargs['db']

    @staticmethod
    def __normalize_task(task):
        """Convert a queued ``(db, method, args)`` task to ``(db, method, key, extra)``.

        When ``args`` is a tuple or list, its first element is the key and the
        remaining elements (or ``None`` if there are none) become ``extra``.
        Any other ``args`` value is treated as the key itself.

        NOTE(review): for single-value commands this yields a one-element
        sequence as ``extra`` (e.g. ``(val,)`` for ``set_async``), whereas the
        synchronous helpers send the bare value - confirm the receiving side
        accepts both shapes.
        """
        db, method, args = task
        if isinstance(args, (tuple, list)):
            key = args[0]
            extra = args[1:] if len(args) > 1 else None
        else:
            key, extra = args, None
        return db, method, key, extra

    @classmethod
    def get(cls, redis_, key, auto_free=True):
        """Send ``get`` for ``key`` and return the response."""
        return cls.exec("get", key, lambda: cls.__request(cls.__get_db(redis_), "get", key))

    @classmethod
    def scard(cls, redis_, key, auto_free=True):
        """Send ``scard`` for ``key`` and return the response."""
        return cls.exec("scard", key, lambda: cls.__request(cls.__get_db(redis_), "scard", key))

    @classmethod
    def delete(cls, redis_, key, auto_free=True, _async=False):
        """Send ``delete`` for ``key`` and return the response."""
        return cls.exec("delete", key, lambda: cls.__request(cls.__get_db(redis_), "delete", key))

    @classmethod
    def delete_async(cls, db, key, auto_free=True):
        """Queue a ``delete`` of ``key`` on database ``db``."""
        # One-element tuple (note the comma) so the task has the same shape
        # as the other *_async helpers.
        cls.add_async_task(db, "delete", (key,))

    @classmethod
    def keys(cls, redis_, key, auto_free=True):
        """Log the pattern, then send ``keys`` for ``key`` and return the response."""
        async_log_util.info(logger_redis_debug, f"keys:{key}")
        return cls.exec("keys", key, lambda: cls.__request(cls.__get_db(redis_), "keys", key))

    @classmethod
    def set(cls, redis_, key, val, auto_free=True):
        """Send ``set`` of ``key`` to ``val`` and return the response."""
        return cls.exec("set", key, lambda: cls.__request(cls.__get_db(redis_), "set", key, val))

    @classmethod
    def set_async(cls, db, key, val, auto_free=True):
        """Queue a ``set`` of ``key`` to ``val`` on database ``db``."""
        cls.add_async_task(db, "set", (key, val))

    @classmethod
    def setex(cls, redis_, key, expire, val, auto_free=True, _async=False):
        """Send ``setex`` with args ``[expire, val]`` and return the response."""
        return cls.exec("setex", key,
                        lambda: cls.__request(cls.__get_db(redis_), "setex", key, [expire, val]))

    @classmethod
    def setex_async(cls, db, key, expire, val, auto_free=True):
        """Queue a ``setex`` of ``key`` with ``expire`` and ``val`` on database ``db``."""
        cls.add_async_task(db, "setex", (key, expire, val))

    @classmethod
    def setnx(cls, redis_, key, val, auto_free=True):
        """Send ``setnx`` of ``key`` to ``val`` and return the response."""
        return cls.exec("setnx", key, lambda: cls.__request(cls.__get_db(redis_), "setnx", key, val))

    @classmethod
    def expire(cls, redis_, key, expire, auto_free=True):
        """Send ``expire`` for ``key`` with the given ``expire`` value and return the response."""
        return cls.exec("expire", key, lambda: cls.__request(cls.__get_db(redis_), "expire", key, expire))

    @classmethod
    def expire_async(cls, db, key, expire, auto_free=True):
        """Queue an ``expire`` for ``key`` on database ``db``."""
        cls.add_async_task(db, "expire", (key, expire))

    @classmethod
    def sadd(cls, redis_, key, val, auto_free=True):
        """Send ``sadd`` of ``val`` to ``key`` and return the response."""
        return cls.exec("sadd", key, lambda: cls.__request(cls.__get_db(redis_), "sadd", key, val))

    @classmethod
    def sadd_async(cls, db, key, val, auto_free=True):
        """Queue an ``sadd`` of ``val`` to ``key`` on database ``db``."""
        cls.add_async_task(db, "sadd", (key, val))

    @classmethod
    def sismember(cls, redis_, key, val, auto_free=True):
        """Send ``sismember`` for ``val`` in ``key`` and return the response."""
        return cls.exec("sismember", key, lambda: cls.__request(cls.__get_db(redis_), "sismember", key, val))

    @classmethod
    def smembers(cls, redis_, key, auto_free=True):
        """Send ``smembers`` for ``key``; a list response is converted to a ``set``.

        Any non-list response is returned unchanged.
        """
        result = cls.exec("smembers", key, lambda: cls.__request(cls.__get_db(redis_), "smembers", key))
        if isinstance(result, list):
            result = set(result)
        return result

    @classmethod
    def srem(cls, redis_, key, val, auto_free=True):
        """Send ``srem`` of ``val`` from ``key`` and return the response."""
        return cls.exec("srem", key, lambda: cls.__request(cls.__get_db(redis_), "srem", key, val))

    @classmethod
    def srem_async(cls, db, key, val, auto_free=True):
        """Queue an ``srem`` of ``val`` from ``key`` on database ``db``."""
        cls.add_async_task(db, "srem", (key, val))

    @classmethod
    def incrby(cls, redis_, key, num, auto_free=True, _async=False):
        """Send ``incrby`` of ``key`` by ``num`` and return the response."""
        return cls.exec("incrby", key, lambda: cls.__request(cls.__get_db(redis_), "incrby", key, num))

    @classmethod
    def incrby_async(cls, db, key, num, auto_free=True):
        """Queue an ``incrby`` of ``key`` by ``num`` on database ``db``."""
        cls.add_async_task(db, "incrby", (key, num))

    @classmethod
    def lpush(cls, redis_, key, val, auto_free=True):
        """Send ``lpush`` of ``val`` onto ``key`` and return the response."""
        return cls.exec("lpush", key, lambda: cls.__request(cls.__get_db(redis_), "lpush", key, val))

    @classmethod
    def lpop(cls, redis_, key, auto_free=True):
        """Send ``lpop`` for ``key`` and return the response."""
        return cls.exec("lpop", key, lambda: cls.__request(cls.__get_db(redis_), "lpop", key))

    @classmethod
    def rpush(cls, redis_, key, val, auto_free=True):
        """Send ``rpush`` of ``val`` onto ``key`` and return the response."""
        return cls.exec("rpush", key, lambda: cls.__request(cls.__get_db(redis_), "rpush", key, val))

    @classmethod
    def realse(cls, redis_):
        """No-op. The misspelled name is kept because it is the public name."""
        pass

    @classmethod
    def add_async_task(cls, db: int, method, args):
        """Enqueue ``(db, method, args)`` for ``run_loop`` without blocking."""
        cls.__async_task_queue.put_nowait((db, method, args))

    @classmethod
    def get_async_task_count(cls):
        """Return the approximate number of queued tasks (``Queue.qsize``)."""
        return cls.__async_task_queue.qsize()

    @classmethod
    def run_loop(cls):
        """Run the async task loop forever, sending queued commands in batches.

        A batch is sent when it reaches 20 commands, or when it is non-empty
        and more than 5 seconds have passed since the last successful send.
        Exceptions are logged and the loop continues. This method never
        returns.
        """
        logger_system.info("启动Redis数据同步服务")
        logger_system.info(f"redis 线程ID:{tool.get_thread_id()}")
        pending = []
        last_upload_time = time.time()
        while True:
            try:
                # Block for up to one second so the time-based flush below is
                # still checked regularly while the queue is idle.
                try:
                    task = cls.__async_task_queue.get(timeout=1)
                except queue.Empty:
                    task = None

                if task is not None:
                    pending.append(cls.__normalize_task(task))
                    if len(pending) >= 20:
                        # Size-triggered flush: the batch is dropped even if
                        # the request raises.
                        try:
                            cls.__batch__request(pending)
                            last_upload_time = time.time()
                        finally:
                            pending.clear()

                # Time-triggered flush: on failure the batch is kept and is
                # retried on a later iteration.
                if pending and time.time() - last_upload_time > 5:
                    cls.__batch__request(pending)
                    last_upload_time = time.time()
                    pending.clear()
            except Exception as e1:
                logging.exception(e1)


if __name__ == "__main__":
    for _ in range(30):
        RedisUtils.setex_async(0, 'buy_position_info-002547', tool.get_expire(), 1011)
    RedisUtils.run_loop()