Administrator
4 天以前 48fb7a00951f91bdc707e5dd2d196e5bccb752c3
db/redis_manager_delegate.py
@@ -1,6 +1,8 @@
"""
redis管理器
"""
import builtins
import json
import logging
import queue
import time
@@ -8,8 +10,11 @@
import redis
import constant
from log_module.log import logger_redis_debug
from log_module import async_log_util
from log_module.log import logger_redis_debug, logger_system
from utils import tool, middle_api_protocol
from db import redis_manager
config = constant.REDIS_CONFIG
@@ -32,28 +37,85 @@
class RedisUtils:
    __async_task_queue = queue.Queue()
    # 本机执行redis
    __LOCAL_REQUEST = True
    __async_task_queue = queue.Queue(maxsize=10240)
    @classmethod
    def exec(cls, method_name, key, lamada_method):
        __start_time = time.time()
        try:
            return lamada_method()
        finally:
            logger_redis_debug.info("{}({}):{}", method_name, round((time.time() - __start_time) * 1000, 3), key)
            # logger_redis_debug.info("{}({}):{}", method_name, round((time.time() - __start_time) * 1000, 3), key)
            pass
    @classmethod
    def __request(cls, db, cmd, key, args=None):
        data = {
            "db": db,
            "cmd": cmd,
            "key": key,
        }
        if args:
            data["args"] = args
        fdata = middle_api_protocol.load_redis_cmd(data)
        result = middle_api_protocol.request(fdata)
        return result
        if cls.__LOCAL_REQUEST:
            redis = RedisManager(db).getRedis()
            method = getattr(redis_manager.RedisUtils, cmd)
            args_ = [redis, key]
            if args is not None:
                if builtins.type(args) == tuple or builtins.type(args) == list:
                    args = list(args)
                    if cmd == "setex":
                        args_.append(args[0])
                        if type(args[1]) == list:
                            args_.append(json.dumps(args[1]))
                        else:
                            args_.append(args[1])
                    else:
                        for a in args:
                            args_.append(a)
                else:
                    args_.append(args)
            args_ = tuple(args_)
            result = method(*args_)
            if builtins.type(result) == set:
                result = list(result)
            return result
        else:
            data = {
                "db": db,
                "cmd": cmd,
                "key": key,
            }
            if args is not None:
                data["args"] = args
            fdata = middle_api_protocol.load_redis_cmd(data)
            result = middle_api_protocol.request(fdata)
            return result
    # [(db, cmd, key, args)]
    @classmethod
    def __batch__request(cls, odatas):
        if cls.__LOCAL_REQUEST:
            result_list = []
            for d in odatas:
                db = d[0]
                cmd = d[1]
                key = d[2]
                args = None
                if d[3] is not None:
                    args = d[3]
                result = cls.__request(db, cmd, key, args)
                result_list.append(result)
            return result_list
        else:
            _datas = []
            for d in odatas:
                data = {
                    "db": d[0],
                    "cmd": d[1],
                    "key": d[2]
                }
                if d[3] is not None:
                    data["args"] = d[3]
                _datas.append(data)
            fdata = middle_api_protocol.load_redis_cmds(_datas)
            results = middle_api_protocol.request(fdata)
            return results
    @classmethod
    def __get_db(cls, redis_):
@@ -74,15 +136,21 @@
    @classmethod
    def delete_async(cls, db, key, auto_free=True):
        cls.add_async_task(db, "delete", (key))
        logger_redis_debug.info("delete_async({}):{}", 0, key)
        # logger_redis_debug.info("delete_async({}):{}", 0, key)
    @classmethod
    def keys(cls, redis_, key, auto_free=True):
        async_log_util.info(logger_redis_debug, f"keys:{key}")
        return cls.exec("keys", key, lambda: cls.__request(cls.__get_db(redis_), "keys", key))
    @classmethod
    def set(cls, redis_, key, val, auto_free=True):
        return cls.exec("set", key, lambda: cls.__request(cls.__get_db(redis_), "set", key, val))
    @classmethod
    def set_async(cls, db, key, val, auto_free=True):
        cls.add_async_task(db, "set", (key, val))
        # logger_redis_debug.info("setex_async({}):{}", 0, key)
    @classmethod
    def setex(cls, redis_, key, expire, val, auto_free=True, _async=False):
