"""
|
推送消息管理器
|
"""
|
import concurrent.futures
|
|
from utils import middle_api_protocol
|
|
TYPE_ORDER_ALMOST_DEAL = "order_almost_deal" # 订单即将成交
|
|
TYPE_DELEGATE_QUEUE_CHANGE = "delegate_queue_change" # 委托队列变化
|
|
TYPE_DELEGATE_ORDER_DANGER = "delegate_order_danger" # 委托的订单危险
|
|
thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=5)
|
|
__order_almost_deal_filter_dict = {}
|
|
__delegate_order_danger_filter_dict = {}
|
|
|
# 推送订单快成交信息
|
def push_order_almost_deal(code, code_name, buy_single_index, msg, ctype='', force=False):
|
if code not in __order_almost_deal_filter_dict:
|
__order_almost_deal_filter_dict[code] = set()
|
notify_key = f"{buy_single_index}_{ctype}"
|
if force or notify_key not in __order_almost_deal_filter_dict[code]:
|
__order_almost_deal_filter_dict[code].add(notify_key)
|
# 强制推送或这次下单之前未推送
|
__push_msg(TYPE_ORDER_ALMOST_DEAL,
|
data={"code": code, "code_name": code_name, "msg": msg})
|
|
|
# 推送委托的订单危险信息
|
def push_delegate_order_danger(code, code_name, buy_single_index, msg, ctype='', force=False):
|
if code not in __delegate_order_danger_filter_dict:
|
__delegate_order_danger_filter_dict[code] = set()
|
notify_key = f"{buy_single_index}_{ctype}"
|
if force or notify_key not in __delegate_order_danger_filter_dict[code]:
|
__delegate_order_danger_filter_dict[code].add(notify_key)
|
# 强制推送或这次下单之前未推送
|
__push_msg(TYPE_DELEGATE_ORDER_DANGER,
|
data={"code": code, "code_name": code_name, "msg": msg})
|
|
|
# 推送委托队列变化消息
|
def push_delegate_queue_update():
|
__push_msg(TYPE_DELEGATE_QUEUE_CHANGE)
|
|
|
# 添加消息
|
def __push_msg(msg_type, data=None):
|
def push():
|
fdata = {"type": msg_type}
|
if data:
|
fdata["data"] = data
|
middle_api_protocol.request(middle_api_protocol.load_push_msg(fdata))
|
|
thread_pool.submit(push)
|