import datetime import json import os import time import dask import constant from log_module.log import logger_common, logger_kpl_jingxuan_in, logger_debug, logger_kpl_market_sift_plate from strategy import kpl_api, data_cache, basic_methods from strategy.kpl_data_manager import KPLMarketStockHeatLogManager from utils import tool, hx_qc_value_util # 获取行情精选板块 强度排名 def get_market_sift_plate_its_stock_power(): @dask.delayed def batch_get_plate_codes(fs): return fs @dask.delayed def request_plate_codes(i): plate_name = i[1] its_stock = json.loads(kpl_api.getCodesByPlate(i[0])) # now_time_str = tool.get_now_time_str() # if data_cache.OPENING_TIME < now_time_str < data_cache.NOON_MARKET_TIME: log_data = {plate_name: its_stock['list']} # 尝试过滤掉无意义的概念板块 【代表着有无强度可能】 if (plate_name not in ['科创板', '北交所', '无', '并购重组', '国企改革', '超跌', '壳资源', '股权转让', '送转填权']) or ('次新' in plate_name or 'ST' in plate_name or '破净股' in plate_name): # print(f"{i[1]} 强度:{i[2]}") # 通过板块ID获取其下面的个股强度列表 # print(f"======={i[0]}=======") # its_stock_list_info = its_stock['list'] # logger.info(f"its_stock_list_info==={its_stock_list_info}") # 将板块强度下面对应的个股列表打印到日志中 # for i in its_stock_list_info: # if i[0] != 1: # logger.info( # f"l === 个股代码:{i[0]},公司名称:{i[1]},主力资金推测:{i[2]},未知0值:{i[3]},概念:{i[4]},最新价:{i[5]},当日当时涨幅:{i[6]}%," # f"成交额:{round(i[7] / 100000000, 2)} 亿,实际换手率:{i[8]}%,未知0值:{i[9]},实际流通:{round(i[10] / 100000000, 2)}亿," # f"主力买:{round(i[11] / 100000000, 2)}亿," # f"主力卖:{round(i[12] / 100000000, 2)}亿," # f"主力净额:{round(i[13] / 10000, 2)}万,买成占比:{i[14]}%,卖成占比:{i[15]}%,净成占比:{i[16]}%,买流占比:{i[17]}%,卖流占比:{i[18]}%,净流占比:{i[19]}%," # f"区间涨幅:{i[20]}%,量比:{i[21]},未知0:{i[22]},上板情况:{i[23]},上板排名:{i[24]},换手率:{i[25]}%," # f"未知空值:{i[26]},未知零值:{i[27]},收盘封单:{i[28]},最大封单:{i[29]},未知空值?:{i[30]}," # f"?:{i[30]}%,?:{i[31]},??:{i[32]},振幅:{i[33]}%,未知0????:{i[34]},未知0?????:{i[35]}," # f"?=:{i[36]},?总市值:{i[37]},?流通市值:{i[38]},最终归属概念(收盘后出数据?):{i[39]},领涨次数:{i[40]}," # f"41未知1值:{i[41]},上季度机构持仓【str数据勿用运算符】:{i[42]}万,?年预测净利润:{i[43]},上年预测净利润:{i[44]},年内预测净利润:{i[45]}" # ) # 初始化股票强度列表 stock_power_list = [] for s in its_stock['list']: # 过滤掉涨幅大于 当日涨幅s[6] < 0% 的 和 名称中包含ST的 和 涨速小于等于0%的 和 只要昨日未涨停 和 上证或深证的正股 and s[9] > 0.0025 上季度机构持仓 >0 if s[6] > 0 and s[1].find("ST") < 0 and s[1].find("XD") < 0 and s[23].find("板") < 0 and s[24].find("板") < 0 and (s[0].startswith('60') or s[0].startswith('00')) and s[9] > 1 and int(s[42]) >= 0: # print(f"{s[1]},个股代码:{s[0]}, 涨幅:{s[6]}% 涨速:{s[9]}% 概念:{s[4]} 主力资金推测:{s[2]} 领涨次数:{s[40]} 今日第几板:{s[23]} 是否破版{s[24]}") # 对个股强度 主要 属性列表进行装填 its_stock_power = [s[1], s[0], s[6], s[9], s[4], s[2], s[40]] # 逐个选择性添加its_stock中的元素到个股强度列表中 # print(f"its_stock_power===={its_stock_power}") # 整体将添加完善的个股强度列表添加到股票列表中 stock_power_list.append(its_stock_power) # print(f"stock_power_list===={stock_power_list}") # 过滤掉没有瞬时高强度个股的空概念 if len(stock_power_list) != 0: # 将对应板块的股票强度列表新建一个字典 stock_power_item = {i[1]: stock_power_list} # 并更新到精选板块个股字典中 market_sift_plate_stock_dict.update(stock_power_item) return log_data # 定义一个时间段,在这个时间段内才会执行下面的代码,主要就是把强度数据作为日志打印存储下来。 now_time = tool.get_now_time_str() data = (kpl_api.getMarketJingXuanRealRankingInfo()) market_sift_plate = json.loads(data) # print(f"market_sift_plate 数 ======{len(market_sift_plate['list'])}") # 精选板块【前20】 market_sift_plate['list'] ====== if data_cache.OPENING_TIME < now_time < data_cache.AFTER_CLOSING_TIME: logger_kpl_market_sift_plate.info(f"{market_sift_plate['list']}") # 总控制时间段 # TODO 测试 if not (data_cache.OPENING_TIME < now_time < data_cache.NOON_MARKET_TIME) or True: return # ['801235', '化工', 6996, 0.027, 2.43, 117836347690, -122548038, 8105997595, -8228545633, 0.92, 8595377775454, 0.09, 332297449, 9954902621130, -192457252, 24.0487, 17.1809, 6996, 0.027] # market_sift_plate['list'][0] = ['801062', '军工', 3520, -0.49, 0.666, 139133934669, 383864272, 9077352839, -8693488567, 1.183, 6129448037490,-0.12, 168245858, 7088854452019, -290614763, 50.2408, 30.3672, 3520, 0] # 行情精选板块列表 前20 中的 第一个板块列表数据 = 【代码,板块名称,强度,涨幅?,量比?,成交额?,现额?,主买,主卖,1.183?,流通值?,-0.12?,300W大单净额?,总市值?,上季度机构增仓,今年平均PE,次年平均PE,强度,未知0值】 # logger.info(f"market_sift_plate['list'][0] ======{market_sift_plate['list'][0]}") # 初始化精选板块对应个股字典 market_sift_plate_stock_dict = {} if 'list' in market_sift_plate: ds = [] for d in market_sift_plate['list']: ds.append(request_plate_codes(d)) dask_result = batch_get_plate_codes(ds) compute_results = dask_result.compute() log_datas = {} for r in compute_results: if not r: continue for b in r: log_datas[b] = r[b] # logger.info(f"精选板块股票强度数据更新 == {market_sift_plate_stock_dict}") # 只在盘中时间获取 KPLMarketStockHeatLogManager().add_log(market_sift_plate['list'], log_datas) # 行情》精选板块》排名前20中》对应个股》符合条件的个股 return market_sift_plate_stock_dict # 调用一下获取精选板块股票强度数据函数 【本模块内使用时调用】 # get_market_sift_plate_its_stock_power() def get_market_sift_plate_its_stock_power_process(callback): while True: try: # now = time.time() # print(f"kpl_limit_up_process开始了{now}") start_time = time.time() now_time = tool.get_now_time_str() # TODO 测试 if data_cache.L1_DATA_START_TIME < now_time < data_cache.CLOSING_TIME or True: its_stock_power = get_market_sift_plate_its_stock_power() time_str = datetime.datetime.now().strftime("%H%M%S") if 92900 < int(time_str) < 95000: # logger_kpl_jingxuan_in 打印的日志专用于开盘了数据的存储分析,不能轻易删除 logger_kpl_jingxuan_in.info(f"耗时:{time.time() - start_time} 数据:{its_stock_power}") callback(its_stock_power) # print(f"精选板块拉升个股更新===={its_stock_power}") except Exception as e: logger_debug.exception(f"开盘啦板块强度线程报错An error occurred: {e}") finally: time.sleep(2) # 获取涨停板块名称列表并存储本地的函数 def get_limit_up_block_names(): # 设定当前时间点 now_time = tool.get_now_time_str() # print(f"now_time===={now_time}") if data_cache.SERVER_RESTART_TIME < now_time < data_cache.UPDATE_DATA_TIME: # print(f"在时间内使用--------------------------") # 获取涨停信息列表 limit_up_info = kpl_api.get_limit_up_info() # print(f"limit_up_info=={limit_up_info}") data_cache.limit_up_info = kpl_api.get_limit_up_info() # 提取涨停列表中的板块名称 limit_up_block_names = [] # 循环添加涨停概念 for i in limit_up_info: limit_up_block_names.append(i[5]) # print(f"limit_up_block_names==={limit_up_block_names}") # return limit_up_block_names # # 使用Counter计算每个元素的出现次数 # counter = Counter(limit_up_block_names) # # 找出出现次数最多的元素及其次数 # most_common_element, most_common_count = counter.most_common(1)[0] # # 打印出现次数最多的元素 # print(f"主线概念:{most_common_element},出现了 {most_common_count} 次") return limit_up_block_names # 为开盘啦接口获取的涨停列表概念板块单独开一个进程 形参(callback) def kpl_limit_up_process(callback): while True: try: # now = time.time() # print(f"kpl_limit_up_process开始了{now}") limit_up_block_names = get_limit_up_block_names() callback(limit_up_block_names) # logger.info(f"涨停更新===={limit_up_block_names}") # print(f"涨停更新数量===={len(limit_up_block_names)}") # print(f"kpl_limit_up_process完成一下{now}") except Exception as e: logger_debug.error(f"开盘啦涨停板块概念线程报错An error occurred: {e}") finally: time.sleep(1.5) # kpl_limit_up_process() # 构建涨停信息读写对象 class DailyLimitUpInfoStorageManager: # 初始化文件路径 def __init__(self, file_path=constant.KPL_LIMIT_UP_DATA_PATH): self.file_path = file_path # 添加单日涨停信息数据到文件中的一行 函数 def append_data_to_file(self, data_to_append): # print(f"data_to_append=={data_to_append}") # 读取所有行并解析为 JSON 对象列表 if os.path.exists(self.file_path): with open(self.file_path, 'r', encoding='utf-8') as file: # 获取当前日期并格式化 current_date = datetime.datetime.now().strftime('%Y-%m-%d') lines = [json.loads(line.strip()) for line in file if line.strip()] # print(f"lines type=={type(lines)}") # print(f"lines=={lines}") # 检查当前日期是否已存在于文件中 if lines: # 如果读取到的行文件列表不为空(为真) if lines[-1].get(current_date) is None: # 如果列表中的倒数最后一行获取不到当日的日期(最后一行的键 为 当日日期) # 将日期和data_to_append转换为JSON格式的字符串 json_line = json.dumps({current_date: data_to_append}, ensure_ascii=False) + '\n' # 打开文件并追加JSON行 with open(self.file_path, 'a', encoding='utf-8') as file: file.write(json_line) else: logger_common.info(f"(当日日期已存在于文件的最后一行了,不再重复追加写入)") else: json_line = json.dumps({current_date: data_to_append}, ensure_ascii=False) + '\n' # 打开文件并追加JSON行 with open(self.file_path, 'a', encoding='utf-8') as file: file.write(json_line) # 清理多余数据函数 def check_and_remove_oldest_entry(self, max_entries): # 读取所有行并解析为 JSON 对象列表 if os.path.exists(self.file_path): with open(self.file_path, 'r', encoding='utf-8') as file: lines = [json.loads(line.strip()) for line in file if line.strip()] else: lines = [] # 如果行数超过限制,移除最早的一些行 if len(lines) >= max_entries: # 截断列表,只保留最新的 max_entries 个对象 lines = lines[-max_entries:] # 重新打开文件以写入模式,并写入截断后的对象列表为 JSON Lines with open(self.file_path, 'w', encoding='utf-8') as file: for obj in lines: file.write(json.dumps(obj, ensure_ascii=False) + '\n') # file.write(json.dumps(obj, ensure_ascii=False)) # 隔行整理数据并合并装入一个字典数据中调用时返回这个字典数据 函数 def arrange_limit_up_info(self): limit_info = {} # 创建一个列表来存储所有解析的 JSON 对象 if os.path.exists(self.file_path): with open(self.file_path, 'r', encoding='utf-8') as file: for line in file: # 去除每行末尾的换行符(如果有的话) line = line.rstrip('\n') # 将每行解析为一个 JSON 对象 info = json.loads(line) # 假设每行都是一个字典数据,且只有一个键值对,其中键是日期 if isinstance(info, dict) and len(info) == 1: date, data = list(info.items())[0] limit_info[date] = data return limit_info # 构建一个获取读写存储本地的并整理涨停数据的函数 def get_arrange_limit_up_info(): # 实例化每日涨停信息整理方法 manager = DailyLimitUpInfoStorageManager() manager.append_data_to_file(kpl_api.get_limit_up_info()) manager.check_and_remove_oldest_entry(max_entries=1000) # 构建一个处理历史涨停涨停信息数据的函数 def get_handling_limit_up_info(): # 实例化每日涨停信息整理方法 history_limit_up_info = DailyLimitUpInfoStorageManager() data_cache.daily_limit_up_info = history_limit_up_info.arrange_limit_up_info() # logger.info(f"读本地的日更的历史涨停数据=={data_cache.daily_limit_up_info}") # print(f"daily_limit_up_info 类型==={type(data_cache.daily_limit_up_info)}") # 统计每日主线 daily_limit_up_info_len = len(data_cache.daily_limit_up_info) # print(f"daily_limit_up_info_len==={daily_limit_up_info_len}") historical_transaction_date_list = [] date_of_the_day = data_cache.DataCache().today_date for i in range(daily_limit_up_info_len): pre_date = hx_qc_value_util.get_previous_trading_date(date_of_the_day) # 获取前一个交易日API # target_date_str = basic_methods.pre_num_trading_day(data_cache.today_date, daily_limit_up_info_len) date_format = "%Y-%m-%d" target_date = datetime.datetime.strptime(pre_date, date_format).strftime("%Y-%m-%d") historical_transaction_date_list.append(target_date) date_of_the_day = pre_date # print(f"historical_transaction_date_list={historical_transaction_date_list}") history_sorted_plate_ranking_list = [] for key, value in data_cache.daily_limit_up_info.items(): # print(f"key=={key}") for i in historical_transaction_date_list: # print(f"i======={i}") # 找到每上一个交易日对应的本地数据的信息 if key == i: # print(f"{key}===找到了!value={value}") # plate_ranking_list = [] # 遍历交易日每一个涨停股的信息 for v in value: # print(f"v =={v}") # 将每一个涨停股的涨停概念和同班级数量 汇编为一个字典 plate_limit_up_num_dict = { v[5]: v[20] } # 将这个字典数据不重复的添加到概念排名列表中 if plate_limit_up_num_dict not in plate_ranking_list: plate_ranking_list.append(plate_limit_up_num_dict) # plate_ranking_set.add(v[20]) # print(f"plate_ranking_list={plate_ranking_list}") # 使用sorted函数和lambda表达式来根据字典的值进行排序 # 这里我们确保不修改原始字典,仅通过list(x.values())[0]来获取值 sorted_plate_ranking_list = sorted(plate_ranking_list, key=lambda x: list(x.values())[0], reverse=True) # logger.info(f"{key}=====>>>>{sorted_plate_ranking_list}") history_sorted_plate_ranking_list.append(sorted_plate_ranking_list) # print(f"history_sorted_plate_ranking_list={history_sorted_plate_ranking_list}") # for ranking_list in history_sorted_plate_ranking_list: # print(f"ranking_list={ranking_list}") # for i in ranking_list: # print(f"i={i}") # 计算历史涨停概念的连续出现次数函数 def count_key_occurrences(list_of_dicts_lists): # 创建一个字典来存储每个键的总出现次数 key_counts = {} # 遍历列表中的每个字典列表 for sublist in list_of_dicts_lists: # 遍历当前字典列表中的每个字典 for dict_item in sublist: # 遍历字典中的每个键 for key in dict_item: # 如果键不在key_counts中,则初始化计数为0 if key not in key_counts: key_counts[key] = 0 # 增加当前键的计数 key_counts[key] += 1 # 打印结果 for key, count in key_counts.items(): if count > 1: logger_common.info(f"'{key}' 连续出现 {count} 次") # 调用函数,传入整个列表 # count_key_occurrences(history_sorted_plate_ranking_list) # daily_limit_up_info_list = list(reversed(daily_limit_up_info_list)) # print(f"daily_limit_up_info_list==={daily_limit_up_info_list}") # 获取昨日涨停代码 (以便与K线对比) pre_trading_day_limit_up_info = data_cache.daily_limit_up_info.get(data_cache.DataCache().pre_trading_day) if pre_trading_day_limit_up_info is not None: yesterday_limit_up_code_list = [] for i in pre_trading_day_limit_up_info: symbol_code = basic_methods.format_stock_symbol(i[0]) limit_up_code = symbol_code yesterday_limit_up_code_list.append(limit_up_code) data_cache.yesterday_limit_up_code_list = yesterday_limit_up_code_list logger_common.info(f"昨日涨停股票数量=={len(data_cache.yesterday_limit_up_code_list)}") logger_common.info(f"昨日涨停代码列表=={yesterday_limit_up_code_list}") # code = pre_trading_day_limit_up_info[0][0] # logger.info(f"股票代码=={code}") # cor_name = pre_trading_day_limit_up_info[0][1] # logger.info(f"公司名称=={cor_name}") # unknown_zero_2 = pre_trading_day_limit_up_info[0][2] # logger.info(f"未知零值2=={unknown_zero_2}") # none_data = pre_trading_day_limit_up_info[0][3] # logger.info(f"空数据=={none_data}") # # 总市值(万)? # total_market_value = round((pre_trading_day_limit_up_info[0][4] / 10000), 2) # logger.info(f"总市值=={total_market_value}(万)?") # # 最相关概念 # the_most_relevant_plate = pre_trading_day_limit_up_info[0][5] # logger.info(f"最相关概念=={the_most_relevant_plate}") # # 收盘封单金额(万) # closing_amount = round((pre_trading_day_limit_up_info[0][6] / 10000), 2) # logger.info(f"收盘封单金额=={closing_amount}(万)") # # 最大封单金额(万) # maximum_blocked_amount = round((pre_trading_day_limit_up_info[0][7] / 10000), 2) # logger.info(f"最大封单金额=={maximum_blocked_amount}(万)") # # 主力净额 # main_net_amount = round((pre_trading_day_limit_up_info[0][8] / 10000), 2) # logger.info(f"主力净额=={main_net_amount}(万)") # # 主力买 # main_buyers = round((pre_trading_day_limit_up_info[0][9] / 10000), 2) # logger.info(f"主力买=={main_buyers}(万)") # # 主力卖 # main_sellers = round((pre_trading_day_limit_up_info[0][10] / 10000), 2) # logger.info(f"主力卖=={main_sellers}(万)") # # 成交额 # transaction_amount = round((pre_trading_day_limit_up_info[0][11] / 10000), 2) # logger.info(f"成交额=={transaction_amount}(万)") # # 所属精选板块 # selected_plate = pre_trading_day_limit_up_info[0][12] # logger.info(f"所属精选板块=={selected_plate}") # # 实际流通 # actual_circulation = round((pre_trading_day_limit_up_info[0][11] / 100000000), 2) # logger.info(f"实际流通=={actual_circulation}(亿)") # # 量比?(不是,不知道是什么) # equivalent_ratio = pre_trading_day_limit_up_info[0][14] # logger.info(f"量比?=={equivalent_ratio}") # # 领涨次数 # leading_increases_times = pre_trading_day_limit_up_info[0][15] # logger.info(f"领涨次数=={leading_increases_times}") # # 未知零值 # unknown_zero_16 = pre_trading_day_limit_up_info[0][16] # logger.info(f"未知零值16=={unknown_zero_16}") # # 未知零值 # unknown_zero_17 = pre_trading_day_limit_up_info[0][17] # logger.info(f"未知零值17=={unknown_zero_17}") # # 第几板(连续涨停天数) # continuous_limit_up_days = pre_trading_day_limit_up_info[0][18] # logger.info(f"第几板=={continuous_limit_up_days}") # # 最相关概念的代码 # the_most_relevant_plate_code = pre_trading_day_limit_up_info[0][19] # logger.info(f"最相关概念的代码=={the_most_relevant_plate_code}") # # 同班级的数量(同概念涨停数量) # the_same_class_amount = pre_trading_day_limit_up_info[0][20] # logger.info(f"同概念涨停数量=={the_same_class_amount}") # get_handling_limit_up_info() # 获取全部个股的板块并存储的函数 def get_all_stocks_plate_dict(stocks_list): all_stocks_plate_dict = {} # 逐个获取个股精选板块概念和自由市值等,并整体放入一个新创建的字典中然后添加到数据中 for i in stocks_list: try: code = i.split('.')[1] # print(f"i==={i}") # 获取个股的自由市值 free_market_value = kpl_api.getZYLTAmount(code) # 获取个股的板块列表 selected_blocks = kpl_api.getStockIDPlate(code) # 提取精选板块中的板块名称 selected_plate_list = [block[1] for block in selected_blocks] # print(f"selected_block_names==={selected_block_list}") block_data = { # 添加自由市值 'free_market_value': free_market_value, # 添加精选板块 'plate': selected_plate_list } # 将code作为键,stocks_selected_block_data作为值添加到stocks_block_data字典中 all_stocks_plate_dict[code] = block_data # print(f"all_stocks_plate_dict==={all_stocks_plate_dict}") except Exception as e: print(f"获取全部个股的板块并存储的函数 An error occurred: {e}") finally: pass # return stocks_plate_data # print(f"all_stocks_plate_dict==={len(all_stocks_plate_dict)}") # 将获取到的范围票概念板块转JSON格式并存储在本地文件夹中 # 将字典转换为JSON格式的字符串 json_data = json.dumps(all_stocks_plate_dict) # 写入文件 with open(constant.ALL_STOCKS_PLATE_PATH, 'w', encoding='utf-8') as f: f.write(json_data) now_time = datetime.datetime.now() # 获取本机时间 logger_common.info(f"写入所有个股板块文件完成!::{now_time}") # 计算开盘啦昨日拉取的概念数据中为空的股票数量函数 def get_have_no_plate_num(): # 初始化无概念数量 have_no_plate_num = 0 plate_are_null_list = [] for k, v in data_cache.all_stocks_plate_dict.items(): pass # print(f"i==={i} T==={t}") if len(v['plate']) == 0: have_no_plate_num += 1 # print(f"{k}的概念为空") # logger.info(f"{k}的概念为空") # 股票代码格式转化为掘金格式 symbol = basic_methods.format_stock_symbol(k) sec_name = data_cache.all_stocks_all_K_line_property_dict.get(symbol) if sec_name is not None: plate_are_null_list.append(sec_name) logger_common.info(f"有{have_no_plate_num}只股票概念为空") print(f"有{have_no_plate_num}只股票概念为空") logger_common.info(f"个股有历史K线但概念为空的有:{plate_are_null_list}") # 获取全部个股的精选板块并存储的函数 def stocks_list_selected_blocks(min_stocks): stocks_selected_block_data = [] # 逐个获取个股精选板块概念和自由市值等,并整体放入一个新创建的字典中然后添加到数据中 for i in min_stocks: try: code = i.split('.')[1] # 获取个股的自由市值 free_market_value = kpl_api.getZYLTAmount(code) # 获取个股的精选板块列表 # selected_blocks = getCodeJingXuanBlocks('000021') selected_blocks = kpl_api.getCodeJingXuanBlocks(code) # 提取精选板块中的板块名称 selected_block_list = [block[1] for block in selected_blocks] # print(f"selected_block_names==={selected_block_list}") stocks_selected_block_dict = { # 添加股票代码 'code': code, # 添加自由市值 'free_market_value': free_market_value, # 添加精选板块 'selected_block': selected_block_list } stocks_selected_block_data.append(stocks_selected_block_dict) # print(f"stocks_selected_block_data==={stocks_selected_block_dict}") except Exception as e: logger_debug.error(f"获取全部个股的精选板块并存储的函数 An error occurred: {e}") # print(f"stocks_selected_block_data==={len(stocks_selected_block_data)}") # 将获取到的范围票概念板块转JSON格式并存储在本地文件夹中 # 将字典转换为JSON格式的字符串 json_data = json.dumps(stocks_selected_block_data) # 写入文件 with open('local_storage_data/stocks_selected_block_data.json', 'w', encoding='utf-8') as f: f.write(json_data) now_time = datetime.datetime.now() # 获取本机时间 print(f"写入精选板块文件完成!::{now_time}") if __name__ == '__main__': # get_have_no_plate_num() pass