@@ -91,7 +159,7 @@
    @classmethod
    def setex_async(cls, db, key, expire, val, auto_free=True):
        cls.add_async_task(db, "setex", (key, expire, val))
        logger_redis_debug.info("setex_async({}):{}", 0, key)
        # logger_redis_debug.info("setex_async({}):{}", 0, key)
    @classmethod
    def setnx(cls, redis_, key, val, auto_free=True):
@@ -104,7 +172,7 @@
    @classmethod
    def expire_async(cls, db, key, expire, auto_free=True):
        cls.add_async_task(db, "expire", (key, expire))
        logger_redis_debug.info("expire_async({}):{}", 0, key)
        # logger_redis_debug.info("expire_async({}):{}", 0, key)
    @classmethod
    def sadd(cls, redis_, key, val, auto_free=True):
@@ -113,7 +181,7 @@
    @classmethod
    def sadd_async(cls, db, key, val, auto_free=True):
        cls.add_async_task(db, "sadd", (key, val))
        logger_redis_debug.info("sadd_async({}):{}", 0, key)
        # logger_redis_debug.info("sadd_async({}):{}", 0, key)
    @classmethod
    def sismember(cls, redis_, key, val, auto_free=True):
@@ -133,7 +201,7 @@
    @classmethod
    def srem_async(cls, db, key, val, auto_free=True):
        cls.add_async_task(db, "srem", (key, val))
        logger_redis_debug.info("srem_async({}):{}", 0, key)
        # logger_redis_debug.info("srem_async({}):{}", 0, key)
    @classmethod
    def incrby(cls, redis_, key, num, auto_free=True, _async=False):
@@ -142,7 +210,7 @@
    @classmethod
    def incrby_async(cls, db, key, num, auto_free=True):
        cls.add_async_task(db, "incrby", (key, num))
        logger_redis_debug.info("incrby_async({}):{}", 0, key)
        # logger_redis_debug.info("incrby_async({}):{}", 0, key)
    @classmethod
    def lpush(cls, redis_, key, val, auto_free=True):
@@ -161,8 +229,11 @@
        pass
    @classmethod
    def add_async_task(cls, db, method, args):
        cls.__async_task_queue.put_nowait((db, method, args))
    def add_async_task(cls, db: int, method, args):
        try:
            cls.__async_task_queue.put_nowait((db, method, args))
        except Exception as e:
            async_log_util.error(logger_redis_debug, f"加入队列出错:{str(e)}")
    @classmethod
    def get_async_task_count(cls):
@@ -171,28 +242,41 @@
    # 运行异步任务
    @classmethod
    def run_loop(cls):
        logger_system.info("启动Redis数据同步服务")
        logger_system.info(f"redis 线程ID:{tool.get_thread_id()}")
        dataList = []
        last_upload_time = time.time()
        while True:
            try:
                data = cls.__async_task_queue.get()
                if data:
                    db = data[0]
                    method_name = data[1]
                    args = data[2]
                    _redis = RedisManager(db).getRedisNoPool()
                    method = getattr(RedisUtils, method_name)
                    if type(args) == tuple:
                        args = list(args)
                        args.insert(0, _redis)
                        args = tuple(args)
                        result = method(*args)
                    temp_data = [data[0], data[1]]
                    if type(data[2]) == tuple or type(data[2]) == list:
                        temp_data.append(data[2][0])
                        if len(data[2]) > 1:
                            temp_data.append(data[2][1:])
                        else:
                            temp_data.append(None)
                    else:
                        args = tuple([_redis, args])
                        result = method(*args)
                        temp_data.append(data[2])
                        temp_data.append(None)
                    dataList.append(tuple(temp_data))
                    if len(dataList) >= 20:
                        try:
                            results = cls.__batch__request(dataList)
                            last_upload_time = time.time()
                        finally:
                            dataList.clear()
                if dataList and time.time() - last_upload_time > 5:
                    results = cls.__batch__request(dataList)
                    last_upload_time = time.time()
                    dataList.clear()
            except Exception as e1:
                logging.exception(e1)
                pass
if __name__ == "__main__":
    pass
    for i in range(30):
        RedisUtils.setex_async(0, 'buy_position_info-002547', tool.get_expire(), 1011)
    RedisUtils.run_loop()