From 506e43eb1b1c0bb4571cd7f3ce00ee74613cb920 Mon Sep 17 00:00:00 2001 From: Administrator <admin@example.com> Date: 星期三, 20 八月 2025 15:57:19 +0800 Subject: [PATCH] bug修复 --- db/redis_manager_delegate.py | 130 +++++++++++++++++++++++++++---------------- 1 files changed, 81 insertions(+), 49 deletions(-) diff --git a/db/redis_manager_delegate.py b/db/redis_manager_delegate.py index ece402e..68f9de6 100644 --- a/db/redis_manager_delegate.py +++ b/db/redis_manager_delegate.py @@ -1,16 +1,20 @@ """ redis绠$悊鍣� """ +import builtins +import json import logging import queue -import threading import time import redis import constant +from log_module import async_log_util from log_module.log import logger_redis_debug, logger_system from utils import tool, middle_api_protocol + +from db import redis_manager config = constant.REDIS_CONFIG @@ -33,11 +37,13 @@ class RedisUtils: - __async_task_queue = queue.Queue() + # 鏈満鎵цredis + __LOCAL_REQUEST = True + + __async_task_queue = queue.Queue(maxsize=10240) @classmethod def exec(cls, method_name, key, lamada_method): - __start_time = time.time() try: return lamada_method() finally: @@ -46,33 +52,70 @@ @classmethod def __request(cls, db, cmd, key, args=None): - data = { - "db": db, - "cmd": cmd, - "key": key, - } - if args is not None: - data["args"] = args - fdata = middle_api_protocol.load_redis_cmd(data) - result = middle_api_protocol.request(fdata) - return result + if cls.__LOCAL_REQUEST: + redis = RedisManager(db).getRedis() + method = getattr(redis_manager.RedisUtils, cmd) + args_ = [redis, key] + if args is not None: + if builtins.type(args) == tuple or builtins.type(args) == list: + args = list(args) + if cmd == "setex": + args_.append(args[0]) + if type(args[1]) == list: + args_.append(json.dumps(args[1])) + else: + args_.append(args[1]) + else: + for a in args: + args_.append(a) + else: + args_.append(args) + args_ = tuple(args_) + result = method(*args_) + if builtins.type(result) == set: + result = list(result) + return result + else: + data = { + "db": db, + "cmd": cmd, + "key": key, + } + if args is not None: + data["args"] = args + fdata = middle_api_protocol.load_redis_cmd(data) + result = middle_api_protocol.request(fdata) + return result # [(db, cmd, key, args)] @classmethod def __batch__request(cls, odatas): - _datas = [] - for d in odatas: - data = { - "db": d[0], - "cmd": d[1], - "key": d[2] - } - if d[3] is not None: - data["args"] = d[3] - _datas.append(data) - fdata = middle_api_protocol.load_redis_cmds(_datas) - results = middle_api_protocol.request(fdata) - return results + if cls.__LOCAL_REQUEST: + result_list = [] + for d in odatas: + db = d[0] + cmd = d[1] + key = d[2] + args = None + if d[3] is not None: + args = d[3] + result = cls.__request(db, cmd, key, args) + result_list.append(result) + return result_list + else: + _datas = [] + for d in odatas: + data = { + "db": d[0], + "cmd": d[1], + "key": d[2] + } + if d[3] is not None: + data["args"] = d[3] + _datas.append(data) + fdata = middle_api_protocol.load_redis_cmds(_datas) + results = middle_api_protocol.request(fdata) + return results @classmethod def __get_db(cls, redis_): @@ -97,6 +140,7 @@ @classmethod def keys(cls, redis_, key, auto_free=True): + async_log_util.info(logger_redis_debug, f"keys:{key}") return cls.exec("keys", key, lambda: cls.__request(cls.__get_db(redis_), "keys", key)) @classmethod @@ -185,8 +229,11 @@ pass @classmethod - def add_async_task(cls, db, method, args): - cls.__async_task_queue.put_nowait((db, method, args)) + def add_async_task(cls, db: int, method, args): + try: + cls.__async_task_queue.put_nowait((db, method, args)) + except Exception as e: + async_log_util.error(logger_redis_debug, f"鍔犲叆闃熷垪鍑洪敊锛歿str(e)}") @classmethod def get_async_task_count(cls): @@ -196,7 +243,7 @@ @classmethod def run_loop(cls): logger_system.info("鍚姩Redis鏁版嵁鍚屾鏈嶅姟") - logger_system.debug(f"绾跨▼ID锛宺edis锛歿threading.get_ident()}") + logger_system.info(f"redis 绾跨▼ID:{tool.get_thread_id()}") dataList = [] last_upload_time = time.time() while True: @@ -215,30 +262,15 @@ temp_data.append(None) dataList.append(tuple(temp_data)) if len(dataList) >= 20: - results = cls.__batch__request(dataList) - last_upload_time = time.time() - dataList.clear() + try: + results = cls.__batch__request(dataList) + last_upload_time = time.time() + finally: + dataList.clear() if dataList and time.time() - last_upload_time > 5: results = cls.__batch__request(dataList) last_upload_time = time.time() dataList.clear() - # try: - # db = data[0] - # method_name = data[1] - # args = data[2] - # _redis = RedisManager(db).getRedisNoPool() - # method = getattr(RedisUtils, method_name) - # if type(args) == tuple: - # args = list(args) - # args.insert(0, _redis) - # args = tuple(args) - # result = method(*args) - # else: - # args = tuple([_redis, args]) - # result = method(*args) - # except Exception as e2: - # logging.exception(e2) - # logging.error(data) except Exception as e1: logging.exception(e1) pass -- Gitblit v1.8.0