| | |
| | | import concurrent.futures |
| | | import json |
| | | import logging |
| | | import multiprocessing |
| | | import threading |
| | | import time |
| | | |
| | | import zmq |
| | | |
| | | from huaxin_client import socket_util |
| | | from huaxin_client.client_network import SendResponseSkManager |
| | | from log_module import async_log_util |
| | | from log_module.log import logger_local_huaxin_trade_debug, logger_trade |
| | | from log_module.log import logger_local_huaxin_trade_debug, logger_trade, logger_local_huaxin_contact_debug |
| | | |
| | | MSG_TYPE_HEART = "heart" |
| | | # 命令信息 |
| | |
| | | |
class L2ActionCallback(object):
    """Callback interface for L2 commands.

    The base implementation is a no-op; concrete handlers are expected to
    override the hook below.
    """

    # Listen for L2 data
    def OnSetL2Position(self, codes_data):
        """Hook invoked with the payload of an L2 command.

        @param codes_data: command payload; its schema is defined by the
            sender and is not visible here (presumably the codes to
            subscribe to - confirm against the caller)
        @return: None
        """
        pass
| | | |
| | | |
| | |
| | | return cls._instance |
| | | |
@classmethod
def init(cls, trade_action_callback: TradeActionCallback,
         queue_strategy_trade_read_for_trade: multiprocessing.Queue,
         queue_strategy_trade_read_for_read: multiprocessing.Queue):
    """Store the callback and the two command queues on the class.

    @param trade_action_callback: object whose On* methods are invoked when
        commands are processed
    @param queue_strategy_trade_read_for_trade: queue later consumed by
        run_process_command
    @param queue_strategy_trade_read_for_read: queue later consumed by
        run_process_read_command
    @return: None
    """
    cls.action_callback = trade_action_callback
    # NOTE(review): the pipe-based attributes (pipe_strategy / pipe_l2) are no
    # longer assigned here because the signature no longer receives them.
    # process_command still reads cls.pipe_l2 for L2 commands - confirm that
    # attribute is provided elsewhere.
    cls.queue_strategy_trade_read = queue_strategy_trade_read_for_trade
    cls.queue_strategy_trade_read_trade_read = queue_strategy_trade_read_for_read
| | | |
| | | @classmethod |
| | | def process_command(cls, _type, client_id, result_json, sk=None): |
| | | async_log_util.info(logger_local_huaxin_contact_debug, f"process_command: {result_json}") |
| | | # 查看是否是设置L2的代码 |
| | | if _type == CLIENT_TYPE_CMD_L2: |
| | | cls.pipe_l2.send( |
| | | json.dumps({"type": "set_l2_codes", "data": result_json["data"]})) |
| | | return |
| | | |
| | | try: |
| | | data = result_json["data"] |
| | | request_id = result_json.get('request_id') |
| | |
| | | elif _type == "test": |
| | | cls.action_callback.OnTest(client_id, request_id, data, sk) |
| | | except Exception as e: |
| | | logger_local_huaxin_trade_debug.debug(f"process_command出错:{result_json}") |
| | | logging.exception(e) |
| | | logging.error(result_json) |
| | | async_log_util.error(logger_local_huaxin_contact_debug, f"process_command出错: {result_json}") |
| | | # logging.exception(e) |
| | | # logging.error(result_json) |
| | | |
@classmethod
def run_process_command(cls, queue_strategy_trade: multiprocessing.Queue):
    """Consume JSON-encoded commands from a queue and process them in order.

    Blocks forever. Each non-empty item is decoded with json.loads and passed
    to cls.process_command exactly once, synchronously, so commands are
    handled in arrival order.

    @param queue_strategy_trade: queue yielding JSON strings that contain at
        least a "type" key; when None the method returns immediately
    @return: None (only when the queue is None)
    """
    if queue_strategy_trade is None:
        return
    # Receive local commands
    try:
        while True:
            try:
                val = queue_strategy_trade.get()
                if val:
                    val = json.loads(val)
                    _type = val["type"]
                    # "test" messages are not logged
                    if _type != "test":
                        async_log_util.info(logger_local_huaxin_contact_debug, f"接受到信息: {val}")
                    # Dispatch once, on this thread (previously the command was
                    # also submitted to the thread pool, i.e. handled twice).
                    cls.process_command(_type, None, val)
            except Exception as e:
                # One bad message must not stop the consumer loop.
                logger_local_huaxin_trade_debug.exception(e)
    except Exception as e:
        async_log_util.exception(logger_local_huaxin_trade_debug, e)
