admin
2023-08-04 ca310f014336d93eba73ed5010c1c5645424a1e0
l2_client.py
@@ -2,6 +2,7 @@
# -*- coding: UTF-8 -*-
import json
import logging
import queue
import threading
import time
@@ -36,6 +37,9 @@
# Security IDs (bytes) that the login-response handler passes to the Subscribe* calls
# together with lev2mdapi.TORA_TSTP_EXD_SZSE.
SZ_Securities = [b"002456", b"002849", b"002281", b"002336", b"000936", b"000920", b"000757", b"002896", b"002725",
                 b"000952", b"000526", b"000753", b"000681", b"002088", b"002436"]
# Security IDs (bytes) passed to the SubscribeBond* calls with lev2mdapi.TORA_TSTP_EXD_SZSE.
SZ_Bond_Securities = [b"100303", b"109559", b"112617"]
# NOTE(review): never assigned in the visible code; presumably holds the active SPI
# instance once one is created — confirm.
spi = None
# NOTE(review): the producers and consumers of this queue are outside this excerpt.
set_codes_data_queue = queue.Queue()
# Maps a SecurityID to the time.time() value recorded when its market data was last handled.
market_code_dict = {}
class Lev2MdSpi(lev2mdapi.CTORATstpLev2MdSpi):
@@ -50,58 +54,57 @@
        self.__api = api
        self.is_login = False
    # Subscribe codes; codes_data is [(code, minimum lot count, limit-up price)]
    def set_codes_data(self, codes_data):
        # NOTE(review): a second set_codes_data definition appears further down in this
        # excerpt and would replace this one in the class namespace — confirm which is live.
        print("set_codes_data", codes_data)
    def __split_codes(self, codes):
        szse_codes = []
        sse_codes = []
        for code in codes:
            if code.find("00") == 0:
                szse_codes.append(code.encode())
            elif code.find("60") == 0:
                sse_codes.append(code.encode())
        return sse_codes, szse_codes
        def split_codes(codes):
            # Sort codes into a Shenzhen list ("00" prefix) and a Shanghai list ("60" prefix),
            # encoding each kept code to bytes; codes with any other prefix are skipped.
            # NOTE(review): same logic as the __split_codes method in this excerpt —
            # presumably one of the two is the superseded version; confirm.
            szse_codes = []
            sse_codes = []
            for code in codes:
                if code.find("00") == 0:
                    szse_codes.append(code.encode())
                elif code.find("60") == 0:
                    sse_codes.append(code.encode())
            # Shanghai codes first, Shenzhen codes second.
            return sse_codes, szse_codes
    # 新增订阅
        # Add subscriptions
        def subscribe(_codes):
            # Split the codes by exchange and log both groups before subscribing.
            sh, sz = split_codes(_codes)
            logger_l2_subscript.info(f"订阅上证:{sh}")
            logger_l2_subscript.info(f"订阅深证:{sz}")
            if sh:
                # Subscribe to tick-by-tick order details
                result = self.__api.SubscribeOrderDetail(sh, lev2mdapi.TORA_TSTP_EXD_SSE)
                logger_l2_subscript.info(f"逐笔委托订阅结果sh:{result}")
                # Subscribe to tick-by-tick transactions
                result = self.__api.SubscribeTransaction(sh, lev2mdapi.TORA_TSTP_EXD_SSE)
                logger_l2_subscript.info(f"逐笔成交订阅结果sh:{result}")
    # 取消订阅
    def __unsubscribe(self, _codes):
        sh, sz = self.__split_codes(_codes)
        logger_l2_subscript.info(f"取消订阅上证:{sh}")
        logger_l2_subscript.info(f"取消订阅深证:{sz}")
        if sh:
            # 取消订阅逐笔委托
            self.__api.UnSubscribeOrderDetail(sh, lev2mdapi.TORA_TSTP_EXD_SSE)
            # 取消订阅逐笔成交
            self.__api.UnSubscribeTransaction(sh, lev2mdapi.TORA_TSTP_EXD_SSE)
            self.__api.UnSubscribeMarketData(sh, lev2mdapi.TORA_TSTP_EXD_SSE)
        if sz:
            self.__api.UnSubscribeOrderDetail(sz, lev2mdapi.TORA_TSTP_EXD_SZSE)
            self.__api.UnSubscribeTransaction(sz, lev2mdapi.TORA_TSTP_EXD_SZSE)
            self.__api.UnSubscribeMarketData(sz, lev2mdapi.TORA_TSTP_EXD_SZSE)
                result = self.__api.SubscribeMarketData(sh, lev2mdapi.TORA_TSTP_EXD_SSE)
                logger_l2_subscript.info(f"市场订阅结果sh:{result}")
            if sz:
                result = self.__api.SubscribeOrderDetail(sz, lev2mdapi.TORA_TSTP_EXD_SZSE)
                logger_l2_subscript.info(f"逐笔委托订阅结果sz:{result}")
                result = self.__api.SubscribeTransaction(sz, lev2mdapi.TORA_TSTP_EXD_SZSE)
                logger_l2_subscript.info(f"逐笔成交订阅结果sz:{result}")
                result = self.__api.SubscribeMarketData(sz, lev2mdapi.TORA_TSTP_EXD_SZSE)
                logger_l2_subscript.info(f"市场订阅结果sz:{result}")
    def __subscribe(self, _codes):
        """Subscribe the given codes through the lev2mdapi handle, logging each API result."""
        # Split the codes by exchange and log both groups before subscribing.
        sh, sz = self.__split_codes(_codes)
        logger_l2_subscript.info(f"订阅上证:{sh}")
        logger_l2_subscript.info(f"订阅深证:{sz}")
        if sh:
            # Subscribe to tick-by-tick order details
            result = self.__api.SubscribeOrderDetail(sh, lev2mdapi.TORA_TSTP_EXD_SSE)
            logger_l2_subscript.info(f"逐笔委托订阅结果sh:{result}")
            # Subscribe to tick-by-tick transactions
            result = self.__api.SubscribeTransaction(sh, lev2mdapi.TORA_TSTP_EXD_SSE)
            logger_l2_subscript.info(f"逐笔成交订阅结果sh:{result}")
        # Cancel subscriptions
        def unsubscribe(_codes):
            # Split the codes by exchange and log both groups before unsubscribing.
            sh, sz = split_codes(_codes)
            logger_l2_subscript.info(f"取消订阅上证:{sh}")
            logger_l2_subscript.info(f"取消订阅深证:{sz}")
            if sh:
                # Cancel the tick-by-tick order detail subscription
                self.__api.UnSubscribeOrderDetail(sh, lev2mdapi.TORA_TSTP_EXD_SSE)
                # Cancel the tick-by-tick transaction subscription
                self.__api.UnSubscribeTransaction(sh, lev2mdapi.TORA_TSTP_EXD_SSE)
                self.__api.UnSubscribeMarketData(sh, lev2mdapi.TORA_TSTP_EXD_SSE)
            if sz:
                # Same three feeds, on the Shenzhen exchange constant.
                self.__api.UnSubscribeOrderDetail(sz, lev2mdapi.TORA_TSTP_EXD_SZSE)
                self.__api.UnSubscribeTransaction(sz, lev2mdapi.TORA_TSTP_EXD_SZSE)
                self.__api.UnSubscribeMarketData(sz, lev2mdapi.TORA_TSTP_EXD_SZSE)
            result = self.__api.SubscribeMarketData(sh, lev2mdapi.TORA_TSTP_EXD_SSE)
            logger_l2_subscript.info(f"市场订阅结果sh:{result}")
        if sz:
            result = self.__api.SubscribeOrderDetail(sz, lev2mdapi.TORA_TSTP_EXD_SZSE)
            logger_l2_subscript.info(f"逐笔委托订阅结果sz:{result}")
            result = self.__api.SubscribeTransaction(sz, lev2mdapi.TORA_TSTP_EXD_SZSE)
            logger_l2_subscript.info(f"逐笔成交订阅结果sz:{result}")
            result = self.__api.SubscribeMarketData(sz, lev2mdapi.TORA_TSTP_EXD_SZSE)
            logger_l2_subscript.info(f"市场订阅结果sz:{result}")
    def __process_codes_data(self, codes_data):
        if not self.is_login and not constant.TEST:
            raise Exception("L2尚未登录")
