Administrator
4 天以前 607a43593418a5c4f22986dd0c86c90465bb401c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
"""
策略管理
"""
import json
 
from code_attribute import gpcode_manager, code_nature_analyse
from db import redis_manager_delegate as redis_manager
from db.mysql_data_delegate import Mysqldb
from db.redis_manager_delegate import RedisUtils
from log_module import async_log_util
from log_module.log import logger_trade, logger_debug
from strategy.data_analyzer import KPLLimitUpDataAnalyzer
from strategy.low_suction_strategy import LowSuctionOriginDataExportManager
from strategy.strategy_params_settings import StrategyParamsSettingsManager
from strategy.strategy_variable import StockVariables
from strategy.strategy_variable_factory import DataLoader, StrategyVariableFactory
import constant
from third_data import kpl_util
from trade.trade_manager import DealCodesManager
from utils import huaxin_util, tool
 
 
@tool.singleton
class TickSummaryDataManager:
    """
    Per-code tick summary cache: opening price info, day high and day low.

    Values live in in-memory dicts and are mirrored to Redis db 13 through
    ``RedisUtils.setex_async`` with the expiry returned by ``tool.get_expire()``.
    On construction the dicts are re-populated from the keys found in Redis.
    """
    __db = 13

    def __init__(self):
        # Opening price info per code (whatever was passed to set_open_price_info)
        self.open_price_info_dict = {}
        # Lowest price per code
        self.low_price_dict = {}
        # Highest price info per code
        self.high_price_info_dict = {}

        self.musql = Mysqldb()
        self.redis_manager = redis_manager.RedisManager(self.__db)
        self.__load_data()

    def __get_redis(self):
        return self.redis_manager.getRedis()

    def __load_by_prefix(self, prefix, target, parse):
        """
        Fill ``target`` from every Redis key shaped ``<prefix>-<code>``.
        @param prefix: key prefix without the trailing "-"
        @param target: dict to fill, keyed by code
        @param parse: callable turning the stored value into the cached value
        @return:
        """
        keys = RedisUtils.keys(self.__get_redis(), f"{prefix}-*")
        if not keys:
            return
        for k in keys:
            code = k.split("-")[1]
            val = RedisUtils.get(self.__get_redis(), k)
            if val:
                target[code] = parse(val)

    def __load_data(self):
        """
        Restore the three caches from Redis.
        @return:
        """
        self.__load_by_prefix("tick_open_price_info", self.open_price_info_dict, json.loads)
        self.__load_by_prefix("tick_high_price", self.high_price_info_dict, json.loads)
        # Keep two decimals: a bare round() would collapse the price to an integer
        # (17.58 -> 18) and corrupt the restored day low.
        self.__load_by_prefix("tick_low_price", self.low_price_dict, lambda v: round(float(v), 2))

    def set_open_price_info(self, code, info):
        """
        Cache the opening price info and persist it (JSON-encoded) to Redis.
        @param code:
        @param info: JSON-serialisable opening price info
        @return:
        """
        self.open_price_info_dict[code] = info
        RedisUtils.setex_async(self.__db, f"tick_open_price_info-{code}", tool.get_expire(), json.dumps(info))

    def set_high_price_info(self, code, info):
        """
        Cache the day-high info and persist it (JSON-encoded) to Redis.
        @param code:
        @param info: (price, time)
        @return:
        """
        self.high_price_info_dict[code] = info
        RedisUtils.setex_async(self.__db, f"tick_high_price-{code}", tool.get_expire(), json.dumps(info))

    def set_low_price(self, code, price):
        """
        Cache the day-low price and persist it to Redis.
        @param code:
        @param price:
        @return:
        """
        self.low_price_dict[code] = price
        RedisUtils.setex_async(self.__db, f"tick_low_price-{code}", tool.get_expire(), price)
 
 
