Administrator
2025-03-11 46f51dfb83f6e6a2784676bde64577e5f6f28cf0
新版L2订阅/L2成交处理时间日志
6个文件已修改
246 ■■■■ 已修改文件
huaxin_client/l2_client_v2.py 68 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/l2_data_manager_v2.py 24 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/l2_transaction_data_manager.py 37 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/subscript/l2_subscript_manager.py 81 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
main.py 16 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
servers/huaxin_trade_server.py 20 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/l2_client_v2.py
@@ -46,12 +46,13 @@
    # 买入的大单订单号
    def __init__(self, api, l2_data_upload_manager: L2DataUploadManager):
    def __init__(self, api, l2_data_upload_manager: L2DataUploadManager, processor_index):
        lev2mdapi.CTORATstpLev2MdSpi.__init__(self)
        self.__api = api
        self.is_login = False
        self.l2_data_upload_manager = l2_data_upload_manager
        self.codes_volume_and_price_dict = {}
        self.processor_index = processor_index
    def __split_codes(self, codes):
        szse_codes = []
@@ -91,29 +92,29 @@
    def __subscribe(self, _codes):
        sh, sz = self.__split_codes(_codes)
        logger_local_huaxin_l2_subscript.info(f"订阅上证:{sh}")
        logger_local_huaxin_l2_subscript.info(f"订阅深证:{sz}")
        logger_local_huaxin_l2_subscript.info(f"订阅上证({self.processor_index}):{sh}")
        logger_local_huaxin_l2_subscript.info(f"订阅深证({self.processor_index}):{sz}")
        if sh:
            if ENABLE_NGST:
                result = self.__api.SubscribeNGTSTick(sh, lev2mdapi.TORA_TSTP_EXD_SSE)
                logger_local_huaxin_l2_subscript.info(f"逐笔NGTS订阅结果sh:{result}")
                logger_local_huaxin_l2_subscript.info(f"逐笔NGTS订阅结果sh({self.processor_index}):{result}")
            else:
                # 订阅逐笔委托
                result = self.__api.SubscribeOrderDetail(sh, lev2mdapi.TORA_TSTP_EXD_SSE)
                logger_local_huaxin_l2_subscript.info(f"逐笔委托订阅结果sh:{result}")
                logger_local_huaxin_l2_subscript.info(f"逐笔委托订阅结果sh({self.processor_index}):{result}")
                # 订阅逐笔成交
                result = self.__api.SubscribeTransaction(sh, lev2mdapi.TORA_TSTP_EXD_SSE)
                logger_local_huaxin_l2_subscript.info(f"逐笔成交订阅结果sh:{result}")
                logger_local_huaxin_l2_subscript.info(f"逐笔成交订阅结果sh({self.processor_index}):{result}")
            result = self.__api.SubscribeMarketData(sh, lev2mdapi.TORA_TSTP_EXD_SSE)
            logger_local_huaxin_l2_subscript.info(f"市场订阅结果sh:{result}")
            logger_local_huaxin_l2_subscript.info(f"市场订阅结果sh({self.processor_index}):{result}")
        if sz:
            result = self.__api.SubscribeOrderDetail(sz, lev2mdapi.TORA_TSTP_EXD_SZSE)
            logger_local_huaxin_l2_subscript.info(f"逐笔委托订阅结果sz:{result}")
            logger_local_huaxin_l2_subscript.info(f"逐笔委托订阅结果sz({self.processor_index}):{result}")
            result = self.__api.SubscribeTransaction(sz, lev2mdapi.TORA_TSTP_EXD_SZSE)
            logger_local_huaxin_l2_subscript.info(f"逐笔成交订阅结果sz:{result}")
            logger_local_huaxin_l2_subscript.info(f"逐笔成交订阅结果sz({self.processor_index}):{result}")
            result = self.__api.SubscribeMarketData(sz, lev2mdapi.TORA_TSTP_EXD_SZSE)
            logger_local_huaxin_l2_subscript.info(f"市场订阅结果sz:{result}")
            logger_local_huaxin_l2_subscript.info(f"市场订阅结果sz({self.processor_index}):{result}")
    def __process_codes_data(self, codes_data, from_cache=False, delay=0.0):
