| | |
| | | """ |
| | | redis管理器 |
| | | """ |
| | | import builtins |
| | | import json |
| | | import logging |
| | | import queue |
| | | import threading |
| | | import time |
| | | |
| | | import redis |
| | | |
| | | import constant |
| | | from log_module import async_log_util |
| | | from log_module.log import logger_redis_debug, logger_system |
| | | from utils import tool, middle_api_protocol |
| | | |
| | | from db import redis_manager |
| | | |
| | | config = constant.REDIS_CONFIG |
| | | |
| | |
| | | |
| | | |
class RedisUtils:
    """Class-level helpers for running redis commands.

    Commands are executed either on this machine or through the middle API,
    depending on ``__LOCAL_REQUEST``.
    """

    # Execute redis commands on this machine instead of via the middle API.
    __LOCAL_REQUEST = True

    # Pending (db, method, args) tasks. Bounded so producers cannot grow it
    # without limit; defined once (the earlier unbounded Queue() was dead).
    __async_task_queue = queue.Queue(maxsize=10240)
| | | |
| | | @classmethod |
| | | def exec(cls, method_name, key, lamada_method): |
| | | __start_time = time.time() |
| | | try: |
| | | return lamada_method() |
| | | finally: |
| | |
| | | |
| | | @classmethod |
| | | def __request(cls, db, cmd, key, args=None): |
| | | data = { |
| | | "db": db, |
| | | "cmd": cmd, |
| | | "key": key, |
| | | } |
| | | if args is not None: |
| | | data["args"] = args |
| | | fdata = middle_api_protocol.load_redis_cmd(data) |
| | | result = middle_api_protocol.request(fdata) |
| | | return result |
| | | if cls.__LOCAL_REQUEST: |
| | | redis = RedisManager(db).getRedis() |
| | | method = getattr(redis_manager.RedisUtils, cmd) |
| | | args_ = [redis, key] |
| | | if args is not None: |
| | | if builtins.type(args) == tuple or builtins.type(args) == list: |
| | | args = list(args) |
| | | if cmd == "setex": |
| | | args_.append(args[0]) |
| | | if type(args[1]) == list: |
| | | args_.append(json.dumps(args[1])) |
| | | else: |
| | | args_.append(args[1]) |
| | | else: |
| | | for a in args: |
| | | args_.append(a) |
| | | else: |
| | | args_.append(args) |
| | | args_ = tuple(args_) |
| | | result = method(*args_) |
| | | if builtins.type(result) == set: |
| | | result = list(result) |
| | | return result |
| | | else: |
| | | data = { |
| | | "db": db, |
| | | "cmd": cmd, |
| | | "key": key, |
| | | } |
| | | if args is not None: |
| | | data["args"] = args |
| | | fdata = middle_api_protocol.load_redis_cmd(data) |
| | | result = middle_api_protocol.request(fdata) |
| | | return result |
| | | |
| | | # [(db, cmd, key, args)] |
| | | @classmethod |
| | | def __batch__request(cls, odatas): |
| | | _datas = [] |
| | | for d in odatas: |
| | | data = { |
| | | "db": d[0], |
| | | "cmd": d[1], |
| | | "key": d[2] |
| | | } |
| | | if d[3] is not None: |
| | | data["args"] = d[3] |
| | | _datas.append(data) |
| | | fdata = middle_api_protocol.load_redis_cmds(_datas) |
| | | results = middle_api_protocol.request(fdata) |
| | | return results |
| | | if cls.__LOCAL_REQUEST: |
| | | result_list = [] |
| | | for d in odatas: |
| | | db = d[0] |
| | | cmd = d[1] |
| | | key = d[2] |
| | | args = None |
| | | if d[3] is not None: |
| | | args = d[3] |
| | | result = cls.__request(db, cmd, key, args) |
| | | result_list.append(result) |
| | | return result_list |
| | | else: |
| | | _datas = [] |
| | | for d in odatas: |
| | | data = { |
| | | "db": d[0], |
| | | "cmd": d[1], |
| | | "key": d[2] |
| | | } |
| | | if d[3] is not None: |
| | | data["args"] = d[3] |
| | | _datas.append(data) |
| | | fdata = middle_api_protocol.load_redis_cmds(_datas) |
| | | results = middle_api_protocol.request(fdata) |
| | | return results |
| | | |
| | | @classmethod |
| | | def __get_db(cls, redis_): |
| | |
| | | |
| | | @classmethod |
| | | def keys(cls, redis_, key, auto_free=True): |
| | | async_log_util.info(logger_redis_debug, f"keys:{key}") |
| | | return cls.exec("keys", key, lambda: cls.__request(cls.__get_db(redis_), "keys", key)) |
| | | |
| | | @classmethod |
| | |
| | | |
| | | @classmethod |
| | | def add_async_task(cls, db: int, method, args): |
| | | cls.__async_task_queue.put_nowait((db, method, args)) |
| | | try: |
| | | cls.__async_task_queue.put_nowait((db, method, args)) |
| | | except Exception as e: |
| | | async_log_util.error(logger_redis_debug, f"加入队列出错:{str(e)}") |
| | | |
| | | @classmethod |
| | | def get_async_task_count(cls): |
| | |
| | | temp_data.append(None) |
| | | dataList.append(tuple(temp_data)) |
| | | if len(dataList) >= 20: |
| | | results = cls.__batch__request(dataList) |
| | | last_upload_time = time.time() |
| | | dataList.clear() |
| | | try: |
| | | results = cls.__batch__request(dataList) |
| | | last_upload_time = time.time() |
| | | finally: |
| | | dataList.clear() |
| | | if dataList and time.time() - last_upload_time > 5: |
| | | results = cls.__batch__request(dataList) |
| | | last_upload_time = time.time() |
| | | dataList.clear() |
| | | # try: |
| | | # db = data[0] |
| | | # method_name = data[1] |
| | | # args = data[2] |
| | | # _redis = RedisManager(db).getRedisNoPool() |
| | | # method = getattr(RedisUtils, method_name) |
| | | # if type(args) == tuple: |
| | | # args = list(args) |
| | | # args.insert(0, _redis) |
| | | # args = tuple(args) |
| | | # result = method(*args) |
| | | # else: |
| | | # args = tuple([_redis, args]) |
| | | # result = method(*args) |
| | | # except Exception as e2: |
| | | # logging.exception(e2) |
| | | # logging.error(data) |
| | | except Exception as e1: |
| | | logging.exception(e1) |
| | | pass |