""" 策略变量工厂类 """ import datetime import json import os import re import constant from code_attribute import global_data_loader, gpcode_manager from db import mysql_data_delegate from strategy.data_analyzer import KTickLineAnalyzer, KPLLimitUpDataAnalyzer, K60SLineAnalyzer from strategy.strategy_variable import StockVariables from third_data import kpl_api, kpl_util from third_data.history_k_data_manager import HistoryKDataManager from third_data.history_k_data_util import JueJinLocalApi, HistoryKDatasUtils from utils import global_util, tool class DataLoader: """ 数据加载器类,用于集中管理策略变量所需的各类数据加载逻辑 """ def __init__(self, now_day, cache_path=f"{constant.get_path_prefix()}/datas"): """ 初始化数据加载器 :param now_day: 当前日期,格式为"2025-01-01" """ self.now_day = now_day self.cache_path = cache_path self.jueJinLocalApi = JueJinLocalApi("41c4f5da-2591-11f0-a9c9-f4b5203f67bf", "018db265fa34e241dd6198b7ca507ee0a82ad029") self.trade_days = self.load_trade_days() self.plate_codes = {} # 代码的精选板块 : {"代码":{板块}} self.jx_blocks = {} def load_kline_data(self): """ 加载日K线数据 :return: {"代码": 日K线数据} """ dir_path = os.path.join(self.cache_path, "k_bars") day = self.trade_days[0] dirs = os.listdir(dir_path) k_bar_code_data_dict = {} for d in dirs: if d.find(day) < 0: continue code = d.split("_")[-1][:6] fpath = os.path.join(dir_path, d) with open(fpath, mode='r', encoding='utf-8') as f: lines = f.readlines() line = lines[0] k_bar_code_data_dict[code] = eval(line) return k_bar_code_data_dict def load_kline_data_by_day_and_code(self, day, code): path_str = os.path.join(self.cache_path, "k_bars", f"{day}_{code}.txt") if not os.path.exists(path_str): return None with open(path_str, mode='r', encoding='utf-8') as f: lines = f.readlines() line = lines[0] return eval(line) return None def load_minute_data(self): """ 加载分钟K线数据 :return: 分钟K线数据字典 """ dir_path = os.path.join(self.cache_path, "bar_60s") day = self.trade_days[0] dirs = os.listdir(dir_path) k_bar_code_data_dict = {} for d in dirs: if d.find(day) < 0: continue code = d.split("_")[-1][:6] sub_dir_path = os.path.join(dir_path, d) sub_dirs = os.listdir(sub_dir_path) date_datas = {} for sub_dir in sub_dirs: fpath = os.path.join(sub_dir_path, sub_dir) date = sub_dir[:10] with open(fpath, mode='r', encoding='utf-8') as f: lines = f.readlines() line = lines[0] date_datas[date] = eval(line) k_bar_code_data_dict[code] = date_datas return k_bar_code_data_dict def load_tick_data(self, target_codes=None): """ 加载当日的tick数据 :return:tick数据字典 """ code_ticks_dict = {} tick_dir_path = os.path.join(self.cache_path, "ticks") if os.path.exists(tick_dir_path): fs = os.listdir(tick_dir_path) for f in fs: if f.find(self.now_day) < 0: continue code = f.split("_")[1][:6] if target_codes and code not in target_codes: continue tick_path = os.path.join(tick_dir_path, f) with open(tick_path, mode='r') as f: lines = f.readlines() line = lines[0] line = line.replace("datetime.datetime(", "\"datetime.datetime(").replace("('PRC'))", "('PRC'))\"") line = line.replace("'", "\"").replace("\"PRC\"", "'PRC'") ticks = json.loads(line) for t in ticks: if t["created_at"].find("datetime.datetime") >= 0: created_at_line = t["created_at"].replace("datetime.datetime(", "") sts = created_at_line.split(",") year, month, day, hour, minute, second = sts[0].strip(), sts[1].strip(), sts[2].strip(), \ sts[ 3].strip(), \ sts[4].strip(), ( sts[5].strip() if len(sts) >= 7 else 0) dt = datetime.datetime(int(year), int(month), int(day), int(hour), int(minute), int(second)) t["created_at"] = dt.strftime("%Y-%m-%d %H:%M:%S") # 整理为分钟K线 code_ticks_dict[code] = ticks return code_ticks_dict def load_limit_up_data(self): """ 加载涨停数据 :return: 涨停数据记录[(代码, 日期, 板块, 是否炸板)] """ mysql = mysql_data_delegate.Mysqldb() results = mysql.select_all( f"select _code, _day, _hot_block_name, _open, _blocks from kpl_limit_up_record where _day>='{self.trade_days[-1]}' and _day <='{self.trade_days[0]}'") for r in results: r[2] = kpl_util.filter_block(r[2]) return results def __compute_limit_up_reasons_for_refer(self, block_infos): """ 计算参考票的涨停原因 @param block_infos: [(板块, 日期)] @return: """ # [(板块, 日期)] block_infos.sort(key=lambda x: x[1], reverse=True) # {"板块":[(出现次数, 最近出现日期)]} temp_dict = {} for b in block_infos: if b[0] in constant.KPL_INVALID_BLOCKS: continue if b[0] not in temp_dict: temp_dict[b[0]] = [0, b[1]] temp_dict[b[0]][0] += 1 if not temp_dict: return set() temp_list = [(k, temp_dict[k][0], temp_dict[k][1]) for k in temp_dict] # 按照涨停次数与最近涨停时间排序 temp_list.sort(key=lambda x: (x[1], x[2]), reverse=True) # 取涨停次数最多的和最近涨停的 # 取相同次数的原因 if temp_list: _list = [t for t in temp_list if t[1] == temp_list[0][1]] if _list[0][1] == 1: _list = _list[:1] blocks = set([x[0] for x in _list]) else: blocks = set() blocks -= constant.KPL_INVALID_BLOCKS # 去除例如概念这些泛指词 return set([kpl_util.filter_block(x) for x in blocks]) def load_code_plates_for_refer(self): """ 获取参考票的涨停原因 @return: """ sql = f"SELECT r.`_code`, r.`_day`, r.`_hot_block_name`, r.`_blocks`, r.`_open` FROM `kpl_limit_up_record` r WHERE r.`_day`>='{tool.date_sub(self.now_day, 365)}' and r.`_day` <'{self.now_day}'" mysql = mysql_data_delegate.Mysqldb() kpl_results = mysql.select_all(sql) # {"代码":[(板块, 日期), (板块, 日期)]} kpl_block_dict = {} for r in kpl_results: # 当日炸板的不计算原因 if r[4] == 1: continue code = r[0] if code not in kpl_block_dict: kpl_block_dict[code] = [] kpl_block_dict[code].append((r[2], r[1])) # (板块, 日期) reasons_dict = {} for code in kpl_block_dict: block_infos = kpl_block_dict.get(code) reasons_dict[code] = self.__compute_limit_up_reasons_for_refer(block_infos) return reasons_dict def load_target_plate_and_codes(self): """ 加载目标板块与对应的代码: 从最近60个交易日的真正涨停数据中 @return: {"板块":[代码]} """ end_date = self.trade_days[:60][-1] start_date = self.trade_days[:60][0] mysql = mysql_data_delegate.Mysqldb() # 获取上个交易日涨停的票 results = mysql.select_all( f"SELECT r.`_code` FROM `kpl_limit_up_record` r where r._day='{self.trade_days[0]}' and r._open = 0") exclude_codes = set([x[0] for x in results]) results = mysql.select_all( f"select r.`_hot_block_name` from `kpl_limit_up_record` r where r.`_open`=0 and r.`_day`>'{end_date}' and r.`_day` <= '{start_date}' group by r.`_hot_block_name`") blocks = set([x[0] for x in results]) fresults = {} all_buy_plates_of_codes = self.load_all_buy_plates_of_codes() valid_codes = set(all_buy_plates_of_codes.keys()) for b in blocks: sql = f""" SELECT * FROM ( SELECT r.`_code`, r.`_code_name`, COUNT(*) AS `count`, MAX(r.`_day`) AS _day FROM `kpl_limit_up_record` r WHERE r.`_open`=0 AND r.`_day`>'{end_date}' AND r.`_day`<='{start_date}' AND r.`_hot_block_name`='{b}' GROUP BY r.`_code` ) a ORDER BY a.count DESC,a._day DESC """ results = mysql.select_all(sql) # 取前1/3 if results: results = [x for x in results if (tool.is_can_buy_code(x[0]) and x[0] in valid_codes and x[0] not in exclude_codes)] # 取前1/3且涨停数是前10 max_count = len(results) // 3 if len(results) % 3 == 0 else len(results) // 3 + 1 # results = results[:max_count] # 取前10 results = results[:10] codes = [x[0] for x in results] fresults[kpl_util.filter_block(b)] = codes return fresults def load_trade_days(self): """ 加载交易日列表,now_day前120个交易日 :return: 交易日列表 """ # 将now_day转为datetime类型 current_date = datetime.datetime.strptime(self.now_day, '%Y-%m-%d').date() pre_date = current_date - datetime.timedelta(days=1) one_year_ago = (pre_date - datetime.timedelta(days=365)).strftime('%Y-%m-%d') pre_date = pre_date.strftime('%Y-%m-%d') if constant.is_windows(): trade_days = self.jueJinLocalApi.get_trading_dates(one_year_ago, pre_date) else: trade_days = HistoryKDatasUtils.get_trading_dates(one_year_ago, pre_date) trade_days.sort(reverse=True) trade_days = trade_days[:120] return trade_days def load_next_trade_day(self): """ 加载交易日列表,now_day前120个交易日 :return: 交易日列表 """ if constant.is_windows(): next_trade_day = self.jueJinLocalApi.get_next_trading_date(self.now_day) else: next_trade_day = HistoryKDatasUtils.get_next_trading_date(self.now_day) return next_trade_day def load_target_codes(self): """ 获取特殊的代码,需要订阅300w以上的大单 @return: 代码集合, 收盘价字典:{"code":收盘价} """ try: global_data_loader.load_zyltgb_volume_from_db() pre_close_price_dict = {} zylt_volume_map = global_util.zylt_volume_map codes = set() last_trade_day = self.trade_days[0] for code in zylt_volume_map: volume = zylt_volume_map.get(code) # 今日涨停价要突破昨日最高价 k_bars = HistoryKDataManager().get_history_bars(code, last_trade_day) if k_bars and 30e8 <= k_bars[0]["close"] * volume * tool.get_limit_up_rate(code) <= 300e8: # 自由流通市值在30亿-300亿以上 limit_up_price = round(tool.get_limit_up_rate(code) * k_bars[0]["close"], 2) max_price_info = max(k_bars[:1], key=lambda x: x["high"]) if limit_up_price > max_price_info["high"]: # 今日涨停价要突破昨日最高价 codes.add(code) pre_close_price_dict[code] = k_bars[0]["close"] return codes, pre_close_price_dict except Exception as e: return set(), None def load_plate_codes(self, plate_code, plate_name): """ 获取板块有可能买的目标代码 @param plate_code: @return:[(代码, 领涨次数, 最大领涨次数)] """ if not plate_code or plate_name == '无' or plate_name in constant.KPL_INVALID_BLOCKS: return set() if plate_code in self.plate_codes: return self.plate_codes.get(plate_code) dir_path = os.path.join(self.cache_path, "plate_codes_info", self.now_day) if not os.path.exists(dir_path): os.makedirs(dir_path) path_ = os.path.join(dir_path, plate_code + ".text") datas = [] if os.path.exists(path_): with open(path_, mode='r', encoding='utf-8') as f: codes_info = [] lines = f.readlines() for i in range(len(lines)): if i == 0: continue line = lines[i] if line: r = eval(line) datas.append(r) else: codes_info = [] results = self.request_plate_codes(plate_code) with open(path_, mode='w', encoding='utf-8') as f: f.write(plate_name) f.write("\n") f.writelines([f"{x}\n" for x in results]) datas = results # 保存到内存中 if datas: max_data = max(datas, key=lambda x: x[3]) for r in datas: if r[3] < 1: continue if re.match(r"风险|立案", r[2]): continue if r[1].find("ST") >= 0: continue if r[1].find("S") >= 0: continue if not tool.is_can_buy_code(r[0]): continue codes_info.append((r[0], r[3], max_data[3])) # if len(codes_info) >= 10: # break f_codes_info = codes_info # [d for d in codes_info if d[1] >= d[2] // 2] self.plate_codes[plate_code] = f_codes_info return self.plate_codes.get(plate_code) def load_all_codes_of_plates(self, is_for_buy=False): """ 加载所有板块的领涨票 @return:{"板块代码":(板块名称, [(代码,代码名称,标签,领涨次数)])} """ dir_path = os.path.join(self.cache_path, "plate_codes_info", self.now_day) if not os.path.exists(dir_path): return None fdata = {} plate_files = os.listdir(dir_path) for plate_file in plate_files: plate_code = plate_file.split(".")[0] path_ = os.path.join(dir_path, plate_file) with open(path_, mode='r', encoding='utf-8') as f: datas = [] lines = f.readlines() for i in range(len(lines)): if i == 0: continue line = lines[i] if line: r = eval(line) if not is_for_buy: if r[3] < 1: continue if r[1].find("ST") >= 0: continue if r[1].find("S") >= 0: continue else: if re.match(r"风险|立案", r[2]): continue if r[1].find("ST") >= 0: continue if r[1].find("S") >= 0: continue if not tool.is_can_buy_code(r[0]): continue datas.append(r) # if len(datas) >= 10: # break fdata[plate_code] = (kpl_util.filter_block(lines[0].strip()), datas) return fdata def load_all_refer_plates_of_codes(self): """ 加载所有有领涨代码的领涨板块 @return: """ datas = self.load_all_codes_of_plates() fdata = {} for plate_code in datas: plate_name = datas[plate_code][0] codes_info = datas[plate_code][1] for item in codes_info: code, limit_up_count = item[0], item[3] if code not in fdata: fdata[code] = [] fdata[code].append((plate_code, plate_name, limit_up_count)) for code in fdata: fdata[code].sort(key=lambda x: x[2], reverse=True) fdata[code] = fdata[code][:3] return fdata def load_all_buy_plates_of_codes(self): """ 加载所有代码的领涨板块 @return: {"代码":{"板块名称":(代码, 领涨次数, 最大领涨次数)}} """ datas = self.load_all_codes_of_plates(is_for_buy=True) fdata = {} if not datas: return fdata for plate_code in datas: plate_name = datas[plate_code][0] codes_info = datas[plate_code][1] if not codes_info: continue max_count = max(codes_info, key=lambda x: x[3])[3] for item in codes_info: code, limit_up_count = item[0], item[3] if code not in fdata: fdata[code] = {} fdata[code][plate_name] = (code, limit_up_count, max_count) fdata_dict = {c: [(p, fdata[c][p]) for p in fdata[c]] for c in fdata} for c in fdata_dict: fdata_dict[c].sort(key=lambda x: x[1], reverse=True) fdata_dict[c] = fdata_dict[c][:3] fdata = {code: {x[0]: x[1] for x in fdata_dict[code]} for code in fdata_dict} return fdata def request_plate_codes(self, plate_code): """ 获取板块的代码 @param plate_code: @return:[代码, 名称, 风险项, 领涨次数] """ fresults = [] for i in range(1, 10): results = kpl_api.getHistoryCodesByPlateOrderByLZCS(plate_code, self.now_day, "0930", i) results = json.loads(results)["list"] fresults.extend(results) if len(results) < 30: break fdatas = [] for result in fresults: d = result[0], result[1], result[2], result[40] fdatas.append(d) return fdatas def get_limit_up_reasons_with_plate_code(self): """ 获取涨停原因 :return: 涨停数据记录[(代码, 日期, 板块, 是否炸板)] """ mysql = mysql_data_delegate.Mysqldb() sql = """ SELECT _hot_block_code,_hot_block_name FROM ( SELECT r.`_hot_block_code`, r.`_hot_block_name`, r.`_create_time` FROM (SELECT DISTINCT(c.`_hot_block_code`) FROM `kpl_limit_up_record` c WHERE c.`_day`>'最小日期' and c.`_day`<'今日日期') a LEFT JOIN kpl_limit_up_record r ON r.`_hot_block_code` = a._hot_block_code ORDER BY r.`_create_time` DESC ) b GROUP BY b._hot_block_code HAVING b._hot_block_code IS NOT NULL """ sql = sql.replace("最小日期", self.trade_days[-1]).replace("今日日期", self.now_day) results = mysql.select_all(sql) return [x for x in results if kpl_util.filter_block(x[1]) not in constant.KPL_INVALID_BLOCKS] def load_jx_blocks(self, code): if code in self.jx_blocks: self.jx_blocks.get(code) # 从文件中读取 dir_path = os.path.join(self.cache_path, "jx_blocks", self.now_day) path_str = os.path.join(dir_path, f"{code}.txt") if os.path.exists(path_str): with open(path_str, mode='r', encoding='utf-8') as f: lines = f.readlines() blocks = eval(lines[0]) self.jx_blocks[code] = blocks if code in self.jx_blocks: return self.jx_blocks.get(code) blocks = kpl_api.getCodeJingXuanBlocks(code) blocks = set([kpl_util.filter_block(x[1]) for x in blocks]) blocks -= constant.KPL_INVALID_BLOCKS self.jx_blocks[code] = blocks # 保存到文件 if not os.path.exists(dir_path): os.makedirs(dir_path) with open(os.path.join(dir_path, f"{code}.txt"), mode='w', encoding='utf-8') as f: f.write(f"{blocks}") return blocks def load_all_jx_blocks(self): code_blocks = {} # 从文件中读取 dir_path = os.path.join(self.cache_path, "jx_blocks", self.now_day) files = os.listdir(dir_path) for file in files: code = file[:6] with open(os.path.join(dir_path, file), mode='r', encoding='utf-8') as f: code_blocks[code] = eval(f.readlines()[0]) return code_blocks class StrategyVariableFactory: @staticmethod def create_from_history_data(kline_data_1d, kline_data_60s_dict, limit_up_data_records, trade_days) -> StockVariables: """ 根据历史数据创建StockVariables对象 :param kline_data_1d: 日K线数据 :param kline_data_60s_dict:分钟k线字典:{"2025-01-01":{Bar格式}} :param limit_up_data_records: 代码的历史涨停数据:[(代码,日期,涨停原因)] :param trade_days: 交易日列表,从大到小倒序排列 :return: StockVariables实例 """ instance = StockVariables() # 设置K线相关属性 instance.昨日成交量 = KTickLineAnalyzer.get_yesterday_volume(kline_data_1d) instance.昨日收盘价 = KTickLineAnalyzer.get_yesterday_close(kline_data_1d) instance.昨日最高价 = KTickLineAnalyzer.get_yesterday_high(kline_data_1d) instance.昨日成交额 = KTickLineAnalyzer.get_yesterday_amount(kline_data_1d) instance.昨日非涨停 = not KTickLineAnalyzer.is_yesterday_limit_up(kline_data_1d) instance.昨日非炸板 = not KTickLineAnalyzer.is_yesterday_exploded(kline_data_1d) instance.昨日非跌停 = not KTickLineAnalyzer.is_yesterday_limit_down(kline_data_1d) instance.昨日最低价 = KTickLineAnalyzer.get_yesterday_low_price(kline_data_1d) instance.昨日开盘价 = KTickLineAnalyzer.get_yesterday_open_price(kline_data_1d) day_counts = [5, 10, 30, 60, 120] for day in [3, 5, 10, 30, 60, 120]: instance.__setattr__(f"日最高价_{day}", KTickLineAnalyzer.get_recent_days_high(kline_data_1d, day)) for day in day_counts: instance.__setattr__(f"日最高量_{day}", KTickLineAnalyzer.get_recent_days_max_volume(kline_data_1d, day)) for day in day_counts: instance.__setattr__(f"涨停数_{day}", KTickLineAnalyzer.get_recent_limit_up_count(kline_data_1d, day)) for day in day_counts: instance.__setattr__(f"炸板数_{day}", KTickLineAnalyzer.get_recent_exploded_count(kline_data_1d, day)) for day in day_counts: instance.__setattr__(f"跌停数_{day}", KTickLineAnalyzer.get_recent_limit_down_count(kline_data_1d, day)) for day in day_counts: instance.__setattr__(f"日首板溢价率_{day}", KTickLineAnalyzer.get_first_limit_up_avg_premium(kline_data_1d, day)) for day in day_counts: instance.__setattr__(f"日首板炸板溢价率_{day}", KTickLineAnalyzer.get_first_exploded_avg_premium(kline_data_1d, day)) for day in day_counts: instance.__setattr__(f"日首板个数_{day}", KTickLineAnalyzer.get_first_limit_up_days(kline_data_1d, day)) for day in day_counts: instance.__setattr__(f"日二板个数_{day}", KTickLineAnalyzer.get_second_limit_up_days(kline_data_1d, day)) for day in day_counts: instance.__setattr__(f"日三板个数_{day}", KTickLineAnalyzer.get_third_limit_up_days(kline_data_1d, day)) for day in day_counts: instance.__setattr__(f"日大等于四板个数_{day}", KTickLineAnalyzer.get_fourth_or_more_limit_up_days(kline_data_1d, day)) for day in day_counts: instance.__setattr__(f"日首跌停个数_{day}", KTickLineAnalyzer.get_first_limit_down_days(kline_data_1d, day)) for day in day_counts: instance.__setattr__(f"日2次跌停个数_{day}", KTickLineAnalyzer.get_second_limit_down_days(kline_data_1d, day)) for day in day_counts: instance.__setattr__(f"日3次跌停个数_{day}", KTickLineAnalyzer.get_third_limit_down_days(kline_data_1d, day)) for day in day_counts: instance.__setattr__(f"日大等于4次跌停个数_{day}", KTickLineAnalyzer.get_fourth_or_more_limit_down_days(kline_data_1d, day)) for day in [5, 10, 15, 30, 60, 120]: instance.__setattr__(f"日放倍量日期_{day}", KTickLineAnalyzer.get_recent_days_double_volume_date(kline_data_1d, day)) for day in day_counts: days = trade_days[:day] instance.__setattr__(f"日个股最正的原因_{day}", KPLLimitUpDataAnalyzer.get_most_common_reasons(limit_up_data_records, min_day=days[-1], max_day=days[0])) if kline_data_60s_dict: for day in day_counts: # 获取日K最高量的信息 volume, k_data = KTickLineAnalyzer.get_recent_days_max_volume(kline_data_1d, day) d = k_data["bob"][:10] kline_data_60s = kline_data_60s_dict.get(d) fdata = K60SLineAnalyzer.get_close_price_of_max_volume(kline_data_60s) instance.__setattr__(f"日高量分时最高量价_{day}", fdata) kline_data_60s = kline_data_60s_dict.get(trade_days[0]) fdata = K60SLineAnalyzer.get_close_price_of_max_volume(kline_data_60s) instance.__setattr__(f"昨日分时最高量价", fdata) if KTickLineAnalyzer.is_too_high_and_not_relase_volume(kline_data_1d): instance.涨得高未放量 = True else: instance.涨得高未放量 = False return instance def __test_jx_blocks(__DataLoader): def load_all_codes(): codes = set() with open("D:/codes/codes_sh.text") as f: lines = f.readlines() codes |= set([x.strip() for x in lines if x.find("30") != 0]) with open("D:/codes/codes_sz.text") as f: lines = f.readlines() codes |= set([x.strip() for x in lines if x.find("30") != 0]) return codes code_blocks = __DataLoader.load_all_jx_blocks() # codes = load_all_codes() # for code in codes: # print(code) # __DataLoader.load_jx_blocks(code) codes = ["002639", "002366"] same_blocks = set() for c in codes: blocks = __DataLoader.load_jx_blocks(c) if not same_blocks: same_blocks = blocks same_blocks &= blocks print("相同板块", same_blocks) for code in code_blocks: if len(code_blocks[code] & same_blocks) == len(same_blocks): if code in codes: continue print(code, code_blocks[code]) def __load_target_codes_v1(): """ 50亿以下的 @return: """ def get_zylt(code): zylt_volume_map = global_util.zylt_volume_map last_trade_day = __DataLoader.trade_days[0] volume = zylt_volume_map.get(code) # 今日涨停价要突破昨日最高价 k_bars = HistoryKDataManager().get_history_bars(code, last_trade_day) return k_bars[0]["close"] * volume * tool.get_limit_up_rate(code) __DataLoader = DataLoader('2025-06-13') global_data_loader.load_zyltgb_volume_from_db() results = __DataLoader.load_target_plate_and_codes() # for k in results: # print(k, results[k]) plates = ["天然气", "军工"] print("==========新题材=======") for p in plates: codes = [x for x in results.get(p)] # if get_zylt(x) < 31e8 print("======", p) for code in codes: print("\t\t", code, gpcode_manager.CodesNameManager().get_code_name(code)) if __name__ == "__main__": # __load_target_codes_v1() __DataLoader = DataLoader("2025-06-04") # __test_jx_blocks(__DataLoader) # instance = StockVariables() # day = 5 # instance.__setattr__(f"日最高价_{day}", 12.00) # print(instance.日最高价_5) # 下载目标票的板块 # fdata = __DataLoader.load_all_refer_plates_of_codes() # print(fdata.get("000833")) # result_dict = __DataLoader.load_code_plates_for_refer() # print(result_dict["301279"]) results = __DataLoader.load_target_plate_and_codes() # for k in results: # print(k, results[k]) plates = ["锂电池"] print("==========新题材=======") for p in plates: print(p, results.get(p)) # print("食品饮料", results.get("食品饮料")) # print("锂电池", results.get("锂电池")) # print("数字经济", results.get("数字经济")) # print("地产链", results.get("地产链")) # print("物流", results.get("物流")) # 下载涨停原因板块对应的代码 plates = __DataLoader.get_limit_up_reasons_with_plate_code() for p in plates: print(p) __DataLoader.load_plate_codes(p[0], p[1]) # DataLoader("2025-05-06").load_tick_data() # # print(re.match(r"风险|立案", "风险123"))