Administrator
4 天以前 48fb7a00951f91bdc707e5dd2d196e5bccb752c3
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
"""
看盘端消息管理器
"""
import json
import queue
import threading
import time
 
from code_attribute import gpcode_manager
from log_module import log_export
from utils import middle_api_protocol, tool
 
CLIENT_IDS = ["zjb", "hxh"]
 
__temp_msg_queue = queue.Queue(maxsize=1000)
 
 
class MsgQueueManager:
    __queue_dict = {}
 
    # 添加消息,2s内有效
    def add_msg(self, client_id, msg):
        if client_id not in self.__queue_dict:
            self.__queue_dict[client_id] = queue.Queue(maxsize=1000)
 
        self.__queue_dict[client_id].put_nowait((time.time() + 2, msg))
 
    # 读取消息
    def read_msg(self, client_id):
        if client_id in self.__queue_dict:
            try:
                return self.__queue_dict[client_id].get(block=False)
            except:
                pass
        return None
 
 
# 运行采集器
__MsgQueueManager = MsgQueueManager()
 
 
# 添加消息
@tool.async_call
def add_msg(code, msg):
    # 根据代码获取名称
    name = gpcode_manager.get_code_name(code)
    msg = f"【{name}({code})】{msg}"
    fdata = middle_api_protocol.load_kp_msg(msg)
    middle_api_protocol.request(fdata)
    #
    # __temp_msg_queue.put_nowait(msg)
    # # 添加到日志
    # logger_kp_msg.info(msg)
 
 
def read_msg(client_id):
    msg_data = __MsgQueueManager.read_msg(client_id)
    if not msg_data:
        return None
    expire_time = msg_data[1]
    msg = msg_data[2]
    if expire_time < time.time():
        # 过期
        return None
    return msg
 
 
# 读取本地消息列表
def list_msg_from_local():
    return log_export.get_kp_msg_list()
 
 
# 运行采集
def run_capture():
    def capture():
        while True:
            try:
                msg = __temp_msg_queue.get()
                if msg:
                    for c in CLIENT_IDS:
                        __MsgQueueManager.add_msg(c, msg)
            except:
                pass
            time.sleep(0.01)
 
    t1 = threading.Thread(target=lambda: capture())
    # 后台运行
    t1.setDaemon(True)
    t1.start()
 
 
if __name__ == "__main__":
    for i in range(0, 10):
        add_msg("600839", "买入成功")
    run_capture()
    input()