Administrator
2024-05-09 77fea830c2e11f94c364a37b8bda792a51d11eec
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
import threading
 
import constant
from log_module import async_log_util
from log_module.log import logger_l2_trade_cancel, logger_l2_trade_buy, logger_trade_record, logger_l2_trade, \
    logger_l2_s_cancel, logger_l2_h_cancel, logger_l2_l_cancel, logger_l2_error, logger_l2_d_cancel, logger_l2_f_cancel, \
    logger_l2_g_cancel, logger_l2_j_cancel
 
 
# 日志队列分配管理器
# 为每一个代码分配专门的管理器
class CodeLogQueueDistributeManager:
    def __init__(self, l2_channel_count):
        self.async_log_managers = [async_log_util.AsyncLogManager() for i in range(l2_channel_count)]
        # 存放代码分配的日志索引 如:{"000333":2}
        self.distributed_log_dict = {}
 
    def __get_avaiable_log_manager(self):
        distributed_indexes = set([self.distributed_log_dict[x] for x in self.distributed_log_dict])
        all_indexes = set([i for i in range(0, len(self.async_log_managers))])
        available_indexes = all_indexes - distributed_indexes
        if not available_indexes:
            raise Exception("已分配完")
        return available_indexes.pop()
 
    def distribute_log_manager(self, code):
        if code in self.distributed_log_dict:
            return
        log_manager_index = self.__get_avaiable_log_manager()
        if log_manager_index is not None:
            self.distributed_log_dict[code] = log_manager_index
 
    def realase_log_manager(self, code):
        if code in self.distributed_log_dict:
            self.distributed_log_dict.pop(code)
 
    # 设置L2订阅代码
    def set_l2_subscript_codes(self, codes):
        codes = set(codes)
        now_codes = set([code for code in self.distributed_log_dict])
        del_codes = now_codes - codes
        add_codes = codes - now_codes
        for c in del_codes:
            self.realase_log_manager(c)
        for c in add_codes:
            self.distribute_log_manager(c)
 
    def get_log_manager(self, code):
        if code in self.distributed_log_dict:
            return self.async_log_managers[self.distributed_log_dict[code]]
        return None
 
    # 运行同步服务
    def run_async(self):
        for m in self.async_log_managers:
            threading.Thread(target=lambda: m.run_sync(True), daemon=True).start()
 
 
codeLogQueueDistributeManager = CodeLogQueueDistributeManager(constant.HUAXIN_L2_MAX_CODES_COUNT)
 
threadIds = {}
 
 
def __add_async_log(logger_, code, content, *args):
    try:
        full_content = ""
        if len(args) > 0:
            full_content = ("thread-id={} code={}  ".format(threadIds.get(code), code) + content).format(*args)
        else:
            full_content = "thread-id={} code={}  ".format(threadIds.get(code), code) + content
        async_log_manager = codeLogQueueDistributeManager.get_log_manager(code)
        if async_log_manager:
            async_log_manager.debug(logger_, full_content)
        else:
            async_log_util.debug(logger_, full_content)
    except Exception as e:
        logger_l2_error.exception(e)
 
 
def debug(code, content, *args):
    __add_async_log(logger_l2_trade, code, content, *args)
 
 
def info(code, logger_, content, *args):
    __add_async_log(logger_, code, content, *args)
 
 
def buy_debug(code, content, *args):
    __add_async_log(logger_l2_trade_buy, code, content, *args)
 
 
def cancel_debug(code, content, *args):
    __add_async_log(logger_l2_trade_cancel, code, content, *args)
 
 
def s_cancel_debug(code, content, *args):
    __add_async_log(logger_l2_s_cancel, code, content, *args)
 
 
def h_cancel_debug(code, content, *args):
    __add_async_log(logger_l2_h_cancel, code, content, *args)
 
 
def l_cancel_debug(code, content, *args):
    __add_async_log(logger_l2_l_cancel, code, content, *args)
 
 
def d_cancel_debug(code, content, *args):
    __add_async_log(logger_l2_d_cancel, code, content, *args)
 
 
def f_cancel_debug(code, content, *args):
    __add_async_log(logger_l2_f_cancel, code, content, *args)
 
 
def g_cancel_debug(code, content, *args):
    __add_async_log(logger_l2_g_cancel, code, content, *args)
 
 
def j_cancel_debug(code, content, *args):
    __add_async_log(logger_l2_j_cancel, code, content, *args)
 
 
# 交易记录
def trade_record(code, type, content, *args):
    if len(args) > 0:
        async_log_util.debug(logger_trade_record,
                             ("thread-id={} code={} type={} data=".format(threadIds.get(code), code,
                                                                          type) + content).format(*args))
    else:
        async_log_util.debug(logger_trade_record,
                             "thread-id={} code={} type={} data=".format(threadIds.get(code), code,
                                                                         type) + content)
 
 
if __name__ == "__main__":
    codes = []
    for i in range(constant.HUAXIN_L2_MAX_CODES_COUNT):
        codes.append(f"{i}" * 6)
    codeLogQueueDistributeManager.set_l2_subscript_codes(codes)
    codes[0] = "123456"
    codeLogQueueDistributeManager.set_l2_subscript_codes(codes)
    codeLogQueueDistributeManager.get_log_manager("000333")