整理 kpl_api 将其中的函数 分流到其他模块
整理data_cache中的时间字符串
新增计算市场分布形态因子 函数【未调用】
1个文件已添加
12个文件已修改
1865 ■■■■■ 已修改文件
huaxin_client/l2_client.py 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/l2_data_manager.py 3 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
main.py 49 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
strategy/all_K_line.py 7 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
strategy/check_timer.py 6 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
strategy/data_cache.py 51 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
strategy/instant_time_market.py 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
strategy/kpl_api.py 1017 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
strategy/local_data_management.py 6 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
strategy/logging_config.py 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
strategy/market_sentiment_analysis.py 192 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
strategy/plate_strength_analysis.py 514 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
strategy/selling_strategy.py 12 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/l2_client.py
@@ -167,11 +167,12 @@
            logger_l2_codes_subscript.exception(e)
        finally:
            # 保存一份最新的数据
            self.__set_latest_datas(codes)
            # self.__set_latest_datas(codes)
            pass
    @classmethod
    def __set_latest_datas(cls, codes_data):
        data_str = json.dumps([tool.get_now_date_str(), codes_data])
        data_str = json.dumps([tool.get_now_date_str(), list(codes_data)])
        with open(constant.L2_CODES_INFO_PATH, mode='w') as f:
            f.write(data_str)
huaxin_client/l2_data_manager.py
@@ -195,8 +195,7 @@
                    data = self.__real_time_buy1_data[code]
                    # 如果最新的买1是原来买1的1/2时开始上传
                    if data[2] > 0 and data[3] / data[2] <= 0.5:
                        self.data_callback_distribute_manager.get_distributed_callback(code).OnRealTimeBuy1Info(code,
                                                                                                                data)
                        self.data_callback_distribute_manager.get_distributed_callback(code).OnRealTimeBuy1Info(code, data)
            except:
                pass
            finally:
main.py
@@ -17,11 +17,11 @@
# 引入瞬时分时行情模块
# 引入账户管理模块【进行资金和仓位管理】
from strategy import kpl_api, data_cache, check_timer, all_K_line, instant_time_market, account_management, \
    order_methods, local_data_management, kpl_data_manager, market_sentiment_analysis
    order_methods, local_data_management, kpl_data_manager, market_sentiment_analysis, plate_strength_analysis
from huaxin_client import l2_market_client, l2_client
from log_module import async_log_util
from trade import huaxin_trade_data_update, huaxin_trade_api
from utils import hx_qc_value_util, huaxin_util
from utils import hx_qc_value_util, huaxin_util, juejin_api, tool
# 引入行情订阅模块
# import subscribe_market
@@ -87,7 +87,7 @@
    # 实时运行定时器线程【定时器函数目前 只管理 15:00 后运行一次 整理当日涨停信息 和 获取所有个股的板块概念】
    threading.Thread(target=lambda: check_timer.check_time(), daemon=True).start()
    # 获取实时大盘行情情绪综合强度 [分数] 线程
    threading.Thread(target=lambda: market_sentiment_analysis.get_real_time_market_strong(), daemon=True).start()
    threading.Thread(target=lambda: market_sentiment_analysis.set_plan_position_quantity(), daemon=True).start()
    # 实时检测是否拉取K线线程
    threading.Thread(target=lambda: all_K_line.check_time_and_data_date(), daemon=True).start()
    # print(f"all_stocks_all_K_line_property_dict== {type(data_cache.all_stocks_all_K_line_property_dict)}")
@@ -109,8 +109,8 @@
    # 开启开盘啦 涨停列表 和 全盘个股概念板块 接口线程
    # 涨停概念线程
    # threading.Thread(target=kpl_api.kpl_limit_up_process, daemon=True).start()    #该行代码为只运行单一线程不回调数据的方式
    threading.Thread(target=kpl_api.kpl_limit_up_process, args=(kpl_limit_up_process,), daemon=True).start()
    # threading.Thread(target=plate_strength_analysis.kpl_limit_up_process, daemon=True).start()    #该行代码为只运行单一线程不回调数据的方式
    threading.Thread(target=plate_strength_analysis.kpl_limit_up_process, args=(kpl_limit_up_process,), daemon=True).start()
    # # 开盘啦的板块强度下的个股强度回调函数
    def get_market_sift_plate_its_stock_power_process(market_sift_plate_stock_dict):
