"""
|
策略变量工厂类
|
"""
|
import datetime
|
import json
|
import os
|
import re
|
|
import constant
|
from code_attribute import global_data_loader
|
from db import mysql_data_delegate
|
from strategy.data_analyzer import KTickLineAnalyzer, KPLLimitUpDataAnalyzer, K60SLineAnalyzer
|
from strategy.strategy_variable import StockVariables
|
from third_data import kpl_api, kpl_util
|
from third_data.history_k_data_manager import HistoryKDataManager
|
from third_data.history_k_data_util import JueJinLocalApi, HistoryKDatasUtils
|
from utils import global_util, tool
|
|
|
class DataLoader:
|
"""
|
数据加载器类,用于集中管理策略变量所需的各类数据加载逻辑
|
"""
|
|
def __init__(self, now_day, cache_path="D:/datas"):
|
"""
|
初始化数据加载器
|
:param now_day: 当前日期,格式为"2025-01-01"
|
"""
|
self.now_day = now_day
|
self.cache_path = cache_path
|
self.jueJinLocalApi = JueJinLocalApi("41c4f5da-2591-11f0-a9c9-f4b5203f67bf",
|
"018db265fa34e241dd6198b7ca507ee0a82ad029")
|
self.trade_days = self.load_trade_days()
|
self.plate_codes = {}
|
# 代码的精选板块 : {"代码":{板块}}
|
self.jx_blocks = {}
|
|
def load_kline_data(self):
|
"""
|
加载日K线数据
|
:return: {"代码": 日K线数据}
|
"""
|
dir_path = os.path.join(self.cache_path, "k_bars")
|
day = self.trade_days[0]
|
dirs = os.listdir(dir_path)
|
k_bar_code_data_dict = {}
|
for d in dirs:
|
if d.find(day) < 0:
|
continue
|
code = d.split("_")[-1][:6]
|
fpath = os.path.join(dir_path, d)
|
with open(fpath, mode='r', encoding='utf-8') as f:
|
lines = f.readlines()
|
line = lines[0]
|
k_bar_code_data_dict[code] = eval(line)
|
return k_bar_code_data_dict
|
|
def load_kline_data_by_day_and_code(self, day, code):
|
path_str = os.path.join(self.cache_path, "k_bars", f"{day}_{code}.txt")
|
if not os.path.exists(path_str):
|
return None
|
with open(path_str, mode='r', encoding='utf-8') as f:
|
lines = f.readlines()
|
line = lines[0]
|
return eval(line)
|
return None
|
|
def load_minute_data(self):
|
"""
|
加载分钟K线数据
|
:return: 分钟K线数据字典
|
"""
|
dir_path = os.path.join(self.cache_path, "bar_60s")
|
day = self.trade_days[0]
|
dirs = os.listdir(dir_path)
|
k_bar_code_data_dict = {}
|
for d in dirs:
|
if d.find(day) < 0:
|
continue
|
code = d.split("_")[-1][:6]
|
sub_dir_path = os.path.join(dir_path, d)
|
sub_dirs = os.listdir(sub_dir_path)
|
date_datas = {}
|
for sub_dir in sub_dirs:
|
fpath = os.path.join(sub_dir_path, sub_dir)
|
date = sub_dir[:10]
|
with open(fpath, mode='r', encoding='utf-8') as f:
|
lines = f.readlines()
|
line = lines[0]
|
date_datas[date] = eval(line)
|
k_bar_code_data_dict[code] = date_datas
|
return k_bar_code_data_dict
|
|
def load_tick_data(self, target_codes=None):
|
"""
|
加载当日的tick数据
|
:return:tick数据字典
|
"""
|
code_ticks_dict = {}
|
tick_dir_path = os.path.join(self.cache_path, "ticks")
|
if os.path.exists(tick_dir_path):
|
fs = os.listdir(tick_dir_path)
|
for f in fs:
|
if f.find(self.now_day) < 0:
|
continue
|
code = f.split("_")[1][:6]
|
if target_codes and code not in target_codes:
|
continue
|
tick_path = os.path.join(tick_dir_path, f)
|
with open(tick_path, mode='r') as f:
|
lines = f.readlines()
|
line = lines[0]
|
line = line.replace("datetime.datetime(", "\"datetime.datetime(").replace("('PRC'))", "('PRC'))\"")
|
line = line.replace("'", "\"").replace("\"PRC\"", "'PRC'")
|
ticks = json.loads(line)
|
for t in ticks:
|
if t["created_at"].find("datetime.datetime") >= 0:
|
created_at_line = t["created_at"].replace("datetime.datetime(", "")
|
sts = created_at_line.split(",")
|
year, month, day, hour, minute, second = sts[0].strip(), sts[1].strip(), sts[2].strip(), \
|
sts[
|
3].strip(), \
|
sts[4].strip(), (
|
sts[5].strip() if len(sts) >= 7 else 0)
|
dt = datetime.datetime(int(year), int(month), int(day), int(hour), int(minute), int(second))
|
t["created_at"] = dt.strftime("%Y-%m-%d %H:%M:%S")
|
# 整理为分钟K线
|
code_ticks_dict[code] = ticks
|
return code_ticks_dict
|
|
def load_limit_up_data(self):
|
"""
|
加载涨停数据
|
:return: 涨停数据记录[(代码, 日期, 板块, 是否炸板)]
|
"""
|
mysql = mysql_data_delegate.Mysqldb()
|
results = mysql.select_all(
|
f"select _code, _day, _hot_block_name, _open, _blocks from kpl_limit_up_record where _day>='{self.trade_days[-1]}' and _day <='{self.trade_days[0]}'")
|
for r in results:
|
r[2] = kpl_util.filter_block(r[2])
|
return results
|
|
def __compute_limit_up_reasons_for_refer(self, block_infos):
|
"""
|
计算参考票的涨停原因
|
@param block_infos: [(板块, 日期)]
|
@return:
|
"""
|
# [(板块, 日期)]
|
block_infos.sort(key=lambda x: x[1], reverse=True)
|
# {"板块":[(出现次数, 最近出现日期)]}
|
temp_dict = {}
|
for b in block_infos:
|
if b[0] in constant.KPL_INVALID_BLOCKS:
|
continue
|
if b[0] not in temp_dict:
|
temp_dict[b[0]] = [0, b[1]]
|
temp_dict[b[0]][0] += 1
|
if not temp_dict:
|
return set()
|
temp_list = [(k, temp_dict[k][0], temp_dict[k][1]) for k in temp_dict]
|
# 按照涨停次数与最近涨停时间排序
|
temp_list.sort(key=lambda x: (x[1], x[2]), reverse=True)
|
# 取涨停次数最多的和最近涨停的
|
# 取相同次数的原因
|
if temp_list:
|
_list = [t for t in temp_list if t[1] == temp_list[0][1]]
|
if _list[0][1] == 1:
|
_list = _list[:1]
|
blocks = set([x[0] for x in _list])
|
else:
|
blocks = set()
|
|
blocks -= constant.KPL_INVALID_BLOCKS
|
# 去除例如概念这些泛指词
|
return set([kpl_util.filter_block(x) for x in blocks])
|
|
def load_code_plates_for_refer(self):
|
"""
|
获取参考票的涨停原因
|
@return:
|
"""
|
sql = f"SELECT r.`_code`, r.`_day`, r.`_hot_block_name`, r.`_blocks`, r.`_open` FROM `kpl_limit_up_record` r WHERE r.`_day`>='{tool.date_sub(self.now_day, 365)}' and r.`_day` <'{self.now_day}'"
|
mysql = mysql_data_delegate.Mysqldb()
|
kpl_results = mysql.select_all(sql)
|
# {"代码":[(板块, 日期), (板块, 日期)]}
|
kpl_block_dict = {}
|
for r in kpl_results:
|
# 当日炸板的不计算原因
|
if r[4] == 1:
|
continue
|
code = r[0]
|
if code not in kpl_block_dict:
|
kpl_block_dict[code] = []
|
kpl_block_dict[code].append((r[2], r[1])) # (板块, 日期)
|
reasons_dict = {}
|
for code in kpl_block_dict:
|
block_infos = kpl_block_dict.get(code)
|
reasons_dict[code] = self.__compute_limit_up_reasons_for_refer(block_infos)
|
return reasons_dict
|
|
def load_target_plate_and_codes(self):
|
"""
|
加载目标板块与对应的代码:
|
从最近60个交易日的真正涨停数据中
|
@return: {"板块":[代码]}
|
"""
|
end_date = self.trade_days[:60][-1]
|
start_date = self.trade_days[:60][0]
|
mysql = mysql_data_delegate.Mysqldb()
|
# 获取上个交易日涨停的票
|
results = mysql.select_all(
|
f"SELECT r.`_code` FROM `kpl_limit_up_record` r where r._day='{self.trade_days[0]}' and r._open = 0")
|
exclude_codes = set([x[0] for x in results])
|
results = mysql.select_all(
|
f"select r.`_hot_block_name` from `kpl_limit_up_record` r where r.`_open`=0 and r.`_day`>'{end_date}' and r.`_day` <= '{start_date}' group by r.`_hot_block_name`")
|
blocks = set([x[0] for x in results])
|
fresults = {}
|
all_buy_plates_of_codes = self.load_all_buy_plates_of_codes()
|
valid_codes = set(all_buy_plates_of_codes.keys())
|
for b in blocks:
|
sql = f"""
|
SELECT * FROM
|
(
|
SELECT r.`_code`, r.`_code_name`, COUNT(*) AS `count`, MAX(r.`_day`) AS _day FROM `kpl_limit_up_record` r WHERE r.`_open`=0 AND r.`_day`>'{end_date}' AND r.`_day`<='{start_date}' AND r.`_hot_block_name`='{b}' GROUP BY r.`_code`
|
) a
|
|
ORDER BY a.count DESC,a._day DESC
|
"""
|
|
results = mysql.select_all(sql)
|
# 取前1/3
|
if results:
|
results = [x for x in results if
|
(tool.is_can_buy_code(x[0]) and x[0] in valid_codes and x[0] not in exclude_codes)]
|
# 取前1/3且涨停数是前10
|
max_count = len(results) // 2 if len(results) % 2 == 0 else len(results) // 2 + 1
|
results = results[:max_count]
|
# 取前10
|
results = results[:10]
|
codes = [x[0] for x in results]
|
fresults[kpl_util.filter_block(b)] = codes
|
return fresults
|
|
def load_trade_days(self):
|
"""
|
加载交易日列表,now_day前120个交易日
|
:return: 交易日列表
|
"""
|
# 将now_day转为datetime类型
|
current_date = datetime.datetime.strptime(self.now_day, '%Y-%m-%d').date()
|
pre_date = current_date - datetime.timedelta(days=1)
|
one_year_ago = (pre_date - datetime.timedelta(days=365)).strftime('%Y-%m-%d')
|
pre_date = pre_date.strftime('%Y-%m-%d')
|
|
if constant.is_windows():
|
trade_days = self.jueJinLocalApi.get_trading_dates(one_year_ago, pre_date)
|
else:
|
trade_days = HistoryKDatasUtils.get_trading_dates(one_year_ago, pre_date)
|
trade_days.sort(reverse=True)
|
trade_days = trade_days[:120]
|
return trade_days
|
|
def load_next_trade_day(self):
|
"""
|
加载交易日列表,now_day前120个交易日
|
:return: 交易日列表
|
"""
|
if constant.is_windows():
|
next_trade_day = self.jueJinLocalApi.get_next_trading_date(self.now_day)
|
else:
|
next_trade_day = HistoryKDatasUtils.get_next_trading_date(self.now_day)
|
return next_trade_day
|
|
def load_target_codes(self):
|
"""
|
获取特殊的代码,需要订阅300w以上的大单
|
@return: 代码集合, 收盘价字典:{"code":收盘价}
|
"""
|
try:
|
global_data_loader.load_zyltgb_volume_from_db()
|
pre_close_price_dict = {}
|
zylt_volume_map = global_util.zylt_volume_map
|
codes = set()
|
last_trade_day = self.trade_days[0]
|
for code in zylt_volume_map:
|
volume = zylt_volume_map.get(code)
|
# 今日涨停价要突破昨日最高价
|
k_bars = HistoryKDataManager().get_history_bars(code, last_trade_day)
|
if k_bars and 30e8 <= k_bars[0]["close"] * volume * tool.get_limit_up_rate(code) <= 300e8:
|
# 自由流通市值在30亿-300亿以上
|
limit_up_price = round(tool.get_limit_up_rate(code) * k_bars[0]["close"], 2)
|
max_price_info = max(k_bars[:1], key=lambda x: x["high"])
|
if limit_up_price > max_price_info["high"]:
|
# 今日涨停价要突破昨日最高价
|
codes.add(code)
|
pre_close_price_dict[code] = k_bars[0]["close"]
|
return codes, pre_close_price_dict
|
except Exception as e:
|
return set(), None
|
|
def load_plate_codes(self, plate_code, plate_name):
|
"""
|
获取板块有可能买的目标代码
|
@param plate_code:
|
@return:[(代码, 领涨次数, 最大领涨次数)]
|
"""
|
|
if not plate_code or plate_name == '无' or plate_name in constant.KPL_INVALID_BLOCKS:
|
return set()
|
if plate_code in self.plate_codes:
|
return self.plate_codes.get(plate_code)
|
dir_path = os.path.join(self.cache_path, "plate_codes_info", self.now_day)
|
if not os.path.exists(dir_path):
|
os.makedirs(dir_path)
|
path_ = os.path.join(dir_path, plate_code + ".text")
|
datas = []
|
if os.path.exists(path_):
|
with open(path_, mode='r', encoding='utf-8') as f:
|
codes_info = []
|
lines = f.readlines()
|
for i in range(len(lines)):
|
if i == 0:
|
continue
|
line = lines[i]
|
if line:
|
r = eval(line)
|
datas.append(r)
|
else:
|
codes_info = []
|
results = self.request_plate_codes(plate_code)
|
with open(path_, mode='w', encoding='utf-8') as f:
|
f.write(plate_name)
|
f.write("\n")
|
f.writelines([f"{x}\n" for x in results])
|
datas = results
|
# 保存到内存中
|
if datas:
|
max_data = max(datas, key=lambda x: x[3])
|
for r in datas:
|
if r[3] < 1:
|
continue
|
if re.match(r"风险|立案", r[2]):
|
continue
|
if r[1].find("ST") >= 0:
|
continue
|
if r[1].find("S") >= 0:
|
continue
|
if not tool.is_can_buy_code(r[0]):
|
continue
|
codes_info.append((r[0], r[3], max_data[3]))
|
# if len(codes_info) >= 10:
|
# break
|
f_codes_info = codes_info # [d for d in codes_info if d[1] >= d[2] // 2]
|
self.plate_codes[plate_code] = f_codes_info
|
return self.plate_codes.get(plate_code)
|
|
def load_all_codes_of_plates(self, is_for_buy=False):
|
"""
|
加载所有板块的领涨票
|
@return:{"板块代码":(板块名称, [(代码,代码名称,标签,领涨次数)])}
|
"""
|
dir_path = os.path.join(self.cache_path, "plate_codes_info", self.now_day)
|
if not os.path.exists(dir_path):
|
return None
|
fdata = {}
|
plate_files = os.listdir(dir_path)
|
for plate_file in plate_files:
|
plate_code = plate_file.split(".")[0]
|
path_ = os.path.join(dir_path, plate_file)
|
with open(path_, mode='r', encoding='utf-8') as f:
|
datas = []
|
lines = f.readlines()
|
for i in range(len(lines)):
|
if i == 0:
|
continue
|
line = lines[i]
|
if line:
|
r = eval(line)
|
if not is_for_buy:
|
if r[3] < 1:
|
continue
|
if r[1].find("ST") >= 0:
|
continue
|
if r[1].find("S") >= 0:
|
continue
|
else:
|
if re.match(r"风险|立案", r[2]):
|
continue
|
if r[1].find("ST") >= 0:
|
continue
|
if r[1].find("S") >= 0:
|
continue
|
if not tool.is_can_buy_code(r[0]):
|
continue
|
datas.append(r)
|
# if len(datas) >= 10:
|
# break
|
fdata[plate_code] = (kpl_util.filter_block(lines[0].strip()), datas)
|
return fdata
|
|
def load_all_refer_plates_of_codes(self):
|
"""
|
加载所有有领涨代码的领涨板块
|
@return:
|
"""
|
datas = self.load_all_codes_of_plates()
|
fdata = {}
|
for plate_code in datas:
|
plate_name = datas[plate_code][0]
|
codes_info = datas[plate_code][1]
|
for item in codes_info:
|
code, limit_up_count = item[0], item[3]
|
if code not in fdata:
|
fdata[code] = []
|
fdata[code].append((plate_code, plate_name, limit_up_count))
|
for code in fdata:
|
fdata[code].sort(key=lambda x: x[2], reverse=True)
|
fdata[code] = fdata[code][:3]
|
return fdata
|
|
def load_all_buy_plates_of_codes(self):
|
"""
|
加载所有代码的领涨板块
|
@return: {"代码":{"板块名称":(代码, 领涨次数, 最大领涨次数)}}
|
"""
|
datas = self.load_all_codes_of_plates(is_for_buy=True)
|
fdata = {}
|
if not datas:
|
return fdata
|
for plate_code in datas:
|
plate_name = datas[plate_code][0]
|
codes_info = datas[plate_code][1]
|
if not codes_info:
|
continue
|
max_count = max(codes_info, key=lambda x: x[3])[3]
|
for item in codes_info:
|
code, limit_up_count = item[0], item[3]
|
if code not in fdata:
|
fdata[code] = {}
|
fdata[code][plate_name] = (code, limit_up_count, max_count)
|
fdata_dict = {c: [(p, fdata[c][p]) for p in fdata[c]] for c in fdata}
|
for c in fdata_dict:
|
fdata_dict[c].sort(key=lambda x: x[1], reverse=True)
|
fdata_dict[c] = fdata_dict[c][:3]
|
|
fdata = {code: {x[0]: x[1] for x in fdata_dict[code]} for code in fdata_dict}
|
|
return fdata
|
|
def request_plate_codes(self, plate_code):
|
"""
|
获取板块的代码
|
@param plate_code:
|
@return:[代码, 名称, 风险项, 领涨次数]
|
"""
|
fresults = []
|
for i in range(1, 10):
|
results = kpl_api.getHistoryCodesByPlateOrderByLZCS(plate_code, self.now_day, "0930", i)
|
results = json.loads(results)["list"]
|
fresults.extend(results)
|
if len(results) < 30:
|
break
|
fdatas = []
|
for result in fresults:
|
d = result[0], result[1], result[2], result[40]
|
fdatas.append(d)
|
return fdatas
|
|
def get_limit_up_reasons_with_plate_code(self):
|
"""
|
获取涨停原因
|
:return: 涨停数据记录[(代码, 日期, 板块, 是否炸板)]
|
"""
|
mysql = mysql_data_delegate.Mysqldb()
|
sql = """
|
SELECT _hot_block_code,_hot_block_name FROM
|
(
|
SELECT r.`_hot_block_code`, r.`_hot_block_name`, r.`_create_time` FROM
|
(SELECT DISTINCT(c.`_hot_block_code`) FROM `kpl_limit_up_record` c WHERE c.`_day`>'最小日期' and c.`_day`<'今日日期') a
|
LEFT JOIN kpl_limit_up_record r ON r.`_hot_block_code` = a._hot_block_code ORDER BY r.`_create_time` DESC
|
) b GROUP BY b._hot_block_code HAVING b._hot_block_code IS NOT NULL
|
"""
|
sql = sql.replace("最小日期", self.trade_days[-1]).replace("今日日期", self.now_day)
|
results = mysql.select_all(sql)
|
return [x for x in results if kpl_util.filter_block(x[1]) not in constant.KPL_INVALID_BLOCKS]
|
|
def load_jx_blocks(self, code):
|
if code in self.jx_blocks:
|
self.jx_blocks.get(code)
|
# 从文件中读取
|
dir_path = os.path.join(self.cache_path, "jx_blocks", self.now_day)
|
path_str = os.path.join(dir_path, f"{code}.txt")
|
if os.path.exists(path_str):
|
with open(path_str, mode='r', encoding='utf-8') as f:
|
lines = f.readlines()
|
blocks = eval(lines[0])
|
self.jx_blocks[code] = blocks
|
if code in self.jx_blocks:
|
return self.jx_blocks.get(code)
|
|
blocks = kpl_api.getCodeJingXuanBlocks(code)
|
blocks = set([kpl_util.filter_block(x[1]) for x in blocks])
|
blocks -= constant.KPL_INVALID_BLOCKS
|
self.jx_blocks[code] = blocks
|
# 保存到文件
|
if not os.path.exists(dir_path):
|
os.makedirs(dir_path)
|
with open(os.path.join(dir_path, f"{code}.txt"), mode='w', encoding='utf-8') as f:
|
f.write(f"{blocks}")
|
return blocks
|
|
def load_all_jx_blocks(self):
|
code_blocks = {}
|
# 从文件中读取
|
dir_path = os.path.join(self.cache_path, "jx_blocks", self.now_day)
|
files = os.listdir(dir_path)
|
for file in files:
|
code = file[:6]
|
with open(os.path.join(dir_path, file), mode='r', encoding='utf-8') as f:
|
code_blocks[code] = eval(f.readlines()[0])
|
return code_blocks
|
|
|
class StrategyVariableFactory:
|
@staticmethod
|
def create_from_history_data(kline_data_1d, kline_data_60s_dict, limit_up_data_records,
|
trade_days) -> StockVariables:
|
"""
|
根据历史数据创建StockVariables对象
|
:param kline_data_1d: 日K线数据
|
:param kline_data_60s_dict:分钟k线字典:{"2025-01-01":{Bar格式}}
|
:param limit_up_data_records: 代码的历史涨停数据:[(代码,日期,涨停原因)]
|
:param trade_days: 交易日列表,从大到小倒序排列
|
:return: StockVariables实例
|
"""
|
instance = StockVariables()
|
# 设置K线相关属性
|
instance.昨日成交量 = KTickLineAnalyzer.get_yesterday_volume(kline_data_1d)
|
instance.昨日收盘价 = KTickLineAnalyzer.get_yesterday_close(kline_data_1d)
|
instance.昨日最高价 = KTickLineAnalyzer.get_yesterday_high(kline_data_1d)
|
instance.昨日成交额 = KTickLineAnalyzer.get_yesterday_amount(kline_data_1d)
|
instance.昨日非涨停 = not KTickLineAnalyzer.is_yesterday_limit_up(kline_data_1d)
|
instance.昨日非炸板 = not KTickLineAnalyzer.is_yesterday_exploded(kline_data_1d)
|
instance.昨日非跌停 = not KTickLineAnalyzer.is_yesterday_limit_down(kline_data_1d)
|
instance.昨日最低价 = KTickLineAnalyzer.get_yesterday_low_price(kline_data_1d)
|
instance.昨日开盘价 = KTickLineAnalyzer.get_yesterday_open_price(kline_data_1d)
|
|
day_counts = [5, 10, 30, 60, 120]
|
for day in [3, 5, 10, 30, 60, 120]:
|
instance.__setattr__(f"日最高价_{day}", KTickLineAnalyzer.get_recent_days_high(kline_data_1d, day))
|
for day in day_counts:
|
instance.__setattr__(f"日最高量_{day}", KTickLineAnalyzer.get_recent_days_max_volume(kline_data_1d, day))
|
for day in day_counts:
|
instance.__setattr__(f"涨停数_{day}", KTickLineAnalyzer.get_recent_limit_up_count(kline_data_1d, day))
|
for day in day_counts:
|
instance.__setattr__(f"炸板数_{day}", KTickLineAnalyzer.get_recent_exploded_count(kline_data_1d, day))
|
for day in day_counts:
|
instance.__setattr__(f"跌停数_{day}", KTickLineAnalyzer.get_recent_limit_down_count(kline_data_1d, day))
|
for day in day_counts:
|
instance.__setattr__(f"日首板溢价率_{day}", KTickLineAnalyzer.get_first_limit_up_avg_premium(kline_data_1d, day))
|
for day in day_counts:
|
instance.__setattr__(f"日首板炸板溢价率_{day}",
|
KTickLineAnalyzer.get_first_exploded_avg_premium(kline_data_1d, day))
|
for day in day_counts:
|
instance.__setattr__(f"日首板个数_{day}", KTickLineAnalyzer.get_first_limit_up_days(kline_data_1d, day))
|
for day in day_counts:
|
instance.__setattr__(f"日二板个数_{day}", KTickLineAnalyzer.get_second_limit_up_days(kline_data_1d, day))
|
for day in day_counts:
|
instance.__setattr__(f"日三板个数_{day}", KTickLineAnalyzer.get_third_limit_up_days(kline_data_1d, day))
|
for day in day_counts:
|
instance.__setattr__(f"日大等于四板个数_{day}",
|
KTickLineAnalyzer.get_fourth_or_more_limit_up_days(kline_data_1d, day))
|
for day in day_counts:
|
instance.__setattr__(f"日首跌停个数_{day}", KTickLineAnalyzer.get_first_limit_down_days(kline_data_1d, day))
|
for day in day_counts:
|
instance.__setattr__(f"日2次跌停个数_{day}", KTickLineAnalyzer.get_second_limit_down_days(kline_data_1d, day))
|
for day in day_counts:
|
instance.__setattr__(f"日3次跌停个数_{day}", KTickLineAnalyzer.get_third_limit_down_days(kline_data_1d, day))
|
for day in day_counts:
|
instance.__setattr__(f"日大等于4次跌停个数_{day}",
|
KTickLineAnalyzer.get_fourth_or_more_limit_down_days(kline_data_1d, day))
|
for day in [5, 10, 15, 30, 60, 120]:
|
instance.__setattr__(f"日放倍量日期_{day}",
|
KTickLineAnalyzer.get_recent_days_double_volume_date(kline_data_1d, day))
|
|
for day in day_counts:
|
days = trade_days[:day]
|
instance.__setattr__(f"日个股最正的原因_{day}",
|
KPLLimitUpDataAnalyzer.get_most_common_reasons(limit_up_data_records, min_day=days[-1],
|
max_day=days[0]))
|
|
if kline_data_60s_dict:
|
for day in day_counts:
|
# 获取日K最高量的信息
|
volume, k_data = KTickLineAnalyzer.get_recent_days_max_volume(kline_data_1d, day)
|
d = k_data["bob"][:10]
|
kline_data_60s = kline_data_60s_dict.get(d)
|
fdata = K60SLineAnalyzer.get_close_price_of_max_volume(kline_data_60s)
|
instance.__setattr__(f"日高量分时最高量价_{day}", fdata)
|
kline_data_60s = kline_data_60s_dict.get(trade_days[0])
|
fdata = K60SLineAnalyzer.get_close_price_of_max_volume(kline_data_60s)
|
instance.__setattr__(f"昨日分时最高量价", fdata)
|
return instance
|
|
|
def __test_jx_blocks(__DataLoader):
|
def load_all_codes():
|
codes = set()
|
with open("D:/codes/codes_sh.text") as f:
|
lines = f.readlines()
|
codes |= set([x.strip() for x in lines if x.find("30") != 0])
|
with open("D:/codes/codes_sz.text") as f:
|
lines = f.readlines()
|
codes |= set([x.strip() for x in lines if x.find("30") != 0])
|
return codes
|
|
code_blocks = __DataLoader.load_all_jx_blocks()
|
# codes = load_all_codes()
|
# for code in codes:
|
# print(code)
|
# __DataLoader.load_jx_blocks(code)
|
codes = ["002639", "002366"]
|
same_blocks = set()
|
for c in codes:
|
blocks = __DataLoader.load_jx_blocks(c)
|
if not same_blocks:
|
same_blocks = blocks
|
same_blocks &= blocks
|
print("相同板块", same_blocks)
|
for code in code_blocks:
|
if len(code_blocks[code] & same_blocks) == len(same_blocks):
|
if code in codes:
|
continue
|
print(code, code_blocks[code])
|
|
|
if __name__ == "__main__":
|
__DataLoader = DataLoader("2025-06-12")
|
# __test_jx_blocks(__DataLoader)
|
|
# instance = StockVariables()
|
# day = 5
|
# instance.__setattr__(f"日最高价_{day}", 12.00)
|
# print(instance.日最高价_5)
|
|
# 下载目标票的板块
|
# fdata = __DataLoader.load_all_refer_plates_of_codes()
|
# print(fdata.get("000833"))
|
|
# result_dict = __DataLoader.load_code_plates_for_refer()
|
# print(result_dict["301279"])
|
|
results = __DataLoader.load_target_plate_and_codes()
|
# for k in results:
|
# print(k, results[k])
|
plates = ["汽车零部件", "稀土永磁", "化工", "医药", "光伏"]
|
print("==========新题材=======")
|
for p in plates:
|
print(p, results.get(p))
|
|
# print("食品饮料", results.get("食品饮料"))
|
# print("锂电池", results.get("锂电池"))
|
# print("数字经济", results.get("数字经济"))
|
# print("地产链", results.get("地产链"))
|
# print("物流", results.get("物流"))
|
|
# 下载涨停原因板块对应的代码
|
plates = __DataLoader.get_limit_up_reasons_with_plate_code()
|
for p in plates:
|
print(p)
|
__DataLoader.load_plate_codes(p[0], p[1])
|
|
# DataLoader("2025-05-06").load_tick_data()
|
#
|
# print(re.match(r"风险|立案", "风险123"))
|