Administrator
2025-06-06 6df8d9ac75a041377c01c80e6e970e5c75ce7662
huaxin_client/l2_market_client.py
@@ -11,10 +11,7 @@
from huaxin_client import l1_subscript_codes_manager
from huaxin_client import constant
import lev2mdapi
from huaxin_client.l2_data_manager import L2DataUploadManager
from log_module.async_log_util import huaxin_l2_log
from log_module.log import logger_local_huaxin_l2_subscript, logger_system, logger_l2_codes_subscript, \
    hx_logger_l2_market_data_before_open, hx_logger_l2_debug
from log_module.log import logger_local_huaxin_l2_subscript, logger_system, logger_local_huaxin_contact_debug
from utils import tool
###B类###
@@ -34,16 +31,10 @@
    # Codes that have already been subscribed
    subscripted_codes = set()
    # Codes currently at the limit-up price
    __limit_up_codes = set()
    # Order numbers of large buy orders
    # NOTE(review): no attribute matching the comment above is visible in this view — confirm.
    # Constructor: initialises the lev2mdapi SPI base class and stores the API handle.
    # NOTE(review): two consecutive `def __init__` lines follow. This text looks like a
    # diff rendering with its +/- markers stripped, so they are presumably the old and
    # the new signature of the same constructor — confirm against the repository.
    def __init__(self, api, l2_data_upload_manager: L2DataUploadManager):
    def __init__(self, api):
        # Call the base-class initialiser explicitly.
        lev2mdapi.CTORATstpLev2MdSpi.__init__(self)
        # Keep the API object handed in by the caller.
        self.__api = api
        # Login flag starts out False.
        self.is_login = False
        # NOTE(review): `l2_data_upload_manager` is only declared by the first signature
        # above; with the second signature this name would be undefined — confirm which
        # lines belong to the current revision.
        self.l2_data_upload_manager = l2_data_upload_manager
    def __split_codes(self, codes):
        szse_codes = []
@@ -95,7 +86,7 @@
            self.__process_codes_data(codes)
        except Exception as e:
            logging.exception(e)
            logger_l2_codes_subscript.exception(e)
            logger_local_huaxin_l2_subscript.exception(e)
        finally:
            # 保存一份最新的数据
            pass
