Administrator
2024-11-15 b53b0f632cca75df8f39a17fab3d26caeecb2caf
trade/buy_radical/block_special_codes_manager.py
@@ -1,7 +1,9 @@
"""
板块辨识度票管理
"""
import constant
from db import mysql_data_delegate as mysql_data
from third_data.history_k_data_util import HistoryKDatasUtils
from utils import tool
@@ -37,6 +39,159 @@
            self.mysql.execute(sql)
class AnalysisBlockSpecialCodesManager:
    """
    Analyses limit-up records to pick, for every block, the codes that stand out.

    Inputs are the ``kpl_limit_up_record`` table, the block mapping file
    ``板块对应.txt`` and the latest quote information from ``HistoryKDatasUtils``.
    """
    # Created once when the class body runs and shared by all instances.
    __mysql = mysql_data.Mysqldb()

    def __get_code_blocks(self, min_day, max_day):
        """
        Count limit-up records per (block, code) pair.

        Only rows with ``min_day < _day < max_day`` (both bounds exclusive) whose
        code does not start with ``68`` are counted.
        @param min_day: lower day bound (exclusive)
        @param max_day: upper day bound (exclusive)
        @return: rows of (block name, code, record count)
        """
        # NOTE(review): the day values are interpolated straight into the SQL text;
        # they must come from trusted sources.
        results = self.__mysql.select_all(
            f"SELECT r.`_hot_block_name`, r.`_code`, COUNT(*) FROM kpl_limit_up_record  r WHERE r.`_day`>'{min_day}' and r.`_day`<'{max_day}' and r._code not like '68%' group by  r.`_hot_block_name`, r.`_code`")
        return results

    def __get_limit_up_info(self, min_day):
        """
        Collect per-code limit-up statistics for all days after ``min_day``.

        @param min_day: lower day bound (exclusive)
        @return: {code: (record count, code name, ``_zylt_val`` / 1e8, or 1 when ``_zylt_val`` is NULL)}
        """
        # NOTE(review): min_day is interpolated straight into the SQL text.
        sql = f"SELECT r.`_code`, COUNT(r.`_code`),r.`_code_name`,IF( r.`_zylt_val` is null, 1, r.`_zylt_val`/100000000 )  FROM (SELECT * FROM kpl_limit_up_record  r ORDER BY r.`_create_time` DESC)  r WHERE r.`_day`>'{min_day}' GROUP BY r.`_code`"
        results = self.__mysql.select_all(sql)
        return {row[0]: (row[1], row[2], row[3]) for row in results}

    def __get_top(self, block, min_day, max_day):
        """
        Find the codes of one block that rank high both by record count and by
        ``_zylt_val``.

        The top N of each ranking is taken, with N = min(10, number of codes // 2),
        and the intersection of the two sets is returned.
        @param block: block name as stored in ``_hot_block_name``
        @param min_day: lower day bound (exclusive)
        @param max_day: upper day bound (exclusive)
        @return: set of codes present in both top-N rankings
        """
        # NOTE(review): block and the day values are interpolated straight into the
        # SQL text; a block name containing a quote would break the statement.
        sql = f"SELECT r.`_code`,r.`_code_name`, COUNT(*), IF( r.`_zylt_val` is null, 1, r.`_zylt_val`/100000000 )  FROM (SELECT * FROM kpl_limit_up_record  r ORDER BY r.`_create_time` DESC)  r WHERE r.`_hot_block_name`='{block}' AND r.`_day`>'{min_day}' and r.`_day`<'{max_day}' and r._code not like '68%' GROUP BY r.`_code`"
        rows = list(self.__mysql.select_all(sql))
        max_count = min(10, len(rows) // 2)
        # Row layout: (code, code name, record count, scaled _zylt_val).
        by_count = sorted(rows, key=lambda x: x[2], reverse=True)
        top_by_count = {x[0] for x in by_count[:max_count]}
        # Sort the count-ordered list so that ties keep the count ordering (stable sort).
        by_zylt = sorted(by_count, key=lambda x: x[3], reverse=True)
        top_by_zylt = {x[0] for x in by_zylt[:max_count]}
        return top_by_count & top_by_zylt

    def __get_block_map(self):
        """
        Load the block mapping file and build the block -> parent blocks map.

        Each non-empty line of ``板块对应.txt`` has one of two forms:
          * ``parent1,parent2|child1,child2``: every child maps to all parents;
            when more than one parent is listed, each parent also maps to itself.
          * ``child1,child2``: the first name acts as the single parent.
        Full-width commas and the ``丨`` character are accepted as separators.
        Names are stripped of surrounding whitespace and empty names are ignored.
        @return: {block name: set of parent block names}
        """
        block_map = {}
        with open(f"{constant.get_path_prefix()}/板块对应.txt", encoding="utf-8") as f:
            for raw_line in f:
                line = raw_line.strip()
                if not line:
                    continue
                # Normalise the full-width comma (U+FF0C) and the CJK bar to ASCII separators.
                line = line.replace("\uff0c", ",").replace("丨", "|")
                parent_blocks = set()
                if "|" in line:
                    sts = line.split("|")
                    parent_blocks = {p.strip() for p in sts[0].split(",") if p.strip()}
                    # Only the segment right after the first bar is read as children.
                    line = sts[1]
                blocks = [b.strip() for b in line.split(",") if b.strip()]
                if not parent_blocks:
                    if not blocks:
                        continue
                    # Without an explicit parent section the first name is the parent.
                    parent_blocks.add(blocks[0])
                for b in blocks:
                    block_map.setdefault(b, set()).update(parent_blocks)
                if len(parent_blocks) > 1:
                    # Register every parent under its own name as well.
                    for p in parent_blocks:
                        block_map.setdefault(p, set()).add(p)
        return block_map

    def test_block_map(self):
        """
        Print the parsed block map (debug helper).
        """
        print(self.__get_block_map())

    def get_block_special_codes(self):
        """
        Compute the distinctive codes of every block.

        Records of the 365 days before the latest trading date are aggregated per
        code and per (mapped) block. For each block the first third of its codes is
        examined and at most 10 codes passing all filters are kept.
        @return: list of (block, code name, code, limit-up count of the code in the block,
                 scaled ``_zylt_val`` truncated to int)
        """
        trading_dates = HistoryKDatasUtils.get_latest_trading_date(15)
        max_day = trading_dates[-1]
        min_day = tool.date_sub(max_day, 365)
        block_map = self.__get_block_map()

        # {code: {block: limit-up count}}; raw block names are replaced by their
        # mapped parent blocks when the mapping file knows them.
        code_block_dict = {}
        for row in self.__get_code_blocks(min_day, max_day):
            block_name, code, count = row[0], row[1], row[2]
            per_code = code_block_dict.setdefault(code, {})
            for target in block_map.get(block_name) or {block_name}:
                per_code[target] = per_code.get(target, 0) + count

        # {block: [(code, limit-up count)]}
        block_codes_dict = {}
        for code, per_code in code_block_dict.items():
            for block, count in per_code.items():
                block_codes_dict.setdefault(block, []).append((code, count))

        limit_up_info_map = self.__get_limit_up_info(min_day)
        fdatas = []
        for block, code_info_list in block_codes_dict.items():
            if block in constant.KPL_INVALID_BLOCKS:
                continue
            # NOTE(review): the list is not sorted here, so "the first third" follows
            # the aggregation order of the query rows - confirm this is intended.
            max_count = len(code_info_list) // 3
            candidates = [info[0] for info in code_info_list[:max_count]]
            if not candidates:
                continue
            # Fetch the latest quote information for the candidates.
            juejin_results = HistoryKDatasUtils.get_gp_latest_info(candidates, fields="sec_id,sec_level,upper_limit")
            if juejin_results is None:
                continue
            latest_info = {x['sec_id']: (x['sec_level'], x['upper_limit']) for x in juejin_results}
            picked = 0
            for code in candidates:
                if code_block_dict[code][block] <= 3:
                    # 3 or fewer limit-ups in this block: not distinctive enough.
                    continue
                info = latest_info.get(code)
                if info is None:
                    # No quote information available.
                    continue
                sec_level, upper_limit = info
                if sec_level != 1:
                    # Presumably level 1 marks a normally traded (non-ST) stock - TODO confirm.
                    continue
                if upper_limit < 3:
                    # Upper limit price below 3.
                    continue
                picked += 1
                fdatas.append(
                    (block, limit_up_info_map[code][1], code, code_block_dict[code][block],
                     int(float(limit_up_info_map[code][2]))))
                if picked >= 10:
                    # Keep at most 10 codes per block.
                    break
        # The result is only returned; nothing is stored by this method.
        return fdatas
if __name__ == "__main__":
    # Manual check: show the stored codes of one sample block first.
    print(BlockSpecialCodesManager().get_block_codes("证券"))
    # Then run the analysis and dump every result row followed by the whole list.
    special_codes = AnalysisBlockSpecialCodesManager().get_block_special_codes()
    for row in special_codes:
        print(row)
    # Storing the result through BlockSpecialCodesManager is intentionally not done here.
    print(special_codes)