import datetime
import hashlib
import logging
import os
import time

import constant
from log_module.log import printlog
from utils import tool


class LogUtil:
    """Utilities for slicing log files."""

    @classmethod
    def extract_log_from_key(cls, key, path, target_path):
        """Copy every line of ``path`` that contains ``key`` into ``target_path``.

        Args:
            key: Value to look for; it is converted to text with ``str.format``.
            path: Log file to read (UTF-8).
            target_path: File to (over)write with the matching lines (UTF-8).

        Raises:
            OSError: If either file cannot be opened.
        """
        needle = "{}".format(key)
        # The source is opened first so that a missing source file does not
        # leave behind a truncated/empty target file.
        with open(path, 'r', encoding="utf-8") as src:
            with open(target_path, mode='w', encoding="utf-8") as dst:
                # ``in`` also matches a key sitting at column 0, which the
                # previous ``find(...) > 0`` test silently skipped.
                dst.writelines(line for line in src if needle in line)


# Export the "processed data range" lines of the L2 processing log for one code.
def __export_l2_pos_range(code, date, dir):
    LogUtil.extract_log_from_key(
        "{} 处理数据范围".format(code),
        "{}/sell_logs/gp/l2/l2_process.{}.log".format(constant.get_path_prefix(), date),
        "{}/l2_process_{}.log".format(dir, date))


# Export the L2 trade log lines for one code.
def __export_l2_trade_log(code, date, dir):
    LogUtil.extract_log_from_key(
        code,
        "{}/sell_logs/gp/l2/l2_trade.{}.log".format(constant.get_path_prefix(), date),
        "{}/l2_trade_{}.log".format(dir, date))


# Export the L2 trade-cancel log lines for one code.
def __export_l2_trade_cancel_log(code, date, dir):
    LogUtil.extract_log_from_key(
        code,
        "{}/sell_logs/gp/l2/l2_trade_cancel.{}.log".format(constant.get_path_prefix(), date),
        "{}/l2_trade_cancel_{}.log".format(dir, date))


def __analyse_pricess_time():
    """Print every line of today's L2 processing log whose trailing number exceeds 150.

    The number is whatever follows the last ``:`` on the line; a line whose
    tail is not an integer raises ``ValueError``.
    """
    date = datetime.datetime.now().strftime("%Y-%m-%d")
    file_path = f"{constant.get_path_prefix()}/sell_logs/gp/l2/l2_process.{date}.log"
    with open(file_path, encoding="utf-8") as f:
        for line in f:
            time_ = line.split(":")[-1]
            if int(time_) > 150:
                printlog(line)


def export_l2_log(code):
    """Export today's L2 process / trade-cancel / trade log lines for ``code``.

    The extracts are written into ``<prefix>/sell_logs/gp/l2/<code>/``.
    Nothing happens when ``code`` is shorter than 6 characters.
    """
    if len(code) < 6:
        return
    date = datetime.datetime.now().strftime("%Y-%m-%d")
    dir_ = "{}/sell_logs/gp/l2/{}".format(constant.get_path_prefix(), code)
    # exist_ok avoids the check-then-create race of os.path.exists + os.mkdir.
    os.makedirs(dir_, exist_ok=True)
    __export_l2_pos_range(code, date, dir_)
    __export_l2_trade_cancel_log(code, date, dir_)
    __export_l2_trade_log(code, date, dir_)


def compute_buy1_real_time(time_):
    """Shift an ``HH:MM:SS`` time back by 2 seconds and floor it to a 3-second step.

    Returns:
        Whatever ``tool.time_seconds_format`` produces for the adjusted
        second-of-day value.
    """
    ts = time_.split(":")
    total_seconds = int(ts[0]) * 3600 + int(ts[1]) * 60 + int(ts[2])
    shifted = total_seconds - 2
    return tool.time_seconds_format(shifted - shifted % 3)


def load_l2_from_log(date=None):
    """Load the L2 data log of ``date`` (default: today) grouped by code.

    For each line, the text after the first `` - `` (and, for lines that
    mention ``async_log_util``, after the following ``]``) is split into a
    6-character code and a Python literal that is evaluated.

    Returns:
        dict mapping code -> accumulated records. Loading is best-effort:
        on any failure the data collected so far is returned.
    """
    today_data = {}
    if date is None:
        date = tool.get_now_date_str()
    try:
        # NOTE(review): unlike the other readers in this module no encoding is
        # passed here, so the platform default applies - confirm that is intended.
        with open("{}/sell_logs/gp/l2/l2_data.{}.log".format(constant.get_path_prefix(), date),
                  mode='r') as f:
            for data in f:
                index = data.find(' - ') + 2
                if data.find('async_log_util') > 0:
                    index = data.find(']', index) + 1
                data = data[index + 1:].strip()
                code = data[0:6]
                # SECURITY: eval() executes whatever the log line contains; this is
                # only acceptable while the log file is written by trusted code.
                dict_ = eval(data[7:])
                if code not in today_data:
                    today_data[code] = dict_
                else:
                    today_data[code].extend(dict_)
        for key in today_data:
            printlog(key, len(today_data[key]) - 1, today_data[key][-1]["index"])
    except FileNotFoundError:
        # No log for that date: an empty result is the expected outcome.
        pass
    except Exception as e:
        # Still best-effort (partial data is returned), but no longer silent, and
        # KeyboardInterrupt/SystemExit are no longer swallowed.
        logging.exception(e)
    return today_data


