Administrator
27 分钟以前 cb4589db74aac2822f2aeb97eb3c28d2b7d59338
servers/huaxin_trade_server.py
@@ -19,7 +19,8 @@
from code_attribute.code_l1_data_manager import L1DataManager
from code_attribute.gpcode_manager import CodePrePriceManager, CodesNameManager, \
    WantBuyCodesManager
from huaxin_client import l2_data_transform_protocol
from code_attribute.today_max_price_manager import MaxPriceInfoManager
from huaxin_client import l2_data_transform_protocol, l1_subscript_codes_manager
from huaxin_client.trade_transform_protocol import TradeResponse
from l2 import l2_data_manager_new, l2_log, code_price_manager, l2_data_util, transaction_progress, \
    l2_data_source_util, l2_data_log, data_callback
@@ -38,12 +39,14 @@
from log_module.log import hx_logger_contact_debug, hx_logger_trade_callback, \
    hx_logger_l2_orderdetail, hx_logger_l2_market_data, logger_l2_g_cancel, logger_debug, \
    logger_system, logger_trade, logger_l2_radical_buy, logger_l2_trade
from third_data import block_info, kpl_data_manager, history_k_data_manager, huaxin_l1_data_manager, kpl_api, kpl_util
from third_data import block_info, kpl_data_manager, history_k_data_manager, huaxin_l1_data_manager, kpl_api, kpl_util, \
    third_blocks_manager
from third_data.code_plate_key_manager import KPLCodeJXBlockManager, RealTimeKplMarketData, \
    KPLPlateForbiddenManager
from third_data.history_k_data_manager import HistoryKDataManager
from third_data.history_k_data_util import JueJinApi, HistoryKDatasUtils
from trade import l2_trade_util, \
    trade_data_manager, trade_constant, buy_open_limit_up_strategy
    trade_data_manager, trade_constant, buy_open_limit_up_strategy, auto_add_want_buy_strategy
from trade.buy_radical import radical_buy_data_manager, radical_buy_strategy
from trade.buy_money_count_setting import BuyMoneyAndCountSetting, BuyMoneyUtil
@@ -57,7 +60,7 @@
    EveryLimitupBigDelegateOrderManager
from trade.sell.sell_rule_manager import TradeRuleManager
from trade.trade_data_manager import RadicalBuyDealCodesManager
from trade.trade_manager import CodesTradeStateManager
from trade.trade_manager import CodesTradeStateManager, CodesContinueBuyMoneyManager
from utils import socket_util, middle_api_protocol, tool, huaxin_util, global_util, trade_util, init_data_util, \
    output_util
@@ -233,7 +236,7 @@
                                            code, gpcode_manager.get_limit_up_price_as_num(code))
                                        deal_big_order_info = (
                                            output_util.money_desc(th), output_util.money_desc(deal_big_money_info[1]),
                                            output_util.money_desc(deal_big_money_info[2]))
                                            output_util.money_desc(deal_big_money_info[2]), deal_big_money_info[0] <= 0)
                                    except:
                                        deal_big_order_info = None
                                    code_name = gpcode_manager.get_code_name(code)
@@ -270,7 +273,7 @@
                                            code, gpcode_manager.get_limit_up_price_as_num(code))
                                        deal_big_order_info = (
                                            output_util.money_desc(th), output_util.money_desc(deal_big_money_info[1]),
                                            output_util.money_desc(deal_big_money_info[2]))
                                            output_util.money_desc(deal_big_money_info[2]), deal_big_money_info[0] <= 0)
                                    except:
                                        deal_big_order_info = None
                                    code_name = gpcode_manager.get_code_name(code)
@@ -296,7 +299,7 @@
                            # if d["pre_close"] * tool.get_limit_up_rate(d["sec_id"]) > constant.MAX_SUBSCRIPT_CODE_PRICE:
                            #     continue
                            if (d["listed_date"] + datetime.timedelta(
                                    days=100)).timestamp() > datetime.datetime.now().timestamp():
                                    days=20)).timestamp() > datetime.datetime.now().timestamp():
                                continue
                            fdatas.append(d["sec_id"])
                            code_name_map[d["sec_id"]] = d["sec_name"]
