admin
2023-07-21 f51466f1d4563f97b1ec620b70a1c94f01a6a2e1
交易优化
6个文件已修改
1个文件已添加
558 ■■■■ 已修改文件
client_network.py 8 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
command_manager.py 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2_client.py 129 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2_data_manager.py 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
mylog.py 12 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test_l2.py 296 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade_client.py 104 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
client_network.py
@@ -2,6 +2,7 @@
import socket
import constant
import socket_util
class SendResponseSkManager:
@@ -17,6 +18,10 @@
    @classmethod
    def del_send_response_sk(cls, type):
        if type in cls.__send_response_sk_dict:
            try:
                cls.__send_response_sk_dict[type].close()
            except:
                pass
            cls.__send_response_sk_dict.pop(type)
    @classmethod
@@ -35,10 +40,9 @@
        # 添加内容长度头
        msg = cls.format_response(msg)
        sk.send(msg)
        result = sk.recv(1024)
        result, header_str = socket_util.recv_data(sk)
        # print("响应", result)
        if result:
            result = result.decode("utf-8")
            result_json = json.loads(result)
            if result_json.get("code") == 0:
                return True
command_manager.py
@@ -183,7 +183,7 @@
            try:
                sk.send(SendResponseSkManager.format_response(
                    json.dumps({"type": "heart", "client_id": client_id}).encode('utf-8')))
                print("心跳信息发送成功", client_id)
                # print("心跳信息发送成功", client_id)
            except Exception as e:
                print("心跳信息发送失败")
                logging.exception(e)
@@ -332,7 +332,7 @@
            try:
                sk.send(SendResponseSkManager.format_response(
                    json.dumps({"type": "heart", "client_id": client_id}).encode('utf-8')))
                print("心跳信息发送成功", client_id)
                # print("心跳信息发送成功", client_id)
            except Exception as e:
                print("心跳信息发送失败")
                logging.exception(e)
l2_client.py
@@ -2,6 +2,7 @@
# -*- coding: UTF-8 -*-
import json
import logging
import queue
import threading
import time
@@ -36,6 +37,9 @@
SZ_Securities = [b"002456", b"002849", b"002281", b"002336", b"000936", b"000920", b"000757", b"002896", b"002725",
                 b"000952", b"000526", b"000753", b"000681", b"002088", b"002436"]
SZ_Bond_Securities = [b"100303", b"109559", b"112617"]
spi = None
set_codes_data_queue = queue.Queue()
market_code_dict = {}
class Lev2MdSpi(lev2mdapi.CTORATstpLev2MdSpi):
@@ -50,11 +54,7 @@
        self.__api = api
        self.is_login = False
    # 订阅代码,[(代码,最低手数,涨停价)]
    def set_codes_data(self, codes_data):
        print("set_codes_data", codes_data)
        def split_codes(codes):
    def __split_codes(self, codes):
            szse_codes = []
            sse_codes = []
            for code in codes:
@@ -65,8 +65,25 @@
            return sse_codes, szse_codes
        # 新增订阅
        def subscribe(_codes):
            sh, sz = split_codes(_codes)
    # 取消订阅
    def __unsubscribe(self, _codes):
        sh, sz = self.__split_codes(_codes)
        logger_l2_subscript.info(f"取消订阅上证:{sh}")
        logger_l2_subscript.info(f"取消订阅深证:{sz}")
        if sh:
            # 取消订阅逐笔委托
            self.__api.UnSubscribeOrderDetail(sh, lev2mdapi.TORA_TSTP_EXD_SSE)
            # 取消订阅逐笔成交
            self.__api.UnSubscribeTransaction(sh, lev2mdapi.TORA_TSTP_EXD_SSE)
            self.__api.UnSubscribeMarketData(sh, lev2mdapi.TORA_TSTP_EXD_SSE)
        if sz:
            self.__api.UnSubscribeOrderDetail(sz, lev2mdapi.TORA_TSTP_EXD_SZSE)
            self.__api.UnSubscribeTransaction(sz, lev2mdapi.TORA_TSTP_EXD_SZSE)
            self.__api.UnSubscribeMarketData(sz, lev2mdapi.TORA_TSTP_EXD_SZSE)
    def __subscribe(self, _codes):
        sh, sz = self.__split_codes(_codes)
            logger_l2_subscript.info(f"订阅上证:{sh}")
            logger_l2_subscript.info(f"订阅深证:{sz}")
            if sh:
@@ -87,21 +104,7 @@
                result = self.__api.SubscribeMarketData(sz, lev2mdapi.TORA_TSTP_EXD_SZSE)
                logger_l2_subscript.info(f"市场订阅结果sz:{result}")
        # 取消订阅
        def unsubscribe(_codes):
            sh, sz = split_codes(_codes)
            logger_l2_subscript.info(f"取消订阅上证:{sh}")
            logger_l2_subscript.info(f"取消订阅深证:{sz}")
            if sh:
                # 取消订阅逐笔委托
                self.__api.UnSubscribeOrderDetail(sh, lev2mdapi.TORA_TSTP_EXD_SSE)
                # 取消订阅逐笔成交
                self.__api.UnSubscribeTransaction(sh, lev2mdapi.TORA_TSTP_EXD_SSE)
                self.__api.UnSubscribeMarketData(sh, lev2mdapi.TORA_TSTP_EXD_SSE)
            if sz:
                self.__api.UnSubscribeOrderDetail(sz, lev2mdapi.TORA_TSTP_EXD_SZSE)
                self.__api.UnSubscribeTransaction(sz, lev2mdapi.TORA_TSTP_EXD_SZSE)
                self.__api.UnSubscribeMarketData(sz, lev2mdapi.TORA_TSTP_EXD_SZSE)
    def __process_codes_data(self, codes_data):
        if not self.is_login and not constant.TEST:
            raise Exception("L2尚未登录")
@@ -120,13 +123,15 @@
            l2_data_manager.target_codes.discard(c)
        for c in add_codes:
            l2_data_manager.run_upload_task(c)
        if len(add_codes) > 10:
            unsubscribe(add_codes)
        subscribe(add_codes)
        unsubscribe(del_codes)
        self.__subscribe(add_codes)
        self.__unsubscribe(del_codes)
        # 设置最近的代码列表
        self.latest_codes_set = codes
    # 订阅代码,[(代码,最低手数,涨停价)]
    def set_codes_data(self, codes_data):
        self.__process_codes_data(codes_data)
    def set_code_special_watch_volume(self, code, volume):
        # 有效期为3s
@@ -134,44 +139,23 @@
    def OnFrontConnected(self):
        print("OnFrontConnected")
        logout_req = lev2mdapi.CTORATstpUserLogoutField()
        self.__api.ReqUserLogout(logout_req, 1)
        time.sleep(1)
        # 请求登录
        login_req = lev2mdapi.CTORATstpReqUserLoginField()
        self.__api.ReqUserLogin(login_req, 1)
        self.__api.ReqUserLogin(login_req, 2)
    def OnRspUserLogin(self, pRspUserLogin, pRspInfo, nRequestID, bIsLast):
        print("OnRspUserLogin: ErrorID[%d] ErrorMsg[%s] RequestID[%d] IsLast[%d]" % (
            pRspInfo['ErrorID'], pRspInfo['ErrorMsg'], nRequestID, bIsLast))
        if pRspInfo['ErrorID'] == 0:
            print("----L2行情登录成功----")
            self.is_login = True
            if g_SubMarketData:
                self.__api.SubscribeMarketData(SH_Securities, lev2mdapi.TORA_TSTP_EXD_SSE);
                self.__api.SubscribeMarketData(SZ_Securities, lev2mdapi.TORA_TSTP_EXD_SZSE);
            if g_SubTransaction:
                self.__api.SubscribeTransaction(SH_Securities, lev2mdapi.TORA_TSTP_EXD_SSE);
                self.__api.SubscribeTransaction(SZ_Securities, lev2mdapi.TORA_TSTP_EXD_SZSE);
            if g_SubOrderDetail:
                self.__api.SubscribeOrderDetail(SH_Securities, lev2mdapi.TORA_TSTP_EXD_SSE);
                self.__api.SubscribeOrderDetail(SZ_Securities, lev2mdapi.TORA_TSTP_EXD_SZSE);
            if g_SubXTSTick:
                self.__api.SubscribeXTSTick(SH_XTS_Securities, lev2mdapi.TORA_TSTP_EXD_SSE);
            if g_SubXTSMarketData:
                self.__api.SubscribeXTSMarketData(SH_XTS_Securities, lev2mdapi.TORA_TSTP_EXD_SSE);
            if g_SubBondMarketData:
                self.__api.SubscribeBondMarketData(SZ_Bond_Securities, lev2mdapi.TORA_TSTP_EXD_SZSE);
            if g_SubBondTransaction:
                self.__api.SubscribeBondTransaction(SZ_Bond_Securities, lev2mdapi.TORA_TSTP_EXD_SZSE);
            if g_SubBondOrderDetail:
                self.__api.SubscribeBondOrderDetail(SZ_Bond_Securities, lev2mdapi.TORA_TSTP_EXD_SZSE);
            # 4.0.5版本接口
            if g_SubNGTSTick:
                self.__api.SubscribeNGTSTick(SH_Securities, lev2mdapi.TORA_TSTP_EXD_SSE);
            # t1 = threading.Thread(target=lambda: self.__set_codes_data(), daemon=True)
            # # 后台运行
            # t1.start()
    def OnRspSubMarketData(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast):
        print("OnRspSubMarketData")
