Administrator
2025-04-29 7a08ac115597c920a11731ba584fae9f6028ecb2
l2_test.py
@@ -13,9 +13,12 @@
from code_attribute import global_data_loader
from huaxin_client import l2_client_test, l1_subscript_codes_manager
from log_module.log import logger_local_huaxin_l2_transaction_big_order, logger_system
from log_module.log import logger_local_huaxin_l2_transaction_big_order, logger_system, \
    logger_local_huaxin_l2_transaction_accurate_big_order
from third_data.custom_block_in_money_manager import CodeInMoneyManager, BlockInMoneyRankManager
from utils import tool, middle_api_protocol
from third_data.history_k_data_manager import HistoryKDataManager
from third_data.history_k_data_util import HistoryKDatasUtils
from utils import tool, middle_api_protocol, global_util
import urllib.parse as urlparse
from urllib.parse import parse_qs
@@ -105,7 +108,41 @@
            time.sleep(1)
def __get_special_codes():
    """
    Collect the "special" codes that need the subscription for big orders above 3 million.

    A code from global_util.zylt_volume_map qualifies when, using the first K-bar
    returned for the latest trading day, both conditions hold:
      1. close * volume * limit-up rate lies within [30e8, 300e8]
         (free-float market value between 3 billion and 30 billion);
      2. the limit-up price (limit-up rate * close, rounded to 2 decimals)
         is strictly greater than that bar's high.

    Any exception raised while scanning is logged through logger_system and an
    empty set is returned instead of a partial result.
    @return: set of qualifying codes
    """
    try:
        codes = set()
        last_trade_day = HistoryKDatasUtils.get_latest_trading_date(1)[0]
        for code, volume in global_util.zylt_volume_map.items():
            k_bars = HistoryKDataManager().get_history_bars(code, last_trade_day)
            if not k_bars:
                # No K-bar data for this code: nothing to evaluate
                continue
            # NOTE(review): k_bars[0] is presumably the previous trading day's bar - confirm
            bar = k_bars[0]
            # Look the rate up once; it is needed for both checks below
            limit_up_rate = tool.get_limit_up_rate(code)
            if not 30e8 <= bar["close"] * volume * limit_up_rate <= 300e8:
                # Free-float market value outside the 3 billion - 30 billion window
                continue
            limit_up_price = round(limit_up_rate * bar["close"], 2)
            if limit_up_price > bar["high"]:
                # Today's limit-up price must break above the bar's high
                codes.add(code)
        return codes
    except Exception as e:
        logger_system.exception(e)
        return set()
def __save_accurate_big_order(big_accurate_order_queue):
    """
    Consume the accurate big-order queue forever, writing every item to the
    dedicated accurate-big-order log.

    This function never returns; it is meant to run in a background thread.
    @param big_accurate_order_queue: queue to read from; each get() call waits
                                     for the next item
    """
    while True:
        try:
            data = big_accurate_order_queue.get()
            logger_local_huaxin_l2_transaction_accurate_big_order.info(f"{data}")
        except Exception as e:
            # Keep the consumer alive on failure, but record it instead of
            # swallowing it silently; SystemExit/KeyboardInterrupt still propagate
            logger_system.exception(e)
def run():
    special_codes = __get_special_codes()
    codes_sh, codes_sz = l1_subscript_codes_manager.get_codes()
    codes = [x.decode() for x in codes_sh]
    codes.extend([x.decode() for x in codes_sz])
@@ -115,17 +152,22 @@
    page_size = int(len(codes) / cpu_count) + 1
    big_order_queue = multiprocessing.Queue(maxsize=1024)
    big_accurate_order_queue = multiprocessing.Queue(maxsize=1024)
    # 大单上传队列
    big_order_upload_queue = queue.Queue(maxsize=1024)
    for i in range(cpu_count):
        process = multiprocessing.Process(target=l2_client_test.run,
                                          args=(codes[i * page_size:(i + 1) * page_size], big_order_queue,))
                                          args=(
                                          codes[i * page_size:(i + 1) * page_size], big_order_queue, big_accurate_order_queue, special_codes,))
        process.start()
        # 绑核运行
        psutil.Process(process.pid).cpu_affinity([i])
    threading.Thread(target=__run_upload_big_order_task, args=(big_order_upload_queue,), daemon=True).start()
    threading.Thread(target=__save_accurate_big_order, args=(big_accurate_order_queue,), daemon=True).start()
    while True:
        try:
            data = big_order_queue.get()