# -*- coding: utf-8 -*-
|
"""
|
命令管理器
|
"""
|
import concurrent.futures
|
import json
|
import logging
|
import multiprocessing
|
import threading
|
import time
|
|
import zmq
|
|
from log_module import async_log_util
|
from log_module.log import logger_local_huaxin_trade_debug, logger_trade, logger_local_huaxin_contact_debug
|
|
MSG_TYPE_HEART = "heart"
|
# 命令信息
|
MSG_TYPE_CMD = "cmd"
|
|
CLIENT_TYPE_TRADE = "trade"
|
CLIENT_TYPE_DELEGATE_LIST = "delegate_list"
|
CLIENT_TYPE_DEAL_LIST = "deal_list"
|
CLIENT_TYPE_POSITION_LIST = "position_list"
|
CLIENT_TYPE_MONEY = "money"
|
CLIENT_TYPE_DEAL = "deal"
|
|
CLIENT_TYPE_CMD_L2 = "l2_cmd"
|
|
# 心跳时间间隔
|
HEART_SPACE_TIME = 3
|
|
|
class TradeActionCallback(object):
|
# 交易
|
def OnTrade(self, client_id, request_id, sk, type_, data):
|
pass
|
|
# 委托列表
|
def OnDelegateList(self, client_id, request_id, sk, can_cancel):
|
pass
|
|
# 成交列表
|
def OnDealList(self, client_id, request_id, sk):
|
pass
|
|
# 成交列表
|
def OnPositionList(self, client_id, request_id, sk):
|
pass
|
|
# 获取资金信息
|
def OnMoney(self, client_id, request_id, sk):
|
pass
|
|
# 测试
|
def OnTest(self, client_id, request_id, data, sk):
|
pass
|
|
|
class L2ActionCallback(object):
|
# 监听L2数据
|
def OnSetL2Position(self, codes_data):
|
pass
|
|
|
# 交易指令管理
|
class TradeCommandManager:
|
trade_client_dict = {}
|
_instance = None
|
process_command_thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=20)
|
|
def __new__(cls, *args, **kwargs):
|
if not cls._instance:
|
cls._instance = super().__new__(cls, *args, **kwargs)
|
return cls._instance
|
|
@classmethod
|
def init(cls, trade_action_callback: TradeActionCallback,
|
queue_strategy_trade_read_for_trade: multiprocessing.Queue,
|
queue_strategy_trade_read_for_read: multiprocessing.Queue):
|
cls.action_callback = trade_action_callback
|
cls.queue_strategy_trade_read = queue_strategy_trade_read_for_trade
|
cls.queue_strategy_trade_read_trade_read = queue_strategy_trade_read_for_read
|
|
@classmethod
|
def process_command(cls, _type, client_id, result_json, sk=None):
|
async_log_util.info(logger_local_huaxin_contact_debug, f"process_command: {result_json}")
|
# 查看是否是设置L2的代码
|
try:
|
data = result_json["data"]
|
request_id = result_json.get('request_id')
|
# 暂时取消签名
|
# if not socket_util.is_client_params_sign_right(result_json):
|
# print("签名错误")
|
# # 签名出错
|
# SendResponseSkManager.send_error_response(_type, request_id, client_id,
|
# {"code": -1, "msg": "签名错误"})
|
# return
|
|
if _type == CLIENT_TYPE_TRADE:
|
# 交易
|
ctype = data["trade_type"]
|
async_log_util.info(logger_trade, f"交易开始:{request_id}")
|
cls.action_callback.OnTrade(client_id, request_id, sk, ctype, data)
|
async_log_util.info(logger_trade, f"交易结束:{request_id}")
|
elif _type == CLIENT_TYPE_MONEY:
|
cls.action_callback.OnMoney(client_id, request_id, sk)
|
elif _type == CLIENT_TYPE_DEAL_LIST:
|
cls.action_callback.OnDealList(client_id, request_id, sk)
|
elif _type == CLIENT_TYPE_DELEGATE_LIST:
|
can_cancel = data["can_cancel"]
|
cls.action_callback.OnDelegateList(client_id, request_id, sk, can_cancel)
|
elif _type == CLIENT_TYPE_POSITION_LIST:
|
cls.action_callback.OnPositionList(client_id, request_id, sk)
|
elif _type == "test":
|
cls.action_callback.OnTest(client_id, request_id, data, sk)
|
except Exception as e:
|
async_log_util.error(logger_local_huaxin_contact_debug, f"process_command出错: {result_json}")
|
# logging.exception(e)
|
# logging.error(result_json)
|
|
@classmethod
|
def run_process_command(cls, queue_strategy_trade: multiprocessing.Queue):
|
if queue_strategy_trade is None:
|
return
|
# 本地命令接收
|
try:
|
while True:
|
try:
|
val = queue_strategy_trade.get()
|
if val:
|
_type = val["type"]
|
if _type != "test":
|
async_log_util.info(logger_local_huaxin_contact_debug, f"接受到信息: {val}")
|
cls.process_command(_type, None, val)
|
except Exception as e:
|
logger_local_huaxin_trade_debug.exception(e)
|
except Exception as e:
|
async_log_util.exception(logger_local_huaxin_trade_debug, e)
|
|
@classmethod
|
def run_process_read_command(cls, queue_strategy_trade_read_trade: multiprocessing.Queue):
|
if queue_strategy_trade_read_trade is None:
|
return
|
# 本地命令接收
|
try:
|
while True:
|
try:
|
val = queue_strategy_trade_read_trade.get()
|
if val:
|
_type = val["type"]
|
if _type != "test":
|
async_log_util.info(logger_local_huaxin_contact_debug, f"接受到信息: {val}")
|
cls.process_command_thread_pool.submit(lambda: cls.process_command(_type, None, val))
|
except Exception as e:
|
async_log_util.exception(logger_local_huaxin_trade_debug, e)
|
logging.exception(e)
|
except Exception as e:
|
async_log_util.exception(logger_local_huaxin_trade_debug, e)
|
|
###############ZEROMQ协议接收命令#################
|
@classmethod
|
def __create_order_command_reciever(cls, ipc_addr):
|
"""
|
接收下单命令
|
@param ipc_addr: ipc地址
|
@return:
|
"""
|
context = zmq.Context()
|
socket = context.socket(zmq.REP)
|
socket.bind(ipc_addr)
|
while True:
|
data = socket.recv_json()
|
try:
|
request_id = data.get('request_id')
|
use_time = time.time() - data.get('time')
|
data = data.get('data')
|
cls.action_callback.OnTrade(None, request_id, None, 1, data)
|
async_log_util.info(logger_local_huaxin_trade_debug, f"下单通信耗时: {round(use_time*1000,3)}ms request_id:{request_id}")
|
except Exception as e:
|
logger_local_huaxin_trade_debug.exception(e)
|
finally:
|
socket.send_string("SUCCESS")
|
|
@classmethod
|
def __create_cancel_order_command_reciever(cls, ipc_addr):
|
"""
|
接收撤单命令
|
@param ipc_addr: ipc地址
|
@return:
|
"""
|
context = zmq.Context()
|
socket = context.socket(zmq.REP)
|
socket.bind(ipc_addr)
|
while True:
|
data = socket.recv_json()
|
try:
|
request_id = data.get('request_id')
|
use_time = time.time() - data.get('time')
|
data = data.get('data')
|
cls.action_callback.OnTrade(None, request_id, None, 2, data)
|
async_log_util.info(logger_local_huaxin_trade_debug, f"撤单通信耗时: {round(use_time*1000,3)}ms request_id:{request_id}")
|
|
except Exception as e:
|
logger_local_huaxin_trade_debug.exception(e)
|
finally:
|
socket.send_string("SUCCESS")
|
|
# 维护连接数的稳定
|
def run(self, order_ipc_addr, cancel_order_ipc_addr, blocking=True):
|
if blocking:
|
t1 = threading.Thread(
|
target=lambda: self.run_process_read_command(self.queue_strategy_trade_read_trade_read), daemon=True)
|
t1.start()
|
t1 = threading.Thread(
|
target=lambda: self.__create_order_command_reciever(order_ipc_addr), daemon=True)
|
t1.start()
|
t1 = threading.Thread(
|
target=lambda: self.__create_cancel_order_command_reciever(cancel_order_ipc_addr), daemon=True)
|
t1.start()
|
self.run_process_command(self.queue_strategy_trade_read)
|
else:
|
# 接受命令
|
t1 = threading.Thread(target=lambda: self.run_process_command(self.queue_strategy_trade_read), daemon=True)
|
t1.start()
|
t1 = threading.Thread(
|
target=lambda: self.run_process_read_command(self.queue_strategy_trade_read_trade_read), daemon=True)
|
t1.start()
|
t1 = threading.Thread(
|
target=lambda: self.__create_order_command_reciever(order_ipc_addr), daemon=True)
|
t1.start()
|
t1 = threading.Thread(
|
target=lambda: self.__create_cancel_order_command_reciever(cancel_order_ipc_addr), daemon=True)
|
t1.start()
|
|
|
# L2指令管理
|
class L2CommandManager:
|
action_callback = None
|
|
@classmethod
|
def init(cls, l2_action_callback):
|
cls.action_callback = l2_action_callback
|
|
@classmethod
|
def process_command(cls, _type, client_id, result_json):
|
data = result_json["data"]
|
ctype = result_json["type"]
|
if ctype == CLIENT_TYPE_CMD_L2:
|
cls.action_callback.OnSetL2Position(data)
|
|
|
if __name__ == "__main__":
|
manager = TradeCommandManager("127.0.0.1", 10008, None)
|
manager.run()
|
input()
|