Administrator
2025-06-03 c4ed4da4ac8b8bc24e0a3ed0e782e9248b4a511c
servers/huaxin_trade_server.py
@@ -22,7 +22,7 @@
from huaxin_client import l2_data_transform_protocol
from huaxin_client.trade_transform_protocol import TradeResponse
from l2 import l2_data_manager_new, l2_log, code_price_manager, l2_data_util, transaction_progress, \
    l2_data_source_util, l2_data_log
    l2_data_source_util, l2_data_log, data_callback
from l2.cancel_buy_strategy import GCancelBigNumComputer, \
    DCancelBigNumComputer, RDCancelBigNumComputer
from l2.code_price_manager import Buy1PriceManager
@@ -37,12 +37,12 @@
from log_module import async_log_util, log_export
from log_module.log import hx_logger_contact_debug, hx_logger_trade_callback, \
    hx_logger_l2_orderdetail, hx_logger_l2_market_data, logger_l2_g_cancel, logger_debug, \
    logger_system, logger_trade, logger_l2_radical_buy, logger_l2_not_buy_reasons
    logger_system, logger_trade, logger_l2_radical_buy, logger_l2_trade
from third_data import block_info, kpl_data_manager, history_k_data_manager, huaxin_l1_data_manager, kpl_api, kpl_util
from third_data.code_plate_key_manager import KPLCodeJXBlockManager, CodePlateKeyBuyManager, RealTimeKplMarketData, \
from third_data.code_plate_key_manager import KPLCodeJXBlockManager, RealTimeKplMarketData, \
    KPLPlateForbiddenManager
from third_data.history_k_data_manager import HistoryKDataManager
from third_data.history_k_data_util import JueJinApi, HistoryKDatasUtils
from third_data.kpl_limit_up_data_manager import LatestLimitUpBlockManager
from trade import l2_trade_util, \
    trade_data_manager, trade_constant, buy_open_limit_up_strategy
from trade.buy_radical import radical_buy_data_manager, radical_buy_strategy
@@ -53,8 +53,9 @@
from api.outside_api_command_callback import OutsideApiCommandCallback
from trade.huaxin.huaxin_trade_record_manager import DelegateRecordManager
from trade.order_statistic import DealAndDelegateWithBuyModeDataManager
from trade.buy_radical.radical_buy_data_manager import RadicalBuyDataManager, RadicalBuyBlockManager, \
    EveryLimitupBigDealOrderManager, RadicalCodeMarketInfoManager, BeforeSubDealBigOrderManager
from trade.buy_radical.radical_buy_data_manager import RadicalBuyDataManager, \
    EveryLimitupBigDealOrderManager, RadicalCodeMarketInfoManager, BeforeSubDealBigOrderManager, \
    EveryLimitupBigDelegateOrderManager
from trade.sell.sell_rule_manager import TradeRuleManager
from trade.trade_data_manager import RadicalBuyDealCodesManager
from trade.trade_manager import CodesTradeStateManager
@@ -78,6 +79,8 @@
    __TradeBuyQueue = transaction_progress.TradeBuyQueue()
    __KPLCodeJXBlockManager = KPLCodeJXBlockManager()
    __GCancelBigNumComputer = GCancelBigNumComputer()
    # L2进程对应订阅的代码: {"进程ID": 代码列表}
    __pid_l2_subscript_codes = {}
    def setup(self):
        self.__init()
