From 15199f8e93fe48e6261c99eadf6673d788db3a80 Mon Sep 17 00:00:00 2001 From: Administrator <admin@example.com> Date: 星期日, 17 三月 2024 22:56:10 +0800 Subject: [PATCH] L2进程与策略进程合并 --- huaxin_client/l2_client.py | 82 +++++++++++++--------------------------- 1 files changed, 27 insertions(+), 55 deletions(-) diff --git a/huaxin_client/l2_client.py b/huaxin_client/l2_client.py index 3cfe584..1649e59 100644 --- a/huaxin_client/l2_client.py +++ b/huaxin_client/l2_client.py @@ -13,9 +13,10 @@ from huaxin_client import constant from huaxin_client import l2_data_manager import lev2mdapi -from huaxin_client.code_queue_distribute_manager import CodeQueueDistributeManager +from huaxin_client.code_queue_distribute_manager import CodeQueueDistributeManager, CodeDataCallbackDistributeManager from huaxin_client.command_manager import L2ActionCallback from huaxin_client.l2_data_manager import L2DataUploadManager +from huaxin_client.l2_data_transform_protocol import L2DataCallBack from log_module import log, async_log_util from log_module.async_log_util import huaxin_l2_log from log_module.log import logger_local_huaxin_l2_subscript, logger_system, logger_local_huaxin_l2_transaction, \ @@ -546,7 +547,7 @@ def test_add_codes(queue_r): - time.sleep(5) + time.sleep(10) # if value: # if type(value) == bytes: # value = value.decode("utf-8") @@ -565,61 +566,31 @@ ("002908", int(50 * 10000 / 12.78), 12.78, 12.00, 200)] queue_r.put_nowait(json.dumps({"type": "l2_cmd", "data": [demo_datas[0]]})) - time.sleep(1) + time.sleep(10) while True: - spi.l2_data_upload_manager.add_l2_order_detail( - {'SecurityID': '603002', 'Price': 6.35, 'Volume': 275000, 'Side': "1", 'OrderType': '0', - 'OrderTime': '13000015', - 'MainSeq': 2, 'SubSeq': 6739147, 'OrderNO': 5512466, 'OrderStatus': 'D'}, 0) - spi.l2_data_upload_manager.add_l2_order_detail( - {'SecurityID': '603002', 'Price': 6.35, 'Volume': 200, 'Side': "1", 'OrderType': '0', - 'OrderTime': '13000015', - 'MainSeq': 2, 'SubSeq': 6739148, 'OrderNO': 5512467, 'OrderStatus': 'D'}, 0) - queue_r.put_nowait(json.dumps({"type": "listen_volume", "data": {"code": "603002", "volume": 100}})) - time.sleep(0.1) - spi.l2_data_upload_manager.add_l2_order_detail( - {'SecurityID': '603002', 'Price': 6.35, 'Volume': 100, 'Side': "1", 'OrderType': '0', - 'OrderTime': '13000015', - 'MainSeq': 2, 'SubSeq': 6739148, 'OrderNO': 5512467, 'OrderStatus': 'D'}, 0) - time.sleep(10) + try: + spi.l2_data_upload_manager.add_l2_order_detail( + {'SecurityID': '603002', 'Price': 6.35, 'Volume': 275000, 'Side': "1", 'OrderType': '0', + 'OrderTime': '13000015', + 'MainSeq': 2, 'SubSeq': 6739147, 'OrderNO': 5512466, 'OrderStatus': 'D'}, 0) + spi.l2_data_upload_manager.add_l2_order_detail( + {'SecurityID': '603002', 'Price': 6.35, 'Volume': 200, 'Side': "1", 'OrderType': '0', + 'OrderTime': '13000015', + 'MainSeq': 2, 'SubSeq': 6739148, 'OrderNO': 5512467, 'OrderStatus': 'D'}, 0) + # queue_r.put_nowait(json.dumps({"type": "listen_volume", "data": {"code": "603002", "volume": 100}})) + time.sleep(0.1) + spi.l2_data_upload_manager.add_l2_order_detail( + {'SecurityID': '603002', 'Price': 6.35, 'Volume': 100, 'Side': "1", 'OrderType': '0', + 'OrderTime': '13000015', + 'MainSeq': 2, 'SubSeq': 6739148, 'OrderNO': 5512467, 'OrderStatus': 'D'}, 0) + except Exception as e: + logging.exception(e) + finally: + time.sleep(10) def run(queue_r: multiprocessing.Queue, order_queues: List[multiprocessing.Queue], - transaction_queues: List[multiprocessing.Queue], market_queue: multiprocessing.Queue, order_ipc_hosts: list) -> None: - # def test_add_codes(): - # time.sleep(5) - # # if value: - # # if type(value) == bytes: - # # value = value.decode("utf-8") - # # data = json.loads(value) - # # _type = data["type"] - # # if _type == "listen_volume": - # # volume = data["data"]["volume"] - # # code = data["data"]["code"] - # # spi.set_code_special_watch_volume(code, volume) - # # elif _type == "l2_cmd": - # # l2CommandManager.process_command(command_manager.CLIENT_TYPE_CMD_L2, None, data) - # - # demo_datas = [("603002", int(50 * 10000 / 6.35), 6.35), ("002654", int(50 * 10000 / 15.59), 15.59), - # ("603701", int(50 * 10000 / 14.28), 14.28), ("002908", int(50 * 10000 / 12.78), 12.78)] - # - # queue_r.put_nowait(json.dumps({"type": "l2_cmd", "data": [demo_datas[0]]})) - # time.sleep(1) - # - # spi.l2_data_upload_manager.add_l2_order_detail( - # {'SecurityID': '603002', 'Price': 6.35, 'Volume': 275000, 'Side': "1", 'OrderType': '0', - # 'OrderTime': '13000015', - # 'MainSeq': 2, 'SubSeq': 6739147, 'OrderNO': 5512466, 'OrderStatus': 'D'}, 0) - # spi.l2_data_upload_manager.add_l2_order_detail( - # {'SecurityID': '603002', 'Price': 6.35, 'Volume': 200, 'Side': "1", 'OrderType': '0', - # 'OrderTime': '13000015', - # 'MainSeq': 2, 'SubSeq': 6739148, 'OrderNO': 5512467, 'OrderStatus': 'D'}, 0) - # queue_r.put_nowait(json.dumps({"type": "listen_volume", "data": {"code": "603002", "volume": 100}})) - # time.sleep(0.1) - # spi.l2_data_upload_manager.add_l2_order_detail( - # {'SecurityID': '603002', 'Price': 6.35, 'Volume': 100, 'Side': "1", 'OrderType': '0', - # 'OrderTime': '13000015', - # 'MainSeq': 2, 'SubSeq': 6739148, 'OrderNO': 5512467, 'OrderStatus': 'D'}, 0) + transaction_queues: List[multiprocessing.Queue], market_queue: multiprocessing.Queue, order_ipc_hosts: list, data_callbacks:list) -> None: logger_system.info("L2杩涚▼ID锛歿}", os.getpid()) logger_system.info(f"l2_client 绾跨▼ID:{tool.get_thread_id()}") @@ -632,12 +603,13 @@ # 鍒濆鍖� order_queue_distribute_manager = CodeQueueDistributeManager(order_queues) transaction_queue_distribute_manager = CodeQueueDistributeManager(transaction_queues) + data_callback_distribute_manager = CodeDataCallbackDistributeManager(data_callbacks) l2_data_upload_manager = L2DataUploadManager(order_queue_distribute_manager, - transaction_queue_distribute_manager, market_queue, order_ipc_hosts) + transaction_queue_distribute_manager, market_queue, order_ipc_hosts, data_callback_distribute_manager) __init_l2(l2_data_upload_manager) l2_data_manager.run_upload_common() l2_data_manager.run_log() - # 娴嬭瘯 + # TODO 娴嬭瘯 # threading.Thread(target=lambda: test_add_codes(queue_r), daemon=True).start() global l2CommandManager l2CommandManager = command_manager.L2CommandManager() -- Gitblit v1.8.0