"""
|
日志
|
"""
|
import datetime
|
import os
|
import shutil
|
import sys
|
|
from loguru import logger
|
|
import constant
|
from code_attribute import gpcode_manager
|
from utils import tool
|
|
|
class MyLogger:
    """Register every loguru sink used by the application.

    Creating an instance removes all handlers currently attached to the shared
    loguru ``logger`` and then adds one sink per named channel. A record only
    reaches a sink whose filter matches the ``name`` value stored in the
    record's ``extra`` dict, which is what :meth:`get_logger` binds.
    """

    def __init__(self):
        def only(channel):
            # Filter accepting records whose bound "name" extra equals ``channel``.
            return lambda record: record["extra"].get("name") == channel

        def add_file(dir_name, log_name, channel=None, enqueue=True):
            # File sink: rotated at 00:00, rotated files zipped. ``enqueue`` is
            # only passed when requested, so sinks without it keep loguru's default.
            options = {"rotation": "00:00", "compression": "zip"}
            if enqueue:
                options["enqueue"] = True
            logger.add(self.get_path(dir_name, log_name),
                       filter=only(log_name if channel is None else channel),
                       **options)

        def add_console(channel):
            # Echo one channel to standard output.
            logger.add(sys.stdout, filter=only(channel))

        logger.remove()

        # One log file per day, old files compressed as zip, writes queued
        # (enqueue=True) unless stated otherwise.
        add_file("trade", "trade_gui")
        add_file("trade", "trade", enqueue=False)
        add_file("trade", "delegate", enqueue=False)

        add_file("l2", "l2_error")
        add_file("l2", "l2_process")
        add_file("l2", "l2_process_time")
        add_file("l2", "l2_trade")
        add_file("l2", "l2_data")
        add_file("l2", "l2_latest_data")
        # Also show the l2_trade channel on the console.
        add_console("l2_trade")
        add_file("l2", "l2_trade_cancel")
        add_file("l2", "cancel/s_cancel", channel="s_cancel")
        add_file("l2", "cancel/h_cancel", channel="h_cancel")
        add_file("l2", "l2_trade_buy")
        add_file("l2", "l2_big_data")
        add_file("l2", "l2_trade_queue")
        add_file("l2", "l2_trade_buy_queue")
        add_file("l2", "l2_trade_buy_progress")

        add_file("juejin", "juejin_tick")
        add_file("juejin", "juejin_trade")
        add_file("juejin", "huaxin_trade")

        add_file("ths", "code_operate")
        # Also show the code_operate channel on the console.
        add_console("code_operate")

        add_file("device", "device")
        add_file("system", "system")

        add_file("ths", "buy_1_volumn")
        add_file("ths", "buy_1_volumn_record")
        add_file("ths", "trade_queue_price_info")
        add_file("ths", "day_volumn")
        add_file("ths", "buy_win_distibute")

        add_file("first_code", "first_code_record")
        add_file("debug", "debug")
        add_file("trade", "trade_record")
        add_file("score", "place_order_score")

        add_file("kpl", "kpl_limit_up_reason_change")
        add_file("kpl", "kpl_limit_up")
        add_file("kpl", "kpl_debug")
        add_file("kpl", "kpl_block_can_buy")

        # Market-watch (kp) message log.
        add_file("kp", "kp_msg")

    def get_path(self, dir_name, log_name):
        """Return the sink path template for ``log_name`` inside ``dir_name``.

        The result is ``<prefix>/logs/gp/<dir_name>/<log_name>.{time:YYYY-MM-DD}.log``
        where ``<prefix>`` comes from ``constant.get_path_prefix()``. The
        ``{time:...}`` token is returned literally, left for loguru to fill in.
        """
        stem = f"{constant.get_path_prefix()}/logs/gp/{dir_name}/{log_name}"
        # Concatenated outside the f-string so the braces stay literal.
        return stem + ".{time:YYYY-MM-DD}.log"

    def get_logger(self, log_name):
        """Return a loguru logger bound with ``name=log_name`` for sink filtering."""
        return logger.bind(name=log_name)
