From 15199f8e93fe48e6261c99eadf6673d788db3a80 Mon Sep 17 00:00:00 2001
From: Administrator <admin@example.com>
Date: 星期日, 17 三月 2024 22:56:10 +0800
Subject: [PATCH] L2进程与策略进程合并

---
 huaxin_client/l2_client.py |   82 +++++++++++++---------------------------
 1 files changed, 27 insertions(+), 55 deletions(-)

diff --git a/huaxin_client/l2_client.py b/huaxin_client/l2_client.py
index 3cfe584..1649e59 100644
--- a/huaxin_client/l2_client.py
+++ b/huaxin_client/l2_client.py
@@ -13,9 +13,10 @@
 from huaxin_client import constant
 from huaxin_client import l2_data_manager
 import lev2mdapi
-from huaxin_client.code_queue_distribute_manager import CodeQueueDistributeManager
+from huaxin_client.code_queue_distribute_manager import CodeQueueDistributeManager, CodeDataCallbackDistributeManager
 from huaxin_client.command_manager import L2ActionCallback
 from huaxin_client.l2_data_manager import L2DataUploadManager
+from huaxin_client.l2_data_transform_protocol import L2DataCallBack
 from log_module import log, async_log_util
 from log_module.async_log_util import huaxin_l2_log
 from log_module.log import logger_local_huaxin_l2_subscript, logger_system, logger_local_huaxin_l2_transaction, \
@@ -546,7 +547,7 @@
 
 
 def test_add_codes(queue_r):
-    time.sleep(5)
+    time.sleep(10)
     # if value:
     #     if type(value) == bytes:
     #         value = value.decode("utf-8")
@@ -565,61 +566,31 @@
                   ("002908", int(50 * 10000 / 12.78), 12.78, 12.00, 200)]
 
     queue_r.put_nowait(json.dumps({"type": "l2_cmd", "data": [demo_datas[0]]}))
-    time.sleep(1)
+    time.sleep(10)
     while True:
