From e0ca7b43c17ebe25e718d5ca229c989f48340015 Mon Sep 17 00:00:00 2001 From: Administrator <admin@example.com> Date: 星期一, 07 八月 2023 16:16:30 +0800 Subject: [PATCH] redis批量提交数据 --- db/redis_manager.py | 216 ++++++++++++++++++------------------------------------ 1 files changed, 72 insertions(+), 144 deletions(-) diff --git a/db/redis_manager.py b/db/redis_manager.py index 467c280..70cab76 100644 --- a/db/redis_manager.py +++ b/db/redis_manager.py @@ -1,6 +1,8 @@ """ redis绠$悊鍣� """ +import logging +import queue import time from threading import Thread @@ -31,214 +33,140 @@ class RedisUtils: + __async_task_queue = queue.Queue() + @classmethod - def get(cls, redis_, key, auto_free=True): + def exec(cls, method_name, key, lamada_method): __start_time = time.time() try: - return redis_.get(key) + return lamada_method() finally: - logger_redis_debug.info("get({}):{}", round((time.time() - __start_time) * 1000, 3), key) - if auto_free: - # redis_.connection_pool.disconnect() - pass + logger_redis_debug.info("{}({}):{}", method_name, round((time.time() - __start_time) * 1000, 3), key) + + @classmethod + def get(cls, redis_, key, auto_free=True): + return cls.exec("get", key, lambda: redis_.get(key)) @classmethod def scard(cls, redis_, key, auto_free=True): - __start_time = time.time() - try: - return redis_.scard(key) - finally: - logger_redis_debug.info("scard({}):{}", round((time.time() - __start_time) * 1000, 3), key) - if auto_free: - # redis_.connection_pool.disconnect() - pass + return cls.exec("scard", key, lambda: redis_.scard(key)) @classmethod def delete(cls, redis_, key, auto_free=True, _async=False): - __start_time = time.time() - try: - return redis_.delete(key) - finally: - if not _async: - logger_redis_debug.info("delete({}):{}", round((time.time() - __start_time) * 1000, 3), key) - if auto_free: - # redis_.connection_pool.disconnect() - pass + return cls.exec("delete", key, lambda: redis_.delete(key)) @classmethod - def delete_async(cls, redis_, key, auto_free=True): + def delete_async(cls, db, key, auto_free=True): __start_time = time.time() - Thread(target=lambda: cls.delete(redis_, key, auto_free,_async=True)).start() + cls.add_async_task(db, "delete", (key)) logger_redis_debug.info("delete_async({}):{}", round((time.time() - __start_time) * 1000, 3), key) @classmethod def keys(cls, redis_, key, auto_free=True): - __start_time = time.time() - try: - logger_redis_debug.info("keys(start):{}", key) - return redis_.keys(key) - finally: - logger_redis_debug.info("keys({}):{}", round((time.time() - __start_time) * 1000, 3), key) - if auto_free: - # redis_.connection_pool.disconnect() - pass + return cls.exec("keys", key, lambda: redis_.keys(key)) @classmethod def set(cls, redis_, key, val, auto_free=True): - __start_time = time.time() - try: - return redis_.set(key, val) - finally: - logger_redis_debug.info("set({}):{}", round((time.time() - __start_time) * 1000, 3), key) - if auto_free: - # redis_.connection_pool.disconnect() - pass + return cls.exec("set", key, lambda: redis_.set(key, val)) @classmethod - def setex(cls, redis_, key, expire, val, auto_free=True, _async = False): - __start_time = time.time() - try: - return redis_.setex(key, expire, val) - finally: - if not _async: - logger_redis_debug.info("setex({}):{}", round((time.time() - __start_time) * 1000, 3), key) - if auto_free: - # redis_.connection_pool.disconnect() - pass + def setex(cls, redis_, key, expire, val, auto_free=True, _async=False): + return cls.exec("setex", key, lambda: redis_.setex(key, expire, val)) @classmethod - def setex_async(cls, redis_, key, expire, val, auto_free=True): + def setex_async(cls, db, key, expire, val, auto_free=True): __start_time = time.time() - Thread(target=lambda: cls.setex(redis_, key, expire, val, auto_free,_async=True)).start() + cls.add_async_task(db, "setex", (key, expire, val)) logger_redis_debug.info("setex_async({}):{}", round((time.time() - __start_time) * 1000, 3), key) @classmethod def setnx(cls, redis_, key, val, auto_free=True): - __start_time = time.time() - try: - return redis_.setnx(key, val) - finally: - logger_redis_debug.info("setnx({}):{}", round((time.time() - __start_time) * 1000, 3), key) - if auto_free: - # redis_.connection_pool.disconnect() - pass + return cls.exec("setnx", key, lambda: redis_.setnx(key, val)) @classmethod def expire(cls, redis_, key, expire, auto_free=True): - __start_time = time.time() - try: - return redis_.expire(key, expire) - finally: - logger_redis_debug.info("expire({}):{}", round((time.time() - __start_time) * 1000, 3), key) - if auto_free: - # redis_.connection_pool.disconnect() - pass + return cls.exec("expire", key, lambda: redis_.expire(key, expire)) @classmethod def sadd(cls, redis_, key, val, auto_free=True): - __start_time = time.time() - try: - return redis_.sadd(key, val) - finally: - logger_redis_debug.info("sadd({}):{}", round((time.time() - __start_time) * 1000, 3), key) - if auto_free: - # redis_.connection_pool.disconnect() - pass + return cls.exec("sadd", key, lambda: redis_.sadd(key, val)) @classmethod def sismember(cls, redis_, key, val, auto_free=True): - __start_time = time.time() - try: - return redis_.sismember(key, val) - finally: - logger_redis_debug.info("sismember({}):{}", round((time.time() - __start_time) * 1000, 3), key) - if auto_free: - # redis_.connection_pool.disconnect() - pass + return cls.exec("sismember", key, lambda: redis_.sismember(key, val)) @classmethod def smembers(cls, redis_, key, auto_free=True): - __start_time = time.time() - try: - return redis_.smembers(key) - finally: - logger_redis_debug.info("smembers({}):{}", round((time.time() - __start_time) * 1000, 3), key) - if auto_free: - # redis_.connection_pool.disconnect() - pass + return cls.exec("smembers", key, lambda: redis_.smembers(key)) @classmethod def srem(cls, redis_, key, val, auto_free=True): - __start_time = time.time() - try: - return redis_.srem(key, val) - finally: - logger_redis_debug.info("srem({}):{}", round((time.time() - __start_time) * 1000, 3), key) - if auto_free: - # redis_.connection_pool.disconnect() - pass + return cls.exec("srem", key, lambda: redis_.srem(key, val)) @classmethod def incrby(cls, redis_, key, num, auto_free=True, _async=False): - __start_time = time.time() - try: - return redis_.incrby(key, num) - finally: - if not _async: - logger_redis_debug.info("incrby({}):{}", round((time.time() - __start_time) * 1000, 3), key) - if auto_free: - # redis_.connection_pool.disconnect() - pass + return cls.exec("incrby", key, lambda: redis_.incrby(key, num)) @classmethod - def incrby_async(cls, redis_, key, num, auto_free=True): + def incrby_async(cls, db, key, num, auto_free=True): __start_time = time.time() - Thread(target=lambda: cls.incrby(redis_, key, num, auto_free)).start() + cls.add_async_task(db, "incrby", (key, num)) logger_redis_debug.info("incrby_async({}):{}", round((time.time() - __start_time) * 1000, 3), key) @classmethod def lpush(cls, redis_, key, val, auto_free=True): - __start_time = time.time() - try: - - return redis_.lpush(key, val) - finally: - logger_redis_debug.info("lpush({}):{}", round((time.time() - __start_time) * 1000, 3), key) - if auto_free: - # redis_.connection_pool.disconnect() - pass + return cls.exec("lpush", key, lambda: redis_.lpush(key, val)) @classmethod def lpop(cls, redis_, key, auto_free=True): - __start_time = time.time() - try: - - return redis_.lpop(key) - finally: - logger_redis_debug.info("lpop({}):{}", round((time.time() - __start_time) * 1000, 3), key) - if auto_free: - # redis_.connection_pool.disconnect() - pass + return cls.exec("lpop", key, lambda: redis_.lpop(key)) @classmethod def rpush(cls, redis_, key, val, auto_free=True): - __start_time = time.time() - try: - - return redis_.rpush(key, val) - finally: - logger_redis_debug.info("rpush({}):{}", round((time.time() - __start_time) * 1000, 3), key) - if auto_free: - # redis_.connection_pool.disconnect() - pass + return cls.exec("rpush", key, lambda: redis_.rpush(key, val)) @classmethod def realse(cls, redis_): pass + @classmethod + def add_async_task(cls, db, method, args): + cls.__async_task_queue.put_nowait((db, method, args)) + + @classmethod + def get_async_task_count(cls): + return cls.__async_task_queue.qsize() + + # 杩愯寮傛浠诲姟 + @classmethod + def run_loop(cls): + while True: + try: + data = cls.__async_task_queue.get() + if data: + db = data[0] + method_name = data[1] + print(db,method_name) + args = data[2] + _redis = RedisManager(db).getRedisNoPool() + method = getattr(_redis, method_name) + if type(args) == tuple: + result = method(*args) + print(result) + else: + result = method(args) + print(result) + + except Exception as e1: + logging.exception(e1) + pass + if __name__ == "__main__": - time1 = time.time() - RedisUtils.setex_async(RedisManager(0).getRedis(), "test123123", tool.get_expire(), "123213") - print(time.time() - time1) - input() + RedisUtils.setex_async(0, "test", tool.get_expire(), "123123") + print("澶у皬",RedisUtils.get_async_task_count()) + RedisUtils.incrby_async(0, "test_1", 1) + print("澶у皬", RedisUtils.get_async_task_count()) + RedisUtils.delete_async(0, "test") + print("澶у皬", RedisUtils.get_async_task_count()) + RedisUtils.run_loop() -- Gitblit v1.8.0