|
|
|
# Configure all sinks once at import time; every channel logger below is bound
# through this single instance.
__mylogger = MyLogger()

# --- trade ---
logger_trade_gui = __mylogger.get_logger("trade_gui")
logger_trade = __mylogger.get_logger("trade")
logger_trade_delegate = __mylogger.get_logger("delegate")
logger_trade_record = __mylogger.get_logger("trade_record")

# --- l2: processing and raw data ---
logger_l2_error = __mylogger.get_logger("l2_error")
logger_l2_process = __mylogger.get_logger("l2_process")
logger_l2_process_time = __mylogger.get_logger("l2_process_time")
logger_l2_data = __mylogger.get_logger("l2_data")
logger_l2_latest_data = __mylogger.get_logger("l2_latest_data")
logger_l2_big_data = __mylogger.get_logger("l2_big_data")

# --- l2: trading and cancellation ---
logger_l2_trade = __mylogger.get_logger("l2_trade")
logger_l2_trade_cancel = __mylogger.get_logger("l2_trade_cancel")
logger_l2_s_cancel = __mylogger.get_logger("s_cancel")
logger_l2_h_cancel = __mylogger.get_logger("h_cancel")
logger_l2_trade_buy = __mylogger.get_logger("l2_trade_buy")
logger_l2_trade_queue = __mylogger.get_logger("l2_trade_queue")
logger_l2_trade_buy_queue = __mylogger.get_logger("l2_trade_buy_queue")
logger_l2_trade_buy_progress = __mylogger.get_logger("l2_trade_buy_progress")

# --- juejin ---
logger_juejin_tick = __mylogger.get_logger("juejin_tick")
logger_juejin_trade = __mylogger.get_logger("juejin_trade")
logger_huaxin_trade = __mylogger.get_logger("huaxin_trade")

# --- ths ---
logger_code_operate = __mylogger.get_logger("code_operate")
logger_buy_1_volumn = __mylogger.get_logger("buy_1_volumn")
logger_buy_1_volumn_record = __mylogger.get_logger("buy_1_volumn_record")
logger_trade_queue_price_info = __mylogger.get_logger("trade_queue_price_info")
logger_day_volumn = __mylogger.get_logger("day_volumn")
logger_buy_win_distibute = __mylogger.get_logger("buy_win_distibute")

# --- device / system / debug ---
logger_device = __mylogger.get_logger("device")
logger_system = __mylogger.get_logger("system")
logger_debug = __mylogger.get_logger("debug")

# --- first_code / score ---
logger_first_code_record = __mylogger.get_logger("first_code_record")
logger_place_order_score = __mylogger.get_logger("place_order_score")

# --- kpl ---
logger_kpl_limit_up_reason_change = __mylogger.get_logger("kpl_limit_up_reason_change")
logger_kpl_limit_up = __mylogger.get_logger("kpl_limit_up")
logger_kpl_debug = __mylogger.get_logger("kpl_debug")
logger_kpl_block_can_buy = __mylogger.get_logger("kpl_block_can_buy")

# --- kp (market-watch messages) ---
logger_kp_msg = __mylogger.get_logger("kp_msg")
|
|
|
class LogUtil:
    """Helpers for slicing log files."""

    @classmethod
    def extract_log_from_key(cls, key, path, target_path):
        """Copy every line of ``path`` that contains ``key`` into ``target_path``.

        Args:
            key: Text to look for. It is formatted to a string first, so
                non-string keys are accepted.
            path: UTF-8 log file to read.
            target_path: File that is created or overwritten with the
                matching lines, in their original order.

        Raises:
            OSError: If ``path`` cannot be opened for reading or
                ``target_path`` cannot be opened for writing.
        """
        needle = "{}".format(key)
        # The source is opened first so that a missing log raises before the
        # target is created, instead of leaving an empty file behind.
        with open(path, 'r', encoding="utf-8") as src, \
                open(target_path, mode='w', encoding="utf-8") as dst:
            # Stream line by line instead of loading the whole log. The "in"
            # test also matches a key at column 0, which "find(...) > 0" missed.
            dst.writelines(line for line in src if needle in line)
