Administrator
2025-07-08 283a7c89f85b1584fde8ff429028506dc00e53d7
strategy/strategy_manager.py
@@ -4,20 +4,25 @@
import json
from code_attribute import gpcode_manager, code_nature_analyse
from code_attribute.gpcode_manager import BlackListCodeManager
from db import redis_manager_delegate as redis_manager
from db.mysql_data_delegate import Mysqldb
from db.redis_manager_delegate import RedisUtils
from log_module import async_log_util
from log_module.log import logger_trade, logger_debug
from strategy.data_analyzer import KPLLimitUpDataAnalyzer
from strategy.low_suction_strategy import LowSuctionOriginDataExportManager
from strategy.data.data_analyzer import KPLLimitUpDataAnalyzer
from strategy.data.data_manager import LowSuctionOriginDataExportManager
from strategy.place_order_queue_manager import PlaceOrderRecordManager
from strategy.strategy_params_settings import StrategyParamsSettingsManager
from strategy.strategy_variable import StockVariables
from strategy.strategy_variable_factory import DataLoader, StrategyVariableFactory
import constant
from third_data import kpl_util
from trade.trade_manager import DealCodesManager
from trade import trade_record_log_util
from trade.trade_manager import DealCodesManager, PlatePlaceOrderManager, TradeStateManager
from utils import huaxin_util, tool
# Maximum allowed span between a big order's initial and last deal timestamps.
# It is passed as `max_deal_space` when exporting big-order deals, and big buy orders whose
# tool.trade_time_sub(last, initial) exceeds it are skipped.
# NOTE(review): the unit is whatever tool.trade_time_sub returns - presumably seconds; confirm.
BIG_ORDER_MAX_SPACE_TIME = 3
@tool.singleton
@@ -97,6 +102,39 @@
        """
        self.low_price_dict[code] = price
        RedisUtils.setex_async(self.__db, f"tick_low_price-{code}", tool.get_expire(), price)
@tool.singleton
class PlateWhiteListManager:
    """
    Manages the plate (sector) whitelist.

    The whitelist is held in an in-memory set. On construction it is loaded from the
    Redis set "plate_white_list" in database 13, and every add/remove is forwarded to
    Redis through the asynchronous RedisUtils helpers.
    """

    # Redis key under which the whitelist members are stored
    __KEY = "plate_white_list"

    def __init__(self):
        self.__db = 13
        self.__redis_manager = redis_manager.RedisManager(self.__db)
        self.__plates = set()
        self.__restore_from_redis()

    def __restore_from_redis(self):
        """
        Replace the in-memory whitelist with the members stored in Redis.
        The in-memory set is left untouched when Redis returns nothing.
        """
        stored = RedisUtils.smembers(self.__redis_manager.getRedis(), self.__KEY)
        if not stored:
            return
        self.__plates = set(stored)

    def get_plates(self):
        """
        Get the whitelisted plates.
        @return: the internal set itself (not a copy)
        """
        return self.__plates

    def add_plate(self, plate):
        """
        Add a plate to the whitelist.
        @param plate: plate to add
        @return:
        """
        self.__plates.add(plate)
        RedisUtils.sadd_async(self.__db, self.__KEY, plate)
        # Refresh the key's expiry after each addition
        RedisUtils.expire_async(self.__db, self.__KEY, tool.get_expire())

    def remove_plate(self, plate):
        """
        Remove a plate from the whitelist; a plate that is not present is ignored in memory.
        @param plate: plate to remove
        @return:
        """
        self.__plates.discard(plate)
        RedisUtils.srem_async(self.__db, self.__KEY, plate)
class LowSuctionStrategy:
@@ -192,7 +230,8 @@
        """
        IS_BY_BIG_ORDER = False
        BIG_ORDER_MONEY_THRESHOLD = 200e4
        big_order_deals = self.__LowSuctionOriginDataExportManager.export_big_order_deal(BIG_ORDER_MONEY_THRESHOLD)
        big_order_deals = self.__LowSuctionOriginDataExportManager.export_big_order_deal(BIG_ORDER_MONEY_THRESHOLD,
                                                                                         max_deal_space=BIG_ORDER_MAX_SPACE_TIME)
        if not big_order_deals or IS_BY_BIG_ORDER:
            big_order_deals = self.__LowSuctionOriginDataExportManager.export_big_order_deal_by(
                BIG_ORDER_MONEY_THRESHOLD)