@@ -397,6 +400,16 @@
        datas = data["data"]
        cls.__save_l1_current_price(datas)
        cls.__process_buy_open_limit_up_datas(datas)
        try:
            # 记录今日最高价
            # 09:25之后才开始记录
            if datas and tool.get_now_time_str() > '09:25:00':
                for d in datas:
                    MaxPriceInfoManager().set_price_info(d[0], price=d[1], time=d[9], sell1_info=(d[10], d[11]))
        except Exception as e:
            logger_debug.exception(e)
        # 根据高标的实时涨幅计算拉黑板块
        rate_dict = {d[0]: d[2] for d in datas}
        cls.__process_l1_data_thread_pool.submit(
@@ -419,8 +432,8 @@
        thread_id = random.randint(0, 100000)
        l2_log.threadIds[code] = thread_id
        l2_data_count = len(_datas)
        l2_log.info(code, hx_logger_l2_orderdetail,
                    f"{code}#耗时:{use_time}-{thread_id}#数量:{l2_data_count}#{_datas[-1]}")
        # l2_log.info(code, hx_logger_l2_orderdetail,
        #             f"{code}#耗时:{use_time}-{thread_id}#数量:{l2_data_count}#{_datas[-1]}")
        # l2_data_log.l2_time_log(code, "开始处理L2逐笔委托")
        try:
@@ -435,7 +448,7 @@
    def l2_transaction(cls, code, datas):
        # async_log_util.info(hx_logger_l2_transaction, f"{code}#{datas}")
        if datas:
            HuaXinTransactionDatasProcessor().process_huaxin_transaction_datas_v2(code, datas)
            HuaXinTransactionDatasProcessor().process_huaxin_transaction_datas(code, datas)
    @classmethod
    def l2_market_data(cls, code, data):
@@ -464,7 +477,7 @@
        except Exception as e:
            logger_debug.exception(e)
        pre_close_price = CodePrePriceManager.get_price_pre_cache(code)
        pre_close_price = CodePrePriceManager().get_price_pre_cache(code)
        if pre_close_price is not None:
            average_rate = None
            try:
@@ -629,6 +642,8 @@
            huaxin_trade_record_manager.DelegateRecordManager.add([data])
            if huaxin_util.is_deal(order_status):
                if int(str(data["direction"])) == huaxin_util.TORA_TSTP_D_Buy:
                    # 订单成交回调,移除续买金额+拉黑
                    CodesContinueBuyMoneyManager().remove_continue_buy_money(data["securityID"])
                    l2_trade_util.forbidden_trade(data["securityID"], msg="已成交", force=True)
                    if TradePointManager.get_latest_place_order_mode(
                            data["securityID"]) == OrderBeginPosInfo.MODE_RADICAL:
@@ -650,98 +665,32 @@
    __radical_buy_by_blocks_result_cache = {}
    def OnTradeSingle(self, code, big_buy_order_count, _type, data):
        # 只处理深证的票
        try:
            # 判断是否下单
            state = CodesTradeStateManager().get_trade_state_cache(code)
            if state == trade_constant.TRADE_STATE_BUY_DELEGATED or state == trade_constant.TRADE_STATE_BUY_PLACE_ORDER or state == trade_constant.TRADE_STATE_BUY_SUCCESS:
                # 已经下单了
                return
        """
        最近涨停卖被吃掉
        @param code:
        @param big_buy_order_count:
        @param _type:
        @param data:
        @return:
        """
        l2_log.debug(code, "最近涨停卖被吃掉{}, {}", code, f"{data}")
        # 暂时不处理
        refer_sell_data = L2MarketSellManager().get_current_total_sell_data(code)
        # 参考总卖额
        refer_sell_money = 0
        if refer_sell_data:
            refer_sell_money = refer_sell_data[1]
        if refer_sell_money < 5000e4:
            l2_log.debug(code, "最近涨停卖被吃,总抛压小于5000w")
            return
            l2_log.debug(code, "成交触发买入计算 触发模式:{} 大单数量:{}", _type, big_buy_order_count)
            total_datas = l2_data_util.local_today_datas.get(code)
            mode_descs = []
            # if big_buy_order_count > 0:
            #     mode_descs.append("300w")
            if l2_data_manager_new.L2TradeDataProcessor.get_active_buy_blocks(code):
                mode_descs.append("身位")
            current_total_sell_data = L2MarketSellManager().get_current_total_sell_data(code)
            sell_info = None
            if current_total_sell_data:
                sell_info = (current_total_sell_data[0], current_total_sell_data[1])
            if _type == L2TradeSingleDataManager.TYPE_PASSIVE and mode_descs:
                # 可以激进下单且必须是首次下单才能激进
                place_order_count = trade_data_manager.PlaceOrderCountManager().get_place_order_count(code)
                if tool.is_sz_code(code) and place_order_count == 0 and current_total_sell_data[
                    1] > 500 * 10000 and global_util.zyltgb_map.get(
                    code) < 50 * 100000000:
                    # 首次下单,自由流通50亿以下,总卖额500w才能激进下单
                    mode_descs.insert(0, "成交触发")
                    last_index = total_datas[-1]["index"]
                    volume_rate = code_volumn_manager.CodeVolumeManager().get_volume_rate(code)
                    order_begin_pos = OrderBeginPosInfo(buy_single_index=last_index,
                                                        buy_exec_index=last_index,
                                                        buy_compute_index=last_index,
                                                        num=0, count=1,
                                                        max_num_set=set(),
                                                        buy_volume_rate=volume_rate,
                                                        mode=OrderBeginPosInfo.MODE_ACTIVE,
                                                        mode_desc=",".join(mode_descs),
                                                        sell_info=sell_info,
                                                        threshold_money=0)
                    l2_data_manager_new.L2TradeDataProcessor.save_order_begin_data(code, order_begin_pos)
                    l2_log.debug(code, "积极下单,获取到买入执行位置:{} 成交数据触发模式:{} 大单数量:{}",
                                 order_begin_pos.buy_exec_index,
                                 _type, big_buy_order_count)
                    l2_data_manager_new.L2TradeDataProcessor.start_buy(code, total_datas[-1], total_datas[-1]["index"],
                                                                       True, None)
                else:
                    l2_log.debug(code, "积极下单,不满足扫入下单条件,无法扫入")
            else:
                if not tool.is_sz_code(code):
                    return
                # 找到最近的大买单
                for i in range(len(total_datas) - 1, -1, -1):
                    d = total_datas[i]
                    val = d['val']
                    if not L2DataUtil.is_limit_up_price_buy(val):
                        continue
                    if val['num'] * float(val['price']) < 5000:
                        continue
                    if val['orderNo'] < data[0][6]:
                        continue
                    result = L2TradeSingleDataManager.is_can_place_order(code, d)
                    if result and result[0]:
                        volume_rate = code_volumn_manager.CodeVolumeManager().get_volume_rate(code)
                        order_begin_pos = OrderBeginPosInfo(buy_single_index=i,
                                                            buy_exec_index=i,
                                                            buy_compute_index=i,
                                                            num=0, count=1,
                                                            max_num_set=set(),
                                                            buy_volume_rate=volume_rate,
                                                            mode=OrderBeginPosInfo.MODE_FAST,
                                                            mode_desc="成交触发",
                                                            sell_info=sell_info,
                                                            threshold_money=0)
                        l2_data_manager_new.L2TradeDataProcessor.save_order_begin_data(code, order_begin_pos)
                        l2_log.debug(code, "非激进下单,获取到买入执行位置:{} 成交数据触发模式:{}",
                                     order_begin_pos.buy_exec_index,
                                     _type)
                        l2_data_manager_new.L2TradeDataProcessor.start_buy(code, total_datas[-1],
                                                                           total_datas[-1]["index"],
                                                                           True, None)
                        break
        except Exception as e:
            logger_debug.exception(e)
        self.process_limit_up_active_buy(code, [data[0]], is_last_sell_deal=True)
    def process_limit_up_active_buy(self, code, transaction_datas, is_almost_open_limit_up=False,
                                    l2_market_time_str='', no_left_limit_up_sell=False):
                                    l2_market_time_str='', no_left_limit_up_sell=False, is_last_sell_deal=False):
        """
        处理涨停主动买
        @param is_last_sell_deal: 是否最近一笔涨停卖被吃
        @param no_left_limit_up_sell: 是否还有剩余涨停卖尚未成交
        @param code:
        @param transaction_datas:
@@ -769,7 +718,7 @@
                if len(deal_codes) >= MAX_COUNT:
                    l2_log.info(code, logger_l2_radical_buy, f"扫入成交代码个数大于{MAX_COUNT}个:{code}-{deal_codes}")
                    return True
            if code in deal_codes:
            if code in deal_codes and not CodesContinueBuyMoneyManager().get_continue_buy_money(code):
                l2_log.info(code, logger_l2_radical_buy, f"该代码已经成交:{code}")
                return True
@@ -795,9 +744,12 @@
                        return True
                    # -----根据成交比例判断是否可买------
                    result_by_volume = radical_buy_strategy.process_limit_up_active_buy_deal(code, transaction_datas,
                    if not is_last_sell_deal:
                        result_by_volume = radical_buy_strategy.process_limit_up_active_buy_deal(code, transaction_datas,
                                                                                             is_almost_open_limit_up,
                                                                                             no_left_limit_up_sell=no_left_limit_up_sell)
                    else:
                        result_by_volume = radical_buy_strategy.BUY_MODE_BY_L2, f"最后一笔涨停卖成交"
                    l2_log.info(code, logger_l2_radical_buy, f"量买入结果判断:{code}, 结果:{result_by_volume} 板块:{buy_blocks}")
                    in_blocks = RealTimeKplMarketData.get_top_market_jingxuan_blocks()
                    buy_blocks_with_money = [(b, RealTimeKplMarketData.get_jx_block_in_money(b),
@@ -825,7 +777,7 @@
                        radical_buy_data_manager.ExcludeIndexComputeCodesManager.remove_code(code)
                        if result_by_volume[0] == radical_buy_strategy.BUY_MODE_DIRECT and not tool.is_sh_code(code):
                        if result_by_volume[0] == radical_buy_strategy.BUY_MODE_DIRECT:
                            # 上证不能根据成交买入
                            latest_deal_time = l2_huaxin_util.convert_time(transaction_datas[-1][3])
                            refer_sell_data = L2MarketSellManager().get_refer_sell_data(code, latest_deal_time)
@@ -836,6 +788,13 @@
                            if refer_sell_data:
                                sell_info = (refer_sell_data[0], refer_sell_data[1])
                            threshold_money = 0
                            every_deal_orders = EveryLimitupBigDealOrderManager.list_big_buy_deal_orders(code)
                            if every_deal_orders:
                                min_order_no_info = min(every_deal_orders, key=lambda x: x[0])
                                min_order_no = min_order_no_info[0]
                            else:
                                min_order_no = transaction_datas[-1][6]
                            order_begin_pos_info = OrderBeginPosInfo(buy_single_index=buy_single_index,
                                                                     buy_exec_index=buy_exec_index,
                                                                     buy_compute_index=buy_exec_index,
@@ -843,9 +802,11 @@
                                                                     max_num_set=set(),
                                                                     buy_volume_rate=buy_volume_rate,
                                                                     mode=OrderBeginPosInfo.MODE_RADICAL,
                                                                     mode_desc=f"扫入买入:{buy_blocks}",
                                                                     mode_desc=f"扫入买入:{buy_blocks}, 大单成交最小订单号:{min_order_no}",
                                                                     sell_info=sell_info,
                                                                     threshold_money=threshold_money)
                                                                     threshold_money=threshold_money,
                                                                     min_order_no=min_order_no
                                                                     )
                            L2TradeDataProcessor.save_order_begin_data(code, order_begin_pos_info)
                            buy_result = L2TradeDataProcessor.start_buy(code, total_datas[-1], total_datas[-1]["index"],
                                                                        True, block_info=buy_blocks_with_money)
@@ -853,9 +814,9 @@
                                # 下单成功
                                radical_buy_data_manager.BlockPlaceOrderRecordManager().add_record(code, buy_blocks)
                                radical_buy_strategy.clear_data(code, force=True)
                                RDCancelBigNumComputer().clear_data(code)
                                # RDCancelBigNumComputer().clear_data(code)
                                # 大单成交足够
                                RadicalBuyDataManager().big_order_deal_enough(code)
                                # RadicalBuyDataManager().big_order_deal_enough(code)
                            return True
                        else:
                            if transaction_datas:
@@ -893,11 +854,11 @@
                l2_log.info(code, logger_debug, f"扫入处理时长:{code}-{use_time}")
    def OnLimitUpActiveBuy(self, code, transaction_datas, no_left_limit_up_sell):
        # can_clear_before_data = self.process_limit_up_active_buy(code, transaction_datas,
        #                                                          no_left_limit_up_sell=no_left_limit_up_sell)
        # if can_clear_before_data:
        #     # 清除
        #     EveryLimitupBigDealOrderManager.clear(code, "处理涨停成交数据")
        can_clear_before_data = self.process_limit_up_active_buy(code, transaction_datas,
                                                                 no_left_limit_up_sell=no_left_limit_up_sell)
        if can_clear_before_data:
            # 清除
            EveryLimitupBigDealOrderManager.clear(code, "处理涨停成交数据")
        pass
    def OnLastLimitUpSellDeal(self, code, data):
@@ -907,6 +868,9 @@
        @param data:  (data['SecurityID'], data['TradePrice'], data['TradeVolume'], data['OrderTime'], data['MainSeq'], data['SubSeq'], data['BuyNo'], data['SellNo'], data['ExecType'])
        @return:
        """
        if True:
            return
        if data[6] < data[7]:
            # 非主动买
            return
