Administrator
4 天以前 48fb7a00951f91bdc707e5dd2d196e5bccb752c3
db/redis_manager_delegate.py
@@ -1,16 +1,20 @@
"""
redis管理器
"""
import builtins
import json
import logging
import queue
import threading
import time
import redis
import constant
from log_module import async_log_util
from log_module.log import logger_redis_debug, logger_system
from utils import tool, middle_api_protocol
from db import redis_manager
config = constant.REDIS_CONFIG
@@ -33,11 +37,13 @@
class RedisUtils:
    __async_task_queue = queue.Queue()
    # 本机执行redis
    __LOCAL_REQUEST = True
    __async_task_queue = queue.Queue(maxsize=10240)
    @classmethod
    def exec(cls, method_name, key, lamada_method):
        __start_time = time.time()
        try:
            return lamada_method()
        finally:
@@ -46,33 +52,70 @@
    @classmethod
    def __request(cls, db, cmd, key, args=None):
        data = {
            "db": db,
            "cmd": cmd,
            "key": key,
        }
        if args is not None:
            data["args"] = args
        fdata = middle_api_protocol.load_redis_cmd(data)
        result = middle_api_protocol.request(fdata)
        return result
        if cls.__LOCAL_REQUEST:
            redis = RedisManager(db).getRedis()
            method = getattr(redis_manager.RedisUtils, cmd)
            args_ = [redis, key]
            if args is not None:
                if builtins.type(args) == tuple or builtins.type(args) == list:
                    args = list(args)
                    if cmd == "setex":
                        args_.append(args[0])
                        if type(args[1]) == list:
                            args_.append(json.dumps(args[1]))
                        else:
                            args_.append(args[1])
                    else:
                        for a in args:
                            args_.append(a)
                else:
                    args_.append(args)
            args_ = tuple(args_)
            result = method(*args_)
            if builtins.type(result) == set:
                result = list(result)
            return result
        else:
            data = {
                "db": db,
                "cmd": cmd,
                "key": key,
            }
            if args is not None:
                data["args"] = args
            fdata = middle_api_protocol.load_redis_cmd(data)
            result = middle_api_protocol.request(fdata)
            return result
    # [(db, cmd, key, args)]
    @classmethod
    def __batch__request(cls, odatas):
        _datas = []
        for d in odatas:
            data = {
                "db": d[0],
                "cmd": d[1],
                "key": d[2]
            }
            if d[3] is not None:
                data["args"] = d[3]
            _datas.append(data)
        fdata = middle_api_protocol.load_redis_cmds(_datas)
        results = middle_api_protocol.request(fdata)
        return results
        if cls.__LOCAL_REQUEST:
            result_list = []
            for d in odatas:
                db = d[0]
                cmd = d[1]
                key = d[2]
                args = None
                if d[3] is not None:
                    args = d[3]
                result = cls.__request(db, cmd, key, args)
                result_list.append(result)
            return result_list
        else:
            _datas = []
            for d in odatas:
                data = {
                    "db": d[0],
                    "cmd": d[1],
                    "key": d[2]
                }
                if d[3] is not None:
                    data["args"] = d[3]
                _datas.append(data)
            fdata = middle_api_protocol.load_redis_cmds(_datas)
            results = middle_api_protocol.request(fdata)
            return results
    @classmethod
    def __get_db(cls, redis_):
@@ -97,6 +140,7 @@
    @classmethod
    def keys(cls, redis_, key, auto_free=True):
        async_log_util.info(logger_redis_debug, f"keys:{key}")
        return cls.exec("keys", key, lambda: cls.__request(cls.__get_db(redis_), "keys", key))
    @classmethod
@@ -186,7 +230,10 @@
    @classmethod
    def add_async_task(cls, db: int, method, args):
        cls.__async_task_queue.put_nowait((db, method, args))
        try:
            cls.__async_task_queue.put_nowait((db, method, args))
        except Exception as e:
            async_log_util.error(logger_redis_debug, f"加入队列出错:{str(e)}")
    @classmethod
    def get_async_task_count(cls):
@@ -215,30 +262,15 @@
                        temp_data.append(None)
                    dataList.append(tuple(temp_data))
                    if len(dataList) >= 20:
                        results = cls.__batch__request(dataList)
                        last_upload_time = time.time()
                        dataList.clear()
                        try:
                            results = cls.__batch__request(dataList)
                            last_upload_time = time.time()
                        finally:
                            dataList.clear()
                if dataList and time.time() - last_upload_time > 5:
                    results = cls.__batch__request(dataList)
                    last_upload_time = time.time()
                    dataList.clear()
                    # try:
                    #     db = data[0]
                    #     method_name = data[1]
                    #     args = data[2]
                    #     _redis = RedisManager(db).getRedisNoPool()
                    #     method = getattr(RedisUtils, method_name)
                    #     if type(args) == tuple:
                    #         args = list(args)
                    #         args.insert(0, _redis)
                    #         args = tuple(args)
                    #         result = method(*args)
                    #     else:
                    #         args = tuple([_redis, args])
                    #         result = method(*args)
                    # except Exception as e2:
                    #     logging.exception(e2)
                    #     logging.error(data)
            except Exception as e1:
                logging.exception(e1)
                pass