huaxin_client/l2_client.py | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
huaxin_client/l2_data_manager.py | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
main.py | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
strategy/all_K_line.py | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
strategy/check_timer.py | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
strategy/data_cache.py | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
strategy/instant_time_market.py | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
strategy/kpl_api.py | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
strategy/local_data_management.py | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
strategy/logging_config.py | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
strategy/market_sentiment_analysis.py | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
strategy/plate_strength_analysis.py | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
strategy/selling_strategy.py | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 |
huaxin_client/l2_client.py
@@ -167,11 +167,12 @@ logger_l2_codes_subscript.exception(e) finally: # 保存一份最新的数据 self.__set_latest_datas(codes) # self.__set_latest_datas(codes) pass @classmethod def __set_latest_datas(cls, codes_data): data_str = json.dumps([tool.get_now_date_str(), codes_data]) data_str = json.dumps([tool.get_now_date_str(), list(codes_data)]) with open(constant.L2_CODES_INFO_PATH, mode='w') as f: f.write(data_str) huaxin_client/l2_data_manager.py
@@ -195,8 +195,7 @@ data = self.__real_time_buy1_data[code] # 如果最新的买1是原来买1的1/2时开始上传 if data[2] > 0 and data[3] / data[2] <= 0.5: self.data_callback_distribute_manager.get_distributed_callback(code).OnRealTimeBuy1Info(code, data) self.data_callback_distribute_manager.get_distributed_callback(code).OnRealTimeBuy1Info(code, data) except: pass finally: main.py
@@ -17,11 +17,11 @@ # 引入瞬时分时行情模块 # 引入账户管理模块【进行资金和仓位管理】 from strategy import kpl_api, data_cache, check_timer, all_K_line, instant_time_market, account_management, \ order_methods, local_data_management, kpl_data_manager, market_sentiment_analysis order_methods, local_data_management, kpl_data_manager, market_sentiment_analysis, plate_strength_analysis from huaxin_client import l2_market_client, l2_client from log_module import async_log_util from trade import huaxin_trade_data_update, huaxin_trade_api from utils import hx_qc_value_util, huaxin_util from utils import hx_qc_value_util, huaxin_util, juejin_api, tool # 引入行情订阅模块 # import subscribe_market @@ -87,7 +87,7 @@ # 实时运行定时器线程【定时器函数目前 只管理 15:00 后运行一次 整理当日涨停信息 和 获取所有个股的板块概念】 threading.Thread(target=lambda: check_timer.check_time(), daemon=True).start() # 获取实时大盘行情情绪综合强度 [分数] 线程 threading.Thread(target=lambda: market_sentiment_analysis.get_real_time_market_strong(), daemon=True).start() threading.Thread(target=lambda: market_sentiment_analysis.set_plan_position_quantity(), daemon=True).start() # 实时检测是否拉取K线线程 threading.Thread(target=lambda: all_K_line.check_time_and_data_date(), daemon=True).start() # print(f"all_stocks_all_K_line_property_dict== {type(data_cache.all_stocks_all_K_line_property_dict)}") @@ -109,8 +109,8 @@ # 开启开盘啦 涨停列表 和 全盘个股概念板块 接口线程 # 涨停概念线程 # threading.Thread(target=kpl_api.kpl_limit_up_process, daemon=True).start() #该行代码为只运行单一线程不回调数据的方式 threading.Thread(target=kpl_api.kpl_limit_up_process, args=(kpl_limit_up_process,), daemon=True).start() # threading.Thread(target=plate_strength_analysis.kpl_limit_up_process, daemon=True).start() #该行代码为只运行单一线程不回调数据的方式 threading.Thread(target=plate_strength_analysis.kpl_limit_up_process, args=(kpl_limit_up_process,), daemon=True).start() # # 开盘啦的板块强度下的个股强度回调函数 def get_market_sift_plate_its_stock_power_process(market_sift_plate_stock_dict): @@ -119,7 +119,7 @@ data_cache.market_sift_plate_stock_dict = market_sift_plate_stock_dict # 板块强度下个股强度线程 threading.Thread(target=kpl_api.get_market_sift_plate_its_stock_power_process, threading.Thread(target=plate_strength_analysis.get_market_sift_plate_its_stock_power_process, args=(get_market_sift_plate_its_stock_power_process,), daemon=True).start() # 初始化get_current_data方法函数,下单买逻辑才会运行中。。。【核心主线程,随时考虑其启动顺序】>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> @@ -127,13 +127,13 @@ try: # 计算开盘啦昨日拉取的概念数据中为空的股票数量 kpl_api.get_have_no_plate_num() plate_strength_analysis.get_have_no_plate_num() except Exception as e: logger_system.exception(e) # 获取历史涨停信息数据并整理 try: kpl_api.get_handling_limit_up_info() plate_strength_analysis.get_handling_limit_up_info() except Exception as e: logger_system.exception(e) @@ -143,19 +143,39 @@ except Exception as e: logger_system.exception(e) # # 获取所有个股的板块概念并写入文件【耗时较长应该放在 核心主线程 和 仓位管理 后面】 # kpl_api.get_all_stocks_plate_dict(data_cache.min_stocks) # 持仓代码的L2数据回调 class MyPositionsL2DataCallback(L2DataCallBack): __last_price_dict = {} __pre_close_price_dict = {} # 昨日收盘价 def OnL2Transaction(self, code, datas): """ 昨日持仓L2逐笔成交回调 :param code: :param datas: :return: """ if datas: # 获取最近的成交价 price, time_str = datas[-1][1], huaxin_util.convert_time(datas[-1][3]) # TODO 涨停价变为非涨停价才处理 self.__last_price_dict[code] = price try: # 获取最近的成交价 if code not in self.__pre_close_price_dict: # 获取收盘价格 results = juejin_api.JueJinApi.history_n(tool.get_symbol(code), "1d", 1, 1, "close") if results: self.__pre_close_price_dict[code] = results[0]["close"] if self.__last_price_dict.get(code) == price: return limit_up_price = tool.get_limit_up_price(code, self.__pre_close_price_dict[code]) if code in self.__last_price_dict: if abs(limit_up_price - self.__last_price_dict[code]) < 0.0001 < abs(limit_up_price - price): # TODO 处理炸板逻辑 # 炸板 logger_debug.info(f"炸板:{code}-({price},{time_str})") finally: self.__last_price_dict[code] = price def OnMarketData(self, code, datas): # logger_debug.info(f"收到L2Market数据:{datas}") @@ -163,6 +183,7 @@ code = d["securityID"] buy1 = d["buy"][0] # 实时L2买1成交量 def OnRealTimeBuy1Info(self, code, buy1_info): # buy1_info: [买1时间,买1价格, 原始买1量, 实时买1量] async_log_util.info(logger_debug, f"OnRealTimeBuy1Info:{code}-{buy1_info}") strategy/all_K_line.py
@@ -12,9 +12,6 @@ import constant from log_module.log import logger_common # 引入掘金API # import utils.juejin_api # import kpl_api from strategy import data_cache # 引入基础算法模块 @@ -413,10 +410,10 @@ # # if now_time > data_cache.AFTER_CLOSING_TIME: # data_cache.execution = True # # 整理当日涨停信息并写入本地管理好本地数据 # kpl_api.get_arrange_limit_up_info() # plate_strength_analysis.get_arrange_limit_up_info() # logger.info(f"整理当日涨停信息 已经运行完成") # # # 获取所有个股的板块概念并写入文件【耗时较长应该放在 核心主线程 和 仓位管理 后面】 # kpl_api.get_all_stocks_plate_dict(data_cache.min_stocks) # plate_strength_analysis.get_all_stocks_plate_dict(data_cache.min_stocks) # # logger.info(f"获取所有个股的板块概念 已经运行完成") # 构造一个循环检测K线子带你中所有个股代码下的日期是不是和某日一致的,如果不一致则返回False strategy/check_timer.py
@@ -5,7 +5,7 @@ import time from log_module.log import logger_common from strategy import data_cache, market_sentiment_analysis from strategy import data_cache, market_sentiment_analysis, plate_strength_analysis from strategy import kpl_api from utils import tool @@ -29,10 +29,10 @@ # print(f"now_time==={now_time}") if now_time > data_cache.AFTER_CLOSING_TIME and data_cache.execution is False: # 整理当日涨停信息并写入本地管理好本地数据 kpl_api.get_arrange_limit_up_info() plate_strength_analysis.get_arrange_limit_up_info() logger.info(f"整理当日涨停信息 已经运行完成") # # 获取所有个股的板块概念并写入文件【耗时较长应该放在 核心主线程 和 仓位管理 后面】 kpl_api.get_all_stocks_plate_dict(data_cache.DataCache().filtered_stocks) plate_strength_analysis.get_all_stocks_plate_dict(data_cache.DataCache().filtered_stocks) logger.info(f"获取所有个股的板块概念 已经运行完成") # 完成了后将是否执行的开个标记为真 data_cache.execution = True strategy/data_cache.py
@@ -112,38 +112,29 @@ now_time = datetime.datetime.now().strftime("%H:%M:%S") # 定义并实时获取 当前时间 ''' 设定常用时间点【常量】 字符串格式=="09:25:12" ''' SERVER_RESTART_TIME = datetime.time(9, 00, 00).strftime("%H:%M:%S") # 定义9:00 L1_DATA_START_TIME = datetime.time(9, 15, 00).strftime("%H:%M:%S") # 定义9:15 BEFORE_OPEN_BIDDING_TIME = datetime.time(9, 20, 00).strftime("%H:%M:%S") # 定义9:20 OPEN_BIDDING_TIME = datetime.time(9, 25, 00).strftime("%H:%M:%S") # 定义 盘前 集合竞价 时间 LATER_OPEN_BIDDING_TIME = datetime.time(9, 25, 6).strftime("%H:%M:%S") # 定义 盘前 集合竞价 时间 AFTER_OPEN_BIDDING_TIME = datetime.time(9, 25, 12).strftime("%H:%M:%S") # 定义 集合竞价 开始后 时间 OPENING_TIME = datetime.time(9, 30, 00).strftime("%H:%M:%S") # 定义开盘时间 MORN_MARKET_TIME = datetime.time(9, 35, 00).strftime("%H:%M:%S") # 定义早盘时间 MORN_MARKET_CLOSING_TIME = datetime.time(11, 30, 00).strftime("%H:%M:%S") # 定义上午收盘时间 NOON_MARKET_OPENING_TIME = datetime.time(13, 0, 00).strftime("%H:%M:%S") # 定义下午开盘时间 NOON_MARKET_TIME = datetime.time(13, 5, 00).strftime("%H:%M:%S") # 定义午盘时间 CLOSE_POSITION_TIME = datetime.time(14, 55, 00).strftime("%H:%M:%S") # 定义平仓时间 WATCH_DISK_END_TIME = datetime.time(14, 56, 00).strftime("%H:%M:%S") # 定义 板上盯结束时间 CLOSE_BIDDING_TIME = datetime.time(14, 57, 00).strftime("%H:%M:%S") # 定义 盘后 集合竞价 时间 CLOSING_TIME = datetime.time(15, 00, 00).strftime("%H:%M:%S") # 定义 收盘时间 AFTER_CLOSING_TIME = datetime.time(15, 1, 00).strftime("%H:%M:%S") # 定义 收盘后时间 CHECKING_DATA_TIME = datetime.time(17, 00, 00).strftime("%H:%M:%S") # 定义 检查数据时间 UPDATE_DATA_TIME = datetime.time(18, 31, 00).strftime("%H:%M:%S") # 定义更新数据时间 PROGRAM_SLEEP_TIME = datetime.time(23, 00, 00).strftime("%H:%M:%S") # 定义程序休眠时间 # todo 2025-03-25 后无BUG即可彻底删除下处注释部分 # # 读取已经获取到并存储在本地的目标范围的个股的板块概念 # # 读取JSON文件并解析为字典 # if os.path.exists(constant.ALL_STOCKS_PLATE_PATH): # with open(constant.ALL_STOCKS_PLATE_PATH, 'r', # encoding='utf-8') as f: # json_data = f.read() # else: # json_data = "{}" # all_stocks_plate_dict = json.loads(json_data) # logger.info(f"all_stocks_plate_dict的数量={len(all_stocks_plate_dict)}") SERVER_RESTART_TIME = "09:00:00" # 服务器重启时间 L1_DATA_START_TIME = "09:15:00" # L1数据开始时间 BEFORE_OPEN_BIDDING_TIME = "09:20:00" # 【盘前】集合竞价开始前时间 OPEN_BIDDING_TIME = "09:25:00" # 【盘前】集合竞价开始时间 LATER_OPEN_BIDDING_TIME = "09:25:06" # 【盘前】集合竞价开始后瞬间 AFTER_OPEN_BIDDING_TIME = "09:25:12" # 【盘前】集合竞价开始后一会 OPENING_TIME = "09:30:00" # 上午开盘时间 MORN_MARKET_TIME = "09:35:00" # 早盘黄金五分钟 MORN_MARKET_CLOSING_TIME = "11:30:00" # 上午收盘时间 NOON_MARKET_OPENING_TIME = "13:00:00" # 下午开盘时间 NOON_MARKET_TIME = "13:05:00" # 午盘黄金五分钟 CLOSE_POSITION_TIME = "14:55:00" # 尾盘平仓时间 WATCH_DISK_END_TIME = "14:56:00" # 板上盯结束时间 CLOSE_BIDDING_TIME = "14:57:00" # 【盘后】集合竞价开始 CLOSING_TIME = "15:00:00" # 定义 收盘时间 AFTER_CLOSING_TIME = "15:01:00" # 下午收盘后时间 CHECKING_DATA_TIME = "17:00:00" # 检查数据时间 UPDATE_DATA_TIME = "18:31:00" # 更新数据时间 PROGRAM_SLEEP_TIME = "22:00:00" # 程序休眠时间【不能在23点之后仍运行,获取到的数据可能有谬误】 # 初始化当日当时最高价 high_price = 0 strategy/instant_time_market.py
@@ -344,7 +344,7 @@ # 分批处理数据 ds = [] total_count = len(current_infos) page = 15 page = 20 page_size = total_count // page + 1 for p in range(page): temp_list = current_infos[p * page_size:(p + 1) * page_size] strategy/kpl_api.py
@@ -10,10 +10,9 @@ import requests import constant from log_module import async_log_util from log_module.log import logger_common, logger_kpl_jingxuan_in, logger_Overall_market_strength_score, \ logger_stock_of_markets_plate, logger_debug # import requests from strategy import data_cache from strategy import basic_methods from strategy.kpl_data_manager import KPLStockOfMarketsPlateLogManager @@ -185,8 +184,6 @@ return json.dumps({"errcode": 0, "list": fresults}) # 获取涨停信息数据 def get_limit_up_info(): # 获取涨停信息列表 @@ -239,510 +236,510 @@ # -------------------------------------------------------------------------------------------------------------------------------------------------------------- # 获取行情精选板块 强度排名 def get_market_sift_plate_its_stock_power(): @dask.delayed def batch_get_plate_codes(fs): return fs @dask.delayed def request_plate_codes(i): plate_name = i[1] log_data = None its_stock = json.loads(getCodesByPlate(i[0])) now_time_str = tool.get_now_time_str() if data_cache.OPENING_TIME < now_time_str < data_cache.NOON_MARKET_TIME: log_data = {plate_name: its_stock['list']} # 尝试过滤掉无意义的概念板块(plate_name not in ['科创板', '北交所', '次新股', '无', 'ST板块', 'ST摘帽', '并购重组', '国企改革','超跌', '壳资源', '股权转让', '送转填权']) and '增长' in plate_name if (plate_name not in ['科创板', '北交所', '次新股', '无', 'ST板块', 'ST摘帽', '并购重组', '国企改革', '超跌', '壳资源', '股权转让', '送转填权']) or ('增长' in plate_name): # print(f"{i[1]} 强度:{i[2]}") # 通过板块ID获取其下面的个股强度列表 # print(f"======={i[0]}=======") # its_stock_list_info = its_stock['list'] # logger.info(f"its_stock_list_info==={its_stock_list_info}") # 将板块强度下面对应的个股列表打印到日志中 # for i in its_stock_list_info: # if i[0] != 1: # logger.info( # f"l === 个股代码:{i[0]},公司名称:{i[1]},主力资金推测:{i[2]},未知0值:{i[3]},概念:{i[4]},最新价:{i[5]},当日当时涨幅:{i[6]}%," # f"成交额:{round(i[7] / 100000000, 2)} 亿,实际换手率:{i[8]}%,未知0值:{i[9]},实际流通:{round(i[10] / 100000000, 2)}亿," # f"主力买:{round(i[11] / 100000000, 2)}亿," # f"主力卖:{round(i[12] / 100000000, 2)}亿," # f"主力净额:{round(i[13] / 10000, 2)}万,买成占比:{i[14]}%,卖成占比:{i[15]}%,净成占比:{i[16]}%,买流占比:{i[17]}%,卖流占比:{i[18]}%,净流占比:{i[19]}%," # f"区间涨幅:{i[20]}%,量比:{i[21]},未知0:{i[22]},上板情况:{i[23]},上板排名:{i[24]},换手率:{i[25]}%," # f"未知空值:{i[26]},未知零值:{i[27]},收盘封单:{i[28]},最大封单:{i[29]},未知空值?:{i[30]}," # f"?:{i[30]}%,?:{i[31]},??:{i[32]},振幅:{i[33]}%,未知0????:{i[34]},未知0?????:{i[35]}," # f"?=:{i[36]},?总市值:{i[37]},?流通市值:{i[38]},最终归属概念(收盘后出数据?):{i[39]},领涨次数:{i[40]}," # f"41未知1值:{i[41]},第三季度机构持仓【str数据勿用运算符】:{i[42]}万,?年预测净利润:{i[43]},上年预测净利润:{i[44]},年内预测净利润:{i[45]}" # ) # 初始化股票强度列表 stock_power_list = [] for s in its_stock['list']: # 过滤掉涨幅大于 and s[6] < 6.5 且小于0%的 和 名称中包含ST的 和 涨速小于等于0%的 和 只要昨日未涨停 和 上证或深证的正股 and s[9] > 0.0025 if s[6] > 0 and s[1].find("ST") < 0 and s[1].find("XD") < 0 and s[23].find("板") < 0 and s[24].find("板") < 0 and (s[0].startswith('60') or s[0].startswith('00')) and s[9] > 1: # print(f"{s[1]},个股代码:{s[0]}, 涨幅:{s[6]}% 涨速:{s[9]}% 概念:{s[4]} 主力资金推测:{s[2]} 领涨次数:{s[40]} 今日第几板:{s[23]} 是否破版{s[24]}") # 对个股强度 主要 属性列表进行装填 its_stock_power = [s[1], s[0], s[6], s[9], s[4], s[2], s[40]] # 逐个选择性添加its_stock中的元素到个股强度列表中 # print(f"its_stock_power===={its_stock_power}") # 整体将添加完善的个股强度列表添加到股票列表中 stock_power_list.append(its_stock_power) # print(f"stock_power_list===={stock_power_list}") # 过滤掉没有瞬时高强度个股的空概念 if len(stock_power_list) != 0: # 将对应板块的股票强度列表新建一个字典 stock_power_item = {i[1]: stock_power_list} # 并更新到精选板块个股字典中 market_sift_plate_stock_dict.update(stock_power_item) return log_data data = (getMarketJingXuanRealRankingInfo()) market_sift_plate = json.loads(data) # logger_kpl_jingxuan_in 打印的日志专用于开盘了数据的存储分析,不能轻易删除 # print(f"market_sift_plate 数 ======{len(market_sift_plate['list'])}") # 行情》精选板块》排名前20中》对应个股》符合条件的个股 # logger.info(f"market_sift_plate['list']======{market_sift_plate['list']}") # logger.info(f"market_sift_plate['list'][0] ======{market_sift_plate['list'][0]}") # 初始化精选板块对应个股字典 market_sift_plate_stock_dict = {} if 'list' in market_sift_plate: ds = [] for d in market_sift_plate['list']: ds.append(request_plate_codes(d)) dask_result = batch_get_plate_codes(ds) compute_results = dask_result.compute() log_datas = {} for r in compute_results: if not r: continue for b in r: log_datas[b] = r[b] now_time = tool.get_now_time_str() if data_cache.L1_DATA_START_TIME < now_time < data_cache.NOON_MARKET_TIME: # logger.info(f"精选板块股票强度数据更新 == {market_sift_plate_stock_dict}") # 只在盘中时间获取 KPLStockOfMarketsPlateLogManager().add_log(market_sift_plate['list'], log_datas) return market_sift_plate_stock_dict # 调用一下获取精选板块股票强度数据函数 【本模块内使用时调用】 # get_market_sift_plate_its_stock_power() def get_market_sift_plate_its_stock_power_process(callback): while True: try: # now = time.time() # print(f"kpl_limit_up_process开始了{now}") start_time = time.time() now_time = tool.get_now_time_str() if data_cache.L1_DATA_START_TIME < now_time < data_cache.CLOSING_TIME: its_stock_power = get_market_sift_plate_its_stock_power() time_str = datetime.datetime.now().strftime("%H%M%S") if 92900 < int(time_str) < 95000: logger_kpl_jingxuan_in.info(f"耗时:{time.time() - start_time} 数据:{its_stock_power}") callback(its_stock_power) # print(f"精选板块拉升个股更新===={its_stock_power}") except Exception as e: logger_debug.exception(e) logger.error(f"开盘啦板块强度线程报错An error occurred: {e}") finally: time.sleep(2) # 获取涨停板块名称列表并存储本地的函数 def get_limit_up_block_names(): # 设定当前时间点 now_time = tool.get_now_time_str() # print(f"now_time===={now_time}") if data_cache.SERVER_RESTART_TIME < now_time < data_cache.UPDATE_DATA_TIME: # print(f"在时间内使用--------------------------") # 获取涨停信息列表 limit_up_info = get_limit_up_info() # print(f"limit_up_info=={limit_up_info}") data_cache.limit_up_info = get_limit_up_info() # 提取涨停列表中的板块名称 limit_up_block_names = [] # 循环添加涨停概念 for i in limit_up_info: limit_up_block_names.append(i[5]) # print(f"limit_up_block_names==={limit_up_block_names}") # return limit_up_block_names # # 使用Counter计算每个元素的出现次数 # counter = Counter(limit_up_block_names) # # 找出出现次数最多的元素及其次数 # most_common_element, most_common_count = counter.most_common(1)[0] # # 打印出现次数最多的元素 # print(f"主线概念:{most_common_element},出现了 {most_common_count} 次") return limit_up_block_names # 为开盘啦接口获取的涨停列表概念板块单独开一个进程 形参(callback) def kpl_limit_up_process(callback): while True: try: # now = time.time() # print(f"kpl_limit_up_process开始了{now}") limit_up_block_names = get_limit_up_block_names() callback(limit_up_block_names) # logger.info(f"涨停更新===={limit_up_block_names}") # print(f"涨停更新数量===={len(limit_up_block_names)}") # print(f"kpl_limit_up_process完成一下{now}") except Exception as e: logger.error(f"开盘啦涨停板块概念线程报错An error occurred: {e}") finally: time.sleep(1.5) # kpl_limit_up_process() # 构建涨停信息读写对象 class DailyLimitUpInfoStorageManager: # 初始化文件路径 def __init__(self, file_path=constant.KPL_LIMIT_UP_DATA_PATH): self.file_path = file_path # 添加单日涨停信息数据到文件中的一行 函数 def append_data_to_file(self, data_to_append): # print(f"data_to_append=={data_to_append}") # 读取所有行并解析为 JSON 对象列表 if os.path.exists(self.file_path): with open(self.file_path, 'r', encoding='utf-8') as file: # 获取当前日期并格式化 current_date = datetime.datetime.now().strftime('%Y-%m-%d') lines = [json.loads(line.strip()) for line in file if line.strip()] # print(f"lines type=={type(lines)}") # print(f"lines=={lines}") # 检查当前日期是否已存在于文件中 if lines: # 如果读取到的行文件列表不为空(为真) if lines[-1].get(current_date) is None: # 如果列表中的倒数最后一行获取不到当日的日期(最后一行的键 为 当日日期) # 将日期和data_to_append转换为JSON格式的字符串 json_line = json.dumps({current_date: data_to_append}, ensure_ascii=False) + '\n' # 打开文件并追加JSON行 with open(self.file_path, 'a', encoding='utf-8') as file: file.write(json_line) else: logger.info(f"(当日日期已存在于文件的最后一行了,不再重复追加写入)") else: json_line = json.dumps({current_date: data_to_append}, ensure_ascii=False) + '\n' # 打开文件并追加JSON行 with open(self.file_path, 'a', encoding='utf-8') as file: file.write(json_line) # 清理多余数据函数 def check_and_remove_oldest_entry(self, max_entries): # 读取所有行并解析为 JSON 对象列表 if os.path.exists(self.file_path): with open(self.file_path, 'r', encoding='utf-8') as file: lines = [json.loads(line.strip()) for line in file if line.strip()] else: lines = [] # 如果行数超过限制,移除最早的一些行 if len(lines) >= max_entries: # 截断列表,只保留最新的 max_entries 个对象 lines = lines[-max_entries:] # 重新打开文件以写入模式,并写入截断后的对象列表为 JSON Lines with open(self.file_path, 'w', encoding='utf-8') as file: for obj in lines: file.write(json.dumps(obj, ensure_ascii=False) + '\n') # file.write(json.dumps(obj, ensure_ascii=False)) # 隔行整理数据并合并装入一个字典数据中调用时返回这个字典数据 函数 def arrange_limit_up_info(self): limit_info = {} # 创建一个列表来存储所有解析的 JSON 对象 if os.path.exists(self.file_path): with open(self.file_path, 'r', encoding='utf-8') as file: for line in file: # 去除每行末尾的换行符(如果有的话) line = line.rstrip('\n') # 将每行解析为一个 JSON 对象 info = json.loads(line) # 假设每行都是一个字典数据,且只有一个键值对,其中键是日期 if isinstance(info, dict) and len(info) == 1: date, data = list(info.items())[0] limit_info[date] = data return limit_info # 构建一个获取读写存储本地的并整理涨停数据的函数 def get_arrange_limit_up_info(): # 实例化每日涨停信息整理方法 manager = DailyLimitUpInfoStorageManager() manager.append_data_to_file(get_limit_up_info()) manager.check_and_remove_oldest_entry(max_entries=1000) # 构建一个处理历史涨停涨停信息数据的函数 def get_handling_limit_up_info(): # 实例化每日涨停信息整理方法 history_limit_up_info = DailyLimitUpInfoStorageManager() data_cache.daily_limit_up_info = history_limit_up_info.arrange_limit_up_info() # logger.info(f"读本地的日更的历史涨停数据=={data_cache.daily_limit_up_info}") # print(f"daily_limit_up_info 类型==={type(data_cache.daily_limit_up_info)}") # 统计每日主线 daily_limit_up_info_len = len(data_cache.daily_limit_up_info) # print(f"daily_limit_up_info_len==={daily_limit_up_info_len}") historical_transaction_date_list = [] date_of_the_day = data_cache.DataCache().today_date for i in range(daily_limit_up_info_len): pre_date = hx_qc_value_util.get_previous_trading_date(date_of_the_day) # 获取前一个交易日API # target_date_str = basic_methods.pre_num_trading_day(data_cache.today_date, daily_limit_up_info_len) date_format = "%Y-%m-%d" target_date = datetime.datetime.strptime(pre_date, date_format).strftime("%Y-%m-%d") historical_transaction_date_list.append(target_date) date_of_the_day = pre_date # print(f"historical_transaction_date_list={historical_transaction_date_list}") history_sorted_plate_ranking_list = [] for key, value in data_cache.daily_limit_up_info.items(): # print(f"key=={key}") for i in historical_transaction_date_list: # print(f"i======={i}") # 找到每上一个交易日对应的本地数据的信息 if key == i: # print(f"{key}===找到了!value={value}") # plate_ranking_list = [] # 遍历交易日每一个涨停股的信息 for v in value: # print(f"v =={v}") # 将每一个涨停股的涨停概念和同班级数量 汇编为一个字典 plate_limit_up_num_dict = { v[5]: v[20] } # 将这个字典数据不重复的添加到概念排名列表中 if plate_limit_up_num_dict not in plate_ranking_list: plate_ranking_list.append(plate_limit_up_num_dict) # plate_ranking_set.add(v[20]) # print(f"plate_ranking_list={plate_ranking_list}") # 使用sorted函数和lambda表达式来根据字典的值进行排序 # 这里我们确保不修改原始字典,仅通过list(x.values())[0]来获取值 sorted_plate_ranking_list = sorted(plate_ranking_list, key=lambda x: list(x.values())[0], reverse=True) # logger.info(f"{key}=====>>>>{sorted_plate_ranking_list}") history_sorted_plate_ranking_list.append(sorted_plate_ranking_list) # print(f"history_sorted_plate_ranking_list={history_sorted_plate_ranking_list}") # for ranking_list in history_sorted_plate_ranking_list: # print(f"ranking_list={ranking_list}") # for i in ranking_list: # print(f"i={i}") # 计算历史涨停概念的连续出现次数函数 def count_key_occurrences(list_of_dicts_lists): # 创建一个字典来存储每个键的总出现次数 key_counts = {} # 遍历列表中的每个字典列表 for sublist in list_of_dicts_lists: # 遍历当前字典列表中的每个字典 for dict_item in sublist: # 遍历字典中的每个键 for key in dict_item: # 如果键不在key_counts中,则初始化计数为0 if key not in key_counts: key_counts[key] = 0 # 增加当前键的计数 key_counts[key] += 1 # 打印结果 for key, count in key_counts.items(): if count > 1: logger.info(f"'{key}' 连续出现 {count} 次") # 调用函数,传入整个列表 # count_key_occurrences(history_sorted_plate_ranking_list) # daily_limit_up_info_list = list(reversed(daily_limit_up_info_list)) # print(f"daily_limit_up_info_list==={daily_limit_up_info_list}") # 获取昨日涨停代码 (以便与K线对比) pre_trading_day_limit_up_info = data_cache.daily_limit_up_info.get(data_cache.DataCache().pre_trading_day) if pre_trading_day_limit_up_info is not None: yesterday_limit_up_code_list = [] for i in pre_trading_day_limit_up_info: symbol_code = basic_methods.format_stock_symbol(i[0]) limit_up_code = symbol_code yesterday_limit_up_code_list.append(limit_up_code) data_cache.yesterday_limit_up_code_list = yesterday_limit_up_code_list logger.info(f"昨日涨停股票数量=={len(data_cache.yesterday_limit_up_code_list)}") logger.info(f"昨日涨停代码列表=={yesterday_limit_up_code_list}") # code = pre_trading_day_limit_up_info[0][0] # logger.info(f"股票代码=={code}") # cor_name = pre_trading_day_limit_up_info[0][1] # logger.info(f"公司名称=={cor_name}") # unknown_zero_2 = pre_trading_day_limit_up_info[0][2] # logger.info(f"未知零值2=={unknown_zero_2}") # none_data = pre_trading_day_limit_up_info[0][3] # logger.info(f"空数据=={none_data}") # # 总市值(万)? # total_market_value = round((pre_trading_day_limit_up_info[0][4] / 10000), 2) # logger.info(f"总市值=={total_market_value}(万)?") # # 最相关概念 # the_most_relevant_plate = pre_trading_day_limit_up_info[0][5] # logger.info(f"最相关概念=={the_most_relevant_plate}") # # 收盘封单金额(万) # closing_amount = round((pre_trading_day_limit_up_info[0][6] / 10000), 2) # logger.info(f"收盘封单金额=={closing_amount}(万)") # # 最大封单金额(万) # maximum_blocked_amount = round((pre_trading_day_limit_up_info[0][7] / 10000), 2) # logger.info(f"最大封单金额=={maximum_blocked_amount}(万)") # # 主力净额 # main_net_amount = round((pre_trading_day_limit_up_info[0][8] / 10000), 2) # logger.info(f"主力净额=={main_net_amount}(万)") # # 主力买 # main_buyers = round((pre_trading_day_limit_up_info[0][9] / 10000), 2) # logger.info(f"主力买=={main_buyers}(万)") # # 主力卖 # main_sellers = round((pre_trading_day_limit_up_info[0][10] / 10000), 2) # logger.info(f"主力卖=={main_sellers}(万)") # # 成交额 # transaction_amount = round((pre_trading_day_limit_up_info[0][11] / 10000), 2) # logger.info(f"成交额=={transaction_amount}(万)") # # 所属精选板块 # selected_plate = pre_trading_day_limit_up_info[0][12] # logger.info(f"所属精选板块=={selected_plate}") # # 实际流通 # actual_circulation = round((pre_trading_day_limit_up_info[0][11] / 100000000), 2) # logger.info(f"实际流通=={actual_circulation}(亿)") # # 量比?(不是,不知道是什么) # equivalent_ratio = pre_trading_day_limit_up_info[0][14] # logger.info(f"量比?=={equivalent_ratio}") # # 领涨次数 # leading_increases_times = pre_trading_day_limit_up_info[0][15] # logger.info(f"领涨次数=={leading_increases_times}") # # 未知零值 # unknown_zero_16 = pre_trading_day_limit_up_info[0][16] # logger.info(f"未知零值16=={unknown_zero_16}") # # 未知零值 # unknown_zero_17 = pre_trading_day_limit_up_info[0][17] # logger.info(f"未知零值17=={unknown_zero_17}") # # 第几板(连续涨停天数) # continuous_limit_up_days = pre_trading_day_limit_up_info[0][18] # logger.info(f"第几板=={continuous_limit_up_days}") # # 最相关概念的代码 # the_most_relevant_plate_code = pre_trading_day_limit_up_info[0][19] # logger.info(f"最相关概念的代码=={the_most_relevant_plate_code}") # # 同班级的数量(同概念涨停数量) # the_same_class_amount = pre_trading_day_limit_up_info[0][20] # logger.info(f"同概念涨停数量=={the_same_class_amount}") # get_handling_limit_up_info() # 获取全部个股的板块并存储的函数 def get_all_stocks_plate_dict(stocks_list): all_stocks_plate_dict = {} # 逐个获取个股精选板块概念和自由市值等,并整体放入一个新创建的字典中然后添加到数据中 for i in stocks_list: try: code = i.split('.')[1] # print(f"i==={i}") # 获取个股的自由市值 free_market_value = getZYLTAmount(code) # 获取个股的板块列表 selected_blocks = getStockIDPlate(code) # 提取精选板块中的板块名称 selected_plate_list = [block[1] for block in selected_blocks] # print(f"selected_block_names==={selected_block_list}") block_data = { # 添加自由市值 'free_market_value': free_market_value, # 添加精选板块 'plate': selected_plate_list } # 将code作为键,stocks_selected_block_data作为值添加到stocks_block_data字典中 all_stocks_plate_dict[code] = block_data # print(f"all_stocks_plate_dict==={all_stocks_plate_dict}") except Exception as e: print(f"获取全部个股的板块并存储的函数 An error occurred: {e}") finally: pass # return stocks_plate_data # print(f"all_stocks_plate_dict==={len(all_stocks_plate_dict)}") # 将获取到的范围票概念板块转JSON格式并存储在本地文件夹中 # 将字典转换为JSON格式的字符串 json_data = json.dumps(all_stocks_plate_dict) # 写入文件 with open(constant.ALL_STOCKS_PLATE_PATH, 'w', encoding='utf-8') as f: f.write(json_data) now_time = datetime.datetime.now() # 获取本机时间 logger.info(f"写入所有个股板块文件完成!::{now_time}") # 计算开盘啦昨日拉取的概念数据中为空的股票数量函数 def get_have_no_plate_num(): # 初始化无概念数量 have_no_plate_num = 0 plate_are_null_list = [] for k, v in data_cache.all_stocks_plate_dict.items(): pass # print(f"i==={i} T==={t}") if len(v['plate']) == 0: have_no_plate_num += 1 # print(f"{k}的概念为空") # logger.info(f"{k}的概念为空") # 股票代码格式转化为掘金格式 symbol = basic_methods.format_stock_symbol(k) sec_name = data_cache.all_stocks_all_K_line_property_dict.get(symbol) if sec_name is not None: plate_are_null_list.append(sec_name) logger.info(f"有{have_no_plate_num}只股票概念为空") logger.info(f"个股有历史K线但概念为空的有:{plate_are_null_list}") # 获取全部个股的精选板块并存储的函数 def stocks_list_selected_blocks(min_stocks): stocks_selected_block_data = [] # 逐个获取个股精选板块概念和自由市值等,并整体放入一个新创建的字典中然后添加到数据中 for i in min_stocks: try: code = i.split('.')[1] # 获取个股的自由市值 free_market_value = getZYLTAmount(code) # 获取个股的精选板块列表 # selected_blocks = getCodeJingXuanBlocks('000021') selected_blocks = getCodeJingXuanBlocks(code) # 提取精选板块中的板块名称 selected_block_list = [block[1] for block in selected_blocks] # print(f"selected_block_names==={selected_block_list}") stocks_selected_block_dict = { # 添加股票代码 'code': code, # 添加自由市值 'free_market_value': free_market_value, # 添加精选板块 'selected_block': selected_block_list } stocks_selected_block_data.append(stocks_selected_block_dict) # print(f"stocks_selected_block_data==={stocks_selected_block_dict}") except Exception as e: logger.error(f"获取全部个股的精选板块并存储的函数 An error occurred: {e}") # print(f"stocks_selected_block_data==={len(stocks_selected_block_data)}") # 将获取到的范围票概念板块转JSON格式并存储在本地文件夹中 # 将字典转换为JSON格式的字符串 json_data = json.dumps(stocks_selected_block_data) # 写入文件 with open('local_storage_data/stocks_selected_block_data.json', 'w', encoding='utf-8') as f: f.write(json_data) now_time = datetime.datetime.now() # 获取本机时间 print(f"写入精选板块文件完成!::{now_time}") # kpl_stocks_list_selected_blocks_process() #在 kpl_api.py中可以调用 # stocks_list_selected_blocks(min_stocks) #在 kpl_api.py中可以调用 # list = ['SHSE.600805','SHSE.600804'] # # all_stocks_plate_dict(list) # # 获取行情精选板块 强度排名 # def get_market_sift_plate_its_stock_power(): # @dask.delayed # def batch_get_plate_codes(fs): # return fs # # @dask.delayed # def request_plate_codes(i): # plate_name = i[1] # log_data = None # its_stock = json.loads(getCodesByPlate(i[0])) # now_time_str = tool.get_now_time_str() # if data_cache.OPENING_TIME < now_time_str < data_cache.NOON_MARKET_TIME: # log_data = {plate_name: its_stock['list']} # # 尝试过滤掉无意义的概念板块(plate_name not in ['科创板', '北交所', '次新股', '无', 'ST板块', 'ST摘帽', '并购重组', '国企改革','超跌', '壳资源', '股权转让', '送转填权']) and '增长' in plate_name # if (plate_name not in ['科创板', '北交所', '次新股', '无', 'ST板块', 'ST摘帽', '并购重组', '国企改革', '超跌', # '壳资源', '股权转让', '送转填权']) or ('增长' in plate_name): # # # print(f"{i[1]} 强度:{i[2]}") # # 通过板块ID获取其下面的个股强度列表 # # print(f"======={i[0]}=======") # # # its_stock_list_info = its_stock['list'] # # logger.info(f"its_stock_list_info==={its_stock_list_info}") # # 将板块强度下面对应的个股列表打印到日志中 # # for i in its_stock_list_info: # # if i[0] != 1: # # logger.info( # # f"l === 个股代码:{i[0]},公司名称:{i[1]},主力资金推测:{i[2]},未知0值:{i[3]},概念:{i[4]},最新价:{i[5]},当日当时涨幅:{i[6]}%," # # f"成交额:{round(i[7] / 100000000, 2)} 亿,实际换手率:{i[8]}%,未知0值:{i[9]},实际流通:{round(i[10] / 100000000, 2)}亿," # # f"主力买:{round(i[11] / 100000000, 2)}亿," # # f"主力卖:{round(i[12] / 100000000, 2)}亿," # # f"主力净额:{round(i[13] / 10000, 2)}万,买成占比:{i[14]}%,卖成占比:{i[15]}%,净成占比:{i[16]}%,买流占比:{i[17]}%,卖流占比:{i[18]}%,净流占比:{i[19]}%," # # f"区间涨幅:{i[20]}%,量比:{i[21]},未知0:{i[22]},上板情况:{i[23]},上板排名:{i[24]},换手率:{i[25]}%," # # f"未知空值:{i[26]},未知零值:{i[27]},收盘封单:{i[28]},最大封单:{i[29]},未知空值?:{i[30]}," # # f"?:{i[30]}%,?:{i[31]},??:{i[32]},振幅:{i[33]}%,未知0????:{i[34]},未知0?????:{i[35]}," # # f"?=:{i[36]},?总市值:{i[37]},?流通市值:{i[38]},最终归属概念(收盘后出数据?):{i[39]},领涨次数:{i[40]}," # # f"41未知1值:{i[41]},第三季度机构持仓【str数据勿用运算符】:{i[42]}万,?年预测净利润:{i[43]},上年预测净利润:{i[44]},年内预测净利润:{i[45]}" # # ) # # # 初始化股票强度列表 # stock_power_list = [] # for s in its_stock['list']: # # 过滤掉涨幅大于 and s[6] < 6.5 且小于0%的 和 名称中包含ST的 和 涨速小于等于0%的 和 只要昨日未涨停 和 上证或深证的正股 and s[9] > 0.0025 # if s[6] > 0 and s[1].find("ST") < 0 and s[1].find("XD") < 0 and s[23].find("板") < 0 and s[24].find("板") < 0 and (s[0].startswith('60') or s[0].startswith('00')) and s[9] > 1: # # print(f"{s[1]},个股代码:{s[0]}, 涨幅:{s[6]}% 涨速:{s[9]}% 概念:{s[4]} 主力资金推测:{s[2]} 领涨次数:{s[40]} 今日第几板:{s[23]} 是否破版{s[24]}") # # 对个股强度 主要 属性列表进行装填 # its_stock_power = [s[1], s[0], s[6], s[9], s[4], s[2], s[40]] # # 逐个选择性添加its_stock中的元素到个股强度列表中 # # print(f"its_stock_power===={its_stock_power}") # # 整体将添加完善的个股强度列表添加到股票列表中 # stock_power_list.append(its_stock_power) # # print(f"stock_power_list===={stock_power_list}") # # 过滤掉没有瞬时高强度个股的空概念 # if len(stock_power_list) != 0: # # 将对应板块的股票强度列表新建一个字典 # stock_power_item = {i[1]: stock_power_list} # # 并更新到精选板块个股字典中 # market_sift_plate_stock_dict.update(stock_power_item) # return log_data # # data = (getMarketJingXuanRealRankingInfo()) # market_sift_plate = json.loads(data) # # logger_kpl_jingxuan_in 打印的日志专用于开盘了数据的存储分析,不能轻易删除 # # print(f"market_sift_plate 数 ======{len(market_sift_plate['list'])}") # # 行情》精选板块》排名前20中》对应个股》符合条件的个股 # # logger.info(f"market_sift_plate['list']======{market_sift_plate['list']}") # # logger.info(f"market_sift_plate['list'][0] ======{market_sift_plate['list'][0]}") # # 初始化精选板块对应个股字典 # market_sift_plate_stock_dict = {} # if 'list' in market_sift_plate: # ds = [] # for d in market_sift_plate['list']: # ds.append(request_plate_codes(d)) # dask_result = batch_get_plate_codes(ds) # compute_results = dask_result.compute() # log_datas = {} # for r in compute_results: # if not r: # continue # for b in r: # log_datas[b] = r[b] # now_time = tool.get_now_time_str() # if data_cache.L1_DATA_START_TIME < now_time < data_cache.NOON_MARKET_TIME: # # logger.info(f"精选板块股票强度数据更新 == {market_sift_plate_stock_dict}") # # 只在盘中时间获取 # KPLStockOfMarketsPlateLogManager().add_log(market_sift_plate['list'], log_datas) # # return market_sift_plate_stock_dict # # # # 调用一下获取精选板块股票强度数据函数 【本模块内使用时调用】 # # get_market_sift_plate_its_stock_power() # # def get_market_sift_plate_its_stock_power_process(callback): # while True: # try: # # now = time.time() # # print(f"kpl_limit_up_process开始了{now}") # start_time = time.time() # now_time = tool.get_now_time_str() # if data_cache.L1_DATA_START_TIME < now_time < data_cache.CLOSING_TIME: # its_stock_power = get_market_sift_plate_its_stock_power() # time_str = datetime.datetime.now().strftime("%H%M%S") # if 92900 < int(time_str) < 95000: # logger_kpl_jingxuan_in.info(f"耗时:{time.time() - start_time} 数据:{its_stock_power}") # callback(its_stock_power) # # print(f"精选板块拉升个股更新===={its_stock_power}") # except Exception as e: # logger_debug.exception(e) # logger.error(f"开盘啦板块强度线程报错An error occurred: {e}") # finally: # time.sleep(2) # # # # 获取涨停板块名称列表并存储本地的函数 # def get_limit_up_block_names(): # # 设定当前时间点 # now_time = tool.get_now_time_str() # # print(f"now_time===={now_time}") # if data_cache.SERVER_RESTART_TIME < now_time < data_cache.UPDATE_DATA_TIME: # # print(f"在时间内使用--------------------------") # # 获取涨停信息列表 # limit_up_info = get_limit_up_info() # # print(f"limit_up_info=={limit_up_info}") # data_cache.limit_up_info = get_limit_up_info() # # 提取涨停列表中的板块名称 # limit_up_block_names = [] # # 循环添加涨停概念 # for i in limit_up_info: # limit_up_block_names.append(i[5]) # # print(f"limit_up_block_names==={limit_up_block_names}") # # return limit_up_block_names # # # 使用Counter计算每个元素的出现次数 # # counter = Counter(limit_up_block_names) # # # 找出出现次数最多的元素及其次数 # # most_common_element, most_common_count = counter.most_common(1)[0] # # # 打印出现次数最多的元素 # # print(f"主线概念:{most_common_element},出现了 {most_common_count} 次") # # return limit_up_block_names # # # # 为开盘啦接口获取的涨停列表概念板块单独开一个进程 形参(callback) # def kpl_limit_up_process(callback): # while True: # try: # # now = time.time() # # print(f"kpl_limit_up_process开始了{now}") # limit_up_block_names = get_limit_up_block_names() # callback(limit_up_block_names) # # logger.info(f"涨停更新===={limit_up_block_names}") # # print(f"涨停更新数量===={len(limit_up_block_names)}") # # print(f"kpl_limit_up_process完成一下{now}") # except Exception as e: # logger.error(f"开盘啦涨停板块概念线程报错An error occurred: {e}") # finally: # time.sleep(1.5) # # # # kpl_limit_up_process() # # # # 构建涨停信息读写对象 # class DailyLimitUpInfoStorageManager: # # 初始化文件路径 # def __init__(self, file_path=constant.KPL_LIMIT_UP_DATA_PATH): # self.file_path = file_path # # # 添加单日涨停信息数据到文件中的一行 函数 # def append_data_to_file(self, data_to_append): # # print(f"data_to_append=={data_to_append}") # # 读取所有行并解析为 JSON 对象列表 # if os.path.exists(self.file_path): # with open(self.file_path, 'r', encoding='utf-8') as file: # # 获取当前日期并格式化 # current_date = datetime.datetime.now().strftime('%Y-%m-%d') # lines = [json.loads(line.strip()) for line in file if line.strip()] # # print(f"lines type=={type(lines)}") # # print(f"lines=={lines}") # # 检查当前日期是否已存在于文件中 # if lines: # 如果读取到的行文件列表不为空(为真) # if lines[-1].get(current_date) is None: # 如果列表中的倒数最后一行获取不到当日的日期(最后一行的键 为 当日日期) # # 将日期和data_to_append转换为JSON格式的字符串 # json_line = json.dumps({current_date: data_to_append}, ensure_ascii=False) + '\n' # # 打开文件并追加JSON行 # with open(self.file_path, 'a', encoding='utf-8') as file: # file.write(json_line) # else: # logger.info(f"(当日日期已存在于文件的最后一行了,不再重复追加写入)") # else: # json_line = json.dumps({current_date: data_to_append}, ensure_ascii=False) + '\n' # # 打开文件并追加JSON行 # with open(self.file_path, 'a', encoding='utf-8') as file: # file.write(json_line) # # # 清理多余数据函数 # def check_and_remove_oldest_entry(self, max_entries): # # 读取所有行并解析为 JSON 对象列表 # if os.path.exists(self.file_path): # with open(self.file_path, 'r', encoding='utf-8') as file: # lines = [json.loads(line.strip()) for line in file if line.strip()] # else: # lines = [] # # # 如果行数超过限制,移除最早的一些行 # if len(lines) >= max_entries: # # 截断列表,只保留最新的 max_entries 个对象 # lines = lines[-max_entries:] # # 重新打开文件以写入模式,并写入截断后的对象列表为 JSON Lines # with open(self.file_path, 'w', encoding='utf-8') as file: # for obj in lines: # file.write(json.dumps(obj, ensure_ascii=False) + '\n') # # file.write(json.dumps(obj, ensure_ascii=False)) # # # 隔行整理数据并合并装入一个字典数据中调用时返回这个字典数据 函数 # def arrange_limit_up_info(self): # limit_info = {} # # 创建一个列表来存储所有解析的 JSON 对象 # if os.path.exists(self.file_path): # with open(self.file_path, 'r', encoding='utf-8') as file: # for line in file: # # 去除每行末尾的换行符(如果有的话) # line = line.rstrip('\n') # # 将每行解析为一个 JSON 对象 # info = json.loads(line) # # 假设每行都是一个字典数据,且只有一个键值对,其中键是日期 # if isinstance(info, dict) and len(info) == 1: # date, data = list(info.items())[0] # limit_info[date] = data # return limit_info # # # # 构建一个获取读写存储本地的并整理涨停数据的函数 # def get_arrange_limit_up_info(): # # 实例化每日涨停信息整理方法 # manager = DailyLimitUpInfoStorageManager() # manager.append_data_to_file(get_limit_up_info()) # manager.check_and_remove_oldest_entry(max_entries=1000) # # # # 构建一个处理历史涨停涨停信息数据的函数 # def get_handling_limit_up_info(): # # 实例化每日涨停信息整理方法 # history_limit_up_info = DailyLimitUpInfoStorageManager() # data_cache.daily_limit_up_info = history_limit_up_info.arrange_limit_up_info() # # logger.info(f"读本地的日更的历史涨停数据=={data_cache.daily_limit_up_info}") # # # print(f"daily_limit_up_info 类型==={type(data_cache.daily_limit_up_info)}") # # 统计每日主线 # daily_limit_up_info_len = len(data_cache.daily_limit_up_info) # # print(f"daily_limit_up_info_len==={daily_limit_up_info_len}") # # historical_transaction_date_list = [] # date_of_the_day = data_cache.DataCache().today_date # for i in range(daily_limit_up_info_len): # pre_date = hx_qc_value_util.get_previous_trading_date(date_of_the_day) # 获取前一个交易日API # # target_date_str = basic_methods.pre_num_trading_day(data_cache.today_date, daily_limit_up_info_len) # date_format = "%Y-%m-%d" # target_date = datetime.datetime.strptime(pre_date, date_format).strftime("%Y-%m-%d") # historical_transaction_date_list.append(target_date) # date_of_the_day = pre_date # # # print(f"historical_transaction_date_list={historical_transaction_date_list}") # history_sorted_plate_ranking_list = [] # for key, value in data_cache.daily_limit_up_info.items(): # # print(f"key=={key}") # for i in historical_transaction_date_list: # # print(f"i======={i}") # # 找到每上一个交易日对应的本地数据的信息 # if key == i: # # print(f"{key}===找到了!value={value}") # # # plate_ranking_list = [] # # 遍历交易日每一个涨停股的信息 # for v in value: # # print(f"v =={v}") # # 将每一个涨停股的涨停概念和同班级数量 汇编为一个字典 # plate_limit_up_num_dict = { # v[5]: v[20] # } # # 将这个字典数据不重复的添加到概念排名列表中 # if plate_limit_up_num_dict not in plate_ranking_list: # plate_ranking_list.append(plate_limit_up_num_dict) # # plate_ranking_set.add(v[20]) # # print(f"plate_ranking_list={plate_ranking_list}") # # 使用sorted函数和lambda表达式来根据字典的值进行排序 # # 这里我们确保不修改原始字典,仅通过list(x.values())[0]来获取值 # sorted_plate_ranking_list = sorted(plate_ranking_list, key=lambda x: list(x.values())[0], reverse=True) # # logger.info(f"{key}=====>>>>{sorted_plate_ranking_list}") # history_sorted_plate_ranking_list.append(sorted_plate_ranking_list) # # # print(f"history_sorted_plate_ranking_list={history_sorted_plate_ranking_list}") # # for ranking_list in history_sorted_plate_ranking_list: # # print(f"ranking_list={ranking_list}") # # for i in ranking_list: # # print(f"i={i}") # # # 计算历史涨停概念的连续出现次数函数 # def count_key_occurrences(list_of_dicts_lists): # # 创建一个字典来存储每个键的总出现次数 # key_counts = {} # # 遍历列表中的每个字典列表 # for sublist in list_of_dicts_lists: # # 遍历当前字典列表中的每个字典 # for dict_item in sublist: # # 遍历字典中的每个键 # for key in dict_item: # # 如果键不在key_counts中,则初始化计数为0 # if key not in key_counts: # key_counts[key] = 0 # # 增加当前键的计数 # key_counts[key] += 1 # # 打印结果 # for key, count in key_counts.items(): # if count > 1: # logger.info(f"'{key}' 连续出现 {count} 次") # # # 调用函数,传入整个列表 # # count_key_occurrences(history_sorted_plate_ranking_list) # # # daily_limit_up_info_list = list(reversed(daily_limit_up_info_list)) # # print(f"daily_limit_up_info_list==={daily_limit_up_info_list}") # # # 获取昨日涨停代码 (以便与K线对比) # pre_trading_day_limit_up_info = data_cache.daily_limit_up_info.get(data_cache.DataCache().pre_trading_day) # if pre_trading_day_limit_up_info is not None: # yesterday_limit_up_code_list = [] # for i in pre_trading_day_limit_up_info: # symbol_code = basic_methods.format_stock_symbol(i[0]) # limit_up_code = symbol_code # yesterday_limit_up_code_list.append(limit_up_code) # data_cache.yesterday_limit_up_code_list = yesterday_limit_up_code_list # logger.info(f"昨日涨停股票数量=={len(data_cache.yesterday_limit_up_code_list)}") # logger.info(f"昨日涨停代码列表=={yesterday_limit_up_code_list}") # # # code = pre_trading_day_limit_up_info[0][0] # # logger.info(f"股票代码=={code}") # # cor_name = pre_trading_day_limit_up_info[0][1] # # logger.info(f"公司名称=={cor_name}") # # unknown_zero_2 = pre_trading_day_limit_up_info[0][2] # # logger.info(f"未知零值2=={unknown_zero_2}") # # none_data = pre_trading_day_limit_up_info[0][3] # # logger.info(f"空数据=={none_data}") # # # 总市值(万)? # # total_market_value = round((pre_trading_day_limit_up_info[0][4] / 10000), 2) # # logger.info(f"总市值=={total_market_value}(万)?") # # # 最相关概念 # # the_most_relevant_plate = pre_trading_day_limit_up_info[0][5] # # logger.info(f"最相关概念=={the_most_relevant_plate}") # # # 收盘封单金额(万) # # closing_amount = round((pre_trading_day_limit_up_info[0][6] / 10000), 2) # # logger.info(f"收盘封单金额=={closing_amount}(万)") # # # 最大封单金额(万) # # maximum_blocked_amount = round((pre_trading_day_limit_up_info[0][7] / 10000), 2) # # logger.info(f"最大封单金额=={maximum_blocked_amount}(万)") # # # 主力净额 # # main_net_amount = round((pre_trading_day_limit_up_info[0][8] / 10000), 2) # # logger.info(f"主力净额=={main_net_amount}(万)") # # # 主力买 # # main_buyers = round((pre_trading_day_limit_up_info[0][9] / 10000), 2) # # logger.info(f"主力买=={main_buyers}(万)") # # # 主力卖 # # main_sellers = round((pre_trading_day_limit_up_info[0][10] / 10000), 2) # # logger.info(f"主力卖=={main_sellers}(万)") # # # 成交额 # # transaction_amount = round((pre_trading_day_limit_up_info[0][11] / 10000), 2) # # logger.info(f"成交额=={transaction_amount}(万)") # # # 所属精选板块 # # selected_plate = pre_trading_day_limit_up_info[0][12] # # logger.info(f"所属精选板块=={selected_plate}") # # # 实际流通 # # actual_circulation = round((pre_trading_day_limit_up_info[0][11] / 100000000), 2) # # logger.info(f"实际流通=={actual_circulation}(亿)") # # # 量比?(不是,不知道是什么) # # equivalent_ratio = pre_trading_day_limit_up_info[0][14] # # logger.info(f"量比?=={equivalent_ratio}") # # # 领涨次数 # # leading_increases_times = pre_trading_day_limit_up_info[0][15] # # logger.info(f"领涨次数=={leading_increases_times}") # # # 未知零值 # # unknown_zero_16 = pre_trading_day_limit_up_info[0][16] # # logger.info(f"未知零值16=={unknown_zero_16}") # # # 未知零值 # # unknown_zero_17 = pre_trading_day_limit_up_info[0][17] # # logger.info(f"未知零值17=={unknown_zero_17}") # # # 第几板(连续涨停天数) # # continuous_limit_up_days = pre_trading_day_limit_up_info[0][18] # # logger.info(f"第几板=={continuous_limit_up_days}") # # # 最相关概念的代码 # # the_most_relevant_plate_code = pre_trading_day_limit_up_info[0][19] # # logger.info(f"最相关概念的代码=={the_most_relevant_plate_code}") # # # 同班级的数量(同概念涨停数量) # # the_same_class_amount = pre_trading_day_limit_up_info[0][20] # # logger.info(f"同概念涨停数量=={the_same_class_amount}") # # # # get_handling_limit_up_info() # # # # 获取全部个股的板块并存储的函数 # def get_all_stocks_plate_dict(stocks_list): # all_stocks_plate_dict = {} # # 逐个获取个股精选板块概念和自由市值等,并整体放入一个新创建的字典中然后添加到数据中 # for i in stocks_list: # try: # code = i.split('.')[1] # # print(f"i==={i}") # # 获取个股的自由市值 # free_market_value = getZYLTAmount(code) # # 获取个股的板块列表 # selected_blocks = getStockIDPlate(code) # # 提取精选板块中的板块名称 # selected_plate_list = [block[1] for block in selected_blocks] # # print(f"selected_block_names==={selected_block_list}") # block_data = { # # 添加自由市值 # 'free_market_value': free_market_value, # # 添加精选板块 # 'plate': selected_plate_list # } # # 将code作为键,stocks_selected_block_data作为值添加到stocks_block_data字典中 # all_stocks_plate_dict[code] = block_data # # print(f"all_stocks_plate_dict==={all_stocks_plate_dict}") # except Exception as e: # print(f"获取全部个股的板块并存储的函数 An error occurred: {e}") # finally: # pass # # return stocks_plate_data # # print(f"all_stocks_plate_dict==={len(all_stocks_plate_dict)}") # # 将获取到的范围票概念板块转JSON格式并存储在本地文件夹中 # # 将字典转换为JSON格式的字符串 # json_data = json.dumps(all_stocks_plate_dict) # # 写入文件 # with open(constant.ALL_STOCKS_PLATE_PATH, 'w', encoding='utf-8') as f: # f.write(json_data) # now_time = datetime.datetime.now() # 获取本机时间 # logger.info(f"写入所有个股板块文件完成!::{now_time}") # # # # 计算开盘啦昨日拉取的概念数据中为空的股票数量函数 # def get_have_no_plate_num(): # # 初始化无概念数量 # have_no_plate_num = 0 # plate_are_null_list = [] # for k, v in data_cache.all_stocks_plate_dict.items(): # pass # # print(f"i==={i} T==={t}") # if len(v['plate']) == 0: # have_no_plate_num += 1 # # print(f"{k}的概念为空") # # logger.info(f"{k}的概念为空") # # 股票代码格式转化为掘金格式 # symbol = basic_methods.format_stock_symbol(k) # sec_name = data_cache.all_stocks_all_K_line_property_dict.get(symbol) # if sec_name is not None: # plate_are_null_list.append(sec_name) # logger.info(f"有{have_no_plate_num}只股票概念为空") # logger.info(f"个股有历史K线但概念为空的有:{plate_are_null_list}") # # # # 获取全部个股的精选板块并存储的函数 # def stocks_list_selected_blocks(min_stocks): # stocks_selected_block_data = [] # # 逐个获取个股精选板块概念和自由市值等,并整体放入一个新创建的字典中然后添加到数据中 # for i in min_stocks: # try: # code = i.split('.')[1] # # 获取个股的自由市值 # free_market_value = getZYLTAmount(code) # # 获取个股的精选板块列表 # # selected_blocks = getCodeJingXuanBlocks('000021') # selected_blocks = getCodeJingXuanBlocks(code) # # 提取精选板块中的板块名称 # selected_block_list = [block[1] for block in selected_blocks] # # print(f"selected_block_names==={selected_block_list}") # stocks_selected_block_dict = { # # 添加股票代码 # 'code': code, # # 添加自由市值 # 'free_market_value': free_market_value, # # 添加精选板块 # 'selected_block': selected_block_list # } # stocks_selected_block_data.append(stocks_selected_block_dict) # # print(f"stocks_selected_block_data==={stocks_selected_block_dict}") # except Exception as e: # logger.error(f"获取全部个股的精选板块并存储的函数 An error occurred: {e}") # # # print(f"stocks_selected_block_data==={len(stocks_selected_block_data)}") # # 将获取到的范围票概念板块转JSON格式并存储在本地文件夹中 # # 将字典转换为JSON格式的字符串 # json_data = json.dumps(stocks_selected_block_data) # # 写入文件 # with open('local_storage_data/stocks_selected_block_data.json', 'w', encoding='utf-8') as f: # f.write(json_data) # now_time = datetime.datetime.now() # 获取本机时间 # print(f"写入精选板块文件完成!::{now_time}") # # # # kpl_stocks_list_selected_blocks_process() #在 kpl_api.py中可以调用 # # stocks_list_selected_blocks(min_stocks) #在 kpl_api.py中可以调用 # # list = ['SHSE.600805','SHSE.600804'] # # # # all_stocks_plate_dict(list) # strategy/local_data_management.py
@@ -3,11 +3,7 @@ import constant from strategy import data_cache from log_module.log import logger_common # 获取logger实例 logger = logger_common # 读取本地的所有带属性K线数据(所有个股K线及指数线) @@ -40,7 +36,7 @@ else: json_data = "{}" data_cache.all_stocks_plate_dict = json.loads(json_data) logger.info(f"all_stocks_plate_dict的数量={len(data_cache.all_stocks_plate_dict)}") logger_common.info(f"all_stocks_plate_dict的数量={len(data_cache.all_stocks_plate_dict)}") if __name__ == '__main__': strategy/logging_config.py
@@ -6,6 +6,7 @@ from datetime import datetime import constant # 构造一个loger对象函数 def get_logger(name='my_logger', log_dir=constant.LOG_PATH, log_level=logging.INFO): """ strategy/market_sentiment_analysis.py
@@ -548,7 +548,6 @@ # 对于未知类型,你可以选择保留原样、跳过或引发异常 # 这里我们选择保留原样 return obj try: json_data = json.dumps(convert_datetime(all_index_k_line_property_dict), ensure_ascii=False, indent=4) # 将转换后的JSON字符串写入文件 @@ -561,8 +560,145 @@ logger.info(f"加属性的指数k线写完了!{tool.get_now_time_str()}") # 获取实时大盘行情情绪综合强度 [分数] 函数 并 计算当日计划持仓数量 def get_real_time_market_strong(): # 计算市场分布形态因子 函数 def calculate_market_distribution_pattern_factor(data): """ 原生Python数据预处理 返回: sorted_bins : 排序后的涨跌幅区间(整数列表) sorted_counts : 对应区间的股票数量 total : 总股票数 pos_counts : 正涨跌区间的数量列表 neg_counts : 负涨跌区间的数量列表 """ # 将字符串键转为整数并排序 sorted_items = sorted(data.items(), key=lambda x: int(x[0])) sorted_bins = [int(k) for k, v in sorted_items] sorted_counts = [v for k, v in sorted_items] # 计算总股票数 total = sum(sorted_counts) # 分离正负区间(排除0%) pos_counts = [v for k, v in sorted_items if int(k) > 0] neg_counts = [v for k, v in sorted_items if int(k) < 0] return sorted_bins, sorted_counts, total, pos_counts, neg_counts # 执行预处理 bins, counts, total, pos_counts, neg_counts = calculate_market_distribution_pattern_factor(data_cache.rise_and_fall_statistics_dirt) # ====================== 因子计算模块 ====================== class NativeFactorCalculator: def __init__(self, bins, counts, total, pos_counts, neg_counts): self.bins = bins self.counts = counts # todo total 会默认为0的情况 if total != 0: self.total = total else: self.total = 1 self.pos_counts = pos_counts self.neg_counts = neg_counts def calculate_bdr(self): """原生涨跌比计算""" sum_pos = sum(self.pos_counts) sum_neg = sum(self.neg_counts) return sum_pos / sum_neg if sum_neg != 0 else float('nan') def calculate_extreme_ratio(self): """极端波动比例计算""" extreme_total = 0 for b, c in zip(self.bins, self.counts): if b >= 5 or b <= -5: extreme_total += c return extreme_total / self.total def calculate_market_breadth(self): """市场宽度因子计算""" diff = sum(self.pos_counts) - sum(self.neg_counts) return diff / self.total def _expand_distribution(self): """将分箱数据展开为原始数据点(用于计算统计指标)""" expanded = [] for value, count in zip(self.bins, self.counts): expanded.extend([value] * count) return expanded def calculate_distribution_metrics(self): """原生偏度与峰度计算""" data = self._expand_distribution() n = len(data) if n < 2: return (float('nan'), float('nan')) # 计算均值 mean = sum(data) / n # 计算标准差 variance = sum((x - mean) ** 2 for x in data) / (n - 1) std_dev = variance ** 0.5 # 计算偏度 skewness = (sum((x - mean) ** 3 for x in data) / n) / (std_dev ** 3) if std_dev != 0 else 0 # 计算峰度(Fisher定义,即正态分布峰度为0) kurtosis = (sum((x - mean) ** 4 for x in data) / n) / (std_dev ** 4) - 3 if std_dev != 0 else 0 return (skewness, kurtosis) # 实例化并计算因子 calculator = NativeFactorCalculator(bins, counts, total, pos_counts, neg_counts) bdr = calculator.calculate_bdr() extreme_ratio = calculator.calculate_extreme_ratio() market_breadth = calculator.calculate_market_breadth() skewness, kurtosis = calculator.calculate_distribution_metrics() # ====================== 策略信号生成模块 ====================== class NativeTradingStrategy: def __init__(self, bdr_threshold=1.5, extreme_threshold=0.15): self.bdr_threshold = bdr_threshold self.extreme_threshold = extreme_threshold def generate_signals(self, bdr, extreme_ratio, market_breadth, skewness): """生成交易信号(与之前相同,保持策略逻辑)""" signals = [] # 信号1:基于涨跌比 if bdr > self.bdr_threshold: signals.append("多头趋势增强 - 加仓指数ETF") elif bdr < 1 / self.bdr_threshold: signals.append("空头趋势增强 - 减仓或反向操作") else: signals.append("趋势中性 - 保持当前仓位") # 信号2:极端波动 if extreme_ratio > self.extreme_threshold: signals.append("波动风险升高 - 启动对冲或降低杠杆") else: signals.append("波动正常 - 维持现有风险敞口") # 信号3:市场宽度与偏度 if market_breadth > 0.2 and skewness > 0.5: signals.append("市场普涨且偏度正向 - 超配行业龙头股") elif market_breadth < -0.2 and skewness < -0.5: signals.append("市场普跌且偏度负向 - 低估值防御型配置") return signals # 实例化策略并生成信号 strategy = NativeTradingStrategy() signals = strategy.generate_signals(bdr, extreme_ratio, market_breadth, skewness) # 实时设置计划持仓数量 函数 def set_plan_position_quantity(): while True: try: if data_cache.position_automatic_management_switch is True: @@ -571,26 +707,24 @@ # 获取大盘综合强度分数 data_cache.real_time_market_strong = kpl_api.get_market_strong() # 获取市场情绪字典【完整】,并整理 data_cache.real_time_market_sentiment_dirt = kpl_api.changeStatistics() date_today = data_cache.real_time_market_sentiment_dirt.get('Day', None) significant_drawdown = data_cache.real_time_market_sentiment_dirt.get('df_num', None) sentiment_indicators = data_cache.real_time_market_sentiment_dirt.get('ztjs', None) limit_up_amount = data_cache.real_time_market_sentiment_dirt.get('ztjs', None) connecting_board_height = data_cache.real_time_market_sentiment_dirt.get('lbgd', None) data_cache.real_time_market_sentiment_dirt = kpl_api.changeStatistics() # 市场情绪字典字典 date_today = data_cache.real_time_market_sentiment_dirt.get('Day', None) # 情绪数据日期 significant_drawdown = data_cache.real_time_market_sentiment_dirt.get('df_num', None) # 大幅回撤 sentiment_indicators = data_cache.real_time_market_sentiment_dirt.get('ztjs', None) # 情绪指标 limit_up_amount = data_cache.real_time_market_sentiment_dirt.get('ztjs', None) # 涨停家数 connecting_board_height = data_cache.real_time_market_sentiment_dirt.get('lbgd', None) # 连板高度 # 获取市场情绪-涨跌统计 data_cache.rise_and_fall_statistics_dirt = kpl_api.getMarketFelling() limit_up_numbers = data_cache.rise_and_fall_statistics_dirt.get('ZT', None) actual_limit_up_numbers = data_cache.rise_and_fall_statistics_dirt.get('SJZT', None) ST_limit_up_numbers = data_cache.rise_and_fall_statistics_dirt.get('STZT', None) limit_down_numbers = data_cache.rise_and_fall_statistics_dirt.get('DT', None) actual_limit_down_numbers = data_cache.rise_and_fall_statistics_dirt.get('SJDT', None) ST_limit_down_numbers = data_cache.rise_and_fall_statistics_dirt.get('STDT', None) data_cache.rise_and_fall_statistics_dirt = kpl_api.getMarketFelling() # 涨跌统计字典 limit_up_numbers = data_cache.rise_and_fall_statistics_dirt.get('ZT', None) # 涨停家数 actual_limit_up_numbers = data_cache.rise_and_fall_statistics_dirt.get('SJZT', None) # 实际涨停家数 ST_limit_up_numbers = data_cache.rise_and_fall_statistics_dirt.get('STZT', None) # ST涨停家数 limit_down_numbers = data_cache.rise_and_fall_statistics_dirt.get('DT', None) # 跌停家数 actual_limit_down_numbers = data_cache.rise_and_fall_statistics_dirt.get('SJDT', None) # 实际跌停家数 ST_limit_down_numbers = data_cache.rise_and_fall_statistics_dirt.get('STDT', None) # ST跌停家数 rise_numbers = data_cache.rise_and_fall_statistics_dirt.get('SZJS', None) fall_numbers = data_cache.rise_and_fall_statistics_dirt.get('XDJS', None) # 该logger.info的的日志不再需要打印,后续将转入到GUI客户端上直接显示,该数据的打印交由下方的打印机制异步执行单独存储,以便后续可视化呈现后进行更高效的数据分析 # logger.info(f"大盘行情情绪综合强度 [分数]==={data_cache.real_time_market_strong}分") # 控制打印日志的时间段 if data_cache.MORN_MARKET_CLOSING_TIME < now_time < data_cache.NOON_MARKET_OPENING_TIME: pass logger.info(f"午间休市时间内 不打印大盘综合强度分数") @@ -599,7 +733,7 @@ # logger_Overall_market_strength_score.info(data_cache.real_time_market_strong) async_log_util.info(logger_Overall_market_strength_score, f"{data_cache.real_time_market_strong}") logger.info(f"日期:{date_today},情绪指标:{sentiment_indicators}分,大幅回撤:{significant_drawdown},涨停家数:{limit_up_amount},连板高度:{connecting_board_height}") logger.info(f"上涨家数:{rise_numbers}分,下跌家数:{fall_numbers},实际涨停家数:{actual_limit_up_numbers},实际跌停家数:{actual_limit_down_numbers}") logger.info(f"上涨家数:{rise_numbers},下跌家数:{fall_numbers},实际涨停家数:{actual_limit_up_numbers},实际跌停家数:{actual_limit_down_numbers}") logger.info(f"涨跌统计字典{data_cache.rise_and_fall_statistics_dirt}") usefulMoney = data_cache.account_finance_dict[0].get('usefulMoney', 0) @@ -642,8 +776,8 @@ except Exception as error: logger_debug.exception(error) logger.error(f"获取实时大盘行情情绪综合强度[分数] 函数报错: {error}") print(f"获取实时大盘行情情绪综合强度[分数] 函数报错: {error}") logger.error(f"实时设置计划持仓数量 函数报错: {error}") print(f"实时设置计划持仓数量 函数报错: {error}") finally: time.sleep(3) @@ -651,9 +785,19 @@ if __name__ == '__main__': # market_strong = kpl_api.get_market_strong() # print(f"{market_strong}") get_real_time_market_strong() # get_real_time_market_strong() # all_index_K_line_dict = get_index_K_line() # all_index_k_line_dict_write() # print(f"指数K线{data_cache.all_index_k_line_property_dict}") # all_index_k_line_dict_write() # all_index_k_line_dict_write() # ====================== 结果输出 ====================== print("========== 因子数值(原生计算) ==========") print(f"涨跌比(BDR): {bdr:.2f}") print(f"极端波动比例: {extreme_ratio:.2%}") print(f"市场宽度因子: {market_breadth:.2f}") print(f"分布偏度: {skewness:.2f}, 峰度: {kurtosis:.2f}") print("\n========== 策略信号 ==========") for i, signal in enumerate(signals, 1): print(f"信号{i}: {signal}") strategy/plate_strength_analysis.py
New file @@ -0,0 +1,514 @@ import datetime import json import os import time import dask import constant from log_module.log import logger_common, logger_kpl_jingxuan_in, logger_debug from strategy import kpl_api, data_cache, basic_methods from utils import tool, hx_qc_value_util # 获取行情精选板块 强度排名 def get_market_sift_plate_its_stock_power(): @dask.delayed def batch_get_plate_codes(fs): return fs @dask.delayed def request_plate_codes(i): plate_name = i[1] log_data = None its_stock = json.loads(kpl_api.getCodesByPlate(i[0])) now_time_str = tool.get_now_time_str() if data_cache.OPENING_TIME < now_time_str < data_cache.NOON_MARKET_TIME: log_data = {plate_name: its_stock['list']} # 尝试过滤掉无意义的概念板块(plate_name not in ['科创板', '北交所', '次新股', '无', 'ST板块', 'ST摘帽', '并购重组', '国企改革','超跌', '壳资源', '股权转让', '送转填权']) and '增长' in plate_name if (plate_name not in ['科创板', '北交所', '次新股', '无', 'ST板块', 'ST摘帽', '并购重组', '国企改革', '超跌', '壳资源', '股权转让', '送转填权']) or ('增长' in plate_name): # print(f"{i[1]} 强度:{i[2]}") # 通过板块ID获取其下面的个股强度列表 # print(f"======={i[0]}=======") # its_stock_list_info = its_stock['list'] # logger.info(f"its_stock_list_info==={its_stock_list_info}") # 将板块强度下面对应的个股列表打印到日志中 # for i in its_stock_list_info: # if i[0] != 1: # logger.info( # f"l === 个股代码:{i[0]},公司名称:{i[1]},主力资金推测:{i[2]},未知0值:{i[3]},概念:{i[4]},最新价:{i[5]},当日当时涨幅:{i[6]}%," # f"成交额:{round(i[7] / 100000000, 2)} 亿,实际换手率:{i[8]}%,未知0值:{i[9]},实际流通:{round(i[10] / 100000000, 2)}亿," # f"主力买:{round(i[11] / 100000000, 2)}亿," # f"主力卖:{round(i[12] / 100000000, 2)}亿," # f"主力净额:{round(i[13] / 10000, 2)}万,买成占比:{i[14]}%,卖成占比:{i[15]}%,净成占比:{i[16]}%,买流占比:{i[17]}%,卖流占比:{i[18]}%,净流占比:{i[19]}%," # f"区间涨幅:{i[20]}%,量比:{i[21]},未知0:{i[22]},上板情况:{i[23]},上板排名:{i[24]},换手率:{i[25]}%," # f"未知空值:{i[26]},未知零值:{i[27]},收盘封单:{i[28]},最大封单:{i[29]},未知空值?:{i[30]}," # f"?:{i[30]}%,?:{i[31]},??:{i[32]},振幅:{i[33]}%,未知0????:{i[34]},未知0?????:{i[35]}," # f"?=:{i[36]},?总市值:{i[37]},?流通市值:{i[38]},最终归属概念(收盘后出数据?):{i[39]},领涨次数:{i[40]}," # f"41未知1值:{i[41]},第三季度机构持仓【str数据勿用运算符】:{i[42]}万,?年预测净利润:{i[43]},上年预测净利润:{i[44]},年内预测净利润:{i[45]}" # ) # 初始化股票强度列表 stock_power_list = [] for s in its_stock['list']: # 过滤掉涨幅大于 and s[6] < 6.5 且小于0%的 和 名称中包含ST的 和 涨速小于等于0%的 和 只要昨日未涨停 和 上证或深证的正股 and s[9] > 0.0025 if s[6] > 0 and s[1].find("ST") < 0 and s[1].find("XD") < 0 and s[23].find("板") < 0 and s[24].find("板") < 0 and (s[0].startswith('60') or s[0].startswith('00')) and s[9] > 1: # print(f"{s[1]},个股代码:{s[0]}, 涨幅:{s[6]}% 涨速:{s[9]}% 概念:{s[4]} 主力资金推测:{s[2]} 领涨次数:{s[40]} 今日第几板:{s[23]} 是否破版{s[24]}") # 对个股强度 主要 属性列表进行装填 its_stock_power = [s[1], s[0], s[6], s[9], s[4], s[2], s[40]] # 逐个选择性添加its_stock中的元素到个股强度列表中 # print(f"its_stock_power===={its_stock_power}") # 整体将添加完善的个股强度列表添加到股票列表中 stock_power_list.append(its_stock_power) # print(f"stock_power_list===={stock_power_list}") # 过滤掉没有瞬时高强度个股的空概念 if len(stock_power_list) != 0: # 将对应板块的股票强度列表新建一个字典 stock_power_item = {i[1]: stock_power_list} # 并更新到精选板块个股字典中 market_sift_plate_stock_dict.update(stock_power_item) return log_data data = (kpl_api.getMarketJingXuanRealRankingInfo()) market_sift_plate = json.loads(data) # logger_kpl_jingxuan_in 打印的日志专用于开盘了数据的存储分析,不能轻易删除 # print(f"market_sift_plate 数 ======{len(market_sift_plate['list'])}") # 行情》精选板块》排名前20中》对应个股》符合条件的个股 # logger.info(f"market_sift_plate['list']======{market_sift_plate['list']}") # logger.info(f"market_sift_plate['list'][0] ======{market_sift_plate['list'][0]}") # 初始化精选板块对应个股字典 market_sift_plate_stock_dict = {} if 'list' in market_sift_plate: ds = [] for d in market_sift_plate['list']: ds.append(request_plate_codes(d)) dask_result = batch_get_plate_codes(ds) compute_results = dask_result.compute() log_datas = {} for r in compute_results: if not r: continue for b in r: log_datas[b] = r[b] now_time = tool.get_now_time_str() if data_cache.L1_DATA_START_TIME < now_time < data_cache.NOON_MARKET_TIME: # logger.info(f"精选板块股票强度数据更新 == {market_sift_plate_stock_dict}") # 只在盘中时间获取 kpl_api.KPLStockOfMarketsPlateLogManager().add_log(market_sift_plate['list'], log_datas) return market_sift_plate_stock_dict # 调用一下获取精选板块股票强度数据函数 【本模块内使用时调用】 # get_market_sift_plate_its_stock_power() def get_market_sift_plate_its_stock_power_process(callback): while True: try: # now = time.time() # print(f"kpl_limit_up_process开始了{now}") start_time = time.time() now_time = tool.get_now_time_str() if data_cache.L1_DATA_START_TIME < now_time < data_cache.CLOSING_TIME: its_stock_power = get_market_sift_plate_its_stock_power() time_str = datetime.datetime.now().strftime("%H%M%S") if 92900 < int(time_str) < 95000: logger_kpl_jingxuan_in.info(f"耗时:{time.time() - start_time} 数据:{its_stock_power}") callback(its_stock_power) # print(f"精选板块拉升个股更新===={its_stock_power}") except Exception as e: logger_debug.exception(f"开盘啦板块强度线程报错An error occurred: {e}") finally: time.sleep(2) # 获取涨停板块名称列表并存储本地的函数 def get_limit_up_block_names(): # 设定当前时间点 now_time = tool.get_now_time_str() # print(f"now_time===={now_time}") if data_cache.SERVER_RESTART_TIME < now_time < data_cache.UPDATE_DATA_TIME: # print(f"在时间内使用--------------------------") # 获取涨停信息列表 limit_up_info = kpl_api.get_limit_up_info() # print(f"limit_up_info=={limit_up_info}") data_cache.limit_up_info = kpl_api.get_limit_up_info() # 提取涨停列表中的板块名称 limit_up_block_names = [] # 循环添加涨停概念 for i in limit_up_info: limit_up_block_names.append(i[5]) # print(f"limit_up_block_names==={limit_up_block_names}") # return limit_up_block_names # # 使用Counter计算每个元素的出现次数 # counter = Counter(limit_up_block_names) # # 找出出现次数最多的元素及其次数 # most_common_element, most_common_count = counter.most_common(1)[0] # # 打印出现次数最多的元素 # print(f"主线概念:{most_common_element},出现了 {most_common_count} 次") return limit_up_block_names # 为开盘啦接口获取的涨停列表概念板块单独开一个进程 形参(callback) def kpl_limit_up_process(callback): while True: try: # now = time.time() # print(f"kpl_limit_up_process开始了{now}") limit_up_block_names = get_limit_up_block_names() callback(limit_up_block_names) # logger.info(f"涨停更新===={limit_up_block_names}") # print(f"涨停更新数量===={len(limit_up_block_names)}") # print(f"kpl_limit_up_process完成一下{now}") except Exception as e: logger_debug.error(f"开盘啦涨停板块概念线程报错An error occurred: {e}") finally: time.sleep(1.5) # kpl_limit_up_process() # 构建涨停信息读写对象 class DailyLimitUpInfoStorageManager: # 初始化文件路径 def __init__(self, file_path=constant.KPL_LIMIT_UP_DATA_PATH): self.file_path = file_path # 添加单日涨停信息数据到文件中的一行 函数 def append_data_to_file(self, data_to_append): # print(f"data_to_append=={data_to_append}") # 读取所有行并解析为 JSON 对象列表 if os.path.exists(self.file_path): with open(self.file_path, 'r', encoding='utf-8') as file: # 获取当前日期并格式化 current_date = datetime.datetime.now().strftime('%Y-%m-%d') lines = [json.loads(line.strip()) for line in file if line.strip()] # print(f"lines type=={type(lines)}") # print(f"lines=={lines}") # 检查当前日期是否已存在于文件中 if lines: # 如果读取到的行文件列表不为空(为真) if lines[-1].get(current_date) is None: # 如果列表中的倒数最后一行获取不到当日的日期(最后一行的键 为 当日日期) # 将日期和data_to_append转换为JSON格式的字符串 json_line = json.dumps({current_date: data_to_append}, ensure_ascii=False) + '\n' # 打开文件并追加JSON行 with open(self.file_path, 'a', encoding='utf-8') as file: file.write(json_line) else: logger_common.info(f"(当日日期已存在于文件的最后一行了,不再重复追加写入)") else: json_line = json.dumps({current_date: data_to_append}, ensure_ascii=False) + '\n' # 打开文件并追加JSON行 with open(self.file_path, 'a', encoding='utf-8') as file: file.write(json_line) # 清理多余数据函数 def check_and_remove_oldest_entry(self, max_entries): # 读取所有行并解析为 JSON 对象列表 if os.path.exists(self.file_path): with open(self.file_path, 'r', encoding='utf-8') as file: lines = [json.loads(line.strip()) for line in file if line.strip()] else: lines = [] # 如果行数超过限制,移除最早的一些行 if len(lines) >= max_entries: # 截断列表,只保留最新的 max_entries 个对象 lines = lines[-max_entries:] # 重新打开文件以写入模式,并写入截断后的对象列表为 JSON Lines with open(self.file_path, 'w', encoding='utf-8') as file: for obj in lines: file.write(json.dumps(obj, ensure_ascii=False) + '\n') # file.write(json.dumps(obj, ensure_ascii=False)) # 隔行整理数据并合并装入一个字典数据中调用时返回这个字典数据 函数 def arrange_limit_up_info(self): limit_info = {} # 创建一个列表来存储所有解析的 JSON 对象 if os.path.exists(self.file_path): with open(self.file_path, 'r', encoding='utf-8') as file: for line in file: # 去除每行末尾的换行符(如果有的话) line = line.rstrip('\n') # 将每行解析为一个 JSON 对象 info = json.loads(line) # 假设每行都是一个字典数据,且只有一个键值对,其中键是日期 if isinstance(info, dict) and len(info) == 1: date, data = list(info.items())[0] limit_info[date] = data return limit_info # 构建一个获取读写存储本地的并整理涨停数据的函数 def get_arrange_limit_up_info(): # 实例化每日涨停信息整理方法 manager = DailyLimitUpInfoStorageManager() manager.append_data_to_file(kpl_api.get_limit_up_info()) manager.check_and_remove_oldest_entry(max_entries=1000) # 构建一个处理历史涨停涨停信息数据的函数 def get_handling_limit_up_info(): # 实例化每日涨停信息整理方法 history_limit_up_info = DailyLimitUpInfoStorageManager() data_cache.daily_limit_up_info = history_limit_up_info.arrange_limit_up_info() # logger.info(f"读本地的日更的历史涨停数据=={data_cache.daily_limit_up_info}") # print(f"daily_limit_up_info 类型==={type(data_cache.daily_limit_up_info)}") # 统计每日主线 daily_limit_up_info_len = len(data_cache.daily_limit_up_info) # print(f"daily_limit_up_info_len==={daily_limit_up_info_len}") historical_transaction_date_list = [] date_of_the_day = data_cache.DataCache().today_date for i in range(daily_limit_up_info_len): pre_date = hx_qc_value_util.get_previous_trading_date(date_of_the_day) # 获取前一个交易日API # target_date_str = basic_methods.pre_num_trading_day(data_cache.today_date, daily_limit_up_info_len) date_format = "%Y-%m-%d" target_date = datetime.datetime.strptime(pre_date, date_format).strftime("%Y-%m-%d") historical_transaction_date_list.append(target_date) date_of_the_day = pre_date # print(f"historical_transaction_date_list={historical_transaction_date_list}") history_sorted_plate_ranking_list = [] for key, value in data_cache.daily_limit_up_info.items(): # print(f"key=={key}") for i in historical_transaction_date_list: # print(f"i======={i}") # 找到每上一个交易日对应的本地数据的信息 if key == i: # print(f"{key}===找到了!value={value}") # plate_ranking_list = [] # 遍历交易日每一个涨停股的信息 for v in value: # print(f"v =={v}") # 将每一个涨停股的涨停概念和同班级数量 汇编为一个字典 plate_limit_up_num_dict = { v[5]: v[20] } # 将这个字典数据不重复的添加到概念排名列表中 if plate_limit_up_num_dict not in plate_ranking_list: plate_ranking_list.append(plate_limit_up_num_dict) # plate_ranking_set.add(v[20]) # print(f"plate_ranking_list={plate_ranking_list}") # 使用sorted函数和lambda表达式来根据字典的值进行排序 # 这里我们确保不修改原始字典,仅通过list(x.values())[0]来获取值 sorted_plate_ranking_list = sorted(plate_ranking_list, key=lambda x: list(x.values())[0], reverse=True) # logger.info(f"{key}=====>>>>{sorted_plate_ranking_list}") history_sorted_plate_ranking_list.append(sorted_plate_ranking_list) # print(f"history_sorted_plate_ranking_list={history_sorted_plate_ranking_list}") # for ranking_list in history_sorted_plate_ranking_list: # print(f"ranking_list={ranking_list}") # for i in ranking_list: # print(f"i={i}") # 计算历史涨停概念的连续出现次数函数 def count_key_occurrences(list_of_dicts_lists): # 创建一个字典来存储每个键的总出现次数 key_counts = {} # 遍历列表中的每个字典列表 for sublist in list_of_dicts_lists: # 遍历当前字典列表中的每个字典 for dict_item in sublist: # 遍历字典中的每个键 for key in dict_item: # 如果键不在key_counts中,则初始化计数为0 if key not in key_counts: key_counts[key] = 0 # 增加当前键的计数 key_counts[key] += 1 # 打印结果 for key, count in key_counts.items(): if count > 1: logger_common.info(f"'{key}' 连续出现 {count} 次") # 调用函数,传入整个列表 # count_key_occurrences(history_sorted_plate_ranking_list) # daily_limit_up_info_list = list(reversed(daily_limit_up_info_list)) # print(f"daily_limit_up_info_list==={daily_limit_up_info_list}") # 获取昨日涨停代码 (以便与K线对比) pre_trading_day_limit_up_info = data_cache.daily_limit_up_info.get(data_cache.DataCache().pre_trading_day) if pre_trading_day_limit_up_info is not None: yesterday_limit_up_code_list = [] for i in pre_trading_day_limit_up_info: symbol_code = basic_methods.format_stock_symbol(i[0]) limit_up_code = symbol_code yesterday_limit_up_code_list.append(limit_up_code) data_cache.yesterday_limit_up_code_list = yesterday_limit_up_code_list logger_common.info(f"昨日涨停股票数量=={len(data_cache.yesterday_limit_up_code_list)}") logger_common.info(f"昨日涨停代码列表=={yesterday_limit_up_code_list}") # code = pre_trading_day_limit_up_info[0][0] # logger.info(f"股票代码=={code}") # cor_name = pre_trading_day_limit_up_info[0][1] # logger.info(f"公司名称=={cor_name}") # unknown_zero_2 = pre_trading_day_limit_up_info[0][2] # logger.info(f"未知零值2=={unknown_zero_2}") # none_data = pre_trading_day_limit_up_info[0][3] # logger.info(f"空数据=={none_data}") # # 总市值(万)? # total_market_value = round((pre_trading_day_limit_up_info[0][4] / 10000), 2) # logger.info(f"总市值=={total_market_value}(万)?") # # 最相关概念 # the_most_relevant_plate = pre_trading_day_limit_up_info[0][5] # logger.info(f"最相关概念=={the_most_relevant_plate}") # # 收盘封单金额(万) # closing_amount = round((pre_trading_day_limit_up_info[0][6] / 10000), 2) # logger.info(f"收盘封单金额=={closing_amount}(万)") # # 最大封单金额(万) # maximum_blocked_amount = round((pre_trading_day_limit_up_info[0][7] / 10000), 2) # logger.info(f"最大封单金额=={maximum_blocked_amount}(万)") # # 主力净额 # main_net_amount = round((pre_trading_day_limit_up_info[0][8] / 10000), 2) # logger.info(f"主力净额=={main_net_amount}(万)") # # 主力买 # main_buyers = round((pre_trading_day_limit_up_info[0][9] / 10000), 2) # logger.info(f"主力买=={main_buyers}(万)") # # 主力卖 # main_sellers = round((pre_trading_day_limit_up_info[0][10] / 10000), 2) # logger.info(f"主力卖=={main_sellers}(万)") # # 成交额 # transaction_amount = round((pre_trading_day_limit_up_info[0][11] / 10000), 2) # logger.info(f"成交额=={transaction_amount}(万)") # # 所属精选板块 # selected_plate = pre_trading_day_limit_up_info[0][12] # logger.info(f"所属精选板块=={selected_plate}") # # 实际流通 # actual_circulation = round((pre_trading_day_limit_up_info[0][11] / 100000000), 2) # logger.info(f"实际流通=={actual_circulation}(亿)") # # 量比?(不是,不知道是什么) # equivalent_ratio = pre_trading_day_limit_up_info[0][14] # logger.info(f"量比?=={equivalent_ratio}") # # 领涨次数 # leading_increases_times = pre_trading_day_limit_up_info[0][15] # logger.info(f"领涨次数=={leading_increases_times}") # # 未知零值 # unknown_zero_16 = pre_trading_day_limit_up_info[0][16] # logger.info(f"未知零值16=={unknown_zero_16}") # # 未知零值 # unknown_zero_17 = pre_trading_day_limit_up_info[0][17] # logger.info(f"未知零值17=={unknown_zero_17}") # # 第几板(连续涨停天数) # continuous_limit_up_days = pre_trading_day_limit_up_info[0][18] # logger.info(f"第几板=={continuous_limit_up_days}") # # 最相关概念的代码 # the_most_relevant_plate_code = pre_trading_day_limit_up_info[0][19] # logger.info(f"最相关概念的代码=={the_most_relevant_plate_code}") # # 同班级的数量(同概念涨停数量) # the_same_class_amount = pre_trading_day_limit_up_info[0][20] # logger.info(f"同概念涨停数量=={the_same_class_amount}") # get_handling_limit_up_info() # 获取全部个股的板块并存储的函数 def get_all_stocks_plate_dict(stocks_list): all_stocks_plate_dict = {} # 逐个获取个股精选板块概念和自由市值等,并整体放入一个新创建的字典中然后添加到数据中 for i in stocks_list: try: code = i.split('.')[1] # print(f"i==={i}") # 获取个股的自由市值 free_market_value = kpl_api.getZYLTAmount(code) # 获取个股的板块列表 selected_blocks = kpl_api.getStockIDPlate(code) # 提取精选板块中的板块名称 selected_plate_list = [block[1] for block in selected_blocks] # print(f"selected_block_names==={selected_block_list}") block_data = { # 添加自由市值 'free_market_value': free_market_value, # 添加精选板块 'plate': selected_plate_list } # 将code作为键,stocks_selected_block_data作为值添加到stocks_block_data字典中 all_stocks_plate_dict[code] = block_data # print(f"all_stocks_plate_dict==={all_stocks_plate_dict}") except Exception as e: print(f"获取全部个股的板块并存储的函数 An error occurred: {e}") finally: pass # return stocks_plate_data # print(f"all_stocks_plate_dict==={len(all_stocks_plate_dict)}") # 将获取到的范围票概念板块转JSON格式并存储在本地文件夹中 # 将字典转换为JSON格式的字符串 json_data = json.dumps(all_stocks_plate_dict) # 写入文件 with open(constant.ALL_STOCKS_PLATE_PATH, 'w', encoding='utf-8') as f: f.write(json_data) now_time = datetime.datetime.now() # 获取本机时间 logger_common.info(f"写入所有个股板块文件完成!::{now_time}") # 计算开盘啦昨日拉取的概念数据中为空的股票数量函数 def get_have_no_plate_num(): # 初始化无概念数量 have_no_plate_num = 0 plate_are_null_list = [] for k, v in data_cache.all_stocks_plate_dict.items(): pass # print(f"i==={i} T==={t}") if len(v['plate']) == 0: have_no_plate_num += 1 # print(f"{k}的概念为空") # logger.info(f"{k}的概念为空") # 股票代码格式转化为掘金格式 symbol = basic_methods.format_stock_symbol(k) sec_name = data_cache.all_stocks_all_K_line_property_dict.get(symbol) if sec_name is not None: plate_are_null_list.append(sec_name) logger_common.info(f"有{have_no_plate_num}只股票概念为空") print(f"有{have_no_plate_num}只股票概念为空") logger_common.info(f"个股有历史K线但概念为空的有:{plate_are_null_list}") # 获取全部个股的精选板块并存储的函数 def stocks_list_selected_blocks(min_stocks): stocks_selected_block_data = [] # 逐个获取个股精选板块概念和自由市值等,并整体放入一个新创建的字典中然后添加到数据中 for i in min_stocks: try: code = i.split('.')[1] # 获取个股的自由市值 free_market_value = kpl_api.getZYLTAmount(code) # 获取个股的精选板块列表 # selected_blocks = getCodeJingXuanBlocks('000021') selected_blocks = kpl_api.getCodeJingXuanBlocks(code) # 提取精选板块中的板块名称 selected_block_list = [block[1] for block in selected_blocks] # print(f"selected_block_names==={selected_block_list}") stocks_selected_block_dict = { # 添加股票代码 'code': code, # 添加自由市值 'free_market_value': free_market_value, # 添加精选板块 'selected_block': selected_block_list } stocks_selected_block_data.append(stocks_selected_block_dict) # print(f"stocks_selected_block_data==={stocks_selected_block_dict}") except Exception as e: logger_debug.error(f"获取全部个股的精选板块并存储的函数 An error occurred: {e}") # print(f"stocks_selected_block_data==={len(stocks_selected_block_data)}") # 将获取到的范围票概念板块转JSON格式并存储在本地文件夹中 # 将字典转换为JSON格式的字符串 json_data = json.dumps(stocks_selected_block_data) # 写入文件 with open('local_storage_data/stocks_selected_block_data.json', 'w', encoding='utf-8') as f: f.write(json_data) now_time = datetime.datetime.now() # 获取本机时间 print(f"写入精选板块文件完成!::{now_time}") if __name__ == '__main__': # get_have_no_plate_num() pass strategy/selling_strategy.py
@@ -311,22 +311,22 @@ # 平开视界 < ±1% if -1 < today_open_growth < 1: logger_info( f"【集合竞价】【{k_line_data[0]['sec_name']}】【昨日炸板】【浮盈】【平开】,平开:{today_open_growth}%,设定委卖数量【十分之五仓】") order_methods.sell_order_by_part_volume(0.5, symbol, position_volume_yesterday, f"【集合竞价】【{k_line_data[0]['sec_name']}】【昨日炸板】【浮盈】【平开】,平开:{today_open_growth}%,设定委卖数量【四分之一仓】") order_methods.sell_order_by_part_volume(0.25, symbol, position_volume_yesterday, current_price, k_line_data[0]['sec_name'], index) # 小低开视界 if -4 < today_open_growth <= -1: logger_info( f"【集合竞价】【{k_line_data[0]['sec_name']}】【昨日炸板】【浮盈】【小低开】,低开:{today_open_growth}%,设定委卖数量【十分之七仓】") order_methods.sell_order_by_part_volume(0.7, symbol, position_volume_yesterday, f"【集合竞价】【{k_line_data[0]['sec_name']}】【昨日炸板】【浮盈】【小低开】,低开:{today_open_growth}%,设定委卖数量【十分之三仓】") order_methods.sell_order_by_part_volume(0.3, symbol, position_volume_yesterday, current_price, k_line_data[0]['sec_name'], index) # 中低开/大低开视界 if today_open_growth <= -4: logger_info( f"【集合竞价】【{k_line_data[0]['sec_name']}】【昨日炸板】【中低开/大低开】,低开:{today_open_growth}%,设定委卖数量【全仓】") order_methods.sell_order_by_part_volume(1, symbol, position_volume_yesterday, f"【集合竞价】【{k_line_data[0]['sec_name']}】【昨日炸板】【中低开/大低开】,低开:{today_open_growth}%,设定委卖数量【半仓】") order_methods.sell_order_by_part_volume(0.5, symbol, position_volume_yesterday, current_price, k_line_data[0]['sec_name'], index)