-        spi.l2_data_upload_manager.add_l2_order_detail(
-            {'SecurityID': '603002', 'Price': 6.35, 'Volume': 275000, 'Side': "1", 'OrderType': '0',
-             'OrderTime': '13000015',
-             'MainSeq': 2, 'SubSeq': 6739147, 'OrderNO': 5512466, 'OrderStatus': 'D'}, 0)
-        spi.l2_data_upload_manager.add_l2_order_detail(
-            {'SecurityID': '603002', 'Price': 6.35, 'Volume': 200, 'Side': "1", 'OrderType': '0',
-             'OrderTime': '13000015',
-             'MainSeq': 2, 'SubSeq': 6739148, 'OrderNO': 5512467, 'OrderStatus': 'D'}, 0)
-        queue_r.put_nowait(json.dumps({"type": "listen_volume", "data": {"code": "603002", "volume": 100}}))
-        time.sleep(0.1)
-        spi.l2_data_upload_manager.add_l2_order_detail(
-            {'SecurityID': '603002', 'Price': 6.35, 'Volume': 100, 'Side': "1", 'OrderType': '0',
-             'OrderTime': '13000015',
-             'MainSeq': 2, 'SubSeq': 6739148, 'OrderNO': 5512467, 'OrderStatus': 'D'}, 0)
-        time.sleep(10)
+        try:
+            spi.l2_data_upload_manager.add_l2_order_detail(
+                {'SecurityID': '603002', 'Price': 6.35, 'Volume': 275000, 'Side': "1", 'OrderType': '0',
+                 'OrderTime': '13000015',
+                 'MainSeq': 2, 'SubSeq': 6739147, 'OrderNO': 5512466, 'OrderStatus': 'D'}, 0)
+            spi.l2_data_upload_manager.add_l2_order_detail(
+                {'SecurityID': '603002', 'Price': 6.35, 'Volume': 200, 'Side': "1", 'OrderType': '0',
+                 'OrderTime': '13000015',
+                 'MainSeq': 2, 'SubSeq': 6739148, 'OrderNO': 5512467, 'OrderStatus': 'D'}, 0)
+            # queue_r.put_nowait(json.dumps({"type": "listen_volume", "data": {"code": "603002", "volume": 100}}))
+            time.sleep(0.1)
+            spi.l2_data_upload_manager.add_l2_order_detail(
+                {'SecurityID': '603002', 'Price': 6.35, 'Volume': 100, 'Side': "1", 'OrderType': '0',
+                 'OrderTime': '13000015',
+                 'MainSeq': 2, 'SubSeq': 6739148, 'OrderNO': 5512467, 'OrderStatus': 'D'}, 0)
+        except Exception as e:
+            logging.exception(e)
+        finally:
+            time.sleep(10)
 
 
 def run(queue_r: multiprocessing.Queue, order_queues: List[multiprocessing.Queue],
-        transaction_queues: List[multiprocessing.Queue], market_queue: multiprocessing.Queue, order_ipc_hosts: list) -> None:
-    # def test_add_codes():
-    #     time.sleep(5)
-    #     # if value:
-    #     #     if type(value) == bytes:
-    #     #         value = value.decode("utf-8")
-    #     #     data = json.loads(value)
-    #     #     _type = data["type"]
-    #     #     if _type == "listen_volume":
-    #     #         volume = data["data"]["volume"]
-    #     #         code = data["data"]["code"]
-    #     #         spi.set_code_special_watch_volume(code, volume)
-    #     #     elif _type == "l2_cmd":
-    #     #         l2CommandManager.process_command(command_manager.CLIENT_TYPE_CMD_L2, None, data)
-    #
-    #     demo_datas = [("603002", int(50 * 10000 / 6.35), 6.35), ("002654", int(50 * 10000 / 15.59), 15.59),
-    #                   ("603701", int(50 * 10000 / 14.28), 14.28), ("002908", int(50 * 10000 / 12.78), 12.78)]
-    #
-    #     queue_r.put_nowait(json.dumps({"type": "l2_cmd", "data": [demo_datas[0]]}))
-    #     time.sleep(1)
-    #
-    #     spi.l2_data_upload_manager.add_l2_order_detail(
-    #         {'SecurityID': '603002', 'Price': 6.35, 'Volume': 275000, 'Side': "1", 'OrderType': '0',
-    #          'OrderTime': '13000015',
-    #          'MainSeq': 2, 'SubSeq': 6739147, 'OrderNO': 5512466, 'OrderStatus': 'D'}, 0)
-    #     spi.l2_data_upload_manager.add_l2_order_detail(
-    #         {'SecurityID': '603002', 'Price': 6.35, 'Volume': 200, 'Side': "1", 'OrderType': '0',
-    #          'OrderTime': '13000015',
-    #          'MainSeq': 2, 'SubSeq': 6739148, 'OrderNO': 5512467, 'OrderStatus': 'D'}, 0)
-    #     queue_r.put_nowait(json.dumps({"type": "listen_volume", "data": {"code": "603002", "volume": 100}}))
-    #     time.sleep(0.1)
-    #     spi.l2_data_upload_manager.add_l2_order_detail(
-    #         {'SecurityID': '603002', 'Price': 6.35, 'Volume': 100, 'Side': "1", 'OrderType': '0',
-    #          'OrderTime': '13000015',
-    #          'MainSeq': 2, 'SubSeq': 6739148, 'OrderNO': 5512467, 'OrderStatus': 'D'}, 0)
+        transaction_queues: List[multiprocessing.Queue], market_queue: multiprocessing.Queue, order_ipc_hosts: list, data_callbacks:list) -> None:
 
     logger_system.info("L2杩涚▼ID锛歿}", os.getpid())
     logger_system.info(f"l2_client 绾跨▼ID:{tool.get_thread_id()}")
@@ -632,12 +603,13 @@
         # 鍒濆鍖�
         order_queue_distribute_manager = CodeQueueDistributeManager(order_queues)
         transaction_queue_distribute_manager = CodeQueueDistributeManager(transaction_queues)
+        data_callback_distribute_manager = CodeDataCallbackDistributeManager(data_callbacks)
         l2_data_upload_manager = L2DataUploadManager(order_queue_distribute_manager,
-                                                     transaction_queue_distribute_manager, market_queue, order_ipc_hosts)
+                                                     transaction_queue_distribute_manager, market_queue, order_ipc_hosts, data_callback_distribute_manager)
         __init_l2(l2_data_upload_manager)
         l2_data_manager.run_upload_common()
         l2_data_manager.run_log()
-        # 娴嬭瘯
+        # TODO 娴嬭瘯
         # threading.Thread(target=lambda: test_add_codes(queue_r), daemon=True).start()
         global l2CommandManager
         l2CommandManager = command_manager.L2CommandManager()

--
Gitblit v1.8.0