Administrator
2025-05-12 82e8474625d9af933d6ab5825b43f6a248005010
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
from code_attribute import gpcode_manager, code_nature_analyse
from strategy.low_suction_strategy import LowSuctionOriginDataExportManager
from strategy.strategy_variable import StockVariables
from strategy.strategy_variable_factory import DataLoader, StrategyVariableFactory
from utils import tool, huaxin_util
 
 
class BackTest:
 
    def __init__(self, day):
        self.day = day
        scripts = ""
        with open("低吸脚本_辨识度.py", mode='r', encoding='utf-8') as f:
            lines = f.readlines()
            scripts = "\n".join(lines)
        # 注释掉里面的import与变量
        scripts = scripts.replace("from ", "#from ").replace("sv = ", "#sv =  ")
        self.scripts = scripts
        self.stock_variables_dict = {}
 
    def load_before_date_data_by_timeline(self, data_loader: DataLoader):
        """
        加载回测日期之前的K线数据与历史涨停数据
        :return: 按时间排序的数据列表
        """
        day = self.day
        trade_days = data_loader.load_trade_days()
        timeline_data = []
        # 加载历史数据
        kline_data = data_loader.load_kline_data()
        minute_data = {}  # data_loader.load_minute_data()
        limit_up_record_data = data_loader.load_limit_up_data()
        next_trade_day = data_loader.load_next_trade_day()
        if not trade_days:
            raise Exception("交易日历获取失败")
        if not kline_data:
            raise Exception("历史日K获取失败")
        if not kline_data:
            raise Exception("历史涨停获取失败")
 
        return {
            'date': day,
            'kline_data': kline_data,
            'minute_data': minute_data,
            'limit_up_record_data': limit_up_record_data,
            "trade_days": trade_days,
            "next_trade_day": next_trade_day
        }
 
    def load_current_date_data_by_timeline(self):
        """
        加载回测日期当天的数据,将这些数据根据秒切片
        :param day: 日期,格式为"YYYY-MM-DD
        :return: 按时间排序的数据列表
        """
        day = self.day
        fdata = {}
        __LowSuctionOriginDataExportManager = LowSuctionOriginDataExportManager(day)
        all_limit_up_list = __LowSuctionOriginDataExportManager.export_limit_up_list()
        fdata["limit_up_list"] = {d[0][:8]: d[1] for d in all_limit_up_list}
        big_order_deals = __LowSuctionOriginDataExportManager.export_big_order_deal()
        if not big_order_deals:
            big_order_deals = __LowSuctionOriginDataExportManager.export_big_order_deal_by()
        # 转换格式为:{时间: [("代码", (买单号, 量, 金额, 时间, 最终成交价))]
        big_order_deals_dict = {}
        for code in big_order_deals:
            for order in big_order_deals[code]:
                time_str = huaxin_util.convert_time(order[3])
                d = (code, order)
                if time_str not in big_order_deals_dict:
                    big_order_deals_dict[time_str] = []
                big_order_deals_dict[time_str].append(d)
        for k in big_order_deals_dict:
            datas = big_order_deals_dict[k]
            datas.sort(key=lambda x: huaxin_util.convert_time(x[1][3], True))
        fdata["big_order"] = big_order_deals_dict
        big_sell_order_deals = __LowSuctionOriginDataExportManager.export_big_sell_order_deal()
        if not big_sell_order_deals:
            big_sell_order_deals = __LowSuctionOriginDataExportManager.export_big_sell_order_deal_by()
        big_sell_order_deals_dict = {}
        for code in big_sell_order_deals:
            for order in big_sell_order_deals[code]:
                time_str = huaxin_util.convert_time(order[3])
                d = (code, order)
                if time_str not in big_sell_order_deals_dict:
                    big_sell_order_deals_dict[time_str] = []
                big_sell_order_deals_dict[time_str].append(d)
        for k in big_sell_order_deals_dict:
            datas = big_sell_order_deals_dict[k]
            datas.sort(key=lambda x: huaxin_util.convert_time(x[1][3], True))
        fdata["big_sell_order"] = big_sell_order_deals_dict
 
        # 加载自由流通量
        zylt_volume_dict = __LowSuctionOriginDataExportManager.export_zylt_volume()
        fdata["zylt_volume"] = zylt_volume_dict
 
        code_plates_dict = __LowSuctionOriginDataExportManager.export_code_plates()
        fdata["code_plates"] = code_plates_dict
 
        special_codes = __LowSuctionOriginDataExportManager.export_special_codes()
        temp_code_plates = {}
        for plate in special_codes:
            for code in special_codes[plate]:
                if code not in temp_code_plates:
                    temp_code_plates[code] = set()
                temp_code_plates[code].add(plate)
        for code in temp_code_plates:
            code_plates_dict[code] = temp_code_plates[code]
 
        if not fdata["zylt_volume"]:
            raise Exception("无自由流通数据")
        if not fdata["code_plates"]:
            raise Exception("无板块数据")
        if not fdata["big_order"]:
            raise Exception("无大单数据")
        if not fdata["limit_up_list"]:
            raise Exception("无涨停数据")
 
        return fdata
 
    def load_current_tick_datas(self, data_loader: DataLoader):
        """
        加载Tick数据
        :param day: 日期,格式为"YYYY-MM-DD
        :return: Tick数据
        """
        code_tick_datas = data_loader.load_tick_data()
        # 根据时间集成
        fdata = {}
        for code in code_tick_datas:
            for tick in code_tick_datas[code]:
                __time_str = tick["created_at"][-8:]
                if __time_str not in fdata:
                    fdata[__time_str] = []
                fdata[__time_str].append(tick)
        if not fdata:
            raise Exception("无分时K线数据")
 
        return fdata
 
    def __run_backtest(self, code, stock_variables: StockVariables):
        """
        执行回测
        @param stock_variables:
        @return: 是否可以买
        """
        global_dict = {
            "sv": stock_variables,
            "target_code": code
        }
        exec(self.scripts, global_dict)
        return global_dict["compute_result"]
 
    def __filter_codes(self, current_data, timeline_data):
        code_plates = current_data["code_plates"]
        start_time, end_time = "09:25:00", "11:30:00"
        fplates = set()
        for i in range(60 * 60 * 5):
            time_str = tool.trade_time_add_second(start_time, i)
            if time_str > end_time:
                break
            # 统计当前涨停数据
            current_limit_up_list = current_data["limit_up_list"].get(time_str)
            if current_limit_up_list:
                # 统计板块涨停
                plate_codes_info = {}
                for x in current_limit_up_list:
                    plates = code_plates.get(x[0])
                    if plates:
                        for p in plates:
                            if p not in plate_codes_info:
                                plate_codes_info[p] = []
                            plate_codes_info[p].append((x[0], x[2]))
                # print(time_str, "汽车零部件", plate_codes_info.get("汽车零部件"))
                # 有效的板块
                valid_plates = set([p for p in plate_codes_info if len(plate_codes_info[p]) >= 2])
                fplates |= valid_plates
        codes = set([code for code in code_plates if code_plates[code] & fplates])
        fcodes = set()
        for c in codes:
            if c not in timeline_data["kline_data"]:
                continue
            # 自由流通市值30-300亿
            pre_close = timeline_data["kline_data"].get(c)[0]["close"]
            if 30e8 <= current_data["zylt_volume"].get(c) * pre_close <= 300e8:
                fcodes.add(c)
 
        return fcodes
 
    def __get_target_codes(self):
        special_codes = LowSuctionOriginDataExportManager(self.day).export_special_codes()
        fcodes = set()
        for codes in [special_codes[p] for p in special_codes]:
            fcodes |= codes
        return fcodes
 
    def init_stock_variables(self, code_, timeline_data, current_data):
        """
        初始化变量
        @param code_:
        @return:
        """
        if code_ in self.stock_variables_dict:
            return
        stock_variables = StrategyVariableFactory.create_from_history_data(
            timeline_data["kline_data"].get(code_), timeline_data["minute_data"].get(code_),
            timeline_data["limit_up_record_data"].get(code_), timeline_data["trade_days"])
        # 加载今日涨停价
        pre_close = timeline_data["kline_data"].get(code_)[0]["close"]
        stock_variables.今日涨停价 = round(float(gpcode_manager.get_limit_up_price_by_preprice(code_, pre_close)), 2)
        stock_variables.自由流通市值 = current_data["zylt_volume"].get(code_) * pre_close
        # 获取代码板块
        stock_variables.代码板块 = current_data["code_plates"].get(code_)
        is_price_too_high = code_nature_analyse.is_price_too_high_in_days(code_, timeline_data["kline_data"].get(code_),
                                                                          stock_variables.今日涨停价)
        # if is_price_too_high[0]:
        #     print("六个交易日涨幅过高", code_)
        stock_variables.六个交易日涨幅过高 = is_price_too_high[0]
        self.stock_variables_dict[code_] = stock_variables
 
    def run(self):
        data_loader = DataLoader(self.day)
        current_data = self.load_current_date_data_by_timeline()
        # 按时间轴加载数据
        timeline_data = self.load_before_date_data_by_timeline(data_loader)
        # TODO 输出目标代码
        fcodes = self.__get_target_codes()  # __filter_codes(current_data, timeline_data)
        # print(len(fcodes), fcodes)
        current_tick_data = self.load_current_tick_datas(data_loader)
        limit_up_record_data_dict = {}
        for limit_up_item in timeline_data["limit_up_record_data"]:
            if limit_up_item[0] not in limit_up_record_data_dict:
                limit_up_record_data_dict[limit_up_item[0]] = []
            limit_up_record_data_dict[limit_up_item[0]].append(limit_up_item)
        timeline_data["limit_up_record_data"] = limit_up_record_data_dict
        next_trade_day = timeline_data["next_trade_day"]
        start_time, end_time = "09:25:00", "12:00:00"
        # 分钟K线
        minute_bars_dict = {}
        code_plates = current_data["code_plates"]
        # 板块以及买了的代码:{"板块":{"000333"}}
        deal_block_codes = {}
        deal_codes = set()
        print("======", self.day)
        # 制造回测时间
        for i in range(60 * 60 * 5):
            time_str = tool.trade_time_add_second(start_time, i)
            if time_str > end_time:
                break
            ticks = current_tick_data.get(time_str)
            # 统计当前涨停数据
            current_limit_up_list = current_data["limit_up_list"].get(time_str)
            if current_limit_up_list:
                # 统计板块涨停
                plate_codes_info = {}
                for x in current_limit_up_list:
                    plates = code_plates.get(x[0])
                    if plates:
                        for p in plates:
                            if p not in plate_codes_info:
                                plate_codes_info[p] = []
                            plate_codes_info[p].append((x[0], x[2]))
            else:
                plate_codes_info = None
 
            # 当前时刻大单
            current_big_orders = current_data["big_order"].get(time_str)
            if current_big_orders:
                for big_order in current_big_orders:
                    # 格式 ("代码", (买单号, 量, 金额, 时间, 最终成交价))
                    self.init_stock_variables(big_order[0], timeline_data, current_data)
                    stock_variables: StockVariables = self.stock_variables_dict.get(big_order[0])
                    if stock_variables.今日大单数据 is None:
                        stock_variables.今日大单数据 = []
                    stock_variables.今日大单数据.append(big_order[1])
                    # 统计大单均价
                    order_ids = set()
                    total_money = 0
                    total_volume = 0
                    for order in reversed(stock_variables.今日大单数据):
                        if order[0] in order_ids:
                            continue
                        order_ids.add(order[0])
                        total_money += order[2]
                        total_volume += order[1]
                    if total_volume > 0:
                        stock_variables.今日大单均价 = round(total_money / total_volume, 2)
                    else:
                        stock_variables.今日大单均价 = 0
 
            current_big_sell_orders = current_data["big_sell_order"].get(time_str)
            if current_big_sell_orders:
                for big_order in current_big_sell_orders:
                    # 格式 ("代码", (买单号, 量, 金额, 时间, 最终成交价))
                    self.init_stock_variables(big_order[0], timeline_data, current_data)
                    stock_variables: StockVariables = self.stock_variables_dict.get(big_order[0])
                    if stock_variables.今日卖大单数据 is None:
                        stock_variables.今日卖大单数据 = []
                    stock_variables.今日卖大单数据.append(big_order[1])
            if ticks:
                for tick in ticks:
                    code = tick["symbol"][-6:]
                    if code not in fcodes:
                        continue
                    if code not in self.stock_variables_dict:
                        # 加载基础数据
                        self.init_stock_variables(code, timeline_data, current_data)
                    stock_variables: StockVariables = self.stock_variables_dict.get(code)
                    stock_variables.板块成交代码 = deal_block_codes
                    # 设置涨停数据
                    if plate_codes_info is not None:
                        stock_variables.板块涨停 = plate_codes_info
                    if code not in minute_bars_dict:
                        minute_bars_dict[code] = [tick]
                    if minute_bars_dict[code][-1]["created_at"][:-2] == tick["created_at"][:-2]:
                        # 统计分钟K线
                        minute_bars_dict[code][-1] = tick
                    else:
                        # 保存分钟K线最高价
                        if not stock_variables.今日最高价:
                            stock_variables.今日最高价 = minute_bars_dict[code][-1]["price"]
                        if minute_bars_dict[code][-1]["price"] > stock_variables.今日最高价:
                            stock_variables.今日最高价 = minute_bars_dict[code][-1]["price"]
                    # 保存开盘价
                    if tick["created_at"][-8:] < '09:30:00':
                        stock_variables.今日开盘价 = tick["price"]
                        # 今日开盘涨幅
                        stock_variables.今日开盘涨幅 = round((tick["price"] - stock_variables.昨日收盘价) / stock_variables.昨日收盘价,
                                                       4)
                    stock_variables.今日成交量 = tick["cum_volume"]
                    stock_variables.当前价 = tick["price"]
                    # 根据表达式计算是否可买
                    # compute_result = __run_backtest(code, stock_variables)
                    # # print("回测结果:",code, compute_result)
                    # if compute_result[0] and code not in deal_codes:
                    #     # TODO 下单
                    #     deal_codes.add(code)
                    #     print("======回测结果:", code, tick["created_at"], tick["price"], compute_result[2])
                    #     for b in compute_result[1]:
                    #         if b not in deal_block_codes:
                    #             deal_block_codes[b] = set()
                    #         deal_block_codes[b].add(code)
 
            # 大单驱动
            if current_big_orders:
                for big_order in current_big_orders:
                    code = big_order[0]
                    self.init_stock_variables(code, timeline_data, current_data)
                    stock_variables: StockVariables = self.stock_variables_dict.get(code)
                    compute_result = self.__run_backtest(code, stock_variables)
                    # print("回测结果:",code, compute_result)
                    # if code == '002640':
                    #     print(code, big_order, compute_result)
                    if compute_result[0] and code not in deal_codes:
                        # TODO 下单
                        deal_codes.add(code)
                        next_k_bars = data_loader.load_kline_data_by_day_and_code(next_trade_day, code)
                        current_k_bars = data_loader.load_kline_data_by_day_and_code(data_loader.now_day, code)
                        if next_k_bars:
                            t_rate = round((next_k_bars[0]["open"] - big_order[1][4]) * 100 / big_order[1][4], 2)
                            t_rate = f"{t_rate}%"
                        else:
                            t_rate = "未知"
                        if current_k_bars:
                            c_rate = round((current_k_bars[0]["close"] - big_order[1][4]) * 100 / big_order[1][4], 2)
                            c_rate = f"{c_rate}%"
                        else:
                            c_rate = "未知"
 
                        print("======回测结果:", code, f"溢价率:{t_rate},当日盈亏:{c_rate}", compute_result[2])
                        for b in compute_result[1]:
                            if b not in deal_block_codes:
                                deal_block_codes[b] = set()
                            deal_block_codes[b].add(code)
 
 
if __name__ == "__main__":
    # Back-test the listed trading days, most recent day first.
    days = ["2025-05-06", "2025-05-07", "2025-05-08", "2025-05-09", "2025-05-12"]
    for day in reversed(days):
        BackTest(day).run()