From e0ca7b43c17ebe25e718d5ca229c989f48340015 Mon Sep 17 00:00:00 2001
From: Administrator <admin@example.com>
Date: 星期一, 07 八月 2023 16:16:30 +0800
Subject: [PATCH] redis批量提交数据

---
 db/redis_manager.py |  216 ++++++++++++++++++------------------------------------
 1 files changed, 72 insertions(+), 144 deletions(-)

diff --git a/db/redis_manager.py b/db/redis_manager.py
index 467c280..70cab76 100644
--- a/db/redis_manager.py
+++ b/db/redis_manager.py
@@ -1,6 +1,8 @@
 """
 redis绠$悊鍣�
 """
+import logging
+import queue
 import time
 from threading import Thread
 
@@ -31,214 +33,140 @@
 
 
 class RedisUtils:
+    __async_task_queue = queue.Queue()
+
     @classmethod
-    def get(cls, redis_, key, auto_free=True):
+    def exec(cls, method_name, key, lamada_method):
         __start_time = time.time()
         try:
-            return redis_.get(key)
+            return lamada_method()
         finally:
-            logger_redis_debug.info("get({}):{}", round((time.time() - __start_time) * 1000, 3), key)
-            if auto_free:
-                # redis_.connection_pool.disconnect()
-                pass
+            logger_redis_debug.info("{}({}):{}", method_name, round((time.time() - __start_time) * 1000, 3), key)
+
+    @classmethod
+    def get(cls, redis_, key, auto_free=True):
+        return cls.exec("get", key, lambda: redis_.get(key))
 
     @classmethod
     def scard(cls, redis_, key, auto_free=True):
-        __start_time = time.time()
-        try:
-            return redis_.scard(key)
-        finally:
-            logger_redis_debug.info("scard({}):{}", round((time.time() - __start_time) * 1000, 3), key)
-            if auto_free:
-                # redis_.connection_pool.disconnect()
-                pass
+        return cls.exec("scard", key, lambda: redis_.scard(key))
 
     @classmethod
     def delete(cls, redis_, key, auto_free=True, _async=False):
-        __start_time = time.time()
-        try:
-            return redis_.delete(key)
-        finally:
-            if not _async:
-                logger_redis_debug.info("delete({}):{}", round((time.time() - __start_time) * 1000, 3), key)
-            if auto_free:
-                # redis_.connection_pool.disconnect()
-                pass
+        return cls.exec("delete", key, lambda: redis_.delete(key))
 
     @classmethod
-    def delete_async(cls, redis_, key, auto_free=True):
+    def delete_async(cls, db, key, auto_free=True):
         __start_time = time.time()
-        Thread(target=lambda: cls.delete(redis_, key, auto_free,_async=True)).start()
+        cls.add_async_task(db, "delete", (key))
         logger_redis_debug.info("delete_async({}):{}", round((time.time() - __start_time) * 1000, 3), key)
 
     @classmethod
     def keys(cls, redis_, key, auto_free=True):
-        __start_time = time.time()
-        try:
-            logger_redis_debug.info("keys(start):{}", key)
-            return redis_.keys(key)
-        finally:
-            logger_redis_debug.info("keys({}):{}", round((time.time() - __start_time) * 1000, 3), key)
-            if auto_free:
-                # redis_.connection_pool.disconnect()
-                pass
+        return cls.exec("keys", key, lambda: redis_.keys(key))
 
     @classmethod
     def set(cls, redis_, key, val, auto_free=True):
-        __start_time = time.time()
-        try:
-            return redis_.set(key, val)
-        finally:
-            logger_redis_debug.info("set({}):{}", round((time.time() - __start_time) * 1000, 3), key)
-            if auto_free:
-                # redis_.connection_pool.disconnect()
-                pass
+        return cls.exec("set", key, lambda: redis_.set(key, val))
 
     @classmethod
