Administrator
2024-11-18 57d5b97bb7355473f6dd1aee8174f4ccc3f57a5c
L2并发测试
3个文件已添加
357 ■■■■■ 已修改文件
huaxin_client/l2_client_test.py 285 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2_test.py 22 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2_test.spec 50 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/l2_client_test.py
New file
@@ -0,0 +1,285 @@
# -*- coding: utf-8 -*-
import logging
import queue
import time
import lev2mdapi
from log_module import log
from log_module.log import logger_local_huaxin_l2_subscript, logger_system
from utils import tool
IS_TEST = True
###B类###
Front_Address = "tcp://10.0.1.101:6900"
Multicast_Address = "udp://224.224.2.19:7889"
Multicast_Address2 = "udp://224.224.224.234:7890"
Local_Interface_Address = "192.168.84.126"
###测试地址###
if IS_TEST:
    Front_Address = "tcp://210.14.72.17:16900"
    Multicast_Address = "udp://224.224.2.19:7889"
    Multicast_Address2 = "udp://224.224.224.234:7890"
    Local_Interface_Address = "192.168.84.126"
g_SubMarketData = False
g_SubTransaction = False
g_SubOrderDetail = False
g_SubXTSTick = False
g_SubXTSMarketData = False
g_SubNGTSTick = False
g_SubBondMarketData = False
g_SubBondTransaction = False
g_SubBondOrderDetail = False
set_codes_data_queue = queue.Queue()
market_code_dict = {}
ENABLE_NGST = True
class L2TransactionDataManager:
    def __init__(self, code):
        self.code = code
        self.__latest_buy_order = None
        self.__big_buy_orders = []
        self.__latest_sell_order = None
        self.__big_sell_orders = []
    def get_big_buy_orders(self):
        return self.__big_buy_orders
    def get_big_sell_orders(self):
        return self.__big_sell_orders
    def add_transaction_data(self, data):
        item = (data["BuyNo"], data["SellNo"], data["TradePrice"], data["TradeVolume"])
        # item = {"SecurityID": pTransaction['SecurityID'], "TradePrice": pTransaction['TradePrice'],
        #         "TradeVolume": pTransaction['TradeVolume'],
        #         "OrderTime": pTransaction['TradeTime'], "MainSeq": pTransaction['MainSeq'],
        #         "SubSeq": pTransaction['SubSeq'], "BuyNo": pTransaction['BuyNo'],
        #         "SellNo": pTransaction['SellNo'],
        #         "ExecType": pTransaction['ExecType'].decode()}
        money = round(item[2] * item[3])
        volume = item[3]
        if not self.__latest_buy_order:
            self.__latest_buy_order = [item[0], 0, 0]
        if self.__latest_buy_order[0] == item[0]:
            self.__latest_buy_order[1] += volume
            self.__latest_buy_order[2] += money
        else:
            if self.__latest_buy_order[2] > 1e6:
                self.__big_buy_orders.append((self.__latest_buy_order[0],self.__latest_buy_order[1], self.__latest_buy_order[2]))
            self.__latest_buy_order = [item[0],volume,  money]
        if not self.__latest_sell_order:
            self.__latest_sell_order = [item[1], 0, 0]
        if self.__latest_sell_order[0] == item[1]:
            self.__latest_sell_order[1] += volume
            self.__latest_sell_order[2] += money
        else:
            if self.__latest_sell_order[2] > 1e6:
                self.__big_sell_orders.append((self.__latest_sell_order[0],self.__latest_sell_order[1], self.__latest_sell_order[2]))
            self.__latest_sell_order = [item[1], volume,  money]
