Administrator
4 天以前 245979e3907d34bcd88ac0c4547f399bf33a44de
bug修复/策略完善
9个文件已修改
268 ■■■■ 已修改文件
strategy/data_analyzer.py 44 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
strategy/strategy_manager.py 17 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
strategy/strategy_script_v6.py 3 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
strategy/strategy_variable.py 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
strategy/strategy_variable_factory.py 6 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
strategy/test.py 39 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
strategy/time_series_backtest.py 8 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/trade_manager.py 142 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utils/tool.py 8 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
strategy/data_analyzer.py
@@ -422,25 +422,57 @@
        return count
    @classmethod
    def is_too_high_and_not_relase_volume(cls, code, k_data):
    def is_too_high_and_not_relase_volume(cls, k_data):
        """
        长得太高且没放量:30个交易日内,出现过最低价(最高价之前的交易日)到最高价之间的涨幅≥35%的票,且今日距离最高价那日无涨停/无炸板
        长得太高且没放量:30个交易日内,出现过最低价(最高价之前的交易日)到最高价之间的涨幅≥35%的票,且今日距离最高价那日无涨停/无炸板且>=3板且必须有2连板
        @param k_data: K线数据列表(近150个交易日,不包含当前交易日,时间倒序)
        @return: 四跌停及以上天数
        """
        k_data = k_data[:30]
        code = k_data[0]["sec_id"]
        # 获取最高价信息
        max_high_price_data = max(k_data, key=lambda x: x["high"])
        min_close_price_data = min([d for d in k_data if d['bob'] < max_high_price_data['bob']], key=lambda x: x["close"])
        before_datas = [d for d in k_data if d['bob'] < max_high_price_data['bob']]
        after_datas = [d for d in k_data if d['bob'] >= max_high_price_data['bob']]
        if not before_datas:
            return False
        if len(before_datas) > 15:
            # 从最高价日期向前最多看15个交易日
            before_datas = before_datas[:15]
        min_close_price_data = min(before_datas, key=lambda x: x["close"])
        if (max_high_price_data['high'] - min_close_price_data['close'])/min_close_price_data['close'] < 0.35:
            # 涨幅小于35%
            return False
        before_k_datas = [d for d in k_data if min_close_price_data['bob'] <= d['bob'] <= max_high_price_data['bob']]
        before_k_datas.sort(key=lambda x: x['bob'])
        # [最低价-最高价]日期内有3个板且有两连扳
        continue_2_limit_up_date = None
        for i in range(len(before_k_datas) - 1):
            if cls.__is_limit_up(code, before_k_datas[i]["close"],
                                 before_k_datas[i]["pre_close"]) and cls.__is_limit_up(code,
                                                                                       before_k_datas[i + 1]["close"],
                                                                                       before_k_datas[i + 1][
                                                                                           "pre_close"]):
                continue_2_limit_up_date = before_k_datas[i + 1]['bob'][:10]
                break
        if not continue_2_limit_up_date:
            # 无两连板
            return False
        # 两连板之后是否有炸板/涨停
        # 取2连板之后的3个交易日
        temp_k_datas = [d for d in before_k_datas if d['bob'][:10] > continue_2_limit_up_date][:3]
        if len([d for d in temp_k_datas if cls.__is_limit_up(code, d["high"], d["pre_close"])]) < 1:
            # 两连板之后有个涨停/炸板且时间在2连板之后的3个交易日内
            return False
        k_data = [d for d in k_data if d['bob'] > max_high_price_data['bob']]
        # 判断是否涨停过
        if len([d for d in k_data if cls.__is_limit_up(code, d["high"], d["pre_close"])]) >0:
            # 最高价之后有过涨停
        if len([d for d in k_data if cls.__is_limit_up(code, d["high"], d["pre_close"])]) > 0 or len(after_datas) >= 10:
            # 最高价之后有过涨停或者是最高价后10个交易日
            return False
        return True
        return True, f"高价日期:{max_high_price_data['bob'][:10]},低价日期:{min_close_price_data['bob'][:10]},两连扳日期:{continue_2_limit_up_date}"
