| | |
| | | result = middle_api_protocol.request(fdata) |
| | | return result |
| | | |
| | | # [(db, cmd, key, args)] |
@classmethod
def __batch__request(cls, odatas):
    """Submit a batch of Redis commands through ``middle_api_protocol``.

    Args:
        odatas: Iterable of indexable records laid out as
            ``(db, cmd, key, args)``. The ``args`` slot (index 3) is only
            copied into the outgoing command when it is not ``None``.

    Returns:
        The value returned by ``middle_api_protocol.request`` for the
        payload built by ``middle_api_protocol.load_redis_cmds``.
    """

    def to_command(record):
        # Map one positional record onto the keyed command form.
        command = {
            "db": record[0],
            "cmd": record[1],
            "key": record[2],
        }
        extra = record[3]
        if extra is not None:
            command["args"] = extra
        return command

    commands = [to_command(record) for record in odatas]
    payload = middle_api_protocol.load_redis_cmds(commands)
    return middle_api_protocol.request(payload)
| | | |
@classmethod
def __get_db(cls, redis_):
    """Return the ``'db'`` entry of ``redis_.connection_pool.connection_kwargs``."""
    pool_settings = redis_.connection_pool.connection_kwargs
    return pool_settings['db']
| | |
# run_loop: consumer loop for cls.__async_task_queue. Each pass takes one item
# from the queue, builds a record from it and collects the record in dataList.
# Collected records are handed to cls.__batch__request once 20 are pending, or
# when records are pending and more than 5 seconds have passed since the last
# upload. No break or return statement appears inside the `while True` loop.
#
# Queue items are read positionally: data[0] is bound to `db`, data[1] to
# `method_name`, data[2] to `args`.
#
# NOTE(review): the indentation of this block has been lost, so the nesting of
# the try/if/else statements below cannot be read off the text. Statements that
# invoke a RedisUtils method directly (RedisManager(db).getRedisNoPool(),
# getattr(RedisUtils, method_name), method(*args)) are interleaved with the
# statements that build temp_data, and the same direct-call sequence also
# appears commented out further down. This looks like two revisions merged
# together -- TODO confirm which path is meant to be live before re-indenting.
| | | @classmethod
| | | def run_loop(cls):
| | | logger_system.info("启动Redis数据同步服务")
# Records waiting to be uploaded, and the time of the most recent upload.
| | | dataList = []
| | | last_upload_time = time.time()
| | | while True:
| | | try:
| | | data = cls.__async_task_queue.get()
| | | if data:
| | | try:
| | | db = data[0]
| | | method_name = data[1]
| | | args = data[2]
| | | _redis = RedisManager(db).getRedisNoPool()
| | | method = getattr(RedisUtils, method_name)
# Tuple arguments get the Redis connection prepended before the call.
# The comparison is an exact type match, so tuple subclasses do not take it.
| | | if type(args) == tuple:
| | | args = list(args)
| | | args.insert(0, _redis)
| | | args = tuple(args)
| | | result = method(*args)
# The record starts as [db, method name]; for a tuple/list data[2] its first
# element is appended, followed by the remaining elements when there are any.
| | | temp_data = [data[0], data[1]]
| | | if type(data[2]) == tuple or type(data[2]) == list:
| | | temp_data.append(data[2][0])
| | | if len(data[2]) > 1:
| | | temp_data.append(data[2][1:])
| | | else:
| | | args = tuple([_redis, args])
| | | result = method(*args)
| | | except Exception as e2:
| | | logging.exception(e2)
| | | logging.error(data)
| | | temp_data.append(None)
| | | else:
| | | temp_data.append(data[2])
| | | temp_data.append(None)
| | | dataList.append(tuple(temp_data))
# Size-based flush: upload once 20 records are pending. `results` is not read
# again anywhere in this block.
| | | if len(dataList) >= 20:
| | | results = cls.__batch__request(dataList)
| | | last_upload_time = time.time()
| | | dataList.clear()
# Time-based flush: upload pending records when more than 5 seconds have passed
# since the last upload.
# NOTE(review): if get() blocks while the queue is empty, this check only runs
# after the next item arrives -- confirm against the queue type.
| | | if dataList and time.time() - last_upload_time > 5:
| | | results = cls.__batch__request(dataList)
| | | last_upload_time = time.time()
| | | dataList.clear()
# Commented-out direct-execution variant (presumably kept from an earlier
# revision -- TODO confirm and remove if obsolete).
| | | # try:
| | | # db = data[0]
| | | # method_name = data[1]
| | | # args = data[2]
| | | # _redis = RedisManager(db).getRedisNoPool()
| | | # method = getattr(RedisUtils, method_name)
| | | # if type(args) == tuple:
| | | # args = list(args)
| | | # args.insert(0, _redis)
| | | # args = tuple(args)
| | | # result = method(*args)
| | | # else:
| | | # args = tuple([_redis, args])
| | | # result = method(*args)
| | | # except Exception as e2:
| | | # logging.exception(e2)
| | | # logging.error(data)
# Exceptions caught here are logged with their traceback and the while loop
# carries on with the next iteration; the trailing `pass` has no effect.
| | | except Exception as e1:
| | | logging.exception(e1)
| | | pass
| | | |
| | | |
if __name__ == "__main__":
    # Manual smoke run: submit the same setex task 30 times through
    # RedisUtils.setex_async, then enter RedisUtils.run_loop().
    for _ in range(30):
        RedisUtils.setex_async(0, 'buy_position_info-002547', tool.get_expire(), 1011)
    RedisUtils.run_loop()