Administrator
2024-07-03 59d0c089c7c0749d121b6fef6cba05509cff6a8c
策略修改
1个文件已删除
5个文件已修改
2个文件已添加
314 ■■■■ 已修改文件
huaxin_client/l2_client_for_cb.py 6 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
main.py 11 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test/test_backtest.py 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test/test_callback.py 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test/test_l2.py 22 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
third_data/kpl_data_manager.py 143 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/backtest_trade.py 35 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/buy_strategy.py 88 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/l2_client_for_cb.py
@@ -281,7 +281,8 @@
                        "ExecType": pTransaction['ExecType'].decode()}
                huaxin_l2_log.info(logger_local_huaxin_l2_transaction, f"{item}")
                if self.buyStrategyDataManager.add_transaction_info(item):
                result = self.buyStrategyDataManager.add_transaction_info(item)
                if result[0]:
                    l2_transaction_price_queue.put_nowait(
                        (pTransaction['SecurityID'], pTransaction['TradePrice'], pTransaction['TradeTime']))
        except Exception as e:
@@ -299,7 +300,8 @@
                        "SellNo": pTick['SellNo'],
                        "ExecType": '1'}
                huaxin_l2_log.info(logger_local_huaxin_l2_transaction, f"{item}")
                if self.buyStrategyDataManager.add_transaction_info(item):
                result = self.buyStrategyDataManager.add_transaction_info(item)
                if result[0]:
                    l2_transaction_price_queue.put_nowait(
                        (pTick['SecurityID'], pTick['Price'], pTick['TickTime']))
        except Exception as e:
main.py
@@ -9,7 +9,7 @@
from code_attribute import target_codes_manager, gpcode_manager, code_market_manager, history_k_data_util
from db.redis_manager_delegate import RedisUtils
from huaxin_client import l2_client_for_cb, trade_client_for_cb
from huaxin_client import l2_client_for_cb
from huaxin_client.client_network import SendResponseSkManager
from log_module import async_log_util, log_export
from records import huaxin_trade_record_manager
@@ -323,10 +323,11 @@
                # 正股需要加载板块
                if code.find("11") != 0 and code.find("12") != 0:
                    limit_up_price = tool.get_limit_up_price(code, result[8])
                    KPLCodeJXBlockManager().load_jx_blocks(result[0], result[3],
                                                           float(limit_up_price),
                                                           KPLLimitUpDataRecordManager.get_current_reasons())
                    # 涨幅大于5%才开始获取板块
                    if result[2] > 0.05:
                        KPLCodeJXBlockManager().load_jx_blocks(result[0], result[3],
                                                               float(limit_up_price),
                                                               KPLLimitUpDataRecordManager.get_current_reasons())
        except Exception as e:
            logger_debug.exception(e)
            time.sleep(1)
test/test_backtest.py
New file
@@ -0,0 +1,4 @@
from trade import backtest_trade
if __name__ == "__main__":
    backtest_trade.start_backtest("2024-06-27")
test/test_callback.py
File was deleted
test/test_l2.py
New file
@@ -0,0 +1,22 @@
from code_attribute import target_codes_manager
from log_module import log_export
def get_cb_code(code):
    return target_codes_manager.get_cb_code(code)
def load_market_info():
    markets = log_export.load_market_info()
    codes = set()
    for code in markets:
        for m in markets[code]:
            if m[0].find("30") == 0 and m[2] > 0.09:
                codes.add(m[0])
if __name__ == "__main__":
    codes = {'300732', '300283'}
    for code in codes:
        print(code, get_cb_code(code))
third_data/kpl_data_manager.py
@@ -15,7 +15,7 @@
from db import mysql_data_delegate as mysql_data, redis_manager_delegate as redis_manager
from log_module.log import logger_kpl_limit_up_reason_change, logger_debug, \
    logger_kpl_open_limit_up, logger_kpl_block_can_buy
from third_data import kpl_util, kpl_api
from third_data import kpl_util, kpl_api, kpl_block_util
# 代码对应的涨停原因保存
from third_data.kpl_util import KPLPlatManager, KPLDataType
@@ -264,9 +264,9 @@
            if limit_up_price and buy_1_price:
                # 处理买1,卖1信息
                pre_close_price = round(float(limit_up_price) / tool.get_limit_up_rate(code), 2)
                # 如果涨幅大于7%就读取板块
                # 如果涨幅大于5%就读取板块
                price_rate = (buy_1_price - pre_close_price) / pre_close_price
                if price_rate > 0.07 * (tool.get_limit_up_rate(code)-1) * 10:
                if price_rate > 0.05 * (tool.get_limit_up_rate(code) - 1) * 10:
                    jx_blocks_info = self.get_jx_blocks_cache(code)
                    if not jx_blocks_info:
                        start_time = time.time()
