| | |
| | | # 心跳信息 |
| | | from huaxin_client import socket_util |
| | | from huaxin_client.client_network import SendResponseSkManager |
| | | from log_module.log import logger_debug |
| | | from utils import middle_api_protocol |
| | | from log_module.log import logger_system, logger_request_api |
| | | from utils import middle_api_protocol, tool |
| | | |
| | | MSG_TYPE_HEART = "heart" |
| | | # Command message |
| | | MSG_TYPE_CMD = "cmd" |
| | | |
| | | CLIENT_TYPE_COMMON = "common" |
| | | CLIENT_TYPE_TRADE = "trade" |
| | | |
| | | # Heartbeat interval |
| | |
| | | OPERRATE_SET = 1 # set |
| | | OPERRATE_DELETE = 2 # delete |
| | | OPERRATE_GET = 3 # get |
| | | OPERRATE_ADD = 4 # add |
| | | |
| | | # Code list types |
| | | CODE_LIST_WHITE = "white" |
| | | CODE_LIST_BLACK = "black" |
| | | CODE_LIST_WANT = "want" |
| | | CODE_LIST_PAUSE_BUY = "pause_buy" |
| | | CODE_LIST_MUST_BUY = "must_buy" |
| | | CODE_LIST_GREEN = "green" |
| | | |
| | | # API types (matched against the "type" field of a command's data) |
| | | API_TYPE_TRADE = "trade" # trade |
| | | API_TYPE_TRADE_STATE = "trade_state" # trade state |
| | | API_TYPE_TRADE_MODE = "trade_mode" # trade mode |
| | | API_TYPE_SELL_RULE = "sell_rule" # sell rule |
| | | API_TYPE_CODE_LIST = "code_list" # code list |
| | | API_TYPE_EXPORT_L2 = "export_l2" # export L2 data |
| | | API_TYPE_INIT = "init" # initialization |
| | |
| | | API_TYPE_CODE_TRADE_STATE = "code_trade_state" # per-code trade state |
| | | API_TYPE_GET_ENV = "get_env" # get environment info |
| | | API_TYPE_SYNC_L1_TARGET_CODES = "sync_l1_subscript_codes" # sync the codes that L1 needs to subscribe to |
| | | API_TYPE_SYSTEM_LOG = "system_log" # system log |
| | | API_TYPE_GET_FROM_DATA_SERVER = "get_from_data_server" # pull data from the data server |
| | | API_TYPE_CODE_TRADE_INFO = "code_trade_info" # per-code trade info |
| | | API_TYPE_CODE_L2_LISTEN_ACTIVE_COUNT = "l2_listen_active_count" # number of active L2 listeners |
| | | API_TYPE_SAVE_RUNNING_DATA = "save_running_data" # save runtime data |
| | | API_TYPE_GET_CODE_POSITION_INFO = "get_code_position_info" # get position info for a code |
| | | API_TYPE_COMMON_REQUEST = "common_request" # common (generic) request |
| | | |
| | | |
| | | class ActionCallback(object): |
| | |
| | | |
| | | # 交易模式 |
| | | def OnTradeMode(self, client_id, request_id, data): |
# Callback for commands whose data "type" equals API_TYPE_TRADE_MODE;
# `data` is the "data" field of the received message.
# This base implementation does nothing.
| | | pass |
| | | |
| | | # 卖出规则 |
| | | def OnSellRule(self, client_id, request_id, data): |
# Callback for commands whose data "type" equals API_TYPE_SELL_RULE;
# `data` is the "data" field of the received message.
# This base implementation does nothing.
| | | pass |
| | | |
| | | # 代码名单 |
| | |
| | | def OnSyncL2SubscriptCodes(self, client_id, request_id): |
# Callback for commands whose data "type" equals API_TYPE_SYNC_L1_TARGET_CODES;
# it is called without the message data. This base implementation does nothing.
# NOTE(review): the method name says "L2" while the constant says "L1" -
# confirm which level is actually intended.
| | | pass |
| | | |
| | | def OnGetFromDataServer(self, client_id, request_id, data): |
# Callback for commands whose data "type" equals API_TYPE_GET_FROM_DATA_SERVER;
# `data` is the "data" field of the received message.
# This base implementation does nothing.
| | | pass |
| | | |
| | | # 代码的交易信息 |
| | | def OnGetCodeTradeInfo(self, client_id, request_id, data): |
# Callback for commands whose data "type" equals API_TYPE_CODE_TRADE_INFO;
# `data` is the "data" field of the received message.
# This base implementation does nothing.
| | | pass |
| | | |
| | | def OnGetActiveListenCount(self, client_id, request_id): |
# Callback for commands whose data "type" equals
# API_TYPE_CODE_L2_LISTEN_ACTIVE_COUNT; it is called without the message data.
# This base implementation does nothing.
| | | pass |
| | | |
| | | def OnSaveRunningData(self, client_id, request_id): |
# Callback for commands whose data "type" equals API_TYPE_SAVE_RUNNING_DATA;
# it is called without the message data. This base implementation does nothing.
| | | pass |
| | | |
| | | def OnGetCodePositionInfo(self, client_id, request_id, data): |
# Callback for commands whose data "type" equals API_TYPE_GET_CODE_POSITION_INFO;
# `data` is the "data" field of the received message.
# This base implementation does nothing.
| | | pass |
| | | |
| | | def OnCommonRequest(self, client_id, request_id, data): |
# Callback for commands whose data "type" equals API_TYPE_COMMON_REQUEST;
# `data` is the "data" field of the received message.
# This base implementation does nothing.
| | | pass |
| | | |
| | | |
| | | # 交易指令管理 |
| | | # 交易指令管理 |
| | | class ApiCommandManager: |
| | | common_client_dict = {} |
| | | trade_client_dict = {} |
| | | trade_client_count = 0 |
| | | common_client_count = 0 |
| | | |
| | | _instance = None |
| | | |
| | | def __new__(cls, *args, **kwargs): |
| | |
| | | # 发送心跳 |
| | | cls.__heartbeats_thread(type, key, sk) |
| | | cls.__listen_command_thread(type, key, sk) |
| | | print("create_and_run_client success", type, key) |
| | | # print("create_and_run_client success", type, key) |
| | | logger_request_api.info(f"创建本地socket请求客户端:{type}") |
| | | return key, sk |
| | | |
| | | @classmethod |
| | | def init(cls, addr, port, trade_action_callback, trade_client_count=20): |
| | | cls.trade_client_dict = {} |
| | | def init(cls, addr, port, trade_action_callback, common_client_count=20, trade_client_count=20): |
| | | cls.common_client_dict.clear() |
| | | cls.common_client_count = common_client_count |
| | | cls.trade_client_dict.clear() |
| | | cls.trade_client_count = trade_client_count |
| | | cls.action_callback = trade_action_callback |
| | | cls.ip_port = (addr, port) |
| | | |
| | | for i in range(common_client_count): |
| | | result = cls.__create_and_run_client(CLIENT_TYPE_COMMON, i) |
| | | cls.common_client_dict[result[0]] = result[1] |
| | | for i in range(trade_client_count): |
| | | result = cls.__create_and_run_client(CLIENT_TYPE_TRADE, i) |
| | | cls.trade_client_dict[result[0]] = result[1] |
| | |
| | | try: |
| | | result = socket_util.recv_data(sk)[0] |
| | | if result: |
| | | start_time = time.time() |
| | | try: |
| | | print("接收数据", _type, result) |
| | | # print("接收数据", _type, result) |
| | | result_json = json.loads(result) |
| | | if result_json["type"] == MSG_TYPE_HEART: |
| | | # 返回内容 |
| | |
| | | |
| | | data = result_json["data"] |
| | | content_type = data["type"] |
| | | print("接收内容", data) |
| | | # print("接收内容", data) |
| | | request_id = result_json.get('request_id') |
| | | if not socket_util.is_client_params_sign_right(result_json): |
| | | print("签名错误") |
| | | # print("签名错误") |
| | | # 签名出错 |
| | | SendResponseSkManager.send_error_response(_type, request_id, client_id, |
| | | {"code": -1, "msg": "签名错误"}) |
| | | continue |
| | | |
| | | if content_type == API_TYPE_TRADE: |
| | | # 交易 |
| | | cls.action_callback.OnTrade(client_id, request_id, data) |
| | |
| | | cls.action_callback.OnTradeState(client_id, request_id, data) |
| | | elif content_type == API_TYPE_TRADE_MODE: |
| | | cls.action_callback.OnTradeMode(client_id, request_id, data) |
| | | elif content_type == API_TYPE_SELL_RULE: |
| | | cls.action_callback.OnSellRule(client_id, request_id, data) |
| | | elif content_type == API_TYPE_CODE_LIST: |
| | | cls.action_callback.OnCodeList(client_id, request_id, data) |
| | | elif content_type == API_TYPE_EXPORT_L2: |
| | |
| | | cls.action_callback.OnGetCodeTradeState(client_id, request_id, data) |
| | | elif content_type == API_TYPE_GET_ENV: |
| | | cls.action_callback.OnGetEnvInfo(client_id, request_id, data) |
| | | elif content_type == API_TYPE_GET_ENV: |
| | | elif content_type == API_TYPE_SYNC_L1_TARGET_CODES: |
| | | cls.action_callback.OnSyncL2SubscriptCodes(client_id, request_id) |
| | | elif content_type == API_TYPE_SYSTEM_LOG: |
| | | cls.action_callback.OnSystemLog(client_id, request_id, data) |
| | | elif content_type == API_TYPE_GET_FROM_DATA_SERVER: |
| | | cls.action_callback.OnGetFromDataServer(client_id, request_id, data) |
| | | elif content_type == API_TYPE_CODE_TRADE_INFO: |
| | | cls.action_callback.OnGetCodeTradeInfo(client_id, request_id, data) |
| | | elif content_type == API_TYPE_CODE_L2_LISTEN_ACTIVE_COUNT: |
| | | cls.action_callback.OnGetActiveListenCount(client_id, request_id) |
| | | elif content_type == API_TYPE_SAVE_RUNNING_DATA: |
| | | cls.action_callback.OnSaveRunningData(client_id, request_id) |
| | | elif content_type == API_TYPE_GET_CODE_POSITION_INFO: |
| | | cls.action_callback.OnGetCodePositionInfo(client_id, request_id, data) |
| | | elif content_type == API_TYPE_COMMON_REQUEST: |
| | | cls.action_callback.OnCommonRequest(client_id, request_id, data) |
| | | except Exception as e: |
| | | logging.exception(e) |
| | | pass |
| | | finally: |
| | | use_time = int(time.time() - start_time) |
| | | if use_time > 5: |
| | | result_json = json.loads(result) |
| | | logger_request_api.info(f"超时5s以上:{result_json['data']['type']}") |
| | | # 发送响应 |
| | | sk.send(json.dumps({"type": "cmd_recieve"}).encode('utf-8')) |
| | | else: |
| | |
| | | |
| | | except Exception as e: |
| | | logging.exception(e) |
| | | if _type == CLIENT_TYPE_TRADE: |
| | | if _type == CLIENT_TYPE_COMMON: |
| | | if client_id in cls.common_client_dict: |
| | | cls.common_client_dict.pop(client_id) |
| | | elif _type == CLIENT_TYPE_TRADE: |
| | | if client_id in cls.trade_client_dict: |
| | | cls.trade_client_dict.pop(client_id) |
| | | print("pop trade client", client_id) |
| | | try: |
| | | sk.close() |
| | | except: |
| | |
| | | sk.send(socket_util.load_header(json.dumps({"type": "heart", "client_id": client_id}).encode('utf-8'))) |
| | | # print("心跳信息发送成功", client_id) |
| | | except Exception as e: |
| | | if _type == CLIENT_TYPE_TRADE: |
| | | if _type == CLIENT_TYPE_COMMON: |
| | | if client_id in cls.common_client_dict: |
| | | cls.common_client_dict.pop(client_id) |
| | | elif _type == CLIENT_TYPE_TRADE: |
| | | if client_id in cls.trade_client_dict: |
| | | cls.trade_client_dict.pop(client_id) |
| | | try: |
| | |
| | | |
| | | @classmethod |
| | | def __maintain_client(cls): |
| | | logger_system.info(f"outside_api __maintain_client 线程ID:{tool.get_thread_id()}") |
| | | while True: |
| | | try: |
| | | if len(cls.common_client_dict) < cls.common_client_count: |
| | | for i in range(cls.common_client_count - len(cls.common_client_dict)): |
| | | result = cls.__create_and_run_client(CLIENT_TYPE_COMMON) |
| | | cls.common_client_dict[result[0]] = result[1] |
| | | |
| | | if len(cls.trade_client_dict) < cls.trade_client_count: |
| | | print("__maintain_client", CLIENT_TYPE_TRADE, cls.trade_client_count - len(cls.trade_client_dict)) |
| | | for i in range(cls.trade_client_count - len(cls.trade_client_dict)): |
| | | result = cls.__create_and_run_client(CLIENT_TYPE_TRADE) |
| | | cls.trade_client_dict[result[0]] = result[1] |