Administrator
2023-09-06 3784c8cc817beca104630d6a1e2c2d3fefa44e52
优化异步redis
2个文件已修改
78 ■■■■ 已修改文件
db/redis_manager_delegate.py 74 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utils/middle_api_protocol.py 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
db/redis_manager_delegate.py
@@ -56,6 +56,23 @@
        result = middle_api_protocol.request(fdata)
        return result
    # [(db, cmd, key, args)]
    @classmethod
    def __batch__request(cls, odatas):
        _datas = []
        for d in odatas:
            data = {
                "db": d[0],
                "cmd": d[1],
                "key": d[2]
            }
            if d[3] is not None:
                data["args"] = d[3]
            _datas.append(data)
        fdata = middle_api_protocol.load_redis_cmds(_datas)
        results = middle_api_protocol.request(fdata)
        return results
    @classmethod
    def __get_db(cls, redis_):
        return redis_.connection_pool.connection_kwargs['db']
@@ -178,31 +195,54 @@
    @classmethod
    def run_loop(cls):
        logger_system.info("启动Redis数据同步服务")
        dataList = []
        last_upload_time = time.time()
        while True:
            try:
                data = cls.__async_task_queue.get()
                if data:
                    try:
                        db = data[0]
                        method_name = data[1]
                        args = data[2]
                        _redis = RedisManager(db).getRedisNoPool()
                        method = getattr(RedisUtils, method_name)
                        if type(args) == tuple:
                            args = list(args)
                            args.insert(0, _redis)
                            args = tuple(args)
                            result = method(*args)
                    temp_data = [data[0], data[1]]
                    if type(data[2]) == tuple or type(data[2]) == list:
                        temp_data.append(data[2][0])
                        if len(data[2]) > 1:
                            temp_data.append(data[2][1:])
                        else:
                            args = tuple([_redis, args])
                            result = method(*args)
                    except Exception as e2:
                        logging.exception(e2)
                        logging.error(data)
                            temp_data.append(None)
                    else:
                        temp_data.append(data[2])
                        temp_data.append(None)
                    dataList.append(tuple(temp_data))
                    if len(dataList) >= 20:
                        results = cls.__batch__request(dataList)
                        last_upload_time = time.time()
                        dataList.clear()
                if dataList and time.time() - last_upload_time > 5:
                    results = cls.__batch__request(dataList)
                    last_upload_time = time.time()
                    dataList.clear()
                    # try:
                    #     db = data[0]
                    #     method_name = data[1]
                    #     args = data[2]
                    #     _redis = RedisManager(db).getRedisNoPool()
                    #     method = getattr(RedisUtils, method_name)
                    #     if type(args) == tuple:
                    #         args = list(args)
                    #         args.insert(0, _redis)
                    #         args = tuple(args)
                    #         result = method(*args)
                    #     else:
                    #         args = tuple([_redis, args])
                    #         result = method(*args)
                    # except Exception as e2:
                    #     logging.exception(e2)
                    #     logging.error(data)
            except Exception as e1:
                logging.exception(e1)
                pass
if __name__ == "__main__":
    pass
    for i in range(30):
        RedisUtils.setex_async(0, 'buy_position_info-002547', tool.get_expire(), 1011)
    RedisUtils.run_loop()
utils/middle_api_protocol.py
@@ -31,6 +31,10 @@
    fdata = {"type": "redis", "data": {"ctype": "cmd", "data": data}}
    return fdata
def load_redis_cmds(datas):
    fdata = {"type": "redis", "data": {"ctype": "cmds", "data": datas}}
    return fdata
# ------------------------------Mysql协议----------------------------------
def load_mysql_cmd(data):