""" 涨停前大单分析: 根据某一天的涨停数据获取该代码涨停前的大单成交数量(6个点以下的) """ import json import os import xlwt from db import mysql_data_delegate as mysql_data from strategy.low_suction_strategy import LowSuctionOriginDataExportManager from strategy.strategy_variable_factory import DataLoader from utils import tool, huaxin_util class BeforeLimitUpDataAnalyzer: def __init__(self, day): self.day = day # 查询当日的涨停数据 self.limit_up_data = mysql_data.Mysqldb().select_all( f"select _code,_code_name, _limit_up_time, _zylt_val from kpl_limit_up_record where _day ='{self.day}' and _limit_up_high_info = '首板'") self.codes = [d[0] for d in self.limit_up_data] k_data_dict = DataLoader(tool.get_now_date_str()).load_kline_data() for code in k_data_dict: if code not in self.codes: continue k_bars = k_data_dict[code] for i in range(len(k_bars)): if k_bars[i]["bob"][:10] < self.day: k_bars = k_bars[i:] k_data_dict[code] = k_bars break self.k_data_dict = k_data_dict self.big_order_deal_dict = LowSuctionOriginDataExportManager(day).export_big_order_deal() if not self.big_order_deal_dict: self.big_order_deal_dict = LowSuctionOriginDataExportManager(day).export_big_order_deal_by() def compute(self): limit_up_data_dict = {d[0]: d for d in self.limit_up_data} fdatas = {} for code in self.codes: if code not in self.k_data_dict: continue limit_up_time_str = tool.to_time_str(int(limit_up_data_dict[code][2])) pre_close_price = self.k_data_dict.get(code)[0]["close"] high_price = self.k_data_dict.get(code)[0]["high"] orders = [] if self.big_order_deal_dict.get(code): order_ids = set() for big_order in self.big_order_deal_dict.get(code): if huaxin_util.convert_time(big_order[3]) >= limit_up_time_str: break if not (-0.03 <= (big_order[4] - pre_close_price) / pre_close_price <= 0.06): continue if big_order[4] < high_price: continue if big_order[0] in order_ids: continue order_ids.add(big_order[0]) orders.append(big_order) fdatas[code] = ( f"{round(limit_up_data_dict[code][3] / 1e8, 2)}亿", limit_up_data_dict[code][1], orders, limit_up_time_str) return fdatas def write_to_excel(day, fdatas): # 写入excel wb = xlwt.Workbook() ws = wb.add_sheet(day) index = -1 for data in fdatas: index += 1 for i in range(len(data)): ws.write(index, i, str(data[i])) wb.save(f"E:/测试数据/涨停前大单_{day}.xls") def write_to_json(day, fdatas): # 写入excel with open(f"E:/测试数据/涨停前大单_{day}.json", encoding='utf-8', mode='w') as f: f.write(json.dumps(fdatas)) if __name__ == "__main__": days = DataLoader(tool.get_now_date_str()).load_trade_days() # days.insert(0, tool.get_now_date_str()) for day in days: if day < '2025-03-17': continue file_path = f"E:/测试数据/涨停前大单_{day}.json" if os.path.exists(file_path): continue print("=====" + day) results = BeforeLimitUpDataAnalyzer(day).compute() results = [(x, results[x]) for x in results] results.sort(key=lambda x: x[1][3]) format_datas = [] for data in results: code, result = data[0], data[1] format_datas.append((code, result[1], result[0], len(set([x[0] for x in result[2]])), result[3], result[2])) write_to_json(day, format_datas)