class Lev2MdSpi(lev2mdapi.CTORATstpLev2MdSpi):
    latest_codes_set = set()
    special_code_volume_for_order_dict = {}
    # 已经订阅的代码
    subscripted_codes = set()
    # 代码的上次成交的订单唯一索引
    __last_transaction_keys_dict = {}
    # 买入的大单订单号
    __l2_transaction_data_dict = {}
    def __init__(self, api, codes):
        lev2mdapi.CTORATstpLev2MdSpi.__init__(self)
        self.__api = api
        self.is_login = False
        self.codes = codes
        self.codes_volume_and_price_dict = {}
    def __split_codes(self, codes):
        szse_codes = []
        sse_codes = []
        for code in codes:
            market_type = tool.get_market_type(code)
            if market_type == tool.MARKET_TYPE_SZSE:
                szse_codes.append(code.encode())
            elif market_type == tool.MARKET_TYPE_SSE:
                sse_codes.append(code.encode())
        return sse_codes, szse_codes
    # 新增订阅
    # 取消订阅
    def __unsubscribe(self, _codes):
        sh, sz = self.__split_codes(_codes)
        logger_local_huaxin_l2_subscript.info(f"取消订阅上证:{sh}")
        logger_local_huaxin_l2_subscript.info(f"取消订阅深证:{sz}")
        if sh:
            if ENABLE_NGST:
                result = self.__api.UnSubscribeNGTSTick(sh, lev2mdapi.TORA_TSTP_EXD_SSE)
                logger_local_huaxin_l2_subscript.info(f"逐笔NGTS订阅结果sh:{result}")
            else:
                # 取消订阅逐笔成交
                self.__api.UnSubscribeTransaction(sh, lev2mdapi.TORA_TSTP_EXD_SSE)
        if sz:
            self.__api.UnSubscribeTransaction(sz, lev2mdapi.TORA_TSTP_EXD_SZSE)
    def __subscribe(self, _codes):
        sh, sz = self.__split_codes(_codes)
        logger_local_huaxin_l2_subscript.info(f"订阅上证:{sh}")
        logger_local_huaxin_l2_subscript.info(f"订阅深证:{sz}")
        if sh:
            if ENABLE_NGST:
                result = self.__api.SubscribeNGTSTick(sh, lev2mdapi.TORA_TSTP_EXD_SSE)
                logger_local_huaxin_l2_subscript.info(f"逐笔NGTS订阅结果sh:{result}")
            else:
                # 订阅逐笔成交
                result = self.__api.SubscribeTransaction(sh, lev2mdapi.TORA_TSTP_EXD_SSE)
                logger_local_huaxin_l2_subscript.info(f"逐笔成交订阅结果sh:{result}")
        if sz:
            result = self.__api.SubscribeTransaction(sz, lev2mdapi.TORA_TSTP_EXD_SZSE)
            logger_local_huaxin_l2_subscript.info(f"逐笔成交订阅结果sz:{result}")
    def OnFrontConnected(self):
        print("OnFrontConnected")
        logger_system.info(f"l2_client OnFrontConnected 线程ID:{tool.get_thread_id()}")
        logout_req = lev2mdapi.CTORATstpUserLogoutField()
        self.__api.ReqUserLogout(logout_req, 1)
        time.sleep(1)
        # 请求登录
        login_req = lev2mdapi.CTORATstpReqUserLoginField()
        self.__api.ReqUserLogin(login_req, 2)
    def OnRspUserLogin(self, pRspUserLogin, pRspInfo, nRequestID, bIsLast):
        print("OnRspUserLogin: ErrorID[%d] ErrorMsg[%s] RequestID[%d] IsLast[%d]" % (
            pRspInfo['ErrorID'], pRspInfo['ErrorMsg'], nRequestID, bIsLast))
        if pRspInfo['ErrorID'] == 0:
            print("----L2行情登录成功----")
            self.is_login = True
            logger_system.info(f"L2行情登录成功")
            self.__subscribe(self.codes)
    def OnRspSubMarketData(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast):
        print("OnRspSubMarketData")
    def OnRspSubIndex(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast):
        print("OnRspSubIndex")
    def OnRspSubTransaction(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast):
        print("OnRspSubTransaction")
        if pRspInfo["ErrorID"] == 0:
            print("订阅成功")
            self.subscripted_codes.add(pSpecificSecurity['SecurityID'])
        if bIsLast == 1:
            print("订阅响应结束", self.subscripted_codes)
    def OnRspSubNGTSTick(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast):
        if pRspInfo["ErrorID"] == 0:
            print("订阅成功")
            self.subscripted_codes.add(pSpecificSecurity['SecurityID'])
        if bIsLast == 1:
            print("订阅响应结束", self.subscripted_codes)
    def OnRspUnSubNGTSTick(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast):
        try:
            code = pSpecificSecurity['SecurityID']
            logger_local_huaxin_l2_subscript.info(f"NGTS取消订阅:{code}")
            self.subscripted_codes.discard(code)
            if bIsLast == 1:
                print("取消订阅响应结束", self.subscripted_codes)
        except Exception as e:
            logging.exception(e)
    def OnRtnTransaction(self, pTransaction):
        code = str(pTransaction['SecurityID'])
        # min_volume, limit_up_price = self.codes_volume_and_price_dict.get(code)
        # 输出逐笔成交数据
        if pTransaction['ExecType'] == b"2":
            pass
        else:
            item = {"SecurityID": pTransaction['SecurityID'], "TradePrice": pTransaction['TradePrice'],
                    "TradeVolume": pTransaction['TradeVolume'],
                    "OrderTime": pTransaction['TradeTime'], "MainSeq": pTransaction['MainSeq'],
                    "SubSeq": pTransaction['SubSeq'], "BuyNo": pTransaction['BuyNo'],
                    "SellNo": pTransaction['SellNo'],
                    "ExecType": pTransaction['ExecType'].decode()}
            if item["SecurityID"] not in self.__l2_transaction_data_dict:
                self.__l2_transaction_data_dict[item["SecurityID"]] = L2TransactionDataManager(item["SecurityID"])
            self.__l2_transaction_data_dict[item["SecurityID"]].add_transaction_data(item)
    def OnRtnNGTSTick(self, pTick):
        """
        上证股票的逐笔委托与成交
        @param pTick:
        @return:
        """
        try:
            if pTick['TickType'] == b'T':
                # 成交
                item = {"SecurityID": pTick['SecurityID'], "TradePrice": pTick['Price'],
                        "TradeVolume": pTick['Volume'],
                        "OrderTime": pTick['TickTime'], "MainSeq": pTick['MainSeq'],
                        "SubSeq": pTick['SubSeq'], "BuyNo": pTick['BuyNo'],
                        "SellNo": pTick['SellNo'],
                        "ExecType": '1'}
                if item["SecurityID"] not in self.__l2_transaction_data_dict:
                    self.__l2_transaction_data_dict[item["SecurityID"]] = L2TransactionDataManager(item["SecurityID"])
                self.__l2_transaction_data_dict[item["SecurityID"]].add_transaction_data(item)
        except Exception as e:
            logger_local_huaxin_l2_subscript.exception(e)
