"""
|
看盘端消息管理器
|
"""
|
import json
|
import queue
|
import threading
|
import time
|
|
from code_attribute import gpcode_manager
|
from log_module import log, log_export
|
from db.redis_manager import RedisManager, RedisUtils
|
from log_module.log import logger_kp_msg
|
|
CLIENT_IDS = ["zjb", "hxh"]
|
|
__temp_msg_queue = queue.Queue()
|
|
|
class MsgQueueManager:
|
__redisManager = RedisManager(3)
|
|
def __get_redis(self):
|
return self.__redisManager.getRedis()
|
|
# 添加消息,2s内有效
|
def add_msg(self, client_id, msg):
|
RedisUtils.lpush(
|
self.__get_redis(), f"kp_msg_queue-{client_id}", json.dumps((time.time() + 2, msg)))
|
|
# 读取消息
|
def read_msg(self, client_id):
|
data = RedisUtils.lpop(self.__get_redis(), f"kp_msg_queue-{client_id}")
|
if not data:
|
return None
|
data = json.loads(data)
|
return data
|
|
|
# 运行采集器
|
__MsgQueueManager = MsgQueueManager()
|
|
|
# 添加消息
|
def add_msg(code, msg):
|
# 根据代码获取名称
|
name = gpcode_manager.get_code_name(code)
|
msg = f"【{name}({code})】{msg}"
|
__temp_msg_queue.put_nowait(msg)
|
# 添加到日志
|
logger_kp_msg.info(msg)
|
|
|
def read_msg(client_id):
|
msg_data = __MsgQueueManager.read_msg(client_id)
|
if not msg_data:
|
return None
|
expire_time = msg_data[0]
|
msg = msg_data[1]
|
if expire_time < time.time():
|
# 过期
|
return None
|
return msg
|
|
|
# 读取本地消息列表
|
def list_msg_from_local():
|
return log_export.get_kp_msg_list()
|
|
|
# 运行采集
|
def run_capture():
|
def capture():
|
while True:
|
try:
|
msg = __temp_msg_queue.get()
|
if msg:
|
for c in CLIENT_IDS:
|
__MsgQueueManager.add_msg(c, msg)
|
except:
|
pass
|
time.sleep(0.01)
|
|
t1 = threading.Thread(target=lambda: capture())
|
# 后台运行
|
t1.setDaemon(True)
|
t1.start()
|
|
|
if __name__ == "__main__":
|
for i in range(0, 10):
|
add_msg("600839", "买入成功")
|
run_capture()
|
input()
|