@@ -119,7 +119,7 @@
        data_cache.market_sift_plate_stock_dict = market_sift_plate_stock_dict
    # 板块强度下个股强度线程
    threading.Thread(target=kpl_api.get_market_sift_plate_its_stock_power_process,
    threading.Thread(target=plate_strength_analysis.get_market_sift_plate_its_stock_power_process,
                     args=(get_market_sift_plate_its_stock_power_process,), daemon=True).start()
    # 初始化get_current_data方法函数,下单买逻辑才会运行中。。。【核心主线程,随时考虑其启动顺序】>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
@@ -127,13 +127,13 @@
    try:
        # 计算开盘啦昨日拉取的概念数据中为空的股票数量
        kpl_api.get_have_no_plate_num()
        plate_strength_analysis.get_have_no_plate_num()
    except Exception as e:
        logger_system.exception(e)
    # 获取历史涨停信息数据并整理
    try:
        kpl_api.get_handling_limit_up_info()
        plate_strength_analysis.get_handling_limit_up_info()
    except Exception as e:
        logger_system.exception(e)
@@ -143,19 +143,39 @@
    except Exception as e:
        logger_system.exception(e)
    # # 获取所有个股的板块概念并写入文件【耗时较长应该放在 核心主线程 和 仓位管理 后面】
    # kpl_api.get_all_stocks_plate_dict(data_cache.min_stocks)
# 持仓代码的L2数据回调
class MyPositionsL2DataCallback(L2DataCallBack):
    __last_price_dict = {}
    __pre_close_price_dict = {}  # 昨日收盘价
    def OnL2Transaction(self, code, datas):
        """
        昨日持仓L2逐笔成交回调
        :param code:
        :param datas:
        :return:
        """
        if datas:
            # 获取最近的成交价
            price, time_str = datas[-1][1], huaxin_util.convert_time(datas[-1][3])
            # TODO 涨停价变为非涨停价才处理
            self.__last_price_dict[code] = price
            try:
                # 获取最近的成交价
                if code not in self.__pre_close_price_dict:
                    # 获取收盘价格
                    results = juejin_api.JueJinApi.history_n(tool.get_symbol(code), "1d", 1, 1, "close")
                    if results:
                        self.__pre_close_price_dict[code] = results[0]["close"]
                if self.__last_price_dict.get(code) == price:
                    return
                limit_up_price = tool.get_limit_up_price(code, self.__pre_close_price_dict[code])
                if code in self.__last_price_dict:
                    if abs(limit_up_price - self.__last_price_dict[code]) < 0.0001 < abs(limit_up_price - price):
                        # TODO 处理炸板逻辑
                        # 炸板
                        logger_debug.info(f"炸板:{code}-({price},{time_str})")
            finally:
                self.__last_price_dict[code] = price
    def OnMarketData(self, code, datas):
        # logger_debug.info(f"收到L2Market数据:{datas}")
@@ -163,6 +183,7 @@
            code = d["securityID"]
            buy1 = d["buy"][0]
    # 实时L2买1成交量
    def OnRealTimeBuy1Info(self, code, buy1_info):
        # buy1_info: [买1时间,买1价格, 原始买1量, 实时买1量]
        async_log_util.info(logger_debug, f"OnRealTimeBuy1Info:{code}-{buy1_info}")
strategy/all_K_line.py
@@ -12,9 +12,6 @@
import constant
from log_module.log import logger_common
# 引入掘金API
# import utils.juejin_api
# import kpl_api
from strategy import data_cache
# 引入基础算法模块
@@ -413,10 +410,10 @@
            # # if now_time > data_cache.AFTER_CLOSING_TIME:
            #     data_cache.execution = True
            #     # 整理当日涨停信息并写入本地管理好本地数据
            #     kpl_api.get_arrange_limit_up_info()
            #     plate_strength_analysis.get_arrange_limit_up_info()
            #     logger.info(f"整理当日涨停信息 已经运行完成")
            #     # # 获取所有个股的板块概念并写入文件【耗时较长应该放在 核心主线程 和 仓位管理 后面】
            #     kpl_api.get_all_stocks_plate_dict(data_cache.min_stocks)
            #     plate_strength_analysis.get_all_stocks_plate_dict(data_cache.min_stocks)
            #     # logger.info(f"获取所有个股的板块概念 已经运行完成")
            # 构造一个循环检测K线子带你中所有个股代码下的日期是不是和某日一致的,如果不一致则返回False
strategy/check_timer.py
@@ -5,7 +5,7 @@
import time
from log_module.log import logger_common
from strategy import data_cache, market_sentiment_analysis
from strategy import data_cache, market_sentiment_analysis, plate_strength_analysis
from strategy import kpl_api
from utils import tool
@@ -29,10 +29,10 @@
            # print(f"now_time==={now_time}")
            if now_time > data_cache.AFTER_CLOSING_TIME and data_cache.execution is False:
                # 整理当日涨停信息并写入本地管理好本地数据
                kpl_api.get_arrange_limit_up_info()
                plate_strength_analysis.get_arrange_limit_up_info()
                logger.info(f"整理当日涨停信息 已经运行完成")
                # # 获取所有个股的板块概念并写入文件【耗时较长应该放在 核心主线程 和 仓位管理 后面】
                kpl_api.get_all_stocks_plate_dict(data_cache.DataCache().filtered_stocks)
                plate_strength_analysis.get_all_stocks_plate_dict(data_cache.DataCache().filtered_stocks)
                logger.info(f"获取所有个股的板块概念 已经运行完成")
                # 完成了后将是否执行的开个标记为真
                data_cache.execution = True
strategy/data_cache.py
@@ -112,38 +112,29 @@
now_time = datetime.datetime.now().strftime("%H:%M:%S")  # 定义并实时获取 当前时间
'''
设定常用时间点【常量】
字符串格式=="09:25:12"
'''
SERVER_RESTART_TIME = datetime.time(9, 00, 00).strftime("%H:%M:%S")  # 定义9:00
L1_DATA_START_TIME = datetime.time(9, 15, 00).strftime("%H:%M:%S")  # 定义9:15
BEFORE_OPEN_BIDDING_TIME = datetime.time(9, 20, 00).strftime("%H:%M:%S")  # 定义9:20
OPEN_BIDDING_TIME = datetime.time(9, 25, 00).strftime("%H:%M:%S")  # 定义 盘前 集合竞价 时间
LATER_OPEN_BIDDING_TIME = datetime.time(9, 25, 6).strftime("%H:%M:%S")  # 定义 盘前 集合竞价 时间
AFTER_OPEN_BIDDING_TIME = datetime.time(9, 25, 12).strftime("%H:%M:%S")  # 定义 集合竞价 开始后 时间
OPENING_TIME = datetime.time(9, 30, 00).strftime("%H:%M:%S")  # 定义开盘时间
MORN_MARKET_TIME = datetime.time(9, 35, 00).strftime("%H:%M:%S")  # 定义早盘时间
MORN_MARKET_CLOSING_TIME = datetime.time(11, 30, 00).strftime("%H:%M:%S")  # 定义上午收盘时间
NOON_MARKET_OPENING_TIME = datetime.time(13, 0, 00).strftime("%H:%M:%S")  # 定义下午开盘时间
NOON_MARKET_TIME = datetime.time(13, 5, 00).strftime("%H:%M:%S")  # 定义午盘时间
CLOSE_POSITION_TIME = datetime.time(14, 55, 00).strftime("%H:%M:%S")  # 定义平仓时间
WATCH_DISK_END_TIME = datetime.time(14, 56, 00).strftime("%H:%M:%S")  # 定义 板上盯结束时间
CLOSE_BIDDING_TIME = datetime.time(14, 57, 00).strftime("%H:%M:%S")  # 定义 盘后 集合竞价 时间
CLOSING_TIME = datetime.time(15, 00, 00).strftime("%H:%M:%S")  # 定义 收盘时间
AFTER_CLOSING_TIME = datetime.time(15, 1, 00).strftime("%H:%M:%S")  # 定义 收盘后时间
CHECKING_DATA_TIME = datetime.time(17, 00, 00).strftime("%H:%M:%S")  # 定义 检查数据时间
UPDATE_DATA_TIME = datetime.time(18, 31, 00).strftime("%H:%M:%S")  # 定义更新数据时间
PROGRAM_SLEEP_TIME = datetime.time(23, 00, 00).strftime("%H:%M:%S")  # 定义程序休眠时间
# todo 2025-03-25 后无BUG即可彻底删除下处注释部分
# # 读取已经获取到并存储在本地的目标范围的个股的板块概念
# # 读取JSON文件并解析为字典
# if os.path.exists(constant.ALL_STOCKS_PLATE_PATH):
#     with open(constant.ALL_STOCKS_PLATE_PATH, 'r',
#               encoding='utf-8') as f:
#         json_data = f.read()
# else:
#     json_data = "{}"
# all_stocks_plate_dict = json.loads(json_data)
# logger.info(f"all_stocks_plate_dict的数量={len(all_stocks_plate_dict)}")
SERVER_RESTART_TIME = "09:00:00"  # 服务器重启时间
L1_DATA_START_TIME = "09:15:00"   # L1数据开始时间
BEFORE_OPEN_BIDDING_TIME = "09:20:00"  # 【盘前】集合竞价开始前时间
OPEN_BIDDING_TIME = "09:25:00"   # 【盘前】集合竞价开始时间
LATER_OPEN_BIDDING_TIME = "09:25:06"  # 【盘前】集合竞价开始后瞬间
AFTER_OPEN_BIDDING_TIME = "09:25:12"  # 【盘前】集合竞价开始后一会
OPENING_TIME = "09:30:00"  # 上午开盘时间
MORN_MARKET_TIME = "09:35:00"  # 早盘黄金五分钟
MORN_MARKET_CLOSING_TIME = "11:30:00"  # 上午收盘时间
NOON_MARKET_OPENING_TIME = "13:00:00"  # 下午开盘时间
NOON_MARKET_TIME = "13:05:00"  # 午盘黄金五分钟
CLOSE_POSITION_TIME = "14:55:00"  # 尾盘平仓时间
WATCH_DISK_END_TIME = "14:56:00"  # 板上盯结束时间
CLOSE_BIDDING_TIME = "14:57:00"  # 【盘后】集合竞价开始
CLOSING_TIME = "15:00:00"  # 定义 收盘时间
AFTER_CLOSING_TIME = "15:01:00"  # 下午收盘后时间
CHECKING_DATA_TIME = "17:00:00"  # 检查数据时间
UPDATE_DATA_TIME = "18:31:00"  # 更新数据时间
PROGRAM_SLEEP_TIME = "22:00:00"  # 程序休眠时间【不能在23点之后仍运行,获取到的数据可能有谬误】
# 初始化当日当时最高价
high_price = 0
strategy/instant_time_market.py
@@ -344,7 +344,7 @@
            # 分批处理数据
            ds = []
            total_count = len(current_infos)
            page = 15
            page = 20
            page_size = total_count // page + 1
            for p in range(page):
                temp_list = current_infos[p * page_size:(p + 1) * page_size]
strategy/kpl_api.py
@@ -10,10 +10,9 @@
import requests
import constant
from log_module import async_log_util
from log_module.log import logger_common, logger_kpl_jingxuan_in, logger_Overall_market_strength_score, \
    logger_stock_of_markets_plate, logger_debug
# import requests
from strategy import data_cache
from strategy import basic_methods
from strategy.kpl_data_manager import KPLStockOfMarketsPlateLogManager
@@ -185,8 +184,6 @@
    return json.dumps({"errcode": 0, "list": fresults})
# 获取涨停信息数据
def get_limit_up_info():
    # 获取涨停信息列表
@@ -239,510 +236,510 @@
# --------------------------------------------------------------------------------------------------------------------------------------------------------------
# 获取行情精选板块 强度排名
def get_market_sift_plate_its_stock_power():
    @dask.delayed
    def batch_get_plate_codes(fs):
        return fs
    @dask.delayed
    def request_plate_codes(i):
        plate_name = i[1]
        log_data = None
        its_stock = json.loads(getCodesByPlate(i[0]))
        now_time_str = tool.get_now_time_str()
        if data_cache.OPENING_TIME < now_time_str < data_cache.NOON_MARKET_TIME:
            log_data = {plate_name: its_stock['list']}
        # 尝试过滤掉无意义的概念板块(plate_name not in ['科创板', '北交所', '次新股', '无', 'ST板块', 'ST摘帽', '并购重组', '国企改革','超跌', '壳资源', '股权转让', '送转填权']) and '增长' in plate_name
        if (plate_name not in ['科创板', '北交所', '次新股', '无', 'ST板块', 'ST摘帽', '并购重组', '国企改革', '超跌',
                               '壳资源', '股权转让', '送转填权']) or ('增长' in plate_name):
            # print(f"{i[1]} 强度:{i[2]}")
            # 通过板块ID获取其下面的个股强度列表
            # print(f"======={i[0]}=======")
            # its_stock_list_info = its_stock['list']
            # logger.info(f"its_stock_list_info==={its_stock_list_info}")
            # 将板块强度下面对应的个股列表打印到日志中
            # for i in its_stock_list_info:
            #     if i[0] != 1:
            #         logger.info(
            #             f"l === 个股代码:{i[0]},公司名称:{i[1]},主力资金推测:{i[2]},未知0值:{i[3]},概念:{i[4]},最新价:{i[5]},当日当时涨幅:{i[6]}%,"
            #             f"成交额:{round(i[7] / 100000000, 2)} 亿,实际换手率:{i[8]}%,未知0值:{i[9]},实际流通:{round(i[10] / 100000000, 2)}亿,"
            #             f"主力买:{round(i[11] / 100000000, 2)}亿,"
            #             f"主力卖:{round(i[12] / 100000000, 2)}亿,"
            #             f"主力净额:{round(i[13] / 10000, 2)}万,买成占比:{i[14]}%,卖成占比:{i[15]}%,净成占比:{i[16]}%,买流占比:{i[17]}%,卖流占比:{i[18]}%,净流占比:{i[19]}%,"
            #             f"区间涨幅:{i[20]}%,量比:{i[21]},未知0:{i[22]},上板情况:{i[23]},上板排名:{i[24]},换手率:{i[25]}%,"
            #             f"未知空值:{i[26]},未知零值:{i[27]},收盘封单:{i[28]},最大封单:{i[29]},未知空值?:{i[30]},"
            #             f"?:{i[30]}%,?:{i[31]},??:{i[32]},振幅:{i[33]}%,未知0????:{i[34]},未知0?????:{i[35]},"
            #             f"?=:{i[36]},?总市值:{i[37]},?流通市值:{i[38]},最终归属概念(收盘后出数据?):{i[39]},领涨次数:{i[40]},"
            #             f"41未知1值:{i[41]},第三季度机构持仓【str数据勿用运算符】:{i[42]}万,?年预测净利润:{i[43]},上年预测净利润:{i[44]},年内预测净利润:{i[45]}"
            #         )
            # 初始化股票强度列表
            stock_power_list = []
            for s in its_stock['list']:
                # 过滤掉涨幅大于  and s[6] < 6.5 且小于0%的 和 名称中包含ST的 和 涨速小于等于0%的 和 只要昨日未涨停 和 上证或深证的正股    and s[9] > 0.0025
                if s[6] > 0 and s[1].find("ST") < 0 and s[1].find("XD") < 0 and s[23].find("板") < 0 and s[24].find("板") < 0 and (s[0].startswith('60') or s[0].startswith('00')) and s[9] > 1:
                    # print(f"{s[1]},个股代码:{s[0]},   涨幅:{s[6]}%   涨速:{s[9]}%   概念:{s[4]}   主力资金推测:{s[2]}   领涨次数:{s[40]}  今日第几板:{s[23]} 是否破版{s[24]}")
                    # 对个股强度 主要 属性列表进行装填
                    its_stock_power = [s[1], s[0], s[6], s[9], s[4], s[2], s[40]]
                    # 逐个选择性添加its_stock中的元素到个股强度列表中
                    # print(f"its_stock_power===={its_stock_power}")
                    # 整体将添加完善的个股强度列表添加到股票列表中
                    stock_power_list.append(its_stock_power)
            # print(f"stock_power_list===={stock_power_list}")
            # 过滤掉没有瞬时高强度个股的空概念
            if len(stock_power_list) != 0:
                # 将对应板块的股票强度列表新建一个字典
                stock_power_item = {i[1]: stock_power_list}
                # 并更新到精选板块个股字典中
                market_sift_plate_stock_dict.update(stock_power_item)
        return log_data
    data = (getMarketJingXuanRealRankingInfo())
    market_sift_plate = json.loads(data)
    # logger_kpl_jingxuan_in 打印的日志专用于开盘了数据的存储分析,不能轻易删除
    # print(f"market_sift_plate 数 ======{len(market_sift_plate['list'])}")
    # 行情》精选板块》排名前20中》对应个股》符合条件的个股
    # logger.info(f"market_sift_plate['list']======{market_sift_plate['list']}")
    # logger.info(f"market_sift_plate['list'][0]  ======{market_sift_plate['list'][0]}")
    # 初始化精选板块对应个股字典
    market_sift_plate_stock_dict = {}
    if 'list' in market_sift_plate:
        ds = []
        for d in market_sift_plate['list']:
            ds.append(request_plate_codes(d))
        dask_result = batch_get_plate_codes(ds)
        compute_results = dask_result.compute()
        log_datas = {}
        for r in compute_results:
            if not r:
                continue
            for b in r:
                log_datas[b] = r[b]
        now_time = tool.get_now_time_str()
        if data_cache.L1_DATA_START_TIME < now_time < data_cache.NOON_MARKET_TIME:
            # logger.info(f"精选板块股票强度数据更新 == {market_sift_plate_stock_dict}")
            # 只在盘中时间获取
            KPLStockOfMarketsPlateLogManager().add_log(market_sift_plate['list'], log_datas)
    return market_sift_plate_stock_dict
# 调用一下获取精选板块股票强度数据函数  【本模块内使用时调用】
# get_market_sift_plate_its_stock_power()
def get_market_sift_plate_its_stock_power_process(callback):
    while True:
        try:
            # now = time.time()
            # print(f"kpl_limit_up_process开始了{now}")
            start_time = time.time()
            now_time = tool.get_now_time_str()
            if data_cache.L1_DATA_START_TIME < now_time < data_cache.CLOSING_TIME:
                its_stock_power = get_market_sift_plate_its_stock_power()
                time_str = datetime.datetime.now().strftime("%H%M%S")
                if 92900 < int(time_str) < 95000:
                    logger_kpl_jingxuan_in.info(f"耗时:{time.time() - start_time}  数据:{its_stock_power}")
                callback(its_stock_power)
                # print(f"精选板块拉升个股更新===={its_stock_power}")
        except Exception as e:
            logger_debug.exception(e)
            logger.error(f"开盘啦板块强度线程报错An error occurred: {e}")
        finally:
            time.sleep(2)
# 获取涨停板块名称列表并存储本地的函数
def get_limit_up_block_names():
    # 设定当前时间点
    now_time = tool.get_now_time_str()
    # print(f"now_time===={now_time}")
    if data_cache.SERVER_RESTART_TIME < now_time < data_cache.UPDATE_DATA_TIME:
        # print(f"在时间内使用--------------------------")
        # 获取涨停信息列表
        limit_up_info = get_limit_up_info()
        # print(f"limit_up_info=={limit_up_info}")
        data_cache.limit_up_info = get_limit_up_info()
        # 提取涨停列表中的板块名称
        limit_up_block_names = []
        # 循环添加涨停概念
        for i in limit_up_info:
            limit_up_block_names.append(i[5])
        # print(f"limit_up_block_names==={limit_up_block_names}")
        # return limit_up_block_names
        # # 使用Counter计算每个元素的出现次数
        # counter = Counter(limit_up_block_names)
        # # 找出出现次数最多的元素及其次数
        # most_common_element, most_common_count = counter.most_common(1)[0]
        # # 打印出现次数最多的元素
        # print(f"主线概念:{most_common_element},出现了 {most_common_count} 次")
        return limit_up_block_names
# 为开盘啦接口获取的涨停列表概念板块单独开一个进程  形参(callback)
def kpl_limit_up_process(callback):
    while True:
        try:
            # now = time.time()
            # print(f"kpl_limit_up_process开始了{now}")
            limit_up_block_names = get_limit_up_block_names()
            callback(limit_up_block_names)
            # logger.info(f"涨停更新===={limit_up_block_names}")
            # print(f"涨停更新数量===={len(limit_up_block_names)}")
            # print(f"kpl_limit_up_process完成一下{now}")
        except Exception as e:
            logger.error(f"开盘啦涨停板块概念线程报错An error occurred: {e}")
        finally:
            time.sleep(1.5)
# kpl_limit_up_process()
# 构建涨停信息读写对象
class DailyLimitUpInfoStorageManager:
    # 初始化文件路径
    def __init__(self, file_path=constant.KPL_LIMIT_UP_DATA_PATH):
        self.file_path = file_path
    # 添加单日涨停信息数据到文件中的一行 函数
    def append_data_to_file(self, data_to_append):
        # print(f"data_to_append=={data_to_append}")
        # 读取所有行并解析为 JSON 对象列表
        if os.path.exists(self.file_path):
            with open(self.file_path, 'r', encoding='utf-8') as file:
                # 获取当前日期并格式化
                current_date = datetime.datetime.now().strftime('%Y-%m-%d')
                lines = [json.loads(line.strip()) for line in file if line.strip()]
                # print(f"lines type=={type(lines)}")
                # print(f"lines=={lines}")
                # 检查当前日期是否已存在于文件中
                if lines:  # 如果读取到的行文件列表不为空(为真)
                    if lines[-1].get(current_date) is None:  # 如果列表中的倒数最后一行获取不到当日的日期(最后一行的键 为 当日日期)
                        # 将日期和data_to_append转换为JSON格式的字符串
                        json_line = json.dumps({current_date: data_to_append}, ensure_ascii=False) + '\n'
                        # 打开文件并追加JSON行
                        with open(self.file_path, 'a', encoding='utf-8') as file:
                            file.write(json_line)
                    else:
                        logger.info(f"(当日日期已存在于文件的最后一行了,不再重复追加写入)")
                else:
                    json_line = json.dumps({current_date: data_to_append}, ensure_ascii=False) + '\n'
                    # 打开文件并追加JSON行
                    with open(self.file_path, 'a', encoding='utf-8') as file:
                        file.write(json_line)
    # 清理多余数据函数
    def check_and_remove_oldest_entry(self, max_entries):
        # 读取所有行并解析为 JSON 对象列表
        if os.path.exists(self.file_path):
            with open(self.file_path, 'r', encoding='utf-8') as file:
                lines = [json.loads(line.strip()) for line in file if line.strip()]
        else:
            lines = []
            # 如果行数超过限制,移除最早的一些行
        if len(lines) >= max_entries:
            # 截断列表,只保留最新的 max_entries 个对象
            lines = lines[-max_entries:]
            # 重新打开文件以写入模式,并写入截断后的对象列表为 JSON Lines
            with open(self.file_path, 'w', encoding='utf-8') as file:
                for obj in lines:
                    file.write(json.dumps(obj, ensure_ascii=False) + '\n')
                    # file.write(json.dumps(obj, ensure_ascii=False))
    # 隔行整理数据并合并装入一个字典数据中调用时返回这个字典数据 函数
    def arrange_limit_up_info(self):
        limit_info = {}
        # 创建一个列表来存储所有解析的 JSON 对象
        if os.path.exists(self.file_path):
            with open(self.file_path, 'r', encoding='utf-8') as file:
                for line in file:
                    # 去除每行末尾的换行符(如果有的话)
                    line = line.rstrip('\n')
                    # 将每行解析为一个 JSON 对象
                    info = json.loads(line)
                    # 假设每行都是一个字典数据,且只有一个键值对,其中键是日期
                    if isinstance(info, dict) and len(info) == 1:
                        date, data = list(info.items())[0]
                        limit_info[date] = data
        return limit_info
# 构建一个获取读写存储本地的并整理涨停数据的函数
def get_arrange_limit_up_info():
    # 实例化每日涨停信息整理方法
    manager = DailyLimitUpInfoStorageManager()
    manager.append_data_to_file(get_limit_up_info())
    manager.check_and_remove_oldest_entry(max_entries=1000)
# 构建一个处理历史涨停涨停信息数据的函数
def get_handling_limit_up_info():
    # 实例化每日涨停信息整理方法
    history_limit_up_info = DailyLimitUpInfoStorageManager()
    data_cache.daily_limit_up_info = history_limit_up_info.arrange_limit_up_info()
    # logger.info(f"读本地的日更的历史涨停数据=={data_cache.daily_limit_up_info}")
    # print(f"daily_limit_up_info  类型==={type(data_cache.daily_limit_up_info)}")
    # 统计每日主线
    daily_limit_up_info_len = len(data_cache.daily_limit_up_info)
    # print(f"daily_limit_up_info_len==={daily_limit_up_info_len}")
    historical_transaction_date_list = []
    date_of_the_day = data_cache.DataCache().today_date
    for i in range(daily_limit_up_info_len):
        pre_date = hx_qc_value_util.get_previous_trading_date(date_of_the_day)  # 获取前一个交易日API
        # target_date_str = basic_methods.pre_num_trading_day(data_cache.today_date, daily_limit_up_info_len)
        date_format = "%Y-%m-%d"
        target_date = datetime.datetime.strptime(pre_date, date_format).strftime("%Y-%m-%d")
        historical_transaction_date_list.append(target_date)
        date_of_the_day = pre_date
    # print(f"historical_transaction_date_list={historical_transaction_date_list}")
    history_sorted_plate_ranking_list = []
    for key, value in data_cache.daily_limit_up_info.items():
        # print(f"key=={key}")
        for i in historical_transaction_date_list:
            # print(f"i======={i}")
            # 找到每上一个交易日对应的本地数据的信息
            if key == i:
                # print(f"{key}===找到了!value={value}")
                #
                plate_ranking_list = []
                # 遍历交易日每一个涨停股的信息
                for v in value:
                    # print(f"v =={v}")
                    # 将每一个涨停股的涨停概念和同班级数量 汇编为一个字典
                    plate_limit_up_num_dict = {
                        v[5]: v[20]
                    }
                    # 将这个字典数据不重复的添加到概念排名列表中
                    if plate_limit_up_num_dict not in plate_ranking_list:
                        plate_ranking_list.append(plate_limit_up_num_dict)
                        # plate_ranking_set.add(v[20])
                # print(f"plate_ranking_list={plate_ranking_list}")
                # 使用sorted函数和lambda表达式来根据字典的值进行排序
                # 这里我们确保不修改原始字典,仅通过list(x.values())[0]来获取值
                sorted_plate_ranking_list = sorted(plate_ranking_list, key=lambda x: list(x.values())[0], reverse=True)
                # logger.info(f"{key}=====>>>>{sorted_plate_ranking_list}")
                history_sorted_plate_ranking_list.append(sorted_plate_ranking_list)
    # print(f"history_sorted_plate_ranking_list={history_sorted_plate_ranking_list}")
    # for ranking_list in history_sorted_plate_ranking_list:
    #     print(f"ranking_list={ranking_list}")
    #     for i in ranking_list:
    #         print(f"i={i}")
    # 计算历史涨停概念的连续出现次数函数
    def count_key_occurrences(list_of_dicts_lists):
        # 创建一个字典来存储每个键的总出现次数
        key_counts = {}
        # 遍历列表中的每个字典列表
        for sublist in list_of_dicts_lists:
            # 遍历当前字典列表中的每个字典
            for dict_item in sublist:
                # 遍历字典中的每个键
                for key in dict_item:
                    # 如果键不在key_counts中,则初始化计数为0
                    if key not in key_counts:
                        key_counts[key] = 0
                        # 增加当前键的计数
                    key_counts[key] += 1
                    # 打印结果
        for key, count in key_counts.items():
            if count > 1:
                logger.info(f"'{key}' 连续出现 {count} 次")
    # 调用函数,传入整个列表
    # count_key_occurrences(history_sorted_plate_ranking_list)
    # daily_limit_up_info_list = list(reversed(daily_limit_up_info_list))
    # print(f"daily_limit_up_info_list==={daily_limit_up_info_list}")
    # 获取昨日涨停代码 (以便与K线对比)
    pre_trading_day_limit_up_info = data_cache.daily_limit_up_info.get(data_cache.DataCache().pre_trading_day)
    if pre_trading_day_limit_up_info is not None:
        yesterday_limit_up_code_list = []
        for i in pre_trading_day_limit_up_info:
            symbol_code = basic_methods.format_stock_symbol(i[0])
            limit_up_code = symbol_code
            yesterday_limit_up_code_list.append(limit_up_code)
        data_cache.yesterday_limit_up_code_list = yesterday_limit_up_code_list
        logger.info(f"昨日涨停股票数量=={len(data_cache.yesterday_limit_up_code_list)}")
        logger.info(f"昨日涨停代码列表=={yesterday_limit_up_code_list}")
        # code = pre_trading_day_limit_up_info[0][0]
        # logger.info(f"股票代码=={code}")
        # cor_name = pre_trading_day_limit_up_info[0][1]
        # logger.info(f"公司名称=={cor_name}")
        # unknown_zero_2 = pre_trading_day_limit_up_info[0][2]
        # logger.info(f"未知零值2=={unknown_zero_2}")
        # none_data = pre_trading_day_limit_up_info[0][3]
        # logger.info(f"空数据=={none_data}")
        # # 总市值(万)?
        # total_market_value = round((pre_trading_day_limit_up_info[0][4] / 10000), 2)
        # logger.info(f"总市值=={total_market_value}(万)?")
        # # 最相关概念
        # the_most_relevant_plate = pre_trading_day_limit_up_info[0][5]
        # logger.info(f"最相关概念=={the_most_relevant_plate}")
        # # 收盘封单金额(万)
        # closing_amount = round((pre_trading_day_limit_up_info[0][6] / 10000), 2)
        # logger.info(f"收盘封单金额=={closing_amount}(万)")
        # # 最大封单金额(万)
        # maximum_blocked_amount = round((pre_trading_day_limit_up_info[0][7] / 10000), 2)
        # logger.info(f"最大封单金额=={maximum_blocked_amount}(万)")
        # # 主力净额
        # main_net_amount = round((pre_trading_day_limit_up_info[0][8] / 10000), 2)
        # logger.info(f"主力净额=={main_net_amount}(万)")
        # # 主力买
        # main_buyers = round((pre_trading_day_limit_up_info[0][9] / 10000), 2)
        # logger.info(f"主力买=={main_buyers}(万)")
        # # 主力卖
        # main_sellers = round((pre_trading_day_limit_up_info[0][10] / 10000), 2)
        # logger.info(f"主力卖=={main_sellers}(万)")
        # # 成交额
        # transaction_amount = round((pre_trading_day_limit_up_info[0][11] / 10000), 2)
        # logger.info(f"成交额=={transaction_amount}(万)")
        # # 所属精选板块
        # selected_plate = pre_trading_day_limit_up_info[0][12]
        # logger.info(f"所属精选板块=={selected_plate}")
        # # 实际流通
        # actual_circulation = round((pre_trading_day_limit_up_info[0][11] / 100000000), 2)
        # logger.info(f"实际流通=={actual_circulation}(亿)")
        # # 量比?(不是,不知道是什么)
        # equivalent_ratio = pre_trading_day_limit_up_info[0][14]
        # logger.info(f"量比?=={equivalent_ratio}")
        # # 领涨次数
        # leading_increases_times = pre_trading_day_limit_up_info[0][15]
        # logger.info(f"领涨次数=={leading_increases_times}")
        # # 未知零值
        # unknown_zero_16 = pre_trading_day_limit_up_info[0][16]
        # logger.info(f"未知零值16=={unknown_zero_16}")
        # # 未知零值
        # unknown_zero_17 = pre_trading_day_limit_up_info[0][17]
        # logger.info(f"未知零值17=={unknown_zero_17}")
        # # 第几板(连续涨停天数)
        # continuous_limit_up_days = pre_trading_day_limit_up_info[0][18]
        # logger.info(f"第几板=={continuous_limit_up_days}")
        # # 最相关概念的代码
        # the_most_relevant_plate_code = pre_trading_day_limit_up_info[0][19]
        # logger.info(f"最相关概念的代码=={the_most_relevant_plate_code}")
        # # 同班级的数量(同概念涨停数量)
        # the_same_class_amount = pre_trading_day_limit_up_info[0][20]
        # logger.info(f"同概念涨停数量=={the_same_class_amount}")
# get_handling_limit_up_info()
# 获取全部个股的板块并存储的函数
def get_all_stocks_plate_dict(stocks_list):
    all_stocks_plate_dict = {}
    # 逐个获取个股精选板块概念和自由市值等,并整体放入一个新创建的字典中然后添加到数据中
    for i in stocks_list:
        try:
            code = i.split('.')[1]
            # print(f"i==={i}")
            # 获取个股的自由市值
            free_market_value = getZYLTAmount(code)
            # 获取个股的板块列表
            selected_blocks = getStockIDPlate(code)
            # 提取精选板块中的板块名称
            selected_plate_list = [block[1] for block in selected_blocks]
            # print(f"selected_block_names==={selected_block_list}")
            block_data = {
                # 添加自由市值
                'free_market_value': free_market_value,
                # 添加精选板块
                'plate': selected_plate_list
            }
            # 将code作为键,stocks_selected_block_data作为值添加到stocks_block_data字典中
            all_stocks_plate_dict[code] = block_data
            # print(f"all_stocks_plate_dict==={all_stocks_plate_dict}")
        except Exception as e:
            print(f"获取全部个股的板块并存储的函数 An error occurred: {e}")
        finally:
            pass
    # return stocks_plate_data
    # print(f"all_stocks_plate_dict==={len(all_stocks_plate_dict)}")
    # 将获取到的范围票概念板块转JSON格式并存储在本地文件夹中
    # 将字典转换为JSON格式的字符串
    json_data = json.dumps(all_stocks_plate_dict)
    # 写入文件
    with open(constant.ALL_STOCKS_PLATE_PATH, 'w', encoding='utf-8') as f:
        f.write(json_data)
    now_time = datetime.datetime.now()  # 获取本机时间
    logger.info(f"写入所有个股板块文件完成!::{now_time}")
# 计算开盘啦昨日拉取的概念数据中为空的股票数量函数
def get_have_no_plate_num():
    # 初始化无概念数量
    have_no_plate_num = 0
    plate_are_null_list = []
    for k, v in data_cache.all_stocks_plate_dict.items():
        pass
        # print(f"i==={i}  T==={t}")
        if len(v['plate']) == 0:
            have_no_plate_num += 1
            # print(f"{k}的概念为空")
            # logger.info(f"{k}的概念为空")
            # 股票代码格式转化为掘金格式
            symbol = basic_methods.format_stock_symbol(k)
            sec_name = data_cache.all_stocks_all_K_line_property_dict.get(symbol)
            if sec_name is not None:
                plate_are_null_list.append(sec_name)
    logger.info(f"有{have_no_plate_num}只股票概念为空")
    logger.info(f"个股有历史K线但概念为空的有:{plate_are_null_list}")
# 获取全部个股的精选板块并存储的函数
def stocks_list_selected_blocks(min_stocks):
    stocks_selected_block_data = []
    # 逐个获取个股精选板块概念和自由市值等,并整体放入一个新创建的字典中然后添加到数据中
    for i in min_stocks:
        try:
            code = i.split('.')[1]
            # 获取个股的自由市值
            free_market_value = getZYLTAmount(code)
            # 获取个股的精选板块列表
            # selected_blocks = getCodeJingXuanBlocks('000021')
            selected_blocks = getCodeJingXuanBlocks(code)
            # 提取精选板块中的板块名称
            selected_block_list = [block[1] for block in selected_blocks]
            # print(f"selected_block_names==={selected_block_list}")
            stocks_selected_block_dict = {
                # 添加股票代码
                'code': code,
                # 添加自由市值
                'free_market_value': free_market_value,
                # 添加精选板块
                'selected_block': selected_block_list
            }
            stocks_selected_block_data.append(stocks_selected_block_dict)
            # print(f"stocks_selected_block_data==={stocks_selected_block_dict}")
        except Exception as e:
            logger.error(f"获取全部个股的精选板块并存储的函数 An error occurred: {e}")
    # print(f"stocks_selected_block_data==={len(stocks_selected_block_data)}")
    # 将获取到的范围票概念板块转JSON格式并存储在本地文件夹中
    # 将字典转换为JSON格式的字符串
    json_data = json.dumps(stocks_selected_block_data)
    # 写入文件
    with open('local_storage_data/stocks_selected_block_data.json', 'w', encoding='utf-8') as f:
        f.write(json_data)
    now_time = datetime.datetime.now()  # 获取本机时间
    print(f"写入精选板块文件完成!::{now_time}")
# kpl_stocks_list_selected_blocks_process()   #在 kpl_api.py中可以调用
# stocks_list_selected_blocks(min_stocks)   #在 kpl_api.py中可以调用
# list = ['SHSE.600805','SHSE.600804']
#
# all_stocks_plate_dict(list)
# # 获取行情精选板块 强度排名
# def get_market_sift_plate_its_stock_power():
#     @dask.delayed
#     def batch_get_plate_codes(fs):
#         return fs
#
#     @dask.delayed
#     def request_plate_codes(i):
#         plate_name = i[1]
#         log_data = None
#         its_stock = json.loads(getCodesByPlate(i[0]))
#         now_time_str = tool.get_now_time_str()
#         if data_cache.OPENING_TIME < now_time_str < data_cache.NOON_MARKET_TIME:
#             log_data = {plate_name: its_stock['list']}
#         # 尝试过滤掉无意义的概念板块(plate_name not in ['科创板', '北交所', '次新股', '无', 'ST板块', 'ST摘帽', '并购重组', '国企改革','超跌', '壳资源', '股权转让', '送转填权']) and '增长' in plate_name
#         if (plate_name not in ['科创板', '北交所', '次新股', '无', 'ST板块', 'ST摘帽', '并购重组', '国企改革', '超跌',
#                                '壳资源', '股权转让', '送转填权']) or ('增长' in plate_name):
#
#             # print(f"{i[1]} 强度:{i[2]}")
#             # 通过板块ID获取其下面的个股强度列表
#             # print(f"======={i[0]}=======")
#
#             # its_stock_list_info = its_stock['list']
#             # logger.info(f"its_stock_list_info==={its_stock_list_info}")
#             # 将板块强度下面对应的个股列表打印到日志中
#             # for i in its_stock_list_info:
#             #     if i[0] != 1:
#             #         logger.info(
#             #             f"l === 个股代码:{i[0]},公司名称:{i[1]},主力资金推测:{i[2]},未知0值:{i[3]},概念:{i[4]},最新价:{i[5]},当日当时涨幅:{i[6]}%,"
#             #             f"成交额:{round(i[7] / 100000000, 2)} 亿,实际换手率:{i[8]}%,未知0值:{i[9]},实际流通:{round(i[10] / 100000000, 2)}亿,"
#             #             f"主力买:{round(i[11] / 100000000, 2)}亿,"
#             #             f"主力卖:{round(i[12] / 100000000, 2)}亿,"
#             #             f"主力净额:{round(i[13] / 10000, 2)}万,买成占比:{i[14]}%,卖成占比:{i[15]}%,净成占比:{i[16]}%,买流占比:{i[17]}%,卖流占比:{i[18]}%,净流占比:{i[19]}%,"
#             #             f"区间涨幅:{i[20]}%,量比:{i[21]},未知0:{i[22]},上板情况:{i[23]},上板排名:{i[24]},换手率:{i[25]}%,"
#             #             f"未知空值:{i[26]},未知零值:{i[27]},收盘封单:{i[28]},最大封单:{i[29]},未知空值?:{i[30]},"
#             #             f"?:{i[30]}%,?:{i[31]},??:{i[32]},振幅:{i[33]}%,未知0????:{i[34]},未知0?????:{i[35]},"
#             #             f"?=:{i[36]},?总市值:{i[37]},?流通市值:{i[38]},最终归属概念(收盘后出数据?):{i[39]},领涨次数:{i[40]},"
#             #             f"41未知1值:{i[41]},第三季度机构持仓【str数据勿用运算符】:{i[42]}万,?年预测净利润:{i[43]},上年预测净利润:{i[44]},年内预测净利润:{i[45]}"
#             #         )
#
#             # 初始化股票强度列表
#             stock_power_list = []
#             for s in its_stock['list']:
#                 # 过滤掉涨幅大于  and s[6] < 6.5 且小于0%的 和 名称中包含ST的 和 涨速小于等于0%的 和 只要昨日未涨停 和 上证或深证的正股    and s[9] > 0.0025
#                 if s[6] > 0 and s[1].find("ST") < 0 and s[1].find("XD") < 0 and s[23].find("板") < 0 and s[24].find("板") < 0 and (s[0].startswith('60') or s[0].startswith('00')) and s[9] > 1:
#                     # print(f"{s[1]},个股代码:{s[0]},   涨幅:{s[6]}%   涨速:{s[9]}%   概念:{s[4]}   主力资金推测:{s[2]}   领涨次数:{s[40]}  今日第几板:{s[23]} 是否破版{s[24]}")
#                     # 对个股强度 主要 属性列表进行装填
#                     its_stock_power = [s[1], s[0], s[6], s[9], s[4], s[2], s[40]]
#                     # 逐个选择性添加its_stock中的元素到个股强度列表中
#                     # print(f"its_stock_power===={its_stock_power}")
#                     # 整体将添加完善的个股强度列表添加到股票列表中
#                     stock_power_list.append(its_stock_power)
#             # print(f"stock_power_list===={stock_power_list}")
#             # 过滤掉没有瞬时高强度个股的空概念
#             if len(stock_power_list) != 0:
#                 # 将对应板块的股票强度列表新建一个字典
#                 stock_power_item = {i[1]: stock_power_list}
#                 # 并更新到精选板块个股字典中
#                 market_sift_plate_stock_dict.update(stock_power_item)
#         return log_data
#
#     data = (getMarketJingXuanRealRankingInfo())
#     market_sift_plate = json.loads(data)
#     # logger_kpl_jingxuan_in 打印的日志专用于开盘了数据的存储分析,不能轻易删除
#     # print(f"market_sift_plate 数 ======{len(market_sift_plate['list'])}")
#     # 行情》精选板块》排名前20中》对应个股》符合条件的个股
#     # logger.info(f"market_sift_plate['list']======{market_sift_plate['list']}")
#     # logger.info(f"market_sift_plate['list'][0]  ======{market_sift_plate['list'][0]}")
#     # 初始化精选板块对应个股字典
#     market_sift_plate_stock_dict = {}
#     if 'list' in market_sift_plate:
#         ds = []
#         for d in market_sift_plate['list']:
#             ds.append(request_plate_codes(d))
#         dask_result = batch_get_plate_codes(ds)
#         compute_results = dask_result.compute()
#         log_datas = {}
#         for r in compute_results:
#             if not r:
#                 continue
#             for b in r:
#                 log_datas[b] = r[b]
#         now_time = tool.get_now_time_str()
#         if data_cache.L1_DATA_START_TIME < now_time < data_cache.NOON_MARKET_TIME:
#             # logger.info(f"精选板块股票强度数据更新 == {market_sift_plate_stock_dict}")
#             # 只在盘中时间获取
#             KPLStockOfMarketsPlateLogManager().add_log(market_sift_plate['list'], log_datas)
#
#     return market_sift_plate_stock_dict
#
#
# # 调用一下获取精选板块股票强度数据函数  【本模块内使用时调用】
# # get_market_sift_plate_its_stock_power()
#
# def get_market_sift_plate_its_stock_power_process(callback):
#     while True:
#         try:
#             # now = time.time()
#             # print(f"kpl_limit_up_process开始了{now}")
#             start_time = time.time()
#             now_time = tool.get_now_time_str()
#             if data_cache.L1_DATA_START_TIME < now_time < data_cache.CLOSING_TIME:
#                 its_stock_power = get_market_sift_plate_its_stock_power()
#                 time_str = datetime.datetime.now().strftime("%H%M%S")
#                 if 92900 < int(time_str) < 95000:
#                     logger_kpl_jingxuan_in.info(f"耗时:{time.time() - start_time}  数据:{its_stock_power}")
#                 callback(its_stock_power)
#                 # print(f"精选板块拉升个股更新===={its_stock_power}")
#         except Exception as e:
#             logger_debug.exception(e)
#             logger.error(f"开盘啦板块强度线程报错An error occurred: {e}")
#         finally:
#             time.sleep(2)
#
#
# # 获取涨停板块名称列表并存储本地的函数
# def get_limit_up_block_names():
#     # 设定当前时间点
#     now_time = tool.get_now_time_str()
#     # print(f"now_time===={now_time}")
#     if data_cache.SERVER_RESTART_TIME < now_time < data_cache.UPDATE_DATA_TIME:
#         # print(f"在时间内使用--------------------------")
#         # 获取涨停信息列表
#         limit_up_info = get_limit_up_info()
#         # print(f"limit_up_info=={limit_up_info}")
#         data_cache.limit_up_info = get_limit_up_info()
#         # 提取涨停列表中的板块名称
#         limit_up_block_names = []
#         # 循环添加涨停概念
#         for i in limit_up_info:
#             limit_up_block_names.append(i[5])
#         # print(f"limit_up_block_names==={limit_up_block_names}")
#         # return limit_up_block_names
#         # # 使用Counter计算每个元素的出现次数
#         # counter = Counter(limit_up_block_names)
#         # # 找出出现次数最多的元素及其次数
#         # most_common_element, most_common_count = counter.most_common(1)[0]
#         # # 打印出现次数最多的元素
#         # print(f"主线概念:{most_common_element},出现了 {most_common_count} 次")
#
#         return limit_up_block_names
#
#
# # 为开盘啦接口获取的涨停列表概念板块单独开一个进程  形参(callback)
# def kpl_limit_up_process(callback):
#     while True:
#         try:
#             # now = time.time()
#             # print(f"kpl_limit_up_process开始了{now}")
#             limit_up_block_names = get_limit_up_block_names()
#             callback(limit_up_block_names)
#             # logger.info(f"涨停更新===={limit_up_block_names}")
#             # print(f"涨停更新数量===={len(limit_up_block_names)}")
#             # print(f"kpl_limit_up_process完成一下{now}")
#         except Exception as e:
#             logger.error(f"开盘啦涨停板块概念线程报错An error occurred: {e}")
#         finally:
#             time.sleep(1.5)
#
#
# # kpl_limit_up_process()
#
#
# # 构建涨停信息读写对象
# class DailyLimitUpInfoStorageManager:
#     # 初始化文件路径
#     def __init__(self, file_path=constant.KPL_LIMIT_UP_DATA_PATH):
#         self.file_path = file_path
#
#     # 添加单日涨停信息数据到文件中的一行 函数
#     def append_data_to_file(self, data_to_append):
#         # print(f"data_to_append=={data_to_append}")
#         # 读取所有行并解析为 JSON 对象列表
#         if os.path.exists(self.file_path):
#             with open(self.file_path, 'r', encoding='utf-8') as file:
#                 # 获取当前日期并格式化
#                 current_date = datetime.datetime.now().strftime('%Y-%m-%d')
#                 lines = [json.loads(line.strip()) for line in file if line.strip()]
#                 # print(f"lines type=={type(lines)}")
#                 # print(f"lines=={lines}")
#                 # 检查当前日期是否已存在于文件中
#                 if lines:  # 如果读取到的行文件列表不为空(为真)
#                     if lines[-1].get(current_date) is None:  # 如果列表中的倒数最后一行获取不到当日的日期(最后一行的键 为 当日日期)
#                         # 将日期和data_to_append转换为JSON格式的字符串
#                         json_line = json.dumps({current_date: data_to_append}, ensure_ascii=False) + '\n'
#                         # 打开文件并追加JSON行
#                         with open(self.file_path, 'a', encoding='utf-8') as file:
#                             file.write(json_line)
#                     else:
#                         logger.info(f"(当日日期已存在于文件的最后一行了,不再重复追加写入)")
#                 else:
#                     json_line = json.dumps({current_date: data_to_append}, ensure_ascii=False) + '\n'
#                     # 打开文件并追加JSON行
#                     with open(self.file_path, 'a', encoding='utf-8') as file:
#                         file.write(json_line)
#
#     # 清理多余数据函数
#     def check_and_remove_oldest_entry(self, max_entries):
#         # 读取所有行并解析为 JSON 对象列表
#         if os.path.exists(self.file_path):
#             with open(self.file_path, 'r', encoding='utf-8') as file:
#                 lines = [json.loads(line.strip()) for line in file if line.strip()]
#         else:
#             lines = []
#
#             # 如果行数超过限制,移除最早的一些行
#         if len(lines) >= max_entries:
#             # 截断列表,只保留最新的 max_entries 个对象
#             lines = lines[-max_entries:]
#             # 重新打开文件以写入模式,并写入截断后的对象列表为 JSON Lines
#             with open(self.file_path, 'w', encoding='utf-8') as file:
#                 for obj in lines:
#                     file.write(json.dumps(obj, ensure_ascii=False) + '\n')
#                     # file.write(json.dumps(obj, ensure_ascii=False))
#
#     # 隔行整理数据并合并装入一个字典数据中调用时返回这个字典数据 函数
#     def arrange_limit_up_info(self):
#         limit_info = {}
#         # 创建一个列表来存储所有解析的 JSON 对象
#         if os.path.exists(self.file_path):
#             with open(self.file_path, 'r', encoding='utf-8') as file:
#                 for line in file:
#                     # 去除每行末尾的换行符(如果有的话)
#                     line = line.rstrip('\n')
#                     # 将每行解析为一个 JSON 对象
#                     info = json.loads(line)
#                     # 假设每行都是一个字典数据,且只有一个键值对,其中键是日期
#                     if isinstance(info, dict) and len(info) == 1:
#                         date, data = list(info.items())[0]
#                         limit_info[date] = data
#         return limit_info
#
#
# # 构建一个获取读写存储本地的并整理涨停数据的函数
# def get_arrange_limit_up_info():
#     # 实例化每日涨停信息整理方法
#     manager = DailyLimitUpInfoStorageManager()
#     manager.append_data_to_file(get_limit_up_info())
#     manager.check_and_remove_oldest_entry(max_entries=1000)
#
#
# # 构建一个处理历史涨停涨停信息数据的函数
# def get_handling_limit_up_info():
#     # 实例化每日涨停信息整理方法
#     history_limit_up_info = DailyLimitUpInfoStorageManager()
#     data_cache.daily_limit_up_info = history_limit_up_info.arrange_limit_up_info()
#     # logger.info(f"读本地的日更的历史涨停数据=={data_cache.daily_limit_up_info}")
#
#     # print(f"daily_limit_up_info  类型==={type(data_cache.daily_limit_up_info)}")
#     # 统计每日主线
#     daily_limit_up_info_len = len(data_cache.daily_limit_up_info)
#     # print(f"daily_limit_up_info_len==={daily_limit_up_info_len}")
#
#     historical_transaction_date_list = []
#     date_of_the_day = data_cache.DataCache().today_date
#     for i in range(daily_limit_up_info_len):
#         pre_date = hx_qc_value_util.get_previous_trading_date(date_of_the_day)  # 获取前一个交易日API
#         # target_date_str = basic_methods.pre_num_trading_day(data_cache.today_date, daily_limit_up_info_len)
#         date_format = "%Y-%m-%d"
#         target_date = datetime.datetime.strptime(pre_date, date_format).strftime("%Y-%m-%d")
#         historical_transaction_date_list.append(target_date)
#         date_of_the_day = pre_date
#
#     # print(f"historical_transaction_date_list={historical_transaction_date_list}")
#     history_sorted_plate_ranking_list = []
#     for key, value in data_cache.daily_limit_up_info.items():
#         # print(f"key=={key}")
#         for i in historical_transaction_date_list:
#             # print(f"i======={i}")
#             # 找到每上一个交易日对应的本地数据的信息
#             if key == i:
#                 # print(f"{key}===找到了!value={value}")
#                 #
#                 plate_ranking_list = []
#                 # 遍历交易日每一个涨停股的信息
#                 for v in value:
#                     # print(f"v =={v}")
#                     # 将每一个涨停股的涨停概念和同班级数量 汇编为一个字典
#                     plate_limit_up_num_dict = {
#                         v[5]: v[20]
#                     }
#                     # 将这个字典数据不重复的添加到概念排名列表中
#                     if plate_limit_up_num_dict not in plate_ranking_list:
#                         plate_ranking_list.append(plate_limit_up_num_dict)
#                         # plate_ranking_set.add(v[20])
#                 # print(f"plate_ranking_list={plate_ranking_list}")
#                 # 使用sorted函数和lambda表达式来根据字典的值进行排序
#                 # 这里我们确保不修改原始字典,仅通过list(x.values())[0]来获取值
#                 sorted_plate_ranking_list = sorted(plate_ranking_list, key=lambda x: list(x.values())[0], reverse=True)
#                 # logger.info(f"{key}=====>>>>{sorted_plate_ranking_list}")
#                 history_sorted_plate_ranking_list.append(sorted_plate_ranking_list)
#
#     # print(f"history_sorted_plate_ranking_list={history_sorted_plate_ranking_list}")
#     # for ranking_list in history_sorted_plate_ranking_list:
#     #     print(f"ranking_list={ranking_list}")
#     #     for i in ranking_list:
#     #         print(f"i={i}")
#
#     # 计算历史涨停概念的连续出现次数函数
#     def count_key_occurrences(list_of_dicts_lists):
#         # 创建一个字典来存储每个键的总出现次数
#         key_counts = {}
#         # 遍历列表中的每个字典列表
#         for sublist in list_of_dicts_lists:
#             # 遍历当前字典列表中的每个字典
#             for dict_item in sublist:
#                 # 遍历字典中的每个键
#                 for key in dict_item:
#                     # 如果键不在key_counts中,则初始化计数为0
#                     if key not in key_counts:
#                         key_counts[key] = 0
#                         # 增加当前键的计数
#                     key_counts[key] += 1
#                     # 打印结果
#         for key, count in key_counts.items():
#             if count > 1:
#                 logger.info(f"'{key}' 连续出现 {count} 次")
#
#     # 调用函数,传入整个列表
#     # count_key_occurrences(history_sorted_plate_ranking_list)
#
#     # daily_limit_up_info_list = list(reversed(daily_limit_up_info_list))
#     # print(f"daily_limit_up_info_list==={daily_limit_up_info_list}")
#
#     # 获取昨日涨停代码 (以便与K线对比)
#     pre_trading_day_limit_up_info = data_cache.daily_limit_up_info.get(data_cache.DataCache().pre_trading_day)
#     if pre_trading_day_limit_up_info is not None:
#         yesterday_limit_up_code_list = []
#         for i in pre_trading_day_limit_up_info:
#             symbol_code = basic_methods.format_stock_symbol(i[0])
#             limit_up_code = symbol_code
#             yesterday_limit_up_code_list.append(limit_up_code)
#         data_cache.yesterday_limit_up_code_list = yesterday_limit_up_code_list
#         logger.info(f"昨日涨停股票数量=={len(data_cache.yesterday_limit_up_code_list)}")
#         logger.info(f"昨日涨停代码列表=={yesterday_limit_up_code_list}")
#
#         # code = pre_trading_day_limit_up_info[0][0]
#         # logger.info(f"股票代码=={code}")
#         # cor_name = pre_trading_day_limit_up_info[0][1]
#         # logger.info(f"公司名称=={cor_name}")
#         # unknown_zero_2 = pre_trading_day_limit_up_info[0][2]
#         # logger.info(f"未知零值2=={unknown_zero_2}")
#         # none_data = pre_trading_day_limit_up_info[0][3]
#         # logger.info(f"空数据=={none_data}")
#         # # 总市值(万)?
#         # total_market_value = round((pre_trading_day_limit_up_info[0][4] / 10000), 2)
#         # logger.info(f"总市值=={total_market_value}(万)?")
#         # # 最相关概念
#         # the_most_relevant_plate = pre_trading_day_limit_up_info[0][5]
#         # logger.info(f"最相关概念=={the_most_relevant_plate}")
#         # # 收盘封单金额(万)
#         # closing_amount = round((pre_trading_day_limit_up_info[0][6] / 10000), 2)
#         # logger.info(f"收盘封单金额=={closing_amount}(万)")
#         # # 最大封单金额(万)
#         # maximum_blocked_amount = round((pre_trading_day_limit_up_info[0][7] / 10000), 2)
#         # logger.info(f"最大封单金额=={maximum_blocked_amount}(万)")
#         # # 主力净额
#         # main_net_amount = round((pre_trading_day_limit_up_info[0][8] / 10000), 2)
#         # logger.info(f"主力净额=={main_net_amount}(万)")
#         # # 主力买
#         # main_buyers = round((pre_trading_day_limit_up_info[0][9] / 10000), 2)
#         # logger.info(f"主力买=={main_buyers}(万)")
#         # # 主力卖
#         # main_sellers = round((pre_trading_day_limit_up_info[0][10] / 10000), 2)
#         # logger.info(f"主力卖=={main_sellers}(万)")
#         # # 成交额
#         # transaction_amount = round((pre_trading_day_limit_up_info[0][11] / 10000), 2)
#         # logger.info(f"成交额=={transaction_amount}(万)")
#         # # 所属精选板块
#         # selected_plate = pre_trading_day_limit_up_info[0][12]
#         # logger.info(f"所属精选板块=={selected_plate}")
#         # # 实际流通
#         # actual_circulation = round((pre_trading_day_limit_up_info[0][11] / 100000000), 2)
#         # logger.info(f"实际流通=={actual_circulation}(亿)")
#         # # 量比?(不是,不知道是什么)
#         # equivalent_ratio = pre_trading_day_limit_up_info[0][14]
#         # logger.info(f"量比?=={equivalent_ratio}")
#         # # 领涨次数
#         # leading_increases_times = pre_trading_day_limit_up_info[0][15]
#         # logger.info(f"领涨次数=={leading_increases_times}")
#         # # 未知零值
#         # unknown_zero_16 = pre_trading_day_limit_up_info[0][16]
#         # logger.info(f"未知零值16=={unknown_zero_16}")
#         # # 未知零值
#         # unknown_zero_17 = pre_trading_day_limit_up_info[0][17]
#         # logger.info(f"未知零值17=={unknown_zero_17}")
#         # # 第几板(连续涨停天数)
#         # continuous_limit_up_days = pre_trading_day_limit_up_info[0][18]
#         # logger.info(f"第几板=={continuous_limit_up_days}")
#         # # 最相关概念的代码
#         # the_most_relevant_plate_code = pre_trading_day_limit_up_info[0][19]
#         # logger.info(f"最相关概念的代码=={the_most_relevant_plate_code}")
#         # # 同班级的数量(同概念涨停数量)
#         # the_same_class_amount = pre_trading_day_limit_up_info[0][20]
#         # logger.info(f"同概念涨停数量=={the_same_class_amount}")
#
#
# # get_handling_limit_up_info()
#
#
# # 获取全部个股的板块并存储的函数
# def get_all_stocks_plate_dict(stocks_list):
#     all_stocks_plate_dict = {}
#     # 逐个获取个股精选板块概念和自由市值等,并整体放入一个新创建的字典中然后添加到数据中
#     for i in stocks_list:
#         try:
#             code = i.split('.')[1]
#             # print(f"i==={i}")
#             # 获取个股的自由市值
#             free_market_value = getZYLTAmount(code)
#             # 获取个股的板块列表
#             selected_blocks = getStockIDPlate(code)
#             # 提取精选板块中的板块名称
#             selected_plate_list = [block[1] for block in selected_blocks]
#             # print(f"selected_block_names==={selected_block_list}")
#             block_data = {
#                 # 添加自由市值
#                 'free_market_value': free_market_value,
#                 # 添加精选板块
#                 'plate': selected_plate_list
#             }
#             # 将code作为键,stocks_selected_block_data作为值添加到stocks_block_data字典中
#             all_stocks_plate_dict[code] = block_data
#             # print(f"all_stocks_plate_dict==={all_stocks_plate_dict}")
#         except Exception as e:
#             print(f"获取全部个股的板块并存储的函数 An error occurred: {e}")
#         finally:
#             pass
#     # return stocks_plate_data
#     # print(f"all_stocks_plate_dict==={len(all_stocks_plate_dict)}")
#     # 将获取到的范围票概念板块转JSON格式并存储在本地文件夹中
#     # 将字典转换为JSON格式的字符串
#     json_data = json.dumps(all_stocks_plate_dict)
#     # 写入文件
#     with open(constant.ALL_STOCKS_PLATE_PATH, 'w', encoding='utf-8') as f:
#         f.write(json_data)
#     now_time = datetime.datetime.now()  # 获取本机时间
#     logger.info(f"写入所有个股板块文件完成!::{now_time}")
#
#
# # 计算开盘啦昨日拉取的概念数据中为空的股票数量函数
# def get_have_no_plate_num():
#     # 初始化无概念数量
#     have_no_plate_num = 0
#     plate_are_null_list = []
#     for k, v in data_cache.all_stocks_plate_dict.items():
#         pass
#         # print(f"i==={i}  T==={t}")
#         if len(v['plate']) == 0:
#             have_no_plate_num += 1
#             # print(f"{k}的概念为空")
#             # logger.info(f"{k}的概念为空")
#             # 股票代码格式转化为掘金格式
#             symbol = basic_methods.format_stock_symbol(k)
#             sec_name = data_cache.all_stocks_all_K_line_property_dict.get(symbol)
#             if sec_name is not None:
#                 plate_are_null_list.append(sec_name)
#     logger.info(f"有{have_no_plate_num}只股票概念为空")
#     logger.info(f"个股有历史K线但概念为空的有:{plate_are_null_list}")
#
#
# # 获取全部个股的精选板块并存储的函数
# def stocks_list_selected_blocks(min_stocks):
#     stocks_selected_block_data = []
#     # 逐个获取个股精选板块概念和自由市值等,并整体放入一个新创建的字典中然后添加到数据中
#     for i in min_stocks:
#         try:
#             code = i.split('.')[1]
#             # 获取个股的自由市值
#             free_market_value = getZYLTAmount(code)
#             # 获取个股的精选板块列表
#             # selected_blocks = getCodeJingXuanBlocks('000021')
#             selected_blocks = getCodeJingXuanBlocks(code)
#             # 提取精选板块中的板块名称
#             selected_block_list = [block[1] for block in selected_blocks]
#             # print(f"selected_block_names==={selected_block_list}")
#             stocks_selected_block_dict = {
#                 # 添加股票代码
#                 'code': code,
#                 # 添加自由市值
#                 'free_market_value': free_market_value,
#                 # 添加精选板块
#                 'selected_block': selected_block_list
#             }
#             stocks_selected_block_data.append(stocks_selected_block_dict)
#             # print(f"stocks_selected_block_data==={stocks_selected_block_dict}")
#         except Exception as e:
#             logger.error(f"获取全部个股的精选板块并存储的函数 An error occurred: {e}")
#
#     # print(f"stocks_selected_block_data==={len(stocks_selected_block_data)}")
#     # 将获取到的范围票概念板块转JSON格式并存储在本地文件夹中
#     # 将字典转换为JSON格式的字符串
#     json_data = json.dumps(stocks_selected_block_data)
#     # 写入文件
#     with open('local_storage_data/stocks_selected_block_data.json', 'w', encoding='utf-8') as f:
#         f.write(json_data)
#     now_time = datetime.datetime.now()  # 获取本机时间
#     print(f"写入精选板块文件完成!::{now_time}")
#
#
# # kpl_stocks_list_selected_blocks_process()   #在 kpl_api.py中可以调用
# # stocks_list_selected_blocks(min_stocks)   #在 kpl_api.py中可以调用
# # list = ['SHSE.600805','SHSE.600804']
# #
# # all_stocks_plate_dict(list)
#
strategy/local_data_management.py
@@ -3,11 +3,7 @@
import constant
from strategy import data_cache
from log_module.log import logger_common
# 获取logger实例
logger = logger_common
# 读取本地的所有带属性K线数据(所有个股K线及指数线)
@@ -40,7 +36,7 @@
    else:
        json_data = "{}"
    data_cache.all_stocks_plate_dict = json.loads(json_data)
    logger.info(f"all_stocks_plate_dict的数量={len(data_cache.all_stocks_plate_dict)}")
    logger_common.info(f"all_stocks_plate_dict的数量={len(data_cache.all_stocks_plate_dict)}")
if __name__ == '__main__':
strategy/logging_config.py
@@ -6,6 +6,7 @@
from datetime import datetime
import constant
# 构造一个loger对象函数
def get_logger(name='my_logger', log_dir=constant.LOG_PATH, log_level=logging.INFO):
    """
strategy/market_sentiment_analysis.py
@@ -548,7 +548,6 @@
            # 对于未知类型,你可以选择保留原样、跳过或引发异常
            # 这里我们选择保留原样
            return obj
    try:
        json_data = json.dumps(convert_datetime(all_index_k_line_property_dict), ensure_ascii=False, indent=4)
        # 将转换后的JSON字符串写入文件
@@ -561,8 +560,145 @@
    logger.info(f"加属性的指数k线写完了!{tool.get_now_time_str()}")
# 获取实时大盘行情情绪综合强度 [分数] 函数 并 计算当日计划持仓数量
def get_real_time_market_strong():
# 计算市场分布形态因子 函数
def calculate_market_distribution_pattern_factor(data):
    """
        原生Python数据预处理
        返回:
            sorted_bins   : 排序后的涨跌幅区间(整数列表)
            sorted_counts : 对应区间的股票数量
            total         : 总股票数
            pos_counts    : 正涨跌区间的数量列表
            neg_counts    : 负涨跌区间的数量列表
        """
    # 将字符串键转为整数并排序
    sorted_items = sorted(data.items(), key=lambda x: int(x[0]))
    sorted_bins = [int(k) for k, v in sorted_items]
    sorted_counts = [v for k, v in sorted_items]
    # 计算总股票数
    total = sum(sorted_counts)
    # 分离正负区间(排除0%)
    pos_counts = [v for k, v in sorted_items if int(k) > 0]
    neg_counts = [v for k, v in sorted_items if int(k) < 0]
    return sorted_bins, sorted_counts, total, pos_counts, neg_counts
# 执行预处理
bins, counts, total, pos_counts, neg_counts = calculate_market_distribution_pattern_factor(data_cache.rise_and_fall_statistics_dirt)
# ====================== 因子计算模块 ======================
class NativeFactorCalculator:
    def __init__(self, bins, counts, total, pos_counts, neg_counts):
        self.bins = bins
        self.counts = counts
        # todo total 会默认为0的情况
        if total != 0:
            self.total = total
        else:
            self.total = 1
        self.pos_counts = pos_counts
        self.neg_counts = neg_counts
    def calculate_bdr(self):
        """原生涨跌比计算"""
        sum_pos = sum(self.pos_counts)
        sum_neg = sum(self.neg_counts)
        return sum_pos / sum_neg if sum_neg != 0 else float('nan')
    def calculate_extreme_ratio(self):
        """极端波动比例计算"""
        extreme_total = 0
        for b, c in zip(self.bins, self.counts):
            if b >= 5 or b <= -5:
                extreme_total += c
        return extreme_total / self.total
    def calculate_market_breadth(self):
        """市场宽度因子计算"""
        diff = sum(self.pos_counts) - sum(self.neg_counts)
        return diff / self.total
    def _expand_distribution(self):
        """将分箱数据展开为原始数据点(用于计算统计指标)"""
        expanded = []
        for value, count in zip(self.bins, self.counts):
            expanded.extend([value] * count)
        return expanded
    def calculate_distribution_metrics(self):
        """原生偏度与峰度计算"""
        data = self._expand_distribution()
        n = len(data)
        if n < 2:
            return (float('nan'), float('nan'))
        # 计算均值
        mean = sum(data) / n
        # 计算标准差
        variance = sum((x - mean) ** 2 for x in data) / (n - 1)
        std_dev = variance ** 0.5
        # 计算偏度
        skewness = (sum((x - mean) ** 3 for x in data) / n) / (std_dev ** 3) if std_dev != 0 else 0
        # 计算峰度(Fisher定义,即正态分布峰度为0)
        kurtosis = (sum((x - mean) ** 4 for x in data) / n) / (std_dev ** 4) - 3 if std_dev != 0 else 0
        return (skewness, kurtosis)
# 实例化并计算因子
calculator = NativeFactorCalculator(bins, counts, total, pos_counts, neg_counts)
bdr = calculator.calculate_bdr()
extreme_ratio = calculator.calculate_extreme_ratio()
market_breadth = calculator.calculate_market_breadth()
skewness, kurtosis = calculator.calculate_distribution_metrics()
# ====================== 策略信号生成模块 ======================
class NativeTradingStrategy:
    def __init__(self, bdr_threshold=1.5, extreme_threshold=0.15):
        self.bdr_threshold = bdr_threshold
        self.extreme_threshold = extreme_threshold
    def generate_signals(self, bdr, extreme_ratio, market_breadth, skewness):
        """生成交易信号(与之前相同,保持策略逻辑)"""
        signals = []
        # 信号1:基于涨跌比
        if bdr > self.bdr_threshold:
            signals.append("多头趋势增强 - 加仓指数ETF")
        elif bdr < 1 / self.bdr_threshold:
            signals.append("空头趋势增强 - 减仓或反向操作")
        else:
            signals.append("趋势中性 - 保持当前仓位")
        # 信号2:极端波动
        if extreme_ratio > self.extreme_threshold:
            signals.append("波动风险升高 - 启动对冲或降低杠杆")
        else:
            signals.append("波动正常 - 维持现有风险敞口")
        # 信号3:市场宽度与偏度
        if market_breadth > 0.2 and skewness > 0.5:
            signals.append("市场普涨且偏度正向 - 超配行业龙头股")
        elif market_breadth < -0.2 and skewness < -0.5:
            signals.append("市场普跌且偏度负向 - 低估值防御型配置")
        return signals
# 实例化策略并生成信号
strategy = NativeTradingStrategy()
signals = strategy.generate_signals(bdr, extreme_ratio, market_breadth, skewness)
# 实时设置计划持仓数量 函数
def set_plan_position_quantity():
    while True:
        try:
            if data_cache.position_automatic_management_switch is True:
@@ -571,26 +707,24 @@
                    # 获取大盘综合强度分数
                    data_cache.real_time_market_strong = kpl_api.get_market_strong()
                    # 获取市场情绪字典【完整】,并整理
                    data_cache.real_time_market_sentiment_dirt = kpl_api.changeStatistics()
                    date_today = data_cache.real_time_market_sentiment_dirt.get('Day', None)
                    significant_drawdown = data_cache.real_time_market_sentiment_dirt.get('df_num', None)
                    sentiment_indicators = data_cache.real_time_market_sentiment_dirt.get('ztjs', None)
                    limit_up_amount = data_cache.real_time_market_sentiment_dirt.get('ztjs', None)
                    connecting_board_height = data_cache.real_time_market_sentiment_dirt.get('lbgd', None)
                    data_cache.real_time_market_sentiment_dirt = kpl_api.changeStatistics()  # 市场情绪字典字典
                    date_today = data_cache.real_time_market_sentiment_dirt.get('Day', None)  # 情绪数据日期
                    significant_drawdown = data_cache.real_time_market_sentiment_dirt.get('df_num', None)  # 大幅回撤
                    sentiment_indicators = data_cache.real_time_market_sentiment_dirt.get('ztjs', None)  # 情绪指标
                    limit_up_amount = data_cache.real_time_market_sentiment_dirt.get('ztjs', None)  # 涨停家数
                    connecting_board_height = data_cache.real_time_market_sentiment_dirt.get('lbgd', None)  # 连板高度
                    # 获取市场情绪-涨跌统计
                    data_cache.rise_and_fall_statistics_dirt = kpl_api.getMarketFelling()
                    limit_up_numbers = data_cache.rise_and_fall_statistics_dirt.get('ZT', None)
                    actual_limit_up_numbers = data_cache.rise_and_fall_statistics_dirt.get('SJZT', None)
                    ST_limit_up_numbers = data_cache.rise_and_fall_statistics_dirt.get('STZT', None)
                    limit_down_numbers = data_cache.rise_and_fall_statistics_dirt.get('DT', None)
                    actual_limit_down_numbers = data_cache.rise_and_fall_statistics_dirt.get('SJDT', None)
                    ST_limit_down_numbers = data_cache.rise_and_fall_statistics_dirt.get('STDT', None)
                    data_cache.rise_and_fall_statistics_dirt = kpl_api.getMarketFelling()  # 涨跌统计字典
                    limit_up_numbers = data_cache.rise_and_fall_statistics_dirt.get('ZT', None)  # 涨停家数
                    actual_limit_up_numbers = data_cache.rise_and_fall_statistics_dirt.get('SJZT', None)  # 实际涨停家数
                    ST_limit_up_numbers = data_cache.rise_and_fall_statistics_dirt.get('STZT', None)  # ST涨停家数
                    limit_down_numbers = data_cache.rise_and_fall_statistics_dirt.get('DT', None)  # 跌停家数
                    actual_limit_down_numbers = data_cache.rise_and_fall_statistics_dirt.get('SJDT', None)  # 实际跌停家数
                    ST_limit_down_numbers = data_cache.rise_and_fall_statistics_dirt.get('STDT', None)  # ST跌停家数
                    rise_numbers = data_cache.rise_and_fall_statistics_dirt.get('SZJS', None)
                    fall_numbers = data_cache.rise_and_fall_statistics_dirt.get('XDJS', None)
                    # 该logger.info的的日志不再需要打印,后续将转入到GUI客户端上直接显示,该数据的打印交由下方的打印机制异步执行单独存储,以便后续可视化呈现后进行更高效的数据分析
                    # logger.info(f"大盘行情情绪综合强度 [分数]==={data_cache.real_time_market_strong}分")
                    # 控制打印日志的时间段
                    if data_cache.MORN_MARKET_CLOSING_TIME < now_time < data_cache.NOON_MARKET_OPENING_TIME:
                        pass
                        logger.info(f"午间休市时间内 不打印大盘综合强度分数")
@@ -599,7 +733,7 @@
                        # logger_Overall_market_strength_score.info(data_cache.real_time_market_strong)
                        async_log_util.info(logger_Overall_market_strength_score, f"{data_cache.real_time_market_strong}")
                        logger.info(f"日期:{date_today},情绪指标:{sentiment_indicators}分,大幅回撤:{significant_drawdown},涨停家数:{limit_up_amount},连板高度:{connecting_board_height}")
                        logger.info(f"上涨家数:{rise_numbers}分,下跌家数:{fall_numbers},实际涨停家数:{actual_limit_up_numbers},实际跌停家数:{actual_limit_down_numbers}")
                        logger.info(f"上涨家数:{rise_numbers},下跌家数:{fall_numbers},实际涨停家数:{actual_limit_up_numbers},实际跌停家数:{actual_limit_down_numbers}")
                        logger.info(f"涨跌统计字典{data_cache.rise_and_fall_statistics_dirt}")
                    usefulMoney = data_cache.account_finance_dict[0].get('usefulMoney', 0)
@@ -642,8 +776,8 @@
        except Exception as error:
            logger_debug.exception(error)
            logger.error(f"获取实时大盘行情情绪综合强度[分数] 函数报错: {error}")
            print(f"获取实时大盘行情情绪综合强度[分数] 函数报错: {error}")
            logger.error(f"实时设置计划持仓数量 函数报错: {error}")
            print(f"实时设置计划持仓数量 函数报错: {error}")
        finally:
            time.sleep(3)
@@ -651,9 +785,19 @@
if __name__ == '__main__':
    # market_strong = kpl_api.get_market_strong()
    # print(f"{market_strong}")
    get_real_time_market_strong()
    # get_real_time_market_strong()
    # all_index_K_line_dict = get_index_K_line()
    # all_index_k_line_dict_write()
    # print(f"指数K线{data_cache.all_index_k_line_property_dict}")
    # all_index_k_line_dict_write()
    # all_index_k_line_dict_write()
    # ====================== 结果输出 ======================
    print("========== 因子数值(原生计算) ==========")
    print(f"涨跌比(BDR): {bdr:.2f}")
    print(f"极端波动比例: {extreme_ratio:.2%}")
    print(f"市场宽度因子: {market_breadth:.2f}")
    print(f"分布偏度: {skewness:.2f}, 峰度: {kurtosis:.2f}")
    print("\n========== 策略信号 ==========")
    for i, signal in enumerate(signals, 1):
        print(f"信号{i}: {signal}")
strategy/plate_strength_analysis.py
New file
@@ -0,0 +1,514 @@
import datetime
import json
import os
import time
import dask
import constant
from log_module.log import logger_common, logger_kpl_jingxuan_in, logger_debug
from strategy import kpl_api, data_cache, basic_methods
from utils import tool, hx_qc_value_util
# 获取行情精选板块 强度排名
def get_market_sift_plate_its_stock_power():
    @dask.delayed
    def batch_get_plate_codes(fs):
        return fs
    @dask.delayed
    def request_plate_codes(i):
        plate_name = i[1]
        log_data = None
        its_stock = json.loads(kpl_api.getCodesByPlate(i[0]))
        now_time_str = tool.get_now_time_str()
        if data_cache.OPENING_TIME < now_time_str < data_cache.NOON_MARKET_TIME:
            log_data = {plate_name: its_stock['list']}
        # 尝试过滤掉无意义的概念板块(plate_name not in ['科创板', '北交所', '次新股', '无', 'ST板块', 'ST摘帽', '并购重组', '国企改革','超跌', '壳资源', '股权转让', '送转填权']) and '增长' in plate_name
        if (plate_name not in ['科创板', '北交所', '次新股', '无', 'ST板块', 'ST摘帽', '并购重组', '国企改革', '超跌',
                               '壳资源', '股权转让', '送转填权']) or ('增长' in plate_name):
            # print(f"{i[1]} 强度:{i[2]}")
            # 通过板块ID获取其下面的个股强度列表
            # print(f"======={i[0]}=======")
            # its_stock_list_info = its_stock['list']
            # logger.info(f"its_stock_list_info==={its_stock_list_info}")
            # 将板块强度下面对应的个股列表打印到日志中
            # for i in its_stock_list_info:
            #     if i[0] != 1:
            #         logger.info(
            #             f"l === 个股代码:{i[0]},公司名称:{i[1]},主力资金推测:{i[2]},未知0值:{i[3]},概念:{i[4]},最新价:{i[5]},当日当时涨幅:{i[6]}%,"
            #             f"成交额:{round(i[7] / 100000000, 2)} 亿,实际换手率:{i[8]}%,未知0值:{i[9]},实际流通:{round(i[10] / 100000000, 2)}亿,"
            #             f"主力买:{round(i[11] / 100000000, 2)}亿,"
            #             f"主力卖:{round(i[12] / 100000000, 2)}亿,"
            #             f"主力净额:{round(i[13] / 10000, 2)}万,买成占比:{i[14]}%,卖成占比:{i[15]}%,净成占比:{i[16]}%,买流占比:{i[17]}%,卖流占比:{i[18]}%,净流占比:{i[19]}%,"
            #             f"区间涨幅:{i[20]}%,量比:{i[21]},未知0:{i[22]},上板情况:{i[23]},上板排名:{i[24]},换手率:{i[25]}%,"
            #             f"未知空值:{i[26]},未知零值:{i[27]},收盘封单:{i[28]},最大封单:{i[29]},未知空值?:{i[30]},"
            #             f"?:{i[30]}%,?:{i[31]},??:{i[32]},振幅:{i[33]}%,未知0????:{i[34]},未知0?????:{i[35]},"
            #             f"?=:{i[36]},?总市值:{i[37]},?流通市值:{i[38]},最终归属概念(收盘后出数据?):{i[39]},领涨次数:{i[40]},"
            #             f"41未知1值:{i[41]},第三季度机构持仓【str数据勿用运算符】:{i[42]}万,?年预测净利润:{i[43]},上年预测净利润:{i[44]},年内预测净利润:{i[45]}"
            #         )
            # 初始化股票强度列表
            stock_power_list = []
            for s in its_stock['list']:
                # 过滤掉涨幅大于  and s[6] < 6.5 且小于0%的 和 名称中包含ST的 和 涨速小于等于0%的 和 只要昨日未涨停 和 上证或深证的正股    and s[9] > 0.0025
                if s[6] > 0 and s[1].find("ST") < 0 and s[1].find("XD") < 0 and s[23].find("板") < 0 and s[24].find("板") < 0 and (s[0].startswith('60') or s[0].startswith('00')) and s[9] > 1:
                    # print(f"{s[1]},个股代码:{s[0]},   涨幅:{s[6]}%   涨速:{s[9]}%   概念:{s[4]}   主力资金推测:{s[2]}   领涨次数:{s[40]}  今日第几板:{s[23]} 是否破版{s[24]}")
                    # 对个股强度 主要 属性列表进行装填
                    its_stock_power = [s[1], s[0], s[6], s[9], s[4], s[2], s[40]]
                    # 逐个选择性添加its_stock中的元素到个股强度列表中
                    # print(f"its_stock_power===={its_stock_power}")
                    # 整体将添加完善的个股强度列表添加到股票列表中
                    stock_power_list.append(its_stock_power)
            # print(f"stock_power_list===={stock_power_list}")
            # 过滤掉没有瞬时高强度个股的空概念
            if len(stock_power_list) != 0:
                # 将对应板块的股票强度列表新建一个字典
                stock_power_item = {i[1]: stock_power_list}
                # 并更新到精选板块个股字典中
                market_sift_plate_stock_dict.update(stock_power_item)
        return log_data
    data = (kpl_api.getMarketJingXuanRealRankingInfo())
    market_sift_plate = json.loads(data)
    # logger_kpl_jingxuan_in 打印的日志专用于开盘了数据的存储分析,不能轻易删除
    # print(f"market_sift_plate 数 ======{len(market_sift_plate['list'])}")
    # 行情》精选板块》排名前20中》对应个股》符合条件的个股
    # logger.info(f"market_sift_plate['list']======{market_sift_plate['list']}")
    # logger.info(f"market_sift_plate['list'][0]  ======{market_sift_plate['list'][0]}")
    # 初始化精选板块对应个股字典
    market_sift_plate_stock_dict = {}
    if 'list' in market_sift_plate:
        ds = []
        for d in market_sift_plate['list']:
            ds.append(request_plate_codes(d))
        dask_result = batch_get_plate_codes(ds)
        compute_results = dask_result.compute()
        log_datas = {}
        for r in compute_results:
            if not r:
                continue
            for b in r:
                log_datas[b] = r[b]
        now_time = tool.get_now_time_str()
        if data_cache.L1_DATA_START_TIME < now_time < data_cache.NOON_MARKET_TIME:
            # logger.info(f"精选板块股票强度数据更新 == {market_sift_plate_stock_dict}")
            # 只在盘中时间获取
            kpl_api.KPLStockOfMarketsPlateLogManager().add_log(market_sift_plate['list'], log_datas)
    return market_sift_plate_stock_dict
# 调用一下获取精选板块股票强度数据函数  【本模块内使用时调用】
# get_market_sift_plate_its_stock_power()
def get_market_sift_plate_its_stock_power_process(callback):
    while True:
        try:
            # now = time.time()
            # print(f"kpl_limit_up_process开始了{now}")
            start_time = time.time()
            now_time = tool.get_now_time_str()
            if data_cache.L1_DATA_START_TIME < now_time < data_cache.CLOSING_TIME:
                its_stock_power = get_market_sift_plate_its_stock_power()
                time_str = datetime.datetime.now().strftime("%H%M%S")
                if 92900 < int(time_str) < 95000:
                    logger_kpl_jingxuan_in.info(f"耗时:{time.time() - start_time}  数据:{its_stock_power}")
                callback(its_stock_power)
                # print(f"精选板块拉升个股更新===={its_stock_power}")
        except Exception as e:
            logger_debug.exception(f"开盘啦板块强度线程报错An error occurred: {e}")
        finally:
            time.sleep(2)
# 获取涨停板块名称列表并存储本地的函数
def get_limit_up_block_names():
    # 设定当前时间点
    now_time = tool.get_now_time_str()
    # print(f"now_time===={now_time}")
    if data_cache.SERVER_RESTART_TIME < now_time < data_cache.UPDATE_DATA_TIME:
        # print(f"在时间内使用--------------------------")
        # 获取涨停信息列表
        limit_up_info = kpl_api.get_limit_up_info()
        # print(f"limit_up_info=={limit_up_info}")
        data_cache.limit_up_info = kpl_api.get_limit_up_info()
        # 提取涨停列表中的板块名称
        limit_up_block_names = []
        # 循环添加涨停概念
        for i in limit_up_info:
            limit_up_block_names.append(i[5])
        # print(f"limit_up_block_names==={limit_up_block_names}")
        # return limit_up_block_names
        # # 使用Counter计算每个元素的出现次数
        # counter = Counter(limit_up_block_names)
        # # 找出出现次数最多的元素及其次数
        # most_common_element, most_common_count = counter.most_common(1)[0]
        # # 打印出现次数最多的元素
        # print(f"主线概念:{most_common_element},出现了 {most_common_count} 次")
        return limit_up_block_names
# 为开盘啦接口获取的涨停列表概念板块单独开一个进程  形参(callback)
def kpl_limit_up_process(callback):
    while True:
        try:
            # now = time.time()
            # print(f"kpl_limit_up_process开始了{now}")
            limit_up_block_names = get_limit_up_block_names()
            callback(limit_up_block_names)
            # logger.info(f"涨停更新===={limit_up_block_names}")
            # print(f"涨停更新数量===={len(limit_up_block_names)}")
            # print(f"kpl_limit_up_process完成一下{now}")
        except Exception as e:
            logger_debug.error(f"开盘啦涨停板块概念线程报错An error occurred: {e}")
        finally:
            time.sleep(1.5)
# kpl_limit_up_process()
# 构建涨停信息读写对象
class DailyLimitUpInfoStorageManager:
    # 初始化文件路径
    def __init__(self, file_path=constant.KPL_LIMIT_UP_DATA_PATH):
        self.file_path = file_path
    # 添加单日涨停信息数据到文件中的一行 函数
    def append_data_to_file(self, data_to_append):
        # print(f"data_to_append=={data_to_append}")
        # 读取所有行并解析为 JSON 对象列表
        if os.path.exists(self.file_path):
            with open(self.file_path, 'r', encoding='utf-8') as file:
                # 获取当前日期并格式化
                current_date = datetime.datetime.now().strftime('%Y-%m-%d')
                lines = [json.loads(line.strip()) for line in file if line.strip()]
                # print(f"lines type=={type(lines)}")
                # print(f"lines=={lines}")
                # 检查当前日期是否已存在于文件中
                if lines:  # 如果读取到的行文件列表不为空(为真)
                    if lines[-1].get(current_date) is None:  # 如果列表中的倒数最后一行获取不到当日的日期(最后一行的键 为 当日日期)
                        # 将日期和data_to_append转换为JSON格式的字符串
                        json_line = json.dumps({current_date: data_to_append}, ensure_ascii=False) + '\n'
                        # 打开文件并追加JSON行
                        with open(self.file_path, 'a', encoding='utf-8') as file:
                            file.write(json_line)
                    else:
                        logger_common.info(f"(当日日期已存在于文件的最后一行了,不再重复追加写入)")
                else:
                    json_line = json.dumps({current_date: data_to_append}, ensure_ascii=False) + '\n'
                    # 打开文件并追加JSON行
                    with open(self.file_path, 'a', encoding='utf-8') as file:
                        file.write(json_line)
    # 清理多余数据函数
    def check_and_remove_oldest_entry(self, max_entries):
        # 读取所有行并解析为 JSON 对象列表
        if os.path.exists(self.file_path):
            with open(self.file_path, 'r', encoding='utf-8') as file:
                lines = [json.loads(line.strip()) for line in file if line.strip()]
        else:
            lines = []
            # 如果行数超过限制,移除最早的一些行
        if len(lines) >= max_entries:
            # 截断列表,只保留最新的 max_entries 个对象
            lines = lines[-max_entries:]
            # 重新打开文件以写入模式,并写入截断后的对象列表为 JSON Lines
            with open(self.file_path, 'w', encoding='utf-8') as file:
                for obj in lines:
                    file.write(json.dumps(obj, ensure_ascii=False) + '\n')
                    # file.write(json.dumps(obj, ensure_ascii=False))
    # 隔行整理数据并合并装入一个字典数据中调用时返回这个字典数据 函数
    def arrange_limit_up_info(self):
        limit_info = {}
        # 创建一个列表来存储所有解析的 JSON 对象
        if os.path.exists(self.file_path):
            with open(self.file_path, 'r', encoding='utf-8') as file:
                for line in file:
                    # 去除每行末尾的换行符(如果有的话)
                    line = line.rstrip('\n')
                    # 将每行解析为一个 JSON 对象
                    info = json.loads(line)
                    # 假设每行都是一个字典数据,且只有一个键值对,其中键是日期
                    if isinstance(info, dict) and len(info) == 1:
                        date, data = list(info.items())[0]
                        limit_info[date] = data
        return limit_info
# 构建一个获取读写存储本地的并整理涨停数据的函数
def get_arrange_limit_up_info():
    # 实例化每日涨停信息整理方法
    manager = DailyLimitUpInfoStorageManager()
    manager.append_data_to_file(kpl_api.get_limit_up_info())
    manager.check_and_remove_oldest_entry(max_entries=1000)
# 构建一个处理历史涨停涨停信息数据的函数
def get_handling_limit_up_info():
    # 实例化每日涨停信息整理方法
    history_limit_up_info = DailyLimitUpInfoStorageManager()
    data_cache.daily_limit_up_info = history_limit_up_info.arrange_limit_up_info()
    # logger.info(f"读本地的日更的历史涨停数据=={data_cache.daily_limit_up_info}")
    # print(f"daily_limit_up_info  类型==={type(data_cache.daily_limit_up_info)}")
    # 统计每日主线
    daily_limit_up_info_len = len(data_cache.daily_limit_up_info)
    # print(f"daily_limit_up_info_len==={daily_limit_up_info_len}")
    historical_transaction_date_list = []
    date_of_the_day = data_cache.DataCache().today_date
    for i in range(daily_limit_up_info_len):
        pre_date = hx_qc_value_util.get_previous_trading_date(date_of_the_day)  # 获取前一个交易日API
        # target_date_str = basic_methods.pre_num_trading_day(data_cache.today_date, daily_limit_up_info_len)
        date_format = "%Y-%m-%d"
        target_date = datetime.datetime.strptime(pre_date, date_format).strftime("%Y-%m-%d")
        historical_transaction_date_list.append(target_date)
        date_of_the_day = pre_date
    # print(f"historical_transaction_date_list={historical_transaction_date_list}")
    history_sorted_plate_ranking_list = []
    for key, value in data_cache.daily_limit_up_info.items():
        # print(f"key=={key}")
        for i in historical_transaction_date_list:
            # print(f"i======={i}")
            # 找到每上一个交易日对应的本地数据的信息
            if key == i:
                # print(f"{key}===找到了!value={value}")
                #
                plate_ranking_list = []
                # 遍历交易日每一个涨停股的信息
                for v in value:
                    # print(f"v =={v}")
                    # 将每一个涨停股的涨停概念和同班级数量 汇编为一个字典
                    plate_limit_up_num_dict = {
                        v[5]: v[20]
                    }
                    # 将这个字典数据不重复的添加到概念排名列表中
                    if plate_limit_up_num_dict not in plate_ranking_list:
                        plate_ranking_list.append(plate_limit_up_num_dict)
                        # plate_ranking_set.add(v[20])
                # print(f"plate_ranking_list={plate_ranking_list}")
                # 使用sorted函数和lambda表达式来根据字典的值进行排序
                # 这里我们确保不修改原始字典,仅通过list(x.values())[0]来获取值
                sorted_plate_ranking_list = sorted(plate_ranking_list, key=lambda x: list(x.values())[0], reverse=True)
                # logger.info(f"{key}=====>>>>{sorted_plate_ranking_list}")
                history_sorted_plate_ranking_list.append(sorted_plate_ranking_list)
    # print(f"history_sorted_plate_ranking_list={history_sorted_plate_ranking_list}")
    # for ranking_list in history_sorted_plate_ranking_list:
    #     print(f"ranking_list={ranking_list}")
    #     for i in ranking_list:
    #         print(f"i={i}")
    # 计算历史涨停概念的连续出现次数函数
    def count_key_occurrences(list_of_dicts_lists):
        # 创建一个字典来存储每个键的总出现次数
        key_counts = {}
        # 遍历列表中的每个字典列表
        for sublist in list_of_dicts_lists:
            # 遍历当前字典列表中的每个字典
            for dict_item in sublist:
                # 遍历字典中的每个键
                for key in dict_item:
                    # 如果键不在key_counts中,则初始化计数为0
                    if key not in key_counts:
                        key_counts[key] = 0
                        # 增加当前键的计数
                    key_counts[key] += 1
                    # 打印结果
        for key, count in key_counts.items():
            if count > 1:
                logger_common.info(f"'{key}' 连续出现 {count} 次")
    # 调用函数,传入整个列表
    # count_key_occurrences(history_sorted_plate_ranking_list)
    # daily_limit_up_info_list = list(reversed(daily_limit_up_info_list))
    # print(f"daily_limit_up_info_list==={daily_limit_up_info_list}")
    # 获取昨日涨停代码 (以便与K线对比)
    pre_trading_day_limit_up_info = data_cache.daily_limit_up_info.get(data_cache.DataCache().pre_trading_day)
    if pre_trading_day_limit_up_info is not None:
        yesterday_limit_up_code_list = []
        for i in pre_trading_day_limit_up_info:
            symbol_code = basic_methods.format_stock_symbol(i[0])
            limit_up_code = symbol_code
            yesterday_limit_up_code_list.append(limit_up_code)
        data_cache.yesterday_limit_up_code_list = yesterday_limit_up_code_list
        logger_common.info(f"昨日涨停股票数量=={len(data_cache.yesterday_limit_up_code_list)}")
        logger_common.info(f"昨日涨停代码列表=={yesterday_limit_up_code_list}")
        # code = pre_trading_day_limit_up_info[0][0]
        # logger.info(f"股票代码=={code}")
        # cor_name = pre_trading_day_limit_up_info[0][1]
        # logger.info(f"公司名称=={cor_name}")
        # unknown_zero_2 = pre_trading_day_limit_up_info[0][2]
        # logger.info(f"未知零值2=={unknown_zero_2}")
        # none_data = pre_trading_day_limit_up_info[0][3]
        # logger.info(f"空数据=={none_data}")
        # # 总市值(万)?
        # total_market_value = round((pre_trading_day_limit_up_info[0][4] / 10000), 2)
        # logger.info(f"总市值=={total_market_value}(万)?")
        # # 最相关概念
        # the_most_relevant_plate = pre_trading_day_limit_up_info[0][5]
        # logger.info(f"最相关概念=={the_most_relevant_plate}")
        # # 收盘封单金额(万)
        # closing_amount = round((pre_trading_day_limit_up_info[0][6] / 10000), 2)
        # logger.info(f"收盘封单金额=={closing_amount}(万)")
        # # 最大封单金额(万)
        # maximum_blocked_amount = round((pre_trading_day_limit_up_info[0][7] / 10000), 2)
        # logger.info(f"最大封单金额=={maximum_blocked_amount}(万)")
        # # 主力净额
        # main_net_amount = round((pre_trading_day_limit_up_info[0][8] / 10000), 2)
        # logger.info(f"主力净额=={main_net_amount}(万)")
        # # 主力买
        # main_buyers = round((pre_trading_day_limit_up_info[0][9] / 10000), 2)
        # logger.info(f"主力买=={main_buyers}(万)")
        # # 主力卖
        # main_sellers = round((pre_trading_day_limit_up_info[0][10] / 10000), 2)
        # logger.info(f"主力卖=={main_sellers}(万)")
        # # 成交额
        # transaction_amount = round((pre_trading_day_limit_up_info[0][11] / 10000), 2)
        # logger.info(f"成交额=={transaction_amount}(万)")
        # # 所属精选板块
        # selected_plate = pre_trading_day_limit_up_info[0][12]
        # logger.info(f"所属精选板块=={selected_plate}")
        # # 实际流通
        # actual_circulation = round((pre_trading_day_limit_up_info[0][11] / 100000000), 2)
        # logger.info(f"实际流通=={actual_circulation}(亿)")
        # # 量比?(不是,不知道是什么)
        # equivalent_ratio = pre_trading_day_limit_up_info[0][14]
        # logger.info(f"量比?=={equivalent_ratio}")
        # # 领涨次数
        # leading_increases_times = pre_trading_day_limit_up_info[0][15]
        # logger.info(f"领涨次数=={leading_increases_times}")
        # # 未知零值
        # unknown_zero_16 = pre_trading_day_limit_up_info[0][16]
        # logger.info(f"未知零值16=={unknown_zero_16}")
        # # 未知零值
        # unknown_zero_17 = pre_trading_day_limit_up_info[0][17]
        # logger.info(f"未知零值17=={unknown_zero_17}")
        # # 第几板(连续涨停天数)
        # continuous_limit_up_days = pre_trading_day_limit_up_info[0][18]
        # logger.info(f"第几板=={continuous_limit_up_days}")
        # # 最相关概念的代码
        # the_most_relevant_plate_code = pre_trading_day_limit_up_info[0][19]
        # logger.info(f"最相关概念的代码=={the_most_relevant_plate_code}")
        # # 同班级的数量(同概念涨停数量)
        # the_same_class_amount = pre_trading_day_limit_up_info[0][20]
        # logger.info(f"同概念涨停数量=={the_same_class_amount}")
# get_handling_limit_up_info()
# 获取全部个股的板块并存储的函数
def get_all_stocks_plate_dict(stocks_list):
    all_stocks_plate_dict = {}
    # 逐个获取个股精选板块概念和自由市值等,并整体放入一个新创建的字典中然后添加到数据中
    for i in stocks_list:
        try:
            code = i.split('.')[1]
            # print(f"i==={i}")
            # 获取个股的自由市值
            free_market_value = kpl_api.getZYLTAmount(code)
            # 获取个股的板块列表
            selected_blocks = kpl_api.getStockIDPlate(code)
            # 提取精选板块中的板块名称
            selected_plate_list = [block[1] for block in selected_blocks]
            # print(f"selected_block_names==={selected_block_list}")
            block_data = {
                # 添加自由市值
                'free_market_value': free_market_value,
                # 添加精选板块
                'plate': selected_plate_list
            }
            # 将code作为键,stocks_selected_block_data作为值添加到stocks_block_data字典中
            all_stocks_plate_dict[code] = block_data
            # print(f"all_stocks_plate_dict==={all_stocks_plate_dict}")
        except Exception as e:
            print(f"获取全部个股的板块并存储的函数 An error occurred: {e}")
        finally:
            pass
    # return stocks_plate_data
    # print(f"all_stocks_plate_dict==={len(all_stocks_plate_dict)}")
    # 将获取到的范围票概念板块转JSON格式并存储在本地文件夹中
    # 将字典转换为JSON格式的字符串
    json_data = json.dumps(all_stocks_plate_dict)
    # 写入文件
    with open(constant.ALL_STOCKS_PLATE_PATH, 'w', encoding='utf-8') as f:
        f.write(json_data)
    now_time = datetime.datetime.now()  # 获取本机时间
    logger_common.info(f"写入所有个股板块文件完成!::{now_time}")
# 计算开盘啦昨日拉取的概念数据中为空的股票数量函数
def get_have_no_plate_num():
    # 初始化无概念数量
    have_no_plate_num = 0
    plate_are_null_list = []
    for k, v in data_cache.all_stocks_plate_dict.items():
        pass
        # print(f"i==={i}  T==={t}")
        if len(v['plate']) == 0:
            have_no_plate_num += 1
            # print(f"{k}的概念为空")
            # logger.info(f"{k}的概念为空")
            # 股票代码格式转化为掘金格式
            symbol = basic_methods.format_stock_symbol(k)
            sec_name = data_cache.all_stocks_all_K_line_property_dict.get(symbol)
            if sec_name is not None:
                plate_are_null_list.append(sec_name)
    logger_common.info(f"有{have_no_plate_num}只股票概念为空")
    print(f"有{have_no_plate_num}只股票概念为空")
    logger_common.info(f"个股有历史K线但概念为空的有:{plate_are_null_list}")
# 获取全部个股的精选板块并存储的函数
def stocks_list_selected_blocks(min_stocks):
    stocks_selected_block_data = []
    # 逐个获取个股精选板块概念和自由市值等,并整体放入一个新创建的字典中然后添加到数据中
    for i in min_stocks:
        try:
            code = i.split('.')[1]
            # 获取个股的自由市值
            free_market_value = kpl_api.getZYLTAmount(code)
            # 获取个股的精选板块列表
            # selected_blocks = getCodeJingXuanBlocks('000021')
            selected_blocks = kpl_api.getCodeJingXuanBlocks(code)
            # 提取精选板块中的板块名称
            selected_block_list = [block[1] for block in selected_blocks]
            # print(f"selected_block_names==={selected_block_list}")
            stocks_selected_block_dict = {
                # 添加股票代码
                'code': code,
                # 添加自由市值
                'free_market_value': free_market_value,
                # 添加精选板块
                'selected_block': selected_block_list
            }
            stocks_selected_block_data.append(stocks_selected_block_dict)
            # print(f"stocks_selected_block_data==={stocks_selected_block_dict}")
        except Exception as e:
            logger_debug.error(f"获取全部个股的精选板块并存储的函数 An error occurred: {e}")
    # print(f"stocks_selected_block_data==={len(stocks_selected_block_data)}")
    # 将获取到的范围票概念板块转JSON格式并存储在本地文件夹中
    # 将字典转换为JSON格式的字符串
    json_data = json.dumps(stocks_selected_block_data)
    # 写入文件
    with open('local_storage_data/stocks_selected_block_data.json', 'w', encoding='utf-8') as f:
        f.write(json_data)
    now_time = datetime.datetime.now()  # 获取本机时间
    print(f"写入精选板块文件完成!::{now_time}")
if __name__ == '__main__':
    # get_have_no_plate_num()
    pass
strategy/selling_strategy.py
@@ -311,22 +311,22 @@
                                    # 平开视界 < ±1%
                                    if -1 < today_open_growth < 1:
                                        logger_info(
                                            f"【集合竞价】【{k_line_data[0]['sec_name']}】【昨日炸板】【浮盈】【平开】,平开:{today_open_growth}%,设定委卖数量【十分之五仓】")
                                        order_methods.sell_order_by_part_volume(0.5, symbol, position_volume_yesterday,
                                            f"【集合竞价】【{k_line_data[0]['sec_name']}】【昨日炸板】【浮盈】【平开】,平开:{today_open_growth}%,设定委卖数量【四分之一仓】")
                                        order_methods.sell_order_by_part_volume(0.25, symbol, position_volume_yesterday,
                                                                                current_price,
                                                                                k_line_data[0]['sec_name'], index)
                                    # 小低开视界
                                    if -4 < today_open_growth <= -1:
                                        logger_info(
                                            f"【集合竞价】【{k_line_data[0]['sec_name']}】【昨日炸板】【浮盈】【小低开】,低开:{today_open_growth}%,设定委卖数量【十分之七仓】")
                                        order_methods.sell_order_by_part_volume(0.7, symbol, position_volume_yesterday,
                                            f"【集合竞价】【{k_line_data[0]['sec_name']}】【昨日炸板】【浮盈】【小低开】,低开:{today_open_growth}%,设定委卖数量【十分之三仓】")
                                        order_methods.sell_order_by_part_volume(0.3, symbol, position_volume_yesterday,
                                                                                current_price,
                                                                                k_line_data[0]['sec_name'], index)
                                    # 中低开/大低开视界
                                    if today_open_growth <= -4:
                                        logger_info(
                                            f"【集合竞价】【{k_line_data[0]['sec_name']}】【昨日炸板】【中低开/大低开】,低开:{today_open_growth}%,设定委卖数量【全仓】")
                                        order_methods.sell_order_by_part_volume(1, symbol, position_volume_yesterday,
                                            f"【集合竞价】【{k_line_data[0]['sec_name']}】【昨日炸板】【中低开/大低开】,低开:{today_open_growth}%,设定委卖数量【半仓】")
                                        order_methods.sell_order_by_part_volume(0.5, symbol, position_volume_yesterday,
                                                                                current_price,
                                                                                k_line_data[0]['sec_name'], index)