-    def setex(cls, redis_, key, expire, val, auto_free=True, _async = False):
-        __start_time = time.time()
-        try:
-            return redis_.setex(key, expire, val)
-        finally:
-            if not _async:
-                logger_redis_debug.info("setex({}):{}", round((time.time() - __start_time) * 1000, 3), key)
-            if auto_free:
-                # redis_.connection_pool.disconnect()
-                pass
+    def setex(cls, redis_, key, expire, val, auto_free=True, _async=False):
+        return cls.exec("setex", key, lambda: redis_.setex(key, expire, val))
 
     @classmethod
-    def setex_async(cls, redis_, key, expire, val, auto_free=True):
+    def setex_async(cls, db, key, expire, val, auto_free=True):
         __start_time = time.time()
-        Thread(target=lambda: cls.setex(redis_, key, expire, val, auto_free,_async=True)).start()
+        cls.add_async_task(db, "setex", (key, expire, val))
         logger_redis_debug.info("setex_async({}):{}", round((time.time() - __start_time) * 1000, 3), key)
 
     @classmethod
     def setnx(cls, redis_, key, val, auto_free=True):
-        __start_time = time.time()
-        try:
-            return redis_.setnx(key, val)
-        finally:
-            logger_redis_debug.info("setnx({}):{}", round((time.time() - __start_time) * 1000, 3), key)
-            if auto_free:
-                # redis_.connection_pool.disconnect()
-                pass
+        return cls.exec("setnx", key, lambda: redis_.setnx(key, val))
 
     @classmethod
     def expire(cls, redis_, key, expire, auto_free=True):
-        __start_time = time.time()
-        try:
-            return redis_.expire(key, expire)
-        finally:
-            logger_redis_debug.info("expire({}):{}", round((time.time() - __start_time) * 1000, 3), key)
-            if auto_free:
-                # redis_.connection_pool.disconnect()
-                pass
+        return cls.exec("expire", key, lambda: redis_.expire(key, expire))
 
     @classmethod
     def sadd(cls, redis_, key, val, auto_free=True):
-        __start_time = time.time()
-        try:
-            return redis_.sadd(key, val)
-        finally:
-            logger_redis_debug.info("sadd({}):{}", round((time.time() - __start_time) * 1000, 3), key)
-            if auto_free:
-                # redis_.connection_pool.disconnect()
-                pass
+        return cls.exec("sadd", key, lambda: redis_.sadd(key, val))
 
     @classmethod
     def sismember(cls, redis_, key, val, auto_free=True):
-        __start_time = time.time()
-        try:
-            return redis_.sismember(key, val)
-        finally:
-            logger_redis_debug.info("sismember({}):{}", round((time.time() - __start_time) * 1000, 3), key)
-            if auto_free:
-                # redis_.connection_pool.disconnect()
-                pass
+        return cls.exec("sismember", key, lambda: redis_.sismember(key, val))
 
     @classmethod
     def smembers(cls, redis_, key, auto_free=True):
-        __start_time = time.time()
-        try:
-            return redis_.smembers(key)
-        finally:
-            logger_redis_debug.info("smembers({}):{}", round((time.time() - __start_time) * 1000, 3), key)
-            if auto_free:
-                # redis_.connection_pool.disconnect()
-                pass
+        return cls.exec("smembers", key, lambda: redis_.smembers(key))
 
     @classmethod
     def srem(cls, redis_, key, val, auto_free=True):
-        __start_time = time.time()
-        try:
-            return redis_.srem(key, val)
-        finally:
-            logger_redis_debug.info("srem({}):{}", round((time.time() - __start_time) * 1000, 3), key)
-            if auto_free:
-                # redis_.connection_pool.disconnect()
-                pass
+        return cls.exec("srem", key, lambda: redis_.srem(key, val))
 
     @classmethod
     def incrby(cls, redis_, key, num, auto_free=True, _async=False):
