Administrator
2024-05-17 7ce75af51275ec57ffefac5eab89477a6d8f7957
可转债仿真交易/print方法替换
2个文件已修改
172 ■■■■ 已修改文件
huaxin_client/cb/l2_client_for_cb.py 170 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
log_module/async_log_util.py 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/cb/l2_client_for_cb.py
@@ -13,7 +13,7 @@
import concurrent.futures
from code_atrribute.history_k_data_util import JueJinHttpApi, JueJinApi
from huaxin_client import command_manager
from huaxin_client import command_manager, l2_data_manager
from huaxin_client import constant
import lev2mdapi
from huaxin_client.command_manager import L2ActionCallback
@@ -49,16 +49,18 @@
class Lev2MdSpi(lev2mdapi.CTORATstpLev2MdSpi):
    latest_codes_set = set()
    special_code_volume_for_order_dict = {}
    # 已经订阅的代码
    subscripted_codes = set()
    subscripted_market_codes = set()
    subscripted_transaction_codes = set()
    # 代码的上次成交的订单唯一索引
    __last_transaction_keys_dict = {}
    limit_up_price_dict = {}
    __is_limit_up_dict = {}
    # 高涨幅的代码
    __high_rate_codes = set()
    # 买入的大单订单号
@@ -81,7 +83,7 @@
    # 新增订阅
    # 取消订阅
    def __unsubscribe(self, _codes):
    def __unsubscribe_trans(self, _codes):
        sh, sz = self.__split_codes(_codes)
        logger_local_huaxin_l2_subscript.info(f"取消订阅上证:{sh}")
        logger_local_huaxin_l2_subscript.info(f"取消订阅深证:{sz}")
@@ -91,7 +93,7 @@
        if sz:
            self.__api.UnSubscribeTransaction(sz, lev2mdapi.TORA_TSTP_EXD_SZSE)
    def __subscribe(self, _codes):
    def __subscribe_trans(self, _codes):
        sh, sz = self.__split_codes(_codes)
        logger_local_huaxin_l2_subscript.info(f"订阅上证:{sh}")
        logger_local_huaxin_l2_subscript.info(f"订阅深证:{sz}")
@@ -103,32 +105,67 @@
            result = self.__api.SubscribeTransaction(sz, lev2mdapi.TORA_TSTP_EXD_SZSE)
            logger_local_huaxin_l2_subscript.info(f"逐笔成交订阅结果sz:{result}")
    def __process_codes_data(self, codes, from_cache=False, delay=0.0):
    def __unsubscribe_market(self, _codes):
        sh, sz = self.__split_codes(_codes)
        if sh:
            # 取消订阅逐笔成交
            self.__api.UnSubscribeMarketData(sh, lev2mdapi.TORA_TSTP_EXD_SSE)
        if sz:
            self.__api.UnSubscribeMarketData(sz, lev2mdapi.TORA_TSTP_EXD_SZSE)
    def __subscribe_market(self, _codes):
        sh, sz = self.__split_codes(_codes)
        if sh:
            # 订阅逐笔成交
            result = self.__api.SubscribeMarketData(sh, lev2mdapi.TORA_TSTP_EXD_SSE)
            logger_local_huaxin_l2_subscript.info(f"逐笔成交订阅结果sh:{result}")
        if sz:
            result = self.__api.SubscribeMarketData(sz, lev2mdapi.TORA_TSTP_EXD_SZSE)
            logger_local_huaxin_l2_subscript.info(f"逐笔成交订阅结果sz:{result}")
    def __process_market_codes(self, codes, from_cache=False, delay=0.0):
        codes = set(codes)
        if not self.is_login and not constant.TEST:
            raise Exception("L2尚未登录")
        if delay > 0:
            time.sleep(delay)
        add_codes = codes - self.subscripted_codes
        del_codes = self.subscripted_codes - codes
        add_codes = codes - self.subscripted_market_codes
        del_codes = self.subscripted_market_codes - codes
        printlog("add del codes", add_codes, del_codes)
        self.__subscribe(add_codes)
        self.__unsubscribe(del_codes)
        self.__subscribe_market(add_codes)
        self.__unsubscribe_market(del_codes)
    def __process_transaction_codes(self, codes, from_cache=False, delay=0.0):
        codes = set(codes)
        if not self.is_login and not constant.TEST:
            raise Exception("L2尚未登录")
        if delay > 0:
            time.sleep(delay)
        add_codes = codes - self.subscripted_transaction_codes
        del_codes = self.subscripted_transaction_codes - codes
        printlog("add del codes", add_codes, del_codes)
        self.__subscribe_trans(add_codes)
        self.__unsubscribe_trans(del_codes)
        if add_codes:
            logger_system.info(f"新增L2订阅代码数量({'缓存' if from_cache else ''}):{len(add_codes)}")
        logger_l2_codes_subscript.info("华鑫L2订阅结束,add-{} del-{}", len(add_codes), len(del_codes))
        # 设置最近的代码列表
        self.latest_codes_set = codes
    # 订阅代码,[代码,...]
    def set_codes_data(self, codes):
    def set_market_codes(self, codes):
        try:
            self.__process_codes_data(codes)
            self.__process_market_codes(codes)
        except Exception as e:
            logging.exception(e)
            logger_l2_codes_subscript.exception(e)
        finally:
            pass
    def sub_high_rate_codes(self):
        """
        订阅高涨幅的代码
        :return:
        """
        self.__process_transaction_codes(self.__high_rate_codes)
    @classmethod
    def __set_latest_datas(cls, codes_data):
