Administrator
4 天以前 48fb7a00951f91bdc707e5dd2d196e5bccb752c3
outside_api_command_manager.py
@@ -11,13 +11,14 @@
# 心跳信息
from huaxin_client import socket_util
from huaxin_client.client_network import SendResponseSkManager
from log_module.log import logger_debug
from utils import middle_api_protocol
from log_module.log import logger_system, logger_request_api
from utils import middle_api_protocol, tool
# Value of a message's "type" field that marks it as a heartbeat.
MSG_TYPE_HEART = "heart"
# Command message
MSG_TYPE_CMD = "cmd"
# Client types; ApiCommandManager keeps a separate client dict for each type.
CLIENT_TYPE_COMMON = "common"
CLIENT_TYPE_TRADE = "trade"
# 心跳时间间隔
@@ -33,17 +34,21 @@
# NOTE(review): "OPERRATE" is misspelled; the names are left unchanged because
# code outside this view may reference them.
OPERRATE_SET = 1  # set
OPERRATE_DELETE = 2  # delete
OPERRATE_GET = 3  # get
OPERRATE_ADD = 4  # add
# Code list types
CODE_LIST_WHITE = "white"
CODE_LIST_BLACK = "black"
CODE_LIST_WANT = "want"
CODE_LIST_PAUSE_BUY = "pause_buy"
CODE_LIST_MUST_BUY = "must_buy"
CODE_LIST_GREEN = "green"
# Request types: the command listener compares these with data["type"] of an
# incoming command to choose which ActionCallback method to call.
API_TYPE_TRADE = "trade"  # trade
API_TYPE_TRADE_STATE = "trade_state"  # trade state
API_TYPE_TRADE_MODE = "trade_mode"  # trade mode
API_TYPE_SELL_RULE = "sell_rule"  # sell rule
API_TYPE_CODE_LIST = "code_list"  # code list
API_TYPE_EXPORT_L2 = "export_l2"  # export L2 data
API_TYPE_INIT = "init"  # initialization
@@ -53,6 +58,12 @@
API_TYPE_GET_ENV = "get_env"  # get environment info
API_TYPE_SYNC_L1_TARGET_CODES = "sync_l1_subscript_codes"  # sync the codes that L1 needs to subscribe to
API_TYPE_SYSTEM_LOG = "system_log"  # system log
API_TYPE_GET_FROM_DATA_SERVER = "get_from_data_server"  # pull data from the data server
API_TYPE_CODE_TRADE_INFO = "code_trade_info"  # trade info for a code
API_TYPE_CODE_L2_LISTEN_ACTIVE_COUNT = "l2_listen_active_count"  # number of active L2 listeners
API_TYPE_SAVE_RUNNING_DATA = "save_running_data"  # save runtime data
API_TYPE_GET_CODE_POSITION_INFO = "get_code_position_info"  # get position info for a code
API_TYPE_COMMON_REQUEST = "common_request"  # common (generic) request
class ActionCallback(object):
@@ -66,6 +77,10 @@
    # 交易模式
    def OnTradeMode(self, client_id, request_id, data):
        """Hook for a trade-mode command; this base implementation does nothing.

        The command listener calls it when the command's ``data["type"]``
        equals ``API_TYPE_TRADE_MODE``.

        :param client_id: client id passed through by the command listener
        :param request_id: ``request_id`` field of the incoming message
        :param data: ``data`` field of the incoming message
        """
        pass
    # 卖出规则
    def OnSellRule(self, client_id, request_id, data):
        """Hook for a sell-rule command; this base implementation does nothing.

        The command listener calls it when the command's ``data["type"]``
        equals ``API_TYPE_SELL_RULE``.

        :param client_id: client id passed through by the command listener
        :param request_id: ``request_id`` field of the incoming message
        :param data: ``data`` field of the incoming message
        """
        pass
    # 代码名单
