""" L1 Tick数据数据管理 """ import queue import threading from local_api.util import tool class TickQueueManager: def __init__(self, queue_count, callback): self.__queue_dict = {} self.__code_queue_dict = {} self.__queue_code_count_dict = {} self.callback = callback for i in range(queue_count): _queue = queue.Queue() _id = id(_queue) self.__queue_dict[_id] = _queue self.__queue_code_count_dict[_id] = 0 self.__latest_tick_time_dict = {} def add_tick(self, code, data): # 判断时间 if self.__latest_tick_time_dict.get(code) == data[7]: return self.__latest_tick_time_dict[code] = data[7] # 分配队列 if code not in self.__code_queue_dict: # 分配队列 tl = [(k, self.__queue_code_count_dict[k]) for k in self.__queue_code_count_dict] tl.sort(key=lambda x: x[1]) queue_id = tl[0][0] self.__code_queue_dict[code] = queue_id self.__queue_code_count_dict[queue_id] += 1 queue_id = self.__code_queue_dict[code] queue_: queue.Queue = self.__queue_dict[queue_id] queue_.put_nowait(data) def __read_queue(self, q: queue.Queue): while True: try: data = q.get() self.callback(data) except: pass finally: pass def run(self): for queue_id in self.__queue_dict: threading.Thread(target=self.__read_queue, args=(self.__queue_dict[queue_id],), daemon=True).start() if __name__ == '__main__': def test_callback(data): print(f"{tool.get_thread_id()}-{data}" ) tickQueueManager = TickQueueManager(3, test_callback) for i in range(100): tickQueueManager.add_tick(f"{i}", (i)) tickQueueManager.run() input()