From 7a08ac115597c920a11731ba584fae9f6028ecb2 Mon Sep 17 00:00:00 2001
From: Administrator <admin@example.com>
Date: 星期二, 29 四月 2025 19:10:11 +0800
Subject: [PATCH] L2成交数据精确订阅

---
 l2_test.py |   48 +++++++++++++++++++++++++++++++++++++++++++++---
 1 files changed, 45 insertions(+), 3 deletions(-)

diff --git a/l2_test.py b/l2_test.py
index 598e877..716bfca 100644
--- a/l2_test.py
+++ b/l2_test.py
@@ -13,9 +13,12 @@
 
 from code_attribute import global_data_loader
 from huaxin_client import l2_client_test, l1_subscript_codes_manager
-from log_module.log import logger_local_huaxin_l2_transaction_big_order, logger_system
+from log_module.log import logger_local_huaxin_l2_transaction_big_order, logger_system, \
+    logger_local_huaxin_l2_transaction_accurate_big_order
 from third_data.custom_block_in_money_manager import CodeInMoneyManager, BlockInMoneyRankManager
-from utils import tool, middle_api_protocol
+from third_data.history_k_data_manager import HistoryKDataManager
+from third_data.history_k_data_util import HistoryKDatasUtils
+from utils import tool, middle_api_protocol, global_util
 import urllib.parse as urlparse
 from urllib.parse import parse_qs
 
@@ -105,7 +108,41 @@
             time.sleep(1)
 
 
+def __get_special_codes():
+    """
+    鑾峰彇鐗规畩鐨勪唬鐮侊紝闇�瑕佽闃�300w浠ヤ笂鐨勫ぇ鍗�
+    @return: 浠g爜闆嗗悎
+    """
+    try:
+        zylt_volume_map = global_util.zylt_volume_map
+        codes = set()
+        last_trade_day = HistoryKDatasUtils.get_latest_trading_date(1)[0]
+        for code in zylt_volume_map:
+            volume = zylt_volume_map.get(code)
+            # 浠婃棩娑ㄥ仠浠疯绐佺牬鏄ㄦ棩鏈�楂樹环
+            k_bars = HistoryKDataManager().get_history_bars(code, last_trade_day)
+            if k_bars and 30e8 <= k_bars[0]["close"] * volume * tool.get_limit_up_rate(code) <= 300e8:
+                # 鑷敱娴侀�氬競鍊煎湪30浜�-300浜夸互涓�
+                limit_up_price = round(tool.get_limit_up_rate(code) * k_bars[0]["close"], 2)
+                if limit_up_price > k_bars[0]["high"]:
+                    # 浠婃棩娑ㄥ仠浠疯绐佺牬鏄ㄦ棩鏈�楂樹环
+                    codes.add(code)
+        return codes
+    except Exception as e:
+        logger_system.exception(e)
+        return set()
+
+
+def __save_accurate_big_order(big_accurate_order_queue):
+    while True:
+        try:
+            data = big_accurate_order_queue.get()
+            logger_local_huaxin_l2_transaction_accurate_big_order.info(f"{data}")
+        except:
+            pass
+
 def run():
+    special_codes = __get_special_codes()
     codes_sh, codes_sz = l1_subscript_codes_manager.get_codes()
     codes = [x.decode() for x in codes_sh]
     codes.extend([x.decode() for x in codes_sz])
@@ -115,17 +152,22 @@
     page_size = int(len(codes) / cpu_count) + 1
 
     big_order_queue = multiprocessing.Queue(maxsize=1024)
+    big_accurate_order_queue = multiprocessing.Queue(maxsize=1024)
     # 澶у崟涓婁紶闃熷垪
     big_order_upload_queue = queue.Queue(maxsize=1024)
 
     for i in range(cpu_count):
         process = multiprocessing.Process(target=l2_client_test.run,
-                                          args=(codes[i * page_size:(i + 1) * page_size], big_order_queue,))
+                                          args=(
+                                          codes[i * page_size:(i + 1) * page_size], big_order_queue, big_accurate_order_queue, special_codes,))
 
         process.start()
         # 缁戞牳杩愯
         psutil.Process(process.pid).cpu_affinity([i])
     threading.Thread(target=__run_upload_big_order_task, args=(big_order_upload_queue,), daemon=True).start()
+    threading.Thread(target=__save_accurate_big_order, args=(big_accurate_order_queue,), daemon=True).start()
+
+
     while True:
         try:
             data = big_order_queue.get()

--
Gitblit v1.8.0