Administrator
2025-06-10 efe62c0c92bee36da5179f34bb73e8ee4db6f814
l2_test.py
@@ -2,6 +2,7 @@
import json
import logging
import multiprocessing
import queue
import socketserver
import threading
import time
@@ -10,11 +11,16 @@
import psutil
import requests
from api import low_suction_data_pusher
from code_attribute import global_data_loader
from huaxin_client import l2_client_test, l1_subscript_codes_manager
from log_module.log import logger_local_huaxin_l2_transaction_big_order, logger_system
from log_module.log import logger_local_huaxin_l2_transaction_big_order, logger_system, \
    logger_local_huaxin_l2_transaction_accurate_big_order
from third_data.custom_block_in_money_manager import CodeInMoneyManager, BlockInMoneyRankManager
from utils import tool
from third_data.history_k_data_manager import HistoryKDataManager
from third_data.history_k_data_util import HistoryKDatasUtils
from trade.buy_radical.block_special_codes_manager import BlockSpecialCodesManager
from utils import tool, middle_api_protocol, global_util
import urllib.parse as urlparse
from urllib.parse import parse_qs
@@ -57,7 +63,15 @@
            code = ps_dict.get('code')
            money_info = CodeInMoneyManager().get_money_info(code)
            response_data = json.dumps({"code": 0, "data": money_info})
        elif url.path == "/get_codes_money_info":
            ps_dict = dict([(k, v[0]) for k, v in parse_qs(url.query).items()])
            codes_str = ps_dict.get('codes')
            codes = json.loads(codes_str)
            fresults = {}
            for code in codes:
                money_info = CodeInMoneyManager().get_money_info(code)
                fresults[code] = money_info
            response_data = json.dumps({"code": 0, "data": fresults})
        self.send_response(200)
        # 发给请求客户端的响应数据
        self.send_header('Content-type', 'application/json')
@@ -80,7 +94,70 @@
        logger_system.error(f"端口服务器:{port} 启动失败")
def __run_upload_big_order_task(_queue: queue.Queue):
    # 运行上传大单任务
    while True:
        try:
            datas = []
            while not _queue.empty():
                datas.append(_queue.get())
            if datas:
                # 上传数据
                requests.post("http://192.168.84.71:12881/upload_deal_big_orders", json.dumps(datas))
        except:
            pass
        finally:
            time.sleep(1)
def __get_special_codes():
    """
    获取特殊的代码,需要订阅300w以上的大单
    @return: 代码集合
    """
    try:
        zylt_volume_map = global_util.zylt_volume_map
        codes = set()
        last_trade_day = HistoryKDatasUtils.get_latest_trading_date(1)[0]
        for code in zylt_volume_map:
            if code == '601288':
                print("")
            volume = zylt_volume_map.get(code)
            # 今日涨停价要突破昨日最高价
            k_bars = HistoryKDataManager().get_history_bars(code, last_trade_day)
            if k_bars and 10e8 <= k_bars[0]["close"] * volume * tool.get_limit_up_rate(code) <= 300e8:
                # 自由流通市值在10亿-300亿以上
                limit_up_price = round(tool.get_limit_up_rate(code) * k_bars[0]["close"], 2)
                if limit_up_price > k_bars[0]["high"] or True:
                    # 今日涨停价要突破昨日最高价
                    codes.add(code)
        # 获取辨识度的票
        special_codes = BlockSpecialCodesManager().get_origin_code_blocks_dict().keys()
        if special_codes:
            codes |= set(special_codes)
        return codes
    except Exception as e:
        logger_system.exception(e)
        return set()
def __save_accurate_big_order(big_accurate_order_queue):
    while True:
        try:
            datas = []
            while not big_accurate_order_queue.empty():
                data = big_accurate_order_queue.get()
                datas.append(data)
            if datas:
                low_suction_data_pusher.push_big_order(datas)
                for data in datas:
                    logger_local_huaxin_l2_transaction_accurate_big_order.info(f"{data}")
        except:
            pass
def run():
    special_codes = __get_special_codes()
    codes_sh, codes_sz = l1_subscript_codes_manager.get_codes()
    codes = [x.decode() for x in codes_sh]
    codes.extend([x.decode() for x in codes_sz])
@@ -89,19 +166,29 @@
    cpu_count = 16
    page_size = int(len(codes) / cpu_count) + 1
    big_order_queue = multiprocessing.Queue()
    big_order_queue = multiprocessing.Queue(maxsize=1024)
    big_accurate_order_queue = multiprocessing.Queue(maxsize=1024)
    # 大单上传队列
    big_order_upload_queue = queue.Queue(maxsize=1024)
    for i in range(cpu_count):
        process = multiprocessing.Process(target=l2_client_test.run,
                                          args=(codes[i * page_size:(i + 1) * page_size], big_order_queue,))
                                          args=(
                                              codes[i * page_size:(i + 1) * page_size], big_order_queue,
                                              big_accurate_order_queue, special_codes,))
        process.start()
        # 绑核运行
        psutil.Process(process.pid).cpu_affinity([i])
    threading.Thread(target=__run_upload_big_order_task, args=(big_order_upload_queue,), daemon=True).start()
    threading.Thread(target=__save_accurate_big_order, args=(big_accurate_order_queue,), daemon=True).start()
    while True:
        try:
            data = big_order_queue.get()
            CodeInMoneyManager().add_data(data)
            # 添加上传数据
            big_order_upload_queue.put_nowait(data)
            logger_local_huaxin_l2_transaction_big_order.info(f"{data}")
        except:
            pass
@@ -142,6 +229,10 @@
            __upload_data("jingxuan_rank", json.dumps(fins))
            __upload_data("jingxuan_rank_out", json.dumps(fouts))
            __upload_codes_in_money()
            try:
                low_suction_data_pusher.push_block_in(in_list)
            except:
                pass
        except Exception as e:
            logging.exception(e)
        finally: