Administrator
4 天以前 48fb7a00951f91bdc707e5dd2d196e5bccb752c3
huaxin_client/l2_market_client.py
@@ -14,7 +14,7 @@
from huaxin_client.l2_data_manager import L2DataUploadManager
from log_module.async_log_util import huaxin_l2_log
from log_module.log import logger_local_huaxin_l2_subscript, logger_system, logger_l2_codes_subscript, \
    hx_logger_l2_market_data_before_open
    hx_logger_l2_market_data_before_open, hx_logger_l2_debug
from utils import tool
###B类###
@@ -23,7 +23,7 @@
Multicast_Address2 = "udp://224.224.224.234:7890"
Local_Interface_Address = constant.LOCAL_IP
set_codes_data_queue = queue.Queue()
set_codes_data_queue = queue.Queue(maxsize=1000)
market_code_dict = {}
@@ -84,7 +84,6 @@
            raise Exception("L2尚未登录")
        add_codes = codes - self.subscripted_codes
        del_codes = self.subscripted_codes - codes
        print("add del codes", add_codes, del_codes)
        self.__subscribe(add_codes)
        self.__unsubscribe(del_codes)
        # 设置最近的代码列表
@@ -92,7 +91,6 @@
    # 订阅代码,[代码,...]
    def set_codes_data(self, codes):
        print("订阅代码数量:", len(codes))
        try:
            self.__process_codes_data(codes)
        except Exception as e:
@@ -119,7 +117,6 @@
        return []
    def OnFrontConnected(self):
        print("OnFrontConnected")
        logger_system.info(f"l2_client OnFrontConnected 线程ID:{tool.get_thread_id()}")
        logout_req = lev2mdapi.CTORATstpUserLogoutField()
        self.__api.ReqUserLogout(logout_req, 1)
@@ -129,10 +126,7 @@
        self.__api.ReqUserLogin(login_req, 2)
    def OnRspUserLogin(self, pRspUserLogin, pRspInfo, nRequestID, bIsLast):
        print("OnRspUserLogin: ErrorID[%d] ErrorMsg[%s] RequestID[%d] IsLast[%d]" % (
            pRspInfo['ErrorID'], pRspInfo['ErrorMsg'], nRequestID, bIsLast))
        if pRspInfo['ErrorID'] == 0:
            print("----L2行情登录成功----")
            self.is_login = True
            logger_system.info(f"L2行情登录成功")
            # 初始设置值
@@ -175,7 +169,8 @@
            limit_up_count = len(self.__limit_up_codes)
            # 获取是否涨停价
            limit_up_price = float(
                tool.to_price(decimal.Decimal(str(pDepthMarketData['PreClosePrice'])) * decimal.Decimal(tool.get_limit_up_rate(pDepthMarketData['SecurityID']))))
                tool.to_price(decimal.Decimal(str(pDepthMarketData['PreClosePrice'])) * decimal.Decimal(
                    tool.get_limit_up_rate(pDepthMarketData['SecurityID']))))
            if abs(limit_up_price - pDepthMarketData['LastPrice']) < 0.001 or abs(
                    limit_up_price - pDepthMarketData['BidPrice1']) < 0.001:
                huaxin_l2_log.info(hx_logger_l2_market_data_before_open, f"{d}")
@@ -186,7 +181,8 @@
                market_code_dict[pDepthMarketData.SecurityID] = (
                    pDepthMarketData.SecurityID, pDepthMarketData.BidPrice1, 0.1, pDepthMarketData.TotalBidVolume,
                    time.time(),
                    pDepthMarketData.BidPrice1, pDepthMarketData.BidVolume1)
                    pDepthMarketData.BidPrice1, pDepthMarketData.BidVolume1, pDepthMarketData.BidPrice2,
                    pDepthMarketData.BidVolume2, pDepthMarketData.UpdateTime, pDepthMarketData.PreClosePrice)
            else:
                if pDepthMarketData.SecurityID in market_code_dict:
                    market_code_dict.pop(pDepthMarketData.SecurityID)
@@ -198,7 +194,7 @@
def __init_l2(l2_data_upload_manager):
    print(lev2mdapi.CTORATstpLev2MdApi_GetApiVersion())
    # print(lev2mdapi.CTORATstpLev2MdApi_GetApiVersion())
    # case 1: Tcp方式
    # g_SubMode=lev2mdapi.TORA_TSTP_MST_TCP
    # case 2: 组播方式
@@ -272,6 +268,9 @@
        if tool.trade_time_sub(tool.get_now_time_str(), "09:25:00") >= 0:
            # 只读竞价数据
            break
        # 只读9:20-9:25的数据
        if tool.trade_time_sub(tool.get_now_time_str(), "09:20:00") < 0:
            continue
        try:
            # (代码,现价,涨幅,量,时间)
            list_ = [market_code_dict[k] for k in market_code_dict]
@@ -283,9 +282,9 @@
                    flist.append(d)
            flist.sort(key=lambda x: x[2], reverse=True)
            datas = flist[:1000]
            hx_logger_l2_debug.info(f"集合竞价涨停:{datas}")
            # 将持仓股加入进去
            datas.extend(plist)
            print("代码数量:", len(datas))
            __upload_codes_info(queue_l1_w_strategy_r, datas)
        except Exception as e:
            pass