| | |
| | | |
| | | # 心跳信息 |
| | | from huaxin_client.client_network import SendResponseSkManager |
| | | from log_module.log import logger_system, logger_request_api |
| | | from log_module.log import logger_system, logger_request_api, printlog |
| | | from utils import middle_api_protocol, tool, socket_util |
| | | |
| | | MSG_TYPE_HEART = "heart" |
| | |
| | | # 发送心跳 |
| | | cls.__heartbeats_thread(type, key, sk) |
| | | cls.__listen_command_thread(type, key, sk) |
| | | print("create_and_run_client success", type, key) |
| | | printlog("create_and_run_client success", type, key) |
| | | return key, sk |
| | | |
| | | @classmethod |
| | |
| | | result = cls.__create_and_run_client(CLIENT_TYPE_TRADE_SELL, i) |
| | | cls.trade_client_dict[result[0]] = result[1] |
| | | |
| | | # @classmethod |
| | | # def process_command(cls, client_id, result_json): |
| | | # data = result_json["data"] |
| | | # content_type = data["type"] |
| | | # printlog("接收内容", data) |
| | | # request_id = result_json.get('request_id') |
| | | # if not socket_util.is_client_params_sign_right(result_json): |
| | | # printlog("签名错误") |
| | | # # 签名出错 |
| | | # SendResponseSkManager.send_error_response(_type, request_id, client_id, |
| | | # {"code": -1, "msg": "签名错误"}) |
| | | # return |
| | | # if content_type == API_TYPE_TRADE: |
| | | # # 交易 |
| | | # cls.action_callback.OnTrade(client_id, request_id, data) |
| | | # elif content_type == API_TYPE_SELL_RULE: |
| | | # cls.action_callback.OnSellRule(client_id, request_id, data) |
| | | # elif content_type == API_TYPE_REFRESH_TRADE_DATA: |
| | | # cls.action_callback.OnRefreshTradeData(client_id, request_id, data) |
| | | # elif content_type == API_TYPE_GET_CODE_POSITION_INFO: |
| | | # cls.action_callback.OnGetCodePositionInfo(client_id, request_id, data) |
| | | # elif content_type == API_TYPE_COMMON_REQUEST: |
| | | # cls.action_callback.OnCommonRequest(client_id, request_id, data) |
| | | |
| | | # 听取指令 |
| | | @classmethod |
| | | def __listen_command(cls, _type, client_id, sk): |
| | |
| | | if result: |
| | | start_time = time.time() |
| | | try: |
| | | print("接收数据", _type, result) |
| | | printlog("接收数据", _type, result) |
| | | result_json = json.loads(result) |
| | | if result_json["type"] == MSG_TYPE_HEART: |
| | | # 返回内容 |
| | |
| | | |
| | | data = result_json["data"] |
| | | content_type = data["type"] |
| | | print("接收内容", data) |
| | | printlog("接收内容", data) |
| | | request_id = result_json.get('request_id') |
| | | if not socket_util.is_client_params_sign_right(result_json): |
| | | print("签名错误") |
| | | printlog("签名错误") |
| | | # 签名出错 |
| | | SendResponseSkManager.send_error_response(_type, request_id, client_id, |
| | | {"code": -1, "msg": "签名错误"}) |
| | |
| | | while True: |
| | | try: |
| | | sk.send(socket_util.load_header(json.dumps({"type": "heart", "client_id": client_id}).encode('utf-8'))) |
| | | # print("心跳信息发送成功", client_id) |
| | | # printlog("心跳信息发送成功", client_id) |
| | | except Exception as e: |
| | | if _type == CLIENT_TYPE_TRADE_SELL: |
| | | if client_id in cls.trade_client_dict: |
| | |
| | | t1.start() |
| | | |
| | | |
class NewApiCommandManager:
    """
    New-style trade command manager.

    Keeps a pool of TCP client sockets per client type. Each client gets one
    daemon thread that sends heartbeats and one that listens for commands;
    signed commands are forwarded to ``action_callback``. ``run`` keeps every
    pool topped up to its configured size.

    The class is a singleton: every construction returns the same instance,
    and all state lives in class attributes.
    """
    # Currently connected clients, shaped {client_type: {client_id: socket}}
    client_dict = {}
    # Target number of clients per type, shaped {client_type: count}
    client_count_dict = {}
    # Called as action_callback(client_id, request_id, data) for each signed command
    action_callback = None

    _instance = None

    def __new__(cls, *args, **kwargs):
        # Constructor arguments are accepted but deliberately NOT forwarded:
        # object.__new__ raises TypeError when it receives extra arguments from
        # a class that overrides __new__.
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance

    @classmethod
    def __create_client(cls, client_type, rid):
        """
        Open a socket to ``cls.ip_port`` and register it with the server.

        :param client_type: client type sent in the register message
        :param rid: registration id sent in the register message
        :return: the connected socket
        :raises Exception: whatever connect/send/recv raised; the socket is
            closed before the exception is re-raised
        """
        client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)  # create the socket used to reach the server
        # client.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, True)
        # client.ioctl(socket.SIO_KEEPALIVE_VALS, (1, 60 * 1000, 30 * 1000))
        try:
            client.connect(cls.ip_port)
            register_msg = json.dumps({"type": "register", "data": {"client_type": client_type}, "rid": rid})
            client.send(SendResponseSkManager.format_response(register_msg.encode("utf-8")))
            # Read (and discard) up to 1024 bytes of the server's reply.
            client.recv(1024)
        except Exception:
            # Do not leak the descriptor when registration fails; the maintain
            # loop retries every second while the server is unreachable.
            client.close()
            raise
        return client

    @classmethod
    def __create_and_run_client(cls, type, index=None):
        """
        Create one registered client and start its heartbeat and listen threads.

        :param type: client type
        :param index: optional suffix appended to the generated client key
        :return: tuple ``(key, socket)``
        """
        key = f"{type}_{round(time.time() * 1000)}_{random.randint(0, 1000)}"
        if index is not None:
            key += f"_{index}"
        sk = cls.__create_client(type, key)
        # Start sending heartbeats, then start listening for commands
        cls.__heartbeats_thread(type, key, sk)
        cls.__listen_command_thread(type, key, sk)
        printlog("create_and_run_client success", type, key)
        return key, sk

    @classmethod
    def init(cls, addr, port, action_callback, clients_info):
        """
        Initialise the manager and open the configured number of clients.

        :param addr: server address
        :param port: server port
        :param action_callback: callback invoked for every signed command
        :param clients_info: client configuration: [(client_type, count)]
        :return:
        """
        cls.client_dict.clear()
        cls.client_count_dict.clear()
        cls.action_callback = action_callback
        cls.ip_port = (addr, port)
        # Record the configured pools
        for client_info in clients_info:
            cls.client_dict[client_info[0]] = {}
            cls.client_count_dict[client_info[0]] = client_info[1]
        # Create the client connections
        for client_type, count in cls.client_count_dict.items():
            for i in range(count):
                result = cls.__create_and_run_client(client_type, i)
                cls.client_dict[client_type][result[0]] = result[1]

    @classmethod
    def __discard_client(cls, _type, client_id, sk):
        """Remove a client from ``client_dict`` and close its socket (best effort)."""
        # The heartbeat and listen threads may both get here for the same
        # client; pop with a default so the second caller cannot hit KeyError.
        cls.client_dict.get(_type, {}).pop(client_id, None)
        try:
            sk.close()
        except Exception as e:
            logging.exception(e)

    @classmethod
    def __handle_message(cls, _type, client_id, sk, result):
        """
        Process one received payload and acknowledge it.

        Errors raised while processing are logged and swallowed. Errors raised
        while sending the acknowledgement propagate to the caller.
        """
        start_time = time.time()
        content_type = None
        try:
            printlog("接收数据", _type, result)
            result_json = json.loads(result)
            if result_json["type"] == MSG_TYPE_HEART:
                # Answer the heartbeat
                sk.send(json.dumps({"type": "heart", "client_id": client_id}).encode('utf-8'))
                return

            data = result_json["data"]
            # A command without a "type" raises KeyError here and is not dispatched.
            content_type = data["type"]
            printlog("接收内容", data)
            request_id = result_json.get('request_id')
            if not socket_util.is_client_params_sign_right(result_json):
                printlog("签名错误")
                # Signature check failed
                SendResponseSkManager.send_error_response(_type, request_id, client_id,
                                                          {"code": -1, "msg": "签名错误"})
                return
            cls.action_callback(client_id, request_id, data)
        except Exception as e:
            logging.exception(e)
        finally:
            # Runs for every payload, including heartbeats and rejected commands.
            use_time = int(time.time() - start_time)
            if use_time > 5:
                # Reuse the already parsed type instead of re-parsing the
                # payload, which could raise for malformed or heartbeat messages.
                logger_request_api.info(f"超时5s以上:{content_type}")
            # Acknowledge receipt
            sk.send(json.dumps({"type": "cmd_recieve"}).encode('utf-8'))

    # Listen for commands
    @classmethod
    def __listen_command(cls, _type, client_id, sk):
        """
        Receive and handle messages from ``sk`` until the connection fails.

        On an empty read or any socket error the client is discarded and the
        loop ends.
        """
        while True:
            try:
                result = socket_util.recv_data(sk)[0]
                if not result:
                    raise Exception("接收的内容为空")
                cls.__handle_message(_type, client_id, sk, result)
            except Exception as e:
                logging.exception(e)
                cls.__discard_client(_type, client_id, sk)
                # End this message loop
                break

    @classmethod
    def __heart_beats(cls, _type, client_id, sk):
        """Send a heartbeat every ``HEART_SPACE_TIME`` seconds until sending fails."""
        while True:
            try:
                sk.send(socket_util.load_header(json.dumps({"type": "heart", "client_id": client_id}).encode('utf-8')))
                # printlog("心跳信息发送成功", client_id)
            except Exception:
                # Send failures are not logged here (unchanged behaviour).
                cls.__discard_client(_type, client_id, sk)
                # End this heartbeat loop
                break
            time.sleep(HEART_SPACE_TIME)

    @classmethod
    def __listen_command_thread(cls, _type, rid, sk):
        """Start the command listener for one client on a daemon thread."""
        threading.Thread(target=cls.__listen_command, args=(_type, rid, sk), daemon=True).start()

    @classmethod
    def __heartbeats_thread(cls, _type, rid, sk):
        """Start the heartbeat sender for one client on a daemon thread."""
        threading.Thread(target=cls.__heart_beats, args=(_type, rid, sk), daemon=True).start()

    @classmethod
    def __maintain_client(cls):
        """Loop forever, once per second, re-creating clients that are missing."""
        logger_system.info(f"outside_api __maintain_client 线程ID:{tool.get_thread_id()}")
        while True:
            try:
                for client_type, wanted in cls.client_count_dict.items():
                    missing = wanted - len(cls.client_dict[client_type])
                    for _ in range(missing):
                        result = cls.__create_and_run_client(client_type)
                        cls.client_dict[client_type][result[0]] = result[1]
            except Exception as e:
                # Keep retrying, but make reconnect failures visible and let
                # KeyboardInterrupt / SystemExit stop a blocking run().
                logging.exception(e)
            time.sleep(1)

    # Keep the number of connections stable
    def run(self, blocking=True):
        """
        Start maintaining the client pools.

        :param blocking: when True, maintain in the calling thread and never
            return; otherwise maintain on a daemon thread
        """
        if blocking:
            self.__maintain_client()
        else:
            threading.Thread(target=self.__maintain_client, daemon=True).start()
| | | |
if __name__ == "__main__":
    # Script entry point: build the manager against the configured middle-API
    # server and run it with run()'s default arguments.
    manager = ApiCommandManager(
        middle_api_protocol.SERVER_HOST,
        middle_api_protocol.SERVER_PORT,
        None,
    )
    manager.run()