"""
Data downloader module: downloads financial data (daily K-lines, minute
bars, ticks) from the supported data sources and caches it locally.
"""
import datetime
import json
import os
from typing import Dict, List, Tuple

from strategy.low_suction_strategy import LowSuctionOriginDataExportManager
from strategy.strategy_variable_factory import DataLoader
from third_data.history_k_data_util import JueJinLocalApi


class DataDownloader:
    """
    Centralizes the download logic for the various kinds of financial data
    (daily K-lines, 60-second bars, ticks) and caches every result on the
    local file system under ``cache_dir``.
    """

    def __init__(self, target_day, trade_days: list, cache_dir: str = "D:/datas"):
        """
        Initialize the data downloader.

        :param target_day: trading day being processed, "YYYY-MM-DD"
        :param trade_days: list of trading days; assumed sorted newest-first
            (index 0 is the most recent) — TODO confirm against
            DataLoader.load_trade_days
        :param cache_dir: local cache root directory
        """
        self.target_day = target_day
        self.cache_dir = cache_dir
        self.trade_days = trade_days
        os.makedirs(self.cache_dir, exist_ok=True)
        # NOTE(review): API credentials are hard-coded here; they should be
        # moved to configuration / environment variables.
        self.juejin_local_api = JueJinLocalApi("41c4f5da-2591-11f0-a9c9-f4b5203f67bf",
                                               "018db265fa34e241dd6198b7ca507ee0a82ad029")

    @staticmethod
    def _stringify_datetimes(record: dict) -> None:
        """Convert every datetime value in *record* to 'YYYY-MM-DD HH:MM:SS' in place."""
        for key, value in record.items():
            # isinstance() instead of type(...) == ... (handles subclasses too)
            if isinstance(value, datetime.datetime):
                record[key] = value.strftime('%Y-%m-%d %H:%M:%S')

    def download_kline_data(self, codes) -> None:
        """
        Download daily K-line data (the 120 bars ending before the target
        day's open) for each code and cache it as JSON under
        ``<cache_dir>/k_bars/<target_day>_<code>.json``.  Codes that already
        have a cache file are skipped.

        :param codes: iterable of security codes
        :return: None — results are written to disk only
        """
        k_bars_dir = os.path.join(self.cache_dir, "k_bars")
        # Bug fix: this sub-directory was never created, so the first write on
        # a fresh cache directory raised FileNotFoundError.
        os.makedirs(k_bars_dir, exist_ok=True)
        for code in codes:
            cache_file = os.path.join(k_bars_dir, f"{self.target_day}_{code}.json")
            if os.path.exists(cache_file):
                continue
            results = self.juejin_local_api.get_history_tick_n(code, 120, end_date=f"{self.target_day} 09:00:00",
                                                               fields="sec_id,open,high,low,close,volume,pre_close,bob,amount")
            # datetimes are not JSON-serializable; stringify them first
            for result in results:
                self._stringify_datetimes(result)
            with open(cache_file, mode='w', encoding='utf-8') as f:
                f.write(json.dumps(results))

    def download_minute_data(self, codes) -> None:
        """
        Download 60-second K-line bars for each code, paged over the
        trade-day list, and cache one text file per calendar day under
        ``<cache_dir>/bar_60s/<trade_days[0]>_<code>/<day>.txt``.

        :param codes: iterable of security codes
        :return: None — results are written to disk only
        """
        for code in codes:
            bar_dir = os.path.join(self.cache_dir, "bar_60s", f"{self.trade_days[0]}_{code}")
            os.makedirs(bar_dir, exist_ok=True)
            # Fetch page_size trade days per API call; trade_days is assumed
            # newest-first, so index i*page_size is the page's END day.
            page_size = 20
            # NOTE(review): the trailing len(trade_days) % page_size days are
            # never fetched; extend the range if full coverage is required.
            for page in range(len(self.trade_days) // page_size):
                end_trade_day = self.trade_days[page * page_size]
                start_trade_day = self.trade_days[page * page_size + page_size - 1]
                # Skip the page if both boundary days are already cached.
                if os.path.exists(os.path.join(bar_dir, f"{end_trade_day}.txt")) and os.path.exists(
                        os.path.join(bar_dir, f"{start_trade_day}.txt")):
                    continue
                results = self.juejin_local_api.history(code, start_time=f"{start_trade_day} 09:24:00",
                                                        end_time=f"{end_trade_day} 15:00:00", frequency="60s")
                # Group the returned bars by calendar day ("bob" prefix).
                day_bars_dict = {}
                for result in results:
                    self._stringify_datetimes(result)
                    day = result["bob"][:10]
                    day_bars_dict.setdefault(day, []).append(result)
                for day, bars in day_bars_dict.items():
                    day_file = os.path.join(bar_dir, f"{day}.txt")
                    if os.path.exists(day_file):
                        continue
                    # Persist the day's bars (repr text, matching the
                    # original on-disk format).
                    with open(day_file, encoding='utf-8', mode='w') as f:
                        f.write(f"{bars}")

    def download_tick_data(self, codes):
        """
        Download today's tick data (up to 5000 ticks ending at the morning
        close 11:30) for each code and cache it under
        ``<cache_dir>/ticks/<target_day>_<code>.txt``.

        :param codes: iterable of security codes
        :return: None — results are written to disk only
        """
        ticks_dir = os.path.join(self.cache_dir, "ticks")
        # Bug fix: ensure the sub-directory exists before any write.
        os.makedirs(ticks_dir, exist_ok=True)

        def download(batch_codes):
            # Process one batch of codes sequentially, printing progress.
            executed_codes = set()
            for code in batch_codes:
                try:
                    tick_path = os.path.join(ticks_dir, f"{self.target_day}_{code}.txt")
                    if os.path.exists(tick_path):
                        continue
                    results = self.juejin_local_api.get_history_tick_n(code, 5000,
                                                                       end_date=f"{self.target_day} 11:30:00",
                                                                       frequency="tick",
                                                                       fields="symbol,open,high,low,price,cum_volume,cum_amount,last_volume,last_amount,created_at")
                    # Drop any leading ticks dated before the target day.
                    for i in range(len(results)):
                        if results[i]["created_at"].strftime("%Y-%m-%d") >= self.target_day:
                            results = results[i:]
                            break
                    with open(tick_path, encoding='utf-8', mode='w') as f:
                        f.write(f"{results}")
                finally:
                    # Progress bookkeeping runs even if the fetch raises;
                    # any exception still propagates to the caller.
                    executed_codes.add(code)
                    print("剩余数量:", len(set(batch_codes) - executed_codes))

        # For very large universes, fan the batches out in parallel via dask.
        if len(codes) > 10000:
            from dask import delayed, compute
            batch_size = 50
            codes = list(codes)
            batches = [codes[i:i + batch_size] for i in range(0, len(codes), batch_size)]
            compute(*[delayed(download)(batch) for batch in batches])
        else:
            download(codes)


if __name__ == "__main__":
    day = "2025-05-12"
    # Dead code removed: the original also called
    # DataLoader(day).load_target_codes() and built an unused JueJinLocalApi
    # client here; neither result was ever used (codes was overwritten below).
    data_loader = DataLoader(day)
    trade_days = data_loader.load_trade_days()
    downloader = DataDownloader(day, trade_days)

    # Union all special-code groups for the day into one code set.
    special_codes = LowSuctionOriginDataExportManager(day).export_special_codes()
    codes = set()
    for group in special_codes.values():
        codes |= group

    downloader.download_tick_data(codes)
    # downloader.download_minute_data(codes)