import datetime
|
import hashlib
|
import json
|
import logging
|
import os
|
import time
|
|
import constant
|
from log_module.log import printlog
|
from utils import tool
|
|
|
class LogUtil:
    """Helpers for post-processing log files."""

    @classmethod
    def extract_log_from_key(cls, key, path, target_path):
        """
        Copy every line of ``path`` that contains ``key`` into ``target_path``.

        :param key: value to search for; it is converted with ``str()`` before
            matching.
        :param path: source log file, read as UTF-8.
        :param target_path: destination file, opened in write mode as UTF-8
            (any existing content is overwritten).
        :raises OSError: if either file cannot be opened.
        """
        needle = str(key)
        # The destination is opened first, as before, so it is created or
        # truncated even when the source turns out to be unreadable.
        with open(target_path, mode='w', encoding="utf-8") as fw, \
                open(path, 'r', encoding="utf-8") as f:
            for line in f:
                # ``in`` also matches a key at column 0; the previous
                # ``find(...) > 0`` test silently dropped those lines.
                if needle in line:
                    fw.write(line)
|
|
|
# Get the time of a log line.
def __get_log_time(line):
    """
    Return the time-of-day portion of a ``|``-delimited log line.

    The text before the first ``|`` is split on single spaces; the second
    token is taken and everything from its first ``.`` onward is dropped
    (e.g. ``"2025-07-03 09:30:01.123 | ..."`` gives ``"09:30:01"``).

    :param line: raw log line.
    :return: the extracted time string.
    :raises IndexError: if the text before the first ``|`` contains no space.
    """
    header, _, _ = line.partition("|")
    clock_with_fraction = header.split(" ")[1]
    clock, _, _ = clock_with_fraction.partition(".")
    return clock
|
|
|
def __get_async_log_time(line):
    """
    Extract a bracketed time string from the payload part of a log line.

    The text after the first ``" - "`` separator is searched for ``"["`` and
    the 8 characters following it are taken as the candidate time
    (e.g. ``"... - [09:30:01.123] payload"`` gives ``"09:30:01"``).

    :param line: raw log line; must contain the ``" - "`` separator.
    :return: the candidate string when it consists only of digits, ``:`` and
        ``.``; otherwise ``None`` (including when no ``"["`` is present).
    :raises IndexError: if ``line`` does not contain ``" - "``.
    """
    payload = line.split(" - ")[1]
    bracket_pos = payload.find("[")
    if bracket_pos < 0:
        # str.find returns -1 when there is no bracket; slicing with that
        # value used to take the first 8 characters of the payload and could
        # report a digit-only payload as a timestamp.
        return None
    time_str = payload[bracket_pos + 1:bracket_pos + 9]
    if time_str.replace(":", "").replace(".", "").isdigit():
        return time_str
    return None
|
|
|
# In-memory cache: md5(path) -> (load timestamp, list of lines).
__log_file_contents = {}


# Load the content of a file.
def __load_file_content(path_str, expire_timespace=20):
    """
    Return the lines of ``path_str``, served from a short-lived cache.

    :param path_str: path of the file to read (UTF-8).
    :param expire_timespace: maximum age in seconds of a cached entry before
        the file is read again.
    :return: list of lines including their line endings; an empty list when
        the file does not exist. The returned list is the cached object
        itself, not a copy.
    """
    cache_key = hashlib.md5(path_str.encode(encoding='utf-8')).hexdigest()
    cached = __log_file_contents.get(cache_key)
    if cached is not None and time.time() - cached[0] < expire_timespace:
        return cached[1]

    contents = []
    if os.path.exists(path_str):
        with open(path_str, 'r', encoding="utf-8") as f:
            contents.extend(f.readlines())
    # A missing file is cached too (as an empty list) until the entry expires.
    __log_file_contents[cache_key] = (time.time(), contents)
    return contents
|
|
|
# Load the plate-strength log.
def load_market_sift_plate(date=None):
    """
    Load the timestamped records of the ``market_sift_plate`` log for one day.

    For every line the time is taken with ``__get_log_time`` and the text
    after the first ``" - "`` separator is evaluated as a Python expression.

    :param date: date string used in the log file name; when omitted,
        ``tool.get_now_date_str()`` is called at call time.
    :return: list of ``(time_str, data)`` tuples in file order; an empty list
        when the file does not exist. Lines that cannot be parsed are skipped.
    """
    if date is None:
        # Resolved per call: a call in the signature default would be
        # evaluated once at import time and go stale in a long-running process.
        date = tool.get_now_date_str()
    path = f"{constant.get_path_prefix()}/low_suction_log/gp/kpl/market_sift_plate.{date}.log"
    fdatas = []
    if not os.path.exists(path):
        return fdatas
    with open(path, 'r', encoding="utf-8") as f:
        for line in f:
            try:
                # Time parsing is inside the try so that a blank or malformed
                # line is skipped instead of aborting the whole load.
                time_str = __get_log_time(line)
                data = line.split(" - ")[1].strip()
                # NOTE(security): eval executes whatever expression the log
                # contains; only safe while the log files are trusted.
                data_dict = eval(data)
            except Exception:
                # Best-effort loader: ignore lines that do not parse.
                continue
            fdatas.append((time_str, data_dict))
    return fdatas
