Administrator
2025-06-30 bb2c567db00cc12b16808fc71bbb4c91615d110b
bug修复/策略完善
3个文件已修改
154 ■■■■■ 已修改文件
server/local_data_server.py 71 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
strategy/strategy_manager.py 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
strategy/time_series_backtest.py 82 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
server/local_data_server.py
@@ -2,19 +2,30 @@
本地http服务器
"""
import json
import threading
import uuid
from http.server import SimpleHTTPRequestHandler, HTTPServer
import urllib.parse as urlparse
from urllib.parse import parse_qs
import constant
from code_attribute import gpcode_manager
from db.mysql_data_delegate import Mysqldb
from strategy.data.data_manager import LowSuctionOriginDataExportManager
from strategy.strategy_variable_factory import DataLoader
from strategy.time_series_backtest import BackTest
from third_data import kpl_util
from utils import tool, output_util, huaxin_util
# 正在运行的回测任务字典
running_task_dict = {}
class CORSRequestHandler(SimpleHTTPRequestHandler):
    # 数据加载字典
    __data_loader_dict = {}
    def end_headers(self):
        # 添加 CORS 头
        self.send_header('Access-Control-Allow-Origin', '*')
@@ -146,9 +157,67 @@
            sql = f" select code, jx_blocks  from kpl_code_blocks where  day = '{date}' and " + " and ".join(
                [f"jx_blocks like '%{p}%'" for p in plates])
            datas = Mysqldb().select_all(sql)
            fdatas = [(x[0], "、".join([ f"<red>{dd}</red>" if dd in plates else dd for dd in json.loads(x[1])])) for x in datas if len(set(json.loads(x[1])) & plates) == len(plates)]
            fdatas = [(x[0], gpcode_manager.get_code_name(x[0]),
                       "、".join([f"<red>{dd}</red>" if dd in plates else dd for dd in json.loads(x[1])])) for x in
                      datas if len(set(json.loads(x[1])) & plates) == len(plates)]
            response_data = json.dumps({"code": 0, "data": fdatas})
        elif url.path == "/get_codes_by_limit_up_count":
            # 根据精选板块获取代码
            plates = ps_dict.get("plates")
            date = ps_dict.get("date")
            plates = set(json.loads(plates))
            if date not in self.__data_loader_dict:
                self.__data_loader_dict[date] = DataLoader(date)
            dates = set(self.__data_loader_dict.keys())
            dates.discard(date)
            for d in dates:
                self.__data_loader_dict.pop(d)
            results = self.__data_loader_dict[date].load_target_plate_and_codes()
            codes = set()
            for p in plates:
                codes |= set(results.get(p))
            fdatas = [(code, gpcode_manager.get_code_name(code)) for code in codes]
            response_data = json.dumps({"code": 0, "data": fdatas})
        elif url.path == "/start_run_backtest":
            def run_backtest(task_id):
                """
                Thread target: create one BackTest, register it under ``task_id`` and run it.

                ``date`` and ``codes`` are not parameters; they are read from the enclosing
                scope, where they are assigned before the worker thread is started.
                The entry stored in ``running_task_dict`` is not removed here once
                ``run()`` returns.
                """
                # NOTE(review): the strategy script location is a hard-coded absolute Windows path.
                __back_test = BackTest(date, r"D:\workspace\trade_low_suction\strategy\strategy_script_v6.py",
                                       target_codes=codes)
                # Registered before run() so the instance can be looked up by task id while it is running.
                running_task_dict[task_id] = __back_test
                __back_test.run()
            date = ps_dict.get("date")
            codes = ps_dict.get("codes")
            codes = json.loads(codes)
            # 开始运行回测
            task_id = str(uuid.uuid1())
            threading.Thread(target=lambda: run_backtest(task_id), daemon=True).start()
            response_data = json.dumps({"code": 0, "data": {"task_id": task_id}})
        elif url.path == "/stop_run_backtest":
            task_id = ps_dict.get("task_id")
            if task_id not in running_task_dict:
                response_data = json.dumps({"code": 1, "msg": "任务不存在"})
            else:
                backtest: BackTest = running_task_dict.pop(task_id)
                del backtest
                response_data = json.dumps({"code": 0, "data": {}})
        elif url.path == "/get_backtest_result":
            task_id = ps_dict.get("task_id")
            if task_id not in running_task_dict:
                response_data = json.dumps({"code": 1, "msg": "任务不存在"})
            else:
                backtest: BackTest = running_task_dict.get(task_id)
                progress = tool.trade_time_sub(backtest.current_time, backtest.RANGE_TIMES[0]) / tool.trade_time_sub(
                    backtest.RANGE_TIMES[1], backtest.RANGE_TIMES[0])
                backtest.backtest_results.sort(key=lambda x: (0 if x[3][0] else 1, x[0]))
                response_data = json.dumps(
                    {"code": 0, "data": {"results": backtest.backtest_results, "finish": backtest.finish,
                                         "progress": round(progress * 100, 2)}},
                    cls=tool.SetEncoder)
        print("GET请求")
        self.send_response(200)
        # 发给请求客户端的响应数据
strategy/strategy_manager.py
@@ -294,6 +294,7 @@
                                            max_day=days[0]))
        stock_variables.连续老题材 = KPLLimitUpDataAnalyzer.get_continuous_limit_up_reasons(
            self.limit_up_record_data_list, self.data_loader.trade_days[:2])
        stock_variables.连续老题材.clear()
        # 加载Tick信息
        open_price_info = TickSummaryDataManager().open_price_info_dict.get(code_)
strategy/time_series_backtest.py
@@ -14,7 +14,8 @@
class BackTest:
    def __init__(self, day, script_name="低吸脚本_辨识度_v3.py", settings=StrategyParamsSettingsManager().get_settings()):
    def __init__(self, day, script_name="低吸脚本_辨识度_v3.py", settings=StrategyParamsSettingsManager().get_settings(),
                 target_codes=set()):
        self.day = day
        scripts = ""
        with open(script_name, mode='r', encoding='utf-8') as f:
@@ -27,7 +28,7 @@
        self.settings = settings
        self.scripts = scripts
        self.RANGE_TIMES = ("09:25:00", "11:30:00")
        self.current_time = '09:25:00'
        self.current_time = self.RANGE_TIMES[0]
        self.stock_variables_dict = {}
        self.data_loader: DataLoader = None
@@ -41,6 +42,9 @@
        self.deal_codes = set()
        # 板块已经成交的代码
        self.deal_block_codes = {}
        self.target_codes = target_codes
        self.backtest_results = []
        self.finish = False
    def set_script(self, script):
        """
        Replace the strategy script held by this instance.

        :param script: new value stored in ``self.scripts``. ``self.scripts`` is
            passed to ``exec`` elsewhere in this class, so this is presumably
            Python source text -- confirm against callers.
        """
        self.scripts = script
@@ -250,43 +254,6 @@
        exec(self.scripts, global_dict)
        return global_dict["compute_result"]
    def __filter_codes(self, current_data, timeline_data):
        code_plates = current_data["code_plates"]
        start_time, end_time = self.RANGE_TIMES[0], self.RANGE_TIMES[1]
        fplates = set()
        for i in range(60 * 60 * 5):
            time_str = tool.trade_time_add_second(start_time, i)
            if time_str > end_time:
                break
            self.current_time = time_str
            # 统计当前涨停数据
            current_limit_up_list = current_data["limit_up_list"].get(time_str)
            if current_limit_up_list:
                # 统计板块涨停
                plate_codes_info = {}
                for x in current_limit_up_list:
                    plates = code_plates.get(x[0])
                    if plates:
                        for p in plates:
                            if p not in plate_codes_info:
                                plate_codes_info[p] = []
                            plate_codes_info[p].append((x[0], x[2]))
                # print(time_str, "汽车零部件", plate_codes_info.get("汽车零部件"))
                # 有效的板块
                valid_plates = set([p for p in plate_codes_info if len(plate_codes_info[p]) >= 2])
                fplates |= valid_plates
        codes = set([code for code in code_plates if code_plates[code] & fplates])
        fcodes = set()
        for c in codes:
            if c not in timeline_data["kline_data"]:
                continue
            # 自由流通市值30-300亿
            pre_close = timeline_data["kline_data"].get(c)[0]["close"]
            if 30e8 <= current_data["zylt_volume"].get(c) * pre_close <= 300e8:
                fcodes.add(c)
        return fcodes
    def __get_target_codes_v4(self):
        valid_codes = self.timeline_data["valid_codes"]
        return set(self.current_data["code_plates_for_buy"].keys()) & valid_codes
@@ -321,8 +288,6 @@
        stock_variables.新代码板块 = timeline_data["code_blocks"].get(code_)
        stock_variables.辨识度代码 = self.fcodes
        stock_variables.领涨板块信息 = self.head_rise_code_blocks.get(code_)
        if code_ in DEBUG_CODES:
            print(code_, stock_variables.领涨板块信息)
        for day in [2, 5, 10, 30, 60, 120]:
            days = timeline_data["trade_days"][:day]
@@ -332,8 +297,7 @@
                                            max_day=days[0]))
        stock_variables.连续老题材 = KPLLimitUpDataAnalyzer.get_continuous_limit_up_reasons(
            timeline_data["limit_up_record_data_list"], self.data_loader.trade_days[:2])
        stock_variables.连续老题材.clear()
        self.stock_variables_dict[code_] = stock_variables
    def load_data(self):
@@ -417,6 +381,7 @@
        return ";".join([f"{x[0]}==净额:{x[1]},买单:{x[2]},卖单:{x[3]}" for x in infos])
    def run(self):
        try:
        self.load_data()
        # print(self.fcodes)
        limit_up_record_data_dict = {}
@@ -454,14 +419,18 @@
        all_new_plates = set()
        for i in range(60 * 60 * 5):
                if self.finish:
                    break
            time_str = tool.trade_time_add_second(start_time, i)
            # print(f"[{tool.get_now_time_str()}]", time_str)
            if time_str > end_time:
                break
                self.current_time = time_str
            ticks = self.current_tick_data.get(time_str) if self.current_tick_data else None
            # ===============统计当前涨停数据
            origin_current_limit_up_list = self.current_data["limit_up_list"].get(time_str, [])
            current_limit_up_list = [x for x in origin_current_limit_up_list if kpl_util.get_high_level_count(x[4]) < 3]
                current_limit_up_list = [x for x in origin_current_limit_up_list if
                                         kpl_util.get_high_level_count(x[4]) < 3]
            if current_limit_up_list:
                latest_current_limit_up_list = current_limit_up_list
@@ -609,7 +578,7 @@
                    code = tick["symbol"][-6:]
                    if code not in self.fcodes:
                        continue
                    if DEBUG_CODES and code not in DEBUG_CODES:
                        if self.target_codes and code not in self.target_codes:
                        continue
                    if code not in self.stock_variables_dict:
@@ -646,7 +615,8 @@
                    if tick["created_at"][-8:] < '09:30:00':
                        stock_variables.今日开盘价 = tick["price"]
                        # 今日开盘涨幅
                        stock_variables.今日开盘涨幅 = round((tick["price"] - stock_variables.昨日收盘价) / stock_variables.昨日收盘价,
                            stock_variables.今日开盘涨幅 = round(
                                (tick["price"] - stock_variables.昨日收盘价) / stock_variables.昨日收盘价,
                                                       4)
                    stock_variables.今日成交量 = tick["cum_volume"]
                    stock_variables.今日成交额 = tick["cum_amount"]
@@ -701,7 +671,7 @@
                    code = big_order[0]
                    if code not in self.fcodes:
                        continue
                    if DEBUG_CODES and code not in DEBUG_CODES:
                        if self.target_codes and code not in self.target_codes:
                        continue
                    self.init_stock_variables(code, self.timeline_data, self.current_data)
                    stock_variables: StockVariables = self.stock_variables_dict.get(code)
@@ -731,15 +701,22 @@
                        logging.exception(e)
        print("可买题材:", all_new_plates)
        finally:
            self.finish = True
    def __process_test_result(self, code, stock_variables: StockVariables, next_trade_day, buy_price, time_str,
                              compute_result):
        # if code == '000628':
        #     print(time_str, code, compute_result)
        # (时间,(代码,代码名称), 涨幅 , 买入结果)
        backtest_result = [time_str, (code, gpcode_manager.get_code_name(code)),
                           round((buy_price - stock_variables.昨日收盘价) * 100 / stock_variables.昨日收盘价, 2),
                           compute_result]
        try:
        if not compute_result[0]:
            if code in DEBUG_CODES:
                if code in self.target_codes:
                print(time_str, code, compute_result[1])
            # if compute_result[1].find("大单") >= 0 or compute_result[1].find("价格超过昨日最低价") >= 0:
            pass
@@ -787,6 +764,8 @@
                        c_rate = round((bar[0]["close"] - buy_price) * 100 / bar[0]["pre_close"], 2)
                    else:
                        c_rate = "未知"
                backtest_result.append(c_rate)
                backtest_result.append(t_rate)
            print(f"{len(self.deal_codes)}==回测结果:", code, gpcode_manager.CodesNameManager().get_code_name(code),
                  f"溢价率:{t_rate},当日盈亏:{c_rate},下单时间:{time_str},涨幅:{round((buy_price - stock_variables.昨日收盘价) * 100 / stock_variables.昨日收盘价, 2)}",
                  compute_result[1],
@@ -797,11 +776,12 @@
                self.deal_block_codes[b].add(code)
            stock_variables.板块成交代码 = self.deal_block_codes
        finally:
            self.backtest_results.append(backtest_result)
# 锂电池 ['002882', '002667', '002846', '300530', '002074', '301662', '002580', '300584', '603399', '601515']
# 化工 ['600610', '002427', '002165', '002809', '000565', '002365', '603192', '600370', '600800', '603188']
DEBUG_CODES = ['605005', '002590', '002813', '000826', '002973', '603686', '001230', '600684', '000981', '002232']
# DEBUG_CODES = []
VOLUME_LOG_ENABLE = False
# 备用大单
@@ -818,7 +798,7 @@
            "2025-05-30", "2025-06-03", "2025-06-04", "2025-06-05", "2025-06-06", "2025-06-09", "2025-06-10"]
    days = ["2025-06-03", "2025-06-04", "2025-06-05", "2025-06-06", "2025-06-09", "2025-06-10",
            "2025-06-11", "2025-06-12", "2025-06-13", "2025-06-16", "2025-06-17", "2025-06-18", "2025-06-19",
            "2025-06-20", "2025-06-23", "2025-06-24", "2025-06-25"]
            "2025-06-20", "2025-06-23", "2025-06-24", "2025-06-25", "2025-06-26", "2025-06-27", "2025-06-30"]
    # days = ["2025-05-23"]
    #