@@ -258,6 +297,7 @@
                                            max_day=days[0]))
        stock_variables.连续老题材 = KPLLimitUpDataAnalyzer.get_continuous_limit_up_reasons(
            self.limit_up_record_data_list, self.data_loader.trade_days[:2])
        stock_variables.连续老题材.clear()
        # 加载Tick信息
        open_price_info = TickSummaryDataManager().open_price_info_dict.get(code_)
@@ -279,30 +319,43 @@
        @param big_orders: [(代码, 买/卖, [订单号,量,金额,最后时间戳,最后价格, 初始时间戳, 初始价格])] 如:[ ('002741', 0, [475820, 91600, 1610328, 92500000, 17.58, 92500000, 17.58])]
        @return:
        """
        big_orders = [x for x in big_orders if x[0] in self.fcodes]
        codes = []
        for d in big_orders:
            try:
                code = d[0]
                # 只计算200w以上的买单
                if d[2][2] < 200e4:
                    continue
                if d[1] == 0:
                    # 买单
                    if code not in self.big_order_buy:
                        self.big_order_buy[code] = []
                    self.big_order_buy[code].append(d[2])
                else:
                    # 卖单
                    if code not in self.big_order_sell:
                        self.big_order_sell[code] = []
                    self.big_order_sell[code].append(d[2])
                    if code not in codes:
                        codes.append(code)
                try:
                    # 只计算200w以上的买单
                    if d[2][2] < 200e4:
                        continue
                    if d[1] == 0:
                        if tool.trade_time_sub(huaxin_util.convert_time(d[2][3]),
                                               huaxin_util.convert_time(d[2][5])) > BIG_ORDER_MAX_SPACE_TIME:
                            # 成交超过指定时间
                            continue
                        # 买单
                        if code not in self.big_order_buy:
                            self.big_order_buy[code] = []
                        self.big_order_buy[code].append(d[2])
                        if code not in codes:
                            codes.append(code)
                    else:
                        # 卖单
                        if code not in self.big_order_sell:
                            self.big_order_sell[code] = []
                        self.big_order_sell[code].append(d[2])
                finally:
                    # 设置现价
                    if code in self.stock_variables_dict:
                        self.stock_variables_dict[code].当前价 = d[2][4]
            except Exception as e:
                logger_debug.error(f"{d}")
            # 驱动下单
        for code in codes:
            self.__run(code, self.stock_variables_dict.get(code))
            try:
                self.__run(code, self.stock_variables_dict.get(code))
            except Exception as e:
                logger_debug.exception(e)
    def add_ticks(self, ticks):
        """
@@ -387,10 +440,15 @@
        # 注入板块流入信息
        if self.current_block_in_datas:
            sv.资金流入板块 = self.current_block_in_datas
        # 注入已成交代码
        place_order_plate_codes = DealCodesManager().get_place_order_plate_codes()
        # 注入已成交代码,成交代码以委托数据来计算
        place_order_plate_codes = PlatePlaceOrderManager().get_plate_codes()
        sv.板块成交代码 = place_order_plate_codes
        sv.成交代码 = DealCodesManager().get_deal_codes()
        code_sets = [set(lst) for lst in place_order_plate_codes.values()]
        # 2. 使用 set.union() 求并集
        union_code_sets = set().union(*code_sets)
        sv.成交代码 = union_code_sets
        sv.板块白名单 = PlateWhiteListManager().get_plates()
        global_dict = {
            "sv": sv,
            "target_code": code,
@@ -398,15 +456,24 @@
        }
        exec(self.scripts, global_dict)
        compute_result = global_dict["compute_result"]
        async_log_util.info(logger_trade, f"{code}:{compute_result}")
        if compute_result[0]:
            if code in sv.成交代码:
                return
            # 可以下单
            # 判断是否可以买
            for b in compute_result[3]:
                DealCodesManager().place_order(b, code)
                async_log_util.info(logger_trade, f"{code}下单,板块:{compute_result[3]}")
            if not TradeStateManager().is_can_buy_cache():
                async_log_util.info(logger_trade, f"交易已关闭")
                return
            if BlackListCodeManager().is_in_cache(code):
                return
            try:
                # 添加记录,等待确认买入
                PlaceOrderRecordManager(self.now_day).add_record(code, tool.get_now_time_str(), compute_result[3],
                                                                 compute_result[4], sv.当前价,
                                                                 round((sv.当前价 - sv.昨日收盘价) * 100 / sv.昨日收盘价, 2))
            except Exception as e:
                logger_debug.exception(e)
# The current low-suction strategy object; initialised to None at module level
low_suction_strtegy = None
low_suction_strtegy: LowSuctionStrategy = None