@@ -120,13 +123,15 @@
            l2_data_manager.target_codes.discard(c)
        for c in add_codes:
            l2_data_manager.run_upload_task(c)
        if len(add_codes) > 10:
            unsubscribe(add_codes)
        subscribe(add_codes)
        unsubscribe(del_codes)
        self.__subscribe(add_codes)
        self.__unsubscribe(del_codes)
        # 设置最近的代码列表
        self.latest_codes_set = codes
    # Subscribe codes; codes_data is [(code, minimum lot count, limit-up price)]
    def set_codes_data(self, codes_data):
        """Forward ``codes_data`` unchanged to ``__process_codes_data``."""
        self.__process_codes_data(codes_data)
    def set_code_special_watch_volume(self, code, volume):
        # 有效期为3s
@@ -134,44 +139,23 @@
    def OnFrontConnected(self):
        """Send a logout request, wait one second, then send login requests.

        NOTE(review): the name suggests the API invokes this once the front
        connection is up — confirm against the lev2mdapi SPI documentation.
        """
        print("OnFrontConnected")
        logout_req = lev2mdapi.CTORATstpUserLogoutField()
        # Second argument is presumably the request ID echoed back in the response — TODO confirm.
        self.__api.ReqUserLogout(logout_req, 1)
        # Pause before logging in; the reason for the one-second delay is not visible here.
        time.sleep(1)
        # Request login
        login_req = lev2mdapi.CTORATstpReqUserLoginField()
        # NOTE(review): two back-to-back login requests (second argument 1, then 2). This looks
        # like the before/after lines of a change that were both kept — confirm only one is intended.
        self.__api.ReqUserLogin(login_req, 1)
        self.__api.ReqUserLogin(login_req, 2)
    def OnRspUserLogin(self, pRspUserLogin, pRspInfo, nRequestID, bIsLast):
        print("OnRspUserLogin: ErrorID[%d] ErrorMsg[%s] RequestID[%d] IsLast[%d]" % (
            pRspInfo['ErrorID'], pRspInfo['ErrorMsg'], nRequestID, bIsLast))
        if pRspInfo['ErrorID'] == 0:
            print("----L2行情登录成功----")
            self.is_login = True
            if g_SubMarketData:
                self.__api.SubscribeMarketData(SH_Securities, lev2mdapi.TORA_TSTP_EXD_SSE);
                self.__api.SubscribeMarketData(SZ_Securities, lev2mdapi.TORA_TSTP_EXD_SZSE);
            if g_SubTransaction:
                self.__api.SubscribeTransaction(SH_Securities, lev2mdapi.TORA_TSTP_EXD_SSE);
                self.__api.SubscribeTransaction(SZ_Securities, lev2mdapi.TORA_TSTP_EXD_SZSE);
            if g_SubOrderDetail:
                self.__api.SubscribeOrderDetail(SH_Securities, lev2mdapi.TORA_TSTP_EXD_SSE);
                self.__api.SubscribeOrderDetail(SZ_Securities, lev2mdapi.TORA_TSTP_EXD_SZSE);
            if g_SubXTSTick:
                self.__api.SubscribeXTSTick(SH_XTS_Securities, lev2mdapi.TORA_TSTP_EXD_SSE);
            if g_SubXTSMarketData:
                self.__api.SubscribeXTSMarketData(SH_XTS_Securities, lev2mdapi.TORA_TSTP_EXD_SSE);
            if g_SubBondMarketData:
                self.__api.SubscribeBondMarketData(SZ_Bond_Securities, lev2mdapi.TORA_TSTP_EXD_SZSE);
            if g_SubBondTransaction:
                self.__api.SubscribeBondTransaction(SZ_Bond_Securities, lev2mdapi.TORA_TSTP_EXD_SZSE);
            if g_SubBondOrderDetail:
                self.__api.SubscribeBondOrderDetail(SZ_Bond_Securities, lev2mdapi.TORA_TSTP_EXD_SZSE);
            # 4.0.5版本接口
            if g_SubNGTSTick:
                self.__api.SubscribeNGTSTick(SH_Securities, lev2mdapi.TORA_TSTP_EXD_SSE);
            # t1 = threading.Thread(target=lambda: self.__set_codes_data(), daemon=True)
            # # 后台运行
            # t1.start()
    def OnRspSubMarketData(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast):
        print("OnRspSubMarketData")
