""" 热门板块数据处理 """ import json import tool from db import redis_manager from db import mysql_data __redisManager = redis_manager.RedisManager(0) def __get_redis(): return __redisManager.getRedis() class XGBHotBlockDataManager: total_datas = [] __last_datas = {} latest_datas = None @classmethod def save(cls, day, datas): cls.latest_datas = datas mysqldb = mysql_data.Mysqldb() # 统计代码所属板块 code_block_dict = {} for data in datas: for code_info in data[2]: code = code_info[0].split(".")[0] if not code_block_dict.get(code): code_block_dict[code] = set() code_block_dict[code].add(data[0]) for data in datas: for code_info in data[2]: code = code_info[0].split(".")[0] _id = f"{day}_{data[0]}_{code}" result = mysqldb.select_one("select * from xgb_hot_block where _id='{}'".format(_id)) limit_up_time = code_info[4] if len(limit_up_time) <= 6: limit_up_time = '' if not result: mysqldb.execute( f"insert into xgb_hot_block(_id,_day,_block_name,_code,_limit_up_time,_price,_rate,_update_time,_first_limit_up_time) values('{_id}','{day}','{data[0]}','{code}','{code_info[4]}','{code_info[2]}','{code_info[3]}',now(),'{limit_up_time}')") else: # 如果上次的数据和这次一样就不更新,否则需要更新数据 if cls.__last_datas.get(_id) != code_info: mysqldb.execute( f"update xgb_hot_block set _limit_up_time='{code_info[4]}',_price='{code_info[2]}',_rate='{code_info[3]}',_update_time=now() where _id='{_id}'") if (not result[8] or len(result[8]) <= 6) and len(limit_up_time) >= 6: mysqldb.execute( f"update xgb_hot_block set _first_limit_up_time='{limit_up_time}',_update_time=now() where _id='{_id}'") cls.__last_datas[_id] = code_info # 获取原来的代码所属板块,删除之前错误的板块 old_datas = XGBHotBlockDataManager.list_by_code(code, day) if old_datas: for d in old_datas: if d[2] not in code_block_dict[code]: mysqldb.execute(f"delete from xgb_hot_block where _id='{d[0]}'") # 将今日数据导入到内存中 XGBHotBlockDataManager.total_datas = XGBHotBlockDataManager.list_all(tool.get_now_date_str()) @staticmethod def list_all(day): mysqldb = mysql_data.Mysqldb() return mysqldb.select_all(f"select * from xgb_hot_block where _day='{day}'") @staticmethod def list_by_code(code, day): mysqldb = mysql_data.Mysqldb() return mysqldb.select_all(f"select * from xgb_hot_block where _code='{code}' and _day='{day}'") @staticmethod def list_by_block(block_name, day): mysqldb = mysql_data.Mysqldb() return mysqldb.select_all(f"select * from xgb_hot_block where _block_name='{block_name}' and _day='{day}'") # 获取代码所在板块信息 def get_info(code): blocks = get_code_blocks(code) target_block = None if blocks: for block in blocks: if block == '公告' or block == '其他': continue target_block = block break if not target_block: return None limit_up_codes_set = set() if XGBHotBlockDataManager.latest_datas: for block in XGBHotBlockDataManager.latest_datas: if block[0] == target_block: for code_data in block[2]: if len(code_data[4]) > 6: limit_up_codes_set.add(code_data[0].split('.')[0]) limit_up_codes_set.discard(code) limit_up_count = len(limit_up_codes_set) total_datas = XGBHotBlockDataManager.total_datas break_codes = set() for data in total_datas: block = data[2] if block != target_block: continue code = data[3] limit_up_time = data[4] first_limit_up_time = data[8] if len(limit_up_time) <= 6 and first_limit_up_time and len(first_limit_up_time) > 6: break_codes.add(code) # 排除自己 break_codes.discard(code) # 排除已经涨停的代码 break_codes = break_codes.difference(limit_up_codes_set) # 炸板个数 break_size = len(break_codes) return target_block, limit_up_count, break_size # 保存数据 def save_datas(day, datas): XGBHotBlockDataManager.save(day, datas) code_block_dict = {} block_codes_dict = {} for block in datas: codes = [] for code_data in block[2]: code = code_data[0].split(".")[0] if code not in code_block_dict: code_block_dict[code] = set() code_block_dict[code].add(block[0]) codes.append(code) block_codes_dict[block[0]] = codes __save_block_codes(block_codes_dict) for key in code_block_dict: __save_code_block(key, code_block_dict[key]) # 保存代码所属板块 def __save_code_block(code, blocks): __get_redis().setex(f"code_blocks-{code}", tool.get_expire(), json.dumps(list(blocks))) # 保存板块下的代码 def __save_block_codes(block_codes): __get_redis().setex(f"blocks_codes", tool.get_expire(), json.dumps(block_codes)) def __get_block_codes(): val = __get_redis().get(f"blocks_codes") if val is None: return None return json.loads(val) # 获取代码板块 def get_code_blocks(code): val = __get_redis().get(f"code_blocks-{code}") if val is None: return None return json.loads(val) # 获取板块下的所有代码 def get_block_codes(block): block_codes = __get_block_codes() if block_codes: block_codes.get(block) return None if __name__ == "__main__": # XGBHotBlockDataManager.total_datas=XGBHotBlockDataManager.list_all("2023-03-23") # get_info('002230') codes = set([1, 2, 3, 4]) print(codes.difference(set([1, 2])))