Administrator
2023-07-06 7284224d58773be6da3c569be3d54ac3b2646661
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
"""
看盘端消息管理器
"""
import json
import queue
import threading
import time
 
from code_attribute import gpcode_manager
from logs import log
from db.redis_manager import RedisManager
from logs.log import logger_kp_msg
 
CLIENT_IDS = ["zjb", "hxh"]
 
__temp_msg_queue = queue.Queue()
 
 
class MsgQueueManager:
    __redisManager = RedisManager(3)
 
    def __get_redis(self):
        return self.__redisManager.getRedis()
 
    # 添加消息,2s内有效
    def add_msg(self, client_id, msg):
        self.__get_redis().lpush(f"kp_msg_queue-{client_id}", json.dumps((time.time() + 2, msg)))
 
    # 读取消息
    def read_msg(self, client_id):
        data = self.__get_redis().lpop(f"kp_msg_queue-{client_id}")
        if not data:
            return None
        data = json.loads(data)
        return data
 
 
# 运行采集器
__MsgQueueManager = MsgQueueManager()
 
 
# 添加消息
def add_msg(code, msg):
    # 根据代码获取名称
    name = gpcode_manager.get_code_name(code)
    msg = f"【{name}({code})】{msg}"
    __temp_msg_queue.put_nowait(msg)
    # 添加到日志
    logger_kp_msg.info(msg)
 
 
def read_msg(client_id):
    msg_data = __MsgQueueManager.read_msg(client_id)
    if not msg_data:
        return None
    expire_time = msg_data[0]
    msg = msg_data[1]
    if expire_time < time.time():
        # 过期
        return None
    return msg
 
 
# 读取本地消息列表
def list_msg_from_local():
    return log.get_kp_msg_list()
 
 
# 运行采集
def run_capture():
    def capture():
        while True:
            try:
                msg = __temp_msg_queue.get()
                if msg:
                    for c in CLIENT_IDS:
                        __MsgQueueManager.add_msg(c, msg)
            except:
                pass
            time.sleep(0.01)
 
    t1 = threading.Thread(target=lambda: capture())
    # 后台运行
    t1.setDaemon(True)
    t1.start()
 
 
if __name__ == "__main__":
    for i in range(0, 10):
        add_msg("600839", "买入成功")
    run_capture()
    input()