Administrator
2025-04-29 7a08ac115597c920a11731ba584fae9f6028ecb2
L2成交数据精确订阅
4个文件已修改
167 ■■■■■ 已修改文件
huaxin_client/l2_client_test.py 112 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2_test.py 48 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
log_module/log.py 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/buy_radical/block_special_codes_manager.py 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/l2_client_test.py
@@ -39,20 +39,81 @@
class L2TransactionDataManager:
    def __init__(self, code):
    def __init__(self, code, accurate_buy=False):
        """
        @param code:
        @param accurate_buy: 是否需要精确的买单信息
        """
        self.code = code
        self.__latest_buy_order = None
        self.__big_buy_orders = []
        # 精确的买单信息,{买单号:订单信息}
        self.__big_accurate_buy_order_dict = {}
        self.__latest_sell_order = None
        self.__big_sell_orders = []
        self.big_accurate_buy_order_queue = queue.Queue(maxsize=10240)
        self.big_buy_order_queue = queue.Queue(maxsize=10240)
        self.big_sell_order_queue = queue.Queue(maxsize=10240)
        self.accurate_buy = accurate_buy
        self.__last_accurate_buy_count = 0
    def get_big_buy_orders(self):
        return self.__big_buy_orders
    def get_big_sell_orders(self):
        return self.__big_sell_orders
    def add_transaction_data_for_accurate_buy(self, data):
        """
        获取精确的买单信息
        @param data:
        @return:
        """
        def format_timestamp(timestamp):
            time_str = str(timestamp)
            return int(time_str[:5] if time_str[0] == '9' else time_str[:6])
        item = (data["BuyNo"], data["SellNo"], data["TradePrice"], data["TradeVolume"])
        # item = {"SecurityID": pTransaction['SecurityID'], "TradePrice": pTransaction['TradePrice'],
        #         "TradeVolume": pTransaction['TradeVolume'],
        #         "OrderTime": pTransaction['TradeTime'], "MainSeq": pTransaction['MainSeq'],
        #         "SubSeq": pTransaction['SubSeq'], "BuyNo": pTransaction['BuyNo'],
        #         "SellNo": pTransaction['SellNo'],
        #         "ExecType": pTransaction['ExecType'].decode()}
        money = round(item[2] * item[3])
        volume = item[3]
        price = item[2]
        order_time = data["OrderTime"]
        if item[0] not in self.__big_accurate_buy_order_dict:
            # (买单号, 量, 金额, 时间, 最新成交价格)
            self.__big_accurate_buy_order_dict[item[0]] = [item[0], 0, 0, order_time, price]
        buy_order_info = self.__big_accurate_buy_order_dict[item[0]]
        buy_order_info[1] += volume
        buy_order_info[2] += money
        buy_order_info[3] = order_time
        buy_order_info[4] = price
        # 将大单写入本地文件
        if self.__latest_buy_order[0] != item[0]:
            # 有可能是大单成交完成, 判断上个订单是否是大单
            last_buy_order = self.__big_accurate_buy_order_dict.get(self.__latest_buy_order[0])
            if last_buy_order[2] > 299e4:
                self.big_accurate_buy_order_queue.put_nowait(last_buy_order)
            # 如果数据过多需要移除过长时间的小金额数据
            accurate_buy_count = len(self.__big_accurate_buy_order_dict)
            if accurate_buy_count > 10000 and accurate_buy_count - self.__last_accurate_buy_count > 2000:
                # 超过1w条数据且新增2000条数据
                # 超过1w条数据就要移除30分钟之前的数据
                now_time_int = int(tool.trade_time_add_second(tool.get_now_time_str(), -1800).replace(":", ""))
                try:
                    remove_order_nos = [x for x in self.__big_accurate_buy_order_dict if
                                        now_time_int - format_timestamp(
                                            self.__big_accurate_buy_order_dict[x][3]) > 0]
                    if remove_order_nos:
                        for order_no in remove_order_nos:
                            self.__big_accurate_buy_order_dict.pop(order_no)
                finally:
                    self.__last_accurate_buy_count = len(self.__big_accurate_buy_order_dict)
    def add_transaction_data(self, data):
        item = (data["BuyNo"], data["SellNo"], data["TradePrice"], data["TradeVolume"])
@@ -66,6 +127,10 @@
        volume = item[3]
        price = item[2]
        order_time = data["OrderTime"]
        if self.accurate_buy:
            self.add_transaction_data_for_accurate_buy(data)
        if not self.__latest_buy_order:
            # (买单号, 量, 金额, 时间, 最新成交价格)
            self.__latest_buy_order = [item[0], 0, 0, order_time, price]
@@ -76,7 +141,8 @@
            self.__latest_buy_order[4] = price
        else:
            if self.__latest_buy_order[2] > 1e6:
                d = (self.__latest_buy_order[0], self.__latest_buy_order[1], self.__latest_buy_order[2], self.__latest_buy_order[3], self.__latest_buy_order[4])
                d = (self.__latest_buy_order[0], self.__latest_buy_order[1], self.__latest_buy_order[2],
                     self.__latest_buy_order[3], self.__latest_buy_order[4])
                self.__big_buy_orders.append(d)
                self.big_buy_order_queue.put_nowait(d)
