From 7a08ac115597c920a11731ba584fae9f6028ecb2 Mon Sep 17 00:00:00 2001
From: Administrator <admin@example.com>
Date: Tue, 29 Apr 2025 19:10:11 +0800
Subject: [PATCH] Precise subscription of L2 transaction (trade) data

---
 l2_test.py | 48 +++++++++++++++++++++++++++++++++++++++++++++---
 1 file changed, 45 insertions(+), 3 deletions(-)

diff --git a/l2_test.py b/l2_test.py
index 598e877..716bfca 100644
--- a/l2_test.py
+++ b/l2_test.py
@@ -13,9 +13,12 @@
 from code_attribute import global_data_loader
 from huaxin_client import l2_client_test, l1_subscript_codes_manager
-from log_module.log import logger_local_huaxin_l2_transaction_big_order, logger_system
+from log_module.log import logger_local_huaxin_l2_transaction_big_order, logger_system, \
+    logger_local_huaxin_l2_transaction_accurate_big_order
 from third_data.custom_block_in_money_manager import CodeInMoneyManager, BlockInMoneyRankManager
-from utils import tool, middle_api_protocol
+from third_data.history_k_data_manager import HistoryKDataManager
+from third_data.history_k_data_util import HistoryKDatasUtils
+from utils import tool, middle_api_protocol, global_util
 import urllib.parse as urlparse
 from urllib.parse import parse_qs
 
 
@@ -105,7 +108,41 @@
         time.sleep(1)
 
 
+def __get_special_codes():
+    """
+    Get the special codes that need a subscription for big orders of 3,000,000 CNY and above.
+    @return: set of codes
+    """
+    try:
+        zylt_volume_map = global_util.zylt_volume_map
+        codes = set()
+        last_trade_day = HistoryKDatasUtils.get_latest_trading_date(1)[0]
+        for code in zylt_volume_map:
+            volume = zylt_volume_map.get(code)
+            # Fetch yesterday's daily bar to derive today's limit-up price
+            k_bars = HistoryKDataManager().get_history_bars(code, last_trade_day)
+            if k_bars and 30e8 <= k_bars[0]["close"] * volume * tool.get_limit_up_rate(code) <= 300e8:
+                # Free-float market value (at the limit-up price) between 3 billion and 30 billion CNY
+                limit_up_price = round(tool.get_limit_up_rate(code) * k_bars[0]["close"], 2)
+                if limit_up_price > k_bars[0]["high"]:
+                    # Today's limit-up price must break above yesterday's high
+                    codes.add(code)
+        return codes
+    except Exception as e:
+        logger_system.exception(e)
+    return set()
+
+
+def __save_accurate_big_order(big_accurate_order_queue):
+    # Drain the precise big-order queue and persist each record to the dedicated log
+    while True:
+        try:
+            data = big_accurate_order_queue.get()
+            logger_local_huaxin_l2_transaction_accurate_big_order.info(f"{data}")
+        except Exception:
+            pass
+
 def run():
+    special_codes = __get_special_codes()
     codes_sh, codes_sz = l1_subscript_codes_manager.get_codes()
     codes = [x.decode() for x in codes_sh]
     codes.extend([x.decode() for x in codes_sz])
@@ -115,17 +152,22 @@
     page_size = int(len(codes) / cpu_count) + 1
 
     big_order_queue = multiprocessing.Queue(maxsize=1024)
+    big_accurate_order_queue = multiprocessing.Queue(maxsize=1024)
     # Big-order upload queue
     big_order_upload_queue = queue.Queue(maxsize=1024)
 
     for i in range(cpu_count):
         process = multiprocessing.Process(target=l2_client_test.run,
-                                          args=(codes[i * page_size:(i + 1) * page_size], big_order_queue,))
+                                          args=(
+                                              codes[i * page_size:(i + 1) * page_size], big_order_queue, big_accurate_order_queue, special_codes,))
         process.start()
         # Pin the worker process to a dedicated CPU core
         psutil.Process(process.pid).cpu_affinity([i])
 
     threading.Thread(target=__run_upload_big_order_task, args=(big_order_upload_queue,), daemon=True).start()
+    threading.Thread(target=__save_accurate_big_order, args=(big_accurate_order_queue,), daemon=True).start()
+
+
     while True:
        try:
             data = big_order_queue.get()
--
Gitblit v1.8.0
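
Note: the patch selects its "special" codes with two conditions derived from yesterday's daily bar: the free-float market value priced at today's limit-up price must lie between 3 billion and 30 billion CNY, and the limit-up price must exceed yesterday's high. The sketch below restates that filter as a standalone, dependency-free function; the function name, parameter names, the default 10% limit-up rate, and the sample figures are illustrative assumptions only and are not part of the patched module, which reads its inputs from global_util.zylt_volume_map and HistoryKDataManager().get_history_bars(...).

# Minimal sketch of the selection rule implemented by __get_special_codes() above.
# All names and values here are illustrative; see the patch for the real data sources.

def is_special_code(yesterday_close: float,
                    yesterday_high: float,
                    free_float_volume: float,
                    limit_up_rate: float = 1.10) -> bool:
    """Return True if the code would qualify for the precise big-order subscription."""
    limit_up_price = round(yesterday_close * limit_up_rate, 2)
    # Free-float market value, valued at today's limit-up price
    free_float_value = yesterday_close * free_float_volume * limit_up_rate
    in_value_band = 30e8 <= free_float_value <= 300e8          # 3 billion - 30 billion CNY
    breaks_yesterday_high = limit_up_price > yesterday_high    # limit-up breaks yesterday's high
    return in_value_band and breaks_yesterday_high


if __name__ == "__main__":
    # Example: close 10.00, yesterday's high 10.50, 500 million free-float shares, 10% limit
    # -> free-float value ~5.5e9 CNY and limit-up 11.00 > 10.50, so the code qualifies
    print(is_special_code(10.00, 10.50, 5e8, 1.10))  # True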