""" 日志分析 """ # 获取不可以下单的原因 import datetime import os import constant from utils import tool def get_cant_order_reasons_dict(): file_path = "{}/logs/gp/l2/l2_trade.{}.log".format(constant.get_path_prefix(), tool.get_now_date_str()) dict_ = {} if os.path.exists(file_path): with open(file_path, encoding="utf-8") as f: line = f.readline() while line: if line.find("不可以下单,原因:") > -1: code = line.split("code=")[1][:6] time_ = line.split("|")[0].split(" ")[1][:12] reason = line.split("不可以下单,原因:")[1].strip() dict_[code] = (time_, reason) # print(time_, code, reason) line = f.readline() return dict_ def get_kpl_can_buy_reasons_dict(): file_path = "{}/logs/gp/kpl/kpl_block_can_buy.{}.log".format(constant.get_path_prefix(), tool.get_now_date_str()) dict_ = {} if os.path.exists(file_path): with open(file_path, encoding="utf-8") as f: line = f.readline() while line: if True: code = line.split("code=")[1][:6] time_ = line.split("|")[0].split(" ")[1][:12] reason = line.split(f"code={code}:")[1].strip() dict_[code] = (time_, reason.replace("可以下单", "")) # print(time_, code, reason) line = f.readline() return dict_ # 获取华鑫的委托数据 def load_huaxin_delegate_datas(): file_path = "{}/logs/gp/kpl/kpl_block_can_buy.{}.log".format(constant.get_path_prefix(), tool.get_now_date_str()) dict_ = {} if os.path.exists(file_path): with open(file_path, encoding="utf-8") as f: line = f.readline() while line: if True: code = line.split("code=")[1][:6] time_ = line.split("|")[0].split(" ")[1][:12] reason = line.split(f"code={code}:")[1].strip() dict_[code] = (time_, reason.replace("可以下单", "")) # print(time_, code, reason) line = f.readline() # 分析L2数据传输时间 def analyze_l2_data_transformation(path_): with open(path_, 'r', encoding="utf-8") as f: lines = f.readlines() for line in lines: if not line: break try: datas = line.split("|") create_time = datas[0].strip() data = datas[2].split(" - ")[1].strip() contents = data.split("#") code = contents[0] use_time = int(contents[1].strip().split(":")[1].split("-")[-2]) l2_data = contents[2] l2_data = eval(l2_data) max_time_data = None min_time_data = None for d in l2_data: if len(d) > 10: if max_time_data is None: max_time_data = d if min_time_data is None: min_time_data = d if d[10] > max_time_data[10]: max_time_data = d if d[10] < min_time_data[10]: min_time_data = d if max_time_data and min_time_data: dt = datetime.datetime.strptime(create_time.split(".")[0], "%Y-%m-%d %H:%M:%S") create_timestamp = int(dt.timestamp() * 1000) + int(create_time.split(".")[1]) if use_time > 100: print(create_time, f"数量:{len(l2_data)}", f"耗时:{use_time}", code, create_timestamp - min_time_data[10], create_timestamp - max_time_data[10]) except: print(line) pass pass if __name__ == "__main__": analyze_l2_data_transformation("D:\\logs\\huaxin_l2\\orderdetail.2023-08-29.log")