""" 外部接口管理 """ import json import logging import random import socket import threading import time # 心跳信息 from huaxin_client.client_network import SendResponseSkManager from log_module.log import logger_system, logger_request_api, printlog from utils import middle_api_protocol, tool, socket_util MSG_TYPE_HEART = "heart" # 命令信息 MSG_TYPE_CMD = "cmd" CLIENT_TYPE_TRADE_SELL = "trade_sell" # 心跳时间间隔 HEART_SPACE_TIME = 3 TRADE_DIRECTION_BUY = 1 TRADE_DIRECTION_SELL = 2 TRADE_TYPE_ORDER = 1 TRADE_TYPE_CANCEL_ORDER = 2 # 数据操作 OPERRATE_SET = 1 # 设置 OPERRATE_DELETE = 2 # 删除 OPERRATE_GET = 3 # 获取 OPERRATE_ADD = 4 # 新增 # 类型 API_TYPE_TRADE = "trade" # 交易 API_TYPE_SELL_RULE = "sell_rule" # 卖出规则 API_TYPE_REFRESH_TRADE_DATA = "refresh_trade_data" # 交易数据刷新 API_TYPE_GET_CODE_POSITION_INFO = "get_code_position_info" # 获取代码持仓信息 API_TYPE_COMMON_REQUEST = "common_request" # 通用请求 class ActionCallback(object): # 交易 def OnTrade(self, client_id, request_id, data): pass # 卖出规则 def OnSellRule(self, client_id, request_id, data): pass def OnRefreshTradeData(self, client_id, request_id, data): pass def OnGetCodePositionInfo(self, client_id, request_id, data): pass def OnCommonRequest(self, client_id, request_id, data): pass # 交易指令管理 # 交易指令管理 class ApiCommandManager: common_client_dict = {} trade_client_dict = {} trade_client_count = 0 common_client_count = 0 action_callback = None _instance = None def __new__(cls, *args, **kwargs): if not cls._instance: cls._instance = super().__new__(cls, *args, **kwargs) return cls._instance @classmethod def __create_client(cls, client_type, rid): client = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # 生成socket,连接server # client.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, True) # client.ioctl(socket.SIO_KEEPALIVE_VALS, (1, 60 * 1000, 30 * 1000)) client.connect(cls.ip_port) client.send(SendResponseSkManager.format_response( json.dumps({"type": "register", "data": {"client_type": client_type}, "rid": rid}).encode("utf-8"))) client.recv(1024) return client @classmethod def __create_and_run_client(cls, type, index=None): key = f"{type}_{round(time.time() * 1000)}_{random.randint(0, 1000)}" if index is not None: key += f"_{index}" sk = cls.__create_client(type, key) # 发送心跳 cls.__heartbeats_thread(type, key, sk) cls.__listen_command_thread(type, key, sk) printlog("create_and_run_client success", type, key) return key, sk @classmethod def init(cls, addr, port, trade_action_callback, trade_client_count=20): cls.trade_client_dict.clear() cls.trade_client_count = trade_client_count cls.action_callback = trade_action_callback cls.ip_port = (addr, port) for i in range(trade_client_count): result = cls.__create_and_run_client(CLIENT_TYPE_TRADE_SELL, i) cls.trade_client_dict[result[0]] = result[1] # @classmethod # def process_command(cls, client_id, result_json): # data = result_json["data"] # content_type = data["type"] # printlog("接收内容", data) # request_id = result_json.get('request_id') # if not socket_util.is_client_params_sign_right(result_json): # printlog("签名错误") # # 签名出错 # SendResponseSkManager.send_error_response(_type, request_id, client_id, # {"code": -1, "msg": "签名错误"}) # return # if content_type == API_TYPE_TRADE: # # 交易 # cls.action_callback.OnTrade(client_id, request_id, data) # elif content_type == API_TYPE_SELL_RULE: # cls.action_callback.OnSellRule(client_id, request_id, data) # elif content_type == API_TYPE_REFRESH_TRADE_DATA: # cls.action_callback.OnRefreshTradeData(client_id, request_id, data) # elif content_type == API_TYPE_GET_CODE_POSITION_INFO: # cls.action_callback.OnGetCodePositionInfo(client_id, request_id, data) # elif content_type == API_TYPE_COMMON_REQUEST: # cls.action_callback.OnCommonRequest(client_id, request_id, data) # 听取指令 @classmethod def __listen_command(cls, _type, client_id, sk): while True: try: result = socket_util.recv_data(sk)[0] if result: start_time = time.time() try: printlog("接收数据", _type, result) result_json = json.loads(result) if result_json["type"] == MSG_TYPE_HEART: # 返回内容 sk.send(json.dumps({"type": "heart", "client_id": client_id}).encode('utf-8')) continue data = result_json["data"] content_type = data["type"] printlog("接收内容", data) request_id = result_json.get('request_id') if not socket_util.is_client_params_sign_right(result_json): printlog("签名错误") # 签名出错 SendResponseSkManager.send_error_response(_type, request_id, client_id, {"code": -1, "msg": "签名错误"}) continue if content_type == API_TYPE_TRADE: # 交易 cls.action_callback.OnTrade(client_id, request_id, data) elif content_type == API_TYPE_SELL_RULE: cls.action_callback.OnSellRule(client_id, request_id, data) elif content_type == API_TYPE_REFRESH_TRADE_DATA: cls.action_callback.OnRefreshTradeData(client_id, request_id, data) elif content_type == API_TYPE_GET_CODE_POSITION_INFO: cls.action_callback.OnGetCodePositionInfo(client_id, request_id, data) elif content_type == API_TYPE_COMMON_REQUEST: cls.action_callback.OnCommonRequest(client_id, request_id, data) except Exception as e: logging.exception(e) finally: use_time = int(time.time() - start_time) if use_time > 5: result_json = json.loads(result) logger_request_api.info(f"超时5s以上:{result_json['data']['type']}") # 发送响应 sk.send(json.dumps({"type": "cmd_recieve"}).encode('utf-8')) else: raise Exception("接收的内容为空") except Exception as e: logging.exception(e) if _type == CLIENT_TYPE_TRADE_SELL: if client_id in cls.trade_client_dict: cls.trade_client_dict.pop(client_id) try: sk.close() except: pass # 结束当前的消息循环 break @classmethod def __heart_beats(cls, _type, client_id, sk): while True: try: sk.send(socket_util.load_header(json.dumps({"type": "heart", "client_id": client_id}).encode('utf-8'))) # printlog("心跳信息发送成功", client_id) except Exception as e: if _type == CLIENT_TYPE_TRADE_SELL: if client_id in cls.trade_client_dict: cls.trade_client_dict.pop(client_id) try: sk.close() except: pass # 结束当前的消息循环 break time.sleep(HEART_SPACE_TIME) @classmethod def __listen_command_thread(cls, _type, rid, sk): t1 = threading.Thread(target=lambda: cls.__listen_command(_type, rid, sk)) t1.setDaemon(True) t1.start() @classmethod def __heartbeats_thread(cls, _type, rid, sk): t1 = threading.Thread(target=lambda: cls.__heart_beats(_type, rid, sk)) t1.setDaemon(True) t1.start() @classmethod def __maintain_client(cls): logger_system.info(f"outside_api __maintain_client 线程ID:{tool.get_thread_id()}") while True: try: if len(cls.trade_client_dict) < cls.trade_client_count: for i in range(cls.trade_client_count - len(cls.trade_client_dict)): result = cls.__create_and_run_client(CLIENT_TYPE_TRADE_SELL) cls.trade_client_dict[result[0]] = result[1] except: pass time.sleep(1) # 维护连接数的稳定 def run(self, blocking=True): # 维护client if blocking: self.__maintain_client() else: t1 = threading.Thread(target=lambda: self.__maintain_client()) t1.setDaemon(True) t1.start() class NewApiCommandManager: """ 新版交易指令管理 """ client_dict = {} # 保存当前的客户端,格式:{client_type:{client_id:socket}} client_count_dict = {} # 每种client的最大个数,格式:{client_type:count} action_callback = None _instance = None def __new__(cls, *args, **kwargs): if not cls._instance: cls._instance = super().__new__(cls, *args, **kwargs) return cls._instance @classmethod def __create_client(cls, client_type, rid): client = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # 生成socket,连接server # client.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, True) # client.ioctl(socket.SIO_KEEPALIVE_VALS, (1, 60 * 1000, 30 * 1000)) client.connect(cls.ip_port) client.send(SendResponseSkManager.format_response( json.dumps({"type": "register", "data": {"client_type": client_type}, "rid": rid}).encode("utf-8"))) client.recv(1024) return client @classmethod def __create_and_run_client(cls, type, index=None): key = f"{type}_{round(time.time() * 1000)}_{random.randint(0, 1000)}" if index is not None: key += f"_{index}" sk = cls.__create_client(type, key) # 发送心跳 cls.__heartbeats_thread(type, key, sk) cls.__listen_command_thread(type, key, sk) printlog("create_and_run_client success", type, key) return key, sk @classmethod def init(cls, addr, port, action_callback, clients_info): """ 初始化 :param addr: 服务器地址 :param port: 服务器端口 :param trade_action_callback: 回调 :param clients_info: 客户端信息:[(类型,数量)] :return: """ cls.client_dict.clear() cls.client_count_dict.clear() cls.action_callback = action_callback cls.ip_port = (addr, port) # 初始化 for client_info in clients_info: cls.client_dict[client_info[0]] = {} cls.client_count_dict[client_info[0]] = client_info[1] # 创建连接客户端 for client_type in cls.client_count_dict: for i in range(cls.client_count_dict[client_type]): result = cls.__create_and_run_client(client_type, i) cls.client_dict[client_type][result[0]] = result[1] # 听取指令 @classmethod def __listen_command(cls, _type, client_id, sk): while True: try: result = socket_util.recv_data(sk)[0] if result: start_time = time.time() try: printlog("接收数据", _type, result) result_json = json.loads(result) if result_json["type"] == MSG_TYPE_HEART: # 返回内容 sk.send(json.dumps({"type": "heart", "client_id": client_id}).encode('utf-8')) continue data = result_json["data"] content_type = data["type"] printlog("接收内容", data) request_id = result_json.get('request_id') if not socket_util.is_client_params_sign_right(result_json): printlog("签名错误") # 签名出错 SendResponseSkManager.send_error_response(_type, request_id, client_id, {"code": -1, "msg": "签名错误"}) continue cls.action_callback(client_id, request_id, data) except Exception as e: logging.exception(e) finally: use_time = int(time.time() - start_time) if use_time > 5: result_json = json.loads(result) logger_request_api.info(f"超时5s以上:{result_json['data']['type']}") # 发送响应 sk.send(json.dumps({"type": "cmd_recieve"}).encode('utf-8')) else: raise Exception("接收的内容为空") except Exception as e: logging.exception(e) if _type in cls.client_dict: if client_id in cls.client_dict[_type]: cls.client_dict[_type].pop(client_id) try: sk.close() except: pass # 结束当前的消息循环 break @classmethod def __heart_beats(cls, _type, client_id, sk): while True: try: sk.send(socket_util.load_header(json.dumps({"type": "heart", "client_id": client_id}).encode('utf-8'))) # printlog("心跳信息发送成功", client_id) except Exception as e: if _type in cls.client_dict: if client_id in cls.client_dict[_type]: cls.client_dict[_type].pop(client_id) try: sk.close() except: pass # 结束当前的消息循环 break time.sleep(HEART_SPACE_TIME) @classmethod def __listen_command_thread(cls, _type, rid, sk): t1 = threading.Thread(target=lambda: cls.__listen_command(_type, rid, sk)) t1.setDaemon(True) t1.start() @classmethod def __heartbeats_thread(cls, _type, rid, sk): t1 = threading.Thread(target=lambda: cls.__heart_beats(_type, rid, sk)) t1.setDaemon(True) t1.start() @classmethod def __maintain_client(cls): logger_system.info(f"outside_api __maintain_client 线程ID:{tool.get_thread_id()}") while True: try: for client_type in cls.client_count_dict: if len(cls.client_dict[client_type]) < cls.client_count_dict[client_type]: for i in range(cls.client_count_dict[client_type] - len(cls.client_dict[client_type])): result = cls.__create_and_run_client(client_type) cls.client_dict[client_type][result[0]] = result[1] except: pass time.sleep(1) # 维护连接数的稳定 def run(self, blocking=True): # 维护client if blocking: self.__maintain_client() else: t1 = threading.Thread(target=lambda: self.__maintain_client()) t1.setDaemon(True) t1.start() if __name__ == "__main__": manager = ApiCommandManager(middle_api_protocol.SERVER_HOST, middle_api_protocol.SERVER_PORT, None) manager.run() input()