"""
|
历史K线管理
|
"""
|
import copy
|
import datetime
|
import os
|
import threading
|
|
import constant
|
from code_attribute import gpcode_manager
|
from huaxin_client import l1_subscript_codes_manager
|
from log_module.log import logger_debug
|
from third_data import history_k_data_util
|
from third_data.history_k_data_util import HistoryKDatasUtils
|
from utils import tool, init_data_util
|
|
|
def update_history_k_bars():
    """
    Refresh the locally cached historical K-line bars.

    Collects the codes returned by the L1 subscription manager, removes the
    codes whose bars are already cached for the latest K-bar date, and
    fetches the remaining ones on a background daemon thread.

    @return: number of codes scheduled for refresh by this call; the refresh
             itself runs asynchronously and may still be in progress when
             this function returns
    """

    def update(codes_):
        for code in codes_:
            try:
                datas = init_data_util.get_volumns_by_code(code, 150)
                if datas:
                    # save_history_bars is keyed by the date of the newest bar;
                    # datas[0] is passed as that bar.
                    HistoryKDataManager().save_history_bars(
                        code, datas[0]['bob'].strftime("%Y-%m-%d"), datas)
            except Exception as e:
                # Best effort: a failure on one code must not stop the rest.
                logger_debug.exception(e)

    latest_trading_date = history_k_data_util.get_k_bar_dead_date()
    # The subscription manager returns two lists of byte-encoded codes
    # (named codes_sh / codes_sz here).
    codes_sh, codes_sz = l1_subscript_codes_manager.get_codes(False)
    # Guard each list on its own, so that an empty or missing list for one
    # side neither drops the other side's codes nor raises on iteration.
    codes = {
        code_byte.decode()
        for code_list in (codes_sh, codes_sz)
        for code_byte in (code_list or ())
    }
    # Skip the codes whose bars are already cached for the latest date.
    codes_record = HistoryKDataManager().get_history_bars_codes(latest_trading_date)
    codes = codes - codes_record
    if codes:
        threading.Thread(target=update, args=(codes,), daemon=True).start()

    return len(codes)
|
|
|
def re_set_price_pres(codes, force=False):
    """
    Set the previous close price of every code in `codes`.

    The in-memory K-line cache is consulted first (using today's date string
    as the lookup day); codes without a cached value are handed over to
    init_data_util.re_set_price_pres in a single batch.

    @param codes: iterable of codes
    @param force: forwarded unchanged to the price setters
    @return: None
    """
    today = tool.get_now_date_str()
    missing_codes = []
    for code in codes:
        cached_close = HistoryKDataManager().get_pre_close(code, today)
        if cached_close is None:
            # Not in the cache: resolve it through the fallback below.
            missing_codes.append(code)
            continue
        gpcode_manager.CodePrePriceManager.set_price_pre(code, cached_close, force)
    if missing_codes:
        init_data_util.re_set_price_pres(missing_codes, force)
