Administrator
3 天以前 2fd3c6bb36ba489fed3f7078d76dce8e1d27cc36
utils/outside_api_command_manager.py
@@ -10,7 +10,7 @@
# 心跳信息
from huaxin_client.client_network import SendResponseSkManager
from log_module.log import logger_system, logger_request_api
from log_module.log import logger_system, logger_request_api, printlog
from utils import middle_api_protocol, tool, socket_util
MSG_TYPE_HEART = "heart"
@@ -39,6 +39,7 @@
API_TYPE_SELL_RULE = "sell_rule"  # 卖出规则
API_TYPE_REFRESH_TRADE_DATA = "refresh_trade_data"  # 交易数据刷新
API_TYPE_GET_CODE_POSITION_INFO = "get_code_position_info"  # 获取代码持仓信息
API_TYPE_COMMON_REQUEST = "common_request"  # 通用请求
class ActionCallback(object):
@@ -54,6 +55,9 @@
        pass
    def OnGetCodePositionInfo(self, client_id, request_id, data):
        pass
    def OnCommonRequest(self, client_id, request_id, data):
        pass
@@ -93,7 +97,7 @@
        # 发送心跳
        cls.__heartbeats_thread(type, key, sk)
        cls.__listen_command_thread(type, key, sk)
        print("create_and_run_client success", type, key)
        printlog("create_and_run_client success", type, key)
        return key, sk
    @classmethod
@@ -106,6 +110,30 @@
            result = cls.__create_and_run_client(CLIENT_TYPE_TRADE_SELL, i)
            cls.trade_client_dict[result[0]] = result[1]
    # @classmethod
    # def process_command(cls, client_id, result_json):
    #     data = result_json["data"]
    #     content_type = data["type"]
    #     printlog("接收内容", data)
    #     request_id = result_json.get('request_id')
    #     if not socket_util.is_client_params_sign_right(result_json):
    #         printlog("签名错误")
    #         # 签名出错
    #         SendResponseSkManager.send_error_response(_type, request_id, client_id,
    #                                                   {"code": -1, "msg": "签名错误"})
    #         return
    #     if content_type == API_TYPE_TRADE:
    #         # 交易
    #         cls.action_callback.OnTrade(client_id, request_id, data)
    #     elif content_type == API_TYPE_SELL_RULE:
    #         cls.action_callback.OnSellRule(client_id, request_id, data)
    #     elif content_type == API_TYPE_REFRESH_TRADE_DATA:
    #         cls.action_callback.OnRefreshTradeData(client_id, request_id, data)
    #     elif content_type == API_TYPE_GET_CODE_POSITION_INFO:
    #         cls.action_callback.OnGetCodePositionInfo(client_id, request_id, data)
    #     elif content_type == API_TYPE_COMMON_REQUEST:
    #         cls.action_callback.OnCommonRequest(client_id, request_id, data)
    # 听取指令
    @classmethod
    def __listen_command(cls, _type, client_id, sk):
@@ -115,7 +143,7 @@
                if result:
                    start_time = time.time()
                    try:
                        print("接收数据", _type, result)
                        printlog("接收数据", _type, result)
                        result_json = json.loads(result)
                        if result_json["type"] == MSG_TYPE_HEART:
                            # 返回内容
@@ -124,10 +152,10 @@
                        data = result_json["data"]
                        content_type = data["type"]
                        print("接收内容", data)
                        printlog("接收内容", data)
                        request_id = result_json.get('request_id')
                        if not socket_util.is_client_params_sign_right(result_json):
                            print("签名错误")
                            printlog("签名错误")
                            # 签名出错
                            SendResponseSkManager.send_error_response(_type, request_id, client_id,
                                                                      {"code": -1, "msg": "签名错误"})
@@ -141,6 +169,8 @@
                            cls.action_callback.OnRefreshTradeData(client_id, request_id, data)
                        elif content_type == API_TYPE_GET_CODE_POSITION_INFO:
                            cls.action_callback.OnGetCodePositionInfo(client_id, request_id, data)
                        elif content_type == API_TYPE_COMMON_REQUEST:
                            cls.action_callback.OnCommonRequest(client_id, request_id, data)
                    except Exception as e:
                        logging.exception(e)
                    finally:
@@ -170,7 +200,7 @@
        while True:
            try:
                sk.send(socket_util.load_header(json.dumps({"type": "heart", "client_id": client_id}).encode('utf-8')))
                # print("心跳信息发送成功", client_id)
                # printlog("心跳信息发送成功", client_id)
            except Exception as e:
                if _type == CLIENT_TYPE_TRADE_SELL:
                    if client_id in cls.trade_client_dict:
@@ -219,6 +249,173 @@
            t1.start()
