Administrator
2023-11-14 5715545bec1d88fe9cc4ea79db0a5d1148694590
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
"""
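L2 data listener: forwards L2 order, transaction and market data from queues to a callback on background threads.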
"""
import multiprocessing
import threading
import time
 
from log_module import async_log_util
from log_module.log import logger_debug
 
# Module-level heartbeat dicts. Nothing in the visible part of this module
# reads or writes them: L2DataListenManager uses instance attributes with the
# same spelling, but inside a class body those names are mangled to
# `_L2DataListenManager__...`, so they never resolve to these globals.
# NOTE(review): these look unused - confirm no other module imports them
# before removing.
__l2_order_active_time_dict = {}
__l2_transaction_active_time_dict = {}
 
 
class L2DataListenManager:
    """Forward L2 data from queues to a callback object on daemon threads.

    ``receive_l2_data`` starts one daemon thread per order queue, one per
    transaction queue and one for the market queue. Each thread polls its
    queue forever and hands every item to the matching method of the callback
    object (``OnL2Order``, ``OnL2Transaction`` or ``OnMarketData``).

    Every receiver thread also records a heartbeat timestamp, keyed by
    ``id(queue)``, which ``get_active_count`` uses to report how many
    receivers of a given type are still looping.
    """

    TYPE_ORDER = "order"
    TYPE_TRANSACTION = "transaction"
    TYPE_MARKET = "market"

    # A receiver counts as active when its last heartbeat is newer than this
    # many seconds.
    ACTIVE_EXPIRE_SECONDS = 5

    def __init__(self, l2_data_callback):
        """Store the callback object and create empty heartbeat tables.

        :param l2_data_callback: object providing ``OnL2Order(a, b, c)``,
            ``OnL2Transaction(a, b)`` and ``OnMarketData(security_id, item)``.
        """
        self.my_l2_data_callback = l2_data_callback
        # id(queue) -> time.time() of the receiver's most recent heartbeat.
        self.__l2_order_active_time_dict = {}
        self.__l2_transaction_active_time_dict = {}
        self.__l2_market_active_time_dict = {}

    def __pump_queue(self, q, dispatch, idle_seconds, active_times, beat_every):
        """Poll ``q`` forever, passing each item to ``dispatch``.

        :param q: queue-like object exposing ``empty()`` and ``get()``.
        :param dispatch: callable invoked with every item taken from ``q``.
        :param idle_seconds: how long to sleep when the queue is empty.
        :param active_times: heartbeat dict updated under the key ``id(q)``.
        :param beat_every: a heartbeat is written once more than this many
            loop iterations have passed since the previous one (0 means every
            iteration).

        Never returns. Exceptions raised while reading or dispatching are
        logged and the loop continues.
        """
        queue_key = id(q)
        iterations = 0
        while True:
            try:
                # NOTE: empty() followed by get() only avoids blocking while
                # this thread is the sole consumer of the queue.
                if not q.empty():
                    dispatch(q.get())
                else:
                    time.sleep(idle_seconds)
            except Exception as e:
                async_log_util.exception(logger_debug, e)
            finally:
                # The heartbeat is written on failures as well, so "active"
                # means the loop is alive, not that data is flowing.
                iterations += 1
                if iterations > beat_every:
                    iterations = 0
                    active_times[queue_key] = time.time()

    def __recive_l2_orders(self, q: multiprocessing.Queue):
        """Receive tick-by-tick L2 order items from ``q``.

        Each item is indexed as ``item[0]``, ``item[1]``, ``item[2]`` and
        forwarded to ``OnL2Order``. Sleeps 1 ms when idle; the heartbeat is
        written once every 101 loop iterations.
        """
        self.__pump_queue(
            q,
            lambda item: self.my_l2_data_callback.OnL2Order(item[0], item[1], item[2]),
            0.001,
            self.__l2_order_active_time_dict,
            100,
        )

    def __recive_transaction_orders(self, q: multiprocessing.Queue):
        """Receive tick-by-tick L2 transaction items from ``q``.

        Each item is indexed as ``item[0]``, ``item[1]`` and forwarded to
        ``OnL2Transaction``. Sleeps 5 ms when idle; the heartbeat is written
        once every 51 loop iterations.
        """
        self.__pump_queue(
            q,
            lambda item: self.my_l2_data_callback.OnL2Transaction(item[0], item[1]),
            0.005,
            self.__l2_transaction_active_time_dict,
            50,
        )

    def __recive_l2_markets(self, q: multiprocessing.Queue):
        """Receive L2 market items from ``q``.

        Each item is a mapping with a ``'securityID'`` key and is forwarded to
        ``OnMarketData(item['securityID'], item)``. Sleeps 2 ms when idle; the
        heartbeat is written on every loop iteration.
        """
        self.__pump_queue(
            q,
            lambda item: self.my_l2_data_callback.OnMarketData(item['securityID'], item),
            0.002,
            self.__l2_market_active_time_dict,
            0,
        )

    def receive_l2_data(self, order_queues, transaction_queues, market_queue):
        """Start the daemon receiver threads and return immediately.

        :param order_queues: iterable of queues carrying L2 order items.
        :param transaction_queues: iterable of queues carrying L2 transaction
            items.
        :param market_queue: single queue carrying L2 market items.
        """
        # Each queue is passed through ``args`` so it is bound when the thread
        # is created. A ``lambda: ...(q)`` would read the loop variable only
        # when the thread runs and could pick up a later queue.
        for q in order_queues:
            threading.Thread(target=self.__recive_l2_orders, args=(q,), daemon=True).start()
        for q in transaction_queues:
            threading.Thread(target=self.__recive_transaction_orders, args=(q,), daemon=True).start()
        threading.Thread(target=self.__recive_l2_markets, args=(market_queue,), daemon=True).start()

    def get_active_count(self, type_):
        """Return how many receivers of ``type_`` have a recent heartbeat.

        :param type_: one of ``TYPE_ORDER``, ``TYPE_TRANSACTION`` or
            ``TYPE_MARKET``.
        :return: number of receivers whose last heartbeat is newer than
            ``ACTIVE_EXPIRE_SECONDS`` seconds; 0 for an unknown ``type_``.
        """
        heartbeat_tables = {
            self.TYPE_ORDER: self.__l2_order_active_time_dict,
            self.TYPE_TRANSACTION: self.__l2_transaction_active_time_dict,
            self.TYPE_MARKET: self.__l2_market_active_time_dict,
        }
        active_times = heartbeat_tables.get(type_)
        if active_times is None:
            return 0
        expire_time = time.time() - self.ACTIVE_EXPIRE_SECONDS
        # Iterate a snapshot: receiver threads may insert their first
        # heartbeat while this runs, and iterating the live dict could raise
        # "dictionary changed size during iteration".
        return sum(1 for stamp in tuple(active_times.values()) if stamp > expire_time)