Administrator
2025-07-07 d5d2288ba84f2774935b2f866ca6faa8da0aac66
bug修复/策略完善
3个文件已添加
7个文件已修改
781 ■■■■■ 已修改文件
api/outside_api_callback.py 66 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
code_attribute/gpcode_manager.py 20 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
strategy/place_order_queue_manager.py 77 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
strategy/strategy_manager.py 28 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
strategy/strategy_script_v6.py 4 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
strategy/strategy_script_v7.py 173 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
strategy/strategy_variable.py 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
strategy/strategy_variable_factory.py 42 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
strategy/time_series_backtest.py 335 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test/test.py 32 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
api/outside_api_callback.py
@@ -9,16 +9,19 @@
from code_attribute import gpcode_manager
from huaxin_client import l1_subscript_codes_manager
from huaxin_client.client_network import SendResponseSkManager
from log_module import async_log_util
from strategy import strategy_params_settings, env_info, strategy_manager
from strategy.env_info import RealTimeEnvInfo
from strategy.place_order_queue_manager import PlaceOrderRecordManager
from strategy.strategy_manager import PlateWhiteListManager
from strategy.strategy_params_settings import StrategyParamsSettingsManager, StrategyParamsSettings
from strategy.strategy_variable import StockVariables
from strategy.strategy_variable_factory import DataLoader
from third_data.history_k_data_manager import TradeDateManager
from third_data.kpl_block_manager import KPLCodeJXBlocksManager
from trade import trade_record_log_util
from trade.trade_manager import TradeStateManager
from utils import socket_util, middle_api_protocol, tool
from trade.trade_manager import TradeStateManager, DealCodesManager, logger_trade
from utils import socket_util, middle_api_protocol, tool, huaxin_util
OPERRATE_SET = 1  # 设置
OPERRATE_DELETE = 2  # 删除
@@ -220,9 +223,62 @@
            except:
                plates = []
            result_json = {"code": 0, "data": plates}
        elif ctype == 'plate_white_list':
            result_json = self.__on_plate_white_list(data)
        elif ctype == 'get_not_process_place_order_record':
            # 获取未处理的下单记录
            record = PlaceOrderRecordManager(tool.get_now_date_str()).get_not_process_record()
            if not record:
                result_json = {"code": 1, "msg": '暂无记录'}
            else:
                result_json = {"code": 0, "data": json.dumps(record)}
        elif ctype == 'set_place_order_buy':
            id_ = data.get("id")
            place_order_record_manager = PlaceOrderRecordManager(tool.get_now_date_str())
            #  (ID,代码,板块信息, 大单信息, 时间, 价格, 涨幅)
            record = place_order_record_manager.get_not_process_record_by_id(id_)
            if not record:
                result_json = {"code": 1, "msg": '记录不存在'}
            else:
                code = record[1]
                sv: StockVariables = strategy_manager.low_suction_strtegy.stock_variables_dict.get(code)
                if sv is None:
                    result_json = {"code": 1, "msg": '代码变量对象不存在'}
                else:
                    place_order_record_manager.set_buy(record[0])
                    # 可以下单
                    # 判断是否可以买
                    order_ref = huaxin_util.create_order_ref()
                    price = tool.get_buy_max_price(sv.当前价)
                    volume = 100
                    DealCodesManager().place_order(set(record[2].keys()), record[1], order_ref, price, volume)
                    trade_record_log_util.add_place_order_log(code, trade_record_log_util.PlaceOrderInfo(code=code,
                                                                                                         time_str=
                                                                                                         record[4],
                                                                                                         price=sv.当前价,
                                                                                                         rate=round((
                                                                                                                            sv.当前价 - sv.昨日收盘价) * 100 / sv.昨日收盘价,
                                                                                                                    2),
                                                                                                         plates=record[
                                                                                                             2].keys(),
                                                                                                         plates_info=
                                                                                                         record[2],
                                                                                                         info=record[3]
                                                                                                         ))
                    async_log_util.info(logger_trade, f"{code}下单,板块:{record[2].keys()}")
        elif ctype == 'set_place_order_not_buy':
            id_ = data.get("id")
            place_order_record_manager = PlaceOrderRecordManager(tool.get_now_date_str())
            #  (ID,代码,板块信息, 大单信息, 时间, 价格, 涨幅)
            record = place_order_record_manager.get_not_process_record_by_id(id_)
            if not record:
                result_json = {"code": 1, "msg": '记录不存在'}
            else:
                place_order_record_manager.set_not_buy(record[0], "手动驳回")
        elif ctype == 'add_black_list':
            code = data.get("code")
            gpcode_manager.BlackListCodeManager().add_code(code)
            result_json = {"code": 0, "msg": '添加成功'}
        self.send_response(result_json, client_id, request_id)
code_attribute/gpcode_manager.py
@@ -361,8 +361,8 @@
class BlackListCodeManager:
    __instance = None
    __db = 2
    __redis_manager = redis_manager.RedisManager(2)
    __db = 12
    __redis_manager = redis_manager.RedisManager(12)
    def __new__(cls, *args, **kwargs):
        if not cls.__instance:
@@ -370,7 +370,7 @@
            # 初始化设置
            # 获取交易窗口的锁
            cls.__instance.__forbidden_trade_codes_cache = RedisUtils.smembers(cls.__get_redis(),
                                                                               "forbidden-trade-codes")
                                                                               "ls-forbidden-trade-codes")
            logger_debug.info(f"加载加黑列表:{cls.__instance.__forbidden_trade_codes_cache}")
        return cls.__instance
