""" 热门板块数据处理 """ import datetime import json import init import tool from db import redis_manager from db import mysql_data import limit_up_time_manager import gpcode_manager from l2 import code_price_manager from third_data.history_k_data_util import HistoryKDatasUtils __redisManager = redis_manager.RedisManager(0) INVALID_BLOCKS = ["其他", "ST股", "ST摘帽", "业绩增长", "业绩预增", "公告", "次新股"] def __get_redis(): return __redisManager.getRedis() class XGBHotBlockDataManager: total_datas = [] __last_datas = {} latest_datas = None @classmethod def save(cls, day, datas): mysqldb = mysql_data.Mysqldb() # 统计代码所属板块 code_block_dict = {} for data in datas: for code_info in data[2]: code = code_info[0][1].split(".")[0] if not code_block_dict.get(code): code_block_dict[code] = set() code_block_dict[code].add(data[0]) # 如果本次和上次相差的代码且之前有过涨停就判断为炸板 if cls.latest_datas: latest_codes = set() for data in cls.latest_datas: for code_info in data[2]: code = code_info[0][1].split(".")[0] latest_codes.add(code) now_codes = set(code_block_dict.keys()) del_set = latest_codes - now_codes for code in del_set: # 获取之前是否涨停过 results = cls.list_by_code(code, tool.get_now_date()) if results: first_limit_up_time = results[0][8] if len(first_limit_up_time) > 6: # 之前涨停过,更新炸板时间 for result in results: mysqldb.execute( f"update xgb_hot_block set _open_limit_up_time='{tool.get_now_time_str()}',_update_time=now() where _id='{result[0]}'") cls.latest_datas = datas for data in datas: for code_info in data[2]: code = code_info[0][1].split(".")[0] _id = f"{day}_{data[0]}_{code}" result = mysqldb.select_one("select * from xgb_hot_block where _id='{}'".format(_id)) limit_up_time = code_info[4] if len(limit_up_time) <= 6: limit_up_time = '' # 00/60开头代码 有涨停时间 if code.find("00") == 0 or code.find("60") == 0: if len(limit_up_time) > 6: code_price_manager.Buy1PriceManager.set_limit_up_time(code, limit_up_time) if not result: mysqldb.execute( f"insert into xgb_hot_block(_id,_day,_block_name,_code,_limit_up_time,_price,_rate,_update_time,_first_limit_up_time,_code_name) values('{_id}','{day}','{data[0]}','{code}','{code_info[4]}','{code_info[2]}','{code_info[3]}',now(),'{limit_up_time}','{code_info[0][0]}')") else: # 如果上次的数据和这次一样就不更新,否则需要更新数据 if cls.__last_datas.get(_id) != code_info: mysqldb.execute( f"update xgb_hot_block set _limit_up_time='{code_info[4]}',_price='{code_info[2]}',_rate='{code_info[3]}',_update_time=now(),_code_name='{code_info[0][0]}' where _id='{_id}'") if (not result[8] or len(result[8]) <= 6) and len(limit_up_time) >= 6: # 没有首次涨停时间 mysqldb.execute( f"update xgb_hot_block set _first_limit_up_time='{limit_up_time}',_update_time=now() where _id='{_id}'") cls.__last_datas[_id] = code_info # 获取原来的代码所属板块,删除之前错误的板块 old_datas = XGBHotBlockDataManager.list_by_code(code, day) if old_datas: for d in old_datas: if d[2] not in code_block_dict[code]: mysqldb.execute(f"delete from xgb_hot_block where _id='{d[0]}'") # 将今日数据导入到内存中 XGBHotBlockDataManager.total_datas = XGBHotBlockDataManager.list_all(tool.get_now_date_str()) @staticmethod def list_all(day): mysqldb = mysql_data.Mysqldb() return mysqldb.select_all(f"select * from xgb_hot_block where _day='{day}'") @staticmethod def list_by_code(code, day): mysqldb = mysql_data.Mysqldb() return mysqldb.select_all(f"select * from xgb_hot_block where _code='{code}' and _day='{day}'") @staticmethod def list_by_block(block_name, day): mysqldb = mysql_data.Mysqldb() return mysqldb.select_all(f"select * from xgb_hot_block where _block_name='{block_name}' and _day='{day}'") @staticmethod def list_blocks_with_day(days): mysqldb = mysql_data.Mysqldb() sql = "select _block_name,_day from xgb_hot_block where " wheres = [] for day in days: wheres.append(f"_day = '{day}'") sql += " or ".join(wheres) sql += " group by _block_name,_day" results = mysqldb.select_all(sql) return results @staticmethod def get_latest_blocks(code): wheres = [] for b in INVALID_BLOCKS: wheres.append(f"hb.`_block_name` != '{b}'") wheres = " and ".join(wheres) sql = f"SELECT GROUP_CONCAT(_block_name) FROM (SELECT hb.`_block_name`,hb.`_day` FROM `xgb_hot_block` hb WHERE hb.`_code`='{code}' AND {wheres} ORDER BY hb.`_day` DESC LIMIT 10) a GROUP BY a._day ORDER BY a._day DESC LIMIT 1" mysqldb = mysql_data.Mysqldb() return mysqldb.select_one(sql) class XGBDataUtil: # 板块中是否有高位票 @classmethod def is_has_high_code(cls, block_name, latest_datas): if not latest_datas: return False for block in latest_datas: if block[0] != block_name: continue for code_data in block[2]: if code_data[1] and code_data[1].find("板") >= 0: day = code_data[1][-2:-1] if int(day) >= 2: return True, code_data return False, None __blocks_dict = {} def get_latest_block(day_count=15): now_day = tool.get_now_date_str() if now_day in __blocks_dict: return __blocks_dict[now_day] now_date = datetime.datetime.now() end_date = HistoryKDatasUtils.get_previous_trading_date(tool.get_now_date_str()) start_date = now_date - datetime.timedelta(days=(day_count * 2 + 10)) start_date = start_date.strftime("%Y-%m-%d") days = HistoryKDatasUtils.get_trading_dates(start_date, end_date) days = days[0 - day_count:] results = XGBHotBlockDataManager.list_blocks_with_day(days) __blocks_dict[now_day] = results return results # 加载之前的板块 __before_block_dict = {} def __load_before_block(code): if code not in __before_block_dict: blocks = XGBHotBlockDataManager.get_latest_blocks(code) if blocks: blocks = blocks[0].split(',') __before_block_dict[code] = blocks return __before_block_dict.get(code) def __get_code_from_code_info(code_info): code = code_info[0][1].split(".")[0] return code # 获取代码所在板块信息 def get_info(code): blocks = get_code_blocks(code) if not blocks: blocks = __load_before_block(code) latest_blocks_info = get_latest_block(15) # 统计板块出现次数 # 获取目标板块 target_block = None if blocks: for block in blocks: if block in INVALID_BLOCKS: continue target_block = block break if not target_block: return None # 统计板块中的平均涨幅 total_rates = 0 total_count = 0 target_block_rate = 0 if XGBHotBlockDataManager.latest_datas: for b in XGBHotBlockDataManager.latest_datas: if b[0] != target_block: continue if b[1]: target_block_rate = float(b[1].strip().replace("%", '')) for code_info in b[2]: code__ = __get_code_from_code_info(code_info) rate = float(code_info[3].strip().replace("%", '')) if code__ != code: total_rates += rate total_count += 1 # 统计板块出现的次数 target_block_histry_count = 0 if blocks: for block_info in latest_blocks_info: if block_info[0] != target_block: continue target_block_histry_count += 1 # 是否出现过高位板 high_block_infos = [] for block in blocks: if block == '公告' or block == '其他': continue if XGBHotBlockDataManager.latest_datas: has_high, high_code_info = XGBDataUtil.is_has_high_code(block, XGBHotBlockDataManager.latest_datas) if has_high: high_block_info = (high_code_info[0], high_code_info[1]) high_block_infos.append(high_block_info) limit_up_codes_info_set = set() # 板块下的代码数量 block_codes_set = set() if XGBHotBlockDataManager.latest_datas: for block in XGBHotBlockDataManager.latest_datas: if block[0] == target_block: for code_data in block[2]: code_ = code_data[0][1].split('.')[0] if len(code_data[4]) > 6: limit_up_codes_info_set.add((code_, code_data[4])) block_codes_set.add(code_) # 获取涨停的顺序 limit_up_index = -1 limit_up_codes_info_list = list(limit_up_codes_info_set) limit_up_codes_info_list.sort(key=lambda x: x[1]) for i in range(0, len(limit_up_codes_info_list)): if limit_up_codes_info_list[i][0] == code: limit_up_index = i if limit_up_index < 0: limit_up_index = len(limit_up_codes_info_list) # 涨停代码集合 limit_up_codes_set = set([k[0] for k in limit_up_codes_info_list]) limit_up_codes_set.discard(code) block_codes_set.discard(code) limit_up_codes_count = len(block_codes_set) total_datas = XGBHotBlockDataManager.total_datas break_codes = set() re_limit_codes = set() for data in total_datas: block = data[2] if block != target_block: continue code = data[3] limit_up_time = data[4] first_limit_up_time = data[8] open_limit_up_time = data[10] if len(limit_up_time) <= 6 and first_limit_up_time and len(first_limit_up_time) > 6: break_codes.add(code) if len(limit_up_time) > 6 and open_limit_up_time and len(open_limit_up_time) > 6: re_limit_codes.add(code) # 排除自己 break_codes.discard(code) # 排除已经涨停的代码 break_codes = break_codes.difference(limit_up_codes_set) # 炸板个数 break_size = len(break_codes) # 炸板回封数量 re_limit_up_size = len(re_limit_codes) fresult = { # 目标板块信息(板块名称,板块涨幅,历史板块出现次数) "target_block_info": (target_block, target_block_rate, target_block_histry_count), # 涨停顺序 "limit_up_index": limit_up_index, # 涨停代码数量 "limit_up_codes_count": limit_up_codes_count, # 板块代码涨幅信息 "block_codes_rates_info": (total_rates, total_count), # 炸板代码数量 "break_size": break_size, # 炸板回封数量 "re_limit_up_size": re_limit_up_size, # 高位版信息 "high_block_infos": high_block_infos, } return fresult # 保存数据 def save_datas(day, datas): XGBHotBlockDataManager.save(day, datas) code_block_dict = {} block_codes_dict = {} for block in datas: codes = [] for code_data in block[2]: code = code_data[0][1].split(".")[0] if gpcode_manager.FirstCodeManager.is_in_first_record(code): limit_up_time = code_data[4] if limit_up_time and len(limit_up_time) > 6: if not limit_up_time_manager.get_limit_up_time(code): limit_up_time_manager.save_limit_up_time(code, limit_up_time) if code not in code_block_dict: code_block_dict[code] = set() code_block_dict[code].add(block[0]) codes.append(code) block_codes_dict[block[0]] = codes __save_block_codes(block_codes_dict) for key in code_block_dict: __save_code_block(key, code_block_dict[key]) # 保存代码所属板块 def __save_code_block(code, blocks): __get_redis().setex(f"code_blocks-{code}", tool.get_expire(), json.dumps(list(blocks))) # 保存板块下的代码 def __save_block_codes(block_codes): __get_redis().setex(f"blocks_codes", tool.get_expire(), json.dumps(block_codes)) def __get_block_codes(): val = __get_redis().get(f"blocks_codes") if val is None: return None return json.loads(val) # 获取代码板块 def get_code_blocks(code): val = __get_redis().get(f"code_blocks-{code}") if val is None: return None return json.loads(val) # 获取板块下的所有代码 def get_block_codes(block): block_codes = __get_block_codes() if block_codes: block_codes.get(block) return None if __name__ == "__main__": # XGBHotBlockDataManager.total_datas=XGBHotBlockDataManager.list_all("2023-03-23") # get_info('002230') print(__load_before_block("603163"))