Administrator
2025-06-03 c4ed4da4ac8b8bc24e0a3ed0e782e9248b4a511c
trade/current_price_process_manager.py
@@ -7,30 +7,162 @@
from l2.huaxin import huaxin_target_codes_manager
from log_module import async_log_util
from log_module.log import logger_l2_codes_subscript
from log_module.log import logger_l2_codes_subscript, logger_debug
import constant
from code_attribute import gpcode_manager
from third_data import kpl_data_constant
from third_data.code_plate_key_manager import KPLPlateForbiddenManager
from third_data.kpl_data_constant import LimitUpCodesBlockRecordManager, LimitUpDataConstant
from trade.buy_money_count_setting import BuyMoneyUtil
from trade.buy_radical import radical_buy_data_manager, new_block_processor
from trade.buy_radical.block_special_codes_manager import BlockSpecialCodesManager
from trade.buy_radical.radical_buy_data_manager import RadicalBuyBlockManager, RadicalBuyDataManager
from trade.order_statistic import DealAndDelegateWithBuyModeDataManager
from utils import tool, import_util
from ths.l2_code_operate import L2CodeOperate
from trade import trade_manager, l2_trade_util, trade_constant
from trade.trade_data_manager import CodeActualPriceProcessor
from trade.trade_data_manager import CodeActualPriceProcessor, RadicalBuyDealCodesManager
import concurrent.futures
trade_gui = import_util.import_lib("trade.trade_gui")
__actualPriceProcessor = CodeActualPriceProcessor()
__pre_big_order_deal_thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=10)
latest_add_codes = set()
def accept_prices(prices, request_id=None):
    print("总价格代码数量:", len(prices))
    now_str = tool.get_now_time_str()
    # 获取想买单
    want_codes = gpcode_manager.WantBuyCodesManager().list_code_cache()
def compute_code_order(code, top_in_blocks=None, yesterday_limit_up_codes=None, today_history_limit_up_codes=None,
                       top_out_blocks=None):
    """
    计算代码的排序
    @param code:
    @param top_in_blocks: 净流入前几
    @return: 排序值,若为负值不订阅
    """
    # 想买单/隔夜单排序位
    if yesterday_limit_up_codes is None:
        yesterday_limit_up_codes = set()
    if top_in_blocks is None:
        top_in_blocks = []
    if top_out_blocks is None:
        top_out_blocks = []
    if today_history_limit_up_codes is None:
        today_history_limit_up_codes = set()
        # 高位板
    if code in yesterday_limit_up_codes:
        return -1
        # 黑名单
    trade_state = trade_manager.CodesTradeStateManager().get_trade_state_cache(code)
    # 处于委托状态的必须订阅
    if trade_state == trade_constant.TRADE_STATE_BUY_DELEGATED or trade_state == trade_constant.TRADE_STATE_BUY_PLACE_ORDER:
        return 0
    if l2_trade_util.is_in_forbidden_trade_codes(code):
        # 没有成交
        if trade_state == trade_constant.TRADE_STATE_BUY_SUCCESS:
            # 成交的票
            return 998
        else:
            return -1
    deal_codes = RadicalBuyDealCodesManager().get_deal_codes()
    result = RadicalBuyDataManager().is_code_can_buy(code, deal_codes)
    if not result[0]:
        if len(result) > 2 and result[2]:
            # 可以拉黑
            if new_block_processor.is_can_forbidden(code):
                l2_trade_util.forbidden_trade(code, msg=result[1])
        return -1
    # 想买单
    if gpcode_manager.WantBuyCodesManager().is_in_cache(code):
        return 0
    # 隔夜单排一
    if gpcode_manager.BuyOpenLimitUpCodeManager().is_in_cache(code):
        return 0
    # 如果当前清单处于委托状态就不能移除
    if trade_state == trade_constant.TRADE_STATE_BUY_PLACE_ORDER or trade_state == trade_constant.TRADE_STATE_BUY_DELEGATED:
        return 0
    # 板块
    blocks = LimitUpCodesBlockRecordManager().get_radical_buy_blocks(code)
    if not blocks:
        return -1
    if code in today_history_limit_up_codes:
        # 涨停过的代码才会计算板块身位
        for b in blocks:
            index = None
            # 只订阅有辨识度的票和新板块前3
            special_codes = BlockSpecialCodesManager().get_block_codes(b)
            if special_codes and code in special_codes:
                # 有辨识度在净流入中则订阅
                if b in top_in_blocks:
                    index = top_in_blocks.index(b)
                    return index + 1
                else:
                    # 辨识度的票没在净流入中,只要不在净流出中就订阅
                    if b not in top_out_blocks:
                        return 200
            else:
                # 没有辨识度,新板块订阅前3
                new_blocks = kpl_data_constant.get_new_blocks(code)
                if new_blocks and b in new_blocks:
                    info = RadicalBuyBlockManager().get_history_index(code, b, yesterday_limit_up_codes)
                    if info[0] > 0:
                        info = RadicalBuyBlockManager().filter_before_codes(code, b, info[0], info[1],
                                                                            yesterday_limit_up_codes)
                    if info[0] < 3:
                        if b in top_in_blocks:
                            index = top_in_blocks.index(b)
                            return index + 1
                        else:
                            return 200
    else:
        # 尚未涨停过的代码,订阅板块有辨识度的前6
        for b in blocks:
            if b not in top_in_blocks and tool.get_now_time_as_int() >= 100000:
                # 10点之后才考虑净流入
                continue
            if b in top_in_blocks:
                index = top_in_blocks.index(b)
            else:
                index = 1000
            special_codes = BlockSpecialCodesManager().get_block_codes(b)
            if not special_codes or code not in special_codes:
                continue
            # 净流入 + 辨识度
            return index + 1
    # 判断今日辨识度
    try:
        for b in blocks:
            if radical_buy_data_manager.RadicalBuyBlockManager.is_today_block_special_codes(code, b,
                                                                                            yesterday_limit_up_codes):
                if b in top_in_blocks:
                    index = top_in_blocks.index(b)
                    return index + 1
    except Exception as e:
        logger_debug.exception(e)
    return 10000
