Administrator
3 天以前 5f034f7a6733b03e0d08d7920ec6de1b1517c421
strategy/strategy_variable_factory.py
@@ -4,11 +4,14 @@
import datetime
import json
import os
import re
from code_attribute import global_data_loader
import constant
from code_attribute import global_data_loader, gpcode_manager
from db import mysql_data_delegate
from strategy.data_analyzer import KTickLineAnalyzer, KPLLimitUpDataAnalyzer, K60SLineAnalyzer
from strategy.strategy_variable import StockVariables
from third_data import kpl_api, kpl_util
from third_data.history_k_data_manager import HistoryKDataManager
from third_data.history_k_data_util import JueJinLocalApi, HistoryKDatasUtils
from utils import global_util, tool
@@ -19,7 +22,7 @@
    数据加载器类,用于集中管理策略变量所需的各类数据加载逻辑
    """
    def __init__(self, now_day, cache_path="D:/datas"):
    def __init__(self, now_day, cache_path=f"{constant.get_path_prefix()}/datas"):
        """
        初始化数据加载器
        :param now_day: 当前日期,格式为"2025-01-01"
@@ -29,11 +32,14 @@
        self.jueJinLocalApi = JueJinLocalApi("41c4f5da-2591-11f0-a9c9-f4b5203f67bf",
                                             "018db265fa34e241dd6198b7ca507ee0a82ad029")
        self.trade_days = self.load_trade_days()
        self.plate_codes = {}
        # 代码的精选板块 : {"代码":{板块}}
        self.jx_blocks = {}
    def load_kline_data(self):
        """
        加载日K线数据
        :return: 日K线数据
        :return: {"代码": 日K线数据}
        """
        dir_path = os.path.join(self.cache_path, "k_bars")
        day = self.trade_days[0]
