"""
|
看盘端消息管理器
|
"""
|
import json
|
import queue
|
import threading
|
import time
|
|
from code_attribute import gpcode_manager
|
from log_module import log_export
|
from utils import middle_api_protocol, tool
|
|
CLIENT_IDS = ["zjb", "hxh"]
|
|
__temp_msg_queue = queue.Queue(maxsize=1000)
|
|
|
class MsgQueueManager:
|
__queue_dict = {}
|
|
# 添加消息,2s内有效
|
def add_msg(self, client_id, msg):
|
if client_id not in self.__queue_dict:
|
self.__queue_dict[client_id] = queue.Queue(maxsize=1000)
|
|
self.__queue_dict[client_id].put_nowait((time.time() + 2, msg))
|
|
# 读取消息
|
def read_msg(self, client_id):
|
if client_id in self.__queue_dict:
|
try:
|
return self.__queue_dict[client_id].get(block=False)
|
except:
|
pass
|
return None
|
|
|
# 运行采集器
|
__MsgQueueManager = MsgQueueManager()
|
|
|
# 添加消息
|
@tool.async_call
|
def add_msg(code, msg):
|
# 根据代码获取名称
|
name = gpcode_manager.get_code_name(code)
|
msg = f"【{name}({code})】{msg}"
|
fdata = middle_api_protocol.load_kp_msg(msg)
|
middle_api_protocol.request(fdata)
|
#
|
# __temp_msg_queue.put_nowait(msg)
|
# # 添加到日志
|
# logger_kp_msg.info(msg)
|
|
|
def read_msg(client_id):
|
msg_data = __MsgQueueManager.read_msg(client_id)
|
if not msg_data:
|
return None
|
expire_time = msg_data[1]
|
msg = msg_data[2]
|
if expire_time < time.time():
|
# 过期
|
return None
|
return msg
|
|
|
# 读取本地消息列表
|
def list_msg_from_local():
|
return log_export.get_kp_msg_list()
|
|
|
# 运行采集
|
def run_capture():
|
def capture():
|
while True:
|
try:
|
msg = __temp_msg_queue.get()
|
if msg:
|
for c in CLIENT_IDS:
|
__MsgQueueManager.add_msg(c, msg)
|
except:
|
pass
|
time.sleep(0.01)
|
|
t1 = threading.Thread(target=lambda: capture())
|
# 后台运行
|
t1.setDaemon(True)
|
t1.start()
|
|
|
if __name__ == "__main__":
|
for i in range(0, 10):
|
add_msg("600839", "买入成功")
|
run_capture()
|
input()
|