Administrator
2023-07-11 18cd3598a6abf19dac1a02eb19c9db8edae8cc0c
third_data/hot_block_data_process.py
@@ -1,26 +1,334 @@
"""
热门板块数据处理
"""
import datetime
import json
import tool
from utils import tool
from db import redis_manager
from db import mysql_data
from code_attribute import limit_up_time_manager, gpcode_manager
from l2 import code_price_manager
from third_data.history_k_data_util import HistoryKDatasUtils
# Module-wide Redis manager, constructed with argument 0
# (presumably the Redis DB index — TODO confirm against redis_manager).
__redisManager = redis_manager.RedisManager(0)
# Block names that are skipped when picking a code's target block and that are
# excluded from the "latest blocks" SQL lookup.
INVALID_BLOCKS = ["其他", "ST股", "ST摘帽", "业绩增长", "业绩预增", "公告", "次新股"]
def __get_redis():
    """Return the Redis client handed out by the module-level manager."""
    client = __redisManager.getRedis()
    return client
class XGBHotBlockDataManager:
    """Persist and query hot-block snapshots in the ``xgb_hot_block`` table.

    A snapshot (``datas``) is read as a sequence of block entries where
    ``entry[0]`` is the block name and ``entry[2]`` is a list of code entries.
    From each code entry this class reads ``code_info[0][0]`` (code name),
    ``code_info[0][1]`` (symbol such as ``"000001.SZ"``), ``code_info[2]``
    (written to ``_price``), ``code_info[3]`` (written to ``_rate``) and
    ``code_info[4]`` (limit-up time string).

    NOTE(review): every statement below is built by string interpolation. The
    values come from an external data feed, so a quote character in a block or
    code name would break (or alter) the SQL. Switch to parameterized queries
    once the ``mysql_data.Mysqldb`` API for bound parameters is confirmed.
    """

    # Rows of today's table content, refreshed at the end of every save().
    total_datas = []
    # _id -> code entry stored by the previous save(); used to skip no-op updates.
    __last_datas = {}
    # The snapshot passed to the most recent save() call.
    latest_datas = None

    @staticmethod
    def _extract_code(code_info):
        """Return the bare code (text before the first ``.``) of a code entry."""
        return code_info[0][1].split(".")[0]

    @classmethod
    def save(cls, day, datas):
        """Store one snapshot for ``day`` and refresh the in-memory caches.

        Steps:
          1. Codes present in the previous snapshot but missing now, which had
             a first limit-up time recorded, get ``_open_limit_up_time`` set
             (the limit-up board was broken).
          2. Every (block, code) pair is inserted or updated.
          3. Rows tying a code to a block it no longer belongs to are deleted.
          4. ``total_datas`` is reloaded from the database for today's date.

        Args:
            day: Day string used for the ``_day`` column and as ``_id`` prefix.
            datas: Snapshot in the layout described on the class.
        """
        mysqldb = mysql_data.Mysqldb()

        # Map each code in this snapshot to the set of blocks it appears in.
        code_block_dict = {}
        for data in datas:
            for code_info in data[2]:
                code_block_dict.setdefault(cls._extract_code(code_info), set()).add(data[0])

        # A code that dropped out since the last snapshot and had limit-up
        # before is treated as a broken board.
        if cls.latest_datas:
            latest_codes = {
                cls._extract_code(code_info)
                for data in cls.latest_datas
                for code_info in data[2]
            }
            for code in latest_codes.difference(code_block_dict):
                # Look the rows up under the same day key they were stored with.
                results = cls.list_by_code(code, day)
                if not results:
                    continue
                first_limit_up_time = results[0][8]
                # The column may be NULL/empty; only a full time string counts.
                if first_limit_up_time and len(first_limit_up_time) > 6:
                    for result in results:
                        mysqldb.execute(
                            f"update xgb_hot_block set _open_limit_up_time='{tool.get_now_time_str()}',_update_time=now() where _id='{result[0]}'")
        cls.latest_datas = datas

        for data in datas:
            for code_info in data[2]:
                code = cls._extract_code(code_info)
                _id = f"{day}_{data[0]}_{code}"
                result = mysqldb.select_one("select * from xgb_hot_block where _id='{}'".format(_id))
                # Values of 6 characters or fewer are treated as "no limit-up time".
                limit_up_time = code_info[4]
                if len(limit_up_time) <= 6:
                    limit_up_time = ''
                # Only codes starting with 00/60 have their limit-up time
                # forwarded to the buy-1 price manager.
                if limit_up_time and code.startswith(("00", "60")):
                    code_price_manager.Buy1PriceManager.set_limit_up_time(code, limit_up_time)
                if not result:
                    mysqldb.execute(
                        f"insert into xgb_hot_block(_id,_day,_block_name,_code,_limit_up_time,_price,_rate,_update_time,_first_limit_up_time,_code_name) values('{_id}','{day}','{data[0]}','{code}','{code_info[4]}','{code_info[2]}','{code_info[3]}',now(),'{limit_up_time}','{code_info[0][0]}')")
                elif cls.__last_datas.get(_id) != code_info:
                    # Only touch the row when the entry changed since last time.
                    mysqldb.execute(
                        f"update xgb_hot_block set _limit_up_time='{code_info[4]}',_price='{code_info[2]}',_rate='{code_info[3]}',_update_time=now(),_code_name='{code_info[0][0]}' where _id='{_id}'")
                    if (not result[8] or len(result[8]) <= 6) and limit_up_time:
                        # No first limit-up time stored yet: record it now.
                        mysqldb.execute(
                            f"update xgb_hot_block set _first_limit_up_time='{limit_up_time}',_update_time=now() where _id='{_id}'")
                cls.__last_datas[_id] = code_info
                # Drop rows that tie this code to a block it is no longer in.
                old_datas = XGBHotBlockDataManager.list_by_code(code, day)
                if old_datas:
                    for d in old_datas:
                        if d[2] not in code_block_dict[code]:
                            mysqldb.execute(f"delete from xgb_hot_block where _id='{d[0]}'")

        # Load today's rows into memory.
        XGBHotBlockDataManager.total_datas = XGBHotBlockDataManager.list_all(tool.get_now_date_str())

    @staticmethod
    def list_all(day):
        """Return every row stored for ``day``."""
        mysqldb = mysql_data.Mysqldb()
        return mysqldb.select_all(f"select * from xgb_hot_block where _day='{day}'")

    @staticmethod
    def list_by_code(code, day):
        """Return the rows stored for ``code`` on ``day``."""
        mysqldb = mysql_data.Mysqldb()
        return mysqldb.select_all(f"select * from xgb_hot_block where _code='{code}' and _day='{day}'")

    @staticmethod
    def list_by_block(block_name, day):
        """Return the rows stored for ``block_name`` on ``day``."""
        mysqldb = mysql_data.Mysqldb()
        return mysqldb.select_all(f"select * from xgb_hot_block where _block_name='{block_name}' and _day='{day}'")

    @staticmethod
    def list_blocks_with_day(days):
        """Return distinct ``(_block_name, _day)`` pairs for the given days.

        An empty ``days`` yields an empty list without querying; otherwise the
        WHERE clause would be empty and the statement invalid.
        """
        if not days:
            return []
        mysqldb = mysql_data.Mysqldb()
        day_filter = " or ".join(f"_day = '{day}'" for day in days)
        sql = "select _block_name,_day from xgb_hot_block where " + day_filter + " group by _block_name,_day"
        return mysqldb.select_all(sql)

    @staticmethod
    def get_latest_blocks(code):
        """Return the comma-joined valid block names of ``code`` for its latest day.

        Blocks listed in ``INVALID_BLOCKS`` are excluded. The result is whatever
        ``select_one`` returns for a single ``GROUP_CONCAT`` column.
        """
        wheres = " and ".join(f"hb.`_block_name` != '{b}'" for b in INVALID_BLOCKS)
        sql = f"SELECT GROUP_CONCAT(_block_name) FROM (SELECT hb.`_block_name`,hb.`_day` FROM `xgb_hot_block` hb WHERE hb.`_code`='{code}' AND {wheres} ORDER BY hb.`_day` DESC LIMIT 10) a  GROUP BY a._day ORDER BY a._day DESC LIMIT 1"
        mysqldb = mysql_data.Mysqldb()
        return mysqldb.select_one(sql)
