""" 命令管理器 """ import json import logging import random import socket import threading import time # 心跳信息 import crypt import socket_util from client_network import SendResponseSkManager MSG_TYPE_HEART = "heart" # 命令信息 MSG_TYPE_CMD = "cmd" CLIENT_TYPE_TRADE = "trade" CLIENT_TYPE_DELEGATE_LIST = "delegate_list" CLIENT_TYPE_DEAL_LIST = "deal_list" CLIENT_TYPE_POSITION_LIST = "position_list" CLIENT_TYPE_MONEY = "money" CLIENT_TYPE_DEAL = "deal" CLIENT_TYPE_CMD_L2 = "l2_cmd" # 心跳时间间隔 HEART_SPACE_TIME = 3 class TradeActionCallback(object): # 交易 def OnTrade(self, client_id, request_id, type_, data): pass # 委托列表 def OnDelegateList(self, client_id, request_id): pass # 成交列表 def OnDealList(self, client_id, request_id): pass # 成交列表 def OnPositionList(self, client_id, request_id): pass # 获取资金信息 def OnMoney(self, client_id, request_id): pass class L2ActionCallback(object): # 监听L2数据 def OnSetL2Position(self, client_id, request_id, codes_data): pass # 交易指令管理 class TradeCommandManager: trade_client_dict = {} _instance = None def __new__(cls, *args, **kwargs): if not cls._instance: cls._instance = super().__new__(cls, *args, **kwargs) return cls._instance @classmethod def __create_client(cls, client_type, rid): client = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # 生成socket,连接server client.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, True) # client.ioctl(socket.SIO_KEEPALIVE_VALS, (1, 60 * 1000, 30 * 1000)) client.connect(cls.ip_port) client.send(SendResponseSkManager.format_response( json.dumps({"type": "register", "data": {"client_type": client_type}, "rid": rid}).encode("utf-8"))) client.recv(1024) return client @classmethod def __create_and_run_client(cls, type, index=None): key = f"{type}_{round(time.time() * 1000)}_{random.randint(0, 1000)}" if index is not None: key += f"_{index}" sk = cls.__create_client(type, key) # 发送心跳 cls.__heartbeats_thread(type, key, sk) cls.__listen_command_thread(type, key, sk) print("create_and_run_client success",type,key) return key, sk @classmethod def init(cls, addr, port, trade_action_callback, trade_client_count=10): cls.trade_client_dict = {} cls.trade_client_count = trade_client_count cls.action_callback = trade_action_callback cls.ip_port = (addr, port) for i in range(trade_client_count): result = cls.__create_and_run_client(CLIENT_TYPE_TRADE, i) cls.trade_client_dict[result[0]] = result[1] # 查询委托与成交与资金 cls.delegate_client = cls.__create_and_run_client(CLIENT_TYPE_DELEGATE_LIST) cls.deal_list_client = cls.__create_and_run_client(CLIENT_TYPE_DEAL_LIST) # 持仓客户端 cls.position_list_client = cls.__create_and_run_client(CLIENT_TYPE_POSITION_LIST) # 资金详情 cls.money_client = cls.__create_and_run_client(CLIENT_TYPE_MONEY) # 听取指令 @classmethod def __listen_command(cls, _type, client_id, sk): while True: try: result = socket_util.recv_data(sk)[0] if result: try: print("接收数据", _type, result) result_json = json.loads(result) if result_json["type"] == MSG_TYPE_HEART: # 返回内容 sk.send(json.dumps({"type": "heart", "client_id": client_id}).encode('utf-8')) continue data = result_json["data"] print("接收内容", data) request_id = result_json.get('request_id') if not socket_util.is_client_params_sign_right(result_json): print("签名错误") # 签名出错 SendResponseSkManager.send_error_response(_type, request_id, client_id, {"code": -1, "msg": "签名错误"}) continue if _type == CLIENT_TYPE_TRADE: # 交易 ctype = data["trade_type"] cls.action_callback.OnTrade(client_id, request_id, ctype, data) elif _type == CLIENT_TYPE_MONEY: cls.action_callback.OnMoney(client_id, request_id) elif _type == CLIENT_TYPE_DEAL_LIST: cls.action_callback.OnDealList(client_id, request_id) elif _type == CLIENT_TYPE_DELEGATE_LIST: can_cancel = data["can_cancel"] cls.action_callback.OnDelegateList(client_id, request_id, can_cancel) elif _type == CLIENT_TYPE_POSITION_LIST: cls.action_callback.OnPositionList(client_id, request_id) except Exception as e: logging.exception(e) pass finally: # 发送响应 sk.send(json.dumps({"type": "cmd_recieve"}).encode('utf-8')) else: raise Exception("接收的内容为空") except Exception as e: logging.exception(e) if _type == CLIENT_TYPE_TRADE: if client_id in cls.trade_client_dict: cls.trade_client_dict.pop(client_id) print("pop trade client", client_id) elif _type == CLIENT_TYPE_MONEY: cls.money_client = None elif _type == CLIENT_TYPE_DEAL_LIST: cls.deal_list_client = None elif _type == CLIENT_TYPE_DELEGATE_LIST: cls.delegate_client = None elif _type == CLIENT_TYPE_POSITION_LIST: cls.position_list_client = None try: sk.close() except: pass # 结束当前的消息循环 break @classmethod def __heart_beats(cls, _type, client_id, sk): while True: try: sk.send(SendResponseSkManager.format_response( json.dumps({"type": "heart", "client_id": client_id}).encode('utf-8'))) # print("心跳信息发送成功", client_id) except Exception as e: print("心跳信息发送失败",_type,client_id) logging.exception(e) if _type == CLIENT_TYPE_TRADE: if client_id in cls.trade_client_dict: cls.trade_client_dict.pop(client_id) elif _type == CLIENT_TYPE_MONEY: cls.money_client = None elif _type == CLIENT_TYPE_DEAL_LIST: cls.deal_list_client = None elif _type == CLIENT_TYPE_DELEGATE_LIST: cls.delegate_client = None elif _type == CLIENT_TYPE_POSITION_LIST: cls.position_list_client = None try: sk.close() except: pass # 结束当前的消息循环 break time.sleep(HEART_SPACE_TIME) @classmethod def __listen_command_thread(cls, _type, rid, sk): t1 = threading.Thread(target=lambda: cls.__listen_command(_type, rid, sk)) t1.setDaemon(True) t1.start() @classmethod def __heartbeats_thread(cls, _type, rid, sk): t1 = threading.Thread(target=lambda: cls.__heart_beats(_type, rid, sk)) t1.setDaemon(True) t1.start() @classmethod def __maintain_client(cls): while True: try: if len(cls.trade_client_dict) < cls.trade_client_count: print("__maintain_client", CLIENT_TYPE_TRADE, cls.trade_client_count - len(cls.trade_client_dict)) for i in range(cls.trade_client_count - len(cls.trade_client_dict)): result = cls.__create_and_run_client(CLIENT_TYPE_TRADE) cls.trade_client_dict[result[0]] = result[1] # 判断 # 查询委托与成交与资金 if cls.delegate_client is None: print("__maintain_client", CLIENT_TYPE_DELEGATE_LIST) cls.delegate_client = cls.__create_and_run_client(CLIENT_TYPE_DELEGATE_LIST) if cls.deal_list_client is None: print("__maintain_client", CLIENT_TYPE_DEAL_LIST) cls.deal_list_client = cls.__create_and_run_client(CLIENT_TYPE_DEAL_LIST) # 持仓客户端 if cls.position_list_client is None: print("__maintain_client", CLIENT_TYPE_POSITION_LIST) cls.position_list_client = cls.__create_and_run_client(CLIENT_TYPE_POSITION_LIST) if cls.money_client is None: print("__maintain_client", CLIENT_TYPE_MONEY) cls.money_client = cls.__create_and_run_client(CLIENT_TYPE_MONEY) except: pass time.sleep(1) # 维护连接数的稳定 def run(self, blocking=True): # 维护client if blocking: self.__maintain_client() else: t1 = threading.Thread(target=lambda: self.__maintain_client()) t1.setDaemon(True) t1.start() # L2指令管理 class L2CommandManager: @classmethod def __create_client(cls, client_type, rid): client = socket.socket() # 生成socket,连接server # client.settimeout(20) client.connect(cls.ip_port) client.send(SendResponseSkManager.format_response( json.dumps({"type": "register", "data": {"client_type": client_type}, "rid": rid}).encode("utf-8"))) client.recv(10240) return client @classmethod def init(cls, addr, port, l2_action_callback): cls.action_callback = l2_action_callback cls.ip_port = (addr, port) # 查询委托与成交与资金 key = f"{CLIENT_TYPE_CMD_L2}_{round(time.time() * 1000)}_{random.randint(0, 1000)}" cls.l2_cmd_client = cls.__create_client(CLIENT_TYPE_CMD_L2, key) cls.__heartbeats_thread(CLIENT_TYPE_CMD_L2, key, cls.l2_cmd_client) cls.__listen_command_thread(CLIENT_TYPE_CMD_L2, key, cls.l2_cmd_client) # 听取指令 @classmethod def __listen_command(cls, _type, client_id, sk): while True: try: result = socket_util.recv_data(sk)[0] print("接收L2_CMD数据") if result: try: result_json = json.loads(result) if result_json["type"] == MSG_TYPE_HEART: # 返回内容 sk.send(json.dumps({"type": "heart", "rid": client_id}).encode('utf-8')) continue data = result_json["data"] request_id = result_json["request_id"] type = data["type"] if not socket_util.is_client_params_sign_right(result_json): # 签名出错 SendResponseSkManager.send_error_response(_type, request_id, client_id, {"code": -1, "msg": "签名错误"}) continue codes_data = data["data"] if type == CLIENT_TYPE_CMD_L2: cls.action_callback.OnSetL2Position(client_id, request_id, codes_data) except: pass finally: # 发送响应 sk.send(json.dumps({"type": "cmd_recieve"}).encode('utf-8')) else: raise Exception("接收L2_CMD数据为空") except Exception as e: logging.exception(e) cls.l2_cmd_client = None try: sk.close() except: pass # 结束当前的消息循环 break @classmethod def __heart_beats(cls, _type, client_id, sk): while True: try: sk.send(SendResponseSkManager.format_response( json.dumps({"type": "heart", "client_id": client_id}).encode('utf-8'))) # print("心跳信息发送成功", client_id) except Exception as e: print("心跳信息发送失败") logging.exception(e) cls.l2_cmd_client = None try: sk.close() except: pass break time.sleep(HEART_SPACE_TIME) @classmethod def __heartbeats_thread(cls, _type, rid, sk): t1 = threading.Thread(target=lambda: cls.__heart_beats(_type, rid, sk)) t1.setDaemon(True) t1.start() @classmethod def __listen_command_thread(cls, _type, rid, sk): t1 = threading.Thread(target=lambda: cls.__listen_command(_type, rid, sk)) t1.setDaemon(True) t1.start() @classmethod def __maintain_client(cls): while True: try: if cls.l2_cmd_client is None: print("__maintain_client") key = f"{CLIENT_TYPE_CMD_L2}_{round(time.time() * 1000)}_{random.randint(0, 1000)}" cls.l2_cmd_client = cls.__create_client(CLIENT_TYPE_CMD_L2, key) cls.__heartbeats_thread(CLIENT_TYPE_CMD_L2, key, cls.l2_cmd_client) cls.__listen_command_thread(CLIENT_TYPE_CMD_L2, key, cls.l2_cmd_client) except Exception as e: logging.exception(e) time.sleep(1) # 维护连接数的稳定 def run(self, blocking=True): if blocking: self.__maintain_client() else: # 维护client t1 = threading.Thread(target=lambda: self.__maintain_client()) t1.setDaemon(True) t1.start() if __name__ == "__main__": manager = TradeCommandManager("127.0.0.1", 10008, None) manager.run() input()