@@ -184,22 +168,32 @@
    def OnRspSubOrderDetail(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast):
        print("OnRspSubOrderDetail")
        # try:
        print("订阅结果:", pSpecificSecurity["ExchangeID"], pSpecificSecurity["SecurityID"], pRspInfo["ErrorID"],
              pRspInfo["ErrorMsg"])
        logger_l2_subscript.info(f"订阅结果:{pSpecificSecurity['SecurityID']} {pRspInfo['ErrorID']} {pRspInfo['ErrorMsg']}")
        logger_l2_subscript.info(
            f"订阅结果:{pSpecificSecurity['SecurityID']} {pRspInfo['ErrorID']} {pRspInfo['ErrorMsg']}")
        if pRspInfo["ErrorID"] == 0:
            print("订阅成功")
            self.subscripted_codes.add(pSpecificSecurity['SecurityID'])
        if bIsLast == 1:
            l2_data_manager.add_subscript_codes(self.subscripted_codes)
            t1 = threading.Thread(target=lambda: l2_data_manager.add_subscript_codes(self.subscripted_codes),
                                  daemon=True)
            # 后台运行
            t1.start()
    def OnRspUnSubOrderDetail(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast):
        print("OnRspUnSubOrderDetail")
        try:
        code = pSpecificSecurity['SecurityID']
        if code in self.subscripted_codes:
            self.subscripted_codes.remove(code)
            self.subscripted_codes.discard(code)
        if bIsLast == 1:
            l2_data_manager.add_subscript_codes(self.subscripted_codes)
                t1 = threading.Thread(target=lambda: l2_data_manager.add_subscript_codes(self.subscripted_codes),
                                      daemon=True)
                # 后台运行
                t1.start()
        except Exception as e:
            logging.exception(e)
    def OnRspSubBondMarketData(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast):
        print("OnRspSubBondMarketData")
@@ -238,7 +232,10 @@
                 (pDepthMarketData['AskPrice4'], pDepthMarketData['AskVolume4']),
                 (pDepthMarketData['AskPrice5'], pDepthMarketData['AskVolume5'])
             ]}
        l2_data_manager.add_market_data(d)
        market_code_dict[pDepthMarketData['SecurityID']] = time.time()
        t1 = threading.Thread(target=lambda: l2_data_manager.add_market_data(d), daemon=True)
        t1.start()
        # 输出行情快照数据
        # print(