@@ -381,28 +381,28 @@
    def add_code(self, code):
        self.__forbidden_trade_codes_cache.add(code)
        RedisUtils.sadd_async(self.__db, "forbidden-trade-codes", code)
        RedisUtils.expire_async(self.__db, "forbidden-trade-codes", tool.get_expire())
        RedisUtils.sadd_async(self.__db, "ls-forbidden-trade-codes", code)
        RedisUtils.expire_async(self.__db, "ls-forbidden-trade-codes", tool.get_expire())
    def sync(self):
        data = RedisUtils.smembers(self.__get_redis(),
                                   "forbidden-trade-codes")
                                   "ls-forbidden-trade-codes")
        self.__forbidden_trade_codes_cache.clear()
        if data:
            self.__forbidden_trade_codes_cache |= data
    def remove_code(self, code):
        self.__forbidden_trade_codes_cache.discard(code)
        RedisUtils.srem(self.__get_redis(), "forbidden-trade-codes", code)
        RedisUtils.srem(self.__get_redis(), "ls-forbidden-trade-codes", code)
    def is_in(self, code):
        return RedisUtils.sismember(self.__get_redis(), "forbidden-trade-codes", code)
        return RedisUtils.sismember(self.__get_redis(), "ls-forbidden-trade-codes", code)
    def is_in_cache(self, code):
        return code in self.__forbidden_trade_codes_cache
    def list_codes(self):
        codes = RedisUtils.smembers(self.__get_redis(), "forbidden-trade-codes")
        codes = RedisUtils.smembers(self.__get_redis(), "ls-forbidden-trade-codes")
        self.__forbidden_trade_codes_cache = codes
        return codes
@@ -411,7 +411,7 @@
    def clear(self):
        self.__forbidden_trade_codes_cache.clear()
        RedisUtils.delete(self.__get_redis(), "forbidden-trade-codes")
        RedisUtils.delete(self.__get_redis(), "ls-forbidden-trade-codes")
class GreenListCodeManager:
strategy/place_order_queue_manager.py
New file
@@ -0,0 +1,77 @@
"""
下单队列管理
"""
# 尚未处理
import json
import random
import time
from utils import tool
from db import mysql_data_delegate as mysql_data
STATE_NOT_PROCESS = 0
STATE_NOT_BUY = 1
STATE_BUY = 2
@tool.singleton
class PlaceOrderRecordManager:
    """
    下单记录管理
    """
    def __init__(self, day):
        self.day = day
        # (ID,代码,板块信息, 大单信息, 时间, 价格, 涨幅)
        self.not_process_records = []
        self.mysql_db = mysql_data.Mysqldb()
        self.__load_data()
    def __load_data(self):
        results = self.mysql_db.select_all(
            f"select * from low_suction_place_order_records where day ='{self.day}' and state = {STATE_NOT_PROCESS} order by time_str")
        self.not_process_records.clear()
        if results:
            self.not_process_records = [(r[0], r[2], json.loads(r[3]), r[6],r[7], float(r[8]), float(r[9])) for r in results]
    def set_buy(self, id):
        for r in self.not_process_records:
            if r[0] == id:
                self.not_process_records.remove(r)
        self.mysql_db.execute(
            f"update low_suction_place_order_records set state={STATE_BUY},update_time=now() where id ='{id}'")
    def set_not_buy(self, id, desc):
        for r in self.not_process_records:
            if r[0] == id:
                self.not_process_records.remove(r)
        self.mysql_db.execute(
            f"update low_suction_place_order_records set state={STATE_NOT_BUY},update_time=now(), state_desc='{desc}' where id ='{id}'")
    def add_record(self, code, time_str, plate_infos, big_order_info_str, price, rate):
        _id = f"{self.day}_{code}_{time_str.replace(':', '')}_{random.randint(0, 10000)}"
        data = (_id, code, plate_infos, big_order_info_str, time_str, price, rate)
        self.mysql_db.execute(
            f"insert into low_suction_place_order_records(id,day,code,plates_info,state,  big_orders_info, time_str,price, rate, create_time) values('{_id}','{self.day}','{code}','{json.dumps(plate_infos, ensure_ascii=False)}',{STATE_NOT_PROCESS},'{big_order_info_str}','{time_str}', {price},{rate},now())")
        self.not_process_records.append(data)
    def get_not_process_record(self):
        if not self.not_process_records:
            return None
        return self.not_process_records[0]
    def get_not_process_record_by_id(self, _id):
        for r in self.not_process_records:
            if r[0] == _id:
                return r
        return None
if __name__ == "__main__":
    manager = PlaceOrderRecordManager("2025-07-07")
    print(manager.not_process_records)
    manager.add_record("000333", '09:30:32', {"家电": ["000111", "000222"]}, "500w/600w", 10.23, 5.2)
    # manager.add_record("000333", '09:30:30', {"家电": ["000111", "000222"]}, "500w/600w")
    # manager.set_not_buy("2025-07-07_000333_1751880205848_3395", "人工驳回")
    print(manager.not_process_records)