| | | |
@classmethod
def run_process_read_command(cls, queue_strategy_trade_read_trade: multiprocessing.Queue):
    """Consume commands from a queue and hand each one to the thread pool.

    Blocks forever. Unlike run_process_command, items are used as received
    (no JSON decoding) and processing is delegated to
    cls.process_command_thread_pool.

    @param queue_strategy_trade_read_trade: queue yielding mappings that
        contain at least a "type" key; when None the method returns immediately
    @return: None (only when the queue is None)
    """
    if queue_strategy_trade_read_trade is None:
        return
    # Receive local commands
    try:
        while True:
            try:
                val = queue_strategy_trade_read_trade.get()
                if val:
                    _type = val["type"]
                    # "test" messages are not logged
                    if _type != "test":
                        async_log_util.info(logger_local_huaxin_contact_debug, f"接受到信息: {val}")
                    # Pass the arguments to submit() instead of closing over
                    # them in a lambda: the loop rebinds _type/val on the next
                    # iteration, so a late-running lambda could process the
                    # wrong message.
                    cls.process_command_thread_pool.submit(cls.process_command, _type, None, val)
            except Exception as e:
                # One bad message must not stop the consumer loop.
                async_log_util.exception(logger_local_huaxin_trade_debug, e)
                logging.exception(e)
    except Exception as e:
        logger_local_huaxin_trade_debug.exception(e)
        async_log_util.exception(logger_local_huaxin_trade_debug, e)
| | | |
| | | ###############ZEROMQ协议接收命令################# |
@classmethod
def __create_order_command_reciever(cls, ipc_addr):
    """
    Receive order-placement commands on a ZeroMQ REP socket.

    Binds a REP socket to ipc_addr and loops forever: each JSON request is
    forwarded to cls.action_callback.OnTrade with trade type 1 and is always
    answered with the string "SUCCESS".

    @param ipc_addr: address the REP socket is bound to
    @return: never returns normally
    """
    context = zmq.Context()
    socket = context.socket(zmq.REP)
    socket.bind(ipc_addr)
    while True:
        try:
            data = socket.recv_json()
        except ValueError as e:
            # The frame arrived but is not valid JSON. A REP socket must reply
            # before it can receive again, so acknowledge and keep serving
            # instead of letting the receiver thread die.
            logger_local_huaxin_trade_debug.exception(e)
            socket.send_string("SUCCESS")
            continue
        try:
            request_id = data.get('request_id')
            # Transport latency is measured before the callback runs. A missing
            # timestamp only disables the timing log; it must not block the order.
            sent_at = data.get('time')
            use_time = None if sent_at is None else time.time() - sent_at
            data = data.get('data')
            cls.action_callback.OnTrade(None, request_id, None, 1, data)
            if use_time is not None:
                async_log_util.info(logger_local_huaxin_trade_debug, f"下单通信耗时: {round(use_time*1000,3)}ms request_id:{request_id}")
        except Exception as e:
            logger_local_huaxin_trade_debug.exception(e)
        finally:
            # Always reply so the REQ/REP exchange stays in lock-step.
            socket.send_string("SUCCESS")
