admin
2024-06-28 e0b77a03d87eb9bafd08e35492f918b5b8b0fbb5
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
"""
L1 tick data management: dispatch ticks to per-code worker queues.
"""
import logging
import queue
import threading

from local_api.util import tool
 
 
class TickQueueManager:
    """Distribute tick records over a fixed pool of queues, one worker each.

    A code is bound to a queue the first time it is seen (the queue that
    currently serves the fewest codes) and stays on that queue afterwards, so
    all records of one code are consumed in order by the same worker thread.
    """

    def __init__(self, queue_count, callback):
        """Create ``queue_count`` empty queues.

        :param queue_count: number of queues (and, after ``run()``, worker
            threads); must be at least 1.
        :param callback: callable invoked as ``callback(data)`` from a worker
            thread for every record accepted by ``add_tick``.
        :raises ValueError: if ``queue_count`` is smaller than 1, because
            ``add_tick`` would then have no queue to assign a code to.
        """
        if queue_count < 1:
            raise ValueError(f"queue_count must be >= 1, got {queue_count!r}")
        # queue id -> queue object
        self.__queue_dict = {}
        # code -> id of the queue the code is bound to
        self.__code_queue_dict = {}
        # queue id -> number of codes bound to that queue
        self.__queue_code_count_dict = {}
        self.callback = callback
        for _ in range(queue_count):
            _queue = queue.Queue()
            # The queues are kept alive by __queue_dict, so their id() values
            # stay unique for the lifetime of the manager.
            _id = id(_queue)
            self.__queue_dict[_id] = _queue
            self.__queue_code_count_dict[_id] = 0
        # code -> data[7] of the last accepted record for that code
        self.__latest_tick_time_dict = {}

    def add_tick(self, code, data):
        """Enqueue ``data`` for ``code`` unless it repeats the previous tick.

        :param code: key used to pick the queue; must be hashable.
        :param data: indexable record; ``data[7]`` is used as the tick time
            for de-duplication.
        """
        # Drop the record when its time equals the last one seen for the code.
        tick_time = data[7]
        if self.__latest_tick_time_dict.get(code) == tick_time:
            return
        self.__latest_tick_time_dict[code] = tick_time

        queue_id = self.__code_queue_dict.get(code)
        if queue_id is None:
            # First record for this code: bind it to the least-loaded queue.
            # min() returns the first minimum in insertion order, which is the
            # same queue a stable sort by load would put first.
            load = self.__queue_code_count_dict
            queue_id = min(load, key=load.get)
            self.__code_queue_dict[code] = queue_id
            load[queue_id] += 1
        queue_: queue.Queue = self.__queue_dict[queue_id]
        queue_.put_nowait(data)

    def __read_queue(self, q: queue.Queue):
        """Worker loop: block on ``q`` and pass each record to the callback."""
        logger = logging.getLogger(__name__)
        while True:
            data = q.get()
            try:
                self.callback(data)
            except Exception:
                # A failing callback must not stop the worker, but the error
                # is logged with its traceback instead of being discarded.
                logger.exception("tick callback failed for data: %r", data)

    def run(self):
        """Start one daemon worker thread per queue and return immediately."""
        for q in self.__queue_dict.values():
            threading.Thread(target=self.__read_queue, args=(q,), daemon=True).start()
 
 
if __name__ == '__main__':
    # Manual smoke test: spread 100 codes over 3 queues and print each record
    # together with the id of the worker thread that handled it.
    def test_callback(data):
        print(f"{tool.get_thread_id()}-{data}")


    tickQueueManager = TickQueueManager(3, test_callback)
    for i in range(100):
        # add_tick reads data[7] as the tick time, so each record must be an
        # indexable value with at least 8 fields; a bare int would raise
        # TypeError before anything is queued.
        tickQueueManager.add_tick(f"{i}", (i, 0, 0, 0, 0, 0, 0, i))
    tickQueueManager.run()

    # Keep the main thread alive; the workers are daemon threads.
    input()