servers/data_server.py | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
test/test_block.py | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
third_data/code_plate_key_manager.py | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
third_data/kpl_limit_up_data_manager.py | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 |
servers/data_server.py
@@ -9,6 +9,7 @@ from code_attribute.gpcode_manager import BlackListCodeManager from l2.l2_transaction_data_manager import HuaXinBuyOrderManager from log_module.log import logger_system, logger_debug, logger_kpl_limit_up, logger_request_api from third_data.kpl_limit_up_data_manager import LatestLimitUpBlockManager from utils import global_util, tool, data_export_util from code_attribute import gpcode_manager from log_module import log, log_analyse, log_export, async_log_util @@ -16,7 +17,7 @@ from cancel_strategy.s_l_h_cancel_strategy import HourCancelBigNumComputer, LCancelRateManager from output.limit_up_data_filter import IgnoreCodeManager from third_data import kpl_util, kpl_data_manager, kpl_api, block_info from third_data.code_plate_key_manager import RealTimeKplMarketData, KPLPlateForbiddenManager, LatestLimitUpBlockManager from third_data.code_plate_key_manager import RealTimeKplMarketData, KPLPlateForbiddenManager from third_data.history_k_data_util import HistoryKDatasUtils from third_data.kpl_data_manager import KPLDataManager, KPLLimitUpDataRecordManager, \ KPLCodeLimitUpReasonManager test/test_block.py
@@ -1,5 +1,5 @@ from third_data import kpl_data_manager, kpl_util from third_data.code_plate_key_manager import LatestLimitUpBlockManager from third_data.kpl_limit_up_data_manager import LatestLimitUpBlockManager from utils import tool third_data/code_plate_key_manager.py
class LatestLimitUpBlockManager:
    """
    Manage the blocks (limit-up reasons) seen during the latest trading days.

    The class is a process-wide singleton: every construction returns the same
    object and the first construction loads the historical data.  All state is
    kept in class-level containers.

    The class references these names from the enclosing module:
    ``kpl_limit_up_data_manager``, ``HistoryKDatasUtils``,
    ``HistoryKDataManager``, ``tool`` and ``init_data_util``.

    Record fields read by this class (no other field is touched):
    current limit-up rows -> code at index 0;
    history rows -> block at index 2, code at index 3, name at index 4 and a
    label at index 12 that is compared against '首板'.
    """

    # Number of days looked at (today included)
    __LATEST_DAY_COUNT = 7
    __days = []
    # Rows of codes currently at limit-up, keyed by day
    __current_limit_up_day_datas = {}
    # Rows of codes that were at limit-up at some point, keyed by day
    __history_limit_up_day_datas = {}
    # K-line bars keyed by code
    __k_datas = {}
    # Highest gain (integer percent) keyed by code
    __k_max_rate = {}
    __code_name_dict = {}
    # Block statistics: {day: {block: (limit-up count, opened count, code set)}}
    __block_day_datas = {}

    __instance = None

    def __new__(cls, *args, **kwargs):
        """Return the shared instance, loading the historical data on first use."""
        if cls.__instance is None:
            # object.__new__ rejects extra arguments, so they are not forwarded.
            cls.__instance = super(LatestLimitUpBlockManager, cls).__new__(cls)
            cls.__load_datas()
        return cls.__instance

    @classmethod
    def __load_datas(cls):
        """Load limit-up rows, K-lines and block statistics for the days before today."""
        cached_days = HistoryKDatasUtils.get_latest_trading_date_cache(cls.__LATEST_DAY_COUNT - 1)
        # Work on a copy: inserting today must not modify a list that may be
        # owned by the cache.  An empty/None result is tolerated.
        days = list(cached_days or ())
        now_day = tool.get_now_date_str()
        if not days or days[0] != now_day:
            days.insert(0, now_day)
        cls.__days = days

        previous_days = [day for day in days if day != now_day]
        for day in previous_days:
            cls.__history_limit_up_day_datas[day] = kpl_limit_up_data_manager.get_history_limit_up_datas(day)
            cls.__current_limit_up_day_datas[day] = kpl_limit_up_data_manager.get_current_limit_up_datas(day)

        # Fetch the K-lines of every code that appears in the history rows.
        total_codes = set()
        for day in cls.__current_limit_up_day_datas:
            total_codes |= {row[3] for row in cls.__history_limit_up_day_datas.get(day) or ()}
        for code in total_codes:
            cls.__get_bars(code)

        for day in previous_days:
            cls.__block_day_datas[day] = cls.__statistics_limit_up_block_infos_by_day(day)

    def set_current_limit_up_data(self, day, datas):
        """
        Store the current limit-up rows of ``day`` and refresh that day's history rows.

        @param day: day key under which the rows are stored
        @param datas: current limit-up rows (code at index 0)
        """
        self.__current_limit_up_day_datas[day] = datas
        self.__history_limit_up_day_datas[day] = \
            kpl_limit_up_data_manager.get_today_history_limit_up_datas_cache()
        # Make sure K-line data exists for every code involved.
        total_codes = {row[0] for row in datas or ()}
        total_codes |= {row[3] for row in self.__history_limit_up_day_datas[day] or ()}
        for code in total_codes:
            self.__get_bars(code)

    @classmethod
    def __statistics_limit_up_block_infos_by_day(cls, day):
        """
        Group the history rows of ``day`` by block.

        @param day: day key
        @return: {block: (codes still at limit-up, codes no longer at limit-up, code set)};
                 an empty dict when nothing is stored for ``day``
        """
        current_codes = {row[0] for row in cls.__current_limit_up_day_datas.get(day) or ()}
        block_codes = {}
        for row in cls.__history_limit_up_day_datas.get(day) or ():
            cls.__code_name_dict[row[3]] = row[4]
            block_codes.setdefault(row[2], set()).add(row[3])
        result = {}
        for block, codes in block_codes.items():
            limit_up_count = len(codes & current_codes)
            result[block] = (limit_up_count, len(codes) - limit_up_count, codes)
        return result

    def statistics_limit_up_block_infos(self):
        """
        Build the block ranking; only today's statistics are recomputed.

        @return: at most 20 rows, most frequent block first, each row being
                 [block, number of days seen, longest run of consecutive days,
                  (code, max rate, name) of the strongest code or None,
                  number of today's codes whose label is not '首板',
                  [(limit-up count, opened count) for each tracked day]]
        """
        return self.__rank_blocks(tool.get_now_date_str())

    @classmethod
    def __rank_blocks(cls, now_day):
        """Recompute the statistics of ``now_day`` and build the ranking rows."""
        cls.__block_day_datas[now_day] = cls.__statistics_limit_up_block_infos_by_day(now_day)

        # Days on which each block appeared, and every code seen in the block.
        block_days = {}
        block_codes = {}
        for day, blocks in cls.__block_day_datas.items():
            for block, info in blocks.items():
                block_days.setdefault(block, set()).add(day)
                block_codes.setdefault(block, set()).update(info[2])

        # Rank by the NUMBER of days; comparing the day sets themselves is only
        # a subset test and does not order the blocks.
        ranked = sorted(block_days.items(), key=lambda item: len(item[1]), reverse=True)[:20]

        today_rows = {row[3]: row for row in cls.__history_limit_up_day_datas.get(now_day) or ()}
        rows = []
        for block, days_seen in ranked:
            codes = block_codes[block]

            # Longest run of consecutive tracked days containing the block.
            longest = run = 0
            for day in cls.__days:
                if block in cls.__block_day_datas.get(day, {}):
                    run += 1
                    longest = max(longest, run)
                else:
                    run = 0

            # Code with the highest gain; a code without K-line data has no
            # rate and never beats a code that has one.
            best = None
            for code in codes:
                rate = cls.__k_max_rate.get(code)
                if best is None or (rate is not None and (best[1] is None or rate > best[1])):
                    best = (code, rate, cls.__code_name_dict.get(code))

            not_first_board = sum(
                1 for code in codes
                if code in today_rows and today_rows[code][12] != '首板')

            per_day = []
            for day in cls.__days:
                info = cls.__block_day_datas.get(day, {}).get(block)
                per_day.append((info[0], info[1]) if info else (0, 0))

            rows.append([block, len(days_seen), longest, best, not_first_board, per_day])
        return rows

    @classmethod
    def __get_bars(cls, code):
        """
        Return the K-line bars of ``code``, fetching and caching them when needed.

        @param code: code to look up
        @return: the bars, or None when neither source returned any
        """
        if code in cls.__k_datas:
            return cls.__k_datas[code]
        volumes_data = None
        # __days[0] is today after loading, so __days[1] is the entry that
        # follows it; skip this source when that entry does not exist.
        if len(cls.__days) > 1:
            volumes_data = HistoryKDataManager().get_history_bars(code, cls.__days[1])
            if volumes_data:
                volumes_data = volumes_data[:cls.__LATEST_DAY_COUNT - 1]
        if not volumes_data:
            volumes_data = init_data_util.get_volumns_by_code(code, cls.__LATEST_DAY_COUNT - 1)
        if not volumes_data:
            # Nothing to cache and no rate to compute.
            return None
        cls.__k_datas[code] = volumes_data
        # Highest gain: first bar's close relative to the lowest low.
        # NOTE(review): presumably the first bar is the most recent one - confirm.
        min_price = min(bar["low"] for bar in volumes_data)
        if min_price:
            cls.__k_max_rate[code] = int((volumes_data[0]["close"] - min_price) * 100 / min_price)
        return volumes_data
