""" 日志 """ import datetime import json import os import shutil import sys from loguru import logger import gpcode_manager import tool class MyLogger: def __init__(self): logger.remove() # 每一天生成一个日志文件,历史日志文件采用zip压缩,异步写入日志 logger.add(self.get_path("trade", "trade_gui"), filter=lambda record: record["extra"].get("name") == "trade_gui", rotation="00:00", compression="zip", enqueue=True) logger.add(self.get_path("trade", "trade"), filter=lambda record: record["extra"].get("name") == "trade", rotation="00:00", compression="zip") logger.add(self.get_path("trade", "delegate"), filter=lambda record: record["extra"].get("name") == "delegate", rotation="00:00", compression="zip") logger.add(self.get_path("l2", "l2_error"), filter=lambda record: record["extra"].get("name") == "l2_error", rotation="00:00", compression="zip", enqueue=True) logger.add(self.get_path("l2", "l2_process"), filter=lambda record: record["extra"].get("name") == "l2_process", rotation="00:00", compression="zip", enqueue=True) logger.add(self.get_path("l2", "l2_process_time"), filter=lambda record: record["extra"].get("name") == "l2_process_time", rotation="00:00", compression="zip", enqueue=True) logger.add(self.get_path("l2", "l2_trade"), filter=lambda record: record["extra"].get("name") == "l2_trade", rotation="00:00", compression="zip", enqueue=True) logger.add(self.get_path("l2", "l2_data"), filter=lambda record: record["extra"].get("name") == "l2_data", rotation="00:00", compression="zip", enqueue=True) logger.add(self.get_path("l2", "l2_latest_data"), filter=lambda record: record["extra"].get("name") == "l2_latest_data", rotation="00:00", compression="zip", enqueue=True) # 显示在控制台 logger.add(sys.stdout, filter=lambda record: record["extra"].get("name") == "l2_trade") logger.add(self.get_path("l2", "l2_trade_cancel"), filter=lambda record: record["extra"].get("name") == "l2_trade_cancel", rotation="00:00", compression="zip", enqueue=True) logger.add(self.get_path("l2", "cancel/s_cancel"), filter=lambda record: record["extra"].get("name") == "s_cancel", rotation="00:00", compression="zip", enqueue=True) logger.add(self.get_path("l2", "cancel/h_cancel"), filter=lambda record: record["extra"].get("name") == "h_cancel", rotation="00:00", compression="zip", enqueue=True) logger.add(self.get_path("l2", "l2_trade_buy"), filter=lambda record: record["extra"].get("name") == "l2_trade_buy", rotation="00:00", compression="zip", enqueue=True) logger.add(self.get_path("l2", "l2_big_data"), filter=lambda record: record["extra"].get("name") == "l2_big_data", rotation="00:00", compression="zip", enqueue=True) logger.add(self.get_path("l2", "l2_trade_queue"), filter=lambda record: record["extra"].get("name") == "l2_trade_queue", rotation="00:00", compression="zip", enqueue=True) logger.add(self.get_path("l2", "l2_trade_buy_queue"), filter=lambda record: record["extra"].get("name") == "l2_trade_buy_queue", rotation="00:00", compression="zip", enqueue=True) logger.add(self.get_path("juejin", "juejin_tick"), filter=lambda record: record["extra"].get("name") == "juejin_tick", rotation="00:00", compression="zip", enqueue=True) logger.add(self.get_path("ths", "code_operate"), filter=lambda record: record["extra"].get("name") == "code_operate", rotation="00:00", compression="zip", enqueue=True) # 显示在控制台 logger.add(sys.stdout, filter=lambda record: record["extra"].get("name") == "code_operate") logger.add(self.get_path("device", "device"), filter=lambda record: record["extra"].get("name") == "device", rotation="00:00", compression="zip", enqueue=True) logger.add(self.get_path("system", "system"), filter=lambda record: record["extra"].get("name") == "system", rotation="00:00", compression="zip", enqueue=True) logger.add(self.get_path("ths", "buy_1_volumn"), filter=lambda record: record["extra"].get("name") == "buy_1_volumn", rotation="00:00", compression="zip", enqueue=True) logger.add(self.get_path("ths", "buy_1_volumn_record"), filter=lambda record: record["extra"].get("name") == "buy_1_volumn_record", rotation="00:00", compression="zip", enqueue=True) logger.add(self.get_path("ths", "day_volumn"), filter=lambda record: record["extra"].get("name") == "day_volumn", rotation="00:00", compression="zip", enqueue=True) def get_path(self, dir_name, log_name): return "D:/logs/gp/{}/{}".format(dir_name, log_name) + ".{time:YYYY-MM-DD}.log" def get_logger(self, log_name): return logger.bind(name=log_name) __mylogger = MyLogger() logger_trade_gui = __mylogger.get_logger("trade_gui") logger_trade = __mylogger.get_logger("trade") logger_trade_delegate = __mylogger.get_logger("delegate") logger_l2_error = __mylogger.get_logger("l2_error") logger_l2_process = __mylogger.get_logger("l2_process") logger_l2_process_time = __mylogger.get_logger("l2_process_time") logger_l2_data = __mylogger.get_logger("l2_data") logger_l2_latest_data = __mylogger.get_logger("l2_latest_data") logger_l2_trade = __mylogger.get_logger("l2_trade") logger_l2_trade_cancel = __mylogger.get_logger("l2_trade_cancel") logger_l2_s_cancel = __mylogger.get_logger("s_cancel") logger_l2_h_cancel = __mylogger.get_logger("h_cancel") logger_l2_trade_buy = __mylogger.get_logger("l2_trade_buy") logger_l2_trade_queue = __mylogger.get_logger("l2_trade_queue") logger_l2_trade_buy_queue = __mylogger.get_logger("l2_trade_buy_queue") logger_l2_big_data = __mylogger.get_logger("l2_big_data") logger_juejin_tick = __mylogger.get_logger("juejin_tick") logger_code_operate = __mylogger.get_logger("code_operate") logger_device = __mylogger.get_logger("device") logger_system = __mylogger.get_logger("system") logger_buy_1_volumn = __mylogger.get_logger("buy_1_volumn") logger_buy_1_volumn_record = __mylogger.get_logger("buy_1_volumn_record") logger_day_volumn = __mylogger.get_logger("day_volumn") class LogUtil: @classmethod def extract_log_from_key(cls, key, path, target_path): fw = open(target_path, mode='w', encoding="utf-8") try: with open(path, 'r', encoding="utf-8") as f: lines = f.readlines() for line in lines: if line.find("{}".format(key)) > 0: fw.write(line) finally: fw.close() # 导出数据处理位置日志 def __export_l2_pos_range(code, date, dir): LogUtil.extract_log_from_key("{} 处理数据范围".format(code), "D:/logs/gp/l2/l2_process.{}.log".format(date), "{}/l2_process_{}.log".format(dir, date)) # 导出交易日志 def __export_l2_trade_log(code, date, dir): LogUtil.extract_log_from_key(code, "D:/logs/gp/l2/l2_trade.{}.log".format(date), "{}/l2_trade_{}.log".format(dir, date)) # 导出交易取消日志 def __export_l2_trade_cancel_log(code, date, dir): LogUtil.extract_log_from_key(code, "D:/logs/gp/l2/l2_trade_cancel.{}.log".format(date), "{}/l2_trade_cancel_{}.log".format(dir, date)) def __analyse_pricess_time(): date = datetime.datetime.now().strftime("%Y-%m-%d") file_path = f"D:/logs/gp/l2/l2_process.{date}.log" with open(file_path, encoding="utf-8") as f: line = f.readline() while line: time_ = line.split(":")[-1] if int(time_) > 150: print(line) line = f.readline() def export_l2_log(code): if len(code) < 6: return date = datetime.datetime.now().strftime("%Y-%m-%d") dir_ = "D:/logs/gp/l2/{}".format(code) if not os.path.exists(dir_): os.mkdir(dir_) __export_l2_pos_range(code, date, dir_) __export_l2_trade_cancel_log(code, date, dir_) __export_l2_trade_log(code, date, dir_) def compute_buy1_real_time(time_): ts = time_.split(":") s = int(ts[0]) * 3600 + int(ts[1]) * 60 + int(ts[2]) cha = (s - 2) % 3 return tool.time_seconds_format(s - 2 - cha) def load_l2_from_log(date=None): today_data = {} if date is None: date = datetime.datetime.now().strftime("%Y-%m-%d") try: with open("D:/logs/gp/l2/l2_data.{}.log".format(date), mode='r') as f: while True: data = f.readline() if not data: break index = data.find('save_l2_data:') index = data.find('-', index) data = data[index + 1:].strip() code = data[0:6] data = data[7:] dict_ = eval(data) if code not in today_data: today_data[code] = dict_ else: today_data[code].extend(dict_) for key in today_data: news = sorted(today_data[key], key=lambda x: x["index"]) today_data[key] = news print(key, len(today_data[key]) - 1, today_data[key][-1]["index"]) except: pass return today_data # 获取日志时间 def __get_log_time(line): time_ = line.split("|")[0].split(" ")[1].split(".")[0] return time_ # 获取L2每次批量处理数据的位置范围 def get_l2_process_position(code, date=None): if not date: date = datetime.datetime.now().strftime("%Y-%m-%d") pos_list = [] with open("D:/logs/gp/l2/l2_process.{}.log".format(date), mode='r', encoding="utf-8") as f: while True: line = f.readline() if not line: break if line.find("code:{}".format(code)) < 0: continue time_ = __get_log_time(line) line = line[line.find("处理数据范围") + len("处理数据范围") + 1:line.find("处理时间")].strip() if len(pos_list) == 0 or pos_list[-1][1] < int(line.split("-")[0]): if int("093000") <= int(time_.replace(":", "")) <= int("150000"): pos_list.append((int(line.split("-")[0]), int(line.split("-")[1]))) return pos_list # 获取L2每次批量处理数据的位置范围 def get_l2_trade_position(code, date=None): if not date: date = datetime.datetime.now().strftime("%Y-%m-%d") pos_list = [] with open("D:/logs/gp/l2/l2_trade.{}.log".format(date), mode='r', encoding="utf-8") as f: while True: line = f.readline() if not line: break if line.find("code={}".format(code)) < 0: continue print(line) time_ = __get_log_time(line) if int("093000") > int(time_.replace(":", "")) or int(time_.replace(":", "")) > int("150000"): continue if line.find("获取到买入信号起始点") > 0: str_ = line.split("获取到买入信号起始点:")[1].strip() index = str_[0:str_.find(" ")].strip() # print("信号起始位置:", index) pos_list.append((0, int(index), "")) elif line.find("获取到买入执行位置") > 0: str_ = line.split("获取到买入执行位置:")[1].strip() index = str_[0:str_.find(" ")].strip() # print("买入执行位置:", index) pos_list.append((1, int(index), "")) elif line.find("触发撤单") > 0: str_ = line.split("触发撤单,撤单位置:")[1].strip() index = str_[0:str_.find(" ")].strip() # print("撤单位置:", index) pos_list.append((2, int(index), line.split("撤单原因:")[1])) pass else: continue return pos_list # 获取交易进度 def get_trade_progress(code, date=None): if not date: date = datetime.datetime.now().strftime("%Y-%m-%d") index_list = [] buy_queues = [] with open("D:/logs/gp/l2/l2_trade_buy_queue.{}.log".format(date), mode='r', encoding="utf-8") as f: while True: line = f.readline() if not line: break time_ = __get_log_time(line).strip() if int(time_.replace(":", "")) > int("150000"): continue if line.find(f"{code}-[") >= 0: buy_queues.append((eval(line.split(f"{code}-")[1]), time_)) if line.find("获取成交位置成功: code-{}".format(code)) < 0: continue try: index = int(line.split("index-")[1].split(" ")[0]) index_list.append((index, time_)) except: pass return index_list, buy_queues def export_logs(code): code_name = gpcode_manager.get_code_name(code) date = datetime.datetime.now().strftime("%Y-%m-%d") target_dir = f"D:/logs/gp/l2/export/{code}_{code_name}_{date}" if os.path.exists(target_dir): shutil.rmtree(target_dir) os.makedirs(target_dir) log_names = ["l2_process", "l2_trade", "l2_trade_cancel", "l2_process_time", "l2_trade_buy"] # 导出交易日志 for log_name in log_names: key = f"code={code}" if log_name == "l2_process" or log_name == "l2_process_time": key = code LogUtil.extract_log_from_key(key, f"D:/logs/gp/l2/{log_name}.{date}.log".format(date), f"{target_dir}/{log_name}.{code}_{code_name}.{date}.log") if __name__ == '__main__': # logger_l2_h_cancel.info("test") # logger_l2_process_time.info("test123") codes = ["002131", "003035", "002131"] for code in codes: export_logs(code) # parse_l2_data()