@@ -91,7 +157,8 @@
            self.__latest_sell_order[4] = price
        else:
            if self.__latest_sell_order[2] > 1e6:
                d = (self.__latest_sell_order[0], self.__latest_sell_order[1], self.__latest_sell_order[2], self.__latest_sell_order[3], self.__latest_sell_order[4])
                d = (self.__latest_sell_order[0], self.__latest_sell_order[1], self.__latest_sell_order[2],
                     self.__latest_sell_order[3], self.__latest_sell_order[4])
                self.__big_sell_orders.append(d)
                self.big_sell_order_queue.put_nowait(d)
            self.__latest_sell_order = [item[1], volume, money, order_time, price]
@@ -110,12 +177,13 @@
    # 代码的上次成交的订单唯一索引
    __last_transaction_keys_dict = {}
    def __init__(self, api, codes):
    def __init__(self, api, codes, special_codes):
        lev2mdapi.CTORATstpLev2MdSpi.__init__(self)
        self.__api = api
        self.is_login = False
        self.codes = codes
        self.codes_volume_and_price_dict = {}
        self.special_codes = special_codes
    def __split_codes(self, codes):
        szse_codes = []
@@ -227,7 +295,8 @@
                    "ExecType": pTransaction['ExecType'].decode()}
            if item["SecurityID"] not in l2_transaction_data_dict:
                l2_transaction_data_dict[item["SecurityID"]] = L2TransactionDataManager(item["SecurityID"])
            l2_transaction_data_dict[item["SecurityID"]].add_transaction_data(item)
            l2_transaction_data_dict[item["SecurityID"]].add_transaction_data(item,
                                                                              item["SecurityID"] in self.special_codes)
    def OnRtnNGTSTick(self, pTick):
        """
@@ -246,12 +315,13 @@
                        "ExecType": '1'}
                if item["SecurityID"] not in l2_transaction_data_dict:
                    l2_transaction_data_dict[item["SecurityID"]] = L2TransactionDataManager(item["SecurityID"])
                l2_transaction_data_dict[item["SecurityID"]].add_transaction_data(item)
                l2_transaction_data_dict[item["SecurityID"]].add_transaction_data(item, item[
                    "SecurityID"] in self.special_codes)
        except Exception as e:
            logger_local_huaxin_l2_subscript.exception(e)
def __init_l2(codes):
def __init_l2(codes, special_codes):
    print(lev2mdapi.CTORATstpLev2MdApi_GetApiVersion())
    # case 1: Tcp方式
    # g_SubMode=lev2mdapi.TORA_TSTP_MST_TCP
@@ -264,7 +334,7 @@
    # case 2非缓存模式
    # api = lev2mdapi.CTORATstpLev2MdApi_CreateTstpLev2MdApi(g_SubMode, False)
    global spi
    spi = Lev2MdSpi(api, codes)
    spi = Lev2MdSpi(api, codes, special_codes)
    api.RegisterSpi(spi)
    # -------------------正式模式-------------------------------------
    if g_SubMode != lev2mdapi.TORA_TSTP_MST_MCAST:
@@ -285,11 +355,18 @@
    api.Init()
def run(codes, _queue: multiprocessing.Queue) -> None:
def run(codes, _queue: multiprocessing.Queue, accurate_buy_order_queue: multiprocessing.Queue, special_codes) -> None:
    """
    运行订阅
    @param accurate_buy_order_queue: 精确大单队列
    @param codes: 订阅的代码
    @param _queue: 数据传输的队列
    @param special_codes: 需要确定完整大单的代码
    @return:
    """
    try:
        log.close_print()
        __init_l2(codes)
        __init_l2(codes, special_codes)
        logger_system.info(f"L2订阅服务启动成功:")
    except Exception as e:
        logger_system.exception(e)
@@ -306,6 +383,18 @@
                            break
                except:
                    pass
                try:
                    while True:
                        result = l2_transaction_data_manager.big_accurate_buy_order_queue.get(block=False)
                        if result:
                            accurate_buy_order_queue.put_nowait((code, 0, result))
                        else:
                            break
                except:
                    pass
                try:
                    while True:
                        result = l2_transaction_data_manager.big_sell_order_queue.get(block=False)
@@ -319,4 +408,3 @@
            pass
        finally:
            time.sleep(1)
l2_test.py
@@ -13,9 +13,12 @@
from code_attribute import global_data_loader
from huaxin_client import l2_client_test, l1_subscript_codes_manager
from log_module.log import logger_local_huaxin_l2_transaction_big_order, logger_system
from log_module.log import logger_local_huaxin_l2_transaction_big_order, logger_system, \
    logger_local_huaxin_l2_transaction_accurate_big_order
from third_data.custom_block_in_money_manager import CodeInMoneyManager, BlockInMoneyRankManager
from utils import tool, middle_api_protocol
from third_data.history_k_data_manager import HistoryKDataManager
from third_data.history_k_data_util import HistoryKDatasUtils
from utils import tool, middle_api_protocol, global_util
import urllib.parse as urlparse
from urllib.parse import parse_qs
@@ -105,7 +108,41 @@
            time.sleep(1)
def __get_special_codes():
    """
    Collect the special codes that need subscription to big orders above 3 million.

    A code qualifies when both conditions hold, based on the first bar returned for the
    latest trading date:
      1. close * volume * limit-up rate lies within [30e8, 300e8]
         (free-float market value between 3 billion and 30 billion);
      2. today's limit-up price (close * limit-up rate, rounded to 2 decimals) is higher
         than that bar's high.

    @return: set of qualifying codes; an empty set if any error occurs (the error is logged)
    """
    try:
        special_codes = set()
        last_trade_day = HistoryKDatasUtils.get_latest_trading_date(1)[0]
        for code, volume in global_util.zylt_volume_map.items():
            k_bars = HistoryKDataManager().get_history_bars(code, last_trade_day)
            if not k_bars:
                continue
            latest_bar = k_bars[0]
            limit_up_rate = tool.get_limit_up_rate(code)
            # Free-float market value must be between 3 billion and 30 billion.
            if not 30e8 <= latest_bar["close"] * volume * limit_up_rate <= 300e8:
                continue
            # Today's limit-up price must break above the previous high.
            limit_up_price = round(limit_up_rate * latest_bar["close"], 2)
            if limit_up_price > latest_bar["high"]:
                special_codes.add(code)
        return special_codes
    except Exception as e:
        logger_system.exception(e)
        return set()
def __save_accurate_big_order(big_accurate_order_queue):
    """
    Consume the accurate big-order queue forever and write every entry to its dedicated log.

    @param big_accurate_order_queue: queue whose blocking ``get()`` yields the entries to log
    @return: never returns
    """
    while True:
        try:
            data = big_accurate_order_queue.get()
            logger_local_huaxin_l2_transaction_accurate_big_order.info(f"{data}")
        except Exception as e:
            # Keep the consumer running, but record the failure instead of hiding it.
            logger_system.exception(e)
def run():
    special_codes = __get_special_codes()
    codes_sh, codes_sz = l1_subscript_codes_manager.get_codes()
    codes = [x.decode() for x in codes_sh]
    codes.extend([x.decode() for x in codes_sz])
@@ -115,17 +152,22 @@
    page_size = int(len(codes) / cpu_count) + 1
    big_order_queue = multiprocessing.Queue(maxsize=1024)
    big_accurate_order_queue = multiprocessing.Queue(maxsize=1024)
    # 大单上传队列
    big_order_upload_queue = queue.Queue(maxsize=1024)
    for i in range(cpu_count):
        process = multiprocessing.Process(target=l2_client_test.run,
                                          args=(codes[i * page_size:(i + 1) * page_size], big_order_queue,))
                                          args=(
                                          codes[i * page_size:(i + 1) * page_size], big_order_queue, big_accurate_order_queue, special_codes,))
        process.start()
        # 绑核运行
        psutil.Process(process.pid).cpu_affinity([i])
    threading.Thread(target=__run_upload_big_order_task, args=(big_order_upload_queue,), daemon=True).start()
    threading.Thread(target=__save_accurate_big_order, args=(big_accurate_order_queue,), daemon=True).start()
    while True:
        try:
            data = big_order_queue.get()
log_module/log.py
@@ -326,6 +326,10 @@
                   filter=lambda record: record["extra"].get("name") == "local_huaxin_transaction_for_big_order",
                   rotation="00:00", compression="zip", enqueue=True)
        logger.add(self.get_local_huaxin_path("l2", "transaction_accurate_big_order"),
                   filter=lambda record: record["extra"].get("name") == "local_huaxin_transaction_for_accurate_big_order",
                   rotation="00:00", compression="zip", enqueue=True)
        logger.add(self.get_local_huaxin_path("l2", "orderdetail"),
                   filter=lambda record: record["extra"].get("name") == "local_huaxin_orderdetail",
                   rotation="00:00", compression="zip", enqueue=True)
@@ -513,6 +517,7 @@
# -------------------------------华鑫本地日志---------------------------------
logger_local_huaxin_l2_transaction = __mylogger.get_logger("local_huaxin_transaction")
logger_local_huaxin_l2_transaction_big_order = __mylogger.get_logger("local_huaxin_transaction_for_big_order")
logger_local_huaxin_l2_transaction_accurate_big_order = __mylogger.get_logger("local_huaxin_transaction_for_accurate_big_order")
logger_local_huaxin_l2_orderdetail = __mylogger.get_logger("local_huaxin_orderdetail")
logger_local_huaxin_l2_upload = __mylogger.get_logger("local_huaxin_upload")
logger_local_huaxin_l2_error = __mylogger.get_logger("local_huaxin_error")
trade/buy_radical/block_special_codes_manager.py
@@ -311,7 +311,7 @@
        limit_up_info_map = self.__get_limit_up_info(min_day)
        fdatas = []
        for b in block_codes_dict:
            # if b != '机器人':
            # if b != '人工智能':
            #     continue
            if b in constant.KPL_INVALID_BLOCKS: