""" 买入策略 """ import constant from log_module import async_log_util from log_module.log import logger_buy_strategy from third_data import kpl_api from utils import tool class OpenLimitUpGoodBlocksBuyStrategy: """ 开1的好板块买入策略 """ __excuted = False @classmethod def set_current_limit_up_data(cls, datas): """ 设置当前的涨停数据 @param datas: @return: """ now_time = int(tool.get_now_time_str().replace(":", "")) if now_time < int("092800") or now_time > int("093000"): return if cls.__excuted: return cls.__excuted = True # 形如: {"801038":['801038', '维生素', 7.325]} reason_id_info_dict = {} # 形如: {"801038":{}} reason_id_codes = {} for d in datas: code = d[0] limit_up_reason = d[5] # [['801038', '维生素', 7.325]] reasons = cls.__load_blocks(code, limit_up_reason) for r in reasons: reason_id_info_dict[r[0]] = r if r[0] not in reason_id_codes: reason_id_codes[r[0]] = set() reason_id_codes[r[0]].add(code) reason_list = [reason_id_info_dict[x] for x in reason_id_info_dict] # 按涨幅倒序排 reason_list.sort(key=lambda x: x[2], reverse=True) async_log_util.info(logger_buy_strategy, f"原因排序:{reason_list}") async_log_util.info(logger_buy_strategy, f"原因代码:{reason_id_codes}") max_lenth = len(reason_list) # 剔除宽泛原因 for i in range(0, max_lenth): if i >= len(reason_list): break if reason_list[i][1] in constant.KPL_INVALID_BLOCKS: del reason_list[i] i -= 1 @classmethod def __load_blocks(cls, code, limit_up_reason): """ 加载板块 @param code: @return: """ limit_up_reason_info = None jx_blocks = kpl_api.getCodeJingXuanBlocks(code) for x in jx_blocks: if x[1] == limit_up_reason: limit_up_reason_info = x break recommend_reasons = jx_blocks if len(jx_blocks) > 2: recommend_reasons = jx_blocks[:2] reasons = [] if limit_up_reason_info: reasons.append(limit_up_reason_info) for r in recommend_reasons: if limit_up_reason_info and r[0] == limit_up_reason_info[0]: continue reasons.append(r) return reasons if __name__ == "__main__": with open("C:\\Users\\Administrator\\Desktop\\新建文本文档 (2).txt", encoding="utf-8") as f: line = f.readline() line = eval(line) OpenLimitUpGoodBlocksBuyStrategy.set_current_limit_up_data(line)