From 3784c8cc817beca104630d6a1e2c2d3fefa44e52 Mon Sep 17 00:00:00 2001 From: Administrator <admin@example.com> Date: 星期三, 06 九月 2023 16:04:36 +0800 Subject: [PATCH] 优化异步redis --- db/redis_manager_delegate.py | 74 ++++++++++++++++++++++++++++-------- 1 files changed, 57 insertions(+), 17 deletions(-) diff --git a/db/redis_manager_delegate.py b/db/redis_manager_delegate.py index 6f45261..638f785 100644 --- a/db/redis_manager_delegate.py +++ b/db/redis_manager_delegate.py @@ -56,6 +56,23 @@ result = middle_api_protocol.request(fdata) return result + # [(db, cmd, key, args)] + @classmethod + def __batch__request(cls, odatas): + _datas = [] + for d in odatas: + data = { + "db": d[0], + "cmd": d[1], + "key": d[2] + } + if d[3] is not None: + data["args"] = d[3] + _datas.append(data) + fdata = middle_api_protocol.load_redis_cmds(_datas) + results = middle_api_protocol.request(fdata) + return results + @classmethod def __get_db(cls, redis_): return redis_.connection_pool.connection_kwargs['db'] @@ -178,31 +195,54 @@ @classmethod def run_loop(cls): logger_system.info("鍚姩Redis鏁版嵁鍚屾鏈嶅姟") + dataList = [] + last_upload_time = time.time() while True: try: data = cls.__async_task_queue.get() if data: - try: - db = data[0] - method_name = data[1] - args = data[2] - _redis = RedisManager(db).getRedisNoPool() - method = getattr(RedisUtils, method_name) - if type(args) == tuple: - args = list(args) - args.insert(0, _redis) - args = tuple(args) - result = method(*args) + temp_data = [data[0], data[1]] + if type(data[2]) == tuple or type(data[2]) == list: + temp_data.append(data[2][0]) + if len(data[2]) > 1: + temp_data.append(data[2][1:]) else: - args = tuple([_redis, args]) - result = method(*args) - except Exception as e2: - logging.exception(e2) - logging.error(data) + temp_data.append(None) + else: + temp_data.append(data[2]) + temp_data.append(None) + dataList.append(tuple(temp_data)) + if len(dataList) >= 20: + results = cls.__batch__request(dataList) + last_upload_time = time.time() + dataList.clear() + if dataList and time.time() - last_upload_time > 5: + results = cls.__batch__request(dataList) + last_upload_time = time.time() + dataList.clear() + # try: + # db = data[0] + # method_name = data[1] + # args = data[2] + # _redis = RedisManager(db).getRedisNoPool() + # method = getattr(RedisUtils, method_name) + # if type(args) == tuple: + # args = list(args) + # args.insert(0, _redis) + # args = tuple(args) + # result = method(*args) + # else: + # args = tuple([_redis, args]) + # result = method(*args) + # except Exception as e2: + # logging.exception(e2) + # logging.error(data) except Exception as e1: logging.exception(e1) pass if __name__ == "__main__": - pass + for i in range(30): + RedisUtils.setex_async(0, 'buy_position_info-002547', tool.get_expire(), 1011) + RedisUtils.run_loop() -- Gitblit v1.8.0