|
|
|
|
|
|
# Export the data-processing position log lines
def __export_l2_pos_range(code, date, dir):
    """Copy the "<code> 处理数据范围" lines of one day's l2_process log into ``dir``.

    Args:
        code: Value formatted into the search key.
        date: Date text used in both the source and target file names.
        dir: Directory that receives ``l2_process_<date>.log``.
    """
    source_log = f"{constant.get_path_prefix()}/logs/gp/l2/l2_process.{date}.log"
    target_log = f"{dir}/l2_process_{date}.log"
    LogUtil.extract_log_from_key(f"{code} 处理数据范围", source_log, target_log)
|
|
|
# Export the trade log lines
def __export_l2_trade_log(code, date, dir):
    """Copy the lines containing ``code`` from one day's l2_trade log into ``dir``.

    Args:
        code: Search key passed to ``LogUtil.extract_log_from_key``.
        date: Date text used in both the source and target file names.
        dir: Directory that receives ``l2_trade_<date>.log``.
    """
    source_log = f"{constant.get_path_prefix()}/logs/gp/l2/l2_trade.{date}.log"
    target_log = f"{dir}/l2_trade_{date}.log"
    LogUtil.extract_log_from_key(code, source_log, target_log)
|
|
|
# Export the trade-cancel log lines
def __export_l2_trade_cancel_log(code, date, dir):
    """Copy the lines containing ``code`` from one day's l2_trade_cancel log into ``dir``.

    Args:
        code: Search key passed to ``LogUtil.extract_log_from_key``.
        date: Date text used in both the source and target file names.
        dir: Directory that receives ``l2_trade_cancel_<date>.log``.
    """
    source_log = f"{constant.get_path_prefix()}/logs/gp/l2/l2_trade_cancel.{date}.log"
    target_log = f"{dir}/l2_trade_cancel_{date}.log"
    LogUtil.extract_log_from_key(code, source_log, target_log)
|
|
|
def __analyse_pricess_time():
    """Print the lines of today's l2_process log whose last ':' field exceeds 150.

    The text after the final ':' of each line is parsed with ``int()``; a line
    whose tail is not an integer raises ``ValueError``.
    """
    today = datetime.datetime.now().strftime("%Y-%m-%d")
    log_path = "{}/logs/gp/l2/l2_process.{}.log".format(constant.get_path_prefix(), today)
    with open(log_path, encoding="utf-8") as log_file:
        for entry in log_file:
            # Everything after the last ':' (the whole line when there is none).
            tail = entry.rpartition(":")[2]
            if int(tail) > 150:
                print(entry)
|
|
|
def export_l2_log(code):
    """Export today's per-code L2 log extracts into ``<prefix>/logs/gp/l2/<code>``.

    Nothing happens when ``code`` has fewer than 6 characters. Otherwise the
    target directory is created if missing and the process-range, trade-cancel
    and trade extracts are written in that order.
    """
    if len(code) >= 6:
        today = datetime.datetime.now().strftime("%Y-%m-%d")
        out_dir = f"{constant.get_path_prefix()}/logs/gp/l2/{code}"
        if not os.path.exists(out_dir):
            os.mkdir(out_dir)
        exporters = (__export_l2_pos_range, __export_l2_trade_cancel_log, __export_l2_trade_log)
        for exporter in exporters:
            exporter(code, today, out_dir)
