"""
|
涨停前大单分析:
|
根据某一天的涨停数据获取该代码涨停前的大单成交数量(6个点以下的)
|
"""
|
import json
|
import os
|
|
import xlwt
|
|
from db import mysql_data_delegate as mysql_data
|
from strategy.low_suction_strategy import LowSuctionOriginDataExportManager
|
from strategy.strategy_variable_factory import DataLoader
|
from utils import tool, huaxin_util
|
|
|
class BeforeLimitUpDataAnalyzer:
    """Collect the big-order deals of first-board stocks before they hit limit-up.

    On construction the analyzer loads, for one trade day, the first-board
    ("首板") limit-up records, the K-line bars of the affected codes and the
    exported big-order deals. ``compute`` then filters the big orders of every
    code down to those dealt before the limit-up time.
    """

    def __init__(self, day):
        """Load limit-up records, K-line bars and big-order deals for ``day``.

        Args:
            day: Trade date string. It is compared lexicographically with the
                first ten characters of each bar's ``"bob"`` field, so it is
                presumably formatted ``YYYY-MM-DD`` — TODO confirm.
        """
        self.day = day
        # First-board limit-up records of the day; each row is
        # (_code, _code_name, _limit_up_time, _zylt_val).
        # NOTE(review): the date is interpolated straight into the SQL text.
        # Switch to a parameterized query if `day` can ever be untrusted input.
        self.limit_up_data = mysql_data.Mysqldb().select_all(
            f"select _code,_code_name, _limit_up_time, _zylt_val from kpl_limit_up_record where _day ='{self.day}' and _limit_up_high_info = '首板'")
        self.codes = [d[0] for d in self.limit_up_data]

        k_data_dict = DataLoader(tool.get_now_date_str()).load_kline_data()
        # Build the lookup set once: the K-line dict covers many more codes
        # than the limit-up list, so a list scan per code is wasted work.
        limit_up_codes = set(self.codes)
        for code in k_data_dict:
            if code in limit_up_codes:
                k_data_dict[code] = self._bars_before_day(k_data_dict[code], day)
        self.k_data_dict = k_data_dict

        big_orders = LowSuctionOriginDataExportManager(day).export_big_order_deal()
        if not big_orders:
            # Fall back to the alternative export when the primary one is empty.
            big_orders = LowSuctionOriginDataExportManager(day).export_big_order_deal_by()
        # Normalise "no data" to an empty dict so compute() can always call .get().
        self.big_order_deal_dict = big_orders or {}

    @staticmethod
    def _bars_before_day(k_bars, day):
        """Return ``k_bars`` starting at the first bar dated earlier than ``day``.

        The list is returned unchanged when no bar is earlier than ``day``.
        Assumes bars are ordered newest first, so the first earlier bar is the
        previous trading day — TODO confirm against the loader.
        """
        for index, bar in enumerate(k_bars):
            if bar["bob"][:10] < day:
                return k_bars[index:]
        return k_bars

    def compute(self):
        """Filter each limit-up code's big orders to those dealt before limit-up.

        An order is kept when it was dealt before the limit-up time, its price
        (``big_order[4]``) lies between -3% and +6% of the reference bar's
        close, is not below that bar's high, and its id (``big_order[0]``) has
        not been seen yet for the code. The reference bar is the first bar in
        ``k_data_dict[code]``.

        Returns:
            dict mapping code -> (``_zylt_val`` in units of 1e8 formatted as
            "<n>亿", code name, kept big orders, limit-up time string).
            Codes without K-line bars are omitted.
        """
        limit_up_data_dict = {d[0]: d for d in self.limit_up_data}
        fdatas = {}
        for code in self.codes:
            k_bars = self.k_data_dict.get(code)
            if not k_bars:
                # No reference bar available (missing or empty): skip the code
                # instead of failing on k_bars[0].
                continue
            record = limit_up_data_dict[code]
            limit_up_time_str = tool.to_time_str(int(record[2]))
            reference_bar = k_bars[0]
            pre_close_price = reference_bar["close"]
            high_price = reference_bar["high"]

            orders = []
            order_ids = set()
            for big_order in self.big_order_deal_dict.get(code) or ():
                # Stops at the first order at/after the limit-up time; this
                # presumes the orders are sorted by time — TODO confirm.
                if huaxin_util.convert_time(big_order[3]) >= limit_up_time_str:
                    break
                price = big_order[4]
                if not (-0.03 <= (price - pre_close_price) / pre_close_price <= 0.06):
                    continue
                if price < high_price:
                    continue
                if big_order[0] in order_ids:
                    continue
                order_ids.add(big_order[0])
                orders.append(big_order)

            fdatas[code] = (
                f"{round(record[3] / 1e8, 2)}亿", record[1], orders,
                limit_up_time_str)
        return fdatas
|
|
|
def write_to_excel(day, fdatas, out_dir="E:/测试数据"):
    """Write ``fdatas`` to an .xls workbook, one row per item.

    Args:
        day: Trade date string; used as the sheet name and in the file name.
        fdatas: Iterable of rows; every cell of a row is written as ``str(cell)``.
        out_dir: Directory the workbook is saved into. Defaults to the
            original hard-coded location.
    """
    wb = xlwt.Workbook()
    ws = wb.add_sheet(day)
    for row_index, row in enumerate(fdatas):
        for col_index, cell in enumerate(row):
            ws.write(row_index, col_index, str(cell))
    wb.save(f"{out_dir}/涨停前大单_{day}.xls")
|
|
|
def write_to_json(day, fdatas, out_dir="E:/测试数据"):
    """Serialize ``fdatas`` as JSON into ``<out_dir>/涨停前大单_<day>.json``.

    Args:
        day: Trade date string; used in the file name.
        fdatas: JSON-serializable data to store.
        out_dir: Directory the file is written into. Defaults to the original
            hard-coded location.

    Raises:
        TypeError: If ``fdatas`` is not JSON-serializable. No file is created
            or truncated in that case.
    """
    # Serialize before opening the file: a failure must not leave behind an
    # empty file that a later existence check would mistake for a finished export.
    payload = json.dumps(fdatas)
    with open(f"{out_dir}/涨停前大单_{day}.json", encoding='utf-8', mode='w') as f:
        f.write(payload)
|
|
|
if __name__ == "__main__":
    # Export one JSON file per trade day, skipping days that are too old or
    # that already have an export on disk.
    trade_days = DataLoader(tool.get_now_date_str()).load_trade_days()
    # To include today as well: trade_days.insert(0, tool.get_now_date_str())
    for trade_day in trade_days:
        json_path = f"E:/测试数据/涨停前大单_{trade_day}.json"
        if trade_day < '2025-03-17' or os.path.exists(json_path):
            continue
        print("=====" + trade_day)
        by_code = BeforeLimitUpDataAnalyzer(trade_day).compute()
        # Order the codes by their limit-up time string (index 3 of each result).
        ranked = sorted(by_code.items(), key=lambda item: item[1][3])
        # Row layout: code, name, value text, distinct order-id count,
        # limit-up time, raw orders.
        rows = [
            (code, info[1], info[0], len({order[0] for order in info[2]}), info[3], info[2])
            for code, info in ranked
        ]
        write_to_json(trade_day, rows)
|