@@ -241,6 +244,44 @@
                            middle_api_protocol.request(fdata)
                        finally:
                            sk.sendall(socket_util.load_header(json.dumps({"code": 0}).encode(encoding='utf-8')))
                    elif data_json["type"] == "l2_subscript_codes_v2":
                        try:
                            data = data_json["data"]
                            datas = data["data"]
                            pid, datas = datas[0], datas[1]
                            self.__pid_l2_subscript_codes[pid] = datas
                            # print("l2_subscript_codes", data_json)
                            fcodes = []
                            for pid in self.__pid_l2_subscript_codes:
                                codes = self.__pid_l2_subscript_codes[pid]
                                fcodes.extend(codes)
                            # 订阅的代码
                            huaxin_target_codes_manager.HuaXinL2SubscriptCodesManager.save_subscript_codes(fcodes)
                            # 上传数据
                            codes = huaxin_target_codes_manager.HuaXinL2SubscriptCodesManager.get_subscript_codes()
                            l2_log.codeLogQueueDistributeManager.set_l2_subscript_codes(codes)
                            fresults = []
                            if codes:
                                for code in codes:
                                    try:
                                        # 获取成交大单:(参考大单金额,已成交大单金额,大单要求金额)
                                        th, is_temp = BeforeSubDealBigOrderManager().get_big_order_threshold_info(code)
                                        deal_big_money_info = radical_buy_data_manager.get_total_deal_big_order_info(
                                            code, gpcode_manager.get_limit_up_price_as_num(code))
                                        deal_big_order_info = (
                                            output_util.money_desc(th), output_util.money_desc(deal_big_money_info[1]),
                                            output_util.money_desc(deal_big_money_info[2]))
                                    except:
                                        deal_big_order_info = None
                                    code_name = gpcode_manager.get_code_name(code)
                                    fresults.append((code, code_name, deal_big_order_info))
                            fdata = middle_api_protocol.load_l2_subscript_codes(fresults)
                            middle_api_protocol.request(fdata)
                        finally:
                            sk.sendall(socket_util.load_header(json.dumps({"code": 0}).encode(encoding='utf-8')))
                    elif data_json["type"] == "get_level1_codes":
                        # print("get_level1_codes")
                        # 获取level1的代码
@@ -330,7 +371,7 @@
            return
        for d in datas:
            # 计算当前是否是涨停状态
            if len(d)==11:
            if len(d) == 11:
                async_log_util.info(logger_debug, f"开1数据:{d}")
            if gpcode_manager.BuyOpenLimitUpCodeManager().is_in_cache(d[0]):
                # 09:19:50 到 09:20:00判断是否要撤单
@@ -357,6 +398,10 @@
        datas = data["data"]
        cls.__save_l1_current_price(datas)
        cls.__process_buy_open_limit_up_datas(datas)
        # 根据高标的实时涨幅计算拉黑板块
        rate_dict = {d[0]: d[2] for d in datas}
        cls.__process_l1_data_thread_pool.submit(
            lambda: KPLPlateForbiddenManager().compute(rate_dict))
        # 9:30之前采用非线程
        if int(tool.get_now_time_str().replace(":", "")) < int("093000") or True:
            HuaXinL1TargetCodesManager.set_level_1_codes_datas(datas, request_id=request_id)
@@ -375,8 +420,8 @@
        thread_id = random.randint(0, 100000)
        l2_log.threadIds[code] = thread_id
        l2_data_count = len(_datas)
        l2_log.info(code, hx_logger_l2_orderdetail,
                    f"{code}#耗时:{use_time}-{thread_id}#数量:{l2_data_count}#{_datas[-1]}")
        # l2_log.info(code, hx_logger_l2_orderdetail,
        #             f"{code}#耗时:{use_time}-{thread_id}#数量:{l2_data_count}#{_datas[-1]}")
        # l2_data_log.l2_time_log(code, "开始处理L2逐笔委托")
        try:
@@ -469,10 +514,10 @@
            # 即将炸开
            total_deal_big_order_info = radical_buy_data_manager.get_total_deal_big_order_info(code, limit_up_price)
            if total_deal_big_order_info and total_deal_big_order_info[0] <= 0:
                EveryLimitupBigDealOrderManager.clear(code)
                EveryLimitupBigDealOrderManager.clear(code, f"板上放量:{time_str}")
                # 大单足够
                l2_trade_single_callback.process_limit_up_active_buy(code, [], is_almost_open_limit_up=True,
                                                                     l2_market_time_str=time_str)
                # l2_trade_single_callback.process_limit_up_active_buy(code, [], is_almost_open_limit_up=True,
                #                                                      l2_market_time_str=time_str)
    @classmethod
    def trading_order_canceled(cls, code, order_no):
@@ -606,6 +651,9 @@
    __radical_buy_by_blocks_result_cache = {}
    def OnTradeSingle(self, code, big_buy_order_count, _type, data):
        # 暂时不处理
        if True:
            return
        # 只处理深证的票
        try:
            # 判断是否下单
@@ -713,10 +761,9 @@
                # 不处于可下单状态
                return True
            if transaction_datas:
                async_log_util.info(logger_l2_radical_buy, f"涨停主动买:{code}-{transaction_datas[-1]}")
                l2_log.info(code, logger_l2_radical_buy, f"涨停主动买:{code}-{transaction_datas[-1]}")
            else:
                async_log_util.info(logger_l2_radical_buy,
                                    f"即将炸板:{code}-{is_almost_open_limit_up}-{l2_market_time_str}")
                l2_log.info(code, logger_l2_radical_buy, f"即将炸板:{code}-{is_almost_open_limit_up}-{l2_market_time_str}")
            deal_codes = RadicalBuyDealCodesManager().get_deal_codes()
            # 判断今日扫入的代码数量是否大于阈值
            radical_buy_setting = BuyMoneyAndCountSetting().get_radical_buy_setting()
@@ -724,46 +771,22 @@
            if not WantBuyCodesManager().is_in_cache(code):
                # 加绿不判断板块是否成交
                if len(deal_codes) >= MAX_COUNT:
                    async_log_util.info(logger_l2_radical_buy, f"扫入成交代码个数大于{MAX_COUNT}个:{code}-{deal_codes}")
                    l2_log.info(code, logger_l2_radical_buy, f"扫入成交代码个数大于{MAX_COUNT}个:{code}-{deal_codes}")
                    return True
            if code in deal_codes:
                async_log_util.info(logger_l2_radical_buy, f"该代码已经成交:{code}")
                l2_log.info(code, logger_l2_radical_buy, f"该代码已经成交:{code}")
                return True
            # 单票是否可买
            can_buy_result = RadicalBuyDataManager.is_code_can_buy(code)
            if can_buy_result[0]:
                # 获取激进买的板块
                result_cache = self.__radical_buy_by_blocks_result_cache.get(code)
                if not result_cache or result_cache[0] < time.time():
                    # 不存在/过期
                    yesterday_codes = kpl_data_manager.get_yesterday_limit_up_codes()
                    if yesterday_codes is None:
                        yesterday_codes = set()
                    # 计算是否可以扫入
                    radical_result = RadicalBuyBlockManager.is_radical_buy(code, yesterday_codes)
                    async_log_util.info(logger_l2_radical_buy, f"计算板块结果:{code}-{radical_result}")
                    result_cache = (time.time() + 3, radical_result)
                    self.__radical_buy_by_blocks_result_cache[code] = result_cache
                    RadicalBuyDealCodesManager().set_code_blocks(code, radical_result[0])
                    if not radical_result[0]:
                        async_log_util.info(logger_l2_not_buy_reasons, f"{code}#{radical_result[1]}")
                # 取缓存
                result = result_cache[1]
                if result[0]:
                f_buy_blocks, orgin_buy_blocks = radical_buy_strategy.compute_can_radical_buy_blocks(code, deal_codes)
                if orgin_buy_blocks:
                    if not f_buy_blocks:
                        return True
                    # 买入的板块
                    buy_blocks = result[0]
                    # 如果关键词包含已成交的原因就不再下单
                    # 获取已经成交代码的板块
                    try:
                        # ---------------判断板块是否还可以买入----------------
                        f_buy_blocks = radical_buy_data_manager.is_block_can_radical_buy(code, buy_blocks, deal_codes)
                        if not f_buy_blocks:
                            return True
                        buy_blocks = f_buy_blocks
                    except Exception as e:
                        logger_debug.exception(e)
                    buy_blocks = f_buy_blocks
                    # 判断当前时间段是否可以买入
                    mode = OrderBeginPosInfo.MODE_RADICAL
                    can_buy, money, msg = BuyMoneyUtil.get_buy_data(tool.get_now_time_str(), mode,
@@ -772,14 +795,14 @@
                                                                    DealAndDelegateWithBuyModeDataManager().get_delegates_codes_info(
                                                                        mode))
                    if not can_buy:
                        async_log_util.info(logger_l2_radical_buy, f"当前时间段已不能扫入:{code}-{msg}")
                        l2_log.info(code, logger_l2_radical_buy, f"当前时间段已不能扫入:{code}-{msg}")
                        return True
                    # -----根据成交比例判断是否可买------
                    result_by_volume = radical_buy_strategy.process_limit_up_active_buy_deal(code, transaction_datas,
                                                                                             is_almost_open_limit_up,
                                                                                             no_left_limit_up_sell=no_left_limit_up_sell)
                    async_log_util.info(logger_l2_radical_buy, f"量买入结果判断:{code}, 结果:{result_by_volume} 板块:{buy_blocks}")
                    l2_log.info(code, logger_l2_radical_buy, f"量买入结果判断:{code}, 结果:{result_by_volume} 板块:{buy_blocks}")
                    in_blocks = RealTimeKplMarketData.get_top_market_jingxuan_blocks()
                    buy_blocks_with_money = [(b, RealTimeKplMarketData.get_jx_block_in_money(b),
                                              in_blocks.index(b) if b in in_blocks else -1) for b in buy_blocks]
