Administrator
4 天以前 f7b498d426fc560f0268d5739880e6261b4b6d9d
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
"""
环境信息
"""
import json
 
from strategy.strategy_variable_factory import DataLoader
from third_data.history_k_data_manager import HistoryKDataManager
from third_data.kpl_block_manager import KPLCodeJXBlocksManager
from utils import tool
 
 
@tool.singleton
class RealTimeEnvInfo:
    """
    Real-time environment information.

    Records the latest update markers of several data feeds. Each attribute is
    either an update-time string or an (update time, data count) pair.
    """

    def __init__(self):
        # Big-order update time
        self.big_order_update_time = ''
        # Block inflow info: (update time, data count)
        self.block_in = ('', 0)
        # Kaipanla (KPL) real-time limit-up info: (update time, data count)
        self.kpl_current_limit_up = ('', 0)
        # Tick data: (update time, data count)
        self.ticks = ('', 0)

    def to_dict(self):
        """
        Export the instance attributes as a plain dict.
        @return: a shallow copy of the attribute dict, so adding, removing or
                 rebinding keys on the result does not alter this object
        """
        # Returning self.__dict__ itself would hand out the live attribute
        # storage; copy it so callers cannot modify the instance by accident.
        return dict(self.__dict__)

    @classmethod
    def to_obj(cls, json_str):
        """
        Populate a RealTimeEnvInfo from a JSON object string.
        @param json_str: JSON text whose top-level keys are attribute names
        @return: the object obtained from RealTimeEnvInfo() with every key of
                 the decoded JSON set as an attribute
        """
        result = json.loads(json_str)
        # NOTE(review): this goes through the decorated module-level name, so
        # it presumably updates the shared singleton rather than creating a
        # fresh object - confirm against tool.singleton.
        obj = RealTimeEnvInfo()
        # JSON arrays decode to lists, so pair-valued attributes come back as
        # lists rather than tuples.
        for k in result:
            setattr(obj, k, result[k])
        return obj
 
 
def get_leading_limit_up_block_codes_count(day):
    """
    Get the number of codes in the leading limit-up blocks.
    @param day: day handed to DataLoader
    @return: length of what DataLoader(day).load_all_buy_plates_of_codes() returns
    """
    loader = DataLoader(day)
    return len(loader.load_all_buy_plates_of_codes())
 
 
def get_history_k_bars(day):
    """
    Get the number of history K-line entries.
    @param day: day handed to HistoryKDataManager.get_history_bars_codes
    @return: length of the codes collection returned for that day
    """
    manager = HistoryKDataManager()
    return len(manager.get_history_bars_codes(day))
 
 
def get_kpl_code_jx_blocks(day):
    """
    Get the number of Kaipanla (KPL) featured blocks of codes.
    @param day: day handed to KPLCodeJXBlocksManager
    @return: whatever get_all_code_blocks_count() reports for a manager built
             with that day and an empty set
    """
    manager = KPLCodeJXBlocksManager(day, set())
    return manager.get_all_code_blocks_count()
 
 
if __name__ == "__main__":
    # Manual smoke check: set one field and print the exported dict.
    # kpl_current_limit_up is initialized in __init__ as an
    # (update time, data count) pair, so keep that shape here instead of
    # assigning a bare time string.
    # NOTE(review): both RealTimeEnvInfo() calls are presumably the same
    # instance because of tool.singleton - confirm.
    RealTimeEnvInfo().kpl_current_limit_up = (tool.get_now_time_str(), 0)
    print(RealTimeEnvInfo().to_dict())