"""
|
看盘端消息管理器
|
"""
|
import json
|
import queue
|
import threading
|
import time
|
|
from log_module.log import logger_kp_msg
|
from utils import log_export
|
|
CLIENT_IDS = ["zjb", "hxh"]
|
|
__temp_msg_queue = queue.Queue()
|
|
|
class MsgQueueManager:
|
__queue_dict = {}
|
|
# 添加消息,2s内有效
|
def add_msg(self, client_id, msg):
|
if client_id not in self.__queue_dict:
|
self.__queue_dict[client_id] = queue.Queue()
|
|
self.__queue_dict[client_id].put_nowait((time.time() + 2, msg))
|
|
# 读取消息
|
def read_msg(self, client_id):
|
if client_id in self.__queue_dict:
|
try:
|
return self.__queue_dict[client_id].get(block=False)
|
except:
|
pass
|
return None
|
|
|
# 运行采集器
|
_MsgQueueManager = MsgQueueManager()
|
|
|
# 添加消息
|
def add_msg(msg):
|
#
|
__temp_msg_queue.put_nowait(msg)
|
# # 添加到日志
|
logger_kp_msg.info(msg)
|
|
|
def read_msg(client_id):
|
msg_data = _MsgQueueManager.read_msg(client_id)
|
if not msg_data:
|
return None
|
expire_time = msg_data[1]
|
msg = msg_data[2]
|
if expire_time < time.time():
|
# 过期
|
return None
|
return msg
|
|
|
# 读取本地消息列表
|
def list_msg_from_local():
|
return log_export.get_kp_msg_list()
|
|
|
# 运行采集
|
def run_capture():
|
def capture():
|
while True:
|
try:
|
msg = __temp_msg_queue.get()
|
if msg:
|
for c in CLIENT_IDS:
|
_MsgQueueManager.add_msg(c, msg)
|
except:
|
pass
|
time.sleep(0.01)
|
|
t1 = threading.Thread(target=lambda: capture())
|
# 后台运行
|
t1.setDaemon(True)
|
t1.start()
|
|
|
if __name__ == "__main__":
|
for i in range(0, 10):
|
add_msg("600839", "买入成功")
|
run_capture()
|
input()
|