Administrator
2024-07-03 59d0c089c7c0749d121b6fef6cba05509cff6a8c
trade/buy_strategy.py
@@ -7,10 +7,11 @@
from log_module.log import logger_trade
from third_data.kpl_data_manager import KPLLimitUpDataRecordManager
from trade.l2_transaction_data_manager import HuaXinBuyOrderManager
from utils import tool
from utils import tool, l2_huaxin_util
class BuyStrategyDataManager:
    __latest_transaction_price_dict = {}
    """
    买入策略管理
    """
@@ -18,10 +19,12 @@
    def __init__(self):
        # 涨停价管理
        self.__limit_up_price_dict = {}
        self.__pre_close_price_dict = {}  # 昨日收盘价
        self.__latest_trade_price_dict = {}
        self.__latest_market_info_dict = {}
    def set_pre_close_price(self, code, price):
        self.__pre_close_price_dict[code] = price
        limit_up_price = tool.to_price(decimal.Decimal(str(price)) * decimal.Decimal(f"{tool.get_limit_up_rate(code)}"))
        self.__limit_up_price_dict[code] = round(float(limit_up_price), 2)
@@ -34,7 +37,7 @@
        """
        code = info[0]
        # 设置昨日收盘价格
        if code not in self.__limit_up_price_dict:
        if code not in self.__limit_up_price_dict or code not in self.__pre_close_price_dict:
            self.set_pre_close_price(code, info[8])
        self.__latest_market_info_dict[info[0]] = info
@@ -53,9 +56,49 @@
                                                           info['OrderTime'], info['MainSeq'], info['SubSeq'],
                                                           info['BuyNo'],
                                                           info['SellNo'], info['ExecType'])])
        if int(l2_huaxin_util.convert_time(info['OrderTime']).replace(":", "")) < int("093000"):
            return False, "09:30之前不下单"
        code = info["SecurityID"]
        try:
            if info["TradePrice"] == self.__limit_up_price_dict.get(code):
            can_buy, msg = self.__can_place_order(code, info, backtest)
            return can_buy, msg
        finally:
            self.__latest_trade_price_dict[code] = info["TradePrice"]
    def __can_place_order(self, code, info, backtest):
        """
        是否可以下单
        :param code: 代码
        :param transaction_info: 成交信息
        :param backtest: 是否是回测模式
        :return: 返回是否可以下单
        """
        if backtest:
            # 获取2秒内吃的档数
            rised_price = self.__get_rised_price(code, info)
            if rised_price < 0.1:
                return False, "2S内涨幅小于10档"
            pre_close_price = self.__pre_close_price_dict.get(code)
            if pre_close_price is None:
                return False, "没有获取到收盘价"
            if (info["TradePrice"] - pre_close_price) / pre_close_price < 0.07:
                return False, "涨幅小于7%"
            return True, f"2S内上升{int(rised_price * 100)}档"
        else:
            # 判断是否涨停
            is_limit_up = info["TradePrice"] == self.__limit_up_price_dict.get(code)
            limit_up_price = self.__limit_up_price_dict.get(code)
            if not is_limit_up and code.find("30") == 0:
                # 30开始的9.8以上的算涨停
                markekt = self.__latest_market_info_dict.get(info[0])
                if markekt and markekt[2] >= 0.098:
                    is_limit_up = True
                    limit_up_price = info["TradePrice"]
            if is_limit_up:
                # 当前为涨停价
                if code not in self.__latest_trade_price_dict:
                    # 开1
@@ -63,16 +106,41 @@
                    markekt = self.__latest_market_info_dict.get(info[0])
                    if markekt and markekt[3] * markekt[4] >= 1e8:
                        async_log_util.info(logger_trade, f"{code}:买1({markekt[3] * markekt[4]})超过1亿")
                        return True
                        return True, "买1超过1亿,开1可买"
                    else:
                        return False
                        return False, "开1"
                else:
                    if self.__latest_trade_price_dict.get(code) != self.__limit_up_price_dict.get(code):
                    if self.__latest_trade_price_dict.get(code) != limit_up_price:
                        # 之前那一次不是涨停价
                        return True
        finally:
            self.__latest_trade_price_dict[code] = info["TradePrice"]
        return False
                        return True, "涨停"
        return False, ""
    def __get_rised_price(self, code, transaction_info, time_space=2000):
        """
        获取指定时间内股价上升的价格
        :param code:
        :param transaction_info:
        :param time_space:
        :return:
        """
        if code not in self.__latest_transaction_price_dict:
            self.__latest_transaction_price_dict[code] = []
        if not self.__latest_transaction_price_dict[code] or self.__latest_transaction_price_dict[code][-1][0] != \
                transaction_info["OrderTime"]:
            self.__latest_transaction_price_dict[code].append(
                (transaction_info["TradePrice"], transaction_info["OrderTime"]))
            # 删除1s之前的数据
        while True:
            end_time, start_time = self.__latest_transaction_price_dict[code][-1][1], \
                                   self.__latest_transaction_price_dict[code][0][1]
            if tool.trade_time_sub_with_ms(l2_huaxin_util.convert_time(end_time, with_ms=True),
                                           l2_huaxin_util.convert_time(start_time, with_ms=True)) <= time_space:
                break
            else:
                # 删除第一个元素
                del self.__latest_transaction_price_dict[code][0]
        return self.__latest_transaction_price_dict[code][-1][0] - self.__latest_transaction_price_dict[code][0][0]
    def process(self, underlying_code, underlying_transaction_info, underlying_market_info, cb_code, cb_market_info):
        """