@@ -183,23 +167,29 @@
        print("OnRspSubTransaction")
    def OnRspSubOrderDetail(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast):
        """Record the outcome of an order-detail subscription response.

        Prints and logs the security and error fields, adds the SecurityID to
        ``self.subscripted_codes`` when ``ErrorID`` is 0, and passes the whole set
        to ``l2_data_manager.add_subscript_codes`` when ``bIsLast`` equals 1.
        """
        # NOTE(review): the two prints below, and the two identical log calls further down,
        # look like before/after lines of the same change — confirm which ones should remain.
        print("OnRspSubOrderDetail")
        print("OnRspSubOrderDetail", pRspInfo)
        # try:
        print("订阅结果:", pSpecificSecurity["ExchangeID"], pSpecificSecurity["SecurityID"], pRspInfo["ErrorID"],
              pRspInfo["ErrorMsg"])
        logger_l2_subscript.info(f"订阅结果:{pSpecificSecurity['SecurityID']} {pRspInfo['ErrorID']} {pRspInfo['ErrorMsg']}")
        logger_l2_subscript.info(
            f"订阅结果:{pSpecificSecurity['SecurityID']} {pRspInfo['ErrorID']} {pRspInfo['ErrorMsg']}")
        # ErrorID 0 is treated as a successful subscription.
        if pRspInfo["ErrorID"] == 0:
            print("订阅成功")
            self.subscripted_codes.add(pSpecificSecurity['SecurityID'])
        # Publish the accumulated set once the response is flagged as the last one.
        if bIsLast == 1:
            print("订阅响应结束", self.subscripted_codes)
            l2_data_manager.add_subscript_codes(self.subscripted_codes)
    def OnRspUnSubOrderDetail(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast):
        """Remove an unsubscribed SecurityID from ``self.subscripted_codes``.

        When ``bIsLast`` equals 1 the remaining set is passed to
        ``l2_data_manager.add_subscript_codes``.
        """
        # NOTE(review): the unguarded statements below and the try block after them do the
        # same work twice (so add_subscript_codes can run twice per final response). This looks
        # like the before/after versions of one change — confirm which one should remain.
        print("OnRspUnSubOrderDetail")
        code = pSpecificSecurity['SecurityID']
        if code in self.subscripted_codes:
            self.subscripted_codes.remove(code)
        if bIsLast == 1:
            l2_data_manager.add_subscript_codes(self.subscripted_codes)
        print("OnRspUnSubOrderDetail", bIsLast)
        try:
            code = pSpecificSecurity['SecurityID']
            # discard() does not raise when the code is absent.
            self.subscripted_codes.discard(code)
            if bIsLast == 1:
                print("取消订阅响应结束", self.subscripted_codes)
                l2_data_manager.add_subscript_codes(self.subscripted_codes)
        except Exception as e:
            # Any failure is logged with its traceback and not re-raised.
            logging.exception(e)
    def OnRspSubBondMarketData(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast):
        print("OnRspSubBondMarketData")
