""" 热门板块数据处理 """ import datetime import json import juejin import tool from db import redis_manager from db import mysql_data import limit_up_time_manager import gpcode_manager __redisManager = redis_manager.RedisManager(0) def __get_redis(): return __redisManager.getRedis() class XGBHotBlockDataManager: total_datas = [] __last_datas = {} latest_datas = None @classmethod def save(cls, day, datas): cls.latest_datas = datas mysqldb = mysql_data.Mysqldb() # 统计代码所属板块 code_block_dict = {} for data in datas: for code_info in data[2]: code = code_info[0][1].split(".")[0] if not code_block_dict.get(code): code_block_dict[code] = set() code_block_dict[code].add(data[0]) for data in datas: for code_info in data[2]: code = code_info[0][1].split(".")[0] _id = f"{day}_{data[0]}_{code}" result = mysqldb.select_one("select * from xgb_hot_block where _id='{}'".format(_id)) limit_up_time = code_info[4] if len(limit_up_time) <= 6: limit_up_time = '' if not result: mysqldb.execute( f"insert into xgb_hot_block(_id,_day,_block_name,_code,_limit_up_time,_price,_rate,_update_time,_first_limit_up_time,_code_name) values('{_id}','{day}','{data[0]}','{code}','{code_info[4]}','{code_info[2]}','{code_info[3]}',now(),'{limit_up_time}','{code_info[0][0]}')") else: # 如果上次的数据和这次一样就不更新,否则需要更新数据 if cls.__last_datas.get(_id) != code_info: mysqldb.execute( f"update xgb_hot_block set _limit_up_time='{code_info[4]}',_price='{code_info[2]}',_rate='{code_info[3]}',_update_time=now(),_code_name='{code_info[0][0]}' where _id='{_id}'") if (not result[8] or len(result[8]) <= 6) and len(limit_up_time) >= 6: mysqldb.execute( f"update xgb_hot_block set _first_limit_up_time='{limit_up_time}',_update_time=now() where _id='{_id}'") cls.__last_datas[_id] = code_info # 获取原来的代码所属板块,删除之前错误的板块 old_datas = XGBHotBlockDataManager.list_by_code(code, day) if old_datas: for d in old_datas: if d[2] not in code_block_dict[code]: mysqldb.execute(f"delete from xgb_hot_block where _id='{d[0]}'") # 将今日数据导入到内存中 XGBHotBlockDataManager.total_datas = XGBHotBlockDataManager.list_all(tool.get_now_date_str()) @staticmethod def list_all(day): mysqldb = mysql_data.Mysqldb() return mysqldb.select_all(f"select * from xgb_hot_block where _day='{day}'") @staticmethod def list_by_code(code, day): mysqldb = mysql_data.Mysqldb() return mysqldb.select_all(f"select * from xgb_hot_block where _code='{code}' and _day='{day}'") @staticmethod def list_by_block(block_name, day): mysqldb = mysql_data.Mysqldb() return mysqldb.select_all(f"select * from xgb_hot_block where _block_name='{block_name}' and _day='{day}'") @staticmethod def list_blocks(days): mysqldb = mysql_data.Mysqldb() sql = "select distinct(_block_name) from xgb_hot_block where " wheres = [] for day in days: wheres.append(f"_day = '{day}'") sql += " or ".join(wheres) results = mysqldb.select_all(sql) fresult = set() for result in results: fresult.add(result[0]) return fresult __blocks_dict = {} def get_latest_block(day_count=15): now_day = tool.get_now_date_str() if now_day in __blocks_dict: return __blocks_dict[now_day] now_date = datetime.datetime.now() end_date = juejin.JueJinManager.get_previous_trading_date(tool.get_now_date_str()) start_date = now_date - datetime.timedelta(days=(day_count * 2 + 10)) start_date = start_date.strftime("%Y-%m-%d") days = juejin.JueJinManager.get_trading_dates(start_date, end_date) days = days[0 - day_count:] results = XGBHotBlockDataManager.list_blocks(days) __blocks_dict[now_day] = results return results # 获取代码所在板块信息 def get_info(code): blocks = get_code_blocks(code) # 判断是否有新概念 # 新概念 new_block = None latest_blocks = get_latest_block(15) for block in blocks: if block not in latest_blocks: new_block = block # 如果板块中的涨停票数》=2才算新题材 count = 0 if XGBHotBlockDataManager.latest_datas: for b in XGBHotBlockDataManager.latest_datas: if b[0] == new_block: for code_data in b[2]: if len(code_data[4]) > 6: count += 0 if count < 2: new_block = None if not new_block: break target_block = None if blocks: for block in blocks: if block == '公告' or block == '其他': continue target_block = block break if not target_block: return None limit_up_codes_set = set() if XGBHotBlockDataManager.latest_datas: for block in XGBHotBlockDataManager.latest_datas: if block[0] == target_block: for code_data in block[2]: if len(code_data[4]) > 6: limit_up_codes_set.add(code_data[0][1].split('.')[0]) limit_up_codes_set.discard(code) limit_up_count = len(limit_up_codes_set) total_datas = XGBHotBlockDataManager.total_datas break_codes = set() for data in total_datas: block = data[2] if block != target_block: continue code = data[3] limit_up_time = data[4] first_limit_up_time = data[8] if len(limit_up_time) <= 6 and first_limit_up_time and len(first_limit_up_time) > 6: break_codes.add(code) # 排除自己 break_codes.discard(code) # 排除已经涨停的代码 break_codes = break_codes.difference(limit_up_codes_set) # 炸板个数 break_size = len(break_codes) return target_block, limit_up_count, break_size, new_block # 保存数据 def save_datas(day, datas): XGBHotBlockDataManager.save(day, datas) code_block_dict = {} block_codes_dict = {} for block in datas: codes = [] for code_data in block[2]: code = code_data[0][1].split(".")[0] if gpcode_manager.FirstCodeManager.is_in_first_record(code): limit_up_time = code_data[4] if limit_up_time and len(limit_up_time) > 6: if not limit_up_time_manager.get_limit_up_time(code): limit_up_time_manager.save_limit_up_time(code, limit_up_time) if code not in code_block_dict: code_block_dict[code] = set() code_block_dict[code].add(block[0]) codes.append(code) block_codes_dict[block[0]] = codes __save_block_codes(block_codes_dict) for key in code_block_dict: __save_code_block(key, code_block_dict[key]) # 保存代码所属板块 def __save_code_block(code, blocks): __get_redis().setex(f"code_blocks-{code}", tool.get_expire(), json.dumps(list(blocks))) # 保存板块下的代码 def __save_block_codes(block_codes): __get_redis().setex(f"blocks_codes", tool.get_expire(), json.dumps(block_codes)) def __get_block_codes(): val = __get_redis().get(f"blocks_codes") if val is None: return None return json.loads(val) # 获取代码板块 def get_code_blocks(code): val = __get_redis().get(f"code_blocks-{code}") if val is None: return None return json.loads(val) # 获取板块下的所有代码 def get_block_codes(block): block_codes = __get_block_codes() if block_codes: block_codes.get(block) return None if __name__ == "__main__": # XGBHotBlockDataManager.total_datas=XGBHotBlockDataManager.list_all("2023-03-23") # get_info('002230') print(get_latest_block()) print(get_latest_block())