|
|
|
def compute_buy1_real_time(time_):
    """Shift a clock time back 2 seconds and round it down to a multiple of 3 seconds.

    Args:
        time_: Text with at least three ':'-separated integer fields, read as
            hours, minutes and seconds.

    Returns:
        Whatever ``tool.time_seconds_format`` returns for the adjusted second
        count (presumably the formatted clock text; confirm against ``tool``).
    """
    fields = time_.split(":")
    total_seconds = 0
    for position, weight in enumerate((3600, 60, 1)):
        total_seconds += int(fields[position]) * weight
    shifted = total_seconds - 2
    return tool.time_seconds_format(shifted - shifted % 3)
|
|
|
def load_l2_from_log(date=None):
    """Rebuild the per-code L2 records saved in one day's ``l2_data`` log.

    For each line, the text after the first '-' that follows ``save_l2_data:``
    is taken; its first 6 characters are the code and the rest (from offset 7)
    is evaluated as a Python expression yielding that code's records.

    Args:
        date: ``YYYY-MM-DD`` text selecting the log file; defaults to today.

    Returns:
        Dict mapping code to its records sorted by their ``"index"`` item.
        Loading is best effort: if the log is missing or a line cannot be
        parsed, whatever was collected up to that point is returned
        (possibly an empty dict).
    """
    today_data = {}
    if date is None:
        date = datetime.datetime.now().strftime("%Y-%m-%d")
    try:
        # Read as UTF-8 like every other reader in this module, instead of
        # relying on the platform default encoding.
        with open("{}/logs/gp/l2/l2_data.{}.log".format(constant.get_path_prefix(), date), mode='r',
                  encoding="utf-8") as f:
            for data in f:
                index = data.find('save_l2_data:')
                index = data.find('-', index)
                data = data[index + 1:].strip()
                code = data[0:6]
                # NOTE(review): eval() executes whatever the log line contains;
                # only safe while the log files are trusted.
                dict_ = eval(data[7:])
                if code not in today_data:
                    today_data[code] = dict_
                else:
                    today_data[code].extend(dict_)
        for key in today_data:
            today_data[key] = sorted(today_data[key], key=lambda x: x["index"])
            print(key, len(today_data[key]) - 1, today_data[key][-1]["index"])
    except Exception:
        # Deliberate best effort, but no longer a bare "except:", so
        # KeyboardInterrupt and SystemExit propagate.
        pass
    return today_data
|
|
|
# Get the time of a log line
def __get_log_time(line):
    """Return the clock part of a log line's leading timestamp.

    Takes the text before the first '|', picks its second space-separated
    field and drops everything from the first '.' on, e.g.
    ``"2023-05-06 09:30:01.123 | ..."`` gives ``"09:30:01"``.

    Raises:
        IndexError: If the text before the first '|' contains no space.
    """
    timestamp_head = line.partition("|")[0]
    clock_with_fraction = timestamp_head.split(" ")[1]
    return clock_with_fraction.partition(".")[0]
|
|
|
# Get the position range of each batch of L2 data that was processed
def get_l2_process_position(code, date=None):
    """Collect the ``(start, end)`` ranges logged for ``code`` in the l2_process log.

    Only lines containing ``code:<code>`` are considered. The range is the
    ``start-end`` text between "处理数据范围" and "处理时间". A range is kept when
    its start is greater than the end of the previously kept range (or no range
    has been kept yet) and the line's time lies within 09:30:00-15:00:00.

    Args:
        code: Stock code to look for.
        date: ``YYYY-MM-DD`` text selecting the log file; defaults to today.

    Returns:
        List of ``(start, end)`` integer tuples in file order.
    """
    date = date or datetime.datetime.now().strftime("%Y-%m-%d")
    log_path = f"{constant.get_path_prefix()}/logs/gp/l2/l2_process.{date}.log"
    code_tag = f"code:{code}"
    range_label = "处理数据范围"
    ranges = []
    with open(log_path, mode='r', encoding="utf-8") as log_file:
        for entry in log_file:
            if code_tag not in entry:
                continue
            clock = __get_log_time(entry)
            begin = entry.find(range_label) + len(range_label) + 1
            bounds = entry[begin:entry.find("处理时间")].strip().split("-")
            # Skip ranges that do not start after the last kept range ends.
            if ranges and ranges[-1][1] >= int(bounds[0]):
                continue
            # Keep only entries logged between 09:30:00 and 15:00:00.
            if not (93000 <= int(clock.replace(":", "")) <= 150000):
                continue
            ranges.append((int(bounds[0]), int(bounds[1])))
    return ranges
