admin
2024-05-22 4ee1bd5de9ca76a69adac6e17a11bd686c742ef3
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
"""
看盘端消息管理器
"""
import json
import queue
import threading
import time
 
from log_module.log import logger_kp_msg
from utils import log_export
 
CLIENT_IDS = ["zjb", "hxh"]
 
__temp_msg_queue = queue.Queue()
 
 
class MsgQueueManager:
    __queue_dict = {}
 
    # 添加消息,2s内有效
    def add_msg(self, client_id, msg):
        if client_id not in self.__queue_dict:
            self.__queue_dict[client_id] = queue.Queue()
 
        self.__queue_dict[client_id].put_nowait((time.time() + 2, msg))
 
    # 读取消息
    def read_msg(self, client_id):
        if client_id in self.__queue_dict:
            try:
                return self.__queue_dict[client_id].get(block=False)
            except:
                pass
        return None
 
 
# 运行采集器
_MsgQueueManager = MsgQueueManager()
 
 
# 添加消息
def add_msg(msg):
    #
    __temp_msg_queue.put_nowait(msg)
    # # 添加到日志
    logger_kp_msg.info(msg)
 
 
def read_msg(client_id):
    msg_data = _MsgQueueManager.read_msg(client_id)
    if not msg_data:
        return None
    expire_time = msg_data[1]
    msg = msg_data[2]
    if expire_time < time.time():
        # 过期
        return None
    return msg
 
 
# 读取本地消息列表
def list_msg_from_local():
    return log_export.get_kp_msg_list()
 
 
# 运行采集
def run_capture():
    def capture():
        while True:
            try:
                msg = __temp_msg_queue.get()
                if msg:
                    for c in CLIENT_IDS:
                        _MsgQueueManager.add_msg(c, msg)
            except:
                pass
            time.sleep(0.01)
 
    t1 = threading.Thread(target=lambda: capture())
    # 后台运行
    t1.setDaemon(True)
    t1.start()
 
 
if __name__ == "__main__":
    for i in range(0, 10):
        add_msg("600839", "买入成功")
    run_capture()
    input()