-        __start_time = time.time()
-        try:
-            return redis_.incrby(key, num)
-        finally:
-            if not _async:
-                logger_redis_debug.info("incrby({}):{}", round((time.time() - __start_time) * 1000, 3), key)
-            if auto_free:
-                # redis_.connection_pool.disconnect()
-                pass
+        return cls.exec("incrby", key, lambda: redis_.incrby(key, num))
 
     @classmethod
-    def incrby_async(cls, redis_, key, num, auto_free=True):
+    def incrby_async(cls, db, key, num, auto_free=True):
         __start_time = time.time()
-        Thread(target=lambda: cls.incrby(redis_, key, num, auto_free)).start()
+        cls.add_async_task(db, "incrby", (key, num))
         logger_redis_debug.info("incrby_async({}):{}", round((time.time() - __start_time) * 1000, 3), key)
 
     @classmethod
     def lpush(cls, redis_, key, val, auto_free=True):
-        __start_time = time.time()
-        try:
-
-            return redis_.lpush(key, val)
-        finally:
-            logger_redis_debug.info("lpush({}):{}", round((time.time() - __start_time) * 1000, 3), key)
-            if auto_free:
-                # redis_.connection_pool.disconnect()
-                pass
+        return cls.exec("lpush", key, lambda: redis_.lpush(key, val))
 
     @classmethod
     def lpop(cls, redis_, key, auto_free=True):
-        __start_time = time.time()
-        try:
-
-            return redis_.lpop(key)
-        finally:
-            logger_redis_debug.info("lpop({}):{}", round((time.time() - __start_time) * 1000, 3), key)
-            if auto_free:
-                # redis_.connection_pool.disconnect()
-                pass
+        return cls.exec("lpop", key, lambda: redis_.lpop(key))
 
     @classmethod
     def rpush(cls, redis_, key, val, auto_free=True):
-        __start_time = time.time()
-        try:
-
-            return redis_.rpush(key, val)
-        finally:
-            logger_redis_debug.info("rpush({}):{}", round((time.time() - __start_time) * 1000, 3), key)
-            if auto_free:
-                # redis_.connection_pool.disconnect()
-                pass
+        return cls.exec("rpush", key, lambda: redis_.rpush(key, val))
 
     @classmethod
     def realse(cls, redis_):
         pass
 
+    @classmethod
+    def add_async_task(cls, db, method, args):
+        cls.__async_task_queue.put_nowait((db, method, args))
+
+    @classmethod
+    def get_async_task_count(cls):
+        return cls.__async_task_queue.qsize()
+
+    # 杩愯寮傛浠诲姟
+    @classmethod
+    def run_loop(cls):
+        while True:
+            try:
+                data = cls.__async_task_queue.get()
+                if data:
+                    db = data[0]
+                    method_name = data[1]
+                    print(db,method_name)
+                    args = data[2]
+                    _redis = RedisManager(db).getRedisNoPool()
+                    method = getattr(_redis, method_name)
+                    if type(args) == tuple:
+                        result = method(*args)
+                        print(result)
+                    else:
+                        result = method(args)
+                        print(result)
+
+            except Exception as e1:
+                logging.exception(e1)
+                pass
+
 
 if __name__ == "__main__":
-    time1 = time.time()
-    RedisUtils.setex_async(RedisManager(0).getRedis(), "test123123", tool.get_expire(), "123213")
-    print(time.time() - time1)
-    input()
+    RedisUtils.setex_async(0, "test", tool.get_expire(), "123123")
+    print("澶у皬",RedisUtils.get_async_task_count())
+    RedisUtils.incrby_async(0, "test_1", 1)
+    print("澶у皬", RedisUtils.get_async_task_count())
+    RedisUtils.delete_async(0, "test")
+    print("澶у皬", RedisUtils.get_async_task_count())
+    RedisUtils.run_loop()

--
Gitblit v1.8.0