@@ -144,56 +135,33 @@
    # Market-data callback: copies fields of `pDepthMarketData` into a dict, tracks
    # whether the security sits at its limit-up price, and writes an entry for the
    # security into `market_code_dict`.
    # NOTE(review): this span looks like a diff rendering with +/- markers stripped;
    # old and new lines appear interleaved (e.g. "avgAskPrice" occurs twice, and the
    # first occurrence has no trailing comma). Confirm the real body in the repository.
    def OnRtnMarketData(self, pDepthMarketData, FirstLevelBuyNum, FirstLevelBuyOrderVolumes, FirstLevelSellNum,
                        FirstLevelSellOrderVolumes):
        # Input: time, last price, total traded volume, bid 1 through bid 5, ask 1 through ask 5
        try:
            # Snapshot of the fields read from pDepthMarketData (subscript access here;
            # attribute access is used further down).
            d = {"dataTimeStamp": pDepthMarketData['DataTimeStamp'], "securityID": pDepthMarketData['SecurityID'],
                 "preClosePrice": pDepthMarketData['PreClosePrice'],
                 "lastPrice": pDepthMarketData['LastPrice'],
                 "totalBidVolume": pDepthMarketData['TotalBidVolume'],
                 "avgBidPrice": pDepthMarketData['AvgBidPrice'],
             "totalVolumeTrade": pDepthMarketData['TotalVolumeTrade'],
             "totalValueTrade": pDepthMarketData['TotalValueTrade'],
                 "totalAskVolume": pDepthMarketData['TotalAskVolume'],
                 "avgAskPrice": pDepthMarketData["AvgAskPrice"]
                 # "buy": [(pDepthMarketData['BidPrice1'], pDepthMarketData['BidVolume1']),
                 #         (pDepthMarketData['BidPrice2'], pDepthMarketData['BidVolume2']),
                 #         (pDepthMarketData['BidPrice3'], pDepthMarketData['BidVolume3']),
                 #         (pDepthMarketData['BidPrice4'], pDepthMarketData['BidVolume4']),
                 #         (pDepthMarketData['BidPrice5'], pDepthMarketData['BidVolume5'])],
                 # "sell": [
                 #     (pDepthMarketData['AskPrice1'], pDepthMarketData['AskVolume1']),
                 #     (pDepthMarketData['AskPrice2'], pDepthMarketData['AskVolume2']),
                 #     (pDepthMarketData['AskPrice3'], pDepthMarketData['AskVolume3']),
                 #     (pDepthMarketData['AskPrice4'], pDepthMarketData['AskVolume4']),
                 #     (pDepthMarketData['AskPrice5'], pDepthMarketData['AskVolume5'])
                 # ]
             "avgAskPrice": pDepthMarketData["AvgAskPrice"],
             "buy": [(pDepthMarketData['BidPrice1'], pDepthMarketData['BidVolume1']),
                     (pDepthMarketData['BidPrice2'], pDepthMarketData['BidVolume2']),
                     (pDepthMarketData['BidPrice3'], pDepthMarketData['BidVolume3']),
                     (pDepthMarketData['BidPrice4'], pDepthMarketData['BidVolume4']),
                     (pDepthMarketData['BidPrice5'], pDepthMarketData['BidVolume5'])],
             "sell": [
                 (pDepthMarketData['AskPrice1'], pDepthMarketData['AskVolume1']),
                 (pDepthMarketData['AskPrice2'], pDepthMarketData['AskVolume2']),
                 (pDepthMarketData['AskPrice3'], pDepthMarketData['AskVolume3']),
                 (pDepthMarketData['AskPrice4'], pDepthMarketData['AskVolume4']),
                 (pDepthMarketData['AskPrice5'], pDepthMarketData['AskVolume5'])
             ]
                 }
            # Remember the set size so a change can be detected and logged below.
            limit_up_count = len(self.__limit_up_codes)
            # Work out the limit-up price: previous close times the rate returned by
            # tool.get_limit_up_rate, passed through tool.to_price and converted to float.
            limit_up_price = float(
                tool.to_price(decimal.Decimal(str(pDepthMarketData['PreClosePrice'])) * decimal.Decimal(
                    tool.get_limit_up_rate(pDepthMarketData['SecurityID']))))
            # Treat the security as limit-up when the last price or the best bid is
            # within 0.001 of the limit-up price.
            if abs(limit_up_price - pDepthMarketData['LastPrice']) < 0.001 or abs(
                    limit_up_price - pDepthMarketData['BidPrice1']) < 0.001:
                huaxin_l2_log.info(hx_logger_l2_market_data_before_open, f"{d}")
                self.__limit_up_codes.add(pDepthMarketData['SecurityID'])
            else:
                self.__limit_up_codes.discard(pDepthMarketData['SecurityID'])
            # Publish limit-up securities into market_code_dict; remove the others.
            if pDepthMarketData.SecurityID in self.__limit_up_codes:
                # NOTE(review): the literal 0.1 in the third slot is presumably a fixed
                # gain-rate placeholder — confirm against the consumer of this tuple.
                market_code_dict[pDepthMarketData.SecurityID] = (
                    pDepthMarketData.SecurityID, pDepthMarketData.BidPrice1, 0.1, pDepthMarketData.TotalBidVolume,
                    time.time(),
                    pDepthMarketData.BidPrice1, pDepthMarketData.BidVolume1, pDepthMarketData.BidPrice2,
                    pDepthMarketData.BidVolume2, pDepthMarketData.UpdateTime, pDepthMarketData.PreClosePrice)
            else:
                if pDepthMarketData.SecurityID in market_code_dict:
                    market_code_dict.pop(pDepthMarketData.SecurityID)
            # Log the limit-up set only when its size changed during this callback.
            if limit_up_count != len(self.__limit_up_codes):
                huaxin_l2_log.info(hx_logger_l2_market_data_before_open, f"涨停代码:{self.__limit_up_codes}")
        except:
            # Every error raised above is silently discarded here.
            pass
        # (code, timestamp, price, total traded volume, total traded value, 5 bid levels, 5 ask levels)
        # NOTE(review): this assignment runs after the try block and overwrites whatever
        # the try block stored for the same key; it also relies on `d` having been
        # assigned inside the try block. Presumably these lines belong to a different
        # revision than the limit-up logic above — confirm.
        data = (
        d["securityID"], d["dataTimeStamp"], d["lastPrice"], d["totalVolumeTrade"], d["totalValueTrade"], d["buy"],
        d["sell"])
        market_code_dict[pDepthMarketData.SecurityID] = data