@@ -273,14 +270,14 @@
    def OnRtnTransaction(self, pTransaction):
        code = str(pTransaction['SecurityID'])
        min_volume, limit_up_price = self.codes_volume_and_price_dict.get(code)
        # 输出逐笔成交数据
        if pTransaction['ExecType'] == b"2":
        if min_volume is None:
            # 默认筛选50w
            if pTransaction['TradePrice'] * pTransaction['Volume'] < 500000:
                return
        elif pTransaction['TradeVolume'] < min_volume:
            return
        # 输出逐笔成交数据
        if pTransaction['ExecType'] == b"2":
            # 撤单
            item = {"SecurityID": pTransaction['SecurityID'], "Price": pTransaction['TradePrice'],
                    "Volume": pTransaction['TradeVolume'],
@@ -302,7 +299,7 @@
            print("逐笔委托", item)
            l2_data_manager.add_l2_order_detail(item, True)
        else:
            if abs(pTransaction['TradePrice'] - limit_up_price) < 0.001:
            if abs(pTransaction['TradePrice'] - limit_up_price) < 0.201:
                # 涨停价
                # 成交
                item = {"SecurityID": pTransaction['SecurityID'], "TradePrice": pTransaction['TradePrice'],
@@ -483,6 +480,8 @@
            SendResponseSkManager.send_normal_response("l2_cmd",
                                                       SendResponseSkManager.load_response(client_id, request_id,
                                                                                           {"code": 0, "msg": "设置成功"}))
        except Exception as e:
            logging.exception(e)
            SendResponseSkManager.send_error_response("common", request_id, client_id, str(e))
l2_data_manager.py
@@ -102,7 +102,6 @@
# 上传数据
def upload_data(code, _type, datas):
    print(time.time() * 1000)
    uid = random.randint(0, 100000)
    key = f"{_type}_{code}"
    fdata = json.dumps(
@@ -138,7 +137,7 @@
            if udatas:
                upload_data(code, "l2_order", udatas)
            time.sleep(0.04)
            time.sleep(0.01)
        except Exception as e:
            logger_l2_error.error(f"上传订单数据出错:{str(e)}")
@@ -160,7 +159,7 @@
                udatas.append(temp)
            if udatas:
                upload_data(code, "l2_trans", udatas)
            time.sleep(0.1)
            time.sleep(0.01)
        except Exception as e:
            logger_l2_error.error(f"上传成交数据出错:{str(e)}")
mylog.py
@@ -1,6 +1,8 @@
"""
日志
"""
import platform
from loguru import logger
@@ -24,9 +26,16 @@
                   filter=lambda record: record["extra"].get("name") == "debug",
                   rotation="00:00", compression="zip", enqueue=True)
        logger.add(self.get_path("trade", "trade_debug"),
                   filter=lambda record: record["extra"].get("name") == "trade_debug",
                   rotation="00:00", compression="zip", enqueue=True)
    def get_path(self, dir_name, log_name):
        return "/home/userzjj/logs/{}/{}".format(dir_name, log_name) + ".{time:YYYY-MM-DD}.log"
        system = platform.system()
        base_dir = "/home/userzjj/logs"
        if system == 'Windows':
            base_dir = "D:/logs/huaxin"
        return "{}/{}/{}".format(base_dir, dir_name, log_name) + ".{time:YYYY-MM-DD}.log"
    def get_logger(self, log_name):
        return logger.bind(name=log_name)
@@ -39,3 +48,4 @@
logger_l2_error = __mylogger.get_logger("error")
logger_l2_subscript = __mylogger.get_logger("subscript")
logger_contact_debug = __mylogger.get_logger("debug")
logger_trade_debug = __mylogger.get_logger("trade_debug")
test_l2.py
New file
@@ -0,0 +1,296 @@
#!/usr/bin/python
# -*- coding: UTF-8 -*-
import sys
import lev2mdapi
Front_Address = "tcp://10.0.1.101:6900"
Multicast_Address = "udp://224.224.2.19:7889"
Multicast_Address2 = "udp://224.224.224.234:7890"
Local_Interface_Address = "192.168.84.75"
Sender_Interface_Address = "10.0.1.101"
g_SubMarketData = True;
g_SubTransaction = True;
g_SubOrderDetail = True;
g_SubXTSTick = True;
g_SubXTSMarketData = True;
g_SubNGTSTick = True;
g_SubBondMarketData = True;
g_SubBondTransaction = True;
g_SubBondOrderDetail = True;
SH_Securities = [b"600035", b"510050", b"600000"];
SH_XTS_Securities = [b"018003", b"113565"];
SZ_Securities = [b"000001", b"128048", b"128125"];
SZ_Bond_Securities = [b"100303", b"109559", b"112617"];
class Lev2MdSpi(lev2mdapi.CTORATstpLev2MdSpi):
    def __init__(self, api):
        lev2mdapi.CTORATstpLev2MdSpi.__init__(self)
        self.__api = api
    def OnFrontConnected(self):
        print("OnFrontConnected")
        # 请求登录
        login_req = lev2mdapi.CTORATstpReqUserLoginField()
        self.__api.ReqUserLogin(login_req, 1)
    def OnRspUserLogin(self, pRspUserLogin, pRspInfo, nRequestID, bIsLast):
        print("OnRspUserLogin: ErrorID[%d] ErrorMsg[%s] RequestID[%d] IsLast[%d]" % (
        pRspInfo['ErrorID'], pRspInfo['ErrorMsg'], nRequestID, bIsLast))
        if pRspInfo['ErrorID'] == 0:
            if g_SubMarketData:
                self.__api.SubscribeMarketData(SH_Securities, lev2mdapi.TORA_TSTP_EXD_SSE);
                self.__api.SubscribeMarketData(SZ_Securities, lev2mdapi.TORA_TSTP_EXD_SZSE);
            if g_SubTransaction:
                self.__api.SubscribeTransaction(SH_Securities, lev2mdapi.TORA_TSTP_EXD_SSE);
                self.__api.SubscribeTransaction(SZ_Securities, lev2mdapi.TORA_TSTP_EXD_SZSE);
            if g_SubOrderDetail:
                self.__api.SubscribeOrderDetail(SH_Securities, lev2mdapi.TORA_TSTP_EXD_SSE);
                self.__api.SubscribeOrderDetail(SZ_Securities, lev2mdapi.TORA_TSTP_EXD_SZSE);
            if g_SubXTSTick:
                self.__api.SubscribeXTSTick(SH_XTS_Securities, lev2mdapi.TORA_TSTP_EXD_SSE);
            if g_SubXTSMarketData:
                self.__api.SubscribeXTSMarketData(SH_XTS_Securities, lev2mdapi.TORA_TSTP_EXD_SSE);
            if g_SubBondMarketData:
                self.__api.SubscribeBondMarketData(SZ_Bond_Securities, lev2mdapi.TORA_TSTP_EXD_SZSE);
            if g_SubBondTransaction:
                self.__api.SubscribeBondTransaction(SZ_Bond_Securities, lev2mdapi.TORA_TSTP_EXD_SZSE);
            if g_SubBondOrderDetail:
                self.__api.SubscribeBondOrderDetail(SZ_Bond_Securities, lev2mdapi.TORA_TSTP_EXD_SZSE);
            # 4.0.5版本接口
            if g_SubNGTSTick:
                self.__api.SubscribeNGTSTick(SH_Securities, lev2mdapi.TORA_TSTP_EXD_SSE);
    def OnRspSubMarketData(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast):
        print("OnRspSubMarketData")
    def OnRspSubIndex(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast):
        print("OnRspSubIndex")
    def OnRspSubTransaction(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast):
        print("OnRspSubTransaction")
    def OnRspSubOrderDetail(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast):
        print("OnRspSubOrderDetail")
    def OnRspSubBondMarketData(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast):
        print("OnRspSubBondMarketData")
    def OnRspSubBondTransaction(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast):
        print("OnRspSubBondTransaction")
    def OnRspSubBondOrderDetail(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast):
        print("OnRspSubBondOrderDetail")
    def OnRspSubXTSMarketData(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast):
        print("OnRspSubXTSMarketData")
    def OnRspSubXTSTick(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast):
        print("OnRspSubXTSTick")
    # 4.0.5版本接口
    def OnRspSubNGTSTick(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast):
        print("OnRspSubNGTSTick")
    def OnRtnMarketData(self, pDepthMarketData, FirstLevelBuyNum, FirstLevelBuyOrderVolumes, FirstLevelSellNum,
                        FirstLevelSellOrderVolumes):
        # 输出行情快照数据
        print(
            "OnRtnMarketData SecurityID[%s] LastPrice[%.2f] TotalVolumeTrade[%d] TotalValueTrade[%.2f] BidPrice1[%.2f] BidVolume1[%d] AskPrice1[%.2f] AskVolume1[%d]" % (
            pDepthMarketData['SecurityID'],
            pDepthMarketData['LastPrice'],
            pDepthMarketData['TotalValueTrade'],
            pDepthMarketData['TotalValueTrade'],
            pDepthMarketData['BidPrice1'],
            pDepthMarketData['BidVolume1'],
            pDepthMarketData['AskPrice1'],
            pDepthMarketData['AskVolume1']))
        # 输出一档价位买队列前50笔委托数量
        for buy_index in range(0, FirstLevelBuyNum):
            print("first level buy  [%d] : [%d]" % (buy_index, FirstLevelBuyOrderVolumes[buy_index]))
        # 输出一档价位卖队列前50笔委托数量
        for sell_index in range(0, FirstLevelSellNum):
            print("first level sell [%d] : [%d]" % (sell_index, FirstLevelSellOrderVolumes[sell_index]))
    def OnRtnIndex(self, pIndex):
        # 输出指数行情数据
        print(
            "OnRtnIndex SecurityID[%s] LastIndex[%.2f] LowIndex[%.2f] HighIndex[%.2f] TotalVolumeTraded[%d] Turnover[%.2f]" % (
            pIndex['SecurityID'],
            pIndex['LastIndex'],
            pIndex['LowIndex'],
            pIndex['HighIndex'],
            pIndex['TotalVolumeTraded'],
            pIndex['Turnover']))
    def OnRtnTransaction(self, pTransaction):
        # 输出逐笔成交数据
        print(
            "OnRtnTransaction SecurityID[%s] TradePrice[%.2f] TradeVolume[%d] TradeTime[%d] MainSeq[%d] SubSeq[%d] BuyNo[%d] SellNo[%d]" % (
            pTransaction['SecurityID'],
            pTransaction['TradePrice'],
            pTransaction['TradeVolume'],
            pTransaction['TradeTime'],
            pTransaction['MainSeq'],
            pTransaction['SubSeq'],
            pTransaction['BuyNo'],
            pTransaction['SellNo']))
    def OnRtnOrderDetail(self, pOrderDetail):
        # 输出逐笔委托数据
        print(
            "OnRtnOrderDetail SecurityID[%s] Price[%.2f] Volume[%d] Side[%s] OrderType[%s] OrderTime[%d] MainSeq[%d] SubSeq[%d]" % (
            pOrderDetail['SecurityID'],
            pOrderDetail['Price'],
            pOrderDetail['Volume'],
            pOrderDetail['Side'],
            pOrderDetail['OrderType'],
            pOrderDetail['OrderTime'],
            pOrderDetail['MainSeq'],
            pOrderDetail['SubSeq']))
    def OnRtnBondMarketData(self, pDepthMarketData, FirstLevelBuyNum, FirstLevelBuyOrderVolumes, FirstLevelSellNum,
                            FirstLevelSellOrderVolumes):
        # 输出行情快照数据
        print(
            "OnRtnBondMarketData SecurityID[%s] LastPrice[%.2f] TotalVolumeTrade[%d] TotalValueTrade[%.2f] BidPrice1[%.2f] BidVolume1[%d] AskPrice1[%.2f] AskVolume1[%d]" % (
                pDepthMarketData['SecurityID'],
                pDepthMarketData['LastPrice'],
                pDepthMarketData['TotalValueTrade'],
                pDepthMarketData['TotalValueTrade'],
                pDepthMarketData['BidPrice1'],
                pDepthMarketData['BidVolume1'],
                pDepthMarketData['AskPrice1'],
                pDepthMarketData['AskVolume1']))
        # 输出一档价位买队列前50笔委托数量
        for buy_index in range(0, FirstLevelBuyNum):
            print("first level buy  [%d] : [%d]" % (buy_index, FirstLevelBuyOrderVolumes[buy_index]))
        # 输出一档价位卖队列前50笔委托数量
        for sell_index in range(0, FirstLevelSellNum):
            print("first level sell [%d] : [%d]" % (sell_index, FirstLevelSellOrderVolumes[sell_index]))
    def OnRtnBondTransaction(self, pTransaction):
        # 输出逐笔成交数据
        print(
            "OnRtnBondTransaction SecurityID[%s] TradePrice[%.2f] TradeVolume[%d] TradeTime[%d] MainSeq[%d] SubSeq[%d] BuyNo[%d] SellNo[%d]" % (
                pTransaction['SecurityID'],
                pTransaction['TradePrice'],
                pTransaction['TradeVolume'],
                pTransaction['TradeTime'],
                pTransaction['MainSeq'],
                pTransaction['SubSeq'],
                pTransaction['BuyNo'],
                pTransaction['SellNo']))
    def OnRtnBondOrderDetail(self, pOrderDetail):
        # 输出逐笔委托数据
        print(
            "OnRtnBondOrderDetail SecurityID[%s] Price[%.2f] Volume[%d] Side[%s] OrderType[%s] OrderTime[%d] MainSeq[%d] SubSeq[%d]" % (
                pOrderDetail['SecurityID'],
                pOrderDetail['Price'],
                pOrderDetail['Volume'],
                pOrderDetail['Side'],
                pOrderDetail['OrderType'],
                pOrderDetail['OrderTime'],
                pOrderDetail['MainSeq'],
                pOrderDetail['SubSeq']))
    def OnRtnXTSMarketData(self, pDepthMarketData, FirstLevelBuyNum, FirstLevelBuyOrderVolumes, FirstLevelSellNum,
                           FirstLevelSellOrderVolumes):
        # 输出行情快照数据
        print(
            "OnRtnXTSMarketData SecurityID[%s] LastPrice[%.2f] TotalVolumeTrade[%d] TotalValueTrade[%.2f] BidPrice1[%.2f] BidVolume1[%d] AskPrice1[%.2f] AskVolume1[%d]" % (
                pDepthMarketData['SecurityID'],
                pDepthMarketData['LastPrice'],
                pDepthMarketData['TotalValueTrade'],
                pDepthMarketData['TotalValueTrade'],
                pDepthMarketData['BidPrice1'],
                pDepthMarketData['BidVolume1'],
                pDepthMarketData['AskPrice1'],
                pDepthMarketData['AskVolume1']))
        # 输出一档价位买队列前50笔委托数量
        for buy_index in range(0, FirstLevelBuyNum):
            print("first level buy  [%d] : [%d]" % (buy_index, FirstLevelBuyOrderVolumes[buy_index]))
        # 输出一档价位卖队列前50笔委托数量
        for sell_index in range(0, FirstLevelSellNum):
            print("first level sell [%d] : [%d]" % (sell_index, FirstLevelSellOrderVolumes[sell_index]))
    def OnRtnXTSTick(self, pTick):
        # 输出上海债券逐笔数据’
        print(
            "OnXTSTick TickType[%s] SecurityID[%s] Price[%.2f] Volume[%d] TickTime[%d] MainSeq[%d] SubSeq[%d] BuyNo[%d] SellNo[%d]" % (
                pTick['TickType'],
                pTick['SecurityID'],
                pTick['Price'],
                pTick['Volume'],
                pTick['TickTime'],
                pTick['MainSeq'],
                pTick['SubSeq'],
                pTick['BuyNo'],
                pTick['SellNo']))
    def OnRtnNGTSTick(self, pTick):
        # 输出上海股基逐笔数据’
        print(
            "OnRtnNGTSTick TickType[%s] SecurityID[%s] Price[%.2f] Volume[%d] TickTime[%d] MainSeq[%d] SubSeq[%d] BuyNo[%d] SellNo[%d]" % (
                pTick['TickType'],
                pTick['SecurityID'],
                pTick['Price'],
                pTick['Volume'],
                pTick['TickTime'],
                pTick['MainSeq'],
                pTick['SubSeq'],
                pTick['BuyNo'],
                pTick['SellNo']))
if __name__ == "__main__":
    print(lev2mdapi.CTORATstpLev2MdApi_GetApiVersion())
    # case 1: Tcp方式
    # g_SubMode=lev2mdapi.TORA_TSTP_MST_TCP
    # case 2: 组播方式
    g_SubMode = lev2mdapi.TORA_TSTP_MST_MCAST
    # case 1缓存模式
    global api
    api = lev2mdapi.CTORATstpLev2MdApi_CreateTstpLev2MdApi(g_SubMode, True)
    # case 2非缓存模式
    # api = lev2mdapi.CTORATstpLev2MdApi_CreateTstpLev2MdApi(g_SubMode, False)
    global spi
    spi = Lev2MdSpi(api)
    api.RegisterSpi(spi)
    if g_SubMode != lev2mdapi.TORA_TSTP_MST_MCAST:
        api.RegisterFront(Front_Address)
    else:
        # case 1 从一个组播地址收取行情
        api.RegisterMulticast(Multicast_Address, Local_Interface_Address, "")
    # case 2:注册多个组播地址同时收行情
    # api.RegisterMulticast(Multicast_Address, Local_Interface_Address, Sender_Interface_Address);
    # api.RegisterMulticast(Multicast_Address2, Local_Interface_Address, Sender_Interface_Address);
    # case 3:efvi模式收行情
    # api.RegisterMulticast(Multicast_Address, Local_Interface_Address, Sender_Interface_Address, "enp101s0f0",4096, True);
    # case 1 不绑核运行
    api.Init()
    # case 2 绑核运行
    # api.Init("2,17")
    str = input("\n")
trade_client.py
@@ -2,43 +2,47 @@
import logging
import os
import socket
import threading
import time
import command_manager
import constant
import socket_util
import traderapi
from client_network import SendResponseSkManager
from log import logger
# 正式账号
UserID = '388000013349'
# 登陆密码
Password = '110808'
# 投资者账户
InvestorID = '388000013349'
# 经济公司部门代码
DepartmentID = '0003'
# 资金账户
AccountID = '388000013349'
# 沪市股东账号
SSE_ShareHolderID = 'A641420991'
# 深市股东账号
SZSE_ShareHolderID = '0345104949'
# # 仿真
# UserID = '00043201'
# UserID = '388000013349'
# # 登陆密码
# Password = '45249973'
# Password = '110808'
# # 投资者账户
# InvestorID = '11160150'
# InvestorID = '388000013349'
# # 经济公司部门代码
# DepartmentID = '0003'
# # 资金账户
# AccountID = '00043201'
# AccountID = '388000013349'
# # 沪市股东账号
# SSE_ShareHolderID = 'A00043201'
# SSE_ShareHolderID = 'A641420991'
# # 深市股东账号
# SZSE_ShareHolderID = '700043201'
# SZSE_ShareHolderID = '0345104949'
# 仿真
from mylog import logger_trade_debug
UserID = '00043201'
# 登陆密码
Password = '45249973'
# 投资者账户
InvestorID = '11160150'
# 经济公司部门代码
DepartmentID = '0003'
# 资金账户
AccountID = '00043201'
# 沪市股东账号
SSE_ShareHolderID = 'A00043201'
# 深市股东账号
SZSE_ShareHolderID = '700043201'
# # 登录用户
# UserID = '00572083'
@@ -719,8 +723,8 @@
    __tradeSimpleApi = TradeSimpleApi()
    def OnTrade(self, client_id, request_id, type_, data):
        print("请求进程ID", os.getpid())
        if type_ == 1:
            logger_trade_debug.info(f"请求下单:client_id-{client_id} request_id-{request_id} data-{data}")
            # 下单
            # 1-买 2-卖
            direction = data["direction"]
@@ -749,6 +753,7 @@
                        json.dumps({"code": 1, "msg": str(e)}).encode("utf-8"))
        elif type_ == 2:
            logger_trade_debug.info(f"请求撤单:client_id-{client_id} request_id-{request_id} data-{data}")
            # 撤单
            direction = data["direction"]
            code = data["code"]
@@ -773,6 +778,7 @@
                        json.dumps({"code": 1, "msg": str(e)}).encode("utf-8"))
    def OnDealList(self, client_id, request_id):
        logger_trade_debug.info(f"请求成交列表:client_id-{client_id} request_id-{request_id}")
        try:
            print("开始请求成交列表")
            req_id = self.__tradeSimpleApi.list_traded_orders()
