"""
|
历史K线管理
|
"""
|
import copy
|
import datetime
|
import os
|
import threading
|
|
import constant
|
from code_attribute import gpcode_manager
|
from huaxin_client import l1_subscript_codes_manager
|
from log_module.log import logger_debug
|
from third_data import history_k_data_util
|
from utils import tool, init_data_util
|
|
|
def update_history_k_bars():
|
"""
|
更新历史K线
|
@return: 此次更新的数量
|
"""
|
|
def update(codes_):
|
for code in codes_:
|
try:
|
datas = init_data_util.get_volumns_by_code(code, 150)
|
logger_debug.info(f"获取到的K线:{datas}")
|
if datas:
|
HistoryKDataManager().save_history_bars(code, datas[0]['bob'].strftime("%Y-%m-%d"), datas)
|
except Exception as e:
|
logger_debug.exception(e)
|
|
previous_trading_date = history_k_data_util.HistoryKDatasUtils.get_previous_trading_date(tool.get_now_date_str())
|
if previous_trading_date is None:
|
raise Exception("上一个交易日获取失败")
|
# 刷新目标代码的自由流通量
|
codes_sh, codes_sz = l1_subscript_codes_manager.get_codes(False)
|
codes = set()
|
if codes_sh:
|
for code_byte in codes_sh:
|
codes.add(code_byte.decode())
|
for code_byte in codes_sz:
|
codes.add(code_byte.decode())
|
# 获取已经更新的数据
|
codes_record = HistoryKDataManager().get_history_bars_codes(previous_trading_date)
|
codes = codes - codes_record
|
threading.Thread(target=lambda: update(codes), daemon=True).start()
|
|
return len(codes)
|
|
|
def re_set_price_pres(codes, force=False):
|
day = tool.get_now_date_str()
|
# 通过历史数据缓存获取
|
not_codes = []
|
for code in codes:
|
pre_close = HistoryKDataManager().get_pre_close(code, day)
|
if pre_close is not None:
|
gpcode_manager.CodePrePriceManager.set_price_pre(code, pre_close, force)
|
else:
|
not_codes.append(code)
|
if not_codes:
|
init_data_util.re_set_price_pres(not_codes, force)
|
|
|
class HistoryKDataManager:
|
__instance = None
|
__db = 0
|
__history_k_day_datas = {}
|
|
def __new__(cls, *args, **kwargs):
|
if not cls.__instance:
|
cls.__instance = super(HistoryKDataManager, cls).__new__(cls, *args, **kwargs)
|
cls.__load_data()
|
return cls.__instance
|
|
@classmethod
|
def __load_data(cls):
|
pass
|
|
def __get_cache_dir(self):
|
"""
|
获取缓存路径
|
@return:
|
"""
|
dir_path = f"{constant.get_path_prefix()}/datas/k_bars"
|
if not os.path.exists(dir_path):
|
os.makedirs(dir_path)
|
return dir_path
|
|
def __del_outdate_datas(self, code):
|
dir_path = self.__get_cache_dir()
|
datas = []
|
for root, dirs, files in os.walk(dir_path):
|
for file in files:
|
# 输出文件的绝对路径
|
if file.find(code) < 0:
|
continue
|
path_ = os.path.join(root, file)
|
day = file.split("_")[0]
|
datas.append((day, path_))
|
# 保存最新的一条数据
|
if datas:
|
datas.sort(key=lambda x: int(x[0].replace("-", "")), reverse=True)
|
datas = datas[1:]
|
for d in datas:
|
os.remove(d[1])
|
|
def save_history_bars(self, code, day, datas, force=False):
|
"""
|
保存历史K线
|
@param code: 代码
|
@param day: K线最新的日期(取datas最新一条数据的日期)
|
@param datas: 数据
|
@return:
|
"""
|
cache_dir = self.__get_cache_dir()
|
file_name = f"{day}_{code}.txt"
|
path_str = f"{cache_dir}/{file_name}"
|
if os.path.exists(path_str) and not force:
|
return
|
if day not in self.__history_k_day_datas:
|
self.__history_k_day_datas[day] = {}
|
if datas:
|
self.__history_k_day_datas[day][code] = datas
|
# 将日期格式化
|
fdatas = []
|
for d in datas:
|
dd = copy.deepcopy(d)
|
for k in dd:
|
if type(dd[k]) == datetime.datetime:
|
dd[k] = dd[k].strftime("%Y-%m-%d %H:%M:%S")
|
fdatas.append(dd)
|
with open(path_str, encoding="utf-8", mode='w') as f:
|
f.write(f"{fdatas}")
|
self.__del_outdate_datas(code)
|
|
def get_history_bars(self, code, day):
|
"""
|
获取历史K线
|
@param code:
|
@param day:
|
@return:
|
"""
|
if day in self.__history_k_day_datas and code in self.__history_k_day_datas[day]:
|
return self.__history_k_day_datas[day][code]
|
cache_dir = self.__get_cache_dir()
|
file_name = f"{day}_{code}.txt"
|
path_str = f"{cache_dir}/{file_name}"
|
if not os.path.exists(path_str):
|
return None
|
with open(path_str, encoding="utf-8", mode='r') as f:
|
line = f.readline()
|
if line:
|
datas = eval(line)
|
# 将日期格式转为datetime
|
for d in datas:
|
for k in d:
|
if type(d[k]) == str and d[k].find("-") > 0 and d[k].find(":") > 0 and d[k].find(" ") > 0:
|
d[k] = datetime.datetime.strptime(d[k], "%Y-%m-%d %H:%M:%S")
|
return datas
|
return None
|
|
def get_pre_close(self, code, day):
|
"""
|
获取之前的收盘价
|
@param code:
|
@param day:
|
@return:
|
"""
|
if day in self.__history_k_day_datas and code in self.__history_k_day_datas[day]:
|
return self.__history_k_day_datas[day][code][0]["close"]
|
return None
|
|
def get_history_bars_codes(self, day):
|
"""
|
获取某一天的历史K线的代码数据
|
@param day:
|
@return: 代码集合
|
"""
|
|
dir_path = self.__get_cache_dir()
|
codes = set()
|
for root, dirs, files in os.walk(dir_path):
|
for file in files:
|
# 输出文件的绝对路径
|
if file.find(day) >= 0:
|
codes.add(file.split("_")[1][:6])
|
return codes
|