Administrator
4 天以前 1f69a007aa50b04a55195083970c110954935dfd
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
"""
涨停前大单分析:
根据某一天的涨停数据获取该代码涨停前的大单成交数量(6个点以下的)
"""
import json
import os
 
import xlwt
 
from db import mysql_data_delegate as mysql_data
from strategy.low_suction_strategy import LowSuctionOriginDataExportManager
from strategy.strategy_variable_factory import DataLoader
from utils import tool, huaxin_util
 
 
class BeforeLimitUpDataAnalyzer:
    def __init__(self, day):
        self.day = day
        # 查询当日的涨停数据
        self.limit_up_data = mysql_data.Mysqldb().select_all(
            f"select _code,_code_name, _limit_up_time, _zylt_val from kpl_limit_up_record where _day ='{self.day}' and _limit_up_high_info = '首板'")
        self.codes = [d[0] for d in self.limit_up_data]
        k_data_dict = DataLoader(tool.get_now_date_str()).load_kline_data()
        for code in k_data_dict:
            if code not in self.codes:
                continue
            k_bars = k_data_dict[code]
            for i in range(len(k_bars)):
                if k_bars[i]["bob"][:10] < self.day:
                    k_bars = k_bars[i:]
                    k_data_dict[code] = k_bars
                    break
        self.k_data_dict = k_data_dict
        self.big_order_deal_dict = LowSuctionOriginDataExportManager(day).export_big_order_deal()
        if not self.big_order_deal_dict:
            self.big_order_deal_dict = LowSuctionOriginDataExportManager(day).export_big_order_deal_by()
 
    def compute(self):
        limit_up_data_dict = {d[0]: d for d in self.limit_up_data}
        fdatas = {}
        for code in self.codes:
            if code not in self.k_data_dict:
                continue
            limit_up_time_str = tool.to_time_str(int(limit_up_data_dict[code][2]))
            pre_close_price = self.k_data_dict.get(code)[0]["close"]
            high_price = self.k_data_dict.get(code)[0]["high"]
            orders = []
            if self.big_order_deal_dict.get(code):
                order_ids = set()
                for big_order in self.big_order_deal_dict.get(code):
                    if huaxin_util.convert_time(big_order[3]) >= limit_up_time_str:
                        break
                    if not (-0.03 <= (big_order[4] - pre_close_price) / pre_close_price <= 0.06):
                        continue
                    if big_order[4] < high_price:
                        continue
 
                    if big_order[0] in order_ids:
                        continue
                    order_ids.add(big_order[0])
                    orders.append(big_order)
            fdatas[code] = (
                f"{round(limit_up_data_dict[code][3] / 1e8, 2)}亿", limit_up_data_dict[code][1], orders,
                limit_up_time_str)
        return fdatas
 
 
def write_to_excel(day, fdatas):
    # 写入excel
    wb = xlwt.Workbook()
    ws = wb.add_sheet(day)
    index = -1
    for data in fdatas:
        index += 1
        for i in range(len(data)):
            ws.write(index, i, str(data[i]))
    wb.save(f"E:/测试数据/涨停前大单_{day}.xls")
 
 
def write_to_json(day, fdatas):
    # 写入excel
    with open(f"E:/测试数据/涨停前大单_{day}.json", encoding='utf-8', mode='w') as f:
        f.write(json.dumps(fdatas))
 
 
if __name__ == "__main__":
    days = DataLoader(tool.get_now_date_str()).load_trade_days()
    # days.insert(0, tool.get_now_date_str())
    for day in days:
        if day < '2025-03-17':
            continue
        file_path = f"E:/测试数据/涨停前大单_{day}.json"
        if os.path.exists(file_path):
            continue
        print("=====" + day)
        results = BeforeLimitUpDataAnalyzer(day).compute()
        results = [(x, results[x]) for x in results]
        results.sort(key=lambda x: x[1][3])
        format_datas = []
        for data in results:
            code, result = data[0], data[1]
            format_datas.append((code, result[1], result[0], len(set([x[0] for x in result[2]])), result[3], result[2]))
        write_to_json(day, format_datas)