@@ -130,7 +131,7 @@
            codes.add(code)
            self.codes_volume_and_price_dict[code] = (d[1], d[2], d[3], d[4], d[5])
            self.l2_data_upload_manager.set_order_fileter_condition(code, d[1], round(float(d[2]), 2), d[3], d[4], d[5])
        logger_l2_codes_subscript.info("华鑫L2订阅总数:{}", len(codes))
        logger_l2_codes_subscript.info("华鑫L2订阅总数({}):{}", self.processor_index, len(codes))
        add_codes = codes - self.subscripted_codes
        del_codes = self.subscripted_codes - codes
        print("add del codes", add_codes, del_codes)
@@ -148,11 +149,13 @@
        self.__unsubscribe(del_codes)
        if add_codes:
            logger_system.info(f"新增L2订阅代码数量({'缓存' if from_cache else ''}):{len(add_codes)}")
            logger_system.info(f"新增L2订阅代码数量({self.processor_index}) ({'缓存' if from_cache else ''}):{len(add_codes)}")
            for c in add_codes:
                logger_l2_codes_subscript.info(f"l2委托数据过滤条件:{c} - {self.codes_volume_and_price_dict.get(c)}")
                logger_l2_codes_subscript.info(
                    f"l2委托数据过滤条件({self.processor_index}):{c} - {self.codes_volume_and_price_dict.get(c)}")
        logger_l2_codes_subscript.info("华鑫L2订阅结束,add-{} del-{}", len(add_codes), len(del_codes))
        logger_l2_codes_subscript.info("华鑫L2订阅结束({}),add-{} del-{}", self.processor_index, len(add_codes),
                                       len(del_codes))
        # 设置最近的代码列表
        self.latest_codes_set = codes
@@ -167,16 +170,16 @@
            # 保存一份最新的数据
            self.__set_latest_datas(codes_data)
    @classmethod
    def __set_latest_datas(cls, codes_data):
    def __set_latest_datas(self, codes_data):
        path_str = f"{constant.L2_CODES_INFO_PATH}_{self.processor_index}"
        data_str = json.dumps([tool.get_now_date_str(), codes_data])
        with open(constant.L2_CODES_INFO_PATH, mode='w') as f:
        with open(path_str, mode='w') as f:
            f.write(data_str)
    @classmethod
    def __get_latest_datas(cls):
        if os.path.exists(constant.L2_CODES_INFO_PATH):
            with open(constant.L2_CODES_INFO_PATH, mode='r') as f:
    def __get_latest_datas(self):
        path_str = f"{constant.L2_CODES_INFO_PATH}_{self.processor_index}"
        if os.path.exists(path_str):
            with open(path_str, mode='r') as f:
                str_ = f.readline()
                data_json = json.loads(str_)
                if data_json[0] == tool.get_now_date_str():
@@ -199,7 +202,7 @@
        if pRspInfo['ErrorID'] == 0:
            print("----L2行情登录成功----")
            self.is_login = True
            logger_system.info(f"L2行情登录成功")
            logger_system.info(f"L2行情登录成功({self.processor_index})")
            # 初始设置值
            if tool.trade_time_sub(tool.get_now_time_str(), "09:20:00") > 0:
                threading.Thread(
@@ -218,10 +221,11 @@
    def OnRspSubOrderDetail(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast):
        print("OnRspSubOrderDetail", pRspInfo)
        # try:
        print("订阅结果:", pSpecificSecurity["ExchangeID"], pSpecificSecurity["SecurityID"], pRspInfo["ErrorID"],
        print(f"订阅结果({self.processor_index}):", pSpecificSecurity["ExchangeID"], pSpecificSecurity["SecurityID"],
              pRspInfo["ErrorID"],
              pRspInfo["ErrorMsg"])
        async_log_util.info(logger_local_huaxin_l2_subscript,
                            f"订阅结果:{pSpecificSecurity['SecurityID']} {pRspInfo['ErrorID']} {pRspInfo['ErrorMsg']}")
                            f"订阅结果({self.processor_index}):{pSpecificSecurity['SecurityID']} {pRspInfo['ErrorID']} {pRspInfo['ErrorMsg']}")
        if pRspInfo["ErrorID"] == 0:
            print("订阅成功")
            self.subscripted_codes.add(pSpecificSecurity['SecurityID'])
@@ -244,7 +248,7 @@
    def OnRspSubNGTSTick(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast):
        async_log_util.info(logger_local_huaxin_l2_subscript,
                            f"NGTS订阅结果:{pSpecificSecurity['SecurityID']} {pRspInfo['ErrorID']} {pRspInfo['ErrorMsg']}")
                            f"NGTS订阅结果({self.processor_index}):{pSpecificSecurity['SecurityID']} {pRspInfo['ErrorID']} {pRspInfo['ErrorMsg']}")
        if pRspInfo["ErrorID"] == 0:
            print("订阅成功")
            self.subscripted_codes.add(pSpecificSecurity['SecurityID'])