|
|
|
# Load the per-stock strength log.
def load_kpl_market_stock_heat(date=None):
    """
    Load the timestamped records of the ``stock_of_markets_plate`` log for one day.

    For every line the time is taken with ``__get_async_log_time``; the text
    after the first ``" - "`` separator, minus a leading ``[...]`` prefix if
    present, is evaluated as a Python expression.

    :param date: date string used in the log file name; when omitted,
        ``tool.get_now_date_str()`` is called at call time.
    :return: list of ``(time_str, data)`` tuples in file order (``time_str``
        may be ``None`` when no bracketed time was recognised); an empty list
        when the file does not exist. Lines that cannot be parsed are skipped.
    """
    if date is None:
        # Resolved per call: a call in the signature default would be
        # evaluated once at import time and go stale in a long-running process.
        date = tool.get_now_date_str()
    path = f"{constant.get_path_prefix()}/low_suction_log/gp/kpl/stock_of_markets_plate.{date}.log"
    fdatas = []
    if not os.path.exists(path):
        return fdatas
    with open(path, 'r', encoding="utf-8") as f:
        for line in f:
            try:
                # Time parsing is inside the try so that a line without the
                # " - " separator is skipped instead of aborting the load.
                time_str = __get_async_log_time(line)
                data = line.split(" - ")[1].strip()
                if data.startswith("["):
                    # Drop the leading "[...]" prefix before the payload.
                    data = data[data.find("]") + 1:].strip()
                # NOTE(security): eval executes whatever expression the log
                # contains; only safe while the log files are trusted.
                data_dict = eval(data)
            except Exception:
                # Best-effort loader: ignore lines that do not parse.
                continue
            fdatas.append((time_str, data_dict))
    return fdatas
|
|
|
def load_kpl_market_strong(date=None):
    """
    Load the historical KPL overall market strength scores for one day.

    Reads ``Overall_market_strength_score.{date}.log``. For every line the
    text after the first ``" - "`` separator, minus a leading ``[...]`` prefix
    if present, is converted with ``int()``.

    :param date: date string used in the log file name; when omitted,
        ``tool.get_now_date_str()`` is called at call time.
    :return: list of ``(time_str, score)`` tuples in file order (``time_str``
        may be ``None`` when no bracketed time was recognised); an empty list
        when the file does not exist. Lines that cannot be parsed are skipped.
    """
    if date is None:
        # Resolved per call: a call in the signature default would be
        # evaluated once at import time and go stale in a long-running process.
        date = tool.get_now_date_str()
    path = f"{constant.get_path_prefix()}/low_suction_log/gp/kpl/Overall_market_strength_score.{date}.log"
    fdatas = []
    if not os.path.exists(path):
        return fdatas
    with open(path, 'r', encoding="utf-8") as f:
        for line in f:
            try:
                # Time parsing is inside the try so that a line without the
                # " - " separator is skipped instead of aborting the load.
                time_str = __get_async_log_time(line)
                data = line.split(" - ")[1].strip()
                if data.startswith("["):
                    # Drop the leading "[...]" prefix before the payload.
                    data = data[data.find("]") + 1:].strip()
                score = int(data)
            except Exception:
                # Best-effort loader: ignore lines that do not parse.
                continue
            fdatas.append((time_str, score))
    return fdatas
|
|
|
def load_target_codes_info(date=None):
    """
    Load the payload stored on the first line of the ``target_codes`` log.

    Only the first line of ``target_codes.{date}.log`` is used. The text after
    the first ``" - "`` separator is evaluated as a Python expression; a
    leading ``[...]`` prefix is removed first when its closing ``]`` lies
    within the first 20 characters.

    :param date: date string used in the log file name; when omitted,
        ``tool.get_now_date_str()`` is called at call time.
    :return: the evaluated payload, or ``None`` when the file is missing,
        empty, or its first line cannot be parsed.
    """
    if date is None:
        # Resolved per call: a call in the signature default would be
        # evaluated once at import time and go stale in a long-running process.
        date = tool.get_now_date_str()
    path = f"{constant.get_path_prefix()}/low_suction_log/gp/codes/target_codes.{date}.log"
    if not os.path.exists(path):
        return None
    with open(path, 'r', encoding="utf-8") as f:
        # Only the first line is needed, so do not read the whole file.
        line = f.readline()
    if not line:
        return None
    try:
        data = line.split(" - ")[1].strip()
        # The position check keeps a payload that is itself a list from being
        # mistaken for a short "[...]" prefix.
        if data.startswith("[") and data.find("]") < 20:
            data = data[data.find("]") + 1:].strip()
        # NOTE(security): eval executes whatever expression the log contains;
        # only safe while the log files are trusted.
        return eval(data)
    except Exception:
        # Best-effort loader: an unparsable first line yields None.
        return None
|
|
|
def load_k_bars(date=None):
    """
    Load the K-line data stored for one day.

    Reads ``all_stocks_all_K_line_property_dict.{date}.json`` and parses its
    first line as JSON.

    :param date: date string used in the file name; when omitted,
        ``tool.get_now_date_str()`` is called at call time.
    :return: the decoded JSON value, or ``None`` when the file is missing or
        empty.
    :raises json.JSONDecodeError: if the first line is not valid JSON.
    """
    if date is None:
        # Resolved per call: a call in the signature default would be
        # evaluated once at import time and go stale in a long-running process.
        date = tool.get_now_date_str()
    path = f"{constant.get_path_prefix()}/local_storage_data/all_stocks_all_K_line_property_dict.{date}.json"
    if not os.path.exists(path):
        return None
    with open(path, 'r', encoding="utf-8") as f:
        # The document is stored on the first line; read only that line.
        first_line = f.readline()
    if not first_line:
        # Empty file: previously this raised IndexError on lines[0].
        return None
    return json.loads(first_line)
|
|
|
def load_kpl_code_plates(date=None):
    """
    Load the code-to-plate data stored on the first line of the
    ``kpl_code_plates`` log.

    The text after the first ``" - "`` separator, minus a leading ``[...]``
    prefix if present, is evaluated as a Python expression.

    :param date: date string used in the log file name; when omitted,
        ``tool.get_now_date_str()`` is called at call time.
    :return: the evaluated payload, or ``None`` when the file is missing or
        empty.
    :raises IndexError: if the first line does not contain ``" - "``.
    """
    if date is None:
        # Resolved per call: a call in the signature default would be
        # evaluated once at import time and go stale in a long-running process.
        date = tool.get_now_date_str()
    path = f"{constant.get_path_prefix()}/low_suction_log/gp/kpl/kpl_code_plates.{date}.log"
    if not os.path.exists(path):
        return None
    with open(path, 'r', encoding="utf-8") as f:
        # Only the first line is used, so do not read the whole file.
        line = f.readline()
    if not line:
        return None
    data = line.split(" - ")[1].strip()
    if data.startswith("["):
        # Drop the leading "[...]" prefix before the payload.
        data = data[data.find("]") + 1:].strip()
    # NOTE(security): eval executes whatever expression the log contains;
    # only safe while the log files are trusted.
    return eval(data)
|
|
|
def load_kpl_limit_up_datas(date=None):
    """
    Load the KPL limit-up records for one day.

    Reads ``kpl_limit_up.{date}.log``. For every line the text after the
    first ``" - "`` separator, minus a leading ``[...]`` prefix if present,
    is evaluated as a Python expression.

    :param date: date string used in the log file name; when omitted,
        ``tool.get_now_date_str()`` is called at call time.
    :return: list of ``(time_str, data)`` tuples in file order (``time_str``
        may be ``None`` when no bracketed time was recognised); an empty list
        when the file does not exist.
    :raises IndexError: if a line does not contain ``" - "``.
    """
    if date is None:
        # Resolved per call: a call in the signature default would be
        # evaluated once at import time and go stale in a long-running process.
        date = tool.get_now_date_str()
    path = f"{constant.get_path_prefix()}/low_suction_log/gp/kpl/kpl_limit_up.{date}.log"
    fdatas = []
    if not os.path.exists(path):
        return fdatas
    with open(path, 'r', encoding="utf-8") as f:
        for line in f:
            time_str = __get_async_log_time(line)
            data = line.split(" - ")[1].strip()
            if data.startswith("["):
                # Drop the leading "[...]" prefix before the payload.
                data = data[data.find("]") + 1:].strip()
            # NOTE(security): eval executes whatever expression the log
            # contains; only safe while the log files are trusted.
            fdatas.append((time_str, eval(data)))
    return fdatas
|
|
|
def load_stock_of_markets_plate_simple(start_time_str, end_time_str, date=None):
    """
    Load the KPL selected plates and their codes within a time window.

    Reads ``stock_of_markets_plate_simple.{date}.log`` line by line and stops
    at the first line whose time is later than ``end_time_str``, so the file
    is assumed to be in chronological order. Times are compared as strings.

    :param start_time_str: inclusive lower bound of the window.
    :param end_time_str: inclusive upper bound of the window.
    :param date: date string used in the log file name; when omitted,
        ``tool.get_now_date_str()`` is called at call time.
    :return: list of ``(time_str, data)`` tuples in file order; an empty list
        when the file does not exist.
    :raises IndexError: if a line does not contain ``" - "``.
    """
    if date is None:
        # Resolved per call: a call in the signature default would be
        # evaluated once at import time and go stale in a long-running process.
        date = tool.get_now_date_str()
    path = f"{constant.get_path_prefix()}/low_suction_log/gp/kpl/stock_of_markets_plate_simple.{date}.log"
    fdatas = []
    if not os.path.exists(path):
        return fdatas
    with open(path, 'r', encoding="utf-8") as f:
        for line in f:
            time_str = __get_async_log_time(line)
            if time_str is None:
                # No recognisable time: comparing None with a str would raise
                # TypeError, so the line is skipped.
                continue
            if time_str > end_time_str:
                break
            if time_str < start_time_str:
                continue
            data = line.split(" - ")[1].strip()
            if data.startswith("["):
                # Drop the leading "[...]" prefix before the payload.
                data = data[data.find("]") + 1:].strip()
            # NOTE(security): eval executes whatever expression the log
            # contains; only safe while the log files are trusted.
            fdatas.append((time_str, eval(data)))
    return fdatas
|
|
|
def load_deal_big_order(date=None):
    """
    Load the big-order transaction records for one day.

    Reads ``transaction.{date}.log``. A line whose payload carries a bracketed
    time has its leading ``[...]`` prefix removed; otherwise the time is taken
    with ``__get_log_time`` and the payload is used as is. The payload is
    evaluated as a Python expression.

    :param date: date string used in the log file name; when omitted,
        ``tool.get_now_date_str()`` is called at call time.
    :return: list of ``(time_str, data)`` tuples in file order; an empty list
        when the file does not exist.
    :raises IndexError: if a line does not contain ``" - "``.
    """
    if date is None:
        # Resolved per call: a call in the signature default would be
        # evaluated once at import time and go stale in a long-running process.
        date = tool.get_now_date_str()
    path = f"{constant.get_path_prefix()}/low_suction_log/huaxin/l2/transaction.{date}.log"
    fdatas = []
    if not os.path.exists(path):
        return fdatas
    with open(path, 'r', encoding="utf-8") as f:
        for line in f:
            time_str = __get_async_log_time(line)
            data = line.split(" - ")[1].strip()
            if time_str:
                if data.startswith("["):
                    # Drop the leading "[...]" prefix before the payload.
                    data = data[data.find("]") + 1:].strip()
            else:
                # No bracketed time: fall back to the time in the line header
                # and keep the payload untouched (it may itself start with "[").
                time_str = __get_log_time(line)
            # NOTE(security): eval executes whatever expression the log
            # contains; only safe while the log files are trusted.
            fdatas.append((time_str, eval(data)))
    return fdatas
|
|
|
def load_ticks_data(min_time_str, max_time_str, date=None):
    """
    Load the market-data records of one day within a time window.

    Reads ``marketdata.{date}.log`` line by line and stops at the first line
    whose time is later than ``max_time_str``, so the file is assumed to be in
    chronological order. Times are compared as strings. A line that fails to
    parse is logged and printed, then skipped.

    :param min_time_str: inclusive lower bound of the window.
    :param max_time_str: inclusive upper bound of the window.
    :param date: date string used in the log file name; when omitted,
        ``tool.get_now_date_str()`` is called at call time.
    :return: list of ``(time_str, data)`` tuples in file order; an empty list
        when the file does not exist.
    """
    if date is None:
        # Resolved per call: a call in the signature default would be
        # evaluated once at import time and go stale in a long-running process.
        date = tool.get_now_date_str()
    path = f"{constant.get_path_prefix()}/low_suction_log/huaxin/l2/marketdata.{date}.log"
    fdatas = []
    if not os.path.exists(path):
        return fdatas
    with open(path, 'r', encoding="utf-8") as f:
        for line in f:
            try:
                time_str = __get_async_log_time(line)
                data = line.split(" - ")[1].strip()
                if time_str:
                    if data.startswith("["):
                        # Drop the leading "[...]" prefix before the payload.
                        data = data[data.find("]") + 1:].strip()
                else:
                    # No bracketed time: fall back to the time in the line
                    # header and keep the payload untouched.
                    time_str = __get_log_time(line)
                if min_time_str <= time_str <= max_time_str:
                    # NOTE(security): eval executes whatever expression the
                    # log contains; only safe while the log files are trusted.
                    fdatas.append((time_str, eval(data)))
                if time_str > max_time_str:
                    break
            except Exception as e:
                logging.exception(e)
                print(line)
                # Guarded: indexing an empty list here used to raise
                # IndexError from inside the handler and abort the load.
                if fdatas:
                    print(fdatas[-1])
    return fdatas