def __init_l2(l2_data_upload_manager):
def __init_l2():
    # print(lev2mdapi.CTORATstpLev2MdApi_GetApiVersion())
    # case 1: Tcp方式
    # g_SubMode=lev2mdapi.TORA_TSTP_MST_TCP
@@ -205,7 +173,7 @@
    # case 2非缓存模式
    # api = lev2mdapi.CTORATstpLev2MdApi_CreateTstpLev2MdApi(g_SubMode, False)
    global spi
    spi = Lev2MdSpi(api, l2_data_upload_manager)
    spi = Lev2MdSpi(api)
    api.RegisterSpi(spi)
    # -------------------正式模式-------------------------------------
    if g_SubMode != lev2mdapi.TORA_TSTP_MST_MCAST:
@@ -244,13 +212,13 @@
    if queue_l1_w_strategy_r is not None:
        queue_l1_w_strategy_r.put_nowait(fdata)
    # 记录新增加的代码
    codes = set([x[0] for x in datas])
    add_codes = codes - __latest_subscript_codes
    __latest_subscript_codes.clear()
    for c in codes:
        __latest_subscript_codes.add(c)
    if add_codes:
        hx_logger_l2_market_data_before_open.info(f"({request_id})新增加订阅的代码:{add_codes}")
    # codes = set([x[0] for x in datas])
    # add_codes = codes - __latest_subscript_codes
    # __latest_subscript_codes.clear()
    # for c in codes:
    #     __latest_subscript_codes.add(c)
    # if add_codes:
    #     hx_logger_l2_market_data_before_open.info(f"({request_id})新增加订阅的代码:{add_codes}")
# Entry point of the L2 client: initialises the L2 API, then loops forever, pushing
# the contents of market_code_dict to __upload_codes_info and sleeping 2 seconds
# after each upload attempt.
# NOTE(review): this span looks like a diff rendering with +/- markers stripped (a
# hunk header sits right after the `def` line and several statements appear in an
# old/new pair). Confirm the real body in the repository.
def run(queue_l1_w_strategy_r) -> None:
@@ -258,39 +226,18 @@
    logger_system.info(f"l2_client 线程ID:{tool.get_thread_id()}")
    try:
        # log.close_print()
        # Initialise
        # data_callback_distribute_manager = CodeDataCallbackDistributeManager(data_callbacks)
        # l2_data_upload_manager = L2DataUploadManager(data_callback_distribute_manager)
        # NOTE(review): two calls with different argument lists — presumably the old
        # and the new form of the same call.
        __init_l2(None)
        __init_l2()
    except Exception as e:
        # Initialisation failures are logged and the loop below still starts.
        logger_system.exception(e)
    while True:
        if tool.trade_time_sub(tool.get_now_time_str(), "09:25:00") >= 0:
            # Only read call-auction data
            break
        # Only read data from 9:20 to 9:25
        if tool.trade_time_sub(tool.get_now_time_str(), "09:20:00") < 0:
        # Only read data after 9:30
        # NOTE(review): the comment above says 9:30 but the comparison below uses
        # '09:24:59' — confirm which is intended.
        if tool.get_now_time_str() < '09:24:59':
            # NOTE(review): this `continue` sits outside the try/finally below, so the
            # 2-second sleep is not reached on this path.
            continue
        try:
            # (code, last price, gain rate, volume, time)
            list_ = [market_code_dict[k] for k in market_code_dict]
            flist = []
            plist = []
            for d in list_:
                if d[2] >= constant.L1_MIN_RATE:
                    # Entries whose gain is below 5% must be dropped
                    # (the threshold actually used is constant.L1_MIN_RATE)
                    flist.append(d)
            # Highest gain first, capped at 1000 entries.
            flist.sort(key=lambda x: x[2], reverse=True)
            datas = flist[:1000]
            hx_logger_l2_debug.info(f"集合竞价涨停:{datas}")
            # Add the held positions as well (plist is always empty in the visible code)
            datas.extend(plist)
            # NOTE(review): two uploads with different payloads — presumably the old
            # and the new form of the same call.
            __upload_codes_info(queue_l1_w_strategy_r, datas)
            __upload_codes_info(queue_l1_w_strategy_r, list_)
        except Exception as e:
            # Upload errors are silently discarded.
            pass
        finally:
            # Pause 2 seconds after every upload attempt, successful or not.
            time.sleep(2)
# Script entry point: start the client loop with None as the queue argument.
if __name__ == "__main__":
    run(None)