class NewApiCommandManager:
    """
    新版交易指令管理
    """
    client_dict = {}  # 保存当前的客户端,格式:{client_type:{client_id:socket}}
    client_count_dict = {}  # 每种client的最大个数,格式:{client_type:count}
    action_callback = None
    _instance = None
    def __new__(cls, *args, **kwargs):
        if not cls._instance:
            cls._instance = super().__new__(cls, *args, **kwargs)
        return cls._instance
    @classmethod
    def __create_client(cls, client_type, rid):
        client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)  # 生成socket,连接server
        # client.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, True)
        # client.ioctl(socket.SIO_KEEPALIVE_VALS, (1, 60 * 1000, 30 * 1000))
        client.connect(cls.ip_port)
        client.send(SendResponseSkManager.format_response(
            json.dumps({"type": "register", "data": {"client_type": client_type}, "rid": rid}).encode("utf-8")))
        client.recv(1024)
        return client
    @classmethod
    def __create_and_run_client(cls, type, index=None):
        key = f"{type}_{round(time.time() * 1000)}_{random.randint(0, 1000)}"
        if index is not None:
            key += f"_{index}"
        sk = cls.__create_client(type, key)
        # 发送心跳
        cls.__heartbeats_thread(type, key, sk)
        cls.__listen_command_thread(type, key, sk)
        printlog("create_and_run_client success", type, key)
        return key, sk
    @classmethod
    def init(cls, addr, port, action_callback, clients_info):
        """
        初始化
        :param addr: 服务器地址
        :param port: 服务器端口
        :param trade_action_callback: 回调
        :param clients_info: 客户端信息:[(类型,数量)]
        :return:
        """
        cls.client_dict.clear()
        cls.client_count_dict.clear()
        cls.action_callback = action_callback
        cls.ip_port = (addr, port)
        # 初始化
        for client_info in clients_info:
            cls.client_dict[client_info[0]] = {}
            cls.client_count_dict[client_info[0]] = client_info[1]
        # 创建连接客户端
        for client_type in cls.client_count_dict:
            for i in range(cls.client_count_dict[client_type]):
                result = cls.__create_and_run_client(client_type, i)
                cls.client_dict[client_type][result[0]] = result[1]
    # 听取指令
    @classmethod
    def __listen_command(cls, _type, client_id, sk):
        while True:
            try:
                result = socket_util.recv_data(sk)[0]
                if result:
                    start_time = time.time()
                    try:
                        printlog("接收数据", _type, result)
                        result_json = json.loads(result)
                        if result_json["type"] == MSG_TYPE_HEART:
                            # 返回内容
                            sk.send(json.dumps({"type": "heart", "client_id": client_id}).encode('utf-8'))
                            continue
                        data = result_json["data"]
                        content_type = data["type"]
                        printlog("接收内容", data)
                        request_id = result_json.get('request_id')
                        if not socket_util.is_client_params_sign_right(result_json):
                            printlog("签名错误")
                            # 签名出错
                            SendResponseSkManager.send_error_response(_type, request_id, client_id,
                                                                      {"code": -1, "msg": "签名错误"})
                            continue
                        cls.action_callback(client_id, request_id, data)
                    except Exception as e:
                        logging.exception(e)
                    finally:
                        use_time = int(time.time() - start_time)
                        if use_time > 5:
                            result_json = json.loads(result)
                            logger_request_api.info(f"超时5s以上:{result_json['data']['type']}")
                        # 发送响应
                        sk.send(json.dumps({"type": "cmd_recieve"}).encode('utf-8'))
                else:
                    raise Exception("接收的内容为空")
            except Exception as e:
                logging.exception(e)
                if _type in cls.client_dict:
                    if client_id in cls.client_dict[_type]:
                        cls.client_dict[_type].pop(client_id)
                try:
                    sk.close()
                except:
                    pass
                    # 结束当前的消息循环
                break
    @classmethod
    def __heart_beats(cls, _type, client_id, sk):
        while True:
            try:
                sk.send(socket_util.load_header(json.dumps({"type": "heart", "client_id": client_id}).encode('utf-8')))
                # printlog("心跳信息发送成功", client_id)
            except Exception as e:
                if _type in cls.client_dict:
                    if client_id in cls.client_dict[_type]:
                        cls.client_dict[_type].pop(client_id)
                try:
                    sk.close()
                except:
                    pass
                    # 结束当前的消息循环
                break
            time.sleep(HEART_SPACE_TIME)
    @classmethod
    def __listen_command_thread(cls, _type, rid, sk):
        t1 = threading.Thread(target=lambda: cls.__listen_command(_type, rid, sk))
        t1.setDaemon(True)
        t1.start()
    @classmethod
    def __heartbeats_thread(cls, _type, rid, sk):
        t1 = threading.Thread(target=lambda: cls.__heart_beats(_type, rid, sk))
        t1.setDaemon(True)
        t1.start()
    @classmethod
    def __maintain_client(cls):
        logger_system.info(f"outside_api __maintain_client 线程ID:{tool.get_thread_id()}")
        while True:
            try:
                for client_type in cls.client_count_dict:
                    if len(cls.client_dict[client_type]) < cls.client_count_dict[client_type]:
                        for i in range(cls.client_count_dict[client_type] - len(cls.client_dict[client_type])):
                            result = cls.__create_and_run_client(client_type)
                            cls.client_dict[client_type][result[0]] = result[1]
            except:
                pass
            time.sleep(1)
    # 维护连接数的稳定
    def run(self, blocking=True):
        # 维护client
        if blocking:
            self.__maintain_client()
        else:
            t1 = threading.Thread(target=lambda: self.__maintain_client())
            t1.setDaemon(True)
            t1.start()
if __name__ == "__main__":
    manager = ApiCommandManager(middle_api_protocol.SERVER_HOST, middle_api_protocol.SERVER_PORT, None)
    manager.run()