import datetime
|
import json
|
import os
|
import threading
|
import time
|
import dask
|
import constant
|
from log_module import async_log_util
|
|
from log_module.log import logger_common, logger_kpl_jingxuan_in, logger_debug, logger_kpl_market_sift_plate, \
|
logger_kpl_limit_up, logger_kpl_code_plates
|
from strategy import kpl_api, data_cache, basic_methods, trading_dates_manager
|
from strategy.kpl_data_manager import KPLMarketStockHeatLogManager
|
from strategy.trading_dates_manager import TradingDatesManager
|
from utils import tool, hx_qc_value_util
|
|
|
# 获取行情精选板块 强度排名
|
def get_market_sift_plate_its_stock_power():
|
"""
|
|
:return: {板块:[代码信息]}, 精选流入板块
|
"""
|
|
@dask.delayed
|
def batch_get_plate_codes(fs):
|
return fs
|
|
@dask.delayed
|
def request_plate_codes(i):
|
plate_name = i[1]
|
its_stock = json.loads(kpl_api.getCodesByPlate(i[0]))
|
# now_time_str = tool.get_now_time_str()
|
# if data_cache.OPENING_TIME < now_time_str < data_cache.NOON_MARKET_TIME:
|
log_data = {plate_name: its_stock['list']}
|
# 尝试过滤掉无意义的概念板块 constant.BLACK_CONCEPT_VALUELESS_PLATE_LIST 默认拉黑(无意义板块)的常量【代表着有无强度可能】
|
if (plate_name not in constant.BLACK_CONCEPT_VALUELESS_PLATE_LIST) or ('次新' in plate_name or 'ST' in plate_name or '破净股' in plate_name):
|
# print(f"{i[1]} 强度:{i[2]}")
|
# 通过板块ID获取其下面的个股强度列表
|
# print(f"======={i[0]}=======")
|
|
# its_stock_list_info = its_stock['list']
|
# logger.info(f"its_stock_list_info==={its_stock_list_info}")
|
# 将板块强度下面对应的个股列表打印到日志中
|
# for i in its_stock_list_info:
|
# if i[0] != 1:
|
# logger.info(
|
# f"l === 个股代码:{i[0]},公司名称:{i[1]},主力资金推测:{i[2]},未知0值:{i[3]},概念:{i[4]},最新价:{i[5]},当日当时涨幅:{i[6]}%,"
|
# f"成交额:{round(i[7] / 100000000, 2)} 亿,实际换手率:{i[8]}%,未知0值:{i[9]},实际流通:{round(i[10] / 100000000, 2)}亿,"
|
# f"主力买:{round(i[11] / 100000000, 2)}亿,"
|
# f"主力卖:{round(i[12] / 100000000, 2)}亿,"
|
# f"主力净额:{round(i[13] / 10000, 2)}万,买成占比:{i[14]}%,卖成占比:{i[15]}%,净成占比:{i[16]}%,买流占比:{i[17]}%,卖流占比:{i[18]}%,净流占比:{i[19]}%,"
|
# f"区间涨幅:{i[20]}%,量比:{i[21]},未知0:{i[22]},上板情况:{i[23]},上板排名:{i[24]},换手率:{i[25]}%,"
|
# f"未知空值:{i[26]},未知零值:{i[27]},收盘封单:{i[28]},最大封单:{i[29]},未知空值?:{i[30]},"
|
# f"?:{i[30]}%,?:{i[31]},??:{i[32]},振幅:{i[33]}%,未知0????:{i[34]},未知0?????:{i[35]},"
|
# f"?=:{i[36]},?总市值:{i[37]},?流通市值:{i[38]},最终归属概念(收盘后出数据?):{i[39]},领涨次数:{i[40]},"
|
# f"41未知1值:{i[41]},上季度机构持仓【str数据勿用运算符】:{i[42]}万,?年预测净利润:{i[43]},上年预测净利润:{i[44]},年内预测净利润:{i[45]}"
|
# )
|
|
# 初始化股票强度列表
|
stock_power_list = []
|
for s in its_stock['list']:
|
# 过滤掉涨幅大于 当日涨幅s[6] < 0% 的 和 名称中包含ST的 和 涨速小于等于0%的 和 只要昨日未涨停 和 上证或深证的正股 and s[9] > 1(s[9]=涨速) 上季度机构持仓 >0
|
if s[6] > 0 and s[1].find("ST") < 0 and s[1].find("XD") < 0 and s[23].find("板") < 0 and s[24].find(
|
"板") < 0 and (s[0].startswith('60') or s[0].startswith('00')) and s[9] > 1:
|
if int(s[42]) >= 0:
|
# print(f"{s[1]},个股代码:{s[0]}, 涨幅:{s[6]}% 涨速:{s[9]}% 概念:{s[4]} 主力资金推测:{s[2]} 领涨次数:{s[40]} 今日第几板:{s[23]} 是否破版{s[24]}")
|
# 对个股强度 主要 属性列表进行装填
|
its_stock_power = [s[1], s[0], s[6], s[9], s[4], s[2], s[40]]
|
# 逐个选择性添加its_stock中的元素到个股强度列表中
|
# print(f"its_stock_power===={its_stock_power}")
|
# 整体将添加完善的个股强度列表添加到股票列表中
|
stock_power_list.append(its_stock_power)
|
else:
|
# 加入策略时间段限制
|
if data_cache.OPENING_TIME < now_time < data_cache.NOON_MARKET_TIME:
|
logger_common.info(f"【{s[1]}】,上季度机构持仓{int(s[42])} 小于0 被过滤掉")
|
filtered_stock_info = {
|
"code": s[0],
|
"sec_name": s[1],
|
"increase": s[6],
|
"institutional_holdings": int(s[42])
|
}
|
data_cache.filtered_stock_info_set.update()
|
|
|
# print(f"stock_power_list===={stock_power_list}")
|
# 过滤掉没有瞬时高强度个股的空概念
|
if len(stock_power_list) != 0:
|
# 将对应板块的股票强度列表新建一个字典
|
stock_power_item = {i[1]: stock_power_list}
|
# 并更新到精选板块个股字典中
|
market_sift_plate_stock_dict.update(stock_power_item)
|
return log_data
|
|
# 定义一个时间段,在这个时间段内才会执行下面的代码,主要就是把强度数据作为日志打印存储下来。
|
now_time = tool.get_now_time_str()
|
if '11:30:10' < now_time < '12:59:50':
|
return
|
data = (kpl_api.getMarketJingXuanRealRankingInfo())
|
market_sift_plate = json.loads(data)
|
# print(f"market_sift_plate 数 ======{len(market_sift_plate['list'])}")
|
|
# 精选板块【前20】 market_sift_plate['list'] ======
|
if data_cache.OPEN_BIDDING_TIME < now_time < data_cache.AFTER_CLOSING_TIME:
|
logger_kpl_market_sift_plate.info(f"{market_sift_plate['list']}")
|
# 总控制时间段
|
if not (data_cache.OPEN_BIDDING_TIME < now_time < data_cache.AFTER_CLOSING_TIME):
|
return
|
# ['801235', '化工', 6996, 0.027, 2.43, 117836347690, -122548038, 8105997595, -8228545633, 0.92, 8595377775454, 0.09, 332297449, 9954902621130, -192457252, 24.0487, 17.1809, 6996, 0.027]
|
# market_sift_plate['list'][0] = ['801062', '军工', 3520, -0.49, 0.666, 139133934669, 383864272, 9077352839, -8693488567, 1.183, 6129448037490,-0.12, 168245858, 7088854452019, -290614763, 50.2408, 30.3672, 3520, 0]
|
# 行情精选板块列表 前20 中的 第一个板块列表数据 = 【代码,板块名称,强度,涨幅?,量比?,成交额?,现额?,主买,主卖,1.183?,流通值?,-0.12?,300W大单净额?,总市值?,上季度机构增仓,今年平均PE,次年平均PE,强度,未知0值】
|
# logger.info(f"market_sift_plate['list'][0] ======{market_sift_plate['list'][0]}")
|
# 初始化精选板块对应个股字典
|
market_sift_plate_stock_dict = {}
|
if 'list' in market_sift_plate:
|
ds = []
|
for d in market_sift_plate['list']:
|
ds.append(request_plate_codes(d))
|
dask_result = batch_get_plate_codes(ds)
|
compute_results = dask_result.compute()
|
log_datas = {}
|
for r in compute_results:
|
if not r:
|
continue
|
for b in r:
|
log_datas[b] = r[b]
|
|
# logger.info(f"精选板块股票强度数据更新 == {market_sift_plate_stock_dict}")
|
# 只在盘中时间获取
|
KPLMarketStockHeatLogManager().add_log(market_sift_plate['list'], log_datas)
|
# 行情》精选板块》排名前20中》对应个股》符合条件的个股
|
return market_sift_plate_stock_dict, market_sift_plate.get("list", [])
|
|
|
# 调用一下获取精选板块股票强度数据函数 【本模块内使用时调用】
|
# get_market_sift_plate_its_stock_power()
|
# 精选流入字典
|
|
def get_market_sift_plate_its_stock_power_process(callback):
|
while True:
|
try:
|
# now = time.time()
|
# print(f"kpl_limit_up_process开始了{now}")
|
start_time = time.time()
|
now_time = tool.get_now_time_str()
|
if data_cache.L1_DATA_START_TIME < now_time < data_cache.CLOSING_TIME:
|
its_stock_power = get_market_sift_plate_its_stock_power()
|
time_str = datetime.datetime.now().strftime("%H%M%S")
|
if 92900 < int(time_str) < 95000:
|
# logger_kpl_jingxuan_in 打印的日志专用于开盘了数据的存储分析,不能轻易删除
|
logger_kpl_jingxuan_in.info(f"耗时:{time.time() - start_time} 数据:{its_stock_power[0]}")
|
callback(its_stock_power)
|
# print(f"精选板块拉升个股更新===={its_stock_power}")
|
except Exception as e:
|
logger_debug.exception(f"开盘啦板块强度线程报错An error occurred: {e}")
|
finally:
|
time.sleep(1)
|
|
|
# 获取涨停板块名称列表并存储本地的函数
|
def get_limit_up_block_names():
|
# 设定当前时间点
|
now_time = tool.get_now_time_str()
|
# print(f"now_time===={now_time}")
|
if data_cache.OPEN_BIDDING_TIME < now_time < data_cache.AFTER_CLOSING_TIME:
|
# print(f"在时间内使用--------------------------")
|
# 获取涨停信息列表
|
limit_up_info = kpl_api.get_limit_up_info()
|
# print(f"limit_up_info=={limit_up_info}")
|
# [股票代码,股票名称,未知布尔值,未知空值,涨停时间戳,涨停原因,封单金额,最大封单金额,主力净额,主力买,主力卖,成交额,最匹配概念,实际流通,实际换手,连板次数,未知的布尔值,振幅,封板状态,所属板块代码,所属板块封板数量]
|
# ['002217', '合力泰', 0, '', 1750728300, 'ST摘帽', 181201440, 305027648, -11357517, 100591704, -111949221,139796123, '面板、汽车零部件', 6857734517, 2.04, 1, 0, 0, '首板', '801082', 1]
|
# ['002703', '浙江世宝', 1, '', 1750728300, '无人驾驶', 315849152, 626532830, 139152274, 196166314, -57014040,203671488, '一季报增长、汽车零部件', 3707643911, 5.49, 1, 0, 0, '首板', '801064', 9]
|
data_cache.limit_up_info = limit_up_info
|
# 提取涨停列表中的板块名称
|
limit_up_block_names = []
|
# 循环添加涨停概念
|
for i in limit_up_info:
|
limit_up_block_names.append(i[5])
|
# print(f"limit_up_block_names==={limit_up_block_names}")
|
# return limit_up_block_names
|
# # 使用Counter计算每个元素的出现次数
|
# counter = Counter(limit_up_block_names)
|
# # 找出出现次数最多的元素及其次数
|
# most_common_element, most_common_count = counter.most_common(1)[0]
|
# # 打印出现次数最多的元素
|
# print(f"主线概念:{most_common_element},出现了 {most_common_count} 次")
|
async_log_util.info(logger_kpl_limit_up, f"{limit_up_info}")
|
|
return limit_up_block_names
|
|
|
# 为开盘啦接口获取的涨停列表概念板块单独开一个进程 形参(callback)
|
def kpl_limit_up_process(callback):
|
while True:
|
try:
|
# now = time.time()
|
# print(f"kpl_limit_up_process开始了{now}")
|
limit_up_block_names = get_limit_up_block_names()
|
callback(limit_up_block_names)
|
# logger.info(f"涨停更新===={limit_up_block_names}")
|
# print(f"涨停更新数量===={len(limit_up_block_names)}")
|
# print(f"kpl_limit_up_process完成一下{now}")
|
except Exception as e:
|
logger_debug.error(f"开盘啦涨停板块概念线程报错An error occurred: {e}")
|
finally:
|
time.sleep(1.5)
|
|
|
# kpl_limit_up_process()
|
|
|
# 构建每日信息读写对象
|
class DailyInfoDataEntryStorageManager:
|
# 初始化文件路径
|
# def __init__(self, file_path=constant.KPL_LIMIT_UP_DATA_PATH):
|
# self.file_path = file_path
|
def __init__(self, file_path): # 移除默认值,要求调用时必须提供
|
self.file_path = file_path
|
|
# print(f"实例构建完成")
|
# 添加单日涨停信息数据到文件中的一行 函数
|
def append_data_to_file(self, data_to_append):
|
# print(f"data_to_append=={data_to_append}")
|
# 读取所有行并解析为 JSON 对象列表
|
if os.path.exists(self.file_path):
|
with open(self.file_path, 'r', encoding='utf-8') as file:
|
# 获取当前日期并格式化
|
current_date = datetime.datetime.now().strftime('%Y-%m-%d')
|
lines = [json.loads(line.strip()) for line in file if line.strip()]
|
# print(f"lines type=={type(lines)}")
|
# print(f"lines=={lines}")
|
# 检查当前日期是否已存在于文件中
|
if lines: # 如果读取到的行文件列表不为空(为真)
|
if lines[-1].get(current_date) is None: # 如果列表中的倒数最后一行获取不到当日的日期(最后一行的键 为 当日日期)
|
# 将日期和data_to_append转换为JSON格式的字符串
|
json_line = json.dumps({current_date: data_to_append}, ensure_ascii=False) + '\n'
|
# 打开文件并追加JSON行
|
with open(self.file_path, 'a', encoding='utf-8') as file:
|
file.write(json_line)
|
# print(f"已写入数据1")
|
else:
|
logger_common.info(f"(当日日期已存在于文件的最后一行了,不再重复追加写入)")
|
else:
|
json_line = json.dumps({current_date: data_to_append}, ensure_ascii=False) + '\n'
|
# 打开文件并追加JSON行
|
with open(self.file_path, 'a', encoding='utf-8') as file:
|
file.write(json_line)
|
# print(f"已写入数据2")
|
|
# 清理多余数据函数
|
def check_and_remove_oldest_entry(self, max_entries):
|
# 读取所有行并解析为 JSON 对象列表
|
if os.path.exists(self.file_path):
|
with open(self.file_path, 'r', encoding='utf-8') as file:
|
lines = [json.loads(line.strip()) for line in file if line.strip()]
|
else:
|
lines = []
|
|
# 如果行数超过限制,移除最早的一些行
|
if len(lines) >= max_entries:
|
# 截断列表,只保留最新的 max_entries 个对象
|
lines = lines[-max_entries:]
|
# 重新打开文件以写入模式,并写入截断后的对象列表为 JSON Lines
|
with open(self.file_path, 'w', encoding='utf-8') as file:
|
for obj in lines:
|
file.write(json.dumps(obj, ensure_ascii=False) + '\n')
|
# file.write(json.dumps(obj, ensure_ascii=False))
|
|
# 隔行整理数据并合并装入一个字典数据中调用时返回这个字典数据 函数
|
def arrange_limit_up_info(self):
|
limit_info = {}
|
# 创建一个列表来存储所有解析的 JSON 对象
|
if os.path.exists(self.file_path):
|
with open(self.file_path, 'r', encoding='utf-8') as file:
|
for line in file:
|
# 去除每行末尾的换行符(如果有的话)
|
line = line.rstrip('\n')
|
# 将每行解析为一个 JSON 对象
|
info = json.loads(line)
|
# 假设每行都是一个字典数据,且只有一个键值对,其中键是日期
|
if isinstance(info, dict) and len(info) == 1:
|
date, data = list(info.items())[0]
|
limit_info[date] = data
|
return limit_info
|
|
|
# 构建一个获取读写存储本地的并整理涨停数据的函数
|
def get_arrange_limit_up_info():
|
# 实例化每日涨停信息整理方法
|
manager = DailyInfoDataEntryStorageManager(constant.KPL_LIMIT_UP_DATA_PATH)
|
manager.append_data_to_file(kpl_api.get_limit_up_info())
|
manager.check_and_remove_oldest_entry(max_entries=1000)
|
|
|
# 构建一个获取读写存储本地的并整理kpl精选流入强度数据的函数
|
def get_arrange_plate_strength_info():
|
# 实例化每日kpl精选流入强度数据整理方法
|
manager = DailyInfoDataEntryStorageManager(constant.KPL_PLATE_STRENGTH_DATA_PATH)
|
data = (kpl_api.getMarketJingXuanRealRankingInfo())
|
market_sift_plate = json.loads(data)['list']
|
manager.append_data_to_file(market_sift_plate)
|
manager.check_and_remove_oldest_entry(max_entries=1000)
|
# get_arrange_plate_strength_info()
|
|
|
# 构建一个处理历史涨停涨停信息数据的函数
|
def get_handling_limit_up_info():
|
# 实例化每日涨停信息整理方法
|
history_limit_up_info = DailyInfoDataEntryStorageManager(constant.KPL_LIMIT_UP_DATA_PATH)
|
data_cache.daily_limit_up_info = history_limit_up_info.arrange_limit_up_info()
|
# logger.info(f"读本地的日更的历史涨停数据=={data_cache.daily_limit_up_info}")
|
|
# print(f"daily_limit_up_info 类型==={type(data_cache.daily_limit_up_info)}")
|
# 统计每日主线
|
daily_limit_up_info_len = len(data_cache.daily_limit_up_info)
|
# print(f"daily_limit_up_info_len==={daily_limit_up_info_len}")
|
|
# 制备历史交易日历
|
historical_transaction_date_list = []
|
date_of_the_day = data_cache.DataCache().today_date
|
print(f"date_of_the_day = {date_of_the_day}")
|
for i in range(daily_limit_up_info_len):
|
pre_date = hx_qc_value_util.get_previous_trading_date(date_of_the_day) # 获取前一个交易日API
|
# print(f"pre_date ==== {pre_date}")
|
# pre_date = '2025-07-30' # 测试用
|
# target_date_str = basic_methods.pre_num_trading_day(data_cache.today_date, daily_limit_up_info_len)
|
# target_date = datetime.datetime.strptime(pre_date, "%Y-%m-%d").strftime("%Y-%m-%d")
|
historical_transaction_date_list.append(pre_date)
|
date_of_the_day = pre_date
|
# 历史板块排名列表 (意义不明,基本无效)【】【】【(意义不明,基本无效,后续可能另寻他法)】【】【】
|
# print(f"historical_transaction_date_list={historical_transaction_date_list}")
|
history_sorted_plate_ranking_list = []
|
for key, value in data_cache.daily_limit_up_info.items():
|
# print(f"key=={key}")
|
for i in historical_transaction_date_list:
|
# print(f"key===={key}")
|
# print(f"i======={i}")
|
# 找到每上一个交易日对应的本地数据的信息
|
if key == i:
|
# print(f"{key}===找到了!value={value}")
|
#
|
plate_ranking_list = []
|
# 遍历交易日每一个涨停股的信息
|
for v in value:
|
# print(f"v =={v}")
|
# 将每一个涨停股的涨停概念和同班级数量 汇编为一个字典
|
plate_limit_up_num_dict = {
|
v[5]: v[20]
|
}
|
# 将这个字典数据不重复的添加到概念排名列表中
|
if plate_limit_up_num_dict not in plate_ranking_list:
|
plate_ranking_list.append(plate_limit_up_num_dict)
|
# plate_ranking_set.add(v[20])
|
# print(f"plate_ranking_list={plate_ranking_list}")
|
# 使用sorted函数和lambda表达式来根据字典的值进行排序
|
# 这里我们确保不修改原始字典,仅通过list(x.values())[0]来获取值
|
sorted_plate_ranking_list = sorted(plate_ranking_list, key=lambda x: list(x.values())[0], reverse=True)
|
# logger.info(f"{key}=====>>>>{sorted_plate_ranking_list}")
|
history_sorted_plate_ranking_list.append(sorted_plate_ranking_list)
|
# print(f"history_sorted_plate_ranking_list={history_sorted_plate_ranking_list}")
|
# for ranking_list in history_sorted_plate_ranking_list:
|
# print(f"ranking_list={ranking_list}")
|
# for i in ranking_list:
|
# print(f"i={i}")
|
# 调用函数,传入整个列表
|
# count_key_occurrences(history_sorted_plate_ranking_list)
|
# daily_limit_up_info_list = list(reversed(daily_limit_up_info_list))
|
# print(f"daily_limit_up_info_list==={daily_limit_up_info_list}")
|
|
# def get_yesterday_limit_up_code_list():
|
# history_limit_up_info = DailyInfoDataEntryStorageManager(constant.KPL_LIMIT_UP_DATA_PATH)
|
# data_cache.daily_limit_up_info = history_limit_up_info.arrange_limit_up_info()
|
# 获取昨日涨停代码 (以便与K线对比) '2025-07-30'
|
pre_trading_day_limit_up_info = data_cache.daily_limit_up_info.get(data_cache.DataCache().pre_trading_day)
|
# pre_trading_day_limit_up_info = data_cache.daily_limit_up_info.get('2025-07-31')
|
if pre_trading_day_limit_up_info is not None:
|
yesterday_limit_up_code_list = []
|
for i in pre_trading_day_limit_up_info:
|
# symbol_code = basic_methods.format_stock_symbol(i[0])
|
limit_up_code = i[0]
|
yesterday_limit_up_code_list.append(limit_up_code)
|
data_cache.yesterday_limit_up_code_list = yesterday_limit_up_code_list
|
print(f"data_cache.yesterday_limit_up_code_list=={data_cache.yesterday_limit_up_code_list}")
|
logger_common.info(f"昨日涨停股票数量=={len(data_cache.yesterday_limit_up_code_list)}")
|
logger_common.info(f"昨日涨停代码列表=={yesterday_limit_up_code_list}")
|
|
|
# 计算历史涨停概念的连续出现次数函数
|
def count_key_occurrences(list_of_dicts_lists):
|
# 创建一个字典来存储每个键的总出现次数
|
key_counts = {}
|
# 遍历列表中的每个字典列表
|
for sublist in list_of_dicts_lists:
|
# 遍历当前字典列表中的每个字典
|
for dict_item in sublist:
|
# 遍历字典中的每个键
|
for key in dict_item:
|
# 如果键不在key_counts中,则初始化计数为0
|
if key not in key_counts:
|
key_counts[key] = 0
|
# 增加当前键的计数
|
key_counts[key] += 1
|
# 打印结果
|
for key, count in key_counts.items():
|
if count > 1:
|
logger_common.info(f"'{key}' 连续出现 {count} 次")
|
|
|
# 获取全部个股的板块并存储的函数
|
def get_all_stocks_plate_dict(stocks_list):
|
all_stocks_plate_dict = {}
|
# 逐个获取个股精选板块概念和自由市值等,并整体放入一个新创建的字典中然后添加到数据中
|
for i in stocks_list:
|
try:
|
code = i.split('.')[1]
|
# print(f"i==={i}")
|
# 获取个股的自由市值
|
free_market_value = kpl_api.getZYLTAmount(code)
|
# 获取个股的板块列表
|
selected_blocks = kpl_api.getStockIDPlate(code)
|
# 提取精选板块中的板块名称
|
selected_plate_list = [block[1] for block in selected_blocks]
|
# print(f"selected_block_names==={selected_block_list}")
|
block_data = {
|
# 添加自由市值
|
'free_market_value': free_market_value,
|
# 添加精选板块
|
'plate': selected_plate_list
|
}
|
# 将code作为键,stocks_selected_block_data作为值添加到stocks_block_data字典中
|
all_stocks_plate_dict[code] = block_data
|
# print(f"all_stocks_plate_dict==={all_stocks_plate_dict}")
|
except Exception as e:
|
print(f"获取全部个股的板块并存储的函数 An error occurred: {e}")
|
finally:
|
pass
|
# return stocks_plate_data
|
# print(f"all_stocks_plate_dict==={len(all_stocks_plate_dict)}")
|
# 将获取到的范围票概念板块转JSON格式并存储在本地文件夹中
|
# 将字典转换为JSON格式的字符串
|
json_data = json.dumps(all_stocks_plate_dict)
|
# 写入文件
|
with open(constant.ALL_STOCKS_PLATE_PATH, 'w', encoding='utf-8') as f:
|
f.write(json_data)
|
# 记录到日志
|
async_log_util.info(logger_kpl_code_plates, json_data)
|
now_time = datetime.datetime.now() # 获取本机时间
|
logger_common.info(f"写入所有个股板块文件完成!::{now_time}")
|
|
|
# 计算开盘啦昨日拉取的概念数据中为空的股票数量函数
|
def get_have_no_plate_num():
|
# 初始化无概念数量
|
have_no_plate_num = 0
|
plate_are_null_list = []
|
for k, v in data_cache.all_stocks_plate_dict.items():
|
pass
|
# print(f"i==={i} T==={t}")
|
if len(v['plate']) == 0:
|
have_no_plate_num += 1
|
# print(f"{k}的概念为空")
|
# logger.info(f"{k}的概念为空")
|
# 股票代码格式转化为掘金格式
|
symbol = basic_methods.format_stock_symbol(k)
|
sec_name = data_cache.all_stocks_all_K_line_property_dict.get(symbol)
|
if sec_name is not None:
|
plate_are_null_list.append(sec_name)
|
logger_common.info(f"有{have_no_plate_num}只股票概念为空")
|
print(f"有{have_no_plate_num}只股票概念为空")
|
logger_common.info(f"个股有历史K线但概念为空的有:{plate_are_null_list}")
|
|
|
# 获取全部个股的精选板块并存储的函数
|
def stocks_list_selected_blocks(min_stocks):
|
stocks_selected_block_data = []
|
# 逐个获取个股精选板块概念和自由市值等,并整体放入一个新创建的字典中然后添加到数据中
|
for i in min_stocks:
|
try:
|
code = i.split('.')[1]
|
# 获取个股的自由市值
|
free_market_value = kpl_api.getZYLTAmount(code)
|
# 获取个股的精选板块列表
|
# selected_blocks = getCodeJingXuanBlocks('000021')
|
selected_blocks = kpl_api.getCodeJingXuanBlocks(code)
|
# 提取精选板块中的板块名称
|
selected_block_list = [block[1] for block in selected_blocks]
|
# print(f"selected_block_names==={selected_block_list}")
|
stocks_selected_block_dict = {
|
# 添加股票代码
|
'code': code,
|
# 添加自由市值
|
'free_market_value': free_market_value,
|
# 添加精选板块
|
'selected_block': selected_block_list
|
}
|
stocks_selected_block_data.append(stocks_selected_block_dict)
|
# print(f"stocks_selected_block_data==={stocks_selected_block_dict}")
|
except Exception as e:
|
logger_debug.error(f"获取全部个股的精选板块并存储的函数 An error occurred: {e}")
|
|
# print(f"stocks_selected_block_data==={len(stocks_selected_block_data)}")
|
# 将获取到的范围票概念板块转JSON格式并存储在本地文件夹中
|
# 将字典转换为JSON格式的字符串
|
json_data = json.dumps(stocks_selected_block_data)
|
# 写入文件
|
with open('local_storage_data/stocks_selected_block_data.json', 'w', encoding='utf-8') as f:
|
f.write(json_data)
|
now_time = datetime.datetime.now() # 获取本机时间
|
print(f"写入精选板块文件完成!::{now_time}")
|
|
|
# 构建一个获取读写存储本地的并整理kpl精选流入强度数据的函数
|
def get_arrange_plate_strength_info():
|
# 实例化每日kpl精选流入强度数据整理方法
|
manager = DailyInfoDataEntryStorageManager(constant.KPL_PLATE_STRENGTH_DATA_PATH)
|
data = (kpl_api.getMarketJingXuanRealRankingInfo())
|
market_sift_plate = json.loads(data)['list']
|
manager.append_data_to_file(market_sift_plate)
|
manager.check_and_remove_oldest_entry(max_entries=1000)
|
|
|
# 构建每日板块强度历史对象
|
class DailyHistoricalPlateIntensity:
|
# 初始化
|
def __init__(self, file_path): # 移除默认值,要求调用时必须提供
|
self.file_path = file_path
|
|
# 需要构建一个每日板块强度列表的函数
|
def get_data_list(self):
|
# 读取每日板块强度文件
|
data_list = []
|
with open(constant.KPL_PLATE_STRENGTH_DATA_PATH, 'r', encoding='utf-8') as file:
|
for line in file:
|
if line.strip(): # 跳过空行
|
# print(f"line=={line}")
|
data_list.append(json.loads(line))
|
# print(f"data_list={data_list}")
|
return data_list # 返回数据
|
|
# 构建每日板块强度检索函数
|
def find_plate_name_in_data(self, data_list, plate_name):
|
# data = []
|
data_list.reverse()
|
daily_net_amount_dict = {}
|
# 以日期进行检索
|
for i in data_list:
|
# print(f"I={i}")
|
for key, value in i.items():
|
# if key == '2025-07-17':
|
for i in value:
|
if i[1] == plate_name:
|
print(f"{key} 板块:{i[1]} 主力净额:{round(i[6]/100000000,2)}亿")
|
i_net_amount = round(i[6] / 100000000, 2)
|
# data.append(i_net_amount)
|
date_net_amount_dict = {
|
key: i[6]
|
}
|
daily_net_amount_dict.update(date_net_amount_dict)
|
# print(f"daily_net_amount_dict={daily_net_amount_dict}")
|
|
return daily_net_amount_dict
|
|
def calculate_plate_trend(plate_name):
|
# 获取前7个交易日期列表
|
today_date = datetime.datetime.now().strftime("%Y-%m-%d")
|
print(f"today_date={today_date}")
|
TradingDatesManager = trading_dates_manager.TradingDatesManager()
|
pre_trading_day = TradingDatesManager.get_pre_trading_day(today_date)
|
print(f"pre_trading_day={pre_trading_day}")
|
# 统计交易日数量
|
count = 7
|
pres_trading_date_list = TradingDatesManager.get_pres_trading_days(today_date, count = 7)
|
logger_common.info(f"pres_trading_date_list=={pres_trading_date_list}")
|
print(f"pres_trading_date_list=={pres_trading_date_list}")
|
|
# 创建实例(传入 file_path)
|
plate_intensity = DailyHistoricalPlateIntensity(file_path=constant.KPL_PLATE_STRENGTH_DATA_PATH)
|
# 调用方法
|
data_list = plate_intensity.get_data_list() # 获取kpl板块强度历史数据
|
# 整理出板块主力净额
|
target_plate_net_amount = plate_intensity.find_plate_name_in_data(data_list, plate_name) # 查找板块
|
# print(f"target_plate_net_amount=={target_plate_net_amount}")
|
target_plate_net_amount_list = []
|
for i in pres_trading_date_list:
|
target_date_plate_net_amount = target_plate_net_amount.get(i)
|
if target_date_plate_net_amount is not None:
|
# print(f"{i}:{target_date_plate_net_amount}")
|
target_date_plate_net_amount = round(target_date_plate_net_amount/100000000, 2)
|
it_date_plate_net_amount_dict = {
|
'date': i,
|
'net_amount': target_date_plate_net_amount
|
}
|
target_plate_net_amount_list.append(target_date_plate_net_amount)
|
# print(f"target_plate_net_amount_list=={target_plate_net_amount_list}")
|
amount_list = target_plate_net_amount_list
|
# 2. 遍历列表,检查每个字典的值是否符合要求(例如:值不为None或满足其他条件)
|
# 统计None值数量
|
# none_count = sum(1 for item in amount_list if item['net_amount'] is None)
|
# print(f"none_count=={none_count}")
|
return target_plate_net_amount_list
|
|
"""主力净额形态分析器"""
|
class MainForceAnalyzer:
|
"""主力净额形态分析器"""
|
|
def __init__(self, window=5):
|
"""
|
初始化分析器
|
|
参数:
|
window: 分析窗口大小(默认5)
|
"""
|
self.window = window
|
|
def analyze(self, net_amounts):
|
"""
|
分析主力净额形态
|
|
参数:
|
net_amounts: 主力净额列表(时间倒序),元素为float或None
|
|
返回:
|
dict: 包含综合评分、形态描述和详细特征的字典
|
"""
|
# 数据校验
|
if not net_amounts or len(net_amounts) < self.window:
|
return {
|
"score": 0.0,
|
"description": "数据不足",
|
"details": {}
|
}
|
|
# 1. 数据预处理
|
processed, none_positions = self._preprocess_data(net_amounts)
|
|
# 2. 分析窗口数据
|
window_data = processed[:self.window]
|
|
# 3. 计算核心指标
|
strength = self._calculate_strength(window_data)
|
trend = self._calculate_trend(window_data)
|
reversal, reversal_type = self._detect_reversal(window_data, none_positions)
|
volatility = self._calculate_volatility(window_data)
|
valid_ratio = 1.0 - len(none_positions) / len(net_amounts)
|
|
# 4. 综合评分计算
|
score = self._calculate_composite_score(
|
strength=strength,
|
trend=trend,
|
reversal=reversal,
|
volatility=volatility,
|
valid_ratio=valid_ratio
|
)
|
|
# 5. 生成形态描述
|
description, details = self._generate_description(
|
score=score,
|
strength=strength,
|
trend=trend,
|
reversal_type=reversal_type,
|
volatility=volatility
|
)
|
|
return {
|
"score": score,
|
"description": description,
|
"details": details
|
}
|
|
def _preprocess_data(self, net_amounts):
|
"""数据预处理:处理None值并记录位置"""
|
processed = []
|
none_positions = []
|
moving_avg = []
|
|
for i, amount in enumerate(net_amounts):
|
if amount is None:
|
none_positions.append(i)
|
if moving_avg:
|
fill_value = sum(moving_avg) / len(moving_avg)
|
processed.append(fill_value)
|
else:
|
processed.append(0)
|
else:
|
processed.append(amount)
|
# 保持最近3个有效值
|
if len(moving_avg) < 3:
|
moving_avg.append(amount)
|
else:
|
moving_avg = moving_avg[1:] + [amount]
|
|
return processed, none_positions
|
|
def _calculate_strength(self, data):
|
"""计算主力强度指标"""
|
inflow = sum(v for v in data if v > 0)
|
outflow = sum(-v for v in data if v < 0)
|
total = inflow + outflow
|
if total > 0:
|
return (inflow - outflow) / total
|
return 0
|
|
def _calculate_trend(self, data):
|
"""计算趋势指标"""
|
if len(data) >= 2:
|
return data[0] - data[1]
|
return 0
|
|
def _calculate_volatility(self, data):
|
"""计算波动性指标"""
|
if len(data) < 2:
|
return 0
|
mean = sum(data) / len(data)
|
variance = sum((x - mean) ** 2 for x in data) / len(data)
|
return variance ** 0.5
|
|
def _detect_reversal(self, data, none_positions):
|
"""检测反转信号"""
|
reversal_score = 0
|
reversal_type = "无反转信号"
|
|
# 检查三日反转模式
|
if len(data) >= 3:
|
if all(i not in none_positions for i in range(3)):
|
# 反转加强:连续两日流出后大幅流入
|
if data[0] > 0 and data[1] < 0 and data[2] < 0:
|
avg_outflow = (abs(data[1]) + abs(data[2])) / 2
|
reversal_strength = data[0] / avg_outflow
|
reversal_score = min(1.0, reversal_strength * 0.5)
|
reversal_type = "反转加强"
|
|
# 反转走弱:连续两日流入后大幅流出
|
elif data[0] < 0 and data[1] > 0 and data[2] > 0:
|
avg_inflow = (data[1] + data[2]) / 2
|
reversal_strength = abs(data[0]) / avg_inflow
|
reversal_score = -min(1.0, reversal_strength * 0.5)
|
reversal_type = "反转走弱"
|
|
# 检查两日反转模式
|
if len(data) >= 2 and reversal_type == "无反转信号":
|
if 0 not in none_positions and 1 not in none_positions:
|
# 流入转流出
|
if data[0] < 0 and data[1] > 0:
|
reversal_score = -0.3
|
reversal_type = "流入转流出"
|
# 流出转流入
|
elif data[0] > 0 and data[1] < 0:
|
reversal_score = 0.3
|
reversal_type = "流出转流入"
|
|
return reversal_score, reversal_type
|
|
def _calculate_composite_score(self, strength, trend, reversal, volatility, valid_ratio):
|
"""计算综合评分"""
|
# 标准化趋势值
|
normalized_trend = trend / 10.0 if abs(trend) > 0 else 0
|
|
# 波动性因子(波动性越高,评分越低)
|
volatility_factor = 1 - min(volatility / 5.0, 1)
|
|
# 综合评分公式
|
score = (
|
0.4 * strength +
|
0.3 * normalized_trend +
|
0.2 * reversal +
|
0.1 * volatility_factor
|
) * valid_ratio
|
|
# 确保在-1到1范围内
|
return max(-1.0, min(1.0, score))
|
|
def _generate_description(self, score, strength, trend, reversal_type, volatility):
|
"""生成形态描述和详细特征"""
|
# 基础形态描述
|
if score > 0.8:
|
base_desc = "强势流入加速"
|
elif score > 0.6:
|
base_desc = "持续流入加强"
|
elif score > 0.4:
|
base_desc = "温和流入"
|
elif score > 0.2:
|
base_desc = "小幅净流入"
|
elif score > -0.2:
|
base_desc = "震荡调整"
|
elif score > -0.4:
|
base_desc = "小幅净流出"
|
elif score > -0.6:
|
base_desc = "温和流出"
|
elif score > -0.8:
|
base_desc = "持续流出"
|
else:
|
base_desc = "强势流出"
|
|
# 添加反转描述
|
if reversal_type != "无反转信号":
|
# 组合反转描述和基础描述
|
description = f"{reversal_type}后{base_desc}"
|
else:
|
description = base_desc
|
|
# 添加波动性描述
|
if volatility > 5:
|
description += " (高波动)"
|
elif volatility > 2:
|
description += " (中波动)"
|
|
# 创建详细特征字典
|
details = {
|
"主力强度": f"{strength:.2%}",
|
"短期趋势": f"{trend:.2f}亿",
|
"波动率": f"{volatility:.2f}",
|
"反转信号": reversal_type,
|
"分析窗口": self.window
|
}
|
|
return description, details
|
|
|
# 策略中分析板块趋势
|
def analyze_plate_trend(plate_name):
|
target_data = calculate_plate_trend(plate_name)
|
print(f"target_data = {target_data}")
|
print(f"target_data type ={type(target_data)}")
|
|
# logger_common.info(f"主力净额数据【{plate_name}】:")
|
# for i, amount in enumerate(target_data[:5]): # 只显示最近5日
|
# if amount is None:
|
# logger_common.info(f"第{i + 1}日: N/A")
|
# else:
|
# logger_common.info(f"第{i + 1}日: {amount:.2f}亿")
|
|
# 创建分析器实例
|
analyzer = MainForceAnalyzer(window=7)
|
|
# 执行分析
|
result = analyzer.analyze(target_data)
|
result["plate_name"] = "plate_name"
|
# 打印结果
|
# print("\n分析结果:")
|
# logger_common.info(f"综合评分: {result['score']:.4f}")
|
# logger_common.info(f"形态描述: {result['description']}")
|
# print(f"形态描述: {result['description']}")
|
# logger_common.info("详细特征:")
|
# for k, v in result['details'].items():
|
# logger_common.info(f" - {k}: {v}")
|
|
# 示例2:使用不同窗口大小分析
|
# print("\n不同窗口大小比较:")
|
# for window in [3, 5, 7]:
|
# analyzer = MainForceAnalyzer(window=window)
|
# result = analyzer.analyze(target_data)
|
# print(f"窗口={window}天: 评分={result['score']:.2f}, 描述={result['description']}")
|
|
# # 示例3:分析多个板块
|
# sectors = {
|
# "医药": target_data,
|
# "科技": [15.2, -4.5, -3.8, 12.1, None, -5.2, 18.3],
|
# "新能源": [8.5, 7.2, -2.5, 6.8, 5.5, None, 9.2]
|
# }
|
#
|
# print("\n板块分析比较:")
|
# analyzer = MainForceAnalyzer(window=5)
|
# for sector, data in sectors.items():
|
# result = analyzer.analyze(data)
|
# print(f"{sector}板块: 评分={result['score']:.2f}, 描述={result['description']}")
|
|
# calculate_plate_trend('机器人概念')
|
return result
|
|
|
# 将积累好的过滤文件写入本地存储
|
def write_filtered_file_local_storage():
|
with open(constant.FILTERED_STOCK_PATH, "w", encoding="utf-8") as file:
|
json.dump(list(data_cache.filtered_stock_info_set), file, ensure_ascii=False, indent=4)
|
logger_common.info(f"filtered_stock_info_set===》JSON文件已写入: {data_cache.filtered_stock_info_set}")
|
print(f"filtered_stock_info_set===》JSON文件已写入: {data_cache.filtered_stock_info_set}")
|
|
|
# 检查因强度中的过滤因素而被错过的涨停函数【建立在策略不重启的基础上】
|
def check_intensity_missing_limit_up_stock():
|
# 实例化每日涨停信息整理方法
|
today_limit_up_info = kpl_api.get_limit_up_info()
|
# print(today_limit_up_info)
|
# logger.info(f"读本地的日更的历史涨停数据=={data_cache.daily_limit_up_info}")
|
today_limit_up_code_list = []
|
# 整理出当日的涨停列表
|
if today_limit_up_info:
|
for i in today_limit_up_info:
|
limit_up_code = i[0]
|
today_limit_up_code_list.append(limit_up_code)
|
print(f"today_limit_up_code_list=={today_limit_up_code_list}")
|
logger_common.info(f"当日截止收盘 涨停股票数量=={len(today_limit_up_code_list)}")
|
logger_common.info(f"当日截止收盘 涨停代码列表=={today_limit_up_code_list}")
|
for code in today_limit_up_code_list:
|
for filtered_stock_info in data_cache.filtered_stock_info_set:
|
if filtered_stock_info["code"] == code:
|
print(f"错过的涨停股票信息: {filtered_stock_info}")
|
logger_common.info(f"错过的涨停股票信息: {filtered_stock_info}")
|
|
|
if __name__ == '__main__':
|
# analyze_plate_trend('人工智能')
|
# check_intensity_missing_limit_up_stock()
|
write_filtered_file_local_storage()
|