Administrator
4 天以前 48fb7a00951f91bdc707e5dd2d196e5bccb752c3
huaxin_client/l1_client.py
@@ -9,7 +9,9 @@
from huaxin_client import socket_util, l1_subscript_codes_manager
import xmdapi
from huaxin_client import tool, constant
from log_module.log import logger_system, logger_local_huaxin_l1, logger_l2_codes_subscript
from log_module.log import logger_system, logger_local_huaxin_l1, logger_l2_codes_subscript, logger_debug
from third_data import custom_block_in_money_manager
from utils import tool as out_tool
################B类##################
ADDRESS = "udp://224.224.1.19:7880"
@@ -54,24 +56,27 @@
    def subscribe_codes(self, codes_sh, codes_sz):
        # 重新订阅代码
        print(f"订阅数量:sh-{len(codes_sh)}  sz-{len(codes_sz)}")
        if codes_sh:
            ret = self.__api.SubscribeMarketData(codes_sh, xmdapi.TORA_TSTP_EXD_SSE)
            if ret != 0:
                print('SubscribeMarketData fail, ret[%d]' % ret)
                # print('SubscribeMarketData fail, ret[%d]' % ret)
                pass
            else:
                print('SubscribeMarketData success, ret[%d]' % ret)
                # print('SubscribeMarketData success, ret[%d]' % ret)
                pass
        if codes_sz:
            ret = self.__api.SubscribeMarketData(codes_sz, xmdapi.TORA_TSTP_EXD_SZSE)
            if ret != 0:
                print('SubscribeMarketData fail, ret[%d]' % ret)
                # print('SubscribeMarketData fail, ret[%d]' % ret)
                pass
            else:
                print('SubscribeMarketData success, ret[%d]' % ret)
                # print('SubscribeMarketData success, ret[%d]' % ret)
                pass
    def OnRspUserLogin(self, pRspUserLoginField, pRspInfoField, nRequestID):
        if pRspInfoField.ErrorID == 0:
            print('Login success! [%d]' % nRequestID)
            # print('Login success! [%d]' % nRequestID)
            '''
            订阅行情
@@ -90,49 +95,53 @@
        else:
            print('Login fail!!! [%d] [%d] [%s]'
                  % (nRequestID, pRspInfoField.ErrorID, pRspInfoField.ErrorMsg))
            pass
            # print('Login fail!!! [%d] [%d] [%s]'
            #       % (nRequestID, pRspInfoField.ErrorID, pRspInfoField.ErrorMsg))
    def OnRspSubMarketData(self, pSpecificSecurityField, pRspInfoField):
        if pRspInfoField.ErrorID == 0:
            print('OnRspSubMarketData: OK!')
            # print('OnRspSubMarketData: OK!')
            pass
        else:
            print('OnRspSubMarketData: Error! [%d] [%s]'
                  % (pRspInfoField.ErrorID, pRspInfoField.ErrorMsg))
            # print('OnRspSubMarketData: Error! [%d] [%s]'
            #       % (pRspInfoField.ErrorID, pRspInfoField.ErrorMsg))
            pass
    def OnRspUnSubMarketData(self, pSpecificSecurityField, pRspInfoField):
        if pRspInfoField.ErrorID == 0:
            print('OnRspUnSubMarketData: OK!')
            # print('OnRspUnSubMarketData: OK!')
            pass
        else:
            print('OnRspUnSubMarketData: Error! [%d] [%s]'
                  % (pRspInfoField.ErrorID, pRspInfoField.ErrorMsg))
            pass
            # print('OnRspUnSubMarketData: Error! [%d] [%s]'
            #       % (pRspInfoField.ErrorID, pRspInfoField.ErrorMsg))
    def OnRtnMarketData(self, pMarketDataField):
        if pMarketDataField.SecurityName.find("S") == 0:
            return
        if pMarketDataField.SecurityName.find("ST") >= 0:
            return
        close_price = round(pMarketDataField.UpperLimitPrice / 1.1, 2)
        rate = round((pMarketDataField.LastPrice - close_price) * 100 / close_price, 2)
        # print(pMarketDataField.SecurityID, pMarketDataField.SecurityName, rate, pMarketDataField.Volume)
        close_price = pMarketDataField.PreClosePrice
        lastPrice = pMarketDataField.LastPrice
        if pMarketDataField.BidPrice1:
            lastPrice = pMarketDataField.BidPrice1
        rate = round((lastPrice - close_price) * 100 / close_price, 2)
        if out_tool.get_limit_up_rate(pMarketDataField.SecurityID) > 1.1001:
            # 涨停板20%以上的打折
            rate = rate / 2
        # (代码, 现价, 涨幅, 量, 当前时间, 买1价, 买1量, 买2价, 买2量, 更新时间)
        level1_data_dict[pMarketDataField.SecurityID] = (
            pMarketDataField.SecurityID, pMarketDataField.LastPrice, rate, pMarketDataField.Volume, time.time(),
            pMarketDataField.BidPrice1, pMarketDataField.BidVolume1)
        # print(
        #     "SecurityID[%s] SecurityName[%s] LastPrice[%.2f] Volume[%d] Turnover[%.2f] BidPrice1[%.2f] BidVolume1[%d] AskPrice1[%.2f] AskVolume1[%d] UpperLimitPrice[%.2f] LowerLimitPrice[%.2f]"
        #     % (pMarketDataField.SecurityID, pMarketDataField.SecurityName, pMarketDataField.LastPrice,
        #        pMarketDataField.Volume,
        #        pMarketDataField.Turnover, pMarketDataField.BidPrice1, pMarketDataField.BidVolume1,
        #        pMarketDataField.AskPrice1,
        #        pMarketDataField.AskVolume1, pMarketDataField.UpperLimitPrice, pMarketDataField.LowerLimitPrice))
            pMarketDataField.BidPrice1, pMarketDataField.BidVolume1, pMarketDataField.BidPrice2,
            pMarketDataField.BidVolume2, pMarketDataField.UpdateTime)
__latest_subscript_codes = set()
def __upload_codes_info(queue_l1_w_strategy_r: multiprocessing.Queue, datas):
    if not tool.is_trade_time():
    if not tool.is_trade_time() and not tool.is_pre_trade_time():
        return
    # 上传数据
    type_ = "set_target_codes"
@@ -151,16 +160,9 @@
        logger_local_huaxin_l1.info(f"({request_id})新增加订阅的代码:{add_codes}")
is_re_subscript = False
# 重新订阅代码
def re_subscript(spi: MdSpi):
    try:
        global is_re_subscript
        if is_re_subscript:
            return
        is_re_subscript = True
        codes_sh, codes_sz = l1_subscript_codes_manager.request_l1_subscript_target_codes()
        if len(codes_sh) > 100 and len(codes_sz) > 100:
            logger_local_huaxin_l1.info(f"重新订阅 sh-{len(codes_sh)} sz-{len(codes_sz)}")
@@ -182,18 +184,50 @@
                codes = set(data["data"])
                global __position_codes
                __position_codes = codes
            logger_local_huaxin_l1.info(f"收到策略消息:{data}", )
        except:
            pass
        finally:
            time.sleep(1)
def run(queue_l1_w_strategy_r, queue_l1_r_strategy_w):
    logger_local_huaxin_l1.info("运行l1订阅服务")
def __run_subscript_task(spi):
    """
    运行订阅任务,在9:19到9:29之间开始订阅
    @return:
    """
    is_re_subscript = False
    while True:
        try:
            # 判断是否需要重新订阅
            if tool.is_pre_trade_time():
                re_subscript(spi)
                is_re_subscript = True
            if is_re_subscript:
                break
        except:
            pass
        finally:
            time.sleep(3)
def run(queue_l1_w_strategy_r, queue_l1_r_strategy_w, fixed_codes=None):
    """
    运行l1订阅任务
    @param queue_l1_w_strategy_r: L1方写,策略方读
    @param queue_l1_r_strategy_w: L1方读,策略方写
    @param fixed_codes: 固定要返回数据的代码
    @return:
    """
    if fixed_codes is None:
        fixed_codes = set()
    logger_local_huaxin_l1.info(f"运行l1订阅服务,固定代码:{fixed_codes}")
    codes_sh = []
    codes_sz = []
    for i in range(15):
        try:
            logger_local_huaxin_l1.info("开始获取目标代码")
            codes_sh, codes_sz = l1_subscript_codes_manager.get_codes()
            logger_local_huaxin_l1.info(f"获取上证,深证代码数量:sh-{len(codes_sh)} sz-{len(codes_sz)}")
            break
@@ -238,9 +272,12 @@
    #     "002292", 8.06, 9.96, 969500 * 100, time.time())
    threading.Thread(target=__read_from_strategy, args=(queue_l1_r_strategy_w,), daemon=True).start()
    threading.Thread(target=__run_subscript_task, args=(spi,), daemon=True).start()
    # 等待程序结束
    while True:
        print("数量", len(level1_data_dict))
        # print("数量", len(level1_data_dict))
        try:
            if len(level1_data_dict) < 1:
                continue
@@ -249,33 +286,34 @@
            # (代码,现价,涨幅,量,时间)
            list_ = [level1_data_dict[k] for k in level1_data_dict]
            flist = []
            plist = []
            now_time_int = int(tool.get_now_time_str().replace(":", ""))
            threshold_rate = constant.L1_MIN_RATE
            for d in list_:
                if d[2] >= constant.L1_MIN_RATE:
                    # 涨幅小于5%的需要删除
                if d[2] >= threshold_rate or d[0] in fixed_codes:
                    # 涨幅小于3%的需要删除
                    flist.append(d)
                if d[0] in __position_codes:
                    plist.append(d)
            flist.sort(key=lambda x: x[2], reverse=True)
            datas = flist[:200]
            # 将持仓股加入进去
            datas.extend(plist)
            print("代码数量:", len(datas))
            logger_l2_codes_subscript.info("开始#华鑫L1上传代码:数量-{}", len(datas))
            __upload_codes_info(queue_l1_w_strategy_r, datas)
            # 将固定代码的排在最前
            for code in fixed_codes:
                if code in level1_data_dict:
                    flist.insert(0, level1_data_dict[code])
            # 正式交易之前先处理比较少的数据,不然处理时间久造成数据拥堵
            MAX_COUNT = 500
            if now_time_int < int("092600"):
                MAX_COUNT = 200
            elif now_time_int < int("092800"):
                MAX_COUNT = 300
            elif now_time_int < int("092900"):
                MAX_COUNT = 400
            datas = flist[:MAX_COUNT]
            if len(datas) > 0:
                logger_l2_codes_subscript.info("开始#华鑫L1上传代码:数量-{}", len(datas))
                __upload_codes_info(queue_l1_w_strategy_r, datas)
        except Exception as e:
            logging.exception(e)
            logger_debug.exception(e)
        finally:
            time.sleep(3)
            try:
                # 判断是否需要重新订阅
                if tool.is_pre_trade_time():
                    re_subscript(spi)
                else:
                    global is_re_subscript
                    is_re_subscript = False
            except:
                pass
    # 释放接口对象
    api.Release()