strategy/strategy_manager.py
@@ -4,6 +4,7 @@
import json
from code_attribute import gpcode_manager, code_nature_analyse
from code_attribute.gpcode_manager import BlackListCodeManager
from db import redis_manager_delegate as redis_manager
from db.mysql_data_delegate import Mysqldb
from db.redis_manager_delegate import RedisUtils
@@ -11,6 +12,7 @@
from log_module.log import logger_trade, logger_debug
from strategy.data.data_analyzer import KPLLimitUpDataAnalyzer
from strategy.data.data_manager import LowSuctionOriginDataExportManager
from strategy.place_order_queue_manager import PlaceOrderRecordManager
from strategy.strategy_params_settings import StrategyParamsSettingsManager
from strategy.strategy_variable import StockVariables
from strategy.strategy_variable_factory import DataLoader, StrategyVariableFactory
@@ -462,26 +464,12 @@
            if not TradeStateManager().is_can_buy_cache():
                async_log_util.info(logger_trade, f"交易已关闭")
                return
            # 可以下单
            # 判断是否可以买
            order_ref = huaxin_util.create_order_ref()
            price = tool.get_buy_max_price(sv.当前价)
            volume = 100
            DealCodesManager().place_order(set(compute_result[3]), code, order_ref, price, volume)
            trade_record_log_util.add_place_order_log(code, trade_record_log_util.PlaceOrderInfo(code=code,
                                                                                                 time_str=tool.get_now_time_str(),
                                                                                                 price=sv.当前价,
                                                                                                 rate=round((
                                                                                                                    sv.当前价 - sv.昨日收盘价) * 100 / sv.昨日收盘价,
                                                                                                            2),
                                                                                                 plates=compute_result[
                                                                                                     3],
                                                                                                 plates_info=
                                                                                                 compute_result[2],
                                                                                                 info=compute_result[1]
                                                                                                 ))
            async_log_util.info(logger_trade, f"{code}下单,板块:{compute_result[3]}")
            if BlackListCodeManager().is_in_cache(code):
                return
            # 添加记录,等待确认买入
            PlaceOrderRecordManager(self.now_day).add_record(code, tool.get_now_time_str(), compute_result[3],
                                                             compute_result[4], sv.当前价,
                                                             round((sv.当前价 - sv.昨日收盘价) * 100 / sv.昨日收盘价, 2))
# 当前的低吸策略对象
strategy/strategy_script_v6.py
@@ -175,6 +175,8 @@
    if big_order_money < threshold_money:
        return False, f"({round(big_order_money / 1e4, 2)}万/{round(threshold_money / 1e4, 2)}万)大单金额不足"
    return True, f" \n\t大单信息:{round(big_order_money / 1e4, 2)}万(买:{round(big_order_money / 1e4, 2)}万 卖:{round(big_sell_order_money / 1e4, 2)}万)/{round(threshold_money / 1e4, 2)}万  \n\t量够信息:{sv.今日量够信息}\n\t今日最高价:{sv.今日最高价信息} \n\t5日最高价:{sv.日最高价_5}", f"\n\t板块信息:{[(p, sv.开盘啦最正板块涨停.get(p)) for p in can_buy_plates]}", can_buy_plates
    big_order_info = f"{round(big_order_money / 1e4, 2)}w/{round(threshold_money / 1e4, 2)}w"
    return True, f" \n\t大单信息:{round(big_order_money / 1e4, 2)}万(买:{round(big_order_money / 1e4, 2)}万 卖:{round(big_sell_order_money / 1e4, 2)}万)/{round(threshold_money / 1e4, 2)}万  \n\t量够信息:{sv.今日量够信息}\n\t今日最高价:{sv.今日最高价信息} \n\t5日最高价:{sv.日最高价_5}", f"\n\t板块信息:{[(p, sv.开盘啦最正板块涨停.get(p)) for p in can_buy_plates]}", {p: sv.开盘啦最正板块涨停.get(p) for p in can_buy_plates}, big_order_info
compute_result = __can_buy()
strategy/strategy_script_v7.py
New file
@@ -0,0 +1,173 @@
import logging
from strategy.strategy_params_settings import StrategyParamsSettings
from strategy.strategy_variable import StockVariables
sv = StockVariables()
settings = StrategyParamsSettings()
target_code = ''
def format_time(huaxin_timestamp):
    huaxin_timestamp = str(huaxin_timestamp)
    if huaxin_timestamp.find("9") == 0:
        return f"0{huaxin_timestamp[0]}:{huaxin_timestamp[1: 3]}:{huaxin_timestamp[3: 5]}"
    return f"{huaxin_timestamp[0:2]}:{huaxin_timestamp[2: 4]}:{huaxin_timestamp[4: 6]}"