@@ -86,7 +92,7 @@
            k_bar_code_data_dict[code] = date_datas
        return k_bar_code_data_dict
    def load_tick_data(self):
    def load_tick_data(self, target_codes=None):
        """
        加载当日的tick数据
        :return:tick数据字典
@@ -99,6 +105,8 @@
                if f.find(self.now_day) < 0:
                    continue
                code = f.split("_")[1][:6]
                if target_codes and code not in target_codes:
                    continue
                tick_path = os.path.join(tick_dir_path, f)
                with open(tick_path, mode='r') as f:
                    lines = f.readlines()
@@ -124,12 +132,116 @@
    def load_limit_up_data(self):
        """
        加载涨停数据
        :return: 涨停数据记录
        :return: 涨停数据记录[(代码, 日期, 板块, 是否炸板)]
        """
        mysql = mysql_data_delegate.Mysqldb()
        results = mysql.select_all(
            f"select _code, _day, _hot_block_name from kpl_limit_up_record where _day>='{self.trade_days[-1]}' and _day<='{self.trade_days[0]}' and _open=0")
            f"select _code, _day, _hot_block_name, _open, _blocks  from kpl_limit_up_record where _day>='{self.trade_days[-1]}' and _day <='{self.trade_days[0]}'")
        for r in results:
            r[2] = kpl_util.filter_block(r[2])
        return results
    def __compute_limit_up_reasons_for_refer(self, block_infos):
        """
        计算参考票的涨停原因
        @param block_infos: [(板块, 日期)]
        @return:
        """
        # [(板块, 日期)]
        block_infos.sort(key=lambda x: x[1], reverse=True)
        # {"板块":[(出现次数, 最近出现日期)]}
        temp_dict = {}
        for b in block_infos:
            if b[0] in constant.KPL_INVALID_BLOCKS:
                continue
            if b[0] not in temp_dict:
                temp_dict[b[0]] = [0, b[1]]
            temp_dict[b[0]][0] += 1
        if not temp_dict:
            return set()
        temp_list = [(k, temp_dict[k][0], temp_dict[k][1]) for k in temp_dict]
        # 按照涨停次数与最近涨停时间排序
        temp_list.sort(key=lambda x: (x[1], x[2]), reverse=True)
        # 取涨停次数最多的和最近涨停的
        # 取相同次数的原因
        if temp_list:
            _list = [t for t in temp_list if t[1] == temp_list[0][1]]
            if _list[0][1] == 1:
                _list = _list[:1]
            blocks = set([x[0] for x in _list])
        else:
            blocks = set()
        blocks -= constant.KPL_INVALID_BLOCKS
        # 去除例如概念这些泛指词
        return set([kpl_util.filter_block(x) for x in blocks])
    def load_code_plates_for_refer(self):
        """
        获取参考票的涨停原因
        @return:
        """
        sql = f"SELECT r.`_code`, r.`_day`, r.`_hot_block_name`, r.`_blocks`,  r.`_open` FROM `kpl_limit_up_record` r WHERE r.`_day`>='{tool.date_sub(self.now_day, 365)}' and r.`_day` <'{self.now_day}'"
        mysql = mysql_data_delegate.Mysqldb()
        kpl_results = mysql.select_all(sql)
        # {"代码":[(板块, 日期), (板块, 日期)]}
        kpl_block_dict = {}
        for r in kpl_results:
            # 当日炸板的不计算原因
            if r[4] == 1:
                continue
            code = r[0]
            if code not in kpl_block_dict:
                kpl_block_dict[code] = []
            kpl_block_dict[code].append((r[2], r[1]))  # (板块, 日期)
        reasons_dict = {}
        for code in kpl_block_dict:
            block_infos = kpl_block_dict.get(code)
            reasons_dict[code] = self.__compute_limit_up_reasons_for_refer(block_infos)
        return reasons_dict
    def load_target_plate_and_codes(self):
        """
        加载目标板块与对应的代码:
        从最近60个交易日的真正涨停数据中
        @return: {"板块":[代码]}
        """
        end_date = self.trade_days[:60][-1]
        start_date = self.trade_days[:60][0]
        mysql = mysql_data_delegate.Mysqldb()
        # 获取上个交易日涨停的票
        results = mysql.select_all(
            f"SELECT r.`_code` FROM `kpl_limit_up_record` r where r._day='{self.trade_days[0]}' and r._open = 0")
        exclude_codes = set([x[0] for x in results])
        results = mysql.select_all(
            f"select r.`_hot_block_name` from `kpl_limit_up_record` r where r.`_open`=0 and r.`_day`>'{end_date}' and r.`_day` <= '{start_date}' group by r.`_hot_block_name`")
        blocks = set([x[0] for x in results])
        fresults = {}
        all_buy_plates_of_codes = self.load_all_buy_plates_of_codes()
        valid_codes = set(all_buy_plates_of_codes.keys())
        for b in blocks:
            sql = f"""
                    SELECT * FROM
                    (
                    SELECT r.`_code`, r.`_code_name`, COUNT(*) AS `count`, MAX(r.`_day`) AS _day FROM `kpl_limit_up_record` r WHERE r.`_open`=0  AND r.`_day`>'{end_date}' AND r.`_day`<='{start_date}' AND r.`_hot_block_name`='{b}' GROUP BY r.`_code`
                    ) a
                    ORDER BY a.count DESC,a._day  DESC
            """
            results = mysql.select_all(sql)
            # 取前1/3
            if results:
                results = [x for x in results if
                           (tool.is_can_buy_code(x[0]) and x[0] in valid_codes and x[0] not in exclude_codes)]
                # 取前1/3且涨停数是前10
                max_count = len(results) // 3 if len(results) % 3 == 0 else len(results) // 3 + 1
                # results = results[:max_count]
                # 取前10
                results = results[:10]
                codes = [x[0] for x in results]
                fresults[kpl_util.filter_block(b)] = codes
        return fresults
    def load_trade_days(self):
        """