# Extract the HH:MM:SS part from the leading "<date> <time>.<fraction>|" field of a log line.
def __get_log_time(line):
    head = line.split("|")[0]
    clock = head.split(" ")[1]
    return clock.split(".")[0]


# Extract the 8 characters following the first "[" after " - " in an async log line.
def __get_async_log_time(line):
    line = line.split(" - ")[1]
    start = line.find("[") + 1
    return line[start:start + 8]


# Return the first space-delimited token of text.
def __first_token(text):
    # Slicing with str.find(" ") cuts off the last character when there is no
    # space (find returns -1); split keeps the token intact.
    return text.split(" ", 1)[0].strip()


# Strip the "<prefix> - " part and an optional leading "[...]" tag from a log line.
def __strip_log_prefix(line):
    data = line.split(" - ")[1].strip()
    if data.startswith("["):
        data = data[data.find("]") + 1:].strip()
    return data


def get_l2_process_position(code, date=None):
    """Return the position ranges of each L2 batch processed for ``code``.

    Only lines logged between 09:30:00 and 15:00:00 are used, and a range is
    kept only when it starts after the end of the previously kept range.
    Lines whose range cannot be parsed are logged and skipped.

    Returns:
        list of ``(start, end)`` int tuples in file order.
    """
    if not date:
        date = datetime.datetime.now().strftime("%Y-%m-%d")
    pos_list = []
    with open("{}/sell_logs/gp/l2/l2_process.{}.log".format(constant.get_path_prefix(), date),
              mode='r', encoding="utf-8") as f:
        for line in f:
            if line.find("code:{}".format(code)) < 0:
                continue
            time_ = __get_log_time(line)
            span = line[line.find("处理数据范围") + len("处理数据范围") + 1:line.find("处理时间")].strip()
            bounds = span.split("-")
            try:
                # Parsed once, inside the try, so a malformed range can no longer
                # escape as an uncaught ValueError from the comparison below.
                start, end = int(bounds[0]), int(bounds[1])
            except (ValueError, IndexError) as e:
                logging.exception(e)
                continue
            if pos_list and pos_list[-1][1] >= start:
                continue
            if int("093000") <= int(time_.replace(":", "")) <= int("150000"):
                pos_list.append((start, end))
    return pos_list


def get_l2_trade_position(code, date=None):
    """Return the trade-related positions logged for ``code`` during 09:30-15:00.

    Returns:
        list of ``(kind, index, reason)`` tuples in file order, where ``kind``
        is 0 for a buy-signal start, 1 for a buy-execution position and 2 for
        a cancel position. ``reason`` is the text after "撤单原因:" for
        cancels (empty when the line carries none) and ``""`` otherwise.
    """
    if not date:
        date = datetime.datetime.now().strftime("%Y-%m-%d")
    pos_list = []
    with open("{}/sell_logs/gp/l2/l2_trade.{}.log".format(constant.get_path_prefix(), date),
              mode='r', encoding="utf-8") as f:
        for line in f:
            if line.find("code={}".format(code)) < 0:
                continue
            time_ = __get_log_time(line)
            time_int = int(time_.replace(":", ""))
            if int("093000") > time_int or time_int > int("150000"):
                continue
            if line.find("获取到买入信号起始点") > 0:
                str_ = line.split("获取到买入信号起始点:")[1].strip()
                pos_list.append((0, int(__first_token(str_)), ""))
            elif line.find("获取到买入执行位置") > 0:
                str_ = line.split("获取到买入执行位置:")[1].strip()
                pos_list.append((1, int(__first_token(str_)), ""))
            elif line.find("触发撤单,撤单位置:") > 0:
                str_ = line.split("触发撤单,撤单位置:")[1].strip()
                reason_parts = line.split("撤单原因:")
                # A cancel line without a reason used to raise IndexError.
                reason = reason_parts[1] if len(reason_parts) > 1 else ""
                pos_list.append((2, int(__first_token(str_)), reason))
    return pos_list


# Cache of file contents: md5(path) -> (load timestamp, list of lines).
__log_file_contents = {}


def __load_file_content(path_str, expire_timespace=20):
    """Return the lines of ``path_str``, cached for ``expire_timespace`` seconds.

    A missing file yields (and caches) an empty list. The returned list is
    the cached object itself, so callers must not mutate it.
    """
    md5 = hashlib.md5(path_str.encode(encoding='utf-8')).hexdigest()
    cached = __log_file_contents.get(md5)
    if cached is not None and time.time() - cached[0] < expire_timespace:
        return cached[1]
    contents = []
    if os.path.exists(path_str):
        with open(path_str, 'r', encoding="utf-8") as f:
            contents = f.readlines()
    __log_file_contents[md5] = (time.time(), contents)
    return contents