@@ -238,6 +228,8 @@
                 (pDepthMarketData['AskPrice4'], pDepthMarketData['AskVolume4']),
                 (pDepthMarketData['AskPrice5'], pDepthMarketData['AskVolume5'])
             ]}
        market_code_dict[pDepthMarketData['SecurityID']] = time.time()
        l2_data_manager.add_market_data(d)
        # 输出行情快照数据
@@ -273,14 +265,14 @@
    def OnRtnTransaction(self, pTransaction):
        code = str(pTransaction['SecurityID'])
        min_volume, limit_up_price = self.codes_volume_and_price_dict.get(code)
        if min_volume is None:
            # 默认筛选50w
            if pTransaction['TradePrice'] * pTransaction['Volume'] < 500000:
                return
        elif pTransaction['TradeVolume'] < min_volume:
            return
        # 输出逐笔成交数据
        if pTransaction['ExecType'] == b"2":
            if min_volume is None:
                # 默认筛选50w
                if pTransaction['TradePrice'] * pTransaction['Volume'] < 500000:
                    return
            elif pTransaction['TradeVolume'] < min_volume:
                return
            # 撤单
            item = {"SecurityID": pTransaction['SecurityID'], "Price": pTransaction['TradePrice'],
                    "Volume": pTransaction['TradeVolume'],
@@ -302,7 +294,7 @@
            print("逐笔委托", item)
            l2_data_manager.add_l2_order_detail(item, True)
        else:
            if abs(pTransaction['TradePrice'] - limit_up_price) < 0.001:
            if abs(pTransaction['TradePrice'] - limit_up_price) < 0.201:
                # 涨停价
                # 成交
                item = {"SecurityID": pTransaction['SecurityID'], "TradePrice": pTransaction['TradePrice'],
@@ -311,7 +303,7 @@
                        "SubSeq": pTransaction['SubSeq'], "BuyNo": pTransaction['BuyNo'],
                        "SellNo": pTransaction['SellNo'],
                        "ExecType": pTransaction['ExecType'].decode()}
                print("逐笔成交", item)
                # print("逐笔成交", item)
                l2_data_manager.add_transaction_detail(item)
        logger_l2_transaction.info(
@@ -483,6 +475,8 @@
            SendResponseSkManager.send_normal_response("l2_cmd",
                                                       SendResponseSkManager.load_response(client_id, request_id,
                                                                                           {"code": 0, "msg": "设置成功"}))
        except Exception as e:
            logging.exception(e)
            SendResponseSkManager.send_error_response("common", request_id, client_id, str(e))
@@ -542,6 +536,7 @@
        # 后台运行
        t1.setDaemon(True)
        t1.start()
    l2_data_manager.run_upload_common()
    global l2CommandManager
    l2CommandManager = command_manager.L2CommandManager()
    l2CommandManager.init(constant.SERVER_IP, constant.SERVER_PORT, MyL2ActionCallback())