@@ -294,7 +294,7 @@
                                async_log_util.info(logger_kpl_block_can_buy,
                                                    f"{code}:获取到精选板块(更新)-{blocks}  耗时:{int(time.time() - start_time)}s")
                                self.save_jx_blocks(code, blocks, current_limit_up_blocks)
                elif price_rate > 0.03 * (tool.get_limit_up_rate(code)-1) * 10:
                elif price_rate > 0.03 * (tool.get_limit_up_rate(code) - 1) * 10:
                    # 添加备用板块
                    if not self.get_jx_blocks_cache(code, by=True):
                        start_time = time.time()
@@ -393,6 +393,141 @@
        return result
class CodePlateKeyBuyManager:
    # 无板块
    BLOCK_TYPE_NONE = -1
    # 一般板块
    BLOCK_TYPE_COMMON = 0
    # 强势板块
    BLOCK_TYPE_STRONG = 1
    # 猛拉板块
    BLOCK_TYPE_SOON_LIMIT_UP = 2
    # 潜伏板块
    BLOCK_TYPE_START_UP = 3
    __can_buy_compute_result_dict = {}
    @classmethod
    def __is_block_can_buy_new(cls, code, block, current_limit_up_datas, code_limit_up_reasons_dict,
                               current_limit_up_block_codes_dict, yesterday_current_limit_up_codes):
        """
        该票的板块是否可以买
        @param code:
        @param block:
        @param current_limit_up_datas: 当前涨停数据
        @param code_limit_up_reasons_dict: 代码的涨停原因
        @param current_limit_up_block_codes_dict: 今日涨停板块对应的代码
        @param yesterday_current_limit_up_codes: 昨日涨停代码
        @return: 返回(前排开1数量,高位板数量,创业板数量,身位)
        """
        block_codes = current_limit_up_block_codes_dict.get(block)
        if block_codes is None:
            block_codes = set()
        # 判断开1数量
        current_open_limit_up_codes = kpl_block_util.get_shsz_open_limit_up_codes_current(code, block,
                                                                                          current_limit_up_datas)
        # 创业板数量
        gem_codes = set()
        for c in block_codes:
            if c.find("30") == 0:
                gem_codes.add(code)
        # 高位板数量
        high_codes = set()
        for c in block_codes:
            if c in yesterday_current_limit_up_codes:
                high_codes.add(c)
        # 获取主板实时身位,剔除高位板
        exclude_first_codes = set()
        current_shsz_rank, front_current_shsz_rank_codes = kpl_block_util.get_code_current_rank(code, block,
                                                                                                current_limit_up_datas,
                                                                                                code_limit_up_reasons_dict,
                                                                                                yesterday_current_limit_up_codes,
                                                                                                exclude_first_codes,
                                                                                                len(
                                                                                                    current_open_limit_up_codes),
                                                                                                shsz=True)
        return len(current_open_limit_up_codes), len(high_codes), len(gem_codes), current_shsz_rank
    @classmethod
    def get_plate_keys(cls, code):
        """
        获取代码的板块
        :param code:
        :return: 返回代码板块
        """
        blocks_info = KPLCodeJXBlockManager().get_jx_blocks_cache(code)
        if not blocks_info:
            blocks_info = KPLCodeJXBlockManager().get_jx_blocks_cache(code, by=True)
        if blocks_info:
            return blocks_info[0]
        return None
    # 获取可以买的板块
    # current_limit_up_datas: 今日实时涨停
    # latest_2_day_limit_up_datas:最近2天的实时涨停(不含今日)
    # limit_up_record_datas:今日历史涨停
    # yesterday_current_limit_up_codes : 昨日涨停代码
    # before_blocks_dict:历史涨停原因
    # 返回板块的计算结果[(板块名称,是否可买,是否是独苗,信息)]
    @classmethod
    def get_can_buy_block(cls, code, current_limit_up_datas, yesterday_current_limit_up_codes):
        # 加载涨停代码的目标板块
        def load_limit_up_codes_block():
            if current_limit_up_datas:
                for d in current_limit_up_datas:
                    blocks = {d[5]}
                    if d[6]:
                        blocks |= set(d[6].split('、'))
                    blocks = blocks - constant.KPL_INVALID_BLOCKS
                    code_limit_up_reasons_dict[d[3]] = blocks
            return code_limit_up_reasons_dict
        if current_limit_up_datas is None:
            current_limit_up_datas = []
        # 获取目标代码板块
        keys = cls.get_plate_keys(code)
        # log.logger_kpl_debug.info("{}最终关键词:{}", code, keys)
        # 涨停列表中匹配关键词,返回(板块:代码集合),代码集合中已经排除自身
        fresults = []
        if not keys:
            return fresults, set()
        code_limit_up_reasons_dict = {}
        current_limit_up_block_codes_dict = {}
        load_limit_up_codes_block()
        for c in code_limit_up_reasons_dict:
            bs = code_limit_up_reasons_dict[c]
            for b in bs:
                if b not in current_limit_up_block_codes_dict:
                    current_limit_up_block_codes_dict[b] = set()
                current_limit_up_block_codes_dict[b].add(c)
        for block in keys:
            # 前排开1数量, 高位板数量, 创业板数量, 身位
            open_limit_up_count, high_count, gem_count, rank = cls.__is_block_can_buy_new(
                code, block, current_limit_up_datas, code_limit_up_reasons_dict,
                current_limit_up_block_codes_dict, yesterday_current_limit_up_codes)
            fresults.append((block, open_limit_up_count, high_count, gem_count, rank))
        return fresults, keys
    # 返回:(可以买的板块列表, 是否是独苗, 消息简介,可买的强势主线, 激进买入板块列表)
    @classmethod
    def statistic_block_infos(cls, code, current_limit_up_datas):
        """
        统计板块信息
        :param code:
        :param current_limit_up_datas:
        :return: [(板块, 前排开1数量, 高位板数量, 创业板数量, 身位)]
        """
        yesterday_limit_up_codes = get_yesterday_limit_up_codes()
        blocks_compute_results, keys = cls.get_can_buy_block(code, current_limit_up_datas, yesterday_limit_up_codes)
        return blocks_compute_results
