Administrator
67 分钟以前 2f2516749615da866e96d8d24e499b7ecbb63a3e
huaxin_client/l1_client.py
@@ -9,7 +9,8 @@
from huaxin_client import socket_util, l1_subscript_codes_manager
import xmdapi
from huaxin_client import tool, constant
from log_module.log import logger_system, logger_local_huaxin_l1, logger_l2_codes_subscript
from log_module.log import logger_system, logger_local_huaxin_l1, logger_l2_codes_subscript, logger_debug
from third_data import custom_block_in_money_manager
from utils import tool as out_tool
################B类##################
@@ -121,20 +122,7 @@
            return
        if pMarketDataField.SecurityName.find("ST") >= 0:
            return
        close_price = round(pMarketDataField.UpperLimitPrice / out_tool.get_limit_up_rate(pMarketDataField.SecurityID),
                            2)
        try:
            # 测试L1数据
            if pMarketDataField.SecurityID == '600636' or pMarketDataField.SecurityID == '002430' or pMarketDataField.SecurityID == '300466':
                d = {"SecurityID": pMarketDataField.SecurityID, "PreClosePrice": pMarketDataField.PreClosePrice,
                     "LastPrice": pMarketDataField.LastPrice, "BidPrice1": pMarketDataField.BidPrice1,
                     "BidVolume1": pMarketDataField.BidVolume1, "AskVolume1": pMarketDataField.AskVolume1,
                     "AskPrice1": pMarketDataField.AskPrice1, "UpperLimitPrice": pMarketDataField.UpperLimitPrice,
                     "UpdateTime": pMarketDataField.UpdateTime, "UpdateMillisec": pMarketDataField.UpdateMillisec}
                logger_local_huaxin_l1.info(f"L1数据:{d}")
        except:
            pass
        close_price = pMarketDataField.PreClosePrice
        lastPrice = pMarketDataField.LastPrice
        if pMarketDataField.BidPrice1:
            lastPrice = pMarketDataField.BidPrice1
@@ -142,26 +130,18 @@
        if out_tool.get_limit_up_rate(pMarketDataField.SecurityID) > 1.1001:
            # 涨停板20%以上的打折
            rate = rate / 2
        # print(pMarketDataField.SecurityID, pMarketDataField.SecurityName, rate, pMarketDataField.Volume)
        # (代码, 现价, 涨幅, 量, 当前时间, 买1价, 买1量, 买2价, 买2量, 更新时间)
        level1_data_dict[pMarketDataField.SecurityID] = (
            pMarketDataField.SecurityID, pMarketDataField.LastPrice, rate, pMarketDataField.Volume, time.time(),
            pMarketDataField.BidPrice1, pMarketDataField.BidVolume1)
        # print(
        #     "SecurityID[%s] SecurityName[%s] LastPrice[%.2f] Volume[%d] Turnover[%.2f] BidPrice1[%.2f] BidVolume1[%d] AskPrice1[%.2f] AskVolume1[%d] UpperLimitPrice[%.2f] LowerLimitPrice[%.2f]"
        #     % (pMarketDataField.SecurityID, pMarketDataField.SecurityName, pMarketDataField.LastPrice,
        #        pMarketDataField.Volume,
        #        pMarketDataField.Turnover, pMarketDataField.BidPrice1, pMarketDataField.BidVolume1,
        #        pMarketDataField.AskPrice1,
        #        pMarketDataField.AskVolume1, pMarketDataField.UpperLimitPrice, pMarketDataField.LowerLimitPrice))
            pMarketDataField.BidPrice1, pMarketDataField.BidVolume1, pMarketDataField.BidPrice2,
            pMarketDataField.BidVolume2, pMarketDataField.UpdateTime)
__latest_subscript_codes = set()
def __upload_codes_info(queue_l1_w_strategy_r: multiprocessing.Queue, datas):
    if not tool.is_trade_time():
    if not tool.is_trade_time() and not tool.is_pre_trade_time():
        return
    # 上传数据
    type_ = "set_target_codes"
@@ -231,8 +211,18 @@
            time.sleep(3)
def run(queue_l1_w_strategy_r, queue_l1_r_strategy_w):
    logger_local_huaxin_l1.info("运行l1订阅服务")
def run(queue_l1_w_strategy_r, queue_l1_r_strategy_w, fixed_codes=None):
    """
    运行l1订阅任务
    @param queue_l1_w_strategy_r: L1方写,策略方读
    @param queue_l1_r_strategy_w: L1方读,策略方写
    @param fixed_codes: 固定要返回数据的代码
    @return:
    """
    if fixed_codes is None:
        fixed_codes = set()
    logger_local_huaxin_l1.info(f"运行l1订阅服务,固定代码:{fixed_codes}")
    codes_sh = []
    codes_sz = []
    for i in range(15):
@@ -287,7 +277,7 @@
    # 等待程序结束
    while True:
        print("数量", len(level1_data_dict))
        # print("数量", len(level1_data_dict))
        try:
            if len(level1_data_dict) < 1:
                continue
@@ -296,17 +286,17 @@
            # (代码,现价,涨幅,量,时间)
            list_ = [level1_data_dict[k] for k in level1_data_dict]
            flist = []
            plist = []
            now_time_int = int(tool.get_now_time_str().replace(":", ""))
            threshold_rate = constant.L1_MIN_RATE_PRE if now_time_int < int(
                "094000") else constant.L1_MIN_RATE
            threshold_rate = constant.L1_MIN_RATE
            for d in list_:
                if d[2] >= threshold_rate:
                    # 涨幅小于5%的需要删除
                if d[2] >= threshold_rate or d[0] in fixed_codes:
                    # 涨幅小于3%的需要删除
                    flist.append(d)
                if d[0] in __position_codes:
                    plist.append(d)
            flist.sort(key=lambda x: x[2], reverse=True)
            # 将固定代码的排在最前
            for code in fixed_codes:
                if code in level1_data_dict:
                    flist.insert(0, level1_data_dict[code])
            # 正式交易之前先处理比较少的数据,不然处理时间久造成数据拥堵
            MAX_COUNT = 500
            if now_time_int < int("092600"):
@@ -316,13 +306,12 @@
            elif now_time_int < int("092900"):
                MAX_COUNT = 400
            datas = flist[:MAX_COUNT]
            # 将持仓股加入进去
            datas.extend(plist)
            if len(datas) > 0:
                logger_l2_codes_subscript.info("开始#华鑫L1上传代码:数量-{}", len(datas))
                __upload_codes_info(queue_l1_w_strategy_r, datas)
        except Exception as e:
            logging.exception(e)
            logger_debug.exception(e)
        finally:
            time.sleep(3)