def accept_prices(prices, request_id=None, top_in_blocks=None, yesterday_limit_up_codes=None, top_out_blocks=None):
    """
    接收价格,处理订阅
    @param yesterday_limit_up_codes: 昨日涨停数据
    @param history_limit_up_datas: 历史涨停数据
    @param prices:
    @param request_id:
    @param top_in_blocks: 净流入前几的代码
    @return:
    """
    # logger_debug.debug(f"接收L1数据测试:流入前20-{top_in_blocks}")
    if True:
        today_history_limit_up_codes = set([d[3] for d in LimitUpDataConstant.history_limit_up_datas])
        _code_list = []
        _delete_list = []
        temp_prices = []
        now_time_int = tool.get_now_time_as_int()
        for d in prices:
            code, price = d["code"], float(d["price"])
            temp_prices.append((code, price))
@@ -38,20 +170,18 @@
            pricePre = gpcode_manager.CodePrePriceManager.get_price_pre_cache(code)
            if pricePre is not None:
                # 是否是想买单
                is_want_buy = code in want_codes
                trade_state = trade_manager.CodesTradeStateManager().get_trade_state_cache(code)
                # 如果当前清单处于委托状态就不能移除
                if trade_state == trade_constant.TRADE_STATE_BUY_PLACE_ORDER or trade_state == trade_constant.TRADE_STATE_BUY_DELEGATED:
                    is_want_buy = True
                order_index = compute_code_order(code, top_in_blocks, yesterday_limit_up_codes,
                                                 today_history_limit_up_codes, top_out_blocks=top_out_blocks)
                rate = round((price - pricePre) * 100 / pricePre, 2)
                if tool.is_ge_code(code):
                    # 创业板的涨幅需要打折
                    rate = rate/2
                if rate >= 0 and not trade_manager.ForbiddenBuyCodeByScoreManager().is_in_cache(code):
                    # 暂存涨幅为正的代码
                    _code_list.append((rate, code, 1 if is_want_buy else 0))
                elif is_want_buy:
                    _code_list.append((rate, code, 1 if is_want_buy else 0))
                    rate = rate / 2
                if order_index >= 0:
                    if order_index < 1000 and rate >= 5:
                        # 涨幅大于3%的才能订阅
                        _code_list.append((rate, code, order_index))
                    else:
                        _delete_list.append((rate, code, 0))
                else:
                    # 暂存涨幅为负的代码
                    _delete_list.append((rate, code, 0))
@@ -66,19 +196,24 @@
        gpcode_manager.set_prices(temp_prices)
        # -------------------------------处理交易位置分配---------------------------------
        # 排序
        new_code_list = sorted(_code_list, key=lambda e: (e.__getitem__(2), e.__getitem__(0)), reverse=True)
        new_code_list = sorted(_code_list, key=lambda e: (e[2], -e[0]))
        # logger_debug.debug(f"接收L1数据测试:排序过后的代码-{new_code_list[:75]}")
        # -------------------------------处理L2监听---------------------------------
        max_count = constant.HUAXIN_L2_MAX_CODES_COUNT
        _delete_list = []
        for item in new_code_list:
            trade_state = trade_manager.CodesTradeStateManager().get_trade_state_cache(item[1])
            if l2_trade_util.is_in_forbidden_trade_codes(
                    item[1]) or item[0] < 0:
                    item[1]) and trade_state != trade_constant.TRADE_STATE_BUY_SUCCESS:
                # 拉黑的尚未成交的代码
                _delete_list.append(item)
            elif item[0] < 0:
                # 在(黑名单)/(涨幅小于)的数据
                if trade_manager.CodesTradeStateManager().get_trade_state_cache(
                        item[1]) != trade_constant.TRADE_STATE_BUY_SUCCESS:
                    # 没成交才会加入删除
                    _delete_list.append(item)
                # if trade_manager.CodesTradeStateManager().get_trade_state_cache(
                #         item[1]) != trade_constant.TRADE_STATE_BUY_SUCCESS:
                # 没成交才会加入删除
                _delete_list.append(item)
        for item in _delete_list:
            new_code_list.remove(item)