@@ -1074,6 +1038,20 @@
    logger_debug.info("更新昨日开盘啦实时涨停数据")
def __update_l1_target_codes():
    try:
        codes_sh, codes_sz = l1_subscript_codes_manager.request_l1_subscript_target_codes()
        if codes_sh and codes_sz:
            l1_subscript_codes_manager.save_codes(codes_sh, codes_sz)
        # 拉取三方板块
        codes = []
        codes.extend(codes_sh)
        codes.extend(codes_sz)
        third_blocks_manager.load_if_less(codes)
    except Exception as e:
        logger_debug.error(e)
# 做一些初始化的操作
def __init():
    def run_pending():
@@ -1086,13 +1064,17 @@
        # 更新K线
        schedule.every().day.at("08:00:01").do(history_k_data_manager.update_history_k_bars)
        schedule.every().day.at("08:30:01").do(history_k_data_manager.update_history_k_bars)
        schedule.every().day.at("09:00:01").do(history_k_data_manager.update_history_k_bars)
        schedule.every().day.at("09:02:01").do(lambda: history_k_data_manager.update_history_k_bars(force=True))
        # 更新账户信息
        schedule.every().day.at("09:00:01").do(huaxin_trade_data_update.add_money_list)
        schedule.every().day.at("09:15:20").do(huaxin_trade_data_update.add_money_list)
        schedule.every().day.at("09:15:20").do(huaxin_trade_data_update.add_money_list)
        # 更新昨日实时涨停数据
        schedule.every().day.at("07:58:00").do(__update_yesterday_kpl_limit_up_datas)
        # 更新代码
        schedule.every().day.at("15:58:00").do(__update_l1_target_codes)
        schedule.every().day.at("08:56:00").do(__update_l1_target_codes)
        # 更新K线
        schedule.every().day.at("16:30:00").do(history_k_data_manager.update_history_k_bars)
        while True:
            try:
@@ -1131,6 +1113,16 @@
    # except:
    #     pass
    # 初始化数据
    BuyMoneyAndCountSetting()
    gpcode_manager.WantBuyCodesManager()
    # 加载历史K线数据
    HistoryKDataManager().load_data()
    # 队列持久化
    threading.Thread(target=lambda: DelegateRecordManager().run(), daemon=True).start()
    # 自动加想策略
    threading.Thread(target=lambda: auto_add_want_buy_strategy.run(), daemon=True).start()
def run(queue_strategy_r_trade_w, queue_strategy_w_trade_r, queue_strategy_w_trade_r_for_read, trade_ipc_addr):
    """