"""
|
环境信息
|
"""
|
import json
|
|
from strategy.strategy_variable_factory import DataLoader
|
from third_data.history_k_data_manager import HistoryKDataManager
|
from third_data.kpl_block_manager import KPLCodeJXBlocksManager
|
from utils import tool
|
|
|
@tool.singleton
class RealTimeEnvInfo:
    """
    Real-time environment information.

    Holds the latest update time (and, where applicable, the record count)
    for each real-time data feed.
    """

    def __init__(self):
        # Big-order update time
        self.big_order_update_time = ''
        # Block inflow info: (update time, record count)
        self.block_in = ('', 0)
        # Kaipanla (KPL) real-time limit-up info: (update time, record count)
        self.kpl_current_limit_up = ('', 0)
        # Tick data: (update time, record count)
        self.ticks = ('', 0)

    def to_dict(self) -> dict:
        """
        Return the instance attributes as a dictionary.

        @return: a shallow copy of the attribute mapping, so changes made to
                 the returned dict do not leak back into this object
        """
        # Returning self.__dict__ itself would hand callers a live handle on
        # the instance state; copy it instead.
        return dict(vars(self))

    @classmethod
    def to_obj(cls, json_str):
        """
        Populate an instance from a JSON string.

        Every top-level key of the decoded JSON is set as an attribute with
        its decoded value. Values are assigned exactly as decoded, so a field
        stored as a JSON array comes back as a list, not a tuple.

        @param json_str: JSON text, expected to decode to an object (mapping)
        @return: the instance that received the attributes
        """
        decoded = json.loads(json_str)
        # NOTE(review): instantiation goes through the module-level name
        # (whatever `tool.singleton` bound it to), not `cls` - presumably so
        # the shared instance is the one updated; confirm against
        # tool.singleton.
        obj = RealTimeEnvInfo()
        for name in decoded:
            setattr(obj, name, decoded[name])
        return obj
|
|
|
def get_leading_limit_up_block_codes_count(day):
    """
    Get the number of codes in the leading limit-up blocks.

    @param day: day to query; passed unchanged to DataLoader
    @return: length of the collection returned by
             DataLoader(day).load_all_buy_plates_of_codes()
    """
    return len(DataLoader(day).load_all_buy_plates_of_codes())
|
|
|
def get_history_k_bars(day):
    """
    Get the number of historical K-line (bar) entries.

    @param day: day to query; passed unchanged to
                HistoryKDataManager.get_history_bars_codes
    @return: length of the codes collection returned for that day
    """
    manager = HistoryKDataManager()
    return len(manager.get_history_bars_codes(day))
|
|
|
def get_kpl_code_jx_blocks(day):
    """
    Get the number of Kaipanla (KPL) code-to-selected-block entries.

    @param day: day to query; passed to KPLCodeJXBlocksManager together with
                an empty set as its second argument
    @return: whatever get_all_code_blocks_count() reports for that manager
    """
    manager = KPLCodeJXBlocksManager(day, set())
    return manager.get_all_code_blocks_count()
|
|
|
if __name__ == "__main__":
    # Manual smoke check: print the historical K-line count for a fixed day.
    bars_count = get_history_k_bars("2025-06-04")
    print(bars_count)
|