def __init_l2(codes):
    print(lev2mdapi.CTORATstpLev2MdApi_GetApiVersion())
    # case 1: Tcp方式
    # g_SubMode=lev2mdapi.TORA_TSTP_MST_TCP
    # case 2: 组播方式
    g_SubMode = lev2mdapi.TORA_TSTP_MST_MCAST
    if IS_TEST:
        g_SubMode = lev2mdapi.TORA_TSTP_MST_TCP
    # case 1缓存模式
    api = lev2mdapi.CTORATstpLev2MdApi_CreateTstpLev2MdApi(g_SubMode, True)
    # case 2非缓存模式
    # api = lev2mdapi.CTORATstpLev2MdApi_CreateTstpLev2MdApi(g_SubMode, False)
    global spi
    spi = Lev2MdSpi(api, codes)
    api.RegisterSpi(spi)
    # -------------------正式模式-------------------------------------
    if g_SubMode != lev2mdapi.TORA_TSTP_MST_MCAST:
        api.RegisterFront(Front_Address)
    else:
        # case 1 从一个组播地址收取行情
        api.RegisterMulticast(Multicast_Address, Local_Interface_Address, "")
        # api.RegisterMulticast(Multicast_Address2, Local_Interface_Address, "")
    # case 2:注册多个组播地址同时收行情
    # api.RegisterMulticast(Multicast_Address, Local_Interface_Address, "");
    # api.RegisterMulticast(Multicast_Address2, Local_Interface_Address, "");
    # case 3:efvi模式收行情
    # api.RegisterMulticast(Multicast_Address, Local_Interface_Address, "", "enp101s0f0",4096, True);
    # case 1 不绑核运行
    api.Init()
def run(codes) -> None:
    try:
        log.close_print()
        __init_l2(codes)
        logger_system.info(f"L2订阅服务启动成功:")
    except Exception as e:
        logger_system.exception(e)
    while True:
        time.sleep(2)
if __name__ == "__main__":
    run({"000009", "601618"})
    input()
l2_test.py
New file
@@ -0,0 +1,22 @@
import multiprocessing
from huaxin_client import l2_client_test, l1_subscript_codes_manager
def run():
    codes_sh, codes_sz = l1_subscript_codes_manager.get_codes()
    codes = [x.decode() for x in codes_sh]
    codes.extend([x.decode() for x in codes_sz])
    codes.sort()
    cpu_count = 32
    page_size = int(len(codes) / cpu_count) + 1
    for i in range(cpu_count):
        process = multiprocessing.Process(target=l2_client_test.run,
                                          args=(codes[i * page_size:(i + 1) * page_size],))
        process.start()
if __name__ == "__main__":
    run()
    input()
l2_test.spec
New file
@@ -0,0 +1,50 @@
# -*- mode: python ; coding: utf-8 -*-
block_cipher = None
a = Analysis(
    ['l2_test.py'],
    pathex=[],
    binaries=[],
    datas=[],
    hiddenimports=[],
    hookspath=[],
    hooksconfig={},
    runtime_hooks=[],
    excludes=[],
    win_no_prefer_redirects=False,
    win_private_assemblies=False,
    noarchive=False,
    cipher=block_cipher,
)
pyz = PYZ(a.pure, a.zipped_data, cipher=block_cipher)
exe = EXE(
    pyz,
    a.scripts,
    [],
    exclude_binaries=True,
    name='gp_server',
    debug=False,
    bootloader_ignore_signals=False,
    strip=False,
    upx=True,
    console=True,
    disable_windowed_traceback=False,
    argv_emulation=False,
    target_arch=None,
    codesign_identity=None,
    entitlements_file=None,
)
coll = COLLECT(
    exe,
    a.binaries,
    a.zipfiles,
    a.datas,
    strip=False,
    upx=True,
    upx_exclude=[],
    name='l2_test',
)