""" 日志分析 """ # 获取不可以下单的原因 import os import re import constant from utils import tool def get_cant_order_reasons_dict(): file_path = "{}/logs/gp/l2/l2_trade.{}.log".format(constant.get_path_prefix(), tool.get_now_date_str()) dict_ = {} if os.path.exists(file_path): with open(file_path, encoding="utf-8") as f: line = f.readline() while line: if line.find("不可以下单,原因:") > -1: code = line.split("code=")[1][:6] time_ = line.split("|")[0].split(" ")[1][:12] reason = line.split("不可以下单,原因:")[1].strip() dict_[code] = (time_, reason) # print(time_, code, reason) line = f.readline() return dict_ def get_kpl_can_buy_reasons_dict(): file_path = "{}/logs/gp/kpl/kpl_block_can_buy.{}.log".format(constant.get_path_prefix(), tool.get_now_date_str()) dict_ = {} if os.path.exists(file_path): with open(file_path, encoding="utf-8") as f: line = f.readline() while line: if True: code = line.split("code=")[1][:6] time_ = line.split("|")[0].split(" ")[1][:12] reason = line.split(f"code={code}:")[1].strip() dict_[code] = (time_, reason.replace("可以下单", "")) # print(time_, code, reason) line = f.readline() return dict_ # 分析请求时间 def analyse_request_time(): with open(f"D:\\文件传输\\交易\\日志文件\\request_debug.{tool.get_now_date_str()}.log", encoding="utf-8", mode='r') as f: lines = f.readlines() keys = {} for line in lines: if not line: continue if line.find("请求开始:register") >= 0: continue try: time_str = re.findall(r'\[(.*?)\]', line)[0] result = re.findall(r'【(.*?)】', line) key = f"{result[0]}-{result[1]}" if key not in keys: keys[key] = (time_str, line) else: use_time = tool.time_sub_as_ms(time_str, keys[key][0]) if use_time > 1000 * 5: print(f"请求时间:{use_time}ms", keys[key][1]) keys.pop(key) except: print(line) for k in keys: print("尚未获取到结果:", keys[k]) pass if __name__ == "__main__": analyse_request_time()