| | | |
@classmethod
def __create_cancel_order_command_reciever(cls, ipc_addr):
    """
    Receive order-cancellation commands on a ZeroMQ REP socket.

    Binds a REP socket to ipc_addr and loops forever: each JSON request is
    forwarded to cls.action_callback.OnTrade with trade type 2 and is always
    answered with the string "SUCCESS".

    @param ipc_addr: address the REP socket is bound to
    @return: never returns normally
    """
    context = zmq.Context()
    socket = context.socket(zmq.REP)
    socket.bind(ipc_addr)
    while True:
        try:
            data = socket.recv_json()
        except ValueError as e:
            # The frame arrived but is not valid JSON. A REP socket must reply
            # before it can receive again, so acknowledge and keep serving
            # instead of letting the receiver thread die.
            logger_local_huaxin_trade_debug.exception(e)
            socket.send_string("SUCCESS")
            continue
        try:
            request_id = data.get('request_id')
            # Transport latency is measured before the callback runs. A missing
            # timestamp only disables the timing log; it must not block the cancel.
            sent_at = data.get('time')
            use_time = None if sent_at is None else time.time() - sent_at
            data = data.get('data')
            cls.action_callback.OnTrade(None, request_id, None, 2, data)
            if use_time is not None:
                async_log_util.info(logger_local_huaxin_trade_debug, f"撤单通信耗时: {round(use_time*1000,3)}ms request_id:{request_id}")
        except Exception as e:
            logger_local_huaxin_trade_debug.exception(e)
        finally:
            # Always reply so the REQ/REP exchange stays in lock-step.
            socket.send_string("SUCCESS")
| | | |
| | | # 维护连接数的稳定 |
def run(self, order_ipc_addr, cancel_order_ipc_addr, blocking=True):
    """Start all command receivers.

    Three daemon threads are always started: the read-command queue consumer
    and the two ZeroMQ receivers (order / cancel order). The trade-command
    queue consumer runs on the calling thread when blocking is true,
    otherwise on a fourth daemon thread.

    @param order_ipc_addr: address passed to the order-command receiver
    @param cancel_order_ipc_addr: address passed to the cancel-order receiver
    @param blocking: when true, this call does not return because it runs
        run_process_command on the calling thread
    @return: None (only reached when blocking is false)
    """
    workers = [
        lambda: self.run_process_read_command(self.queue_strategy_trade_read_trade_read),
        lambda: self.__create_order_command_reciever(order_ipc_addr),
        lambda: self.__create_cancel_order_command_reciever(cancel_order_ipc_addr),
    ]
    if not blocking:
        # Accept commands on a background thread; started first, as before.
        workers.insert(0, lambda: self.run_process_command(self.queue_strategy_trade_read))
    for worker in workers:
        threading.Thread(target=worker, daemon=True).start()
    if blocking:
        # Must come last: this call never returns, so every background
        # receiver has to be running before it starts.
        self.run_process_command(self.queue_strategy_trade_read)
| | | |
| | | |
| | | # L2指令管理 |
| | | class L2CommandManager: |
| | | action_callback = None |
| | | |
| | | @classmethod |
| | | def init(cls, l2_action_callback): |
| | |
| | | @classmethod |
| | | def process_command(cls, _type, client_id, result_json): |
| | | data = result_json["data"] |
| | | request_id = result_json["request_id"] |
| | | ctype = data["type"] |
| | | if not socket_util.is_client_params_sign_right(result_json): |
| | | # 签名出错 |
| | | SendResponseSkManager.send_error_response(_type, request_id, client_id, |
| | | {"code": -1, "msg": "签名错误"}) |
| | | return |
| | | codes_data = data["data"] |
| | | ctype = result_json["type"] |
| | | if ctype == CLIENT_TYPE_CMD_L2: |
| | | cls.action_callback.OnSetL2Position(client_id, request_id, codes_data) |
| | | cls.action_callback.OnSetL2Position(data) |
| | | |
| | | |
| | | if __name__ == "__main__": |