@@ -782,6 +788,7 @@
            SendResponseSkManager.send_error_response("common", request_id, client_id, str(e))
    def OnDelegateList(self, client_id, request_id, is_cancel):
        logger_trade_debug.info(f"请求委托列表:client_id-{client_id} request_id-{request_id}")
        try:
            req_id = self.__tradeSimpleApi.list_delegate_orders(is_cancel)
            req_rid_dict[req_id] = (client_id, request_id)
@@ -789,6 +796,7 @@
            SendResponseSkManager.send_error_response("common", request_id, client_id, str(e))
    def OnMoney(self, client_id, request_id):
        logger_trade_debug.info(f"请求账户:client_id-{client_id} request_id-{request_id}")
        try:
            req_id = self.__tradeSimpleApi.get_money_account()
            req_rid_dict[req_id] = (client_id, request_id)
@@ -796,6 +804,7 @@
            SendResponseSkManager.send_error_response("common", request_id, client_id, str(e))
    def OnPositionList(self, client_id, request_id):
        logger_trade_debug.info(f"请求持仓:client_id-{client_id} request_id-{request_id}")
        try:
            req_id = self.__tradeSimpleApi.list_positions()
            req_rid_dict[req_id] = (client_id, request_id)
@@ -822,13 +831,13 @@
    if 1:  # 模拟环境,TCP 直连Front方式
        # 注册单个交易前置服务地址
        api.RegisterFront("tcp://192.168.84.31:6500")  # 正式环境主地址
        api.RegisterFront("tcp://192.168.84.32:26500")  # 正式环境备用地址
        # api.RegisterFront("tcp://192.168.84.31:6500")  # 正式环境主地址
        # api.RegisterFront("tcp://192.168.84.32:26500")  # 正式环境备用地址
        # TD_TCP_FrontAddress = "tcp://210.14.72.21:4400"  # 仿真交易环境
        TD_TCP_FrontAddress = "tcp://210.14.72.21:4400"  # 仿真交易环境
        # TD_TCP_FrontAddress = "tcp://210.14.72.15:4400"  # 24小时环境A套
        # TD_TCP_FrontAddress="tcp://210.14.72.16:9500" #24小时环境B套
        # api.RegisterFront(TD_TCP_FrontAddress)
        api.RegisterFront(TD_TCP_FrontAddress)
        # 注册多个交易前置服务地址,用逗号隔开 形如: api.RegisterFront("tcp://10.0.1.101:6500,tcp://10.0.1.101:26500")
        # print("TD_TCP_FensAddress[sim or 24H]::%s\n" % TD_TCP_FrontAddress)