@@ -93,11 +108,34 @@
    def OnSyncL2SubscriptCodes(self, client_id, request_id):
        """Hook called by the command listener; this base implementation does nothing.

        Unlike most hooks it receives no ``data`` argument.

        :param client_id: client id passed through by the command listener
        :param request_id: ``request_id`` field of the incoming message
        """
        pass
    def OnGetFromDataServer(self, client_id, request_id, data):
        """Hook for a get-from-data-server command; this base implementation does nothing.

        The command listener calls it when the command's ``data["type"]``
        equals ``API_TYPE_GET_FROM_DATA_SERVER``.

        :param client_id: client id passed through by the command listener
        :param request_id: ``request_id`` field of the incoming message
        :param data: ``data`` field of the incoming message
        """
        pass
    # 代码的交易信息
    def OnGetCodeTradeInfo(self, client_id, request_id, data):
        """Hook for a code-trade-info command; this base implementation does nothing.

        The command listener calls it when the command's ``data["type"]``
        equals ``API_TYPE_CODE_TRADE_INFO``.

        :param client_id: client id passed through by the command listener
        :param request_id: ``request_id`` field of the incoming message
        :param data: ``data`` field of the incoming message
        """
        pass
    def OnGetActiveListenCount(self, client_id, request_id):
        """Hook for an L2-active-listen-count command; this base implementation does nothing.

        The command listener calls it when the command's ``data["type"]``
        equals ``API_TYPE_CODE_L2_LISTEN_ACTIVE_COUNT``. It receives no
        ``data`` argument.

        :param client_id: client id passed through by the command listener
        :param request_id: ``request_id`` field of the incoming message
        """
        pass
    def OnSaveRunningData(self, client_id, request_id):
        """Hook for a save-running-data command; this base implementation does nothing.

        The command listener calls it when the command's ``data["type"]``
        equals ``API_TYPE_SAVE_RUNNING_DATA``. It receives no ``data``
        argument.

        :param client_id: client id passed through by the command listener
        :param request_id: ``request_id`` field of the incoming message
        """
        pass
    def OnGetCodePositionInfo(self, client_id, request_id, data):
        """Hook for a get-code-position-info command; this base implementation does nothing.

        The command listener calls it when the command's ``data["type"]``
        equals ``API_TYPE_GET_CODE_POSITION_INFO``.

        :param client_id: client id passed through by the command listener
        :param request_id: ``request_id`` field of the incoming message
        :param data: ``data`` field of the incoming message
        """
        pass
    def OnCommonRequest(self, client_id, request_id, data):
        """Hook for a common-request command; this base implementation does nothing.

        The command listener calls it when the command's ``data["type"]``
        equals ``API_TYPE_COMMON_REQUEST``.

        :param client_id: client id passed through by the command listener
        :param request_id: ``request_id`` field of the incoming message
        :param data: ``data`` field of the incoming message
        """
        pass
# 交易指令管理
# 交易指令管理
class ApiCommandManager:
    # Sockets of the "common" clients, keyed by the key that
    # __create_and_run_client returns; entries are popped when a client fails.
    common_client_dict = {}
    # Same mapping for the "trade" clients.
    trade_client_dict = {}
    # Target number of clients per type: set by init() and compared against the
    # dict sizes in __maintain_client to create replacement clients.
    trade_client_count = 0
    common_client_count = 0
    # presumably the singleton instance handled by __new__ (its body is not
    # visible here) -- TODO confirm
    _instance = None
    def __new__(cls, *args, **kwargs):
@@ -125,16 +163,22 @@
        # 发送心跳
        cls.__heartbeats_thread(type, key, sk)
        cls.__listen_command_thread(type, key, sk)
        print("create_and_run_client success", type, key)
        # print("create_and_run_client success", type, key)
        logger_request_api.info(f"创建本地socket请求客户端:{type}")
        return key, sk
    @classmethod
    def init(cls, addr, port, trade_action_callback, trade_client_count=20):
        cls.trade_client_dict = {}
    def init(cls, addr, port, trade_action_callback, common_client_count=20, trade_client_count=20):
        cls.common_client_dict.clear()
        cls.common_client_count = common_client_count
        cls.trade_client_dict.clear()
        cls.trade_client_count = trade_client_count
        cls.action_callback = trade_action_callback
        cls.ip_port = (addr, port)
        for i in range(common_client_count):
            result = cls.__create_and_run_client(CLIENT_TYPE_COMMON, i)
            cls.common_client_dict[result[0]] = result[1]
        for i in range(trade_client_count):
            result = cls.__create_and_run_client(CLIENT_TYPE_TRADE, i)
            cls.trade_client_dict[result[0]] = result[1]
@@ -146,8 +190,9 @@
            try:
                result = socket_util.recv_data(sk)[0]
                if result:
                    start_time = time.time()
                    try:
                        print("接收数据", _type, result)
                        # print("接收数据", _type, result)
                        result_json = json.loads(result)
                        if result_json["type"] == MSG_TYPE_HEART:
                            # 返回内容
