From 6bbfbbb16d792f7737ec86cabdba5c0e98dcf4b4 Mon Sep 17 00:00:00 2001
From: Administrator <admin@example.com>
Date: 星期五, 29 八月 2025 17:41:29 +0800
Subject: [PATCH] 有涨停买撤单要触发撤单计算

---
 db/redis_manager_delegate.py |  141 ++++++++++++++++++++++++++++++++++++----------
 1 files changed, 110 insertions(+), 31 deletions(-)

diff --git a/db/redis_manager_delegate.py b/db/redis_manager_delegate.py
index a461259..68f9de6 100644
--- a/db/redis_manager_delegate.py
+++ b/db/redis_manager_delegate.py
@@ -1,6 +1,8 @@
 """
 redis绠$悊鍣�
 """
+import builtins
+import json
 import logging
 import queue
 import time
@@ -8,8 +10,11 @@
 import redis
 
 import constant
+from log_module import async_log_util
 from log_module.log import logger_redis_debug, logger_system
 from utils import tool, middle_api_protocol
+
+from db import redis_manager
 
 config = constant.REDIS_CONFIG
 
@@ -32,11 +37,13 @@
 
 
 class RedisUtils:
-    __async_task_queue = queue.Queue()
+    # 鏈満鎵цredis
+    __LOCAL_REQUEST = True
+
+    __async_task_queue = queue.Queue(maxsize=10240)
 
     @classmethod
     def exec(cls, method_name, key, lamada_method):
-        __start_time = time.time()
         try:
             return lamada_method()
         finally:
@@ -45,16 +52,70 @@
 
     @classmethod
     def __request(cls, db, cmd, key, args=None):
-        data = {
-            "db": db,
-            "cmd": cmd,
-            "key": key,
-        }
-        if args is not None:
-            data["args"] = args
-        fdata = middle_api_protocol.load_redis_cmd(data)
-        result = middle_api_protocol.request(fdata)
-        return result
+        if cls.__LOCAL_REQUEST:
+            redis = RedisManager(db).getRedis()
+            method = getattr(redis_manager.RedisUtils, cmd)
+            args_ = [redis, key]
+            if args is not None:
+                if builtins.type(args) == tuple or builtins.type(args) == list:
+                    args = list(args)
+                    if cmd == "setex":
+                        args_.append(args[0])
+                        if type(args[1]) == list:
+                            args_.append(json.dumps(args[1]))
+                        else:
+                            args_.append(args[1])
+                    else:
+                        for a in args:
+                            args_.append(a)
+                else:
+                    args_.append(args)
+            args_ = tuple(args_)
+            result = method(*args_)
+            if builtins.type(result) == set:
+                result = list(result)
+            return result
+        else:
+            data = {
+                "db": db,
+                "cmd": cmd,
+                "key": key,
+            }
+            if args is not None:
+                data["args"] = args
+            fdata = middle_api_protocol.load_redis_cmd(data)
+            result = middle_api_protocol.request(fdata)
+            return result
+
+    # [(db, cmd, key, args)]
+    @classmethod
+    def __batch__request(cls, odatas):
+        if cls.__LOCAL_REQUEST:
+            result_list = []
+            for d in odatas:
+                db = d[0]
+                cmd = d[1]
+                key = d[2]
+                args = None
+                if d[3] is not None:
+                    args = d[3]
+                result = cls.__request(db, cmd, key, args)
+                result_list.append(result)
+            return result_list
+        else:
+            _datas = []
+            for d in odatas:
+                data = {
+                    "db": d[0],
+                    "cmd": d[1],
+                    "key": d[2]
+                }
+                if d[3] is not None:
+                    data["args"] = d[3]
+                _datas.append(data)
+            fdata = middle_api_protocol.load_redis_cmds(_datas)
+            results = middle_api_protocol.request(fdata)
+            return results
 
     @classmethod
     def __get_db(cls, redis_):
@@ -79,11 +140,17 @@
 
     @classmethod
     def keys(cls, redis_, key, auto_free=True):
+        async_log_util.info(logger_redis_debug, f"keys:{key}")
         return cls.exec("keys", key, lambda: cls.__request(cls.__get_db(redis_), "keys", key))
 
     @classmethod
     def set(cls, redis_, key, val, auto_free=True):
         return cls.exec("set", key, lambda: cls.__request(cls.__get_db(redis_), "set", key, val))
+
+    @classmethod
+    def set_async(cls, db, key, val, auto_free=True):
+        cls.add_async_task(db, "set", (key, val))
+        # logger_redis_debug.info("setex_async({}):{}", 0, key)
 
     @classmethod
     def setex(cls, redis_, key, expire, val, auto_free=True, _async=False):
@@ -162,8 +229,11 @@
         pass
 
     @classmethod
-    def add_async_task(cls, db, method, args):
-        cls.__async_task_queue.put_nowait((db, method, args))
+    def add_async_task(cls, db: int, method, args):
+        try:
+            cls.__async_task_queue.put_nowait((db, method, args))
+        except Exception as e:
+            async_log_util.error(logger_redis_debug, f"鍔犲叆闃熷垪鍑洪敊锛歿str(e)}")
 
     @classmethod
     def get_async_task_count(cls):
@@ -173,31 +243,40 @@
     @classmethod
     def run_loop(cls):
         logger_system.info("鍚姩Redis鏁版嵁鍚屾鏈嶅姟")
+        logger_system.info(f"redis 绾跨▼ID:{tool.get_thread_id()}")
+        dataList = []
+        last_upload_time = time.time()
         while True:
             try:
                 data = cls.__async_task_queue.get()
                 if data:
-                    try:
-                        db = data[0]
-                        method_name = data[1]
-                        args = data[2]
-                        _redis = RedisManager(db).getRedisNoPool()
-                        method = getattr(RedisUtils, method_name)
-                        if type(args) == tuple:
-                            args = list(args)
-                            args.insert(0, _redis)
-                            args = tuple(args)
-                            result = method(*args)
+                    temp_data = [data[0], data[1]]
+                    if type(data[2]) == tuple or type(data[2]) == list:
+                        temp_data.append(data[2][0])
+                        if len(data[2]) > 1:
+                            temp_data.append(data[2][1:])
                         else:
-                            args = tuple([_redis, args])
-                            result = method(*args)
-                    except Exception as e2:
-                        logging.exception(e2)
-                        logging.error(data)
+                            temp_data.append(None)
+                    else:
+                        temp_data.append(data[2])
+                        temp_data.append(None)
+                    dataList.append(tuple(temp_data))
+                    if len(dataList) >= 20:
+                        try:
+                            results = cls.__batch__request(dataList)
+                            last_upload_time = time.time()
+                        finally:
+                            dataList.clear()
+                if dataList and time.time() - last_upload_time > 5:
+                    results = cls.__batch__request(dataList)
+                    last_upload_time = time.time()
+                    dataList.clear()
             except Exception as e1:
                 logging.exception(e1)
                 pass
 
 
 if __name__ == "__main__":
-    pass
+    for i in range(30):
+        RedisUtils.setex_async(0, 'buy_position_info-002547', tool.get_expire(), 1011)
+    RedisUtils.run_loop()

--
Gitblit v1.8.0