def load_history_limit_up():
    for file_name in os.listdir(f"{constant.get_path_prefix()}/kpl/his"):
        if file_name.find("HisDaBanList_1.log") < 0:
trade/backtest_trade.py
@@ -2,40 +2,57 @@
回撤交易
"""
# 持仓字典
import logging
from log_module import log_export
from log_module.log import logger_debug
from trade.buy_strategy import BuyStrategyDataManager
from utils import l2_huaxin_util, tool
position_dict = {}
markets_list_dict = {}
# TODO 待完成
def get_price_by_time(code, time_str):
def get_market_by_time(code, time_str):
    """
    获取某个时间的价格
    :param code:
    :param time_str:
    :return:
    """
    markets = markets_list_dict.get(code)
    if markets:
        for m in markets:
            pass
            if tool.trade_time_sub(l2_huaxin_util.convert_time(m[9]), time_str) >= 0:
                return m
    return None
def start_backtest(date):
    """
    开始回测
    :param date:
    :return:
    """
    try:
        __BuyStrategyDataManager = BuyStrategyDataManager()
        # 回撤
        markets_dict = log_export.load_latest_market_info(date)
        for code in markets_dict:
            __BuyStrategyDataManager.add_market_info(markets_dict[code])
        global markets_list_dict
        markets_list_dict = log_export.load_market_info(date)
        transactions = log_export.load_transactions(date)
        for t in transactions:
            code = t["SecurityID"]
            need_buy = __BuyStrategyDataManager.add_transaction_info(t)
            time_str = l2_huaxin_util.convert_time(t["OrderTime"])
            market_info = get_market_by_time(code, time_str)
            if market_info:
                __BuyStrategyDataManager.add_market_info(market_info)
            need_buy, need_buy_msg = __BuyStrategyDataManager.add_transaction_info(t, True)
            if need_buy and code not in position_dict:
                # 持仓结果保存
                position_dict[code] = t
                logger_debug.info(f"回测下单:{t}")
                logger_debug.info(f"回测下单({need_buy_msg}):{t}")
    except Exception as e:
        logging.exception(e)
        logger_debug.exception(e)
trade/buy_strategy.py
@@ -7,10 +7,11 @@
from log_module.log import logger_trade
from third_data.kpl_data_manager import KPLLimitUpDataRecordManager
from trade.l2_transaction_data_manager import HuaXinBuyOrderManager
from utils import tool
from utils import tool, l2_huaxin_util
class BuyStrategyDataManager:
    __latest_transaction_price_dict = {}
    """
    买入策略管理
    """
@@ -18,10 +19,12 @@
    def __init__(self):
        # 涨停价管理
        self.__limit_up_price_dict = {}
        self.__pre_close_price_dict = {}  # 昨日收盘价
        self.__latest_trade_price_dict = {}
        self.__latest_market_info_dict = {}
    def set_pre_close_price(self, code, price):
        self.__pre_close_price_dict[code] = price
        limit_up_price = tool.to_price(decimal.Decimal(str(price)) * decimal.Decimal(f"{tool.get_limit_up_rate(code)}"))
        self.__limit_up_price_dict[code] = round(float(limit_up_price), 2)
@@ -34,7 +37,7 @@
        """
        code = info[0]
        # 设置昨日收盘价格
        if code not in self.__limit_up_price_dict:
        if code not in self.__limit_up_price_dict or code not in self.__pre_close_price_dict:
            self.set_pre_close_price(code, info[8])
        self.__latest_market_info_dict[info[0]] = info