@@ -156,15 +201,14 @@
                        data = result_json["data"]
                        content_type = data["type"]
                        print("接收内容", data)
                        # print("接收内容", data)
                        request_id = result_json.get('request_id')
                        if not socket_util.is_client_params_sign_right(result_json):
                            print("签名错误")
                            # print("签名错误")
                            # 签名出错
                            SendResponseSkManager.send_error_response(_type, request_id, client_id,
                                                                      {"code": -1, "msg": "签名错误"})
                            continue
                        if content_type == API_TYPE_TRADE:
                            # 交易
                            cls.action_callback.OnTrade(client_id, request_id, data)
@@ -172,6 +216,8 @@
                            cls.action_callback.OnTradeState(client_id, request_id, data)
                        elif content_type == API_TYPE_TRADE_MODE:
                            cls.action_callback.OnTradeMode(client_id, request_id, data)
                        elif content_type == API_TYPE_SELL_RULE:
                            cls.action_callback.OnSellRule(client_id, request_id, data)
                        elif content_type == API_TYPE_CODE_LIST:
                            cls.action_callback.OnCodeList(client_id, request_id, data)
                        elif content_type == API_TYPE_EXPORT_L2:
@@ -190,10 +236,25 @@
                            cls.action_callback.OnSyncL2SubscriptCodes(client_id, request_id)
                        elif content_type == API_TYPE_SYSTEM_LOG:
                            cls.action_callback.OnSystemLog(client_id, request_id, data)
                        elif content_type == API_TYPE_GET_FROM_DATA_SERVER:
                            cls.action_callback.OnGetFromDataServer(client_id, request_id, data)
                        elif content_type == API_TYPE_CODE_TRADE_INFO:
                            cls.action_callback.OnGetCodeTradeInfo(client_id, request_id, data)
                        elif content_type == API_TYPE_CODE_L2_LISTEN_ACTIVE_COUNT:
                            cls.action_callback.OnGetActiveListenCount(client_id, request_id)
                        elif content_type == API_TYPE_SAVE_RUNNING_DATA:
                            cls.action_callback.OnSaveRunningData(client_id, request_id)
                        elif content_type == API_TYPE_GET_CODE_POSITION_INFO:
                            cls.action_callback.OnGetCodePositionInfo(client_id, request_id, data)
                        elif content_type == API_TYPE_COMMON_REQUEST:
                            cls.action_callback.OnCommonRequest(client_id, request_id, data)
                    except Exception as e:
                        logging.exception(e)
                        pass
                    finally:
                        use_time = int(time.time() - start_time)
                        if use_time > 5:
                            result_json = json.loads(result)
                            logger_request_api.info(f"超时5s以上:{result_json['data']['type']}")
                        # 发送响应
                        sk.send(json.dumps({"type": "cmd_recieve"}).encode('utf-8'))
                else:
@@ -201,10 +262,12 @@
            except Exception as e:
                logging.exception(e)
                if _type == CLIENT_TYPE_TRADE:
                if _type == CLIENT_TYPE_COMMON:
                    if client_id in cls.common_client_dict:
                        cls.common_client_dict.pop(client_id)
                elif _type == CLIENT_TYPE_TRADE:
                    if client_id in cls.trade_client_dict:
                        cls.trade_client_dict.pop(client_id)
                        print("pop trade client", client_id)
                try:
                    sk.close()
                except:
@@ -219,7 +282,10 @@
                sk.send(socket_util.load_header(json.dumps({"type": "heart", "client_id": client_id}).encode('utf-8')))
                # print("心跳信息发送成功", client_id)
            except Exception as e:
                if _type == CLIENT_TYPE_TRADE:
                if _type == CLIENT_TYPE_COMMON:
                    if client_id in cls.common_client_dict:
                        cls.common_client_dict.pop(client_id)
                elif _type == CLIENT_TYPE_TRADE:
                    if client_id in cls.trade_client_dict:
                        cls.trade_client_dict.pop(client_id)
                try:
@@ -244,10 +310,15 @@
    @classmethod
    def __maintain_client(cls):
        logger_system.info(f"outside_api __maintain_client 线程ID:{tool.get_thread_id()}")
        while True:
            try:
                if len(cls.common_client_dict) < cls.common_client_count:
                    for i in range(cls.common_client_count - len(cls.common_client_dict)):
                        result = cls.__create_and_run_client(CLIENT_TYPE_COMMON)
                        cls.common_client_dict[result[0]] = result[1]
                if len(cls.trade_client_dict) < cls.trade_client_count:
                    print("__maintain_client", CLIENT_TYPE_TRADE, cls.trade_client_count - len(cls.trade_client_dict))
                    for i in range(cls.trade_client_count - len(cls.trade_client_dict)):
                        result = cls.__create_and_run_client(CLIENT_TYPE_TRADE)
                        cls.trade_client_dict[result[0]] = result[1]