Administrator
4 天以前 a90797eb2353afb033a22f50024214e3b439692a
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
"""
数据下载器类,负责从各种数据源下载金融数据
"""
import datetime
import os
import json
from typing import Dict, List, Tuple
 
from strategy.low_suction_strategy import LowSuctionOriginDataExportManager
from strategy.strategy_variable_factory import DataLoader
from third_data.history_k_data_util import JueJinLocalApi
 
 
class DataDownloader:
    """
    数据下载器类,负责集中管理各类金融数据的下载逻辑
    """
 
    def __init__(self, target_day, trade_days: list, cache_dir: str = "D:/datas"):
        """
        初始化数据下载器
        :param cache_dir: 本地缓存目录
        """
        self.target_day = target_day
        self.cache_dir = cache_dir
        self.trade_days = trade_days
        os.makedirs(self.cache_dir, exist_ok=True)
        self.juejin_local_api = JueJinLocalApi("41c4f5da-2591-11f0-a9c9-f4b5203f67bf",
                                               "018db265fa34e241dd6198b7ca507ee0a82ad029")
 
    def download_kline_data(self, codes) -> Dict:
        """
        下载日K线数据
        :param codes: 股票代码
        :return: K线数据字典
        """
        for code in codes:
            cache_file = os.path.join(self.cache_dir, "k_bars", f"{self.target_day}_{code}.json")
            if os.path.exists(cache_file):
                continue
            results = self.juejin_local_api.get_history_tick_n(code, 120, end_date=f"{self.target_day} 09:00:00",
                                                               fields="sec_id,open,high,low,close,volume,pre_close,bob,amount")
            for result in results:
                for k in result:
                    if type(result[k]) == datetime.datetime:
                        result[k] = result[k].strftime('%Y-%m-%d %H:%M:%S')
            # 保存
            with open(cache_file, mode='w', encoding='utf-8') as f:
                f.write(json.dumps(results))
 
    def download_minute_data(self, codes) -> dict:
        """
        下载分钟K线数据
        :param code: 股票代码
        :param date: 日期,格式"YYYY-MM-DD"
        :return: 分钟K线数据字典
        """
        for code in codes:
            bar_dir = os.path.join(self.cache_dir, "bar_60s", f"{self.trade_days[0]}_{code}\\")
            if not os.path.exists(bar_dir):
                os.makedirs(bar_dir, exist_ok=True)
            # 每10个交易日处理一次
            page_size = 20
            for i in range(len(self.trade_days) // page_size):
                end_trade_day = self.trade_days[i * page_size]
                start_trade_day = self.trade_days[i * page_size + page_size - 1]
                if os.path.exists(os.path.join(bar_dir, f"{end_trade_day}.txt")) and os.path.exists(
                        os.path.join(bar_dir, f"{start_trade_day}.txt")):
                    continue
                # 保存K线数据
                results = self.juejin_local_api.history(code, start_time=f"{start_trade_day} 09:24:00",
                                                        end_time=f"{end_trade_day} 15:00:00", frequency="60s")
                day_bars_dict = {}
                for result in results:
                    for k in result:
                        if type(result[k]) == datetime.datetime:
                            result[k] = result[k].strftime('%Y-%m-%d %H:%M:%S')
                    # 按日期
                    day = result["bob"][:10]
                    if day not in day_bars_dict:
                        day_bars_dict[day] = []
                    day_bars_dict[day].append(result)
                for day in day_bars_dict:
                    if os.path.exists(os.path.join(bar_dir, f"{day}.txt")):
                        continue
                    # 保存Tick线
                    with open(os.path.join(bar_dir, f"{day}.txt"), encoding='utf-8', mode='w') as f:
                        f.write(f"{day_bars_dict[day]}")
 
    def download_tick_data(self, codes, show_log=False):
        """
        下载今日TICK数据
        @param codes:
        @return:
        """
 
        def download(batch_codes):
            excute_codes = set()
            for code in batch_codes:
                try:
                    tick_path = os.path.join(self.cache_dir, "ticks", f"{self.target_day}_{code}.txt")
                    if os.path.exists(tick_path):
                        continue
                    # 保存K线数据
                    results = self.juejin_local_api.get_history_tick_n(code, 5000,
                                                                       end_date=f"{self.target_day} 11:30:00",
                                                                       frequency="tick",
                                                                       fields="symbol,open,high,low,price,cum_volume,cum_amount,last_volume,last_amount,created_at")
                    for i in range(len(results)):
                        r = results[i]
                        if r["created_at"].strftime("%Y-%m-%d") >= self.target_day:
                            results = results[i:]
                            break
                    # 保存Tick线
                    with open(tick_path, encoding='utf-8', mode='w') as f:
                        f.write(f"{results}")
                    print("剩余数量:", len(set(batch_codes) - excute_codes))
                finally:
                    excute_codes.add(code)
                    if show_log:
                        print("剩余数量:", len(set(batch_codes) - excute_codes))
 
        # 采用dask平行执行download方法
        if len(codes) > 10000:
            from dask import delayed, compute
            batch_size = 50
            codes = list(codes)
            batches = [codes[i:i + batch_size] for i in range(0, len(codes), batch_size)]
            delayed_tasks = [delayed(download)(batch) for batch in batches]
            compute(*delayed_tasks)
        else:
            download(codes)
 
 
if __name__ == "__main__":
    day = "2025-06-05"
    codes, code_pre_close_dict = DataLoader(day).load_target_codes()
    juejin_local_api = JueJinLocalApi("41c4f5da-2591-11f0-a9c9-f4b5203f67bf",
                                      "018db265fa34e241dd6198b7ca507ee0a82ad029")
    data_loader = DataLoader(day)
    trade_days = data_loader.load_trade_days()
    __DataDownloader = DataDownloader(day, trade_days)
 
    # special_codes = LowSuctionOriginDataExportManager(day).export_special_codes()
    # fcodes = set()
    # for codes in [special_codes[p] for p in special_codes]:
    #     fcodes |= codes
    # codes = fcodes
    codes = {'002981', '002317', '600468', '000759', '605138', '301076', '300892', '603315', '002640', '002114', '601608', '001316', '000633', '002995', '600530', '001300', '002331', '600279', '601319', '300542', '002034', '605598', '002227', '600543', '600119', '601068', '600403', '600397', '300204', '603056', '301024', '000639', '002809', '605069', '002780', '002105', '000523', '601908', '002131', '600185', '002549', '600172', '002286', '603185', '603686', '001317', '600284', '002688', '603579', '603086', '600986', '002955', '600551', '600668', '603266', '002849', '000566', '603629', '000066', '600308', '000617', '002365', '001279', '603011', '603332', '001258', '002733', '300422', '000573', '300805', '600735', '600793', '603109', '601008', '605151', '002250', '603090', '000859', '002378', '002190', '001256', '600301', '600391', '002819', '300947', '603072', '002977', '002510', '002251', '002196', '600859', '603329', '000025', '605033', '605136', '603677', '605179', '605286', '000510', '600250', '600865', '000539', '002800', '002162', '300995', '603331', '603797', '603214', '002095', '000981', '002943', '603359', '002757', '002255', '603709', '603016', '603693', '003018', '300945', '000756', '300106', '000612', '000677', '002806', '301156', '001206', '605588', '002961', '301392', '000620', '000681', '600693', '603776', '600697', '000016', '002760', '600463', '002347', '600676', '002045', '600589', '603192', '601002', '002431', '600598', '001336', '601890', '002882', '000026', '600738', '605169', '300963', '000880', '002909', '000665', '000420', '002232', '605208', '002291', '603958', '000062', '603607', '002761', '002278', '301079', '600410', '002537', '000712', '300804', '002151', '002957', '605303', '600149', '002915', '002467', '000561', '002583', '600493', '603335', '000862', '000402', '002571', '002130', '000592', '603767', '002205', '600876', '600053', '002086', '605118', '600262', '002626', '001333', '002795', '001318', '600644', '300530', '002598', '600239', '002773', '002366', '605188', '003030', '002173', '000670', '001328', '002484', '002631', '002165', '600396', '002048', '000632', '002560', '300678', '000813', '002667', '600448', '002369', '603366', '000017', '603506', '001268', '002183', '002261', '002724', '603488', '002735', '300961', '605388', '000007', '002686', '603103', '601956', '603477', '600770', '000014', '605318', '600379', '603065', '603681', '603123', '603822', '601579', '002272', '600540', '603956', '301108', '000722', '603657', '603637', '603108', '600337', '603390', '000626', '603205', '003009', '301335', '600370', '603273', '001202', '603949', '002137', '600774', '301225', '603194', '000710', '000815', '605228', '600510', '603188', '002878', '000953', '002471', '002134', '301301', '002696', '002639', '000599', '002843', '000948', '600965', '603758', '001212', '003003', '002300', '600794', '603578', '601069', '002813', '603336', '301066', '001367', '001337', '002490', '001259', '600371', '002836', '603688', '002229', '603586', '002546', '002163', '603777', '600300', '300946', '001269', '001222', '002343', '002565', '600800', '603083', '000601', '002590', '300884', '000695', '000863', '603639', '600798', '002338', '603215', '002436', '002853', '603605', '002772', '003020', '600281', '002533', '002905', '002682', '605155', '002564', '600882', '300941', '002732', '600320', '002570', '000045', '001330', '603616', '600495', '301317', '002562', '603768', '002963', '600137', '603326', '600650', '001395', '300994', '002982', '603178', '002927', '002166', '600212', '002862', '601083', '600744', '603353', '001230', '002861', '600318', '002842', '002580', '600805', '002584', '600828', '300169', '603518', '002574', '002132', '600593', '002551', '002946', '601825', '000997', '600187', '002310', '002730', '002900', '605255', '603333', '002453', '600841', '600207', '603232', '600616', '603788', '001234', '600280', '002526', '002235', '600571', '002651', '002364', '600539', '002133', '002065', '002084', '603169', '600830', '000505', '605100', '600610', '300581', '002811', '002907', '002194', '603696', '002846', '000558', '603006', '603955', '002920', '603931', '002522', '603150', '601022', '603267', '603618', '300703', '002204', '002068', '600590', '002678', '600698', '002719', '600505', '605055', '002501', '000965', '300773', '000514', '603890', '000655', '603286', '601595', '301316', '002633', '603839', '603040', '002530', '603610', '002104', '002553', '002374', '603630', '000678', '002040', '002094'}
 
    __DataDownloader.download_tick_data(codes)
    # __DataDownloader.download_minute_data(codes)