def __can_buy():
    """
    @return: 是否可买, 不能买的原因/可买的板块, 是否量够
    """
    # print(f"{target_code}:执行策略")
    if not settings.can_buy_ge_code:
        if target_code.find("60") != 0 and target_code.find("00") != 0:
            return False, "创业板/科创板的票不买"
    else:
        if target_code.find("60") != 0 and target_code.find("00") != 0 and target_code.find("30") != 0:
            return False, "科创板的票不买"
    if sv.板块成交代码:
        deal_codes = set()
        for p in sv.板块成交代码:
            deal_codes |= set(sv.板块成交代码[p])
        if len(deal_codes) >= settings.max_buy_codes_count:
            return False, f"买入代码数超限({len(deal_codes)}/{settings.max_buy_codes_count})"
    # 目标票板块涨停个数>=2
    can_buy_plates = set()
    for plate in sv.代码板块:
        if plate in sv.板块白名单:
            can_buy_plates.add(plate)
            continue
        # if not sv.资金流入板块 or plate not in sv.资金流入板块:
        #     continue
        if plate in sv.连续老题材:
            continue
        if plate not in sv.可以买的板块:
            continue
        if plate in sv.日出现的板块_2:
            # 老题材
            threshold_count = settings.limit_up_count_of_old_plate
        else:
            # 新题材
            threshold_count = settings.limit_up_count_of_new_plate
        threshold_count = 0
        if len(sv.开盘啦最正板块涨停.get(plate, [])) >= threshold_count:
            can_buy_plates.add(plate)
    if not sv.当前价:
        return False, "无当前价"
    # if getattr(sv, f"涨停数_{settings.has_limit_up_days}") < 1 and getattr(sv, f"炸板数_{settings.has_limit_up_days}") < 1:
    #     return False, f"近{settings.has_limit_up_days}个交易日无涨停/无炸板"
    if settings.cant_yesterday_limit_down and not sv.昨日非跌停:
        return False, "昨日跌停"
    if settings.cant_yesterday_limit_up and not sv.昨日非涨停:
        return False, "昨日涨停"
    if settings.cant_yesterday_open_limit_up and not sv.昨日非炸板:
        return False, "昨日炸板"
    if sv.今日涨停价 > settings.price_range[1] or sv.今日涨停价 < settings.price_range[0]:
        return False, f"今日涨停价高于{settings.price_range[1]}/低于{settings.price_range[0]}"
    if getattr(sv, f"涨停数_{settings.trade_days_count_of_limit_up}") >= settings.count_of_limit_up:
        return False, f"{settings.trade_days_count_of_limit_up}日涨停数>{settings.count_of_limit_up}"
    if getattr(sv, f"跌停数_{settings.trade_days_count_of_limit_down}") >= settings.count_of_limit_down:
        return False, f"{settings.trade_days_count_of_limit_down}日跌停数>{settings.count_of_limit_down}"
    if getattr(sv, f"炸板数_{settings.trade_days_count_of_open_limit_up}") >= settings.count_of_open_limit_up:
        return False, f"{settings.trade_days_count_of_open_limit_up}日炸板数>{settings.count_of_open_limit_up}"
    if sv.涨停数_10 + sv.炸板数_10 >= 4:
        return False, f"10天内>=4个涨停/炸板"
    if sv.今日开盘价 and (sv.今日开盘价 - sv.昨日收盘价) / sv.昨日收盘价 < settings.min_open_rate:
        return False, f"开盘涨幅小于{settings.min_open_rate}"
    rate = (sv.当前价 - sv.昨日收盘价) / sv.昨日收盘价
    if rate >= settings.avaiable_rates[1] or rate < settings.avaiable_rates[0]:
        return False, f"涨幅不在区间内({settings.avaiable_rates[0]}-{settings.avaiable_rates[1]}):{rate}"
    if sv.自由流通市值 > settings.zyltgb_range[1] * 1e8 or sv.自由流通市值 < settings.zyltgb_range[0] * 1e8:
        return False, f"自由市值({sv.自由流通市值})不满足要求"
    if sv.六个交易日涨幅过高:
        return False, f"6个交易日涨幅过高"
    if sv.日三板个数_10 >= 1:
        return False, f"10个交易日有>=3连板"
    if sv.涨得高未放量:
        return False, f"30个交易日涨得高未放量"
    if sv.涨停过未放量:
        return False, f"7个交易日内有涨停/炸板,未出现过高价"
    # if sv.当前价 > sv.昨日最低价 * 1.1:
    #     return False, f"买入时的价格必须≤昨日最低价*110%"
    # if (sv.当前价 - sv.今日最低价) / sv.昨日收盘价 > 0.03:
    #     return False, f"买入时的价格不能高于今日最低价的3%"
    if abs((sv.当前价 - round(sv.今日成交额 / sv.今日成交量, 2)) / sv.昨日收盘价) >= settings.max_rate_than_average_price:
        return False, f"买入价高于均价{settings.max_rate_than_average_price}({abs((sv.当前价 - round(sv.今日成交额 / sv.今日成交量, 2)) / sv.昨日收盘价)})"
    if (sv.今日最高价信息[0] - sv.当前价) / sv.昨日收盘价 > settings.min_rate_of_highest_and_price:
        return False, f"低于分时高价{settings.min_rate_of_highest_and_price}"
    if not settings.can_buy_limited_up and abs(sv.今日最高价信息[0] - sv.今日涨停价) <= 0.001:
        return False, f"今日有涨停"
    # if sv.涨停数_30 <= 0 and not sv.日放倍量日期_15:
    #     return False, "30个交易日无涨停且15个交易日无倍量"
    # 目标票板块涨停个数>=2
    # 板块只能买入2个代码
    if sv.板块成交代码:
        can_buy_plates -= set([p for p in sv.板块成交代码 if len(sv.板块成交代码[p]) >= settings.max_buy_codes_count_per_plate])
        if not can_buy_plates:
            return False, f"板块已有成交"
    if not can_buy_plates:
        return False, f"没有涨停的板块: {[(plate, sv.开盘啦最正板块涨停.get(plate)) for plate in sv.代码板块 if sv.开盘啦最正板块涨停]}  连续老题材:{sv.连续老题材}"
    # new_plates = set(can_buy_plates) - sv.日出现的板块_2
    # if new_plates:
    #     # 有新题材,判断是否过昨日前高
    #     if sv.今日最高价信息[0] - sv.昨日最高价 < 0.02:
    #         return False, "今日最高价需大于昨日最高价"
    # else:
    #     if sv.今日最高价信息[0] - sv.日最高价_5 < 0.02:
    #         return False, "今日最高价需大于5日最高价"
    if settings.trade_days_count_of_limit_up_price_over_high and sv.今日涨停价 <= getattr(sv,
                                                                                     f"日最高价_{settings.trade_days_count_of_limit_up_price_over_high}"):
        return False, f"今日涨停价要突破{settings.trade_days_count_of_limit_up_price_over_high}日最高价"
    # if sv.今日成交量 < sv.昨日成交量 * 0.8:
    #     return False, f"实时成交量必须≥80%昨日总成交量"
    # =======成交大单=====
    big_order_money = 0
    if sv.今日大单数据:
        # print(sv.今日大单数据[-1][3], format_time(sv.今日大单数据[-1][3]))
        # filter_orders = [(o[0], o[2]) for o in sv.今日大单数据 if format_time(o[3]) >= sv.今日量够信息[0]]
        filter_orders = [(o[0], o[2]) for o in sv.今日大单数据]
        filter_orders.reverse()
        orderids = set()
        for o in filter_orders:
            if o[0] in orderids:
                continue
            orderids.add(o[0])
            big_order_money += o[1]
    threshold_money = max(1.5 * sv.自由流通市值 // 1000, 200e4)
    return True, f" \n\t大单信息:{round(big_order_money / 1e4, 2)}万(买:{round(big_order_money / 1e4, 2)}万)/{round(threshold_money / 1e4, 2)}万  \n\t量够信息:{sv.今日量够信息}\n\t今日最高价:{sv.今日最高价信息} \n\t5日最高价:{sv.日最高价_5}", f"\n\t板块信息:{[(p, sv.开盘啦最正板块涨停.get(p)) for p in can_buy_plates]}", can_buy_plates
compute_result = __can_buy()
strategy/strategy_variable.py
@@ -216,6 +216,10 @@
        self.涨得高未放量 = False
        self.涨停过未放量 = False
        self.板块白名单 = set()
        self.涨速 = 0
        # 上一个tick的信息
        self.上个tick = None
        self.可以买的板块 = set()
    def replace_variables(self, expression):
        """
strategy/strategy_variable_factory.py
@@ -243,6 +243,48 @@
                fresults[kpl_util.filter_block(b)] = codes
        return fresults
    def load_target_plate_and_codes_v2(self):
        """
        快读拉升的目标票
        从最近180个交易日的真正涨停数据中
        @return: {"板块":[代码]}
        """
        end_date = self.trade_days[:180][-1]
        start_date = self.trade_days[:180][0]
        mysql = mysql_data_delegate.Mysqldb()
        # 获取上个交易日涨停的票
        results = mysql.select_all(
            f"SELECT r.`_code` FROM `kpl_limit_up_record` r where r._day='{self.trade_days[0]}' and r._open = 0")
        exclude_codes = set([x[0] for x in results])
        results = mysql.select_all(
            f"select r.`_hot_block_name` from `kpl_limit_up_record` r where r.`_open`=0 and r.`_day`>'{end_date}' and r.`_day` <= '{start_date}' group by r.`_hot_block_name`")
        blocks = set([x[0] for x in results])
        fresults = {}
        all_buy_plates_of_codes = self.load_all_buy_plates_of_codes()
        valid_codes = set(all_buy_plates_of_codes.keys())
        for b in blocks:
            sql = f"""
                    SELECT * FROM
                    (
                    SELECT r.`_code`, r.`_code_name`, COUNT(*) AS `count`, MAX(r.`_day`) AS _day FROM `kpl_limit_up_record` r WHERE r.`_open`=0  AND r.`_day`>'{end_date}' AND r.`_day`<='{start_date}' AND r.`_hot_block_name`='{b}' GROUP BY r.`_code`
                    ) a
                    ORDER BY a.count DESC,a._day  DESC
            """
            results = mysql.select_all(sql)
            # 取前1/3
            if results:
                results = [x for x in results if
                           (tool.is_can_buy_code(x[0]) and x[0] in valid_codes and x[0] not in exclude_codes)]
                # 取前1/3且涨停数是前10
                max_count = len(results) // 3 if len(results) % 3 == 0 else len(results) // 3 + 1
                # results = results[:max_count]
                # 取前10
                # results = results[:10]
                codes = [x[0] for x in results]
                fresults[kpl_util.filter_block(b)] = codes
        return fresults
    def load_trade_days(self):
        """
        加载交易日列表,now_day前120个交易日
strategy/time_series_backtest.py
@@ -176,7 +176,8 @@
        code_plates_dict_for_refer = self.data_loader.load_code_plates_for_refer()
        plate_codes = self.data_loader.load_target_plate_and_codes()
        # TODO 快速拉伸驱动
        plate_codes = self.data_loader.load_target_plate_and_codes_v2()
        code_plates_dict_for_buy = {}
        for p in plate_codes:
            for code in plate_codes.get(p):
@@ -704,6 +705,330 @@
        finally:
            self.finish = True
    def run_v2(self):
        def __compute_can_buy_plates(_current_limit_up_list):
            # 统计代码开1数量:(涨停时间,涨停原因, 是否二板, 是否一板)
            code_info_dict = {x[0]: (tool.to_time_str(x[2]), x[5], 1 if kpl_util.get_high_level_count(x[4]) == 2 else 0,
                                     1 if kpl_util.get_high_level_count(x[4]) == 1 else 0) for x in
                              _current_limit_up_list}
            plate_codes_dict = {}
            # 按板块分类
            for code in code_info_dict.keys():
                plates = code_plates.get(code)
                if not plates:
                    plates = {code_info_dict.get(code)[1]}
                plates -= constant.KPL_INVALID_BLOCKS
                if plates:
                    for p in plates:
                        if p not in plate_codes_dict:
                            plate_codes_dict[p] = set()
                        plate_codes_dict[p].add(code)
            valid_plates = set()
            for p in plate_codes_dict:
                codes = list(plate_codes_dict[p])
                codes.sort(key=lambda x: code_info_dict[x][0])
                # 开1 二板数量
                open_continue_2_count = sum(
                    [v[2] for k, v in code_info_dict.items() if k in codes and v[0] < '09:30:00'])
                # 开1 首板数量
                open_continue_1_count = sum(
                    [v[3] for k, v in code_info_dict.items() if k in codes and v[0] < '09:30:00'])
                if open_continue_2_count >= 1 or open_continue_1_count >= 2:
                    valid_plates.add(p)
                    continue
                if len(codes) >= 2 and tool.trade_time_sub(code_info_dict[codes[1]][0],
                                                           code_info_dict[codes[0]][0]) < 10 * 60:
                    # 有≥2个涨停,且两个涨停之间间隔≤10分钟
                    valid_plates.add(p)
                    continue
            return valid_plates
        # 快速拉升驱动
        try:
            self.load_data()
            # print(self.fcodes)
            limit_up_record_data_dict = {}
            for limit_up_item in self.timeline_data["limit_up_record_data"]:
                if limit_up_item[0] not in limit_up_record_data_dict:
                    limit_up_record_data_dict[limit_up_item[0]] = []
                limit_up_record_data_dict[limit_up_item[0]].append(limit_up_item)
            self.timeline_data["limit_up_record_data"] = limit_up_record_data_dict
            next_trade_day = self.timeline_data["next_trade_day"]
            start_time, end_time = "09:25:00", "12:00:00"
            # 分钟K线
            minute_bars_dict = {}
            code_plates = self.current_data["code_plates"]
            code_plates_for_refer = self.current_data["code_plates_for_refer"]
            # 板块涨停代码信息
            kpl_plate_limit_up_codes_info = None
            plate_limit_up_codes_info = None
            kpl_head_plate_limit_up_codes_info = None
            latest_current_limit_up_list = None
            latest_block_in_datas = None
            latest_can_buy_plates = set()
            # 根据板块获取目标票
            target_plate_codes_infos = {}
            for code in self.head_rise_code_blocks:
                for p in self.head_rise_code_blocks[code]:
                    if p not in target_plate_codes_infos:
                        target_plate_codes_infos[p] = []
                    target_plate_codes_infos[p].append(self.head_rise_code_blocks[code][p])
            for p in target_plate_codes_infos:
                target_plate_codes_infos[p].sort(key=lambda x: x[1], reverse=True)
            all_new_plates = set()
            for i in range(60 * 60 * 5):
                if self.finish:
                    break
                time_str = tool.trade_time_add_second(start_time, i)
                # print(f"[{tool.get_now_time_str()}]", time_str)
                if time_str > end_time:
                    break
                self.current_time = time_str
                ticks = self.current_tick_data.get(time_str) if self.current_tick_data else None
                # ===============统计当前涨停数据
                origin_current_limit_up_list = self.current_data["limit_up_list"].get(time_str, [])
                current_limit_up_list = [x for x in origin_current_limit_up_list if
                                         kpl_util.get_high_level_count(x[4]) < 3]
                if current_limit_up_list:
                    latest_current_limit_up_list = current_limit_up_list
                if current_limit_up_list:
                    latest_can_buy_plates = __compute_can_buy_plates(current_limit_up_list)
                    plate_codes_info = {}
                    # 统计板块涨停
                    for x in current_limit_up_list:
                        # 按代码的板块统计涨停板块中的代码数量
                        # 涨停过1分钟才算有效涨停
                        if tool.trade_time_sub(time_str, tool.timestamp_format(x[2], "%H:%M:%S")) < 60:
                            continue
                        plates = code_plates.get(x[0])
                        if plates:
                            for p in plates:
                                if p not in plate_codes_info:
                                    plate_codes_info[p] = []
                                plate_codes_info[p].append((x[0], x[2]))
                    plate_limit_up_codes_info = plate_codes_info
                    plate_codes_info = {}
                    for x in current_limit_up_list:
                        # 按开盘啦涨停原因统计
                        p = x[5]
                        if p in constant.KPL_INVALID_BLOCKS:
                            continue
                        if p not in plate_codes_info:
                            plate_codes_info[p] = []
                        plate_codes_info[p].append((x[0], x[2], x[4]))
                    kpl_plate_limit_up_codes_info = plate_codes_info
                    # {"代码":[(板块代码, 板块名称)]}
                    limit_up_plate_names_of_refer_code = self.current_data["limit_up_plate_names_of_refer_code"]
                    plate_codes_info = {}
                    for x in current_limit_up_list:
                        # 按开盘啦涨停原因统计
                        code = x[0]
                        # if code not in limit_up_plate_names_of_refer_code:
                        #     continue
                        # 如果记录涨停时间过去20分钟就采用涨停队列的涨停原因
                        if tool.trade_time_sub(time_str, tool.timestamp_format(x[2], "%H:%M:%S")) < 60 * 20 or True:
                            plates_infos = limit_up_plate_names_of_refer_code.get(code)
                            plates = set([d[1] for d in plates_infos if d[1] == x[5]]) if plates_infos else set()
                        else:
                            plates = {x[5]}
                        new_plates = set()
                        for p in plates:
                            if p in constant.KPL_INVALID_BLOCKS:
                                continue
                            new_plates.add(p)
                        for p in new_plates:
                            if p not in plate_codes_info:
                                plate_codes_info[p] = []
                            plate_codes_info[p].append((x[0], x[2]))
                    kpl_head_plate_limit_up_codes_info = plate_codes_info
                # ==================注入板块流入
                block_in_datas = self.current_data["block_in"].get(time_str)
                if block_in_datas:
                    blocks = [x[0] for x in block_in_datas if x[1] > 0]
                    block_in_datas = blocks[:20]
                    latest_block_in_datas = block_in_datas
                # ================当前时刻大单
                current_big_orders = self.current_data["big_order"].get(time_str)
                if current_big_orders:
                    for big_order in current_big_orders:
                        # 格式 ("代码", (买单号, 量, 金额, 时间, 最终成交价))
                        self.init_stock_variables(big_order[0], self.timeline_data, self.current_data)
                        stock_variables: StockVariables = self.stock_variables_dict.get(big_order[0])
                        if stock_variables.今日大单数据 is None:
                            stock_variables.今日大单数据 = []
                        stock_variables.今日大单数据.append(big_order[1])
                        # 统计大单均价
                        order_ids = set()
                        total_money = 0
                        total_volume = 0
                        for order in reversed(stock_variables.今日大单数据):
                            if order[0] in order_ids:
                                continue
                            order_ids.add(order[0])
                            total_money += order[2]
                            total_volume += order[1]
                        if total_volume > 0:
                            stock_variables.今日大单均价 = round(total_money / total_volume, 2)
                        else:
                            stock_variables.今日大单均价 = 0
                current_big_sell_orders = self.current_data["big_sell_order"].get(time_str)
                if current_big_sell_orders:
                    for big_order in current_big_sell_orders:
                        # 格式 ("代码", (买单号, 量, 金额, 时间, 最终成交价))
                        self.init_stock_variables(big_order[0], self.timeline_data, self.current_data)
                        stock_variables: StockVariables = self.stock_variables_dict.get(big_order[0])
                        if stock_variables.今日卖大单数据 is None:
                            stock_variables.今日卖大单数据 = []
                        stock_variables.今日卖大单数据.append(big_order[1])
                # 开盘啦最正涨停原因
                most_real_kpl_plate_limit_up_codes_info = {}
                # 获取这个板块的目标票
                if kpl_plate_limit_up_codes_info:
                    current_limit_up_dict = {x[0]: x for x in latest_current_limit_up_list}
                    codes = set()
                    for plate in kpl_plate_limit_up_codes_info:
                        kpl_plate_codes = kpl_plate_limit_up_codes_info.get(plate)
                        codes |= set([x[0] for x in kpl_plate_codes])
                    for code in codes:
                        plates = code_plates.get(code)
                        if not plates:
                            plates = {current_limit_up_dict.get(code)[5]}
                        plates -= constant.KPL_INVALID_BLOCKS
                        if plates:
                            for p in plates:
                                if p not in most_real_kpl_plate_limit_up_codes_info:
                                    most_real_kpl_plate_limit_up_codes_info[p] = []
                                most_real_kpl_plate_limit_up_codes_info[p].append(code)
                if ticks:
                    for tick in ticks:
                        code = tick["symbol"][-6:]
                        if code not in self.fcodes:
                            continue
                        if self.target_codes and code not in self.target_codes:
                            continue
                        if code not in self.stock_variables_dict:
                            # 加载基础数据
                            self.init_stock_variables(code, self.timeline_data, self.current_data)
                        stock_variables: StockVariables = self.stock_variables_dict.get(code)
                        if plate_limit_up_codes_info is not None:
                            stock_variables.板块涨停 = plate_limit_up_codes_info
                        if kpl_plate_limit_up_codes_info is not None:
                            stock_variables.开盘啦板块涨停 = kpl_plate_limit_up_codes_info
                        if kpl_head_plate_limit_up_codes_info is not None:
                            stock_variables.开盘啦领涨板块涨停 = kpl_head_plate_limit_up_codes_info
                        stock_variables.板块成交代码 = self.deal_block_codes
                        # 板块流入数据
                        if latest_block_in_datas:
                            stock_variables.资金流入板块 = latest_block_in_datas
                        # 暂时不用分钟K线
                        # if code not in minute_bars_dict:
                        #     minute_bars_dict[code] = [tick]
                        # if minute_bars_dict[code][-1]["created_at"][:-2] == tick["created_at"][:-2]:
                        #     # 统计分钟K线
                        #     minute_bars_dict[code][-1] = tick
                        # else:
                        #     # 保存分钟K线最高价
                        #     if not stock_variables.今日最高价:
                        #         stock_variables.今日最高价 = minute_bars_dict[code][-1]["price"]
                        #     if minute_bars_dict[code][-1]["price"] > stock_variables.今日最高价:
                        #         stock_variables.今日最高价 = minute_bars_dict[code][-1]["price"]
                        # 保存开盘价
                        if tick["created_at"][-8:] < '09:30:00':
                            stock_variables.今日开盘价 = tick["price"]
                            # 今日开盘涨幅
                            stock_variables.今日开盘涨幅 = round(
                                (tick["price"] - stock_variables.昨日收盘价) / stock_variables.昨日收盘价,
                                4)
                        stock_variables.今日成交量 = tick["cum_volume"]
                        stock_variables.今日成交额 = tick["cum_amount"]
                        stock_variables.当前价 = tick["price"]
                        if not stock_variables.今日量够信息:
                            if stock_variables.今日成交量 > stock_variables.昨日成交量 * 0.8:
                                stock_variables.今日量够信息 = (time_str, stock_variables.当前价, round(
                                    (stock_variables.当前价 - stock_variables.昨日收盘价) * 100 / stock_variables.昨日收盘价, 2),
                                                          self.__statistic_big_order_info(stock_variables))
                                if VOLUME_LOG_ENABLE:
                                    # 统计大单净额,(50w以上,净额,买单个数/买单总金额,卖单个数/卖单总金额)
                                    print("****量够", code, stock_variables.今日量够信息)
                        # 统计今日最高价
                        # if stock_variables.今日最高价 and tick["price"] > stock_variables.今日最高价:
                        #     print(code, "====突破分时最高价:", tick["created_at"], tick["price"])
                        if not stock_variables.今日最高价信息 or tick["price"] > stock_variables.今日最高价信息[0]:
                            stock_variables.今日最高价信息 = (tick["price"], time_str)
                        if not stock_variables.今日最低价 or tick["price"] < stock_variables.今日最低价:
                            stock_variables.今日最低价 = tick["price"]
                        if most_real_kpl_plate_limit_up_codes_info:
                            stock_variables.开盘啦最正板块涨停 = most_real_kpl_plate_limit_up_codes_info
                        # 计算涨速, 上一个tick
                        if stock_variables.上个tick and stock_variables.上个tick['price'] > 0:
                            try:
                                time_space = tool.trade_time_sub(time_str,
                                                                 stock_variables.上个tick["created_at"][-8:]) // 3
                                if time_space > 0:
                                    rate = (tick['price'] - stock_variables.上个tick[
                                        'price']) * 100 / stock_variables.昨日收盘价 / time_space
                                    stock_variables.涨速 = round(rate, 2)
                            except:
                                print("")
                        if stock_variables.涨速 >= 0.8:
                            if code in self.target_codes:
                                print(time_str, code, f"快速拉升:{stock_variables.涨速}",f"代码板块:{stock_variables.代码板块}", f"可买板块:{latest_can_buy_plates}")
                            # 准备数据
                            if plate_limit_up_codes_info is not None:
                                stock_variables.板块涨停 = plate_limit_up_codes_info
                            if kpl_plate_limit_up_codes_info is not None:
                                stock_variables.开盘啦板块涨停 = kpl_plate_limit_up_codes_info
                            if kpl_head_plate_limit_up_codes_info is not None:
                                stock_variables.开盘啦领涨板块涨停 = kpl_head_plate_limit_up_codes_info
                            if most_real_kpl_plate_limit_up_codes_info is not None:
                                stock_variables.开盘啦最正板块涨停 = most_real_kpl_plate_limit_up_codes_info
                            if block_in_datas:
                                stock_variables.资金流入板块 = block_in_datas
                            stock_variables.可以买的板块 = latest_can_buy_plates
                            try:
                                compute_result = self.__run_backtest(code, stock_variables)
                                # print(compute_result)
                                self.__process_test_result(code, stock_variables, next_trade_day, stock_variables.当前价,
                                                           time_str, compute_result)
                            except Exception as e:
                                print(time_str)
                                logging.exception(e)
                        stock_variables.上个tick = tick
            print("可买题材:", all_new_plates)
        finally:
            self.finish = True
    def __process_test_result(self, code, stock_variables: StockVariables, next_trade_day, buy_price, time_str,
                              compute_result):
@@ -799,9 +1124,9 @@
    days = ["2025-06-03", "2025-06-04", "2025-06-05", "2025-06-06", "2025-06-09", "2025-06-10",
            "2025-06-11", "2025-06-12", "2025-06-13", "2025-06-16", "2025-06-17", "2025-06-18", "2025-06-19",
            "2025-06-20", "2025-06-23", "2025-06-24", "2025-06-25", "2025-06-26", "2025-06-27", "2025-06-30",
            "2025-07-01", "2025-07-02"]
            "2025-07-01", "2025-07-02", "2025-07-03", "2025-07-04"]
    # days = ["2025-05-23"]
    days = ["2025-07-04"]
    #
    # days = ["2025-05-12", "2025-05-13", "2025-05-14", "2025-05-15", "2025-05-16", "2025-05-19", "2025-05-20",
    #         "2025-05-21", "2025-05-22", "2025-05-23", "2025-05-26"]