@@ -257,7 +261,7 @@
    def OnRspUnSubNGTSTick(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast):
        try:
            code = pSpecificSecurity['SecurityID']
            logger_local_huaxin_l2_subscript.info(f"NGTS取消订阅:{code}")
            logger_local_huaxin_l2_subscript.info(f"NGTS取消订阅({self.processor_index}):{code}")
            self.subscripted_codes.discard(code)
            if bIsLast == 1:
                print("取消订阅响应结束", self.subscripted_codes)
@@ -529,7 +533,7 @@
            logging.exception(e)
def __init_l2(l2_data_upload_manager):
def __init_l2(l2_data_upload_manager, processor_index):
    print(lev2mdapi.CTORATstpLev2MdApi_GetApiVersion())
    # case 1: Tcp方式
    # g_SubMode=lev2mdapi.TORA_TSTP_MST_TCP
@@ -541,7 +545,7 @@
    # case 2非缓存模式
    # api = lev2mdapi.CTORATstpLev2MdApi_CreateTstpLev2MdApi(g_SubMode, False)
    global spi
    spi = Lev2MdSpi(api, l2_data_upload_manager)
    spi = Lev2MdSpi(api, l2_data_upload_manager, processor_index)
    api.RegisterSpi(spi)
    # -------------------正式模式-------------------------------------
    if g_SubMode != lev2mdapi.TORA_TSTP_MST_MCAST:
@@ -634,12 +638,14 @@
            time.sleep(10)
def run(queue_r: multiprocessing.Queue, queue_data_callback: multiprocessing.Queue, channel_list: list) -> None:
def run(queue_r: multiprocessing.Queue, queue_data_callback: multiprocessing.Queue, channel_list: list,
        processor_index) -> None:
    """
    运行
    @param queue_r:
    @param queue_data_callback: 低频数据回调队列
    @param channel_list: [((编号,multiprocessing.Array, zmq_address),(编号, multiprocessing.Array, zmq_address))]
    @param processor_index:处理器索引
    @return:
    """
    logger_system.info("L2进程ID:{}", os.getpid())
@@ -653,8 +659,8 @@
        threading.Thread(target=SubscriptDefend.run, daemon=True).start()
        # 初始化
        data_channel_distribute_manager = CodeDataChannelDistributeManager(channel_list)
        l2_data_upload_manager = L2DataUploadManager(data_channel_distribute_manager)
        __init_l2(l2_data_upload_manager)
        l2_data_upload_manager = L2DataUploadManager(data_channel_distribute_manager, queue_data_callback)
        __init_l2(l2_data_upload_manager, processor_index)
        l2_data_manager_v2.run_upload_common()
        l2_data_manager_v2.run_log()
        # TODO 测试
huaxin_client/l2_data_manager_v2.py
@@ -2,6 +2,7 @@
import json
import logging
import marshal
import multiprocessing
import queue
import threading
import time
@@ -35,13 +36,20 @@
# L2上传数据管理器
class L2DataUploadManager:
    """
    L2逐笔委托/L2逐笔成交:通过共享内存+ZMQ上传数据
    L2市场行情: 通过普通数据上传队列写入
    L2订阅的代码: 通过普通数据上传队列写入
    """
    TYPE_DELEGATE = 1
    TYPE_TRANSACTION = 2
    TYPE_MARKET = 3
    def __init__(self, data_channel_distribute_manager: CodeDataChannelDistributeManager):
    def __init__(self, data_channel_distribute_manager: CodeDataChannelDistributeManager,
                 common_data_upload_queue: multiprocessing.Queue):
        self.data_channel_distribute_manager = data_channel_distribute_manager
        self.common_data_upload_queue = common_data_upload_queue
        # 代码分配的对象
        self.temp_order_queue_dict = {}
        self.temp_transaction_queue_dict = {}
@@ -144,13 +152,9 @@
                      data['SellNo'], data['ExecType'], time.time()))
    def add_market_data(self, data):
        # 加入上传队列
        # self.market_data_queue.put_nowait(data)
        code = data['securityID']
        # TODO 改为zmq发送
        callback = self.data_channel_distribute_manager.get_distributed_channel(code)
        if callback:
            callback.OnMarketData(code, data)
        #  改为队列回调发送
        self.common_data_upload_queue.put_nowait({"type": "l2_market", "data": (code, data)})
    # 分配上传队列
    def distribute_upload_queue(self, code, _target_codes=None):