class K60SLineAnalyzer:
strategy/strategy_manager.py
@@ -16,7 +16,7 @@
from strategy.strategy_variable_factory import DataLoader, StrategyVariableFactory
import constant
from third_data import kpl_util
from trade.trade_manager import DealCodesManager
from trade.trade_manager import DealCodesManager, PlatePlaceOrderManager
from utils import huaxin_util, tool
@@ -391,10 +391,14 @@
        # 注入板块流入信息
        if self.current_block_in_datas:
            sv.资金流入板块 = self.current_block_in_datas
        # 注入已成交代码
        place_order_plate_codes = DealCodesManager().get_place_order_plate_codes()
        # 注入已成交代码,成交代码以委托数据来计算
        place_order_plate_codes = PlatePlaceOrderManager().get_plate_codes()
        sv.板块成交代码 = place_order_plate_codes
        sv.成交代码 = DealCodesManager().get_deal_codes()
        code_sets = [set(lst) for lst in place_order_plate_codes.values()]
        # 2. 使用 set.union() 求并集
        union_code_sets = set().union(*code_sets)
        sv.成交代码 = union_code_sets
        global_dict = {
            "sv": sv,
            "target_code": code,
@@ -407,8 +411,11 @@
                return
            # 可以下单
            # 判断是否可以买
            order_ref = huaxin_util.create_order_ref()
            price = tool.get_buy_max_price(sv.当前价)
            volume = 100
            DealCodesManager().place_order(set(compute_result[3]), code, order_ref, price, volume)
            for b in compute_result[3]:
                DealCodesManager().place_order(b, code)
                async_log_util.info(logger_trade, f"{code}下单,板块:{compute_result[3]}")
strategy/strategy_script_v6.py
@@ -94,6 +94,9 @@
    if sv.日三板个数_10 >= 1:
        return False, f"10个交易日有>=3连板"
    if sv.涨得高未放量:
        return False, f"涨得高未放量"
    # if sv.当前价 > sv.昨日最低价 * 1.1:
    #     return False, f"买入时的价格必须≤昨日最低价*110%"
strategy/strategy_variable.py
@@ -213,6 +213,7 @@
        self.辨识度代码 = set()
        self.领涨板块信息 = set()
        self.连续老题材 = set()
        self.涨得高未放量 = False
    def replace_variables(self, expression):
        """
strategy/strategy_variable_factory.py
@@ -602,6 +602,12 @@
            kline_data_60s = kline_data_60s_dict.get(trade_days[0])
            fdata = K60SLineAnalyzer.get_close_price_of_max_volume(kline_data_60s)
            instance.__setattr__(f"昨日分时最高量价", fdata)
        if KTickLineAnalyzer.is_too_high_and_not_relase_volume(kline_data_1d):
            instance.涨得高未放量 = True
        else:
            instance.涨得高未放量 = False
        return instance
strategy/test.py
@@ -1,13 +1,15 @@
from huaxin_client import l1_subscript_codes_manager
from strategy import strategy_manager
from strategy import strategy_manager, data_analyzer
from strategy.strategy_variable import StockVariables
# 统计当日的平均溢价率
from strategy.strategy_variable_factory import DataLoader
from third_data.kpl_block_manager import KPLCodeJXBlocksManager
def statistic_average(path):
    rate_list = []
    yjl_list = []
    with open(path, mode='r', encoding='utf-8') as f:
        lines = f.readlines()
        for line in lines:
@@ -17,22 +19,39 @@
                continue
            r = round(float(line.split("当日盈亏:")[1].split(",")[0].replace("%", "")), 2)
            rate_list.append(r)
    print("平均利润率:", round(sum(rate_list) / len(rate_list), 2))
    print("总利润率:", round(sum(rate_list), 2), "总买票数量:", len(rate_list))
            r = line.split("溢价率:")[1].split(",")[0].replace("%", "")
            if r.find("未知") < 0:
                yjl_list.append(round(float(r), 2))
    print("当日平均利润率:", round(sum(rate_list) / len(rate_list), 2))
    print("当日总利润率:", round(sum(rate_list), 2), "总买票数量:", len(rate_list))
    print("次日开盘平均利润率:", round(sum(yjl_list) / len(yjl_list), 2))
    print("次日开盘总利润率:", round(sum(yjl_list), 2), "总买票数量:", len(yjl_list))
if __name__ == "__main__":
    print("======3个票涨停之后买_不买长得太高未放量")
    statistic_average(r"C:\Users\Administrator\Desktop\3个票涨停之后买_不买长得太高未放量.txt")
    print("======3个票涨停之后买")
    statistic_average(r"C:\Users\Administrator\Desktop\3个票涨停之后买.txt")
    # print("======3个票涨停之后买+不限开盘涨幅+3个涨停之后大单打折")
    # statistic_average(r"C:\Users\Administrator\Desktop\3个票涨停之后买_不限开盘涨幅.txt")
    codes = set()
    codes_sh, codes_sz = l1_subscript_codes_manager.get_codes()
    codes |= set([x.decode() for x in codes_sh])
    codes |= set([x.decode() for x in codes_sz])
    KPLCodeJXBlocksManager('2025-06-17', codes).start_download_blocks()
    # codes = set()
    # codes_sh, codes_sz = l1_subscript_codes_manager.get_codes()
    # codes |= set([x.decode() for x in codes_sh])
    # codes |= set([x.decode() for x in codes_sz])
    # KPLCodeJXBlocksManager('2025-06-17', codes).start_download_blocks()
    # target_block = {"石油石化", "天然气", "化工"}
    # for code in code_blocks:
    #     blocks = code_blocks.get(code)
    #     if len(blocks & target_block) == len(target_block):
    #         print(code, blocks)
    __DataLoader = DataLoader("2025-06-18")
    kline_datas = __DataLoader.load_kline_data()
    codes = []
    for code in kline_datas:
        # if code !='003010':
        #     continue
        result = data_analyzer.KTickLineAnalyzer.is_too_high_and_not_relase_volume(kline_datas[code])
        if result:
            print("未放量", code, result[1])
            codes.append(code)
    print(len(codes))
strategy/time_series_backtest.py
@@ -807,13 +807,11 @@
if __name__ == "__main__":
    back_test_dict = {}
    days = ["2025-05-12", "2025-05-13", "2025-05-14", "2025-05-15", "2025-05-16", "2025-05-19", "2025-05-20",
            "2025-05-21", "2025-05-22", "2025-05-23", "2025-05-26", "2025-05-27", "2025-05-28", "2025-05-29",
            "2025-05-30", "2025-06-03"]
    # days = ["2025-05-12", "2025-05-13", "2025-05-14", "2025-05-15", "2025-05-16", "2025-05-19", "2025-05-20",
    #         "2025-05-21", "2025-05-22", "2025-05-23", "2025-05-26", "2025-05-27", "2025-05-28", "2025-05-29",
    #         "2025-05-30", "2025-06-03", "2025-06-04", "2025-06-05", "2025-06-06", "2025-06-09", "2025-06-10",
    #         "2025-06-11", "2025-06-12", "2025-06-13", "2025-06-16", "2025-06-17"]
    #         "2025-05-30", "2025-06-03"]
    days = ["2025-06-03", "2025-06-04", "2025-06-05", "2025-06-06", "2025-06-09", "2025-06-10",
            "2025-06-11", "2025-06-12", "2025-06-13", "2025-06-16", "2025-06-17", "2025-06-18"]
    # days = ["2025-05-23"]
trade/trade_manager.py
@@ -176,14 +176,11 @@
    def __init__(self):
        self.musql = Mysqldb()
        # 成交得订单信息
        self.__deal_code_orders_info = {}
        self.redis_manager = redis_manager.RedisManager(12)
        # 下过单的板块代码
        self.__place_order_plate_codes_info = {}
        # 委托得订单信息:{code#order_ref:}
        self.__delegate_code_orders = {}
        self.__load_data()
    def __get_redis(self):
        return self.redis_manager.getRedis()
    def __load_data(self):
        # 不算打板的数据
@@ -193,13 +190,9 @@
            for r in results:
                self.add_deal_order(r[1], r[4], round(float(r[3]), 2), r[0], r[2])
        val = RedisUtils.get(self.__get_redis(), "place_order_plate_codes_info")
        if val:
            self.__place_order_plate_codes_info = json.loads(val)
    def add_deal_order(self, code, volume, price, trade_id, order_sys_id):
        """
        添加成交大单
        添加成交订单
        @param code:
        @param volume:
        @param price:
@@ -217,31 +210,148 @@
            return
        self.__deal_code_orders_info[code][trade_id] = (volume, price, order_sys_id)
    def set_order_status(self, code, order_ref, order_sys_id, price, volume, status):
        """
        设置订单状态
        @param code:
        @param order_ref:
        @param order_sys_id:
        @param price:
        @param volume:
        @param status:
        @return:
        """
        k = f"{code}#{order_ref}"
        if k not in self.__delegate_code_orders:
            return
        # [代码,订单索引,订单号,价格,量,状态,板块集合]
        data = self.__delegate_code_orders[k]
        data[2] = order_sys_id
        data[5] = status
        data[3] = price
        data[4] = volume
        # 如果订单已经取消就需要删除
        if status == huaxin_util.TORA_TSTP_OST_AllCanceled or status == huaxin_util.TORA_TSTP_OST_Rejected:
            data = self.__delegate_code_orders.pop(k)
            if data:
                PlatePlaceOrderManager().remove_plates_code(data[6], code)
    def get_deal_codes(self):
        if not self.__deal_code_orders_info:
            return set()
        return set(self.__deal_code_orders_info.keys())
    def place_order(self, plate, code):
    def place_order(self, plates, code, order_ref, price, volume):
        """
        下单
        @param plate:
        @param plates:
        @param code:
        @param order_ref:
        @param price:
        @param volume:
        @return:
        """
        # 初始化委托数据 [代码,订单索引,订单号,价格,量,状态,板块集合]
        data = [code, order_ref, '', price, volume, huaxin_util.TORA_TSTP_OST_Unknown, plates]
        k = f"{code}#{order_ref}"
        if k not in self.__delegate_code_orders:
            self.__delegate_code_orders[k] = data
        PlatePlaceOrderManager().add_plates_code(plates, code)
    def place_order_fail(self, code, order_ref):
        """
        下单失败了
        @param code:
        @param order_ref:
        @return:
        """
        k = f"{code}#{order_ref}"
        if k in self.__delegate_code_orders:
            data = self.__delegate_code_orders.pop(k)
            if data:
                PlatePlaceOrderManager().remove_plates_code(data[6], code)
    def get_deal_or_delegated_codes(self):
        """
        获取已经成交或者委托的代码
        @return:
        """
        codes = set()
        if self.__delegate_code_orders:
            for k in self.__delegate_code_orders:
                codes.add(self.__delegate_code_orders[k][0])
        if self.__deal_code_orders_info:
            codes |= set(self.__deal_code_orders_info.keys())
        return codes
@tool.singleton
class PlatePlaceOrderManager:
    """
    板块下单管理
    """
    def __init__(self):
        self.__db = 12
        self.redis_manager = redis_manager.RedisManager(self.__db)
        # 下过单的板块代码
        self.__place_order_plate_codes_info = {}
        self.__load_data()
    def __get_redis(self):
        return self.redis_manager.getRedis()
    def __load_data(self):
        val = RedisUtils.get(self.__get_redis(), "place_order_plate_codes_info")
        if val:
            self.__place_order_plate_codes_info = json.loads(val)
    def add_plates_code(self, plates, code):
        """
        添加板块下单
        @param plates:
        @param code:
        @return:
        """
        for plate in plates:
        if plate not in self.__place_order_plate_codes_info:
            self.__place_order_plate_codes_info[plate] = []
        if code not in self.__place_order_plate_codes_info[plate]:
            self.__place_order_plate_codes_info[plate].append(code)
        self.__sync_plate_place_order_info()
    def __sync_plate_place_order_info(self):
        """
        同步板块下单信息
        @return:
        """
        RedisUtils.setex_async(self.__db, "place_order_plate_codes_info", tool.get_expire(),
                               json.dumps(self.__place_order_plate_codes_info))
    def get_place_order_plate_codes(self):
    def remove_plates_code(self, plates, code):
        """
        移除板块下单
        @param plates:
        @param code:
        @return:
        """
        for plate in plates:
            if plate in self.__place_order_plate_codes_info:
                if code in self.__place_order_plate_codes_info[plate]:
                    self.__place_order_plate_codes_info[plate].remove(code)
        self.__sync_plate_place_order_info()
    def get_plate_codes(self):
        return self.__place_order_plate_codes_info
__CodesTradeStateManager = CodesTradeStateManager()
if __name__ == "__main__":
    codes = DealCodesManager().get_codes()
    print(codes)
    PlatePlaceOrderManager().add_plates_code({"通信","计算机"}, "000333")
    place_order_plate_codes = PlatePlaceOrderManager().get_plate_codes()
    code_sets = [set(lst) for lst in place_order_plate_codes.values()]
    # 2. 使用 set.union() 求并集
    union_code_sets = set().union(*code_sets)
    print(union_code_sets)
utils/tool.py
@@ -318,6 +318,14 @@
    return max(price1, price2)
# 获取买入价格笼子的最高价
def get_buy_max_price(price):
    price1 = price * (1 + 0.02)
    price1 = math.ceil(price1 * 100) / 100
    price2 = price + 0.1
    return max(price1, price2)
# 获取买入价格笼子的最低价
def get_shadow_price(price):
    # fprice = round((100 - random.randint(2, 10)) * price / 100, 2)