@@ -794,8 +817,7 @@
                            # 判断是否开得太高
                            open_price = L1DataManager.get_open_price(code)
                            if not radical_buy_strategy.is_can_buy_with_open_price(code, open_price):
                                async_log_util.info(logger_l2_radical_buy,
                                                    f"开得太高:{code}")
                                l2_log.info(code, logger_l2_radical_buy, f"开得太高:{code}")
                                radical_buy_data_manager.ExcludeIndexComputeCodesManager.add_code(code)
                                return True
                            # if not RadicalCodeMarketInfoManager().is_opened_limit_up(code):
@@ -807,7 +829,7 @@
                        radical_buy_data_manager.ExcludeIndexComputeCodesManager.remove_code(code)
                        if result_by_volume[0] == radical_buy_strategy.BUY_MODE_DIRECT and not tool.is_sh_code(code):
                        if result_by_volume[0] == radical_buy_strategy.BUY_MODE_DIRECT:
                            # 上证不能根据成交买入
                            latest_deal_time = l2_huaxin_util.convert_time(transaction_datas[-1][3])
                            refer_sell_data = L2MarketSellManager().get_refer_sell_data(code, latest_deal_time)
@@ -818,6 +840,13 @@
                            if refer_sell_data:
                                sell_info = (refer_sell_data[0], refer_sell_data[1])
                            threshold_money = 0
                            every_deal_orders = EveryLimitupBigDealOrderManager.list_big_buy_deal_orders(code)
                            if every_deal_orders:
                                min_order_no_info = min(every_deal_orders, key=lambda x: x[0])
                                min_order_no = min_order_no_info[0]
                            else:
                                min_order_no = transaction_datas[-1][6]
                            order_begin_pos_info = OrderBeginPosInfo(buy_single_index=buy_single_index,
                                                                     buy_exec_index=buy_exec_index,
                                                                     buy_compute_index=buy_exec_index,
@@ -825,9 +854,11 @@
                                                                     max_num_set=set(),
                                                                     buy_volume_rate=buy_volume_rate,
                                                                     mode=OrderBeginPosInfo.MODE_RADICAL,
                                                                     mode_desc=f"扫入买入:{buy_blocks}",
                                                                     mode_desc=f"扫入买入:{buy_blocks}, 大单成交最小订单号:{min_order_no}",
                                                                     sell_info=sell_info,
                                                                     threshold_money=threshold_money)
                                                                     threshold_money=threshold_money,
                                                                     min_order_no= min_order_no
                                                                     )
                            L2TradeDataProcessor.save_order_begin_data(code, order_begin_pos_info)
                            buy_result = L2TradeDataProcessor.start_buy(code, total_datas[-1], total_datas[-1]["index"],
                                                                        True, block_info=buy_blocks_with_money)
