From 122def357140a4a504710a57fe2bc1a8020aa7b1 Mon Sep 17 00:00:00 2001 From: Administrator <admin@example.com> Date: 星期二, 27 二月 2024 17:57:39 +0800 Subject: [PATCH] 新版L2数据传输协议测试 --- huaxin_client/l2_client.py | 49 +++++++++++++++++++++++++++++++++++++++++++++---- 1 files changed, 45 insertions(+), 4 deletions(-) diff --git a/huaxin_client/l2_client.py b/huaxin_client/l2_client.py index 88dec03..b55c574 100644 --- a/huaxin_client/l2_client.py +++ b/huaxin_client/l2_client.py @@ -192,7 +192,8 @@ d = self.codes_volume_and_price_dict.get(code) if d: min_volume, limit_up_price, special_price, buy_volume = d[0], d[1], d[2], d[3] - self.l2_data_upload_manager.set_order_fileter_condition(code, min_volume, limit_up_price, special_price,buy_volume, + self.l2_data_upload_manager.set_order_fileter_condition(code, min_volume, limit_up_price, special_price, + buy_volume, {volume, constant.SHADOW_ORDER_VOLUME}, time.time() + 3) huaxin_l2_log.info(logger_local_huaxin_l2_subscript, f"璁剧疆涓嬪崟閲忕洃鍚細{code}-{volume}") @@ -565,8 +566,47 @@ pipe_strategy = None +def test_add_codes(queue_r): + time.sleep(5) + # if value: + # if type(value) == bytes: + # value = value.decode("utf-8") + # data = json.loads(value) + # _type = data["type"] + # if _type == "listen_volume": + # volume = data["data"]["volume"] + # code = data["data"]["code"] + # spi.set_code_special_watch_volume(code, volume) + # elif _type == "l2_cmd": + # l2CommandManager.process_command(command_manager.CLIENT_TYPE_CMD_L2, None, data) + + demo_datas = [("603002", int(50 * 10000 / 6.35), 6.35, 6.00, 200), + ("002654", int(50 * 10000 / 15.59), 15.59, 15.3, 200), + ("603701", int(50 * 10000 / 14.28), 14.28, 14.00, 200), + ("002908", int(50 * 10000 / 12.78), 12.78, 12.00, 200)] + + queue_r.put_nowait(json.dumps({"type": "l2_cmd", "data": [demo_datas[0]]})) + time.sleep(1) + + spi.l2_data_upload_manager.add_l2_order_detail( + {'SecurityID': '603002', 'Price': 6.35, 'Volume': 275000, 'Side': "1", 'OrderType': '0', + 'OrderTime': '13000015', + 'MainSeq': 2, 'SubSeq': 6739147, 'OrderNO': 5512466, 'OrderStatus': 'D'}, 0) + spi.l2_data_upload_manager.add_l2_order_detail( + {'SecurityID': '603002', 'Price': 6.35, 'Volume': 200, 'Side': "1", 'OrderType': '0', + 'OrderTime': '13000015', + 'MainSeq': 2, 'SubSeq': 6739148, 'OrderNO': 5512467, 'OrderStatus': 'D'}, 0) + queue_r.put_nowait(json.dumps({"type": "listen_volume", "data": {"code": "603002", "volume": 100}})) + time.sleep(0.1) + spi.l2_data_upload_manager.add_l2_order_detail( + {'SecurityID': '603002', 'Price': 6.35, 'Volume': 100, 'Side': "1", 'OrderType': '0', + 'OrderTime': '13000015', + 'MainSeq': 2, 'SubSeq': 6739148, 'OrderNO': 5512467, 'OrderStatus': 'D'}, 0) + + def run(queue_r: multiprocessing.Queue, order_queues: List[multiprocessing.Queue], - transaction_queues: List[multiprocessing.Queue], market_queue: multiprocessing.Queue) -> None: + transaction_queues: List[multiprocessing.Queue], market_queue: multiprocessing.Queue, + order_ipc_hosts: list) -> None: # def test_add_codes(): # time.sleep(5) # # if value: @@ -614,12 +654,13 @@ order_queue_distribute_manager = CodeQueueDistributeManager(order_queues) transaction_queue_distribute_manager = CodeQueueDistributeManager(transaction_queues) l2_data_upload_manager = L2DataUploadManager(order_queue_distribute_manager, - transaction_queue_distribute_manager, market_queue) + transaction_queue_distribute_manager, market_queue, + order_ipc_hosts) __init_l2(l2_data_upload_manager) l2_data_manager.run_upload_common() l2_data_manager.run_log() # 娴嬭瘯 - # threading.Thread(target=lambda: test_add_codes(),daemon=True).start() + threading.Thread(target=lambda: test_add_codes(queue_r), daemon=True).start() global l2CommandManager l2CommandManager = command_manager.L2CommandManager() l2CommandManager.init(MyL2ActionCallback()) -- Gitblit v1.8.0