def load_huaxin_deal_record(code, date=None):
    """Load the L2 order deal records of ``code`` from the transaction_desc log.

    Args:
        code: Code whose ``"<code>#"`` lines are selected.
        date: Log date; defaults to today, resolved at call time (a default
            computed in the signature would be frozen at import time).

    Returns:
        list of evaluated payloads. Per the original note the format is
        [(order no, lots, deal start time, deal end time, ordered lots)] -
        not verifiable from this file.
    """
    if date is None:
        date = tool.get_now_date_str()
    path = f"{constant.get_path_prefix()}/sell_logs/huaxin/l2/transaction_desc.{date}.log"
    fdatas = []
    marker = f"{code}#"
    for line in __load_file_content(path):
        if line.find(marker) <= 0:
            continue
        payload = line.split(" - ")[1]
        payload = payload[payload.find("]") + 1:].strip()
        # The requested code is no longer overwritten with the value parsed from
        # the line, so every line is filtered against the caller's code.
        # SECURITY: eval() on log content - trusted log files only.
        fdatas.append(eval(payload.split("#")[1]))
    return fdatas


def load_huaxin_transaction_sell_no(code=None, date=None):
    """Load the Huaxin transacted sell orders, optionally restricted to ``code``.

    Args:
        code: When given, only lines for this code are kept.
        date: Log date; defaults to today, resolved at call time.

    Returns:
        dict mapping code -> list of evaluated payloads (empty when the log
        file does not exist).
    """
    if date is None:
        date = tool.get_now_date_str()
    path = f"{constant.get_path_prefix()}/sell_logs/huaxin/l2/transaction_sell_order.{date}.log"
    fdatas = {}
    if not os.path.exists(path):
        return fdatas
    with open(path, 'r', encoding="utf-8") as f:
        for line in f:
            data = __strip_log_prefix(line).split("code=")[1]
            code_ = data[:6]
            if code and code != code_:
                continue
            # SECURITY: eval() on log content - trusted log files only.
            fdatas.setdefault(code_, []).append(eval(data[6:].strip()))
    return fdatas


def load_system_log():
    """Read today's system log and return its INFO and ERROR entries.

    Returns:
        list of ``(time_str, level, message)`` tuples; lines that do not have
        the ``time|level|... - message`` layout are skipped.
    """
    path = f"{constant.get_path_prefix()}/sell_logs/gp/system/system.{tool.get_now_date_str()}.log"
    fdatas = []
    if not os.path.exists(path):
        return fdatas
    with open(path, 'r', encoding="utf-8") as f:
        for line in f:
            fields = line.split("|")
            try:
                level = fields[1].strip()
                if level != "INFO" and level != "ERROR":
                    continue
                data = fields[2].split(" - ")[1].strip()
            except IndexError:
                # Malformed line: skip it, as before.
                continue
            fdatas.append((fields[0].strip(), level, data))
    return fdatas


def __load_code_grouped_records(path):
    """Parse a log of ``"<code>#<python literal>"`` lines into code -> [records].

    Lines that cannot be parsed are skipped; a missing file yields ``{}``.
    """
    fdatas = {}
    if not os.path.exists(path):
        return fdatas
    with open(path, 'r', encoding="utf-8") as f:
        for line in f:
            try:
                parts = __strip_log_prefix(line).split("#")
                code = parts[0]
                # SECURITY: eval() on log content - trusted log files only.
                l2_data = eval(parts[1])
            except Exception:
                # Best-effort per line; unlike a bare except this does not
                # swallow KeyboardInterrupt/SystemExit.
                continue
            fdatas.setdefault(code, []).append(l2_data)
    return fdatas


def load_huaxin_transaction_map(date=None):
    """Load the Huaxin L2 transaction log of ``date`` (default: today) grouped by code."""
    if date is None:
        date = tool.get_now_date_str()
    return __load_code_grouped_records(
        f"{constant.get_path_prefix()}/sell_logs/huaxin/l2/transaction.{date}.log")


def load_huaxin_transaction_big_buy_order_infos(date=None):
    """Load the transacted big buy orders of ``date`` (default: today) grouped by code."""
    if date is None:
        date = tool.get_now_date_str()
    return __load_code_grouped_records(
        f"{constant.get_path_prefix()}/sell_logs/huaxin/l2/transaction_big_buy_order.{date}.log")


def load_huaxin_transaction_big_sell_order_infos(date=None):
    """Load the transacted big sell orders of ``date`` (default: today) grouped by code."""
    if date is None:
        date = tool.get_now_date_str()
    return __load_code_grouped_records(
        f"{constant.get_path_prefix()}/sell_logs/huaxin/l2/transaction_big_sell_order.{date}.log")