@@ -98,64 +233,46 @@
        for d in _delete_list:
            del_code_list.append(d[1])
        if del_code_list:
            async_log_util.info(logger_l2_codes_subscript,
                                f"({request_id})需要删除订阅的代码:{del_code_list}")
        if constant.L2_SOURCE_TYPE == constant.L2_SOURCE_TYPE_HUAXIN:
            # 华鑫L2,获取加入代码的涨停价
            # 是否和上次一样
            try:
                add_code_set = set(add_code_list)
                # global latest_add_codes
                # if not latest_add_codes:
                #     latest_add_codes = set()
                global latest_add_codes
                if not latest_add_codes:
                    latest_add_codes = set()
                # # 判断设置的代码是否相同
                # dif1 = latest_add_codes - add_code_set
                # dif2 = add_code_set - latest_add_codes
                # if dif1 or dif2:
                dif2 = add_code_set - latest_add_codes
                if dif2:
                    # 新增加的订阅需要拉取之前的大单
                    __pre_big_order_deal_thread_pool.submit(radical_buy_data_manager.pull_pre_deal_big_orders_by_codes, dif2)
                if True:
                    global latest_add_codes
                    async_log_util.info(logger_l2_codes_subscript,
                                        f"({request_id})预处理新增订阅代码:{add_code_set - latest_add_codes}")
                                        f"({request_id})预处理新增订阅代码:{dif2}")
                    latest_add_codes = add_code_set
                    add_datas = []
                    for d in add_code_list:
                        limit_up_price = gpcode_manager.get_limit_up_price(d)
                        limit_up_price = round(float(limit_up_price), 2)
                        limit_up_price = gpcode_manager.get_limit_up_price_as_num(d)
                        min_volume = int(round(50 * 10000 / limit_up_price))
                        # 传递笼子价
                        # 需要订阅的特殊的量
                        special_volumes = BuyMoneyUtil.get_possible_buy_volumes(limit_up_price)
                        special_volumes |= set([tool.get_buy_volume_by_money(limit_up_price, x) for x in
                                                constant.AVAILABLE_BUY_MONEYS])
                        add_datas.append(
                            # (代码, 最小量, 涨停价,影子订单价格,买量, 特殊价格)
                            (d, min_volume, limit_up_price, round(tool.get_shadow_price(limit_up_price), 2),
                             tool.get_buy_volume(limit_up_price)))
                             tool.get_buy_volume(limit_up_price), list(special_volumes)))
                    huaxin_target_codes_manager.HuaXinL2SubscriptCodesManager.push(add_datas, request_id)
            except Exception as e:
                logging.exception(e)
                logger_debug.exception(e)
        else:
            # 后面的代码数量
            # 先删除应该删除的代码
            for code in del_code_list:
                if gpcode_manager.is_listen_old(code):
                    cid, pid = gpcode_manager.get_listen_code_pos(code)
                    # 强制移除
                    if cid and pid:
                        gpcode_manager.set_listen_code_by_pos(cid, pid, "")
                    # 判断是否在监听里面
                    L2CodeOperate.get_instance().add_operate(0, code, "现价变化")
            # 增加应该增加的代码
            for code in add_code_list:
                if not gpcode_manager.is_listen_old(code):
                    L2CodeOperate.get_instance().add_operate(1, code, "现价变化")
            # 获取卡位数量
            free_count = gpcode_manager.get_free_listen_pos_count()
            if free_count < 2:
                # 空闲位置不足
                listen_codes = gpcode_manager.get_listen_codes()
                for code in listen_codes:
                    if not gpcode_manager.is_in_gp_pool(code):
                        client_id, pos = gpcode_manager.get_listen_code_pos(code)
                        gpcode_manager.set_listen_code_by_pos(client_id, pos, "")
                        free_count += 1
                        if free_count > 2:
                            break
            pass
__trade_price_dict = {}
@@ -165,12 +282,18 @@
# 设置成交价
def set_trade_price(code, price, time_str, limit_up_price):
def set_trade_price(code, price):
    __trade_price_dict[code] = price
    # 需要记录最近一次非涨停价成交的时间
    if limit_up_price and abs(limit_up_price - price) > 0.001:
        # 非涨停价成交
        __trade_price_not_limit_up_info_dict[code] = (price, time_str)
def set_latest_not_limit_up_time(code, time_str_with_ms):
    """
    记录最近的一次上板时间(最近的一笔主动买就是上板时间)
    @param code:
    @param time_str:
    @return:
    """
    __trade_price_not_limit_up_info_dict[code] = time_str_with_ms
# 获取成交价
@@ -178,10 +301,10 @@
    return __trade_price_dict.get(code)
def get_trade_not_limit_up_info(code):
def get_trade_not_limit_up_time_with_ms(code):
    """
    获取最近的非涨停价成交的信息
    获取最近的非板上成交的时间
    @param code:
    @return:(价格,时间)
    @return:(价格, 时间)
    """
    return __trade_price_not_limit_up_info_dict.get(code)