|
|
|
# Get the buy-signal, buy-execution and cancel positions recorded in the L2 trade log
def get_l2_trade_position(code, date=None):
    """Collect trade event positions for ``code`` from the l2_trade log.

    Every line containing ``code=<code>`` is printed. Lines timed outside
    09:30:00-15:00:00 are then skipped. The first matching event type decides
    the tuple that is appended:

    * ``(0, index, "")`` - buy-signal start ("获取到买入信号起始点")
    * ``(1, index, "")`` - buy execution position ("获取到买入执行位置")
    * ``(2, index, reason)`` - cancel triggered ("触发撤单"), where ``reason``
      is the text after "撤单原因:"

    Args:
        code: Stock code to look for.
        date: ``YYYY-MM-DD`` text selecting the log file; defaults to today.

    Returns:
        List of ``(kind, index, reason)`` tuples in file order.
    """
    # (kind, text that must occur after column 0, marker preceding the index)
    events = (
        (0, "获取到买入信号起始点", "获取到买入信号起始点:"),
        (1, "获取到买入执行位置", "获取到买入执行位置:"),
        (2, "触发撤单", "触发撤单,撤单位置:"),
    )
    date = date or datetime.datetime.now().strftime("%Y-%m-%d")
    log_path = f"{constant.get_path_prefix()}/logs/gp/l2/l2_trade.{date}.log"
    code_tag = f"code={code}"
    positions = []
    with open(log_path, mode='r', encoding="utf-8") as log_file:
        for entry in log_file:
            if code_tag not in entry:
                continue
            print(entry)
            clock = int(__get_log_time(entry).replace(":", ""))
            if not (93000 <= clock <= 150000):
                continue
            for kind, probe, marker in events:
                if entry.find(probe) <= 0:
                    continue
                tail = entry.split(marker)[1].strip()
                # The index is the text up to the first space after the marker.
                index = int(tail[0:tail.find(" ")].strip())
                reason = entry.split("撤单原因:")[1] if kind == 2 else ""
                positions.append((kind, index, reason))
                break
    return positions
|
|
|
# Get the trade progress
def get_trade_progress(code, date=None):
    """Read trade-progress indexes and buy queues for ``code`` from the l2_trade_buy_queue log.

    Lines timed after 15:00:00 are skipped. A line containing ``<code>-[``
    contributes ``(evaluated text after "<code>-", time)`` to the buy queues.
    A line containing "获取成交位置成功: code-<code>" contributes
    ``(index, time)``, where index is the integer after ``index-``; lines
    whose index cannot be parsed are skipped.

    Args:
        code: Stock code to look for.
        date: ``YYYY-MM-DD`` text selecting the log file; defaults to today.

    Returns:
        Tuple ``(index_list, buy_queues)``, both in file order.
    """
    if not date:
        date = datetime.datetime.now().strftime("%Y-%m-%d")
    index_list = []
    buy_queues = []
    queue_marker = f"{code}-["
    progress_marker = "获取成交位置成功: code-{}".format(code)
    with open("{}/logs/gp/l2/l2_trade_buy_queue.{}.log".format(constant.get_path_prefix(), date), mode='r',
              encoding="utf-8") as f:
        for line in f:
            time_ = __get_log_time(line).strip()
            if int(time_.replace(":", "")) > int("150000"):
                continue

            if queue_marker in line:
                # NOTE(review): eval() executes whatever the log line contains;
                # only safe while the log files are trusted.
                buy_queues.append((eval(line.split(f"{code}-")[1]), time_))

            if progress_marker not in line:
                continue
            try:
                index = int(line.split("index-")[1].split(" ")[0])
            except (IndexError, ValueError):
                # Missing or non-numeric index: skip the line. Narrowed from a
                # bare "except:" so KeyboardInterrupt/SystemExit propagate.
                continue
            index_list.append((index, time_))
    return index_list, buy_queues
