"""
|
L1 tick data management.
|
"""
|
import logging
import queue
import threading

from local_api.util import tool
|
|
|
class TickQueueManager:
    """Fan tick records out over a fixed number of queues, one reader thread each.

    A code is bound to a queue the first time it is seen (the queue that has
    the fewest codes bound to it so far) and stays on that queue afterwards,
    so all records of one code pass through the same queue and reach
    ``callback`` from the same reader thread once ``run()`` has been called
    a single time.
    """

    # Position inside a tick record that is compared to detect a repeated
    # tick. The original comment describes this check as "judge the time";
    # presumably it is the tick timestamp -- confirm against the producer.
    _TICK_TIME_INDEX = 7

    def __init__(self, queue_count, callback):
        """Create ``queue_count`` empty queues.

        Args:
            queue_count: Number of queues (and reader threads started by
                ``run()``). Must be at least 1.
            callback: Callable invoked with each queued record.

        Raises:
            ValueError: If ``queue_count`` is smaller than 1; without any
                queue no tick could ever be dispatched.
        """
        if queue_count < 1:
            raise ValueError(f"queue_count must be at least 1, got {queue_count!r}")
        self.callback = callback
        # queue id -> queue
        self.__queue_dict = {queue_id: queue.Queue() for queue_id in range(queue_count)}
        # code -> id of the queue the code is bound to
        self.__code_queue_dict = {}
        # queue id -> number of codes bound to that queue
        self.__queue_code_count_dict = dict.fromkeys(self.__queue_dict, 0)
        # code -> value at _TICK_TIME_INDEX of the last accepted record
        self.__latest_tick_time_dict = {}

    def add_tick(self, code, data):
        """Queue ``data`` for ``code`` unless it repeats the previous tick.

        A record is dropped when its value at ``_TICK_TIME_INDEX`` equals the
        one stored for the last accepted record of the same code.

        Args:
            code: Hashable identifier the record belongs to.
            data: Record that supports indexing at ``_TICK_TIME_INDEX``.
        """
        tick_time = data[self._TICK_TIME_INDEX]
        if self.__latest_tick_time_dict.get(code) == tick_time:
            return
        self.__latest_tick_time_dict[code] = tick_time

        queue_id = self.__code_queue_dict.get(code)
        if queue_id is None:
            # First record of this code: bind it to the least-loaded queue.
            # min() returns the first minimum, so ties go to the queue that
            # was created earliest.
            counts = self.__queue_code_count_dict
            queue_id = min(counts, key=counts.get)
            self.__code_queue_dict[code] = queue_id
            counts[queue_id] += 1
        self.__queue_dict[queue_id].put_nowait(data)

    def __read_queue(self, q: queue.Queue):
        """Block on ``q`` forever and pass every record to the callback."""
        while True:
            data = q.get()
            try:
                self.callback(data)
            except Exception:
                # One bad record must not stop the reader, but the failure
                # is logged instead of being silently discarded.
                logging.getLogger(__name__).exception(
                    "Tick callback failed for data: %r", data
                )

    def run(self):
        """Start one daemon reader thread per queue.

        Every call starts a new set of threads, so call it once.
        """
        for tick_queue in self.__queue_dict.values():
            threading.Thread(target=self.__read_queue, args=(tick_queue,), daemon=True).start()
|
|
|
if __name__ == '__main__':
    # Manual smoke test: spread 100 codes over 3 queues and print which
    # thread handles each record.
    def test_callback(data):
        print(f"{tool.get_thread_id()}-{data}" )

    manager = TickQueueManager(3, test_callback)
    for i in range(100):
        # add_tick reads data[7], so the record needs at least 8 fields;
        # a bare int (the previous "(i)") is not subscriptable.
        manager.add_tick(f"{i}", (i,) * 8)
    manager.run()

    # Reader threads are daemons; block here so the process stays alive
    # until the user presses Enter.
    input()
|