@@ -856,17 +865,33 @@
    api.Init()
def __send_response(type, data_bytes):
    sk = SendResponseSkManager.get_send_response_sk(type)
    data_bytes = socket_util.load_header(data_bytes)
    sk.sendall(data_bytes)
    result, header_str = socket_util.recv_data(sk)
    result = json.loads(result)
    if result["code"] != 0:
        raise Exception(result['msg'])
# 交易反馈回调
def traderapi_callback(type, req_id, data):
    def send_response(data_str):
def __traderapi_callback(type, req_id, data):
    def send_response(data_str, _client_id, _request_id):
        for i in range(3):
        try:
            SendResponseSkManager.get_send_response_sk(type).sendall(data_str)
                __send_response(f"{type}#{_client_id}", data_str)
                print("发送数据成功")
                logger_trade_debug.info(f"第{i}次发送数据成功:type-{type},request_id-{_request_id}")
                break
        except ConnectionResetError as e:
            SendResponseSkManager.del_send_response_sk(type)
            SendResponseSkManager.get_send_response_sk(type).sendall(data_str)
        except BrokenPipeError as e:
            SendResponseSkManager.del_send_response_sk(type)
            SendResponseSkManager.get_send_response_sk(type).sendall(data_str)
            except Exception as e:
                logger_trade_debug.info(f"第{i}次发送数据失败:type-{type},request_id-{_request_id}")
                logger_trade_debug.exception(e)
                pass
    print("回调", type, req_id, data)
    print("进程ID", os.getpid())
@@ -876,20 +901,31 @@
    try:
        print("traderapi_callback", req_rid_dict)
        if req_rid_dict and key in req_rid_dict:
            print("API回调")
            client_id, request_id = req_rid_dict.pop(key)
            # 测试
            send_response(
                json.dumps({"type": "response", "data": {"code": 0, "data": data}, "client_id": client_id,
                            "request_id": request_id}).encode('utf-8'))
            print("结果发送完毕")
                            "request_id": request_id}).encode('utf-8'), client_id, request_id)
            print("API回调结束")
        else:
            print("非API回调")
            # 非API回调
            send_response(
                json.dumps({"type": "trade_callback", "data": {"code": 0, "data": data, "type": type}}).encode('utf-8'))
                json.dumps({"type": "trade_callback", "data": {"code": 0, "data": data, "type": type}}).encode('utf-8'),
                None,
                req_id)
            print("非API结束")
    except Exception as e:
        logging.exception(e)
# 采用异步回调
def traderapi_callback(type, req_id, data):
    t1 = threading.Thread(target=lambda: __traderapi_callback(type, req_id, data), daemon=True)
    t1.start()
addr, port = constant.SERVER_IP, constant.SERVER_PORT