|
|
|
# Get the H-level cancel computation result
def get_h_cancel_compute_info(code, date=None):
    """Return the last H-cancel computation logged for ``code``, or ``None``.

    Scans ``cancel/h_cancel.<date>.log`` for lines containing both
    ``code-<code>`` and "H级撤单计算结果"; the last such line wins.

    Args:
        code: Stock code to look for.
        date: ``YYYY-MM-DD`` text selecting the log file; defaults to today.

    Returns:
        ``(target_rate, ratio, cancel_num, total_num)`` where ``target_rate``,
        ``cancel_num`` and ``total_num`` are the strings read from the line and
        ``ratio`` is ``cancel_num / total_num`` rounded to 2 decimals. ``None``
        when the file does not exist or no line matches.
    """
    date = date or datetime.datetime.now().strftime("%Y-%m-%d")
    path_str = f"{constant.get_path_prefix()}/logs/gp/l2/cancel/h_cancel.{date}.log"
    if not os.path.exists(path_str):
        return None
    code_tag = f"code-{code}"
    latest_info = None
    with open(path_str, mode='r', encoding="utf-8") as log_file:
        for entry in log_file:
            if code_tag not in entry or "H级撤单计算结果" not in entry:
                continue
            target_rate = entry.split("目标比例:")[1].split(" ")[0].strip()
            # Text after the label (minus one separator char) looks like "<cancel>/<total> ...".
            counts = entry.split("取消计算结果")[1][1:].split("/")
            cancel_num = counts[0].strip()
            total_num = counts[1].split(" ")[0].strip()
            ratio = round(int(cancel_num) / int(total_num), 2)
            latest_info = (target_rate, ratio, cancel_num, total_num)
    return latest_info
|
|
|
# Read the market-watch (kp) messages
def get_kp_msg_list(date=None):
    """Return every line of ``kp/kp_msg.<date>.log`` as a list.

    Args:
        date: ``YYYY-MM-DD`` text selecting the log file; defaults to today.

    Returns:
        The raw lines (line endings included), or an empty list when the file
        does not exist.
    """
    date = date or datetime.datetime.now().strftime("%Y-%m-%d")
    path_str = f"{constant.get_path_prefix()}/logs/gp/kp/kp_msg.{date}.log"
    if not os.path.exists(path_str):
        return []
    with open(path_str, mode='r', encoding="utf-8") as log_file:
        return log_file.readlines()
|
|
|
def export_logs(code):
    """Export today's L2 log lines for ``code`` into a fresh export directory.

    The directory ``<prefix>/logs/gp/l2/export/<code>_<name>_<date>`` is
    deleted if it exists and recreated. For each exported log, the lines
    containing the search key are written to
    ``<log_name>.<code>_<name>.<date>.log`` inside it. The key is the bare
    code for some logs and ``code=<code>`` for the others.
    """
    # Logs that are searched by the bare code instead of "code=<code>".
    bare_code_logs = {"l2_process", "l2_process_time", "cancel/h_cancel", "l2_trade_buy_progress"}
    exported_logs = ("l2_process", "l2_trade", "l2_trade_cancel", "l2_process_time", "l2_trade_buy",
                     "l2_trade_buy_progress", "cancel/h_cancel")

    code_name = gpcode_manager.get_code_name(code)
    date = datetime.datetime.now().strftime("%Y-%m-%d")
    target_dir = f"{constant.get_path_prefix()}/logs/gp/l2/export/{code}_{code_name}_{date}"
    if os.path.exists(target_dir):
        shutil.rmtree(target_dir)
    os.makedirs(target_dir)

    for log_name in exported_logs:
        key = code if log_name in bare_code_logs else f"code={code}"
        target_path = f"{target_dir}/{log_name}.{code}_{code_name}.{date}.log"
        # log_name may contain a sub-directory (e.g. "cancel/..."): create it.
        parent_dir = target_path.rsplit("/", 1)[0]
        if not os.path.exists(parent_dir):
            os.makedirs(parent_dir)
        source_path = f"{constant.get_path_prefix()}/logs/gp/l2/{log_name}.{date}.log"
        LogUtil.extract_log_from_key(key, source_path, target_path)
|
|
|
def export_trade_progress(code):
    """Return the sorted, de-duplicated trade-progress indexes logged today for ``code``.

    Reads today's l2_trade_buy_progress log and, for every line containing
    both ``code-<code>`` and "确定交易进度成功", takes the integer that follows
    ``index-``.

    Returns:
        Ascending list of distinct integer indexes.
    """
    path = f"{constant.get_path_prefix()}/logs/gp/l2/l2_trade_buy_progress.{tool.get_now_date_str()}.log"
    code_tag = f"code-{code}"
    with open(path, mode='r', encoding="utf-8") as log_file:
        indexes = {
            int(entry.split("index-")[1].split(" ")[0])
            for entry in log_file
            if code_tag in entry and "确定交易进度成功" in entry
        }
    return sorted(indexes)
|
|
|
# Load the buy-score records
def load_buy_score_recod(code):
    """Load today's trade_record entries for ``code``.

    A line is used when ``code=<code>`` occurs after column 0. From it are
    taken characters 11-18 as the time text, the text between ``type=`` and
    the next space, and the text after ``data=`` wrapped in braces and
    evaluated.

    Returns:
        List of ``(time_str, type, data)`` tuples in file order; empty when
        the log file does not exist.
    """
    path = f"{constant.get_path_prefix()}/logs/gp/trade/trade_record.{tool.get_now_date_str()}.log"
    if not os.path.exists(path):
        return []
    code_tag = f"code={code}"
    records = []
    with open(path, 'r', encoding="utf-8") as log_file:
        for entry in log_file:
            if entry.find(code_tag) <= 0:
                continue
            type_start = entry.find("type=")
            record_type = entry[type_start + 5:entry.find(" ", type_start)]
            payload = entry[entry.find("data=") + 5:]
            # NOTE(review): eval() executes whatever the log line contains;
            # only safe while the log files are trusted.
            records.append((entry[11:19], record_type, eval("{" + payload + "}")))
    return records
|
|
|
def load_kpl_reason_changes():
    """Load today's entries from the kpl_limit_up_reason_change log.

    For each line the text after ``code-`` is split on ':'; the first field
    is the code and the second field is split on '-' into the previous reason
    and an expression that is evaluated to give the new value.

    Returns:
        List of ``(code, from_reason, to_value)`` tuples in file order; empty
        when the log file does not exist.
    """
    path = f"{constant.get_path_prefix()}/logs/gp/kpl/kpl_limit_up_reason_change.{tool.get_now_date_str()}.log"
    if not os.path.exists(path):
        return []
    changes = []
    with open(path, 'r', encoding="utf-8") as log_file:
        for entry in log_file:
            fields = entry[entry.find("code-") + 5:].split(":")
            code = fields[0]
            transition = fields[1].split("-")
            from_reason = transition[0]
            # NOTE(review): eval() executes whatever the log line contains;
            # only safe while the log files are trusted.
            to_value = eval(transition[1])
            changes.append((code, from_reason, to_value))
    return changes
|
|
|
if __name__ == "__main__":
    # Manual check: print the latest H-cancel computation logged today for one code.
    h_cancel_info = get_h_cancel_compute_info("603912")
    print(h_cancel_info)
|