|
|
|
def load_forbidden_plates_data(date=None):
    """
    Load the records of plates that are forbidden to buy for one day.

    Reads ``forbidden_plates.{date}.log``. Each payload is split on ``"-"``
    and its first two parts are returned stripped. A line whose payload
    carries a bracketed time has its leading ``[...]`` prefix removed;
    otherwise the time is taken with ``__get_log_time``.

    :param date: date string used in the log file name; when omitted,
        ``tool.get_now_date_str()`` is called at call time.
    :return: list of ``(time_str, first_part, second_part)`` tuples in file
        order; an empty list when the file does not exist.
    :raises IndexError: if a line lacks ``" - "`` or its payload lacks ``"-"``.
    """
    if date is None:
        # Resolved per call: a call in the signature default would be
        # evaluated once at import time and go stale in a long-running process.
        date = tool.get_now_date_str()
    path = f"{constant.get_path_prefix()}/low_suction_log/gp/kpl/forbidden_plates.{date}.log"
    fdatas = []
    if not os.path.exists(path):
        return fdatas
    with open(path, 'r', encoding="utf-8") as f:
        for line in f:
            time_str = __get_async_log_time(line)
            data = line.split(" - ")[1].strip()
            if time_str:
                if data.startswith("["):
                    # Drop the leading "[...]" prefix before the payload.
                    data = data[data.find("]") + 1:].strip()
            else:
                # No bracketed time: fall back to the time in the line header.
                time_str = __get_log_time(line)
            # Split once; only the first two "-"-separated parts are used.
            parts = data.split("-")
            fdatas.append((time_str, parts[0].strip(), parts[1].strip()))
    return fdatas
|
|
|
def load_virtual_trade_account(date=None):
    """
    Load the virtual trading account records for one day.

    Reads ``virtual_account_money_records.{date}.log``. For every line the
    text after the first ``"]"`` of the whole line is evaluated as a Python
    expression.

    :param date: date string used in the log file name; when omitted,
        ``tool.get_now_date_str()`` is called at call time.
    :return: list of ``(time_str, data)`` tuples in file order (``time_str``
        may be ``None`` when no bracketed time was recognised); an empty list
        when the file does not exist.
    :raises IndexError: if a line does not contain ``" - "``.
    """
    if date is None:
        # Resolved per call: a call in the signature default would be
        # evaluated once at import time and go stale in a long-running process.
        date = tool.get_now_date_str()
    path = f"{constant.get_path_prefix()}/low_suction_log/gp/virtual_account/virtual_account_money_records.{date}.log"
    fdatas = []
    if not os.path.exists(path):
        return fdatas
    with open(path, 'r', encoding="utf-8") as f:
        for line in f:
            time_str = __get_async_log_time(line)
            # Unlike the other loaders, the payload starts after the first
            # "]" of the entire line, not of the part following " - ".
            data = line[line.find("]") + 1:].strip()
            # NOTE(security): eval executes whatever expression the log
            # contains; only safe while the log files are trusted.
            fdatas.append((time_str, eval(data)))
    return fdatas
|
|
|
if __name__ == '__main__':
    # Ad-hoc manual check of the loaders.
    load_k_bars('2025-07-03')
    # load_ticks_data requires both bounds; the previous single-argument call
    # raised TypeError.
    # NOTE(review): 09:15:00 is assumed as the lower bound because it is the
    # reference time used below - confirm the intended window.
    datas = load_ticks_data("09:15:00", "09:50:00")
    fdatas = []
    for data in datas:
        # Skip records between 11:30:00 and 13:00:00 (inclusive).
        if "11:30:00" <= data[0] <= "13:00:00":
            continue
        # Row layout: (seconds since 09:15:00, time, strength).
        fdatas.append([tool.trade_time_sub(data[0], "09:15:00"), data[0], data[1]])
    print(fdatas)
|