Administrator
2025-07-08 283a7c89f85b1584fde8ff429028506dc00e53d7
strategy/strategy_manager.py
@@ -4,18 +4,25 @@
import json
from code_attribute import gpcode_manager, code_nature_analyse
from code_attribute.gpcode_manager import BlackListCodeManager
from db import redis_manager_delegate as redis_manager
from db.mysql_data_delegate import Mysqldb
from db.redis_manager_delegate import RedisUtils
from strategy.data_analyzer import KPLLimitUpDataAnalyzer
from strategy.low_suction_strategy import LowSuctionOriginDataExportManager
from log_module import async_log_util
from log_module.log import logger_trade, logger_debug
from strategy.data.data_analyzer import KPLLimitUpDataAnalyzer
from strategy.data.data_manager import LowSuctionOriginDataExportManager
from strategy.place_order_queue_manager import PlaceOrderRecordManager
from strategy.strategy_params_settings import StrategyParamsSettingsManager
from strategy.strategy_variable import StockVariables
from strategy.strategy_variable_factory import DataLoader, StrategyVariableFactory
import constant
from third_data import kpl_util
from trade.trade_manager import DealCodesManager
from trade import trade_record_log_util
from trade.trade_manager import DealCodesManager, PlatePlaceOrderManager, TradeStateManager
from utils import huaxin_util, tool
BIG_ORDER_MAX_SPACE_TIME = 3
@tool.singleton
@@ -97,11 +104,46 @@
        RedisUtils.setex_async(self.__db, f"tick_low_price-{code}", tool.get_expire(), price)
@tool.singleton
class PlateWhiteListManager:
    """
    板块白名单管理
    """
    def __init__(self):
        self.__plate_white_list = set()
        self.__db = 13
        self.__redis_manager = redis_manager.RedisManager(self.__db)
        self.__load_data()
    def __get_redis(self):
        return self.__redis_manager.getRedis()
    def __load_data(self):
        val = RedisUtils.smembers(self.__get_redis(), "plate_white_list")
        if val:
            self.__plate_white_list = set(val)
    def get_plates(self):
        return self.__plate_white_list
    def add_plate(self, plate):
        self.__plate_white_list.add(plate)
        RedisUtils.sadd_async(self.__db, "plate_white_list", plate)
        RedisUtils.expire_async(self.__db, "plate_white_list", tool.get_expire())
    def remove_plate(self, plate):
        self.__plate_white_list.discard(plate)
        RedisUtils.srem_async(self.__db, "plate_white_list", plate)
class LowSuctionStrategy:
    """
    低吸策略
    """
    def __init__(self, day, script_name="strategy_script_v6.py", settings=StrategyParamsSettingsManager().get_settings()):
    def __init__(self, day, script_name="strategy_script_v6.py",
                 settings=StrategyParamsSettingsManager().get_settings(), need_load_data=False):
        self.now_day = day
        # 买大单:{代码:[大单数据]}
        self.big_order_buy = {}
@@ -112,7 +154,8 @@
        # 历史日K数据
        self.kline_data = {}
        # 历史涨停数据
        self.limit_up_record_data = {}
        self.limit_up_record_data_dict = {}
        self.limit_up_record_data_list = {}
        # 历史数据
        self.timeline_data = {}
        # 今日数据
@@ -135,7 +178,8 @@
        self.current_block_in_datas = []
        # 加载策略脚本文件
        with open(script_name if constant.is_windows() else f'{constant.get_path_prefix()}/{script_name}', mode='r', encoding='utf-8') as f:
        with open(script_name if constant.is_windows() else f'{constant.get_path_prefix()}/{script_name}', mode='r',
                  encoding='utf-8') as f:
            lines = f.readlines()
            scripts = "\n".join(lines)
            # 注释掉里面的import与变量
@@ -146,9 +190,10 @@
            self.scripts = scripts
        self.settings = settings
        self.data_loader = DataLoader(self.now_day, cache_path=f"{constant.get_path_prefix()}/datas")
        self.data_loader = DataLoader(self.now_day)
        self.__LowSuctionOriginDataExportManager = LowSuctionOriginDataExportManager(self.now_day)
        self.load_data()
        if need_load_data:
            self.load_data()
    def load_data(self):
        # 加载历史数据
@@ -164,13 +209,17 @@
        trade_days = self.data_loader.trade_days
        # 加载历史数据
        self.kline_data = self.data_loader.load_kline_data()
        self.limit_up_record_data = self.data_loader.load_limit_up_data()
        self.limit_up_record_data_list = self.data_loader.load_limit_up_data()
        for d in self.limit_up_record_data_list:
            if d[0] not in self.limit_up_record_data_dict:
                self.limit_up_record_data_dict[d[0]] = []
            self.limit_up_record_data_dict[d[0]].append(d)
        self.next_trade_day = self.data_loader.load_next_trade_day()
        if not trade_days:
            raise Exception("交易日历获取失败")
        if not self.kline_data:
            raise Exception("历史日K获取失败")
        if not self.limit_up_record_data:
        if not self.limit_up_record_data_list:
            raise Exception("历史涨停获取失败")
    def __load_current_date_data_by_timeline(self):
