| | |
| | | return cls._instance |
| | | |
| | | @classmethod |
| | | def init(cls, trade_action_callback: TradeActionCallback, queue_strategy_trade_read: multiprocessing.Queue): |
| | | def init(cls, trade_action_callback: TradeActionCallback, queue_strategy_trade_read_for_trade: multiprocessing.Queue,queue_strategy_trade_read_for_read: multiprocessing.Queue): |
| | | cls.action_callback = trade_action_callback |
| | | cls.queue_strategy_trade_read = queue_strategy_trade_read |
| | | cls.queue_strategy_trade_read = queue_strategy_trade_read_for_trade |
| | | cls.queue_strategy_trade_read_trade_read = queue_strategy_trade_read_for_read |
| | | |
| | | @classmethod |
| | | def process_command(cls, _type, client_id, result_json, sk=None): |
| | |
| | | _type = val["type"] |
| | | if _type != "test": |
| | | async_log_util.info(logger_local_huaxin_contact_debug, f"接受到信息: {val}") |
| | | cls.process_command(_type, None, val) |
| | | except Exception as e: |
| | | async_log_util.exception(logger_local_huaxin_trade_debug, e) |
| | | logging.exception(e) |
| | | except Exception as e: |
| | | async_log_util.exception(logger_local_huaxin_trade_debug, e) |
| | | |
| | | @classmethod |
| | | def run_process_read_command(cls, queue_strategy_trade_read_trade: multiprocessing.Queue): |
| | | if queue_strategy_trade_read_trade is None: |
| | | return |
| | | # 本地命令接收 |
| | | try: |
| | | while True: |
| | | try: |
| | | val = queue_strategy_trade_read_trade.get() |
| | | if val: |
| | | _type = val["type"] |
| | | if _type != "test": |
| | | async_log_util.info(logger_local_huaxin_contact_debug, f"接受到信息: {val}") |
| | | cls.process_command_thread_pool.submit(lambda: cls.process_command(_type, None, val)) |
| | | except Exception as e: |
| | | async_log_util.exception(logger_local_huaxin_trade_debug, e) |
| | |
| | | # 维护连接数的稳定 |
| | | def run(self, blocking=True): |
| | | if blocking: |
| | | t1 = threading.Thread(target=lambda: self.run_process_read_command(self.queue_strategy_trade_read_trade_read), daemon=True) |
| | | t1.start() |
| | | self.run_process_command(self.queue_strategy_trade_read) |
| | | else: |
| | | # 接受命令 |
| | | t1 = threading.Thread(target=lambda: self.run_process_command(self.queue_strategy_trade_read), daemon=True) |
| | | t1.start() |
| | | t1 = threading.Thread(target=lambda: self.run_process_read_command(self.queue_strategy_trade_read_trade_read), daemon=True) |
| | | t1.start() |
| | | |
| | | |
| | | # L2指令管理 |
| | |
| | | |
| | | class MyTradeActionCallback(command_manager.TradeActionCallback): |
| | | __tradeSimpleApi = TradeSimpleApi() |
| | | trade_thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=20) |
| | | trade_thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=30) |
| | | |
| | | def OnTrade(self, client_id, request_id, sk, type_, data): |
| | | if type_ == 1: |
| | |
| | | |
| | | def run(trade_response_: TradeResponse = None, queue_other_w_l2_r_: multiprocessing.Queue = None, |
| | | queue_strategy_trade_write_=None, |
| | | queue_strategy_trade_read=None): |
| | | queue_strategy_trade_read=None,queue_strategy_trade_read_for_read=None): |
| | | try: |
| | | logger_system.info("交易进程ID:{}", os.getpid()) |
| | | logger_system.info(f"trade 线程ID:{tool.get_thread_id()}") |
| | |
| | | |
| | | global tradeCommandManager |
| | | tradeCommandManager = command_manager.TradeCommandManager() |
| | | tradeCommandManager.init(MyTradeActionCallback(), queue_strategy_trade_read) |
| | | tradeCommandManager.init(MyTradeActionCallback(), queue_strategy_trade_read, queue_strategy_trade_read_for_read) |
| | | logger_system.info("华鑫交易服务启动") |
| | | tradeCommandManager.run() |
| | | except Exception as e: |
| | |
| | | MIN_MONEYS = [300, 200, 100, 50] |
| | | watch_indexes = set() |
| | | for min_money in MIN_MONEYS: |
| | | for i in range(end_index, re_start_index, -1): |
| | | for i in range(end_index, re_start_index - 1, -1): |
| | | try: |
| | | data = total_datas[i] |
| | | val = data['val'] |
| | |
| | | |
| | | def createTradeServer(pipe_server, queue_strategy_r_trade_w_: multiprocessing.Queue, |
| | | queue_l1_w_strategy_r_: multiprocessing.Queue, |
| | | queue_strategy_w_trade_r_: multiprocessing.Queue, order_queues_, transaction_queues_, |
| | | queue_strategy_w_trade_r_: multiprocessing.Queue, |
| | | queue_strategy_w_trade_r_for_read_: multiprocessing.Queue, order_queues_, transaction_queues_, |
| | | market_queue_): |
| | | logger_system.info("策略进程ID:{}", os.getpid()) |
| | | log.close_print() |
| | |
| | | t1.start() |
| | | # |
| | | # 启动华鑫交易服务 |
| | | huaxin_trade_server.run(queue_strategy_r_trade_w_, queue_l1_w_strategy_r_, queue_strategy_w_trade_r_, order_queues_, |
| | | huaxin_trade_server.run(queue_strategy_r_trade_w_, queue_l1_w_strategy_r_, queue_strategy_w_trade_r_, queue_strategy_w_trade_r_for_read_, order_queues_, |
| | | transaction_queues_, market_queue_) |
| | | |
| | | |
| | |
| | | |
| | | # 交易读策略写 |
| | | queue_strategy_w_trade_r = multiprocessing.Queue() |
| | | queue_strategy_w_trade_r_for_read = multiprocessing.Queue() |
| | | # 策略读交易写 |
| | | queue_strategy_r_trade_w = multiprocessing.Queue() |
| | | |
| | |
| | | # 交易进程 |
| | | tradeProcess = multiprocessing.Process( |
| | | target=huaxin_client.trade_client.run, |
| | | args=(None, queue_other_w_l2_r, queue_strategy_r_trade_w, queue_strategy_w_trade_r,)) |
| | | args=(None, queue_other_w_l2_r, queue_strategy_r_trade_w, queue_strategy_w_trade_r, |
| | | queue_strategy_w_trade_r_for_read)) |
| | | tradeProcess.start() |
| | | |
| | | # 创建L2通信队列 |
| | |
| | | |
| | | # 主进程 |
| | | createTradeServer(pss_strategy, queue_strategy_r_trade_w, queue_l1_w_strategy_r, queue_strategy_w_trade_r, |
| | | queue_strategy_w_trade_r_for_read, |
| | | order_queues, transaction_queues, market_queue) |
| | | |
| | | # 将tradeServer作为主进程 |
| | |
| | | if __name__ == '__main__': |
| | | thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=20) |
| | | thread_pool.submit(lambda: test1("123123")) |
| | | input() |
| | | thread_pool.submit(lambda: test1("123123")) |
| | | thread_pool.submit(lambda: test1("123123")) |
| | | thread_pool.submit(lambda: test1("123123")) |
| | | while True: |
| | | running_count = 0 |
| | | for future in thread_pool._threads: |
| | | if future.is_alive(): |
| | | running_count += 1 |
| | | print(running_count) |
| | | time.sleep(1) |
| | |
| | | |
| | | # 设置交易通信队列 |
| | | # 暂时不会使用该方法 |
| | | def run_pipe_trade(queue_strategy_r_trade_w_, queue_strategy_w_trade_r_): |
| | | global queue_strategy_w_trade_r |
| | | def run_pipe_trade(queue_strategy_r_trade_w_, queue_strategy_w_trade_r_, queue_strategy_w_trade_r_for_read_): |
| | | global queue_strategy_w_trade_r, queue_strategy_w_trade_r_for_read |
| | | queue_strategy_w_trade_r = queue_strategy_w_trade_r_ |
| | | queue_strategy_w_trade_r_for_read = queue_strategy_w_trade_r_for_read_ |
| | | |
| | | |
| | | t1 = threading.Thread(target=lambda: __run_recv_queue_trade(queue_strategy_r_trade_w_), daemon=True) |
| | | t1.start() |
| | | t1 = threading.Thread(target=lambda: __run_save_data(), daemon=True) |
| | |
| | | |
| | | |
| | | # 网络请求 |
| | | def __request(_type, data, request_id=None, blocking=False, is_pipe=True, log_enable=True): |
| | | def __request(_type, data, request_id=None, blocking=False, is_pipe=True, log_enable=True,is_trade=False): |
| | | if not request_id: |
| | | request_id = __get_request_id(_type) |
| | | try: |
| | |
| | | "request_id": request_id} |
| | | root_data = socket_util.encryp_client_params_sign(root_data) |
| | | start_time = time.time() |
| | | queue_strategy_w_trade_r.put_nowait(root_data) |
| | | if is_trade: |
| | | queue_strategy_w_trade_r.put_nowait(root_data) |
| | | else: |
| | | queue_strategy_w_trade_r_for_read.put_nowait(root_data) |
| | | |
| | | use_time = int((time.time() - start_time) * 1000) |
| | | if use_time > 10: |
| | | async_log_util.info(hx_logger_trade_loop, f"发送耗时:request_id-{request_id} 耗时时间:{use_time}") |
| | |
| | | order_ref = huaxin_util.create_order_ref() |
| | | if not request_id: |
| | | request_id = __get_request_id(ClientSocketManager.CLIENT_TYPE_TRADE) |
| | | for i in range(2): |
| | | for i in range(1): |
| | | request_id = __request(ClientSocketManager.CLIENT_TYPE_TRADE, |
| | | {"type": ClientSocketManager.CLIENT_TYPE_TRADE, "trade_type": 1, |
| | | "direction": direction, |
| | |
| | | "price": price, "shadow_price": shadow_price, "sinfo": sinfo, "blocking": blocking}, |
| | | request_id=request_id, |
| | | blocking=blocking, |
| | | is_pipe=is_pipe_channel_normal()) |
| | | is_pipe=is_pipe_channel_normal(),is_trade=True) |
| | | try: |
| | | if blocking: |
| | | return __read_response(request_id, blocking) |
| | |
| | | "orderRef": orderRef, |
| | | "orderActionRef": order_action_ref, |
| | | "orderSysID": orderSysID, "sinfo": sinfo}, request_id=request_id, blocking=blocking, |
| | | is_pipe=is_pipe_channel_normal()) |
| | | is_pipe=is_pipe_channel_normal(),is_trade=True) |
| | | try: |
| | | return __read_response(request_id, blocking) |
| | | finally: |
| | |
| | | l2DataListenManager: L2DataListenManager = None |
| | | |
| | | |
| | | def run(queue_strategy_r_trade_w, queue_l1_w_strategy_r, queue_strategy_w_trade_r, order_queues, transaction_queues, |
| | | def run(queue_strategy_r_trade_w, queue_l1_w_strategy_r, queue_strategy_w_trade_r,queue_strategy_w_trade_r_for_read, order_queues, transaction_queues, |
| | | market_queue): |
| | | logger_system.info(f"trade_server 线程ID:{tool.get_thread_id()}") |
| | | try: |
| | |
| | | l2DataListenManager.receive_l2_data(order_queues, transaction_queues, market_queue) |
| | | |
| | | # 启动交易服务 |
| | | huaxin_trade_api.run_pipe_trade(queue_strategy_r_trade_w, queue_strategy_w_trade_r) |
| | | huaxin_trade_api.run_pipe_trade(queue_strategy_r_trade_w, queue_strategy_w_trade_r,queue_strategy_w_trade_r_for_read) |
| | | |
| | | # 监听l1那边传过来的代码 |
| | | t1 = threading.Thread(target=lambda: __recv_pipe_l1(queue_l1_w_strategy_r), daemon=True) |