Administrator
2023-03-27 8535f56dbf6e410b4a09f02f95d4d49bcc8753f2
third_data/hot_block_data_process.py
@@ -5,6 +5,7 @@
import tool
from db import redis_manager
from db import mysql_data
__redisManager = redis_manager.RedisManager(0)
@@ -13,8 +14,118 @@
    return __redisManager.getRedis()
class XGBHotBlockDataManager:
    total_datas = []
    __last_datas = {}
    latest_datas = None
    @classmethod
    def save(cls, day, datas):
        cls.latest_datas = datas
        mysqldb = mysql_data.Mysqldb()
        # 统计代码所属板块
        code_block_dict = {}
        for data in datas:
            for code_info in data[2]:
                code = code_info[0].split(".")[0]
                if not code_block_dict.get(code):
                    code_block_dict[code] = set()
                code_block_dict[code].add(data[0])
        for data in datas:
            for code_info in data[2]:
                code = code_info[0].split(".")[0]
                _id = f"{day}_{data[0]}_{code}"
                result = mysqldb.select_one("select * from xgb_hot_block where _id='{}'".format(_id))
                limit_up_time = code_info[4]
                if len(limit_up_time) <= 6:
                    limit_up_time = ''
                if not result:
                    mysqldb.execute(
                        f"insert into xgb_hot_block(_id,_day,_block_name,_code,_limit_up_time,_price,_rate,_update_time,_first_limit_up_time) values('{_id}','{day}','{data[0]}','{code}','{code_info[4]}','{code_info[2]}','{code_info[3]}',now(),'{limit_up_time}')")
                else:
                    # 如果上次的数据和这次一样就不更新,否则需要更新数据
                    if cls.__last_datas.get(_id) != code_info:
                        mysqldb.execute(
                            f"update xgb_hot_block set _limit_up_time='{code_info[4]}',_price='{code_info[2]}',_rate='{code_info[3]}',_update_time=now() where _id='{_id}'")
                        if (not result[8] or len(result[8]) <= 6) and len(limit_up_time) >= 6:
                            mysqldb.execute(
                                f"update xgb_hot_block set _first_limit_up_time='{limit_up_time}',_update_time=now() where _id='{_id}'")
                cls.__last_datas[_id] = code_info
                # 获取原来的代码所属板块,删除之前错误的板块
                old_datas = XGBHotBlockDataManager.list_by_code(code, day)
                if old_datas:
                    for d in old_datas:
                        if d[2] not in code_block_dict[code]:
                            mysqldb.execute(f"delete from xgb_hot_block where _id='{d[0]}'")
        # 将今日数据导入到内存中
        XGBHotBlockDataManager.total_datas = XGBHotBlockDataManager.list_all(tool.get_now_date_str())
    @staticmethod
    def list_all(day):
        mysqldb = mysql_data.Mysqldb()
        return mysqldb.select_all(f"select * from xgb_hot_block where _day='{day}'")
    @staticmethod
    def list_by_code(code, day):
        mysqldb = mysql_data.Mysqldb()
        return mysqldb.select_all(f"select * from xgb_hot_block where _code='{code}' and _day='{day}'")
    @staticmethod
    def list_by_block(block_name, day):
        mysqldb = mysql_data.Mysqldb()
        return mysqldb.select_all(f"select * from xgb_hot_block where _block_name='{block_name}' and _day='{day}'")
# 获取代码所在板块信息
def get_info(code):
    blocks = get_code_blocks(code)
    target_block = None
    if blocks:
        for block in blocks:
            if block == '公告' or block == '其他':
                continue
            target_block = block
            break
    if not target_block:
        return None
    limit_up_codes_set = set()
    if XGBHotBlockDataManager.latest_datas:
        for block in XGBHotBlockDataManager.latest_datas:
            if block[0] == target_block:
                for code_data in block[2]:
                    if len(code_data[4]) > 6:
                        limit_up_codes_set.add(code_data[0].split('.')[0])
    limit_up_codes_set.discard(code)
    limit_up_count = len(limit_up_codes_set)
    total_datas = XGBHotBlockDataManager.total_datas
    break_codes = set()
    for data in total_datas:
        block = data[2]
        if block != target_block:
            continue
        code = data[3]
        limit_up_time = data[4]
        first_limit_up_time = data[8]
        if len(limit_up_time) <= 6 and first_limit_up_time and len(first_limit_up_time) > 6:
            break_codes.add(code)
    # 排除自己
    break_codes.discard(code)
    # 排除已经涨停的代码
    break_codes = break_codes.difference(limit_up_codes_set)
    # 炸板个数
    break_size = len(break_codes)
    return target_block, limit_up_count, break_size
# 保存数据
def save_datas(datas):
def save_datas(day, datas):
    XGBHotBlockDataManager.save(day, datas)
    code_block_dict = {}
    block_codes_dict = {}
    for block in datas:
@@ -63,5 +174,9 @@
        block_codes.get(block)
    return None
if __name__ == "__main__":
    print(get_code_blocks("600468"))
    # XGBHotBlockDataManager.total_datas=XGBHotBlockDataManager.list_all("2023-03-23")
    # get_info('002230')
    codes = set([1, 2, 3, 4])
    print(codes.difference(set([1, 2])))