class XGBDataUtil:
    """Helpers for inspecting a hot-block snapshot."""

    # Whether a block contains a high-board code
    @classmethod
    def is_has_high_code(cls, block_name, latest_datas):
        """Find a code in ``block_name`` whose board count is at least 2.

        ``code_data[1]`` is read as a label containing ``<number>板``; the
        number directly before ``板`` is taken as the board count, so
        multi-digit counts work and labels without a number are skipped.

        Args:
            block_name: Name compared against ``block[0]`` of each entry.
            latest_datas: Snapshot of block entries; ``block[2]`` holds the
                code entries.

        Returns:
            ``(True, code_data)`` for the first matching code entry, otherwise
            ``(False, None)`` — including when ``latest_datas`` is empty, so
            callers can always unpack two values.
        """
        import re  # local import keeps this block self-contained

        if not latest_datas:
            return False, None
        for block in latest_datas:
            if block[0] != block_name:
                continue
            for code_data in block[2]:
                label = code_data[1]
                if not label:
                    continue
                matched = re.search(r"(\d+)板", label)
                if matched and int(matched.group(1)) >= 2:
                    return True, code_data
        return False, None
# Cache of get_latest_block() results, keyed by (today's date string, day_count)
# so that calls with different windows on the same day do not share an entry.
__blocks_dict = {}
def get_latest_block(day_count=15):
    """Return the (block name, day) pairs seen in the last ``day_count`` trading days.

    The window ends at the trading day before today. Trading dates are fetched
    over ``day_count * 2 + 10`` calendar days and the last ``day_count`` of
    them are kept. Results are cached per calendar day and per ``day_count``.

    Args:
        day_count: Number of trading days to look back.

    Returns:
        Whatever ``XGBHotBlockDataManager.list_blocks_with_day`` returns for
        the selected days.
    """
    now_day = tool.get_now_date_str()
    cache_key = (now_day, day_count)
    if cache_key in __blocks_dict:
        return __blocks_dict[cache_key]
    end_date = HistoryKDatasUtils.get_previous_trading_date(now_day)
    # Over-fetch calendar days so weekends/holidays still leave enough trading days.
    window_start = datetime.datetime.now() - datetime.timedelta(days=(day_count * 2 + 10))
    start_date = window_start.strftime("%Y-%m-%d")
    days = HistoryKDatasUtils.get_trading_dates(start_date, end_date)
    days = days[-day_count:]
    results = XGBHotBlockDataManager.list_blocks_with_day(days)
    __blocks_dict[cache_key] = results
    return results