|
|
|
class HistoryKDataManager:
    """
    Singleton that caches historical K-line bars in memory and on disk.

    On disk, the bars of one code are stored in a single-line text file named
    "<day>_<code>.txt" inside the cache directory. In memory they are kept in
    a class-level dict shaped {day: {code: bars}}.
    """
    __instance = None
    __db = 0
    # {day: {code: bars}}
    __history_k_day_datas = {}

    def __new__(cls, *args, **kwargs):
        # Arguments are accepted for backward compatibility but not forwarded:
        # object.__new__ raises TypeError when it receives extra arguments.
        if not cls.__instance:
            cls.__instance = super(HistoryKDataManager, cls).__new__(cls)
            cls.__load_data()
        return cls.__instance

    @classmethod
    def __load_data(cls):
        # Nothing is preloaded; data is filled in by save_history_bars.
        pass

    def __get_cache_dir(self):
        """
        Return the cache directory, creating it when it does not exist.
        @return: directory path
        """
        dir_path = f"{constant.get_path_prefix()}/datas/k_bars"
        # exist_ok avoids the check-then-create race when several threads
        # reach this point at the same time.
        os.makedirs(dir_path, exist_ok=True)
        return dir_path

    def __del_outdate_datas(self, code):
        """
        Delete every cache file of `code` except the one with the newest day.
        @param code: code whose outdated files are removed
        @return:
        """
        dir_path = self.__get_cache_dir()
        datas = []
        for root, dirs, files in os.walk(dir_path):
            for file in files:
                if code not in file:
                    continue
                # The file name is "<day>_<code>.txt"
                datas.append((file.split("_")[0], os.path.join(root, file)))
        # Newest first: the first entry is kept, the rest are removed.
        datas.sort(key=lambda x: int(x[0].replace("-", "")), reverse=True)
        for _, path_ in datas[1:]:
            os.remove(path_)

    def save_history_bars(self, code, day, datas, force=False):
        """
        Save the historical K-line bars of a code.

        When the cache file already exists and `force` is false, nothing is
        written and the in-memory cache is left untouched.

        @param code: code
        @param day: date of the newest bar in `datas`
        @param datas: bars to store
        @param force: overwrite an existing cache file
        @return:
        """
        cache_dir = self.__get_cache_dir()
        file_name = f"{day}_{code}.txt"
        path_str = f"{cache_dir}/{file_name}"
        if os.path.exists(path_str) and not force:
            return
        if day not in self.__history_k_day_datas:
            self.__history_k_day_datas[day] = {}
        if datas:
            self.__history_k_day_datas[day][code] = datas
        # Convert datetime values to strings on a copy, so that the bars kept
        # in memory still hold the original datetime objects.
        fdatas = []
        for d in datas:
            dd = copy.deepcopy(d)
            for k in dd:
                if isinstance(dd[k], datetime.datetime):
                    dd[k] = dd[k].strftime("%Y-%m-%d %H:%M:%S")
            fdatas.append(dd)
        with open(path_str, encoding="utf-8", mode='w') as f:
            f.write(f"{fdatas}")
        self.__del_outdate_datas(code)

    def get_history_bars(self, code, day):
        """
        Get the historical K-line bars of a code.

        The in-memory cache is checked first, then the cache file. Bars read
        from the file are not added to the in-memory cache.

        @param code: code
        @param day: day the bars were saved under
        @return: bars, or None when neither memory nor disk has them
        """
        if day in self.__history_k_day_datas and code in self.__history_k_day_datas[day]:
            return self.__history_k_day_datas[day][code]
        cache_dir = self.__get_cache_dir()
        file_name = f"{day}_{code}.txt"
        path_str = f"{cache_dir}/{file_name}"
        if not os.path.exists(path_str):
            return None
        with open(path_str, encoding="utf-8", mode='r') as f:
            line = f.readline()
        if line:
            # NOTE(review): eval() executes whatever the cache file contains.
            # The file is written by save_history_bars, but if the cache
            # directory can be modified by anything else this is a code
            # execution risk; consider ast.literal_eval once it is confirmed
            # that every stored value is a plain literal.
            datas = eval(line)
            # Turn "YYYY-mm-dd HH:MM:SS" strings back into datetime objects.
            for d in datas:
                for k in d:
                    if isinstance(d[k], str) and d[k].find("-") > 0 and d[k].find(":") > 0 and d[k].find(" ") > 0:
                        d[k] = datetime.datetime.strptime(d[k], "%Y-%m-%d %H:%M:%S")
            return datas
        return None

    def get_pre_close(self, code, day):
        """
        Get the close price of the first bar cached in memory for a code.

        Only the in-memory cache is consulted; the cache files are not read.

        @param code: code
        @param day: day the bars were saved under
        @return: close price, or None when nothing is cached in memory
        """
        if day in self.__history_k_day_datas and code in self.__history_k_day_datas[day]:
            return self.__history_k_day_datas[day][code][0]["close"]
        return None

    def get_history_bars_codes(self, day):
        """
        Get the codes that have a cache file for the given day.
        @param day: day contained in the cache file names
        @return: set of codes
        """
        dir_path = self.__get_cache_dir()
        codes = set()
        for root, dirs, files in os.walk(dir_path):
            for file in files:
                if day in file:
                    # "<day>_<code>.txt": the code is the first 6 characters
                    # after the underscore.
                    codes.add(file.split("_")[1][:6])
        return codes
|
|
|
@tool.singleton
class TradeDateManager:
    """
    Trading date management.

    Trading dates are fetched once per reference day and kept in a
    class-level cache.
    """
    # {reference day: list of trading dates around that day}
    __date_list_dict = {}

    def __load_dates(self, now_day):
        """
        Make sure the trading dates around `now_day` are cached.
        @param now_day: reference day
        @return:
        """
        cache = self.__date_list_dict
        if not cache.get(now_day):
            # Trading dates from one month before to one month after now_day
            window_start = tool.date_sub(now_day, 30)
            window_end = tool.date_sub(now_day, -30)
            cache[now_day] = HistoryKDatasUtils.get_trading_dates(window_start, window_end)

    def get_next_trade_day(self, now_day):
        """
        Get the next trading day.
        @param now_day: reference day
        @return: first cached trading date greater than now_day, or None
        """
        self.__load_dates(now_day)
        return next((d for d in self.__date_list_dict[now_day] if d > now_day), None)

    def get_previous_trade_day(self, now_day):
        """
        Get the previous trading day.
        @param now_day: reference day
        @return: last cached trading date smaller than now_day, or None
        """
        self.__load_dates(now_day)
        return next((d for d in reversed(self.__date_list_dict[now_day]) if d < now_day), None)
|
|
|
if __name__ == "__main__":
    # Manual smoke check: print the trading days around a fixed date.
    for lookup in (TradeDateManager().get_next_trade_day,
                   TradeDateManager().get_previous_trade_day):
        print(lookup("2025-06-09"))
|