From 6bbfbbb16d792f7737ec86cabdba5c0e98dcf4b4 Mon Sep 17 00:00:00 2001 From: Administrator <admin@example.com> Date: 星期五, 29 八月 2025 17:41:29 +0800 Subject: [PATCH] 有涨停买撤单要触发撤单计算 --- db/redis_manager_delegate.py | 141 ++++++++++++++++++++++++++++++++++++---------- 1 files changed, 110 insertions(+), 31 deletions(-) diff --git a/db/redis_manager_delegate.py b/db/redis_manager_delegate.py index a461259..68f9de6 100644 --- a/db/redis_manager_delegate.py +++ b/db/redis_manager_delegate.py @@ -1,6 +1,8 @@ """ redis绠$悊鍣� """ +import builtins +import json import logging import queue import time @@ -8,8 +10,11 @@ import redis import constant +from log_module import async_log_util from log_module.log import logger_redis_debug, logger_system from utils import tool, middle_api_protocol + +from db import redis_manager config = constant.REDIS_CONFIG @@ -32,11 +37,13 @@ class RedisUtils: - __async_task_queue = queue.Queue() + # 鏈満鎵цredis + __LOCAL_REQUEST = True + + __async_task_queue = queue.Queue(maxsize=10240) @classmethod def exec(cls, method_name, key, lamada_method): - __start_time = time.time() try: return lamada_method() finally: @@ -45,16 +52,70 @@ @classmethod def __request(cls, db, cmd, key, args=None): - data = { - "db": db, - "cmd": cmd, - "key": key, - } - if args is not None: - data["args"] = args - fdata = middle_api_protocol.load_redis_cmd(data) - result = middle_api_protocol.request(fdata) - return result + if cls.__LOCAL_REQUEST: + redis = RedisManager(db).getRedis() + method = getattr(redis_manager.RedisUtils, cmd) + args_ = [redis, key] + if args is not None: + if builtins.type(args) == tuple or builtins.type(args) == list: + args = list(args) + if cmd == "setex": + args_.append(args[0]) + if type(args[1]) == list: + args_.append(json.dumps(args[1])) + else: + args_.append(args[1]) + else: + for a in args: + args_.append(a) + else: + args_.append(args) + args_ = tuple(args_) + result = method(*args_) + if builtins.type(result) == set: + result = list(result) + return result + else: + data = { + "db": db, + "cmd": cmd, + "key": key, + } + if args is not None: + data["args"] = args + fdata = middle_api_protocol.load_redis_cmd(data) + result = middle_api_protocol.request(fdata) + return result + + # [(db, cmd, key, args)] + @classmethod + def __batch__request(cls, odatas): + if cls.__LOCAL_REQUEST: + result_list = [] + for d in odatas: + db = d[0] + cmd = d[1] + key = d[2] + args = None + if d[3] is not None: + args = d[3] + result = cls.__request(db, cmd, key, args) + result_list.append(result) + return result_list + else: + _datas = [] + for d in odatas: + data = { + "db": d[0], + "cmd": d[1], + "key": d[2] + } + if d[3] is not None: + data["args"] = d[3] + _datas.append(data) + fdata = middle_api_protocol.load_redis_cmds(_datas) + results = middle_api_protocol.request(fdata) + return results @classmethod def __get_db(cls, redis_): @@ -79,11 +140,17 @@ @classmethod def keys(cls, redis_, key, auto_free=True): + async_log_util.info(logger_redis_debug, f"keys:{key}") return cls.exec("keys", key, lambda: cls.__request(cls.__get_db(redis_), "keys", key)) @classmethod def set(cls, redis_, key, val, auto_free=True): return cls.exec("set", key, lambda: cls.__request(cls.__get_db(redis_), "set", key, val)) + + @classmethod + def set_async(cls, db, key, val, auto_free=True): + cls.add_async_task(db, "set", (key, val)) + # logger_redis_debug.info("setex_async({}):{}", 0, key) @classmethod def setex(cls, redis_, key, expire, val, auto_free=True, _async=False): @@ -162,8 +229,11 @@ pass @classmethod - def add_async_task(cls, db, method, args): - cls.__async_task_queue.put_nowait((db, method, args)) + def add_async_task(cls, db: int, method, args): + try: + cls.__async_task_queue.put_nowait((db, method, args)) + except Exception as e: + async_log_util.error(logger_redis_debug, f"鍔犲叆闃熷垪鍑洪敊锛歿str(e)}") @classmethod def get_async_task_count(cls): @@ -173,31 +243,40 @@ @classmethod def run_loop(cls): logger_system.info("鍚姩Redis鏁版嵁鍚屾鏈嶅姟") + logger_system.info(f"redis 绾跨▼ID:{tool.get_thread_id()}") + dataList = [] + last_upload_time = time.time() while True: try: data = cls.__async_task_queue.get() if data: - try: - db = data[0] - method_name = data[1] - args = data[2] - _redis = RedisManager(db).getRedisNoPool() - method = getattr(RedisUtils, method_name) - if type(args) == tuple: - args = list(args) - args.insert(0, _redis) - args = tuple(args) - result = method(*args) + temp_data = [data[0], data[1]] + if type(data[2]) == tuple or type(data[2]) == list: + temp_data.append(data[2][0]) + if len(data[2]) > 1: + temp_data.append(data[2][1:]) else: - args = tuple([_redis, args]) - result = method(*args) - except Exception as e2: - logging.exception(e2) - logging.error(data) + temp_data.append(None) + else: + temp_data.append(data[2]) + temp_data.append(None) + dataList.append(tuple(temp_data)) + if len(dataList) >= 20: + try: + results = cls.__batch__request(dataList) + last_upload_time = time.time() + finally: + dataList.clear() + if dataList and time.time() - last_upload_time > 5: + results = cls.__batch__request(dataList) + last_upload_time = time.time() + dataList.clear() except Exception as e1: logging.exception(e1) pass if __name__ == "__main__": - pass + for i in range(30): + RedisUtils.setex_async(0, 'buy_position_info-002547', tool.get_expire(), 1011) + RedisUtils.run_loop() -- Gitblit v1.8.0