""" 环境信息 """ import json from strategy.strategy_variable_factory import DataLoader from third_data.history_k_data_manager import HistoryKDataManager from third_data.kpl_block_manager import KPLCodeJXBlocksManager from utils import tool @tool.singleton class RealTimeEnvInfo: """ 实时信息 """ def __init__(self): # 大单更新时间 self.big_order_update_time = '' # 板块流入信息 (更新时间, 数据数量) self.block_in = ('', 0) # 开盘啦实时涨停信息(更新时间, 数据数量) self.kpl_current_limit_up = ('', 0) # Tick数据(更新时间, 数据数量) self.ticks = ('', 0) def to_dict(self): d = self.__dict__ return d @classmethod def to_obj(cls, json_str): result = json.loads(json_str) obj = RealTimeEnvInfo() for k in result: setattr(obj, k, result[k]) return obj def get_leading_limit_up_block_codes_count(day): """ 获取领涨板块的代码数量 @param day: @return: """ codes_info = DataLoader(day).load_all_buy_plates_of_codes() return len(codes_info) def get_history_k_bars(day): """ 获取历史K线数量 @param day: @return: """ codes = HistoryKDataManager().get_history_bars_codes(day) count = len(codes) return count def get_kpl_code_jx_blocks(day): """ 获取开盘啦代码精选板块数量 @param day: @return: """ count = KPLCodeJXBlocksManager(day, set()).get_all_code_blocks_count() return count if __name__ == "__main__": RealTimeEnvInfo().kpl_current_limit_up = tool.get_now_time_str() print(RealTimeEnvInfo().to_dict())