@@ -192,9 +196,6 @@
        if code in self.upload_l2_data_task_dict:
            self.upload_l2_data_task_dict.pop(code)
    def __upload_l2_data(self, code, _queue, datas):
        _queue.put_nowait(marshal.dumps([code, datas, time.time()]))
    # 处理订单数据并上传
    def __run_upload_order_task(self, code):
@@ -265,7 +266,8 @@
                                                 (code, temp_list))
                    # 通知获取数据
                    _socket = self.data_channel_distribute_manager.get_zmq_socket(zmq_host)
                    _socket.send(msgpack.packb({"type": self.TYPE_TRANSACTION, "data": {"memery_number": shared_memery_number}}))
                    _socket.send(
                        msgpack.packb({"type": self.TYPE_TRANSACTION, "data": {"memery_number": shared_memery_number}}))
                    _socket.recv_string()
                    temp_list = []
                else:
l2/l2_transaction_data_manager.py
@@ -2,6 +2,7 @@
L2成交数据处理器
"""
import json
import time
import l2_data_util
from db import redis_manager_delegate as redis_manager
@@ -13,7 +14,7 @@
from log_module import async_log_util, log_export
from log_module.log import hx_logger_l2_transaction_desc, hx_logger_l2_transaction_sell_order, hx_logger_l2_active_sell, \
    hx_logger_l2_transaction_big_buy_order, hx_logger_l2_transaction_big_sell_order
    hx_logger_l2_transaction_big_buy_order, hx_logger_l2_transaction_big_sell_order, hx_logger_l2_upload
from utils import tool
@@ -465,7 +466,8 @@
        # 是否为主动卖
        def is_active_sell(sell_no, buy_no):
            return sell_no > buy_no
        f_start_time = time.time()
        use_time_list = []
        # q.append((data['SecurityID'], data['TradePrice'], data['TradeVolume'],
        #           data['OrderTime'], data['MainSeq'], data['SubSeq'], data['BuyNo'],
        #           data['SellNo'], data['ExecType']))
@@ -482,16 +484,20 @@
        if code not in cls.__latest_all_sell_orders_dict:
            cls.__latest_all_sell_orders_dict[code] = []
        sell_no_map = local_today_sellno_map.get(code)
        total_datas = local_today_datas.get(code)
        if not sell_no_map:
            sell_no_map = {}
        # 保存最近的成交价格:(价格,成交时间)
        cls.__latest_trade_price_dict[code] = (datas[-1][1], datas[-1][3])
        __start_time = time.time()
        # 是否还有涨停卖剩下
        no_left_limit_up_sell = L2TradeSingleDataProcessor.process_passive_limit_up_sell_data(code, datas, limit_up_price)
        use_time = time.time() - __start_time
        __start_time = time.time()
        use_time_list.append(("处理涨停卖", use_time))
        async_log_util.info(hx_logger_l2_upload,
                            f"{code}处理涨停卖:{use_time} 数据数量:{len(datas)}  详情:{use_time_list}")
        for d in datas:
            # 获取当前是否为主动买
@@ -544,6 +550,10 @@
            finally:
                cls.__last_trade_data_dict[code] = d
        use_time = time.time() - __start_time
        __start_time = time.time()
        use_time_list.append(("大单统计", use_time))
        latest_time = l2_huaxin_util.convert_time(datas[-1][3], with_ms=True)
        min_time = tool.trade_time_add_millionsecond(latest_time, -1000)
        min_time_int = int(min_time.replace(":", "").replace(".", ""))
@@ -592,6 +602,11 @@
                    total_sell_info[0] += int(latest_sell_order_info[1] * latest_sell_order_info[2])
            big_sell_orders.reverse()
            total_sell_info[1] = big_sell_orders
        use_time = time.time() - __start_time
        __start_time = time.time()
        use_time_list.append(("最近大单统计", use_time))
        # ----------------统计涨停主动买-----------------
        try:
            limit_up_active_buy_datas = []
@@ -604,9 +619,17 @@
                    # 有涨停主动买
                    limit_up_active_buy_datas.append(d)
            L2TradeSingleDataManager.set_limit_up_active_buy(code, limit_up_active_buy_datas, no_left_limit_up_sell)
            use_time = time.time() - __start_time
            __start_time = time.time()
            use_time_list.append(("涨停主动买成交", use_time))
        except:
            pass
        use_time = time.time() - f_start_time
        if use_time > 0.01:
            async_log_util.info(hx_logger_l2_upload,
                                f"{code}处理成交详细用时:{use_time} 数据数量:{len(datas)}  详情:{use_time_list}")
        return total_sell_info
    # 获取最近成交数据
l2/subscript/l2_subscript_manager.py
@@ -4,6 +4,15 @@
import math
import multiprocessing
import random
import threading
import msgpack
import zmq
from huaxin_client import l2_data_transform_protocol
from utils import shared_memery_util
process_manager = None
class TargetCodeProcessManager:
@@ -29,7 +38,12 @@
        # 代码所在队列ID
        self.__code_queue_dict = {}
    def add_codes(self, codes: set):
    def set_codes(self, codes: set):
        """
        设置订阅代码
        @param codes:
        @return: 返回队列与对应分配的代码:[(队列对象, {"代码1","代码2"}),...]
        """
        add_codes = codes - self.__code_queue_dict.keys()
        del_codes = self.__code_queue_dict.keys() - codes
        # 删除代码
@@ -44,7 +58,7 @@
        for code in add_codes:
            # 寻找未满的队列
            for queue_id in self.__queue_codes:
                count_per_process = min(self.__max_code_count_per_queue_dict.get(queue_id), math.ceil(len(codes) / len(self.__com_queues)))
                count_per_process = self.__max_code_count_per_queue_dict.get(queue_id)
                if len(self.__queue_codes[queue_id]) >= count_per_process:
                    # 队列已满
                    continue
@@ -52,6 +66,8 @@
                self.__queue_codes[queue_id].add(code)
                self.__code_queue_dict[code] = queue_id
                break
        return [(self.__com_queue_id_object_dict.get(queue_id), self.__queue_codes[queue_id]) for queue_id in
                self.__queue_codes]
    def get_queues_with_codes(self):
        """
