import datetime
|
import fileinput
|
import hashlib
|
import json
|
import logging
|
import os
|
import shutil
|
import time
|
|
from code_attribute import gpcode_manager
|
from utils import tool, constant
|
|
|
class LogUtil:
|
@classmethod
|
def extract_log_from_key(cls, key, path, target_path):
|
fw = open(target_path, mode='w', encoding="utf-8")
|
try:
|
with open(path, 'r', encoding="utf-8") as f:
|
lines = f.readlines()
|
for line in lines:
|
if line.find("{}".format(key)) > 0:
|
fw.write(line)
|
finally:
|
fw.close()
|
|
|
# 获取日志时间
|
def __get_log_time(line):
|
time_ = line.split("|")[0].split(" ")[1].split(".")[0]
|
return time_
|
|
|
def __get_async_log_time(line):
|
line = line.split(" - ")[1]
|
time_str = line[line.find("[") + 1:line.find("[") + 9]
|
return time_str
|
|
|
__log_file_contents = {}
|
|
|
# 加载文件内容
|
def __load_file_content(path_str, expire_timespace=20):
|
md5 = hashlib.md5(path_str.encode(encoding='utf-8')).hexdigest()
|
if md5 in __log_file_contents and time.time() - __log_file_contents[md5][0] < expire_timespace:
|
return __log_file_contents[md5][1]
|
contents = []
|
if os.path.exists(path_str):
|
with open(path_str, 'r', encoding="utf-8") as f:
|
lines = f.readlines()
|
for line in lines:
|
contents.append(line)
|
__log_file_contents[md5] = (time.time(), contents)
|
return contents
|
|
|
# 加载买入得分记录
|
def load_latest_market_info(date=tool.get_now_date_str()):
|
path = f"{constant.get_path_prefix()}/{constant.LOG_DIR}/huaxin_local/l2/market.{date}.log"
|
fdatas = {}
|
MAX_LINE = 5000
|
with open(path, 'r') as file:
|
file.seek(0, 2)
|
end_position = file.tell()
|
lines_to_read = MAX_LINE + 1 # 包括最后一行的换行符
|
|
lines = []
|
while len(lines) < lines_to_read and end_position > 0:
|
# 向前移动一个字符
|
end_position -= 1
|
file.seek(end_position)
|
# 如果字符是换行符,说明到达一行的末尾
|
if file.read(1) == '\n':
|
line = file.readline()
|
if line:
|
lines.append(line)
|
|
lines.reverse()
|
for line in lines:
|
start_index = line.find("]")
|
line = line[start_index + 1:].strip()
|
data = eval(line)
|
fdatas[data[0]] = data
|
return fdatas
|
|
|
# 加载买入得分记录
|
def load_market_info(date=tool.get_now_date_str()):
|
path = f"{constant.get_path_prefix()}/{constant.LOG_DIR}/huaxin_local/l2/market.{date}.log"
|
fdatas = {}
|
with open(path, 'r') as f:
|
lines = f.readlines()
|
for line in lines:
|
start_index = line.find("]")
|
line = line[start_index + 1:].strip()
|
data = eval(line)
|
code = data[0]
|
if code not in fdatas:
|
fdatas[code] = []
|
fdatas[code].append(data)
|
return fdatas
|
|
|
def load_transactions(date=tool.get_now_date_str()):
|
path = f"{constant.get_path_prefix()}/{constant.LOG_DIR}/huaxin_local/l2/transaction.{date}.log"
|
fdatas = []
|
with open(path, 'r') as file:
|
lines = file.readlines()
|
for line in lines:
|
start_index = line.find("]")
|
if start_index < 0:
|
continue
|
line = line[start_index + 1:].strip()
|
data = eval(line)
|
fdatas.append(data)
|
return fdatas
|
|
|
def load_big_buy_order(date=tool.get_now_date_str()):
|
"""
|
获取大买单
|
:param date:
|
:return:
|
"""
|
path = f"{constant.get_path_prefix()}/{constant.LOG_DIR}/huaxin/l2/transaction_big_buy.{date}.log"
|
fdatas = {}
|
with open(path, 'r') as file:
|
lines = file.readlines()
|
for line in lines:
|
start_index = line.find("]")
|
if start_index < 0:
|
continue
|
line = line[start_index + 1:].strip()
|
ds = line.split("#")
|
code = ds[0].strip()
|
data = eval(ds[1])
|
if code not in fdatas:
|
fdatas[code] = []
|
fdatas[code].append(data)
|
return fdatas
|
|
|
def load_kpl_blocks(date=tool.get_now_date_str()):
|
fdatas = {}
|
path = f"{constant.get_path_prefix()}/{constant.LOG_DIR}/gp/kpl/kpl_blocks.{date}.log"
|
with open(path, 'r', encoding='utf-8') as file:
|
lines = file.readlines()
|
for line in lines:
|
content = line[line.find(']') + 1:]
|
ds = content.split("#")
|
code = ds[0].strip()
|
data_str = ds[1].strip()
|
blocks = eval(data_str)
|
fdatas[code] = blocks
|
return fdatas
|
|
|
def load_order_ref_strategy(date=tool.get_now_date_str()):
|
"""
|
加载下单的order_ref对应的策略
|
:param date:
|
:return:
|
"""
|
fdatas = {}
|
path = f"{constant.get_path_prefix()}/{constant.LOG_DIR}/gp/trade/trade.{date}.log"
|
with open(path, 'r', encoding='utf-8') as file:
|
lines = file.readlines()
|
for line in lines:
|
if line.find("可转载策略下单结果") > -1:
|
line = line.split("可转载策略下单结果:")[1]
|
data = eval(line)
|
fdatas[data[0]] = data[1]
|
return fdatas
|
|
|
def load_first_limit_up_time(date=tool.get_now_date_str()):
|
"""
|
加载下单的order_ref对应的策略
|
:param date:
|
:return:
|
"""
|
fdatas = {}
|
path = f"{constant.get_path_prefix()}/{constant.LOG_DIR}/gp/market/first_limit_up_time.{date}.log"
|
with open(path, 'r', encoding='utf-8') as file:
|
lines = file.readlines()
|
for line in lines:
|
if line.find("]") > -1:
|
line = line[line.find("]") + 1:]
|
data = eval(line)
|
fdatas[data[0]] = data[1]
|
return fdatas
|
|
|
if __name__ == "__main__":
|
fdatas = load_order_ref_strategy()
|
print(fdatas)
|
|
pass
|