# -*- coding: utf-8 -*- """ 命令管理器 """ import json import logging import threading from huaxin_api import socket_util from huaxin_api.client_network import SendResponseSkManager MSG_TYPE_HEART = "heart" # 命令信息 MSG_TYPE_CMD = "cmd" CLIENT_TYPE_TRADE = "trade" CLIENT_TYPE_DELEGATE_LIST = "delegate_list" CLIENT_TYPE_DEAL_LIST = "deal_list" CLIENT_TYPE_POSITION_LIST = "position_list" CLIENT_TYPE_MONEY = "money" CLIENT_TYPE_DEAL = "deal" CLIENT_TYPE_CMD_L2 = "l2_cmd" # 心跳时间间隔 HEART_SPACE_TIME = 3 class TradeActionCallback(object): # 交易 def OnTrade(self, client_id, request_id, type_, data): pass # 委托列表 def OnDelegateList(self, client_id, request_id): pass # 成交列表 def OnDealList(self, client_id, request_id): pass # 成交列表 def OnPositionList(self, client_id, request_id): pass # 获取资金信息 def OnMoney(self, client_id, request_id): pass class L2ActionCallback(object): # 监听L2数据 def OnSetL2Position(self, client_id, request_id, codes_data): pass # 交易指令管理 class TradeCommandManager: trade_client_dict = {} _instance = None def __new__(cls, *args, **kwargs): if not cls._instance: cls._instance = super().__new__(cls, *args, **kwargs) return cls._instance @classmethod def init(cls, trade_action_callback, pipe_l2, pipe_strategy): cls.action_callback = trade_action_callback cls.pipe_strategy = pipe_strategy cls.pipe_l2 = pipe_l2 @classmethod def __process_command(cls, _type, client_id, result_json): try: data = result_json["data"] print("接收内容", result_json) request_id = result_json.get('request_id') if not socket_util.is_client_params_sign_right(result_json): print("签名错误") # 签名出错 SendResponseSkManager.send_error_response(_type, request_id, client_id, {"code": -1, "msg": "签名错误"}) return if _type == CLIENT_TYPE_TRADE: # 交易 ctype = data["trade_type"] cls.action_callback.OnTrade(client_id, request_id, ctype, data) elif _type == CLIENT_TYPE_MONEY: cls.action_callback.OnMoney(client_id, request_id) elif _type == CLIENT_TYPE_DEAL_LIST: cls.action_callback.OnDealList(client_id, request_id) elif _type == CLIENT_TYPE_DELEGATE_LIST: can_cancel = data["can_cancel"] cls.action_callback.OnDelegateList(client_id, request_id, can_cancel) elif _type == CLIENT_TYPE_POSITION_LIST: cls.action_callback.OnPositionList(client_id, request_id) except Exception as e: logging.exception(e) logging.error(result_json) @classmethod def run_process_command(cls, pipe_strategy): if pipe_strategy is None: return # 本地命令接收 while True: try: val = pipe_strategy.recv() if val: val = json.loads(val) print("run_process_command",val) _type = val["type"] _data = val["data"] # 查看是否是设置L2的代码 if _type == CLIENT_TYPE_CMD_L2: cls.pipe_l2.send( json.dumps({"type": "set_l2_codes", "data": _data})) else: t1 = threading.Thread(target=lambda: cls.__process_command(_type, None, _data), daemon=True) t1.start() except Exception as e: logging.exception(e) # 维护连接数的稳定 def run(self, blocking=True): if blocking: self.run_process_command(self.pipe_strategy) else: # 接受命令 t1 = threading.Thread(target=lambda: self.run_process_command(self.pipe_strategy), daemon=True) t1.start() # L2指令管理 class L2CommandManager: @classmethod def init(cls, l2_action_callback): cls.action_callback = l2_action_callback @classmethod def process_command(cls, _type, client_id, result_json): data = result_json["data"] request_id = result_json["request_id"] ctype = data["type"] if not socket_util.is_client_params_sign_right(result_json): # 签名出错 SendResponseSkManager.send_error_response(_type, request_id, client_id, {"code": -1, "msg": "签名错误"}) return codes_data = data["data"] if ctype == CLIENT_TYPE_CMD_L2: cls.action_callback.OnSetL2Position(client_id, request_id, codes_data) if __name__ == "__main__": manager = TradeCommandManager("127.0.0.1", 10008, None) manager.run() input()