# Load the blocks a code belonged to previously
__before_block_dict = {}
def __load_before_block(code):
    """Return the cached earlier block names of ``code``, querying once if needed.

    On a cache miss the latest stored blocks are fetched; a non-empty result is
    split on commas and cached. Returns None when nothing is known for the code.
    """
    if code in __before_block_dict:
        return __before_block_dict.get(code)
    row = XGBHotBlockDataManager.get_latest_blocks(code)
    if not row:
        return __before_block_dict.get(code)
    names = row[0].split(',')
    __before_block_dict[code] = names
    return __before_block_dict.get(code)
def __get_code_from_code_info(code_info):
    """Return the bare code: the symbol in ``code_info[0][1]`` up to its first dot."""
    symbol = code_info[0][1]
    bare_code, _, _ = symbol.partition(".")
    return bare_code
# Get information about the block a code belongs to
def get_info(code):
    """Summarise the state of the first valid block that ``code`` belongs to.

    The target block is the first of the code's blocks that is not listed in
    ``INVALID_BLOCKS``. Blocks come from ``get_code_blocks`` and fall back to
    the previously stored blocks when that returns nothing.

    Args:
        code: Bare stock code, e.g. ``"002230"``.

    Returns:
        None when no target block can be determined, otherwise a dict with:
          - ``target_block_info``: (block name, block rate, number of history
            rows for the block in the last 15 trading days)
          - ``limit_up_index``: position of ``code`` among the block's limit-up
            codes ordered by limit-up time; the list length if it is absent
          - ``limit_up_codes_count``: limit-up codes in the block, excluding ``code``
          - ``block_codes_rates_info``: (sum of rates, count) over the block's
            other codes
          - ``break_size``: codes that had limit-up but lost it, excluding
            ``code`` and codes currently at limit-up
          - ``re_limit_up_size``: codes at limit-up that have an open time recorded
          - ``high_block_infos``: (code_info[0], code_info[1]) of a high-board
            code for each of the code's blocks that has one
    """
    blocks = get_code_blocks(code)
    if not blocks:
        blocks = __load_before_block(code)
    latest_blocks_info = get_latest_block(15)

    # Pick the target block: the first one that is not considered invalid.
    target_block = None
    if blocks:
        target_block = next((b for b in blocks if b not in INVALID_BLOCKS), None)
    if not target_block:
        return None

    # Work on one reference to the latest snapshot for the whole computation.
    latest_datas = XGBHotBlockDataManager.latest_datas or []
    target_entries = [entry for entry in latest_datas if entry[0] == target_block]

    # Block rate plus the summed rates of the other codes in the block.
    total_rates = 0
    total_count = 0
    target_block_rate = 0
    for entry in target_entries:
        if entry[1]:
            target_block_rate = float(entry[1].strip().replace("%", ''))
        for code_info in entry[2]:
            rate = float(code_info[3].strip().replace("%", ''))
            if __get_code_from_code_info(code_info) != code:
                total_rates += rate
                total_count += 1

    # How many history rows mention the target block.
    target_block_histry_count = sum(
        1 for block_info in (latest_blocks_info or ()) if block_info[0] == target_block)

    # High-board codes found in any of the code's blocks.
    high_block_infos = []
    if latest_datas:
        for block in blocks:
            if block in ('公告', '其他'):
                continue
            has_high, high_code_info = XGBDataUtil.is_has_high_code(block, latest_datas)
            if has_high:
                high_block_infos.append((high_code_info[0], high_code_info[1]))

    # Limit-up codes of the target block (time strings longer than 6 chars).
    limit_up_codes_info_set = set()
    block_codes_set = set()
    for entry in target_entries:
        for code_data in entry[2]:
            if len(code_data[4]) > 6:
                code_ = __get_code_from_code_info(code_data)
                limit_up_codes_info_set.add((code_, code_data[4]))
                block_codes_set.add(code_)

    # Position of this code in limit-up order; defaults to "after everyone".
    limit_up_codes_info_list = sorted(limit_up_codes_info_set, key=lambda x: x[1])
    limit_up_index = len(limit_up_codes_info_list)
    for i, info in enumerate(limit_up_codes_info_list):
        if info[0] == code:
            limit_up_index = i

    limit_up_codes_set = {info[0] for info in limit_up_codes_info_list}
    limit_up_codes_set.discard(code)
    block_codes_set.discard(code)
    limit_up_codes_count = len(block_codes_set)

    # Broken-board and re-sealed codes, from the rows loaded by the last save().
    break_codes = set()
    re_limit_codes = set()
    for data in (XGBHotBlockDataManager.total_datas or ()):
        if data[2] != target_block:
            continue
        # Use a separate name: reusing ``code`` here would clobber the argument
        # and make the "exclude self" step below discard the wrong code.
        row_code = data[3]
        limit_up_time = data[4] or ''
        first_limit_up_time = data[8]
        open_limit_up_time = data[10]
        if len(limit_up_time) <= 6 and first_limit_up_time and len(first_limit_up_time) > 6:
            break_codes.add(row_code)
        if len(limit_up_time) > 6 and open_limit_up_time and len(open_limit_up_time) > 6:
            re_limit_codes.add(row_code)
    # Exclude the queried code itself.
    break_codes.discard(code)
    # Exclude codes that are currently at limit-up.
    break_codes = break_codes.difference(limit_up_codes_set)

    fresult = {
        # Target block info (name, rate, history occurrence count)
        "target_block_info": (target_block, target_block_rate, target_block_histry_count),
        # Limit-up order
        "limit_up_index": limit_up_index,
        # Number of limit-up codes
        "limit_up_codes_count": limit_up_codes_count,
        # Rate info of the block's codes
        "block_codes_rates_info": (total_rates, total_count),
        # Number of broken-board codes
        "break_size": len(break_codes),
        # Number of re-sealed codes
        "re_limit_up_size": len(re_limit_codes),
        # High-board info
        "high_block_infos": high_block_infos,
    }
    return fresult
# 保存数据
def save_datas(datas):
def save_datas(day, datas):
    XGBHotBlockDataManager.save(day, datas)
    code_block_dict = {}
    block_codes_dict = {}
    for block in datas:
        codes = []
        for code_data in block[2]:
            code = code_data[0].split(".")[0]
            code = code_data[0][1].split(".")[0]
            if gpcode_manager.FirstCodeManager.is_in_first_record(code):
                limit_up_time = code_data[4]
                if limit_up_time and len(limit_up_time) > 6:
                    if not limit_up_time_manager.get_limit_up_time(code):
                        limit_up_time_manager.save_limit_up_time(code, limit_up_time)
            if code not in code_block_dict:
                code_block_dict[code] = set()
            code_block_dict[code].add(block[0])
@@ -63,5 +371,8 @@
        block_codes.get(block)
    return None
if __name__ == "__main__":
    # Manual smoke checks; they run only when this module is executed directly.
    # Print the blocks currently known for one sample code.
    print(get_code_blocks("600468"))
    # XGBHotBlockDataManager.total_datas=XGBHotBlockDataManager.list_all("2023-03-23")
    # get_info('002230')
    # Print the previously stored blocks looked up for another sample code.
    print(__load_before_block("603163"))