@@ -159,47 +196,36 @@
        if pRspInfo['ErrorID'] == 0:
            self.is_login = True
            logger_system.info(f"L2行情登录成功")
            self.set_codes_data(self.codes)
            self.set_market_codes(self.codes)
    def OnRspSubTransaction(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast):
        if pRspInfo["ErrorID"] == 0:
            self.subscripted_codes.add(pSpecificSecurity['SecurityID'])
        if bIsLast == 1:
            l2_data_manager.add_subscript_codes(self.subscripted_codes)
        logger_local_huaxin_l2_subscript.info(f"订阅成功:{len(self.subscripted_codes)}")
            self.subscripted_transaction_codes.add(pSpecificSecurity['SecurityID'])
        logger_local_huaxin_l2_subscript.info(f"成交订阅成功:{len(self.subscripted_transaction_codes)}")
    def OnRspUnSubTransaction(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast):
        try:
            code = pSpecificSecurity['SecurityID']
            self.subscripted_codes.discard(code)
            if bIsLast == 1:
                l2_data_manager.add_subscript_codes(self.subscripted_codes)
            self.subscripted_transaction_codes.discard(code)
        except Exception as e:
            logging.exception(e)
    def OnRspSubMarketData(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast):
        if pRspInfo["ErrorID"] == 0:
            self.subscripted_market_codes.add(pSpecificSecurity['SecurityID'])
    def OnRspUnSubMarketData(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast):
        try:
            code = pSpecificSecurity['SecurityID']
            self.subscripted_market_codes.discard(code)
        except Exception as e:
            logging.exception(e)
    def OnRtnTransaction(self, pTransaction):
        try:
            # 输出逐笔成交数据
            if pTransaction['ExecType'] == b"2":
                # 撤单
                item = {"SecurityID": pTransaction['SecurityID'], "Price": pTransaction['TradePrice'],
                        "Volume": pTransaction['TradeVolume'],
                        "OrderType": "2",
                        "OrderTime": pTransaction['TradeTime'], "MainSeq": pTransaction['MainSeq'],
                        "SubSeq": pTransaction['SubSeq'],
                        "OrderStatus": "D"}
                buyNo = pTransaction['BuyNo']
                sellNo = pTransaction['SellNo']
                if buyNo > 0:
                    # 买
                    item["OrderNO"] = buyNo
                    item["Side"] = "1"
                elif sellNo > 0:
                    # 卖
                    item["OrderNO"] = sellNo
                    item["Side"] = "2"
            else:
            if pTransaction['ExecType'] != b"2":
                item = {"SecurityID": pTransaction['SecurityID'], "TradePrice": pTransaction['TradePrice'],
                        "TradeVolume": pTransaction['TradeVolume'],
                        "OrderTime": pTransaction['TradeTime'], "MainSeq": pTransaction['MainSeq'],
@@ -221,13 +247,55 @@
        except Exception as e:
            logger_local_huaxin_l2_error.exception(e)
    def OnRtnMarketData(self, pDepthMarketData, FirstLevelBuyNum, FirstLevelBuyOrderVolumes, FirstLevelSellNum,
                        FirstLevelSellOrderVolumes):
        # 传入:时间,现价,成交总量,买1,买2,买3,买4,买5,卖1,卖2,卖3,卖4,卖5
        try:
            d = {"dataTimeStamp": pDepthMarketData['DataTimeStamp'], "securityID": pDepthMarketData['SecurityID'],
                 "preClosePrice": pDepthMarketData['PreClosePrice'],
                 "lastPrice": pDepthMarketData['LastPrice'],
                 "totalBidVolume": pDepthMarketData['TotalBidVolume'],
                 "avgBidPrice": pDepthMarketData['AvgBidPrice'],
                 "totalAskVolume": pDepthMarketData['TotalAskVolume'],
                 "avgAskPrice": pDepthMarketData["AvgAskPrice"]
                 # "buy": [(pDepthMarketData['BidPrice1'], pDepthMarketData['BidVolume1']),
                 #         (pDepthMarketData['BidPrice2'], pDepthMarketData['BidVolume2']),
                 #         (pDepthMarketData['BidPrice3'], pDepthMarketData['BidVolume3']),
                 #         (pDepthMarketData['BidPrice4'], pDepthMarketData['BidVolume4']),
                 #         (pDepthMarketData['BidPrice5'], pDepthMarketData['BidVolume5'])],
                 # "sell": [
                 #     (pDepthMarketData['AskPrice1'], pDepthMarketData['AskVolume1']),
                 #     (pDepthMarketData['AskPrice2'], pDepthMarketData['AskVolume2']),
                 #     (pDepthMarketData['AskPrice3'], pDepthMarketData['AskVolume3']),
                 #     (pDepthMarketData['AskPrice4'], pDepthMarketData['AskVolume4']),
                 #     (pDepthMarketData['AskPrice5'], pDepthMarketData['AskVolume5'])
                 # ]
                 }
            # 获取涨幅,如果涨幅大于5%/10%就加入目标代码
            rate = round(
                (pDepthMarketData['LastPrice'] - pDepthMarketData['PreClosePrice']) / pDepthMarketData['PreClosePrice'],
                4)
            code = pDepthMarketData['SecurityID']
            if code.find("00") == 0 or code.find("60") == 0:
                if rate >= 0.05:
                    self.__high_rate_codes.add(code)
                else:
                    self.__high_rate_codes.discard(code)
            else:
                if rate >= 0.10:
                    self.__high_rate_codes.add(code)
                else:
                    self.__high_rate_codes.discard(code)
        except:
            pass
class MyL2ActionCallback(L2ActionCallback):
    def OnSetL2Position(self, codes):
        huaxin_l2_log.info(logger_l2_codes_subscript, "华鑫L2代码处理队列获取到数据:数量-{}", len(codes))
        try:
            spi.set_codes_data(codes)
            spi.set_market_codes(codes)
        except Exception as e:
            logging.exception(e)
@@ -244,7 +312,7 @@
    # case 2非缓存模式
    # api = lev2mdapi.CTORATstpLev2MdApi_CreateTstpLev2MdApi(g_SubMode, False)
    global spi
    spi = Lev2MdSpi(api,codes)
    spi = Lev2MdSpi(api, codes)
    api.RegisterSpi(spi)
    # -------------------正式模式-------------------------------------
    if g_SubMode != lev2mdapi.TORA_TSTP_MST_MCAST:
@@ -375,7 +443,19 @@
    return codes
def start_sub_high_price():
    while True:
        try:
            spi.sub_high_rate_codes()
        except:
            pass
        time.sleep(3)
def run() -> None:
    """
    先订阅所有的L2market行情数据,筛选出比较大的涨幅(主板>5%,科创板/创业板>10%)的票,然后订阅其交成交L2数据
    :return:
    """
    logger_system.info("可转债L2进程ID:{}", os.getpid())
    logger_system.info(f"可转债l2_client 线程ID:{tool.get_thread_id()}")
    try:
@@ -384,8 +464,8 @@
        codes = __init_data()
        __init_l2(codes)
        threading.Thread(target=huaxin_l2_log.run_sync, daemon=True).start()
        threading.Thread(target=start_sub_high_price, daemon=True).start()
        # TODO 测试
        # threading.Thread(target=lambda: test_add_codes(queue_r), daemon=True).start()
        global l2CommandManager
log_module/async_log_util.py
@@ -6,7 +6,7 @@
import threading
import time
from log_module.log import logger_debug, logger_system
from log_module.log import logger_debug, logger_system, printlog
from utils import tool