@@ -181,7 +230,8 @@
        """
        IS_BY_BIG_ORDER = False
        BIG_ORDER_MONEY_THRESHOLD = 200e4
        big_order_deals = self.__LowSuctionOriginDataExportManager.export_big_order_deal(BIG_ORDER_MONEY_THRESHOLD)
        big_order_deals = self.__LowSuctionOriginDataExportManager.export_big_order_deal(BIG_ORDER_MONEY_THRESHOLD,
                                                                                         max_deal_space=BIG_ORDER_MAX_SPACE_TIME)
        if not big_order_deals or IS_BY_BIG_ORDER:
            big_order_deals = self.__LowSuctionOriginDataExportManager.export_big_order_deal_by(
                BIG_ORDER_MONEY_THRESHOLD)
@@ -225,10 +275,9 @@
        """
        if code_ in self.stock_variables_dict:
            return
        stock_variables = StrategyVariableFactory.create_from_history_data(
            self.kline_data.get(code_), None,
            self.limit_up_record_data.get(code_), self.data_loader.trade_days)
            self.limit_up_record_data_dict.get(code_), self.data_loader.trade_days)
        # 加载今日涨停价
        pre_close = self.kline_data.get(code_)[0]["close"]
@@ -244,10 +293,11 @@
            days = self.data_loader.trade_days[:day]
            stock_variables.__setattr__(f"日出现的板块_{day}",
                                        KPLLimitUpDataAnalyzer.get_limit_up_reasons(
                                            self.limit_up_record_data, min_day=days[-1],
                                            self.limit_up_record_data_list, min_day=days[-1],
                                            max_day=days[0]))
        stock_variables.连续老题材 = KPLLimitUpDataAnalyzer.get_continuous_limit_up_reasons(
            self.limit_up_record_data, self.data_loader.trade_days[:2])
            self.limit_up_record_data_list, self.data_loader.trade_days[:2])
        stock_variables.连续老题材.clear()
        # 加载Tick信息
        open_price_info = TickSummaryDataManager().open_price_info_dict.get(code_)
@@ -269,19 +319,43 @@
        @param big_orders: [(代码, 买/卖, [订单号,量,金额,最后时间戳,最后价格, 初始时间戳, 初始价格])] 如:[ ('002741', 0, [475820, 91600, 1610328, 92500000, 17.58, 92500000, 17.58])]
        @return:
        """
        big_orders = [x for x in big_orders if x[0] in self.fcodes]
        codes = []
        for d in big_orders:
            code = d[0]
            if d[1] == 0:
                # 买单
                if code not in self.big_order_buy:
                    self.big_order_buy[code] = []
                self.big_order_buy[code].append(d[2])
            else:
                # 卖单
                if code not in self.big_order_sell:
                    self.big_order_sell[code] = []
                self.big_order_sell[code].append(d[2])
        # 驱动下单
            try:
                code = d[0]
                try:
                    # 只计算200w以上的买单
                    if d[2][2] < 200e4:
                        continue
                    if d[1] == 0:
                        if tool.trade_time_sub(huaxin_util.convert_time(d[2][3]),
                                               huaxin_util.convert_time(d[2][5])) > BIG_ORDER_MAX_SPACE_TIME:
                            # 成交超过指定时间
                            continue
                        # 买单
                        if code not in self.big_order_buy:
                            self.big_order_buy[code] = []
                        self.big_order_buy[code].append(d[2])
                        if code not in codes:
                            codes.append(code)
                    else:
                        # 卖单
                        if code not in self.big_order_sell:
                            self.big_order_sell[code] = []
                        self.big_order_sell[code].append(d[2])
                finally:
                    # 设置现价
                    if code in self.stock_variables_dict:
                        self.stock_variables_dict[code].当前价 = d[2][4]
            except Exception as e:
                logger_debug.error(f"{d}")
            # 驱动下单
        for code in codes:
            try:
                self.__run(code, self.stock_variables_dict.get(code))
            except Exception as e:
                logger_debug.exception(e)
    def add_ticks(self, ticks):
        """
@@ -355,6 +429,8 @@
        self.current_block_in_datas = _block_in_datas
    def __run(self, code, sv: StockVariables):
        if not sv:
            return
        # 运行代码
        # 注入大单
        sv.今日大单数据 = self.big_order_buy.get(code)
@@ -364,10 +440,15 @@
        # 注入板块流入信息
        if self.current_block_in_datas:
            sv.资金流入板块 = self.current_block_in_datas
        # 注入已成交代码
        place_order_plate_codes = DealCodesManager().get_place_order_plate_codes()
        # 注入已成交代码,成交代码以委托数据来计算
        place_order_plate_codes = PlatePlaceOrderManager().get_plate_codes()
        sv.板块成交代码 = place_order_plate_codes
        sv.成交代码 = DealCodesManager().get_deal_codes()
        code_sets = [set(lst) for lst in place_order_plate_codes.values()]
        # 2. 使用 set.union() 求并集
        union_code_sets = set().union(*code_sets)
        sv.成交代码 = union_code_sets
        sv.板块白名单 = PlateWhiteListManager().get_plates()
        global_dict = {
            "sv": sv,
            "target_code": code,
@@ -375,13 +456,24 @@
        }
        exec(self.scripts, global_dict)
        compute_result = global_dict["compute_result"]
        async_log_util.info(logger_trade, f"{code}:{compute_result}")
        if compute_result[0]:
            if code in sv.成交代码:
                return
            # 可以下单
            # 判断是否可以买
            for b in compute_result[3]:
                DealCodesManager().place_order(b, code)
            if not TradeStateManager().is_can_buy_cache():
                async_log_util.info(logger_trade, f"交易已关闭")
                return
            if BlackListCodeManager().is_in_cache(code):
                return
            try:
                # 添加记录,等待确认买入
                PlaceOrderRecordManager(self.now_day).add_record(code, tool.get_now_time_str(), compute_result[3],
                                                                 compute_result[4], sv.当前价,
                                                                 round((sv.当前价 - sv.昨日收盘价) * 100 / sv.昨日收盘价, 2))
            except Exception as e:
                logger_debug.exception(e)
# 当前的低吸策略对象
low_suction_strtegy = None
low_suction_strtegy: LowSuctionStrategy = None