main.py | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
strategy/buying_strategy.py | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
strategy/kpl_api.py | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
strategy/market_sentiment_analysis.py | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
strategy/selling_strategy.py | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 |
main.py
@@ -200,7 +200,7 @@ l2_data_callbacks = [] # 订阅持仓L2数据 def __subscript_position_l2(): """ 订阅持仓L2数据 strategy/buying_strategy.py
@@ -151,8 +151,8 @@ current_low = k_line_data[0]['close'] # logger_info(f"current_low == 0 赋值为K线昨收价") # 如果公司名称中不包含["ST", "退市", "退", "XD", "XR", "DR", "N"] 则继续逻辑判断 if not any(keyword in k_line_data[0]['sec_name'] for keyword in ["ST", "退市", "退", "XD", "XR", "DR", "N"]): # 如果公司名称中不包含["ST", "退市", "退", "XD", "XR", "DR", "N", "C"] 则继续逻辑判断C:次新股 if not any(keyword in k_line_data[0]['sec_name'] for keyword in ["ST", "退市", "退", "XD", "XR", "DR", "N", "C"]): # print(f"非ST, 退市, 退, XD, N==={k_line_data[0]['sec_name']} 继续判断") # print(f"{k_line_data[0]['sec_name']} 继续判断") if current_low is not None: @@ -331,11 +331,11 @@ f"【不利】自由市值小于6亿!不买!! 公司名称:{k_line_data[0]['sec_name']},最新价: {current_price}") elif len(intersection_plate) > 0: logger_info( f"【不利】同概念只买一次,不买了,重复相交概念==={intersection_plate}") f"【不利】同概念只买一次,不买了,公司名称:{k_line_data[0]['sec_name']},重复相交概念==={intersection_plate}") elif data_cache.have_plate_buy_times >= 3: logger_info(f"【不利】有概念买入已经 3 次了!不买了!!") logger_info(f"【不利】有概念买入已经 3 次了!不买了!!公司名称:{k_line_data[0]['sec_name']},") elif len(data_cache.addition_position_symbols_set) >= 3: logger_info(f"【不利】当日已经买了3只票!不买了!!") logger_info(f"【不利】当日已经买了3只票!不买了!!公司名称:{k_line_data[0]['sec_name']},") elif now_time < data_cache.OPENING_TIME or now_time > data_cache.NOON_MARKET_TIME: logger_info(f"【不利】不在9:30-13:05时间内!不买!!") else: @@ -423,11 +423,11 @@ f"【不利】自由市值小于6亿!不买!! 公司名称:{k_line_data[0]['sec_name']},最新价: {current_price}") elif len(intersection_plate) > 0: logger_info( f"【不利】同概念只买一次,不买了,重复相交概念==={intersection_plate}") f"【不利】同概念只买一次,不买了,公司名称:{k_line_data[0]['sec_name']},重复相交概念==={intersection_plate}") elif data_cache.have_plate_buy_times >= 1: logger_info(f"【不利】有概念无强度买入已经1次了!不买了!!") logger_info(f"【不利】有概念无强度买入已经1次了!不买了!!公司名称:{k_line_data[0]['sec_name']},") elif len(data_cache.addition_position_symbols_set) >= 4: logger_info(f"【不利】当日已经买了3只票!不买了!!") logger_info(f"【不利】当日已经买了3只票!不买了!!公司名称:{k_line_data[0]['sec_name']},") elif now_time < data_cache.OPENING_TIME or now_time > data_cache.NOON_MARKET_TIME: logger_info(f"【不利】不在9:30-13:05时间内!不买!!") else: @@ -519,9 +519,9 @@ logger_info( f"【不利】自由市值小于6亿!不买!! 公司名称:{k_line_data[0]['sec_name']},最新价: {current_price}") elif data_cache.have_strength_buy_times >= 1: logger_info(f"【不利】有强度买入 1 次了!不买了!!") logger_info(f"【不利】有强度买入 1 次了!不买了!!公司名称:{k_line_data[0]['sec_name']},") elif len(data_cache.addition_position_symbols_set) >= 3: logger_info(f"【不利】当日已经买了3只票!不买了!!") logger_info(f"【不利】当日已经买了3只票!不买了!!公司名称:{k_line_data[0]['sec_name']},") elif now_time < data_cache.OPENING_TIME or now_time > data_cache.NOON_MARKET_TIME: logger_info(f"【不利】不在9:30-13:05时间内!不买!!") else: @@ -625,10 +625,10 @@ logger_info( f"【不利】自由市值小于6亿!不买!! 公司名称:{k_line_data[0]['sec_name']},最新价: {current_price}") elif data_cache.have_small_turn_large_buy_times >= 1: logger_info(f"【不利】有小量换大涨幅买入已经 1 次了!不买了!!") logger_info(f"【不利】有小量换大涨幅买入已经 1 次了!不买了!!公司名称:{k_line_data[0]['sec_name']},") elif len(data_cache.addition_position_symbols_set) >= 4: logger_info( f"【不利】当日已经买了4只票!不买了!!") f"【不利】当日已经买了4只票!不买了!!公司名称:{k_line_data[0]['sec_name']},") elif ( data_cache.MORN_MARKET_TIME < now_time < data_cache.NOON_MARKET_TIME) is False or free_market_value < 100 or ( today_open_growth < 5 or today_growth < 5): strategy/kpl_api.py
@@ -233,513 +233,3 @@ print(f"MarketFelling==={MarketFelling}") changeStatistics = changeStatistics() print(f"changeStatistics==={changeStatistics}") # -------------------------------------------------------------------------------------------------------------------------------------------------------------- # # # 获取行情精选板块 强度排名 # def get_market_sift_plate_its_stock_power(): # @dask.delayed # def batch_get_plate_codes(fs): # return fs # # @dask.delayed # def request_plate_codes(i): # plate_name = i[1] # log_data = None # its_stock = json.loads(getCodesByPlate(i[0])) # now_time_str = tool.get_now_time_str() # if data_cache.OPENING_TIME < now_time_str < data_cache.NOON_MARKET_TIME: # log_data = {plate_name: its_stock['list']} # # 尝试过滤掉无意义的概念板块(plate_name not in ['科创板', '北交所', '次新股', '无', 'ST板块', 'ST摘帽', '并购重组', '国企改革','超跌', '壳资源', '股权转让', '送转填权']) and '增长' in plate_name # if (plate_name not in ['科创板', '北交所', '次新股', '无', 'ST板块', 'ST摘帽', '并购重组', '国企改革', '超跌', # '壳资源', '股权转让', '送转填权']) or ('增长' in plate_name): # # # print(f"{i[1]} 强度:{i[2]}") # # 通过板块ID获取其下面的个股强度列表 # # print(f"======={i[0]}=======") # # # its_stock_list_info = its_stock['list'] # # logger.info(f"its_stock_list_info==={its_stock_list_info}") # # 将板块强度下面对应的个股列表打印到日志中 # # for i in its_stock_list_info: # # if i[0] != 1: # # logger.info( # # f"l === 个股代码:{i[0]},公司名称:{i[1]},主力资金推测:{i[2]},未知0值:{i[3]},概念:{i[4]},最新价:{i[5]},当日当时涨幅:{i[6]}%," # # f"成交额:{round(i[7] / 100000000, 2)} 亿,实际换手率:{i[8]}%,未知0值:{i[9]},实际流通:{round(i[10] / 100000000, 2)}亿," # # f"主力买:{round(i[11] / 100000000, 2)}亿," # # f"主力卖:{round(i[12] / 100000000, 2)}亿," # # f"主力净额:{round(i[13] / 10000, 2)}万,买成占比:{i[14]}%,卖成占比:{i[15]}%,净成占比:{i[16]}%,买流占比:{i[17]}%,卖流占比:{i[18]}%,净流占比:{i[19]}%," # # f"区间涨幅:{i[20]}%,量比:{i[21]},未知0:{i[22]},上板情况:{i[23]},上板排名:{i[24]},换手率:{i[25]}%," # # f"未知空值:{i[26]},未知零值:{i[27]},收盘封单:{i[28]},最大封单:{i[29]},未知空值?:{i[30]}," # # f"?:{i[30]}%,?:{i[31]},??:{i[32]},振幅:{i[33]}%,未知0????:{i[34]},未知0?????:{i[35]}," # # f"?=:{i[36]},?总市值:{i[37]},?流通市值:{i[38]},最终归属概念(收盘后出数据?):{i[39]},领涨次数:{i[40]}," # # f"41未知1值:{i[41]},第三季度机构持仓【str数据勿用运算符】:{i[42]}万,?年预测净利润:{i[43]},上年预测净利润:{i[44]},年内预测净利润:{i[45]}" # # ) # # # 初始化股票强度列表 # stock_power_list = [] # for s in its_stock['list']: # # 过滤掉涨幅大于 and s[6] < 6.5 且小于0%的 和 名称中包含ST的 和 涨速小于等于0%的 和 只要昨日未涨停 和 上证或深证的正股 and s[9] > 0.0025 # if s[6] > 0 and s[1].find("ST") < 0 and s[1].find("XD") < 0 and s[23].find("板") < 0 and s[24].find("板") < 0 and (s[0].startswith('60') or s[0].startswith('00')) and s[9] > 1: # # print(f"{s[1]},个股代码:{s[0]}, 涨幅:{s[6]}% 涨速:{s[9]}% 概念:{s[4]} 主力资金推测:{s[2]} 领涨次数:{s[40]} 今日第几板:{s[23]} 是否破版{s[24]}") # # 对个股强度 主要 属性列表进行装填 # its_stock_power = [s[1], s[0], s[6], s[9], s[4], s[2], s[40]] # # 逐个选择性添加its_stock中的元素到个股强度列表中 # # print(f"its_stock_power===={its_stock_power}") # # 整体将添加完善的个股强度列表添加到股票列表中 # stock_power_list.append(its_stock_power) # # print(f"stock_power_list===={stock_power_list}") # # 过滤掉没有瞬时高强度个股的空概念 # if len(stock_power_list) != 0: # # 将对应板块的股票强度列表新建一个字典 # stock_power_item = {i[1]: stock_power_list} # # 并更新到精选板块个股字典中 # market_sift_plate_stock_dict.update(stock_power_item) # return log_data # # data = (getMarketJingXuanRealRankingInfo()) # market_sift_plate = json.loads(data) # # logger_kpl_jingxuan_in 打印的日志专用于开盘了数据的存储分析,不能轻易删除 # # print(f"market_sift_plate 数 ======{len(market_sift_plate['list'])}") # # 行情》精选板块》排名前20中》对应个股》符合条件的个股 # # logger.info(f"market_sift_plate['list']======{market_sift_plate['list']}") # # logger.info(f"market_sift_plate['list'][0] ======{market_sift_plate['list'][0]}") # # 初始化精选板块对应个股字典 # market_sift_plate_stock_dict = {} # if 'list' in market_sift_plate: # ds = [] # for d in market_sift_plate['list']: # ds.append(request_plate_codes(d)) # dask_result = batch_get_plate_codes(ds) # compute_results = dask_result.compute() # log_datas = {} # for r in compute_results: # if not r: # continue # for b in r: # log_datas[b] = r[b] # now_time = tool.get_now_time_str() # if data_cache.L1_DATA_START_TIME < now_time < data_cache.NOON_MARKET_TIME: # # logger.info(f"精选板块股票强度数据更新 == {market_sift_plate_stock_dict}") # # 只在盘中时间获取 # KPLStockOfMarketsPlateLogManager().add_log(market_sift_plate['list'], log_datas) # # return market_sift_plate_stock_dict # # # # 调用一下获取精选板块股票强度数据函数 【本模块内使用时调用】 # # get_market_sift_plate_its_stock_power() # # def get_market_sift_plate_its_stock_power_process(callback): # while True: # try: # # now = time.time() # # print(f"kpl_limit_up_process开始了{now}") # start_time = time.time() # now_time = tool.get_now_time_str() # if data_cache.L1_DATA_START_TIME < now_time < data_cache.CLOSING_TIME: # its_stock_power = get_market_sift_plate_its_stock_power() # time_str = datetime.datetime.now().strftime("%H%M%S") # if 92900 < int(time_str) < 95000: # logger_kpl_jingxuan_in.info(f"耗时:{time.time() - start_time} 数据:{its_stock_power}") # callback(its_stock_power) # # print(f"精选板块拉升个股更新===={its_stock_power}") # except Exception as e: # logger_debug.exception(e) # logger.error(f"开盘啦板块强度线程报错An error occurred: {e}") # finally: # time.sleep(2) # # # # 获取涨停板块名称列表并存储本地的函数 # def get_limit_up_block_names(): # # 设定当前时间点 # now_time = tool.get_now_time_str() # # print(f"now_time===={now_time}") # if data_cache.SERVER_RESTART_TIME < now_time < data_cache.UPDATE_DATA_TIME: # # print(f"在时间内使用--------------------------") # # 获取涨停信息列表 # limit_up_info = get_limit_up_info() # # print(f"limit_up_info=={limit_up_info}") # data_cache.limit_up_info = get_limit_up_info() # # 提取涨停列表中的板块名称 # limit_up_block_names = [] # # 循环添加涨停概念 # for i in limit_up_info: # limit_up_block_names.append(i[5]) # # print(f"limit_up_block_names==={limit_up_block_names}") # # return limit_up_block_names # # # 使用Counter计算每个元素的出现次数 # # counter = Counter(limit_up_block_names) # # # 找出出现次数最多的元素及其次数 # # most_common_element, most_common_count = counter.most_common(1)[0] # # # 打印出现次数最多的元素 # # print(f"主线概念:{most_common_element},出现了 {most_common_count} 次") # # return limit_up_block_names # # # # 为开盘啦接口获取的涨停列表概念板块单独开一个进程 形参(callback) # def kpl_limit_up_process(callback): # while True: # try: # # now = time.time() # # print(f"kpl_limit_up_process开始了{now}") # limit_up_block_names = get_limit_up_block_names() # callback(limit_up_block_names) # # logger.info(f"涨停更新===={limit_up_block_names}") # # print(f"涨停更新数量===={len(limit_up_block_names)}") # # print(f"kpl_limit_up_process完成一下{now}") # except Exception as e: # logger.error(f"开盘啦涨停板块概念线程报错An error occurred: {e}") # finally: # time.sleep(1.5) # # # # kpl_limit_up_process() # # # # 构建涨停信息读写对象 # class DailyLimitUpInfoStorageManager: # # 初始化文件路径 # def __init__(self, file_path=constant.KPL_LIMIT_UP_DATA_PATH): # self.file_path = file_path # # # 添加单日涨停信息数据到文件中的一行 函数 # def append_data_to_file(self, data_to_append): # # print(f"data_to_append=={data_to_append}") # # 读取所有行并解析为 JSON 对象列表 # if os.path.exists(self.file_path): # with open(self.file_path, 'r', encoding='utf-8') as file: # # 获取当前日期并格式化 # current_date = datetime.datetime.now().strftime('%Y-%m-%d') # lines = [json.loads(line.strip()) for line in file if line.strip()] # # print(f"lines type=={type(lines)}") # # print(f"lines=={lines}") # # 检查当前日期是否已存在于文件中 # if lines: # 如果读取到的行文件列表不为空(为真) # if lines[-1].get(current_date) is None: # 如果列表中的倒数最后一行获取不到当日的日期(最后一行的键 为 当日日期) # # 将日期和data_to_append转换为JSON格式的字符串 # json_line = json.dumps({current_date: data_to_append}, ensure_ascii=False) + '\n' # # 打开文件并追加JSON行 # with open(self.file_path, 'a', encoding='utf-8') as file: # file.write(json_line) # else: # logger.info(f"(当日日期已存在于文件的最后一行了,不再重复追加写入)") # else: # json_line = json.dumps({current_date: data_to_append}, ensure_ascii=False) + '\n' # # 打开文件并追加JSON行 # with open(self.file_path, 'a', encoding='utf-8') as file: # file.write(json_line) # # # 清理多余数据函数 # def check_and_remove_oldest_entry(self, max_entries): # # 读取所有行并解析为 JSON 对象列表 # if os.path.exists(self.file_path): # with open(self.file_path, 'r', encoding='utf-8') as file: # lines = [json.loads(line.strip()) for line in file if line.strip()] # else: # lines = [] # # # 如果行数超过限制,移除最早的一些行 # if len(lines) >= max_entries: # # 截断列表,只保留最新的 max_entries 个对象 # lines = lines[-max_entries:] # # 重新打开文件以写入模式,并写入截断后的对象列表为 JSON Lines # with open(self.file_path, 'w', encoding='utf-8') as file: # for obj in lines: # file.write(json.dumps(obj, ensure_ascii=False) + '\n') # # file.write(json.dumps(obj, ensure_ascii=False)) # # # 隔行整理数据并合并装入一个字典数据中调用时返回这个字典数据 函数 # def arrange_limit_up_info(self): # limit_info = {} # # 创建一个列表来存储所有解析的 JSON 对象 # if os.path.exists(self.file_path): # with open(self.file_path, 'r', encoding='utf-8') as file: # for line in file: # # 去除每行末尾的换行符(如果有的话) # line = line.rstrip('\n') # # 将每行解析为一个 JSON 对象 # info = json.loads(line) # # 假设每行都是一个字典数据,且只有一个键值对,其中键是日期 # if isinstance(info, dict) and len(info) == 1: # date, data = list(info.items())[0] # limit_info[date] = data # return limit_info # # # # 构建一个获取读写存储本地的并整理涨停数据的函数 # def get_arrange_limit_up_info(): # # 实例化每日涨停信息整理方法 # manager = DailyLimitUpInfoStorageManager() # manager.append_data_to_file(get_limit_up_info()) # manager.check_and_remove_oldest_entry(max_entries=1000) # # # # 构建一个处理历史涨停涨停信息数据的函数 # def get_handling_limit_up_info(): # # 实例化每日涨停信息整理方法 # history_limit_up_info = DailyLimitUpInfoStorageManager() # data_cache.daily_limit_up_info = history_limit_up_info.arrange_limit_up_info() # # logger.info(f"读本地的日更的历史涨停数据=={data_cache.daily_limit_up_info}") # # # print(f"daily_limit_up_info 类型==={type(data_cache.daily_limit_up_info)}") # # 统计每日主线 # daily_limit_up_info_len = len(data_cache.daily_limit_up_info) # # print(f"daily_limit_up_info_len==={daily_limit_up_info_len}") # # historical_transaction_date_list = [] # date_of_the_day = data_cache.DataCache().today_date # for i in range(daily_limit_up_info_len): # pre_date = hx_qc_value_util.get_previous_trading_date(date_of_the_day) # 获取前一个交易日API # # target_date_str = basic_methods.pre_num_trading_day(data_cache.today_date, daily_limit_up_info_len) # date_format = "%Y-%m-%d" # target_date = datetime.datetime.strptime(pre_date, date_format).strftime("%Y-%m-%d") # historical_transaction_date_list.append(target_date) # date_of_the_day = pre_date # # # print(f"historical_transaction_date_list={historical_transaction_date_list}") # history_sorted_plate_ranking_list = [] # for key, value in data_cache.daily_limit_up_info.items(): # # print(f"key=={key}") # for i in historical_transaction_date_list: # # print(f"i======={i}") # # 找到每上一个交易日对应的本地数据的信息 # if key == i: # # print(f"{key}===找到了!value={value}") # # # plate_ranking_list = [] # # 遍历交易日每一个涨停股的信息 # for v in value: # # print(f"v =={v}") # # 将每一个涨停股的涨停概念和同班级数量 汇编为一个字典 # plate_limit_up_num_dict = { # v[5]: v[20] # } # # 将这个字典数据不重复的添加到概念排名列表中 # if plate_limit_up_num_dict not in plate_ranking_list: # plate_ranking_list.append(plate_limit_up_num_dict) # # plate_ranking_set.add(v[20]) # # print(f"plate_ranking_list={plate_ranking_list}") # # 使用sorted函数和lambda表达式来根据字典的值进行排序 # # 这里我们确保不修改原始字典,仅通过list(x.values())[0]来获取值 # sorted_plate_ranking_list = sorted(plate_ranking_list, key=lambda x: list(x.values())[0], reverse=True) # # logger.info(f"{key}=====>>>>{sorted_plate_ranking_list}") # history_sorted_plate_ranking_list.append(sorted_plate_ranking_list) # # # print(f"history_sorted_plate_ranking_list={history_sorted_plate_ranking_list}") # # for ranking_list in history_sorted_plate_ranking_list: # # print(f"ranking_list={ranking_list}") # # for i in ranking_list: # # print(f"i={i}") # # # 计算历史涨停概念的连续出现次数函数 # def count_key_occurrences(list_of_dicts_lists): # # 创建一个字典来存储每个键的总出现次数 # key_counts = {} # # 遍历列表中的每个字典列表 # for sublist in list_of_dicts_lists: # # 遍历当前字典列表中的每个字典 # for dict_item in sublist: # # 遍历字典中的每个键 # for key in dict_item: # # 如果键不在key_counts中,则初始化计数为0 # if key not in key_counts: # key_counts[key] = 0 # # 增加当前键的计数 # key_counts[key] += 1 # # 打印结果 # for key, count in key_counts.items(): # if count > 1: # logger.info(f"'{key}' 连续出现 {count} 次") # # # 调用函数,传入整个列表 # # count_key_occurrences(history_sorted_plate_ranking_list) # # # daily_limit_up_info_list = list(reversed(daily_limit_up_info_list)) # # print(f"daily_limit_up_info_list==={daily_limit_up_info_list}") # # # 获取昨日涨停代码 (以便与K线对比) # pre_trading_day_limit_up_info = data_cache.daily_limit_up_info.get(data_cache.DataCache().pre_trading_day) # if pre_trading_day_limit_up_info is not None: # yesterday_limit_up_code_list = [] # for i in pre_trading_day_limit_up_info: # symbol_code = basic_methods.format_stock_symbol(i[0]) # limit_up_code = symbol_code # yesterday_limit_up_code_list.append(limit_up_code) # data_cache.yesterday_limit_up_code_list = yesterday_limit_up_code_list # logger.info(f"昨日涨停股票数量=={len(data_cache.yesterday_limit_up_code_list)}") # logger.info(f"昨日涨停代码列表=={yesterday_limit_up_code_list}") # # # code = pre_trading_day_limit_up_info[0][0] # # logger.info(f"股票代码=={code}") # # cor_name = pre_trading_day_limit_up_info[0][1] # # logger.info(f"公司名称=={cor_name}") # # unknown_zero_2 = pre_trading_day_limit_up_info[0][2] # # logger.info(f"未知零值2=={unknown_zero_2}") # # none_data = pre_trading_day_limit_up_info[0][3] # # logger.info(f"空数据=={none_data}") # # # 总市值(万)? # # total_market_value = round((pre_trading_day_limit_up_info[0][4] / 10000), 2) # # logger.info(f"总市值=={total_market_value}(万)?") # # # 最相关概念 # # the_most_relevant_plate = pre_trading_day_limit_up_info[0][5] # # logger.info(f"最相关概念=={the_most_relevant_plate}") # # # 收盘封单金额(万) # # closing_amount = round((pre_trading_day_limit_up_info[0][6] / 10000), 2) # # logger.info(f"收盘封单金额=={closing_amount}(万)") # # # 最大封单金额(万) # # maximum_blocked_amount = round((pre_trading_day_limit_up_info[0][7] / 10000), 2) # # logger.info(f"最大封单金额=={maximum_blocked_amount}(万)") # # # 主力净额 # # main_net_amount = round((pre_trading_day_limit_up_info[0][8] / 10000), 2) # # logger.info(f"主力净额=={main_net_amount}(万)") # # # 主力买 # # main_buyers = round((pre_trading_day_limit_up_info[0][9] / 10000), 2) # # logger.info(f"主力买=={main_buyers}(万)") # # # 主力卖 # # main_sellers = round((pre_trading_day_limit_up_info[0][10] / 10000), 2) # # logger.info(f"主力卖=={main_sellers}(万)") # # # 成交额 # # transaction_amount = round((pre_trading_day_limit_up_info[0][11] / 10000), 2) # # logger.info(f"成交额=={transaction_amount}(万)") # # # 所属精选板块 # # selected_plate = pre_trading_day_limit_up_info[0][12] # # logger.info(f"所属精选板块=={selected_plate}") # # # 实际流通 # # actual_circulation = round((pre_trading_day_limit_up_info[0][11] / 100000000), 2) # # logger.info(f"实际流通=={actual_circulation}(亿)") # # # 量比?(不是,不知道是什么) # # equivalent_ratio = pre_trading_day_limit_up_info[0][14] # # logger.info(f"量比?=={equivalent_ratio}") # # # 领涨次数 # # leading_increases_times = pre_trading_day_limit_up_info[0][15] # # logger.info(f"领涨次数=={leading_increases_times}") # # # 未知零值 # # unknown_zero_16 = pre_trading_day_limit_up_info[0][16] # # logger.info(f"未知零值16=={unknown_zero_16}") # # # 未知零值 # # unknown_zero_17 = pre_trading_day_limit_up_info[0][17] # # logger.info(f"未知零值17=={unknown_zero_17}") # # # 第几板(连续涨停天数) # # continuous_limit_up_days = pre_trading_day_limit_up_info[0][18] # # logger.info(f"第几板=={continuous_limit_up_days}") # # # 最相关概念的代码 # # the_most_relevant_plate_code = pre_trading_day_limit_up_info[0][19] # # logger.info(f"最相关概念的代码=={the_most_relevant_plate_code}") # # # 同班级的数量(同概念涨停数量) # # the_same_class_amount = pre_trading_day_limit_up_info[0][20] # # logger.info(f"同概念涨停数量=={the_same_class_amount}") # # # # get_handling_limit_up_info() # # # # 获取全部个股的板块并存储的函数 # def get_all_stocks_plate_dict(stocks_list): # all_stocks_plate_dict = {} # # 逐个获取个股精选板块概念和自由市值等,并整体放入一个新创建的字典中然后添加到数据中 # for i in stocks_list: # try: # code = i.split('.')[1] # # print(f"i==={i}") # # 获取个股的自由市值 # free_market_value = getZYLTAmount(code) # # 获取个股的板块列表 # selected_blocks = getStockIDPlate(code) # # 提取精选板块中的板块名称 # selected_plate_list = [block[1] for block in selected_blocks] # # print(f"selected_block_names==={selected_block_list}") # block_data = { # # 添加自由市值 # 'free_market_value': free_market_value, # # 添加精选板块 # 'plate': selected_plate_list # } # # 将code作为键,stocks_selected_block_data作为值添加到stocks_block_data字典中 # all_stocks_plate_dict[code] = block_data # # print(f"all_stocks_plate_dict==={all_stocks_plate_dict}") # except Exception as e: # print(f"获取全部个股的板块并存储的函数 An error occurred: {e}") # finally: # pass # # return stocks_plate_data # # print(f"all_stocks_plate_dict==={len(all_stocks_plate_dict)}") # # 将获取到的范围票概念板块转JSON格式并存储在本地文件夹中 # # 将字典转换为JSON格式的字符串 # json_data = json.dumps(all_stocks_plate_dict) # # 写入文件 # with open(constant.ALL_STOCKS_PLATE_PATH, 'w', encoding='utf-8') as f: # f.write(json_data) # now_time = datetime.datetime.now() # 获取本机时间 # logger.info(f"写入所有个股板块文件完成!::{now_time}") # # # # 计算开盘啦昨日拉取的概念数据中为空的股票数量函数 # def get_have_no_plate_num(): # # 初始化无概念数量 # have_no_plate_num = 0 # plate_are_null_list = [] # for k, v in data_cache.all_stocks_plate_dict.items(): # pass # # print(f"i==={i} T==={t}") # if len(v['plate']) == 0: # have_no_plate_num += 1 # # print(f"{k}的概念为空") # # logger.info(f"{k}的概念为空") # # 股票代码格式转化为掘金格式 # symbol = basic_methods.format_stock_symbol(k) # sec_name = data_cache.all_stocks_all_K_line_property_dict.get(symbol) # if sec_name is not None: # plate_are_null_list.append(sec_name) # logger.info(f"有{have_no_plate_num}只股票概念为空") # logger.info(f"个股有历史K线但概念为空的有:{plate_are_null_list}") # # # # 获取全部个股的精选板块并存储的函数 # def stocks_list_selected_blocks(min_stocks): # stocks_selected_block_data = [] # # 逐个获取个股精选板块概念和自由市值等,并整体放入一个新创建的字典中然后添加到数据中 # for i in min_stocks: # try: # code = i.split('.')[1] # # 获取个股的自由市值 # free_market_value = getZYLTAmount(code) # # 获取个股的精选板块列表 # # selected_blocks = getCodeJingXuanBlocks('000021') # selected_blocks = getCodeJingXuanBlocks(code) # # 提取精选板块中的板块名称 # selected_block_list = [block[1] for block in selected_blocks] # # print(f"selected_block_names==={selected_block_list}") # stocks_selected_block_dict = { # # 添加股票代码 # 'code': code, # # 添加自由市值 # 'free_market_value': free_market_value, # # 添加精选板块 # 'selected_block': selected_block_list # } # stocks_selected_block_data.append(stocks_selected_block_dict) # # print(f"stocks_selected_block_data==={stocks_selected_block_dict}") # except Exception as e: # logger.error(f"获取全部个股的精选板块并存储的函数 An error occurred: {e}") # # # print(f"stocks_selected_block_data==={len(stocks_selected_block_data)}") # # 将获取到的范围票概念板块转JSON格式并存储在本地文件夹中 # # 将字典转换为JSON格式的字符串 # json_data = json.dumps(stocks_selected_block_data) # # 写入文件 # with open('local_storage_data/stocks_selected_block_data.json', 'w', encoding='utf-8') as f: # f.write(json_data) # now_time = datetime.datetime.now() # 获取本机时间 # print(f"写入精选板块文件完成!::{now_time}") # # # # kpl_stocks_list_selected_blocks_process() #在 kpl_api.py中可以调用 # # stocks_list_selected_blocks(min_stocks) #在 kpl_api.py中可以调用 # # list = ['SHSE.600805','SHSE.600804'] # # # # all_stocks_plate_dict(list) # strategy/market_sentiment_analysis.py
@@ -6,13 +6,9 @@ import datetime import json import time import constant from log_module import async_log_util from log_module.log import logger_common, logger_Overall_market_strength_score, logger_debug # import time # 引入掘金API # from gm.api import * from strategy import basic_methods, kpl_api from strategy import data_cache @@ -20,163 +16,6 @@ # 获取logger实例 logger = logger_common # ====================== # 模拟数据生成(完整版) # ====================== # data = { # 'total_stocks': 5000, # 全市场股票总数 # 'limit_up': 120, # 涨停股数量 # 'limit_down': 5, # 跌停股数量 # 'advance_count': 2800, # 上涨家数 # 'decline_count': 1500, # 下跌家数 # 'high_retreat_count': 30, # 当日回撤超10%的个股数量 # 'turnover': 1.2e12, # 当日成交额(元) # 'turnover_prev_day': 1.0e12, # 前一日成交额 # 'northbound_inflow': 8e9, # 北向资金净流入(元) # 'max_northbound_30d': 15e9, # 近30日最大北向净流入 # 'margin_buy': 8e10, # 融资买入额 # 'sector_gains': [3.5, 2.8, 1.9, -0.5], # 前4大板块涨幅(%) # 'max_continuous_boards': 7, # 最高连板数(如7连板) # 'continuous_boards': [7, 5, 3, 2], # 连板梯队(各层级连板数量) # } # # # # ====================== # # 因子计算与标准化处理 # # ====================== # def calculate_factors(data): # factors = {} # # # === 新增核心因子 === # # 1. 涨跌家数比(0-100分) # if data['decline_count'] == 0: # factors['advance_ratio'] = 100.0 # else: # factors['advance_ratio'] = min( # (data['advance_count'] / data['decline_count']) * 50, # 比值1:2对应100分 # 100.0 # ) # # # 2. 涨停强度(标准化到0-100) # factors['limit_strength'] = ( # (data['limit_up'] - data['limit_down']) / data['total_stocks'] * 1000 # 放大差异 # ) # # # 3. 大幅回撤比例(0-100分) # factors['high_retreat_ratio'] = ( # data['high_retreat_count'] / data['total_stocks'] * 1000 # 千分比更敏感 # ) # # # 4. 连板高度与梯队(根据历史极值归一化) # factors['max_continuous'] = ( # data['max_continuous_boards'] / 10 * 100 # 假设历史最高10连板 # ) # factors['board_ladder'] = ( # len(data['continuous_boards']) / 5 * 100 # 假设最多5个连板层级 # ) # # # === 原有因子优化 === # # 5. 量能变化(成交额增长率) # turnover_growth = ( # (data['turnover'] - data['turnover_prev_day']) / # data['turnover_prev_day'] * 100 # ) # factors['liquidity'] = turnover_growth # # # 6. 板块强度(前3板块平均涨幅) # top_sectors = sorted(data['sector_gains'], reverse=True)[:3] # sector_avg = sum(top_sectors) / len(top_sectors) # # factors['sector_strength'] = ((sector_avg - (-5.0)) / (10.0 - (-5.0)) * 100) # 历史范围-5%~10% # # 7. 北向资金强度 # factors['northbound_ratio'] = (data['northbound_inflow'] / data['max_northbound_30d'] * 100) # # # 8. 融资买入占比 # factors['margin_ratio'] = (data['margin_buy'] / data['turnover'] * 100) # # # # 强制所有因子在0-100范围内 # for key in factors: # factors[key] = max(0.0, min(factors[key], 100.0)) # # return factors # # # # ====================== # # 权重分配(总权重1.0) # # ====================== # def get_weights(): # return { # # 新增因子权重 # 'advance_ratio': 0.15, # 涨跌家数比 # 'limit_strength': 0.2, # 涨停强度 # 'high_retreat_ratio': 0.1, # 大幅回撤 # 'max_continuous': 0.1, # 连板高度 # 'board_ladder': 0.05, # 连板梯队 # # 原有因子权重 # 'liquidity': 0.15, # 量能变化 # 'sector_strength': 0.1, # 板块强度 # 'northbound_ratio': 0.1, # 北向资金 # 'margin_ratio': 0.05 # 融资买入 # } # # # # ====================== # # 综合强度分数计算(含动态修正) # # ====================== # def composite_strength_score(data): # factors = calculate_factors(data) # weights = get_weights() # # # 基础加权得分 # score = sum(factors[key] * weights[key] for key in factors) # # # === 动态修正规则 === # # 规则1:涨停数量超过100家时额外加分 # if data['limit_up'] > 100: # score += min((data['limit_up'] - 100) * 0.2, 10) # 最多加10分 # # # 规则2:连板梯队断裂时扣分(如最高板与次高板差距≥3) # continuous_boards = sorted(data['continuous_boards'], reverse=True) # if len(continuous_boards) >= 2 and (continuous_boards[0] - continuous_boards[1] >= 3): # score -= 15 # 梯队断裂惩罚 # # # 规则3:北向资金与涨跌家数背离时调整 # if (factors['northbound_ratio'] > 50) and (factors['advance_ratio'] < 40): # score *= 0.9 # 权重股拉升导致的虚假繁荣 # # return max(0.0, min(score, 100.0)) # ====================== # 执行计算与结果输出 # ====================== # final_score = composite_strength_score(data) # print("=== 综合强度分数 ===") # print(f"当前得分: {final_score:.1f}/100") # # # 输出因子贡献度分析 # factors = calculate_factors(data) # weights = get_weights() # print("\n=== 因子贡献度明细 ===") # for key in factors: # print(f"{key:20s}: {factors[key]:5.1f} × {weights[key]:.0%} = {factors[key] * weights[key]:5.1f}") ''' 代码输出示例: === 综合强度分数 === 当前得分: 78.4/100 === 因子贡献度明细 === advance_ratio : 93.3 × 15% = 14.0 limit_strength : 23.0 × 20% = 4.6 high_retreat_ratio : 6.0 × 10% = 0.6 max_continuous : 70.0 × 10% = 7.0 board_ladder : 80.0 × 5% = 4.0 liquidity : 20.0 × 15% = 3.0 sector_strength : 60.0 × 10% = 6.0 northbound_ratio : 53.3 × 10% = 5.3 margin_ratio : 10.0 × 5% = 0.5 ''' # 指数行情策略函数 @@ -189,67 +28,69 @@ 'TSXV': {} } if current_info is not None and len_current_info > 0: # print(f"current_info------------{current_info}") # 上证指数数据 Shanghai_index_data = current_info.get('000001') Shanghai_index = Shanghai_index_data[0] # 上证指数 Shanghai_index_volume = round(Shanghai_index_data[1]/100000000, 2) # 上证指数 当日当时成交量 Shanghai_index_turnover = round(Shanghai_index_data[2]/100000000, 2) # 上证指数 当日当时成交额度 Shanghai_Yesterday_closing_index = round(Shanghai_index_data[3], 2) # 上证指数 昨日收盘指数 logger.info(f"上证 指数:{Shanghai_index} 昨日收盘指数:{Shanghai_Yesterday_closing_index} 成交量:{Shanghai_index_volume}亿 手 成交额:{Shanghai_index_turnover}亿 元") now_time_str = tool.get_now_time_str() if data_cache.L1_DATA_START_TIME < now_time_str < data_cache.CLOSING_TIME: # print(f"current_info------------{current_info}") # 上证指数数据 Shanghai_index_data = current_info.get('000001') Shanghai_index = Shanghai_index_data[0] # 上证指数 Shanghai_index_volume = round(Shanghai_index_data[1]/100000000, 2) # 上证指数 当日当时成交量 Shanghai_index_turnover = round(Shanghai_index_data[2]/100000000, 2) # 上证指数 当日当时成交额度 Shanghai_Yesterday_closing_index = round(Shanghai_index_data[3], 2) # 上证指数 昨日收盘指数 logger.info(f"上证 指数:{Shanghai_index} 昨日收盘指数:{Shanghai_Yesterday_closing_index} 成交量:{Shanghai_index_volume}亿 手 成交额:{Shanghai_index_turnover}亿 元") # 深证指数数据 Shenzhen_index_data = current_info.get('399001') Shenzhen_index = Shenzhen_index_data[0] # 深证指数 Shenzhen_index_volume = round(Shenzhen_index_data[1]/100000000, 2) # 深证指数 当日当时成交量 Shenzhen_index_turnover = round(Shenzhen_index_data[2]/100000000, 2) # 深证指数 当日当时成交额度 Shenzhen_Yesterday_closing_index = round(Shenzhen_index_data[3], 2) # 深证指数 昨日收盘指数 logger.info(f"深证 指数:{Shenzhen_index} 昨日收盘指数:{Shenzhen_Yesterday_closing_index} 成交量:{Shenzhen_index_volume}亿 手 成交额:{Shenzhen_index_turnover}亿 元") # 深证指数数据 Shenzhen_index_data = current_info.get('399001') Shenzhen_index = Shenzhen_index_data[0] # 深证指数 Shenzhen_index_volume = round(Shenzhen_index_data[1]/100000000, 2) # 深证指数 当日当时成交量 Shenzhen_index_turnover = round(Shenzhen_index_data[2]/100000000, 2) # 深证指数 当日当时成交额度 Shenzhen_Yesterday_closing_index = round(Shenzhen_index_data[3], 2) # 深证指数 昨日收盘指数 logger.info(f"深证 指数:{Shenzhen_index} 昨日收盘指数:{Shenzhen_Yesterday_closing_index} 成交量:{Shenzhen_index_volume}亿 手 成交额:{Shenzhen_index_turnover}亿 元") # 创业板指数数据 TSXV_index_data = current_info.get('399006') TSXV_index = TSXV_index_data[0] # 创业板指 TSXV_index_volume = round(TSXV_index_data[1]/100000000, 2) # 创业板指 当日当时成交量 TSXV_index_turnover = round(TSXV_index_data[2]/100000000, 2) # 创业板指 当日当时成交额度 TSXV_Yesterday_closing_index = round(TSXV_index_data[3], 2) # 深证指数 昨日收盘指数 logger.info(f"创业板 指数:{TSXV_index} 昨日收盘指数:{TSXV_Yesterday_closing_index} 成交量:{TSXV_index_volume}亿 手 成交额:{TSXV_index_turnover}亿 元") # 创业板指数数据 TSXV_index_data = current_info.get('399006') TSXV_index = TSXV_index_data[0] # 创业板指 TSXV_index_volume = round(TSXV_index_data[1]/100000000, 2) # 创业板指 当日当时成交量 TSXV_index_turnover = round(TSXV_index_data[2]/100000000, 2) # 创业板指 当日当时成交额度 TSXV_Yesterday_closing_index = round(TSXV_index_data[3], 2) # 深证指数 昨日收盘指数 logger.info(f"创业板 指数:{TSXV_index} 昨日收盘指数:{TSXV_Yesterday_closing_index} 成交量:{TSXV_index_volume}亿 手 成交额:{TSXV_index_turnover}亿 元") # 调用涨幅公式计算对应的股票tick瞬时涨幅 data_cache.Shanghai_tick_growth = basic_methods.tick_growth('000001', Shanghai_index) data_cache.Shanghai_today_growth = basic_methods.intraday_growth(Shanghai_index, Shanghai_Yesterday_closing_index) logger.info(f"上证指数 瞬时涨幅 ---- {round(data_cache.Shanghai_tick_growth, 4)}%") logger.info(f"上证指数 当日涨幅 ==== {round(data_cache.Shanghai_today_growth, 4)}%") # 调用涨幅公式计算对应的股票tick瞬时涨幅 data_cache.Shenzhen_tick_growth = basic_methods.tick_growth('399001', Shanghai_index) data_cache.Shenzhen_today_growth = basic_methods.intraday_growth(Shenzhen_index, Shenzhen_Yesterday_closing_index) logger.info(f"深证指数 瞬时涨幅 ---- {round(data_cache.Shenzhen_tick_growth, 4)}%") logger.info(f"深证指数 当日涨幅 ==== {round(data_cache.Shenzhen_today_growth, 4)}%") # 调用涨幅公式计算对应的股票tick瞬时涨幅 data_cache.TSXV_tick_growth = basic_methods.tick_growth('399006', Shanghai_index) data_cache.TSXV_today_growth = basic_methods.intraday_growth(TSXV_index, TSXV_Yesterday_closing_index) logger.info(f"创业板指 瞬时涨幅 ---- {round(data_cache.TSXV_tick_growth, 4)}%") logger.info(f"创业板指 当日涨幅 ==== {round(data_cache.TSXV_today_growth, 4)}%") # 调用涨幅公式计算对应的股票tick瞬时涨幅 data_cache.Shanghai_tick_growth = basic_methods.tick_growth('000001', Shanghai_index) data_cache.Shanghai_today_growth = basic_methods.intraday_growth(Shanghai_index, Shanghai_Yesterday_closing_index) logger.info(f"上证指数 瞬时涨幅 ---- {round(data_cache.Shanghai_tick_growth, 4)}%") logger.info(f"上证指数 当日涨幅 ==== {round(data_cache.Shanghai_today_growth, 4)}%") # 调用涨幅公式计算对应的股票tick瞬时涨幅 data_cache.Shenzhen_tick_growth = basic_methods.tick_growth('399001', Shanghai_index) data_cache.Shenzhen_today_growth = basic_methods.intraday_growth(Shenzhen_index, Shenzhen_Yesterday_closing_index) logger.info(f"深证指数 瞬时涨幅 ---- {round(data_cache.Shenzhen_tick_growth, 4)}%") logger.info(f"深证指数 当日涨幅 ==== {round(data_cache.Shenzhen_today_growth, 4)}%") # 调用涨幅公式计算对应的股票tick瞬时涨幅 data_cache.TSXV_tick_growth = basic_methods.tick_growth('399006', Shanghai_index) data_cache.TSXV_today_growth = basic_methods.intraday_growth(TSXV_index, TSXV_Yesterday_closing_index) logger.info(f"创业板指 瞬时涨幅 ---- {round(data_cache.TSXV_tick_growth, 4)}%") logger.info(f"创业板指 当日涨幅 ==== {round(data_cache.TSXV_today_growth, 4)}%") # 在集合竞价时更新一下 各个指数的开盘涨幅 now_time = tool.get_now_time_str() # 9:25:06 < now_time < 9:25:12 记录开盘指数 及 开盘涨幅 if data_cache.LATER_OPEN_BIDDING_TIME < now_time < data_cache.AFTER_OPEN_BIDDING_TIME: index_K_line['Shanghai']['Shanghai_open_index'] = Shanghai_index index_K_line['Shenzhen']['Shenzhen_open_index'] = Shenzhen_index index_K_line['TSXV']['TSXV_open_index'] = TSXV_index data_cache.Shanghai_open_growth = data_cache.Shanghai_today_growth data_cache.Shenzhen_open_growth = data_cache.Shenzhen_today_growth data_cache.TSXV_open_growth = data_cache.TSXV_today_growth # 在集合竞价时更新一下 各个指数的开盘涨幅 now_time = tool.get_now_time_str() # 9:25:06 < now_time < 9:25:12 记录开盘指数 及 开盘涨幅 if data_cache.LATER_OPEN_BIDDING_TIME < now_time < data_cache.AFTER_OPEN_BIDDING_TIME: index_K_line['Shanghai']['Shanghai_open_index'] = Shanghai_index index_K_line['Shenzhen']['Shenzhen_open_index'] = Shenzhen_index index_K_line['TSXV']['TSXV_open_index'] = TSXV_index data_cache.Shanghai_open_growth = data_cache.Shanghai_today_growth data_cache.Shenzhen_open_growth = data_cache.Shenzhen_today_growth data_cache.TSXV_open_growth = data_cache.TSXV_today_growth # 15:00:00 < now_time < 15:01:00 记录收盘指数 if data_cache.CLOSING_TIME < now_time < data_cache.AFTER_CLOSING_TIME: index_K_line['Shanghai']['Shanghai_close_index'] = Shanghai_index index_K_line['Shenzhen']['Shenzhen_close_index'] = Shenzhen_index index_K_line['TSXV']['TSXV_close_index'] = TSXV_index # 15:00:00 < now_time < 15:01:00 记录收盘指数 if data_cache.CLOSING_TIME < now_time < data_cache.AFTER_CLOSING_TIME: index_K_line['Shanghai']['Shanghai_close_index'] = Shanghai_index index_K_line['Shenzhen']['Shenzhen_close_index'] = Shenzhen_index index_K_line['TSXV']['TSXV_close_index'] = TSXV_index logger.info(f"上证指数 开盘涨幅 ==== {round(data_cache.Shanghai_open_growth, 4)}%") logger.info(f"深证指数 开盘涨幅 ==== {round(data_cache.Shenzhen_open_growth, 4)}%") logger.info(f"创业板指 开盘涨幅 ==== {round(data_cache.TSXV_open_growth, 4)}%") logger.info(f"上证指数 开盘涨幅 ==== {round(data_cache.Shanghai_open_growth, 4)}%") logger.info(f"深证指数 开盘涨幅 ==== {round(data_cache.Shenzhen_open_growth, 4)}%") logger.info(f"创业板指 开盘涨幅 ==== {round(data_cache.TSXV_open_growth, 4)}%") # 大盘指数趋势预期函数(用于对大盘开盘后短期内的趋势预测) @@ -561,140 +402,162 @@ # 计算市场分布形态因子 函数 def calculate_market_distribution_pattern_factor(data): # ====================== 输入数据 ====================== data = {'-1': '210', '-10': '4', '-2': '134', '-3': '71', '-4': '45', '-5': '18', '-6': '15', '-7': '7', '-8': '6', '-9': '2', '0': '83', '1': '650', '10': '41', '2': '1242', '3': '1182', '4': '784', '5': '334', '6': '139', '7': '77', '8': '46', '9': '16', 'DT': 5, 'SJDT': '1', 'SJZT': '17', 'STDT': '4', 'STZT': '2', 'SZJS': 4530, 'XDJS': 517, 'ZSZDFB': '1929,247,56,435,61,3,259,29,4,44,5,1,38,9,3,239,51,10,', 'ZT': 19, 'sign': '市场人气较好', 'szln': 1723817, 'qscln': 5291241, 's_zrcs': 2413498, 'q_zrcs': 6183792, 's_zrtj': 75543024, 'q_zrtj': 169960513} # ====================== 数据预处理 ====================== def preprocess_data(data): """ 原生Python数据预处理 返回: sorted_bins : 排序后的涨跌幅区间(整数列表) sorted_counts : 对应区间的股票数量 total : 总股票数 pos_counts : 正涨跌区间的数量列表 neg_counts : 负涨跌区间的数量列表 """ # 将字符串键转为整数并排序 sorted_items = sorted(data.items(), key=lambda x: int(x[0])) sorted_bins = [int(k) for k, v in sorted_items] sorted_counts = [v for k, v in sorted_items] 预处理原始数据: 1. 转换所有数值字段为整数 2. 解析涨跌分布(ZSZDFB)为整数列表 3. 分离涨跌幅分布数据 """ processed = {} # 计算总股票数 total = sum(sorted_counts) # 分离正负区间(排除0%) pos_counts = [v for k, v in sorted_items if int(k) > 0] neg_counts = [v for k, v in sorted_items if int(k) < 0] return sorted_bins, sorted_counts, total, pos_counts, neg_counts # 执行预处理 bins, counts, total, pos_counts, neg_counts = calculate_market_distribution_pattern_factor(data_cache.rise_and_fall_statistics_dirt) # ====================== 因子计算模块 ====================== class NativeFactorCalculator: def __init__(self, bins, counts, total, pos_counts, neg_counts): self.bins = bins self.counts = counts # todo total 会默认为0的情况 if total != 0: self.total = total # 通用字段类型转换 for key, value in data.items(): if isinstance(value, str) and value.isdigit(): # 处理字符串数字 processed[key] = int(value) elif key == 'ZSZDFB': # 特殊处理涨跌分布 # 过滤空值并转换为整数列表 processed[key] = [int(x) for x in value.strip(',').split(',')] else: self.total = 1 self.pos_counts = pos_counts self.neg_counts = neg_counts processed[key] = value # 保留原始类型 def calculate_bdr(self): """原生涨跌比计算""" sum_pos = sum(self.pos_counts) sum_neg = sum(self.neg_counts) return sum_pos / sum_neg if sum_neg != 0 else float('nan') def calculate_extreme_ratio(self): """极端波动比例计算""" extreme_total = 0 for b, c in zip(self.bins, self.counts): if b >= 5 or b <= -5: extreme_total += c return extreme_total / self.total def calculate_market_breadth(self): """市场宽度因子计算""" diff = sum(self.pos_counts) - sum(self.neg_counts) return diff / self.total def _expand_distribution(self): """将分箱数据展开为原始数据点(用于计算统计指标)""" expanded = [] for value, count in zip(self.bins, self.counts): expanded.extend([value] * count) return expanded def calculate_distribution_metrics(self): """原生偏度与峰度计算""" data = self._expand_distribution() n = len(data) if n < 2: return (float('nan'), float('nan')) # 计算均值 mean = sum(data) / n # 计算标准差 variance = sum((x - mean) ** 2 for x in data) / (n - 1) std_dev = variance ** 0.5 # 计算偏度 skewness = (sum((x - mean) ** 3 for x in data) / n) / (std_dev ** 3) if std_dev != 0 else 0 # 计算峰度(Fisher定义,即正态分布峰度为0) kurtosis = (sum((x - mean) ** 4 for x in data) / n) / (std_dev ** 4) - 3 if std_dev != 0 else 0 return (skewness, kurtosis) # 分离涨跌幅分布数据(示例划分,具体区间需确认) # 假设ZSZDFB对应从-10%到10%的18个区间分布(根据数据长度推测) processed['price_distribution'] = { 'bins': list(range(-10, 11)), # 生成-10到10的整数区间标识 'counts': processed['ZSZDFB'] } return processed # 实例化并计算因子 calculator = NativeFactorCalculator(bins, counts, total, pos_counts, neg_counts) bdr = calculator.calculate_bdr() extreme_ratio = calculator.calculate_extreme_ratio() market_breadth = calculator.calculate_market_breadth() skewness, kurtosis = calculator.calculate_distribution_metrics() # data = preprocess_data(raw_data) # ====================== 策略信号生成模块 ====================== class NativeTradingStrategy: def __init__(self, bdr_threshold=1.5, extreme_threshold=0.15): self.bdr_threshold = bdr_threshold self.extreme_threshold = extreme_threshold # ====================== 核心因子计算 ====================== def calculate_factors(data): if data is not None: zero = int(data.get('0')) rise_one = int(data.get('1')) rise_two = int(data.get('2')) rise_three = int(data.get('3')) rise_four = int(data.get('4')) rise_five = int(data.get('5')) rise_six = int(data.get('6')) rise_seven = int(data.get('7')) rise_eight = int(data.get('8')) rise_nine = int(data.get('9')) rise_ten = int(data.get('10')) fall_one = int(data.get('-1')) fall_two = int(data.get('-2')) fall_three = int(data.get('-3')) fall_four = int(data.get('-4')) fall_five = int(data.get('-5')) fall_six = int(data.get('-6')) fall_seven = int(data.get('-7')) fall_eight = int(data.get('-8')) fall_nine = int(data.get('-9')) fall_ten = int(data.get('-10')) print(type(fall_ten)) def generate_signals(self, bdr, extreme_ratio, market_breadth, skewness): """生成交易信号(与之前相同,保持策略逻辑)""" signals = [] """计算市场关键指标因子""" factors = {} # 总票数 total_stocks = zero + rise_one + rise_two + rise_three + rise_four + rise_five + rise_six + rise_seven + rise_eight + rise_nine + rise_ten + fall_one + fall_two + fall_three + fall_four + fall_five + fall_six + fall_seven + fall_eight + fall_nine + fall_ten print(f"total_stocks = {total_stocks}") factors['total_stocks'] = total_stocks # 涨跌统计(排除0%) rise = rise_one + rise_two + rise_three + rise_four + rise_five + rise_six + rise_seven + rise_eight + rise_nine + rise_ten fall = fall_one + fall_two + fall_three + fall_four + fall_five + fall_six + fall_seven + fall_eight + fall_nine + fall_ten # 信号1:基于涨跌比 if bdr > self.bdr_threshold: signals.append("多头趋势增强 - 加仓指数ETF") elif bdr < 1 / self.bdr_threshold: signals.append("空头趋势增强 - 减仓或反向操作") else: signals.append("趋势中性 - 保持当前仓位") rise_dirt = { 'rise_one': rise_one, 'rise_two': rise_two, 'rise_three': rise_three, 'rise_four': rise_four, 'rise_five': rise_five, 'rise_six': rise_six, 'rise_seven': rise_seven, 'rise_eight': rise_eight, 'rise_nine': rise_nine, 'rise_ten': rise_ten, } fall_dirt = { 'fall_one': fall_one, 'fall_two': fall_two, 'fall_three': fall_three, 'fall_four': fall_four, 'fall_five': fall_five, 'fall_six': fall_six, 'fall_seven': fall_seven, 'fall_eight': fall_eight, 'fall_nine': fall_nine, 'fall_ten': fall_ten, } # 计算所有值的总和 total_sum = sum(rise_dirt.values()) # 计算每个键的值占总和的百分比 percentages = {key: round((value / total_sum) * 100, 2) for key, value in rise_dirt.items()} # 找到最大值对应的键 max_key = max(rise_dirt, key=rise_dirt.get) # 找到最小值对应的键 min_key = min(rise_dirt, key=rise_dirt.get) # 信号2:极端波动 if extreme_ratio > self.extreme_threshold: signals.append("波动风险升高 - 启动对冲或降低杠杆") else: signals.append("波动正常 - 维持现有风险敞口") # 涨跌比因子 --------------------------------------------------- factors['rise_vs_fall'] = { 'rise_vs_fall_ratio': rise / fall if fall > 0 else 0, # 涨跌比 'rise_gather_area': max_key, # 找到最大值 'rise_scattered_area': min_key, # 找到最小值 'percentages': percentages, # 找到最小值 } # 信号3:市场宽度与偏度 if market_breadth > 0.2 and skewness > 0.5: signals.append("市场普涨且偏度正向 - 超配行业龙头股") elif market_breadth < -0.2 and skewness < -0.5: signals.append("市场普跌且偏度负向 - 低估值防御型配置") return signals # 资金流向因子 -------------------------------------------------- factors['capital_flow'] = { 'buy': data['q_zrtj'], # 买方资金净流入 'sell': data['s_zrtj'], # 卖方资金净流出 'net': data['q_zrtj'] - data['s_zrtj'] # 资金净流入 } # 市场情绪因子 -------------------------------------------------- factors['sentiment'] = { 'zt_ratio': data['ZT'] / total_stocks, # 涨停占总比 'zt_vs_dt_ratio': data['ZT'] / data['DT'], # 涨停跌停比 'sign': 1 if '较好' in data['sign'] else 0, # 情绪标签量化 'rise_vs_all_stocks_ratio': data['SZJS'] / total_stocks, # 活跃股占比 'rise_vs_fall_ratio': data['SZJS'] / data['SZJS'], # 活跃股占比 'extreme_ratio': (data['ZT'] + data['DT']) / total_stocks, # 极端波动(涨停+跌停占总比) } return factors # 实例化策略并生成信号 strategy = NativeTradingStrategy() signals = strategy.generate_signals(bdr, extreme_ratio, market_breadth, skewness) # # # ====================== 策略信号生成 ====================== def generate_signals(factors): """生成多维度交易信号""" signals = [] # 信号1:趋势强度判断 if factors['sentiment']['rise_vs_fall_ratio'] > 1.5: signals.append("多头强势:涨跌比高于阈值1.5") elif factors['sentiment']['rise_vs_fall_ratio'] < 0.67: signals.append("空头强势:涨跌比低于阈值0.67") # 信号2:资金动能判断 if factors['capital_flow']['net'] > 1e8: # 假设1亿为阈值 signals.append("资金大幅净流入:关注主力进场") elif factors['capital_flow']['net'] < -5e7: signals.append("资金大幅净流出:警惕风险") # 信号3:极端波动预警 if factors['sentiment']['extreme_ratio'] > 0.1: signals.append("极端波动:涨停跌停股超10%") # 信号4:市场情绪综合 if factors['sentiment']['zt_ratio'] > 0.05 and factors['sentiment']['sign'] == 1: signals.append("高情绪热度:涨停股多且人气向好") return signals if signals else ["无显著信号:维持当前策略"] # 实时设置计划持仓数量 函数 @@ -792,12 +655,25 @@ # print(f"指数K线{data_cache.all_index_k_line_property_dict}") # all_index_k_line_dict_write() # ====================== 结果输出 ====================== print("========== 因子数值(原生计算) ==========") print(f"涨跌比(BDR): {bdr:.2f}") print(f"极端波动比例: {extreme_ratio:.2%}") print(f"市场宽度因子: {market_breadth:.2f}") print(f"分布偏度: {skewness:.2f}, 峰度: {kurtosis:.2f}") print(f"data=={data}") # 因子计算 factors = calculate_factors(data) # 生成信号 signals = generate_signals(factors) # 结果输出 print("========== 市场关键指标 ==========") print(f"总股票数: {factors['total_stocks']}") print(f"涨跌比(BDR): {factors['rise_vs_fall']['rise_vs_fall_ratio']:.2f}") print(f"极端波动比例: {factors['sentiment']['extreme_ratio']:.2%}") print(f"资金净流入(元): {factors['capital_flow']['net']:,}") print(f"涨停股占比: {factors['sentiment']['zt_ratio']:.2%}") print(f"市场情绪量化: {'积极' if factors['sentiment']['sign'] else '谨慎'}") print(f"上涨聚集区域:{factors['rise_vs_fall']['rise_gather_area']}") print(f"上涨零散区域:{factors['rise_vs_fall']['percentages']}") print("\n========== 策略信号 ==========") for i, signal in enumerate(signals, 1): print(f"信号{i}: {signal}") print(f"信号{i}: {signal}") strategy/selling_strategy.py
@@ -545,6 +545,7 @@ k_line_data[0]['sec_name'], index) # L2级数据触发的板上盯的炸板策略 def explosion_strategy(code): if code not in data_cache.available_symbols_set: return @@ -566,40 +567,35 @@ if not position_info: return position_volume_yesterday = position_info['availablePosition'] # 股份可用 (可用股份数量) # 昨日持仓量(可用持仓量) if current_volume < k_line_data[0]['volume'] * 0.6: logger_info( f"【开盘临机】-【 炸板中!!且当日量不足】【{k_line_data[0]['sec_name']}】 卖一数量不等于0,设定委卖数量【十分之一仓】,当日当时量:{current_volume}") order_methods.sell_order_by_part_volume(0.1, symbol, position_volume_yesterday, current_price, k_line_data[0]['sec_name'], index) # 如果 卖二 量炸出来了,那么就是彻底炸开了。 if current_quotes_sell[1][1] != 0: logger_info( f"【开盘临机】-【 炸板炸开了!!】【{k_line_data[0]['sec_name']}】 卖二数量不等于0,设定委卖数量【十分之二仓】,当日当时量:{current_volume}") order_methods.sell_order_by_part_volume(0.2, symbol, position_volume_yesterday, current_price, k_line_data[0]['sec_name'], index) if current_volume < k_line_data[0]['volume'] * 0.8: # 该股加入到板上盯卖的集合中 所以设定卖出全仓 if symbol in data_cache.LIMIT_UP_SELL_CODES: logger_info(f"板上盯卖 |开启| 进入卖票决策") if current_volume < k_line_data[0]['volume'] * 0.6: logger_info( f"【开盘临机】-【 炸板!!且当日量不足】【{k_line_data[0]['sec_name']}】 买盘小于1万 且 今日量小于昨日量的 0.8,当日当时量:{current_volume}") # 该股加入到板上盯卖的集合中 所以设定卖出全仓 if symbol in data_cache.LIMIT_UP_SELL_CODES: f"【开盘临机】-【 炸板中!!且当日量不足】【{k_line_data[0]['sec_name']}】 卖一数量不等于0,设定委卖数量【十分之一仓】,当日当时量:{current_volume}") order_methods.sell_order_by_part_volume(0.1, symbol, position_volume_yesterday, current_price, k_line_data[0]['sec_name'], index) # 如果 卖二 量炸出来了,那么就是彻底炸开了。 if current_quotes_sell[1][1] != 0: logger_info( f"【开盘临机】-【 炸板炸开了!!】【{k_line_data[0]['sec_name']}】 卖二数量不等于0,设定委卖数量【十分之一仓】,当日当时量:{current_volume}") order_methods.sell_order_by_part_volume(0.1, symbol, position_volume_yesterday, current_price, k_line_data[0]['sec_name'], index) if current_volume < k_line_data[0]['volume'] * 0.8: logger_info( f"【开盘临机】-【 炸板!!且当日量不足】【{k_line_data[0]['sec_name']}】 板上盯卖 |开启| 设定委卖【全仓】") f"【开盘临机】-【 炸板!!且当日量不足】【{k_line_data[0]['sec_name']}】 买盘小于1万 且 今日量小于昨日量的 0.8,当日当时量:{current_volume}") logger_info( f"【开盘临机】-【 炸板!!且当日量不足】【{k_line_data[0]['sec_name']}】 设定委卖【全仓】") order_methods.sell_order_by_part_volume(1, symbol, position_volume_yesterday, current_price, k_line_data[0]['sec_name'], index) else: logger_info( f"【开盘临机】-【 炸板!!且当日量不足】【{k_line_data[0]['sec_name']}】 板上盯卖 |关闭| 设定委卖【十分之一仓】") order_methods.sell_order_by_part_volume(0.1, symbol, position_volume_yesterday, current_price, k_line_data[0]['sec_name'], index) else: logger_info(f"板上盯卖 |关闭| 不做处理")