@@ -64,6 +80,67 @@
        return results
class L2DataListener:
    """
    L2 data listener.

    Binds one ZMQ REP socket per channel address. Each request received on a
    socket is a msgpack-encoded notification naming a shared-memory number; the
    payload is read from the matching shared-memory array and forwarded to the
    supplied callback as either an order (delegate) batch or a transaction batch.
    """

    # Values of the "type" field in a notification. These are the same values as
    # L2DataUploadManager.TYPE_DELEGATE / TYPE_TRANSACTION in
    # huaxin_client/l2_data_manager_v2.py and must be kept in sync with them.
    TYPE_DELEGATE = 1
    TYPE_TRANSACTION = 2

    def __init__(self, channel_list):
        """
        @param channel_list: [((shared memory number, order shared-memory array, zmq address),
                               (shared memory number, transaction shared-memory array, zmq address))]
        """
        self.channel_list = channel_list
        # Map: shared memory number -> shared-memory array object
        self.shared_memery_num_object_dict = {}
        for channel in self.channel_list:
            for endpoint in (channel[0], channel[1]):
                self.shared_memery_num_object_dict[endpoint[0]] = endpoint[1]

    def create_data_listener(self, l2_data_callback: l2_data_transform_protocol.L2DataCallBack):
        """
        Start the listeners: two daemon threads per channel, one bound to the
        order address and one bound to the transaction address.

        @param l2_data_callback: receiver of OnL2Order / OnL2Transaction calls
        @return: None
        """
        for channel in self.channel_list:
            # channel[0] is the order endpoint, channel[1] the transaction endpoint;
            # element [2] of each endpoint is its zmq address.
            for endpoint in (channel[0], channel[1]):
                threading.Thread(target=self.__create_l2_zmq_server,
                                 args=(endpoint[2], l2_data_callback),
                                 daemon=True).start()

    def _dispatch_l2_data(self, data_type, datas, l2_data_callback):
        """
        Forward one payload read from shared memory to the callback.

        @param data_type: value of the notification's "type" field
        @param datas: indexable payload; [0] is the code, [1] the data list and,
                      for orders only, [2] the timestamp
        @param l2_data_callback: receiver of OnL2Order / OnL2Transaction calls
        @return: None (payloads with any other type are ignored)
        """
        if data_type == self.TYPE_DELEGATE:
            # Order (delegate) data
            l2_data_callback.OnL2Order(datas[0], datas[1], datas[2])
        elif data_type == self.TYPE_TRANSACTION:
            # Transaction (deal) data
            l2_data_callback.OnL2Transaction(datas[0], datas[1])

    def __create_l2_zmq_server(self, ipc_addr, l2_data_callback: l2_data_transform_protocol.L2DataCallBack):
        """
        Run the L2 order/transaction ZMQ server for one address. Never returns.

        @param ipc_addr: address the REP socket binds to
        @param l2_data_callback: receiver of OnL2Order / OnL2Transaction calls
        @return:
        """
        # Local import: only the failure path needs it, and the file's full
        # import list is not relied upon here.
        import logging

        context = zmq.Context()
        rep_socket = context.socket(zmq.REP)
        rep_socket.bind(ipc_addr)
        while True:
            raw = rep_socket.recv()
            try:
                # Decode the notification, then fetch the payload it points to
                message = msgpack.unpackb(raw)
                shared_memery_id = message["data"]["memery_number"]
                datas = shared_memery_util.read_datas(
                    self.shared_memery_num_object_dict.get(shared_memery_id))
                self._dispatch_l2_data(message["type"], datas, l2_data_callback)
            except Exception:
                # Keep serving after a bad message or a failing callback, but
                # leave a traceback instead of dropping the error silently.
                logging.exception("L2 zmq server(%s) failed to process a message", ipc_addr)
            finally:
                # A REP socket has to answer every request before it can receive
                # the next one, so the reply is sent whether or not processing succeeded.
                rep_socket.send_string("SUCCESS")