@@ -810,7 +1135,7 @@
    for day in days:
        if day not in back_test_dict:
            # back_test_dict[day] = BackTest(day, "今日量是否足够.py")
            back_test_dict[day] = BackTest(day, "strategy_script_v6.py")
            back_test_dict[day] = BackTest(day, "strategy_script_v7.py", target_codes={})
        print("=========================", day)
        # back_test_dict[day].run_volume()
        back_test_dict[day].run()
        back_test_dict[day].run_v2()
test/test.py
New file
@@ -0,0 +1,32 @@
from log_module import async_log_util
from strategy.place_order_queue_manager import PlaceOrderRecordManager
from trade import trade_record_log_util
from trade.trade_manager import DealCodesManager, logger_trade
from utils import tool, huaxin_util
if __name__=="__main__":
    id_ = '2025-07-07_000333_093032_8162'
    #  (ID,代码,板块信息, 大单信息, 时间, 价格, 涨幅)
    record = PlaceOrderRecordManager(tool.get_now_date_str()).get_not_process_record_by_id(id_)
    if not record:
        result_json = {"code": 1, "msg": '记录不存在'}
    else:
        PlaceOrderRecordManager(tool.get_now_date_str()).set_buy(record[0])
        code = record[1]
        # 可以下单
        # 判断是否可以买
        order_ref = huaxin_util.create_order_ref()
        price = tool.get_buy_max_price(record[5])
        volume = 100
        DealCodesManager().place_order(set(record[2].keys()), record[1], order_ref, price, volume)
        trade_record_log_util.add_place_order_log(code, trade_record_log_util.PlaceOrderInfo(code=code,
                                                                                             time_str=record[4],
                                                                                             price=record[5],
                                                                                             rate=record[6],
                                                                                             plates=record[
                                                                                                 2].keys(),
                                                                                             plates_info=
                                                                                             record[2],
                                                                                             info=record[3]
                                                                                             ))
        async_log_util.info(logger_trade, f"{code}下单,板块:{record[2].keys()}")