@@ -142,7 +254,10 @@
        one_year_ago = (pre_date - datetime.timedelta(days=365)).strftime('%Y-%m-%d')
        pre_date = pre_date.strftime('%Y-%m-%d')
        trade_days = self.jueJinLocalApi.get_trading_dates(one_year_ago, pre_date)
        if constant.is_windows():
            trade_days = self.jueJinLocalApi.get_trading_dates(one_year_ago, pre_date)
        else:
            trade_days = HistoryKDatasUtils.get_trading_dates(one_year_ago, pre_date)
        trade_days.sort(reverse=True)
        trade_days = trade_days[:120]
        return trade_days
@@ -152,7 +267,10 @@
        加载交易日列表,now_day前120个交易日
        :return: 交易日列表
        """
        next_trade_day = self.jueJinLocalApi.get_next_trading_date(self.now_day)
        if constant.is_windows():
            next_trade_day = self.jueJinLocalApi.get_next_trading_date(self.now_day)
        else:
            next_trade_day = HistoryKDatasUtils.get_next_trading_date(self.now_day)
        return next_trade_day
    def load_target_codes(self):
@@ -182,6 +300,228 @@
        except Exception as e:
            return set(), None
    def load_plate_codes(self, plate_code, plate_name):
        """
        获取板块有可能买的目标代码
        @param plate_code:
        @return:[(代码, 领涨次数, 最大领涨次数)]
        """
        if not plate_code or plate_name == '无' or plate_name in constant.KPL_INVALID_BLOCKS:
            return set()
        if plate_code in self.plate_codes:
            return self.plate_codes.get(plate_code)
        dir_path = os.path.join(self.cache_path, "plate_codes_info", self.now_day)
        if not os.path.exists(dir_path):
            os.makedirs(dir_path)
        path_ = os.path.join(dir_path, plate_code + ".text")
        datas = []
        if os.path.exists(path_):
            with open(path_, mode='r', encoding='utf-8') as f:
                codes_info = []
                lines = f.readlines()
                for i in range(len(lines)):
                    if i == 0:
                        continue
                    line = lines[i]
                    if line:
                        r = eval(line)
                        datas.append(r)
        else:
            codes_info = []
            results = self.request_plate_codes(plate_code)
            with open(path_, mode='w', encoding='utf-8') as f:
                f.write(plate_name)
                f.write("\n")
                f.writelines([f"{x}\n" for x in results])
            datas = results
        # 保存到内存中
        if datas:
            max_data = max(datas, key=lambda x: x[3])
            for r in datas:
                if r[3] < 1:
                    continue
                if re.match(r"风险|立案", r[2]):
                    continue
                if r[1].find("ST") >= 0:
                    continue
                if r[1].find("S") >= 0:
                    continue
                if not tool.is_can_buy_code(r[0]):
                    continue
                codes_info.append((r[0], r[3], max_data[3]))
                # if len(codes_info) >= 10:
                #     break
        f_codes_info = codes_info  # [d for d in codes_info if d[1] >= d[2] // 2]
        self.plate_codes[plate_code] = f_codes_info
        return self.plate_codes.get(plate_code)
    def load_all_codes_of_plates(self, is_for_buy=False):
        """
        加载所有板块的领涨票
        @return:{"板块代码":(板块名称, [(代码,代码名称,标签,领涨次数)])}
        """
        dir_path = os.path.join(self.cache_path, "plate_codes_info", self.now_day)
        if not os.path.exists(dir_path):
            return None
        fdata = {}
        plate_files = os.listdir(dir_path)
        for plate_file in plate_files:
            plate_code = plate_file.split(".")[0]
            path_ = os.path.join(dir_path, plate_file)
            with open(path_, mode='r', encoding='utf-8') as f:
                datas = []
                lines = f.readlines()
                for i in range(len(lines)):
                    if i == 0:
                        continue
                    line = lines[i]
                    if line:
                        r = eval(line)
                        if not is_for_buy:
                            if r[3] < 1:
                                continue
                            if r[1].find("ST") >= 0:
                                continue
                            if r[1].find("S") >= 0:
                                continue
                        else:
                            if re.match(r"风险|立案", r[2]):
                                continue
                            if r[1].find("ST") >= 0:
                                continue
                            if r[1].find("S") >= 0:
                                continue
                            if not tool.is_can_buy_code(r[0]):
                                continue
                        datas.append(r)
                        # if len(datas) >= 10:
                        #     break
                fdata[plate_code] = (kpl_util.filter_block(lines[0].strip()), datas)
        return fdata
    def load_all_refer_plates_of_codes(self):
        """
        加载所有有领涨代码的领涨板块
        @return:
        """
        datas = self.load_all_codes_of_plates()
        fdata = {}
        for plate_code in datas:
            plate_name = datas[plate_code][0]
            codes_info = datas[plate_code][1]
            for item in codes_info:
                code, limit_up_count = item[0], item[3]
                if code not in fdata:
                    fdata[code] = []
                fdata[code].append((plate_code, plate_name, limit_up_count))
        for code in fdata:
            fdata[code].sort(key=lambda x: x[2], reverse=True)
            fdata[code] = fdata[code][:3]
        return fdata
    def load_all_buy_plates_of_codes(self):
        """
        加载所有代码的领涨板块
        @return: {"代码":{"板块名称":(代码, 领涨次数, 最大领涨次数)}}
        """
        datas = self.load_all_codes_of_plates(is_for_buy=True)
        fdata = {}
        if not datas:
            return fdata
        for plate_code in datas:
            plate_name = datas[plate_code][0]
            codes_info = datas[plate_code][1]
            if not codes_info:
                continue
            max_count = max(codes_info, key=lambda x: x[3])[3]
            for item in codes_info:
                code, limit_up_count = item[0], item[3]
                if code not in fdata:
                    fdata[code] = {}
                fdata[code][plate_name] = (code, limit_up_count, max_count)
        fdata_dict = {c: [(p, fdata[c][p]) for p in fdata[c]] for c in fdata}
        for c in fdata_dict:
            fdata_dict[c].sort(key=lambda x: x[1], reverse=True)
            fdata_dict[c] = fdata_dict[c][:3]
        fdata = {code: {x[0]: x[1] for x in fdata_dict[code]} for code in fdata_dict}
        return fdata
    def request_plate_codes(self, plate_code):
        """
        获取板块的代码
        @param plate_code:
        @return:[代码, 名称, 风险项, 领涨次数]
        """
        fresults = []
        for i in range(1, 10):
            results = kpl_api.getHistoryCodesByPlateOrderByLZCS(plate_code, self.now_day, "0930", i)
            results = json.loads(results)["list"]
            fresults.extend(results)
            if len(results) < 30:
                break
        fdatas = []
        for result in fresults:
            d = result[0], result[1], result[2], result[40]
            fdatas.append(d)
        return fdatas
    def get_limit_up_reasons_with_plate_code(self):
        """
        获取涨停原因
        :return: 涨停数据记录[(代码, 日期, 板块, 是否炸板)]
        """
        mysql = mysql_data_delegate.Mysqldb()
        sql = """
                SELECT _hot_block_code,_hot_block_name  FROM
                (
                SELECT r.`_hot_block_code`, r.`_hot_block_name`, r.`_create_time` FROM
                  (SELECT  DISTINCT(c.`_hot_block_code`) FROM `kpl_limit_up_record` c WHERE c.`_day`>'最小日期' and c.`_day`<'今日日期') a
                  LEFT JOIN kpl_limit_up_record r ON r.`_hot_block_code` = a._hot_block_code ORDER BY r.`_create_time` DESC
                 ) b GROUP BY b._hot_block_code HAVING b._hot_block_code IS NOT NULL
        """
        sql = sql.replace("最小日期", self.trade_days[-1]).replace("今日日期", self.now_day)
        results = mysql.select_all(sql)
        return [x for x in results if kpl_util.filter_block(x[1]) not in constant.KPL_INVALID_BLOCKS]
    def load_jx_blocks(self, code):
        if code in self.jx_blocks:
            self.jx_blocks.get(code)
        # 从文件中读取
        dir_path = os.path.join(self.cache_path, "jx_blocks", self.now_day)
        path_str = os.path.join(dir_path, f"{code}.txt")
        if os.path.exists(path_str):
            with open(path_str, mode='r', encoding='utf-8') as f:
                lines = f.readlines()
                blocks = eval(lines[0])
                self.jx_blocks[code] = blocks
        if code in self.jx_blocks:
            return self.jx_blocks.get(code)
        blocks = kpl_api.getCodeJingXuanBlocks(code)
        blocks = set([kpl_util.filter_block(x[1]) for x in blocks])
        blocks -= constant.KPL_INVALID_BLOCKS
        self.jx_blocks[code] = blocks
        # 保存到文件
        if not os.path.exists(dir_path):
            os.makedirs(dir_path)
        with open(os.path.join(dir_path, f"{code}.txt"), mode='w', encoding='utf-8') as f:
            f.write(f"{blocks}")
        return blocks
    def load_all_jx_blocks(self):
        code_blocks = {}
        # 从文件中读取
        dir_path = os.path.join(self.cache_path, "jx_blocks", self.now_day)
        files = os.listdir(dir_path)
        for file in files:
            code = file[:6]
            with open(os.path.join(dir_path, file), mode='r', encoding='utf-8') as f:
                code_blocks[code] = eval(f.readlines()[0])
        return code_blocks
class StrategyVariableFactory:
    @staticmethod
@@ -204,8 +544,11 @@
        instance.昨日非涨停 = not KTickLineAnalyzer.is_yesterday_limit_up(kline_data_1d)
        instance.昨日非炸板 = not KTickLineAnalyzer.is_yesterday_exploded(kline_data_1d)
        instance.昨日非跌停 = not KTickLineAnalyzer.is_yesterday_limit_down(kline_data_1d)
        instance.昨日最低价 = KTickLineAnalyzer.get_yesterday_low_price(kline_data_1d)
        instance.昨日开盘价 = KTickLineAnalyzer.get_yesterday_open_price(kline_data_1d)
        day_counts = [5, 10, 30, 60, 120]
        for day in day_counts:
        for day in [3, 5, 10, 30, 60, 120]:
            instance.__setattr__(f"日最高价_{day}", KTickLineAnalyzer.get_recent_days_high(kline_data_1d, day))
        for day in day_counts:
            instance.__setattr__(f"日最高量_{day}", KTickLineAnalyzer.get_recent_days_max_volume(kline_data_1d, day))
@@ -238,12 +581,16 @@
        for day in day_counts:
            instance.__setattr__(f"日大等于4次跌停个数_{day}",
                                 KTickLineAnalyzer.get_fourth_or_more_limit_down_days(kline_data_1d, day))
        for day in [5, 10, 15, 30, 60, 120]:
            instance.__setattr__(f"日放倍量日期_{day}",
                                 KTickLineAnalyzer.get_recent_days_double_volume_date(kline_data_1d, day))
        for day in day_counts:
            days = trade_days[:day]
            instance.__setattr__(f"日个股最正的原因_{day}",
                                 KPLLimitUpDataAnalyzer.get_most_common_reasons(limit_up_data_records, min_day=days[-1],
                                                                                max_day=days[0]))
        if kline_data_60s_dict:
            for day in day_counts:
                # 获取日K最高量的信息
@@ -255,12 +602,113 @@
            kline_data_60s = kline_data_60s_dict.get(trade_days[0])
            fdata = K60SLineAnalyzer.get_close_price_of_max_volume(kline_data_60s)
            instance.__setattr__(f"昨日分时最高量价", fdata)
        if KTickLineAnalyzer.is_too_high_and_not_relase_volume(kline_data_1d):
            instance.涨得高未放量 = True
        else:
            instance.涨得高未放量 = False
        return instance
def __test_jx_blocks(__DataLoader):
    def load_all_codes():
        codes = set()
        with open("D:/codes/codes_sh.text") as f:
            lines = f.readlines()
            codes |= set([x.strip() for x in lines if x.find("30") != 0])
        with open("D:/codes/codes_sz.text") as f:
            lines = f.readlines()
            codes |= set([x.strip() for x in lines if x.find("30") != 0])
        return codes
    code_blocks = __DataLoader.load_all_jx_blocks()
    # codes = load_all_codes()
    # for code in codes:
    #     print(code)
    #     __DataLoader.load_jx_blocks(code)
    codes = ["002639", "002366"]
    same_blocks = set()
    for c in codes:
        blocks = __DataLoader.load_jx_blocks(c)
        if not same_blocks:
            same_blocks = blocks
        same_blocks &= blocks
    print("相同板块", same_blocks)
    for code in code_blocks:
        if len(code_blocks[code] & same_blocks) == len(same_blocks):
            if code in codes:
                continue
            print(code, code_blocks[code])
def __load_target_codes_v1():
    """
    50亿以下的
    @return:
    """
    def get_zylt(code):
        zylt_volume_map = global_util.zylt_volume_map
        last_trade_day = __DataLoader.trade_days[0]
        volume = zylt_volume_map.get(code)
        # 今日涨停价要突破昨日最高价
        k_bars = HistoryKDataManager().get_history_bars(code, last_trade_day)
        return k_bars[0]["close"] * volume * tool.get_limit_up_rate(code)
    __DataLoader = DataLoader('2025-06-13')
    global_data_loader.load_zyltgb_volume_from_db()
    results = __DataLoader.load_target_plate_and_codes()
    # for k in results:
    #     print(k, results[k])
    plates = ["天然气", "军工"]
    print("==========新题材=======")
    for p in plates:
        codes = [x for x in results.get(p)]  # if get_zylt(x) < 31e8
        print("======", p)
        for code in codes:
            print("\t\t", code, gpcode_manager.CodesNameManager().get_code_name(code))
if __name__ == "__main__":
    # __load_target_codes_v1()
    __DataLoader = DataLoader("2025-06-04")
    # __test_jx_blocks(__DataLoader)
    # instance = StockVariables()
    # day = 5
    # instance.__setattr__(f"日最高价_{day}", 12.00)
    # print(instance.日最高价_5)
    DataLoader("2025-05-06").load_tick_data()
    # 下载目标票的板块
    # fdata = __DataLoader.load_all_refer_plates_of_codes()
    # print(fdata.get("000833"))
    # result_dict = __DataLoader.load_code_plates_for_refer()
    # print(result_dict["301279"])
    results = __DataLoader.load_target_plate_and_codes()
    # for k in results:
    #     print(k, results[k])
    plates = ["锂电池"]
    print("==========新题材=======")
    for p in plates:
        print(p, results.get(p))
    # print("食品饮料", results.get("食品饮料"))
    # print("锂电池", results.get("锂电池"))
    # print("数字经济", results.get("数字经济"))
    # print("地产链", results.get("地产链"))
    # print("物流", results.get("物流"))
    # 下载涨停原因板块对应的代码
    plates = __DataLoader.get_limit_up_reasons_with_plate_code()
    for p in plates:
        print(p)
        __DataLoader.load_plate_codes(p[0], p[1])
    # DataLoader("2025-05-06").load_tick_data()
    #
    # print(re.match(r"风险|立案", "风险123"))