class LatestLimitUpBlockManager:
    """
    Manage the blocks (limit-up reasons) seen during the latest trading days.

    The class is a process-wide singleton: every construction returns the same
    object and the first construction loads the historical data.  All state is
    kept in class-level containers.

    The class references these names from the enclosing module:
    ``HistoryKDatasUtils``, ``HistoryKDataManager``, ``tool``,
    ``init_data_util`` and the functions ``get_history_limit_up_datas``,
    ``get_current_limit_up_datas`` and ``get_today_history_limit_up_datas_cache``.

    Record fields read by this class (no other field is touched):
    current limit-up rows -> code at index 0;
    history rows -> block at index 2, code at index 3, name at index 4 and a
    label at index 12 that is compared against '首板'.
    """

    # Number of days looked at (today included)
    __LATEST_DAY_COUNT = 7
    __days = []
    # Rows of codes currently at limit-up, keyed by day
    __current_limit_up_day_datas = {}
    # Rows of codes that were at limit-up at some point, keyed by day
    __history_limit_up_day_datas = {}
    # K-line bars keyed by code
    __k_datas = {}
    # Highest gain (integer percent) keyed by code
    __k_max_rate = {}
    __code_name_dict = {}
    # Block statistics: {day: {block: (limit-up count, opened count, code set)}}
    __block_day_datas = {}

    __instance = None

    def __new__(cls, *args, **kwargs):
        """Return the shared instance, loading the historical data on first use."""
        if cls.__instance is None:
            # object.__new__ rejects extra arguments, so they are not forwarded.
            cls.__instance = super(LatestLimitUpBlockManager, cls).__new__(cls)
            cls.__load_datas()
        return cls.__instance

    @classmethod
    def __load_datas(cls):
        """Load limit-up rows, K-lines and block statistics for the days before today."""
        cached_days = HistoryKDatasUtils.get_latest_trading_date_cache(cls.__LATEST_DAY_COUNT - 1)
        # Work on a copy: inserting today must not modify a list that may be
        # owned by the cache.  An empty/None result is tolerated.
        days = list(cached_days or ())
        now_day = tool.get_now_date_str()
        if not days or days[0] != now_day:
            days.insert(0, now_day)
        cls.__days = days

        previous_days = [day for day in days if day != now_day]
        for day in previous_days:
            cls.__history_limit_up_day_datas[day] = get_history_limit_up_datas(day)
            cls.__current_limit_up_day_datas[day] = get_current_limit_up_datas(day)

        # Fetch the K-lines of every code that appears in the history rows.
        total_codes = set()
        for day in cls.__current_limit_up_day_datas:
            total_codes |= {row[3] for row in cls.__history_limit_up_day_datas.get(day) or ()}
        for code in total_codes:
            cls.__get_bars(code)

        for day in previous_days:
            cls.__block_day_datas[day] = cls.__statistics_limit_up_block_infos_by_day(day)

    def set_current_limit_up_data(self, day, datas):
        """
        Store the current limit-up rows of ``day`` and refresh that day's history rows.

        @param day: day key under which the rows are stored
        @param datas: current limit-up rows (code at index 0)
        """
        self.__current_limit_up_day_datas[day] = datas
        self.__history_limit_up_day_datas[day] = get_today_history_limit_up_datas_cache()
        # Make sure K-line data exists for every code involved.
        total_codes = {row[0] for row in datas or ()}
        total_codes |= {row[3] for row in self.__history_limit_up_day_datas[day] or ()}
        for code in total_codes:
            self.__get_bars(code)

    @classmethod
    def __statistics_limit_up_block_infos_by_day(cls, day):
        """
        Group the history rows of ``day`` by block.

        @param day: day key
        @return: {block: (codes still at limit-up, codes no longer at limit-up, code set)};
                 an empty dict when nothing is stored for ``day``
        """
        current_codes = {row[0] for row in cls.__current_limit_up_day_datas.get(day) or ()}
        block_codes = {}
        for row in cls.__history_limit_up_day_datas.get(day) or ():
            cls.__code_name_dict[row[3]] = row[4]
            block_codes.setdefault(row[2], set()).add(row[3])
        result = {}
        for block, codes in block_codes.items():
            limit_up_count = len(codes & current_codes)
            result[block] = (limit_up_count, len(codes) - limit_up_count, codes)
        return result

    def statistics_limit_up_block_infos(self):
        """
        Build the block ranking; only today's statistics are recomputed.

        @return: at most 20 rows, most frequent block first, each row being
                 [block, number of days seen, longest run of consecutive days,
                  (code, max rate, name) of the strongest code or None,
                  number of today's codes whose label is not '首板',
                  [(limit-up count, opened count) for each tracked day]]
        """
        return self.__rank_blocks(tool.get_now_date_str())

    @classmethod
    def __rank_blocks(cls, now_day):
        """Recompute the statistics of ``now_day`` and build the ranking rows."""
        cls.__block_day_datas[now_day] = cls.__statistics_limit_up_block_infos_by_day(now_day)

        # Days on which each block appeared, and every code seen in the block.
        block_days = {}
        block_codes = {}
        for day, blocks in cls.__block_day_datas.items():
            for block, info in blocks.items():
                block_days.setdefault(block, set()).add(day)
                block_codes.setdefault(block, set()).update(info[2])

        # Rank by the NUMBER of days; comparing the day sets themselves is only
        # a subset test and does not order the blocks.
        ranked = sorted(block_days.items(), key=lambda item: len(item[1]), reverse=True)[:20]

        today_rows = {row[3]: row for row in cls.__history_limit_up_day_datas.get(now_day) or ()}
        rows = []
        for block, days_seen in ranked:
            codes = block_codes[block]

            # Longest run of consecutive tracked days containing the block.
            longest = run = 0
            for day in cls.__days:
                if block in cls.__block_day_datas.get(day, {}):
                    run += 1
                    longest = max(longest, run)
                else:
                    run = 0

            # Code with the highest gain; a code without K-line data has no
            # rate and never beats a code that has one.
            best = None
            for code in codes:
                rate = cls.__k_max_rate.get(code)
                if best is None or (rate is not None and (best[1] is None or rate > best[1])):
                    best = (code, rate, cls.__code_name_dict.get(code))

            not_first_board = sum(
                1 for code in codes
                if code in today_rows and today_rows[code][12] != '首板')

            per_day = []
            for day in cls.__days:
                info = cls.__block_day_datas.get(day, {}).get(block)
                per_day.append((info[0], info[1]) if info else (0, 0))

            rows.append([block, len(days_seen), longest, best, not_first_board, per_day])
        return rows

    @classmethod
    def __get_bars(cls, code):
        """
        Return the K-line bars of ``code``, fetching and caching them when needed.

        @param code: code to look up
        @return: the bars, or None when neither source returned any
        """
        if code in cls.__k_datas:
            return cls.__k_datas[code]
        volumes_data = None
        # __days[0] is today after loading, so __days[1] is the entry that
        # follows it; skip this source when that entry does not exist.
        if len(cls.__days) > 1:
            volumes_data = HistoryKDataManager().get_history_bars(code, cls.__days[1])
            if volumes_data:
                volumes_data = volumes_data[:cls.__LATEST_DAY_COUNT - 1]
        if not volumes_data:
            volumes_data = init_data_util.get_volumns_by_code(code, cls.__LATEST_DAY_COUNT - 1)
        if not volumes_data:
            # Nothing to cache and no rate to compute.
            return None
        cls.__k_datas[code] = volumes_data
        # Highest gain: first bar's close relative to the lowest low.
        # NOTE(review): presumably the first bar is the most recent one - confirm.
        min_price = min(bar["low"] for bar in volumes_data)
        if min_price:
            cls.__k_max_rate[code] = int((volumes_data[0]["close"] - min_price) * 100 / min_price)
        return volumes_data