if __name__ == "__main__":
    queues = [multiprocessing.Queue() for i in range(7)]
    manager = TargetCodeProcessManager(queues, 10)
main.py
@@ -7,6 +7,7 @@
import constant
from code_attribute import gpcode_manager
from l2.subscript import l2_subscript_manager
from log_module import log
from log_module.log import logger_l2_trade, logger_system
import logging
@@ -100,6 +101,11 @@
    # ======分组======
    # 记录每个分组的数量
    channel_count_list = []
    # 数据回调队列
    data_callback_queue_list = []
    # 消息传递队列
    sub_single_queue_list = []
    for i in range(l2_process_count):
        channel_count = base_channel_count + (1 if i < left_count else 0)
        channel_count_list.append(channel_count)
@@ -108,10 +114,18 @@
        index += channel_count
        # 订阅信号队列, 数据回调队列(回调频次小的数据通过这种回调)
        sub_single_queue, data_callback_queue = multiprocessing.Queue(), multiprocessing.Queue()
        sub_single_queue_list.append(sub_single_queue)
        data_callback_queue_list.append(data_callback_queue)
        l2_process = multiprocessing.Process(target=l2_client_v2.run,
                                             args=(sub_single_queue, data_callback_queue, channels,))
                                             args=(sub_single_queue, data_callback_queue, channels, i, ))
        l2_process.start()
    l2_subscript_manager.process_manager = l2_subscript_manager.TargetCodeProcessManager(sub_single_queue_list, channel_count_list)
    # 监听L2市场行情数据
    huaxin_trade_server.run_l2_market_info_reciever(data_callback_queue_list)
    # 启动ZMQserver,针对委托队列与成交队列进行监听
    l2_subscript_manager.L2DataListener(channel_list).create_data_listener(huaxin_trade_server.my_l2_data_callback)
if __name__ == '__main__':
    # 可绑定16-31之间的核
servers/huaxin_trade_server.py
@@ -868,6 +868,26 @@
my_trade_response = MyTradeResponse()
def run_l2_market_info_reciever(queues: list):
    """
    Start the receivers for L2 market data.

    One daemon thread is started per queue. Each thread blocks on ``queue.get()``
    forever; items shaped like ``{"type": "l2_market", "data": (code, data)}`` are
    passed to ``my_l2_data_callback.OnMarketData(code, data)`` and items with any
    other "type" are ignored.

    @param queues: queues to consume, one consumer thread per queue
    @return: None
    """
    # Local import: only the failure path needs it, and the file's full import
    # list is not relied upon here.
    import logging

    def recieve_data(queue):
        while True:
            try:
                item = queue.get()
                # {"type": "l2_market", "data": (code, data)}
                if item["type"] == "l2_market":
                    code, market_data = item["data"]
                    my_l2_data_callback.OnMarketData(code, market_data)
            except Exception:
                # Keep consuming after a malformed item or a failing callback,
                # but record the traceback instead of swallowing it silently.
                logging.exception("failed to process L2 market data")

    for q in queues:
        threading.Thread(target=recieve_data, args=(q,), daemon=True).start()
# 预埋单
def __test_pre_place_order():
    codes = gpcode_manager.BuyOpenLimitUpCodeManager().get_codes()