@@ -857,35 +888,139 @@
                                latest_deal_time, buy_blocks_with_money, is_almost_open_limit_up)
                            return False
                    else:
                        async_log_util.info(logger_l2_radical_buy, f"不能下单:{code}-{result_by_volume}")
                        l2_log.info(code, logger_l2_radical_buy, f"不能下单:{code}-{result_by_volume}")
                        return False
                else:
                    volume_rate = code_volumn_manager.CodeVolumeManager().get_volume_rate(code)
                    async_log_util.info(logger_l2_radical_buy, f"没有可扫入的板块:{code},量比:{volume_rate}")
                    l2_log.info(code, logger_l2_radical_buy, f"没有可扫入的板块:{code},量比:{volume_rate}")
                    return True
            else:
                async_log_util.info(logger_l2_radical_buy, f"目前代码不可交易:{code}-{can_buy_result[1]}")
                l2_log.info(code, logger_l2_radical_buy, f"目前代码不可交易:{code}-{can_buy_result[1]}")
                return True
        except Exception as e:
            async_log_util.info(logger_debug, f"激进买计算异常:{str(e)}")
            l2_log.info(code, logger_debug, f"激进买计算异常:{str(e)}")
            logger_debug.exception(e)
        finally:
            use_time = time.time() - __start_time
            if use_time > 0.005:
                async_log_util.info(logger_debug, f"扫入处理时长:{code}-{use_time}")
                l2_log.info(code, logger_debug, f"扫入处理时长:{code}-{use_time}")
    def OnLimitUpActiveBuy(self, code, transaction_datas, no_left_limit_up_sell):
        """
        Forward limit-up active-buy transaction data to process_limit_up_active_buy and,
        when that call returns a truthy value, call EveryLimitupBigDealOrderManager.clear for the code.
        @param code: security code
        @param transaction_datas: transaction records, passed through unchanged
        @param no_left_limit_up_sell: passed through as the keyword argument of the same name
        @return: None
        """
        can_clear_before_data = self.process_limit_up_active_buy(code, transaction_datas,
                                                                 no_left_limit_up_sell=no_left_limit_up_sell)
        if can_clear_before_data:
            # Clear
            # NOTE(review): the two consecutive clear() calls below look like the old and the new
            # side of a change shown together - confirm which one is meant to remain.
            EveryLimitupBigDealOrderManager.clear(code)
            EveryLimitupBigDealOrderManager.clear(code, "处理涨停成交数据")
        pass
    def OnLastLimitUpSellDeal(self, code, data):
        """
        最后一笔涨停卖数据成交
        @param code:
        @param data:  (data['SecurityID'], data['TradePrice'], data['TradeVolume'], data['OrderTime'], data['MainSeq'], data['SubSeq'], data['BuyNo'], data['SellNo'], data['ExecType'])
        @return:
        """
        if True:
            return
        if data[6] < data[7]:
            # 非主动买
            return
        # 根据板块判断是否可买
        state = CodesTradeStateManager().get_trade_state_cache(code)
        if not trade_util.is_can_order_by_state(state):
            # 不处于可下单状态
            return
        l2_log.info(code, logger_l2_radical_buy, f"最后一笔涨停卖被吃:{code}-{data}")
        deal_codes = RadicalBuyDealCodesManager().get_deal_codes()
        # 判断今日扫入的代码数量是否大于阈值
        radical_buy_setting = BuyMoneyAndCountSetting().get_radical_buy_setting()
        MAX_COUNT = 4 if radical_buy_setting is None else radical_buy_setting[0]
        if not WantBuyCodesManager().is_in_cache(code):
            # 加绿不判断板块是否成交
            if len(deal_codes) >= MAX_COUNT:
                l2_log.info(code, logger_l2_radical_buy, f"扫入成交代码个数大于{MAX_COUNT}个:{code}-{deal_codes}")
                return
        if code in deal_codes:
            l2_log.info(code, logger_l2_radical_buy, f"该代码已经成交:{code}")
            return
        # 单票是否可买
        can_buy_result = RadicalBuyDataManager.is_code_can_buy(code)
        if not can_buy_result[0]:
            return
        # 获取激进买的板块
        f_buy_blocks, orgin_buy_blocks = radical_buy_strategy.compute_can_radical_buy_blocks(code, deal_codes)
        if not orgin_buy_blocks:
            l2_log.info(code, logger_l2_radical_buy, f"没有可扫入的板块:{code}")
            return
        if not f_buy_blocks:
            return
        # 买入的板块
        buy_blocks = f_buy_blocks
        # 判断当前时间段是否可以买入
        mode = OrderBeginPosInfo.MODE_RADICAL
        can_buy, money, msg = BuyMoneyUtil.get_buy_data(tool.get_now_time_str(), mode,
                                                        DealAndDelegateWithBuyModeDataManager().get_deal_codes_info(
                                                            mode),
                                                        DealAndDelegateWithBuyModeDataManager().get_delegates_codes_info(
                                                            mode))
        if not can_buy:
            l2_log.info(code, logger_l2_radical_buy, f"当前时间段已不能扫入:{code}-{msg}")
            return
        in_blocks = RealTimeKplMarketData.get_top_market_jingxuan_blocks()
        buy_blocks_with_money = [(b, RealTimeKplMarketData.get_jx_block_in_money(b),
                                  in_blocks.index(b) if b in in_blocks else -1) for b in buy_blocks]
        if not WantBuyCodesManager().is_in_cache(code):
            # 判断是否开得太高
            open_price = L1DataManager.get_open_price(code)
            if not radical_buy_strategy.is_can_buy_with_open_price(code, open_price):
                l2_log.info(code, logger_l2_radical_buy, f"开得太高:{code}")
                radical_buy_data_manager.ExcludeIndexComputeCodesManager.add_code(code)
                return
        radical_buy_data_manager.ExcludeIndexComputeCodesManager.remove_code(code)
        # 根据L2下单
        latest_buy_no = data[6]
        latest_deal_time = l2_huaxin_util.convert_time(data[3])
        # 清除大单委托数据
        EveryLimitupBigDelegateOrderManager.clear(code, '')
        l2_log.info(code, logger_l2_trade, f"计算完板块与大单,准备下单:{data}")
        RadicalBuyDealCodesManager.buy_by_l2_delegate_expire_time_dict[code] = (
            time.time() + 1, latest_buy_no, buy_blocks,
            latest_deal_time, buy_blocks_with_money, False)