@@ -53,9 +56,49 @@
                                                           info['OrderTime'], info['MainSeq'], info['SubSeq'],
                                                           info['BuyNo'],
                                                           info['SellNo'], info['ExecType'])])
        if int(l2_huaxin_util.convert_time(info['OrderTime']).replace(":", "")) < int("093000"):
            return False, "09:30之前不下单"
        code = info["SecurityID"]
        try:
            if info["TradePrice"] == self.__limit_up_price_dict.get(code):
            can_buy, msg = self.__can_place_order(code, info, backtest)
            return can_buy, msg
        finally:
            self.__latest_trade_price_dict[code] = info["TradePrice"]
    def __can_place_order(self, code, info, backtest):
        """
        是否可以下单
        :param code: 代码
        :param transaction_info: 成交信息
        :param backtest: 是否是回测模式
        :return: 返回是否可以下单
        """
        if backtest:
            # 获取2秒内吃的档数
            rised_price = self.__get_rised_price(code, info)
            if rised_price < 0.1:
                return False, "2S内涨幅小于10档"
            pre_close_price = self.__pre_close_price_dict.get(code)
            if pre_close_price is None:
                return False, "没有获取到收盘价"
            if (info["TradePrice"] - pre_close_price) / pre_close_price < 0.07:
                return False, "涨幅小于7%"
            return True, f"2S内上升{int(rised_price * 100)}档"
        else:
            # 判断是否涨停
            is_limit_up = info["TradePrice"] == self.__limit_up_price_dict.get(code)
            limit_up_price = self.__limit_up_price_dict.get(code)
            if not is_limit_up and code.find("30") == 0:
                # 30开始的9.8以上的算涨停
                markekt = self.__latest_market_info_dict.get(info[0])
                if markekt and markekt[2] >= 0.098:
                    is_limit_up = True
                    limit_up_price = info["TradePrice"]
            if is_limit_up:
                # 当前为涨停价
                if code not in self.__latest_trade_price_dict:
                    # 开1
@@ -63,16 +106,41 @@
                    markekt = self.__latest_market_info_dict.get(info[0])
                    if markekt and markekt[3] * markekt[4] >= 1e8:
                        async_log_util.info(logger_trade, f"{code}:买1({markekt[3] * markekt[4]})超过1亿")
                        return True
                        return True, "买1超过1亿,开1可买"
                    else:
                        return False
                        return False, "开1"
                else:
                    if self.__latest_trade_price_dict.get(code) != self.__limit_up_price_dict.get(code):
                    if self.__latest_trade_price_dict.get(code) != limit_up_price:
                        # 之前那一次不是涨停价
                        return True
        finally:
            self.__latest_trade_price_dict[code] = info["TradePrice"]
        return False
                        return True, "涨停"
        return False, ""
    def __get_rised_price(self, code, transaction_info, time_space=2000):
        """
        获取指定时间内股价上升的价格
        :param code:
        :param transaction_info:
        :param time_space:
        :return:
        """
        if code not in self.__latest_transaction_price_dict:
            self.__latest_transaction_price_dict[code] = []
        if not self.__latest_transaction_price_dict[code] or self.__latest_transaction_price_dict[code][-1][0] != \
                transaction_info["OrderTime"]:
            self.__latest_transaction_price_dict[code].append(
                (transaction_info["TradePrice"], transaction_info["OrderTime"]))
            # 删除1s之前的数据
        while True:
            end_time, start_time = self.__latest_transaction_price_dict[code][-1][1], \
                                   self.__latest_transaction_price_dict[code][0][1]
            if tool.trade_time_sub_with_ms(l2_huaxin_util.convert_time(end_time, with_ms=True),
                                           l2_huaxin_util.convert_time(start_time, with_ms=True)) <= time_space:
                break
            else:
                # 删除第一个元素
                del self.__latest_transaction_price_dict[code][0]
        return self.__latest_transaction_price_dict[code][-1][0] - self.__latest_transaction_price_dict[code][0][0]
    def process(self, underlying_code, underlying_transaction_info, underlying_market_info, cb_code, cb_market_info):
        """