class LowSuctionStrategy:
    """
    低吸策略
    """
 
    def __init__(self, day, script_name="strategy_script_v6.py",
                 settings=StrategyParamsSettingsManager().get_settings(), need_load_data = False):
        self.now_day = day
        # 买大单:{代码:[大单数据]}
        self.big_order_buy = {}
        # 卖大单
        self.big_order_sell = {}
        # 自由流通量
        self.zylt_volume_dict = {}
        # 历史日K数据
        self.kline_data = {}
        # 历史涨停数据
        self.limit_up_record_data = {}
        # 历史数据
        self.timeline_data = {}
        # 今日数据
        self.current_data = {}
        # 目标代码的板块
        self.code_plates_for_buy = {}
        # 代码的板块(常规)
        self.code_plates = {}
        # 下一个交易日
        self.next_trade_day = None
        # 目标代码
        self.fcodes = set()
        # 变量对象
        self.stock_variables_dict = {}
        # 当前涨停列表
        self.current_limit_up_list = []
        # 当前涨停的板块所包含的代码
        self.current_limit_up_plate_codes = {}
        # 当前板块净流入
        self.current_block_in_datas = []
 
        # 加载策略脚本文件
        with open(script_name if constant.is_windows() else f'{constant.get_path_prefix()}/{script_name}', mode='r',
                  encoding='utf-8') as f:
            lines = f.readlines()
            scripts = "\n".join(lines)
            # 注释掉里面的import与变量
            scripts = scripts.replace("from ", "#from ").replace("sv = ", "#sv =  ").replace("settings = ",
                                                                                             "#settings =  ").replace(
                "target_code = ", "#target_code =  ")
 
            self.scripts = scripts
        self.settings = settings
 
        self.data_loader = DataLoader(self.now_day)
        self.__LowSuctionOriginDataExportManager = LowSuctionOriginDataExportManager(self.now_day)
        if need_load_data:
            self.load_data()
 
    def load_data(self):
        # 加载历史数据
        self.__load_before_date_data_by_timeline()
        self.__load_current_date_data_by_timeline()
        self.fcodes = set(self.code_plates_for_buy.keys()) & set(self.kline_data.keys())
 
    def __load_before_date_data_by_timeline(self):
        """
        加载回测日期之前的K线数据与历史涨停数据
        :return: 按时间排序的数据列表
        """
        trade_days = self.data_loader.trade_days
        # 加载历史数据
        self.kline_data = self.data_loader.load_kline_data()
        self.limit_up_record_data = self.data_loader.load_limit_up_data()
        self.next_trade_day = self.data_loader.load_next_trade_day()
        if not trade_days:
            raise Exception("交易日历获取失败")
        if not self.kline_data:
            raise Exception("历史日K获取失败")
        if not self.limit_up_record_data:
            raise Exception("历史涨停获取失败")
 
    def __load_current_date_data_by_timeline(self):
        """
        加载回测日期当天的数据,将这些数据根据秒切片
        :param day: 日期,格式为"YYYY-MM-DD
        :return: 按时间排序的数据列表
        """
        IS_BY_BIG_ORDER = False
        BIG_ORDER_MONEY_THRESHOLD = 200e4
        big_order_deals = self.__LowSuctionOriginDataExportManager.export_big_order_deal(BIG_ORDER_MONEY_THRESHOLD)
        if not big_order_deals or IS_BY_BIG_ORDER:
            big_order_deals = self.__LowSuctionOriginDataExportManager.export_big_order_deal_by(
                BIG_ORDER_MONEY_THRESHOLD)
        self.big_order_buy = big_order_deals
        # 转换格式为:{时间: [("代码", (买单号, 量, 金额, 时间, 最终成交价))]
 
        big_sell_order_deals = self.__LowSuctionOriginDataExportManager.export_big_sell_order_deal(
            BIG_ORDER_MONEY_THRESHOLD)
        if not big_sell_order_deals or IS_BY_BIG_ORDER:
            big_sell_order_deals = self.__LowSuctionOriginDataExportManager.export_big_sell_order_deal_by(
                BIG_ORDER_MONEY_THRESHOLD)
        self.big_order_sell = big_sell_order_deals
        # 加载自由流通量
        self.zylt_volume_dict = self.__LowSuctionOriginDataExportManager.export_zylt_volume()
 
        # 加载板块代码
        code_plates_dict = self.__LowSuctionOriginDataExportManager.export_code_plates()
 
        plate_codes = self.data_loader.load_target_plate_and_codes()
        code_plates_dict_for_buy = {}
        for p in plate_codes:
            for code in plate_codes.get(p):
                if code not in code_plates_dict_for_buy:
                    code_plates_dict_for_buy[code] = set()
                code_plates_dict_for_buy[code].add(p)
        self.code_plates_for_buy = code_plates_dict_for_buy
        self.code_plates = code_plates_dict
 
        if not self.zylt_volume_dict:
            raise Exception("无自由流通数据")
        if not self.code_plates:
            raise Exception("无板块数据")
        if not self.code_plates_for_buy:
            raise Exception("无目标票的买入数据")
 
    def __init_stock_variables(self, code_):
        """
        初始化变量
        @param code_:
        @return:
        """
        if code_ in self.stock_variables_dict:
            return
        stock_variables = StrategyVariableFactory.create_from_history_data(
            self.kline_data.get(code_), None,
            self.limit_up_record_data.get(code_), self.data_loader.trade_days)
 
        # 加载今日涨停价
        pre_close = self.kline_data.get(code_)[0]["close"]
        stock_variables.今日涨停价 = round(float(gpcode_manager.get_limit_up_price_by_preprice(code_, pre_close)), 2)
        stock_variables.自由流通市值 = self.zylt_volume_dict.get(code_) * pre_close
        # 获取代码板块
        stock_variables.代码板块 = self.code_plates_for_buy.get(code_)
        is_price_too_high = code_nature_analyse.is_price_too_high_in_days(code_, self.kline_data.get(code_),
                                                                          stock_variables.今日涨停价)
        stock_variables.六个交易日涨幅过高 = is_price_too_high[0]
        stock_variables.辨识度代码 = self.fcodes
        for day in [2, 5, 10, 30, 60, 120]:
            days = self.data_loader.trade_days[:day]
            stock_variables.__setattr__(f"日出现的板块_{day}",
                                        KPLLimitUpDataAnalyzer.get_limit_up_reasons(
                                            self.limit_up_record_data, min_day=days[-1],
                                            max_day=days[0]))
        stock_variables.连续老题材 = KPLLimitUpDataAnalyzer.get_continuous_limit_up_reasons(
            self.limit_up_record_data, self.data_loader.trade_days[:2])
 
        # 加载Tick信息
        open_price_info = TickSummaryDataManager().open_price_info_dict.get(code_)
        if open_price_info:
            stock_variables.今日开盘价 = open_price_info[0]
            stock_variables.今日开盘涨幅 = open_price_info[1]
        high_price_info = TickSummaryDataManager().high_price_info_dict.get(code_)
        if high_price_info:
            stock_variables.今日最高价信息 = high_price_info
 
        low_price = TickSummaryDataManager().low_price_dict.get(code_)
        if low_price:
            stock_variables.今日最低价 = low_price
        self.stock_variables_dict[code_] = stock_variables
 
    def add_big_orders(self, big_orders):
        """
        添加大单
        @param big_orders: [(代码, 买/卖, [订单号,量,金额,最后时间戳,最后价格, 初始时间戳, 初始价格])] 如:[ ('002741', 0, [475820, 91600, 1610328, 92500000, 17.58, 92500000, 17.58])]
        @return:
        """
        codes = []
        for d in big_orders:
            try:
                code = d[0]
                # 只计算200w以上的买单
                if d[2][2] < 200e4:
                    continue
                if d[1] == 0:
                    # 买单
                    if code not in self.big_order_buy:
                        self.big_order_buy[code] = []
                    self.big_order_buy[code].append(d[2])
                else:
                    # 卖单
                    if code not in self.big_order_sell:
                        self.big_order_sell[code] = []
                    self.big_order_sell[code].append(d[2])
                    if code not in codes:
                        codes.append(code)
            except Exception as e:
                logger_debug.error(f"{d}")
            # 驱动下单
        for code in codes:
            self.__run(code, self.stock_variables_dict.get(code))
 
    def add_ticks(self, ticks):
        """
        添加tick数据
        @param ticks:
        @return:
        """
        fticks = [tick for tick in ticks if tick[0] in self.fcodes]
        # 初始化对象
        for tick in fticks:
            code = tick[0]
            self.__init_stock_variables(code)
            stock_variables = self.stock_variables_dict.get(code)
 
            # (代码, 时间戳, 价格, 总交易量, 总交易额, 买5, 卖5)
            code, time_str, price, cum_volume, cum_amount, buy_5 = tick[0], huaxin_util.convert_time(tick[1]), tick[2], \
                                                                   tick[
                                                                       3], tick[4], tick[5]
 
            # 计算tick数据的值
            if time_str < '09:30:00':
                if price > 0:
                    stock_variables.今日开盘价 = price
                else:
                    stock_variables.今日开盘价 = buy_5[0][0]
                    # 今日开盘涨幅
                stock_variables.今日开盘涨幅 = round((stock_variables.今日开盘价 - stock_variables.昨日收盘价) / stock_variables.昨日收盘价,
                                               4)
                TickSummaryDataManager().set_open_price_info(code, (stock_variables.今日开盘价, stock_variables.今日开盘涨幅))
            stock_variables.今日成交量 = cum_volume
            stock_variables.今日成交额 = cum_amount
            stock_variables.当前价 = price
            if not stock_variables.今日最高价信息 or price > stock_variables.今日最高价信息[0]:
                stock_variables.今日最高价信息 = (price, time_str)
                TickSummaryDataManager().set_high_price_info(code, stock_variables.今日最高价信息)
            if not stock_variables.今日最低价 or price < stock_variables.今日最低价:
                stock_variables.今日最低价 = price
                TickSummaryDataManager().set_low_price(code, stock_variables.今日最低价)
 
    def add_limit_up_list(self, limit_up_list):
        """
        涨停列表数据
        @param limit_up_list:
        @return:
        """
        self.current_limit_up_list = limit_up_list
        current_limit_up_list = [x for x in limit_up_list if kpl_util.get_high_level_count(x[4]) < 3]
        most_real_kpl_plate_limit_up_codes_info = {}
        current_limit_up_dict = {x[0]: x for x in current_limit_up_list}
        codes = set(current_limit_up_dict.keys())
        for code in codes:
            plates = self.code_plates.get(code)
            if not plates:
                plates = {current_limit_up_dict.get(code)[5]}
            plates -= constant.KPL_INVALID_BLOCKS
            if plates:
                for p in plates:
                    if p not in most_real_kpl_plate_limit_up_codes_info:
                        most_real_kpl_plate_limit_up_codes_info[p] = []
                    most_real_kpl_plate_limit_up_codes_info[p].append(code)
        self.current_limit_up_plate_codes = most_real_kpl_plate_limit_up_codes_info
 
    def add_block_in(self, block_in_datas):
        """
        添加板块净流入
        @param block_in_datas:
        @return:
        """
        blocks = [x[0] for x in block_in_datas if x[1] > 0]
        _block_in_datas = blocks[:20]
        self.current_block_in_datas = _block_in_datas
 
    def __run(self, code, sv: StockVariables):
        if not sv:
            return
        # 运行代码
        # 注入大单
        sv.今日大单数据 = self.big_order_buy.get(code)
        sv.今日卖大单数据 = self.big_order_sell.get(code)
        # 注入板块涨停代码
        sv.开盘啦最正板块涨停 = self.current_limit_up_plate_codes
        # 注入板块流入信息
        if self.current_block_in_datas:
            sv.资金流入板块 = self.current_block_in_datas
        # 注入已成交代码
        place_order_plate_codes = DealCodesManager().get_place_order_plate_codes()
        sv.板块成交代码 = place_order_plate_codes
        sv.成交代码 = DealCodesManager().get_deal_codes()
        global_dict = {
            "sv": sv,
            "target_code": code,
            "settings": self.settings
        }
        exec(self.scripts, global_dict)
        compute_result = global_dict["compute_result"]
        if compute_result[0]:
            if code in sv.成交代码:
                return
            # 可以下单
            # 判断是否可以买
            for b in compute_result[3]:
                DealCodesManager().place_order(b, code)
                async_log_util.info(logger_trade, f"{code}下单,板块:{compute_result[3]}")
 
 
# The current low-suction strategy object
low_suction_strtegy = None