# Callback instances shared at module level
my_l2_data_callback = MyL2DataCallback()
my_l2_data_callbacks = [MyL2DataCallback() for _ in range(constant.HUAXIN_L2_MAX_CODES_COUNT)]
my_trade_response = MyTradeResponse()
def run_l2_market_info_reciever(queues: list):
    """
    Receive L2 market data.

    Starts one daemon thread per queue. Each thread blocks on its queue in an endless
    loop and forwards every message of type "l2_market" to my_l2_data_callback.OnMarketData.
    @param queues: queues delivering dicts shaped like {"type": "l2_market", "data": (code, data)}
    @return: None
    """

    def recieve_data(queue):
        while True:
            try:
                d = queue.get()
                # {"type": "l2_market", "data": (code, data)}
                if d["type"] == "l2_market":
                    code, market_data = d["data"]
                    my_l2_data_callback.OnMarketData(code, market_data)
            except Exception as e:
                # Keep the consumer alive after a bad message or a failing callback,
                # but record the failure instead of discarding it silently.
                logger_debug.exception(e)

    for q in queues:
        threading.Thread(target=recieve_data, args=(q,), daemon=True).start()
# 预埋单
@@ -911,7 +1046,7 @@
                    result = huaxin_trade_api.order(huaxin_trade_api.TRADE_DIRECTION_BUY, code, volume, limit_up_price,
                                                    blocking=False,
                                                    shadow_price=shadow_price, shadow_volume=volume)
                    async_log_util.info(logger_trade, f"{code}下单结束:{result}")
                    l2_log.info(code, logger_trade, f"{code}下单结束:{result}")
                    buy_open_limit_up_strategy.BuyOpenLimitupDataManager().set_place_order_info(code, volume, volume,
                                                                                                result.get("order_ref"))
                except Exception as e:
@@ -994,22 +1129,29 @@
    # L2成交信号回调
    global l2_trade_single_callback
    l2_trade_single_callback = MyL2TradeSingleCallback()
    data_callback.l2_trade_single_callback = l2_trade_single_callback
    L2TradeSingleDataManager.set_callback(l2_trade_single_callback)
    # 加载自由流通量
    global_data_loader.load_zyltgb_volume_from_db()
    # 获取最近7天涨停数最多的板块
    try:
        if not KPLPlateForbiddenManager().list_all_cache() and tool.get_now_time_as_int() > int("070000"):
            # 没有添加过的时候需要重新添加
            datas_ = LatestLimitUpBlockManager().statistics_limit_up_block_infos()
            if datas_:
                for data_ in datas_:
                    # 连续2天的板块就不买
                    if data_[2] >= 2:
                        KPLPlateForbiddenManager().save_plate(data_[0])
    except:
        pass
    # try:
    #     if not KPLPlateForbiddenManager().list_all_cache() and tool.get_now_time_as_int() > int("070000"):
    #         # 没有添加过的时候需要重新添加
    #         datas_ = LatestLimitUpBlockManager().statistics_limit_up_block_infos()
    #         if datas_:
    #             for data_ in datas_:
    #                 # 连续2天的板块就不买
    #                 if data_[2] >= 2:
    #                     KPLPlateForbiddenManager().save_plate(data_[0])
    # except:
    #     pass
    # 初始化数据
    BuyMoneyAndCountSetting()
    gpcode_manager.WantBuyCodesManager()
    # 加载历史K线数据
    HistoryKDataManager().load_data()
def run(queue_strategy_r_trade_w, queue_strategy_w_trade_r, queue_strategy_w_trade_r_for_read, trade_ipc_addr):