"""
|
热门板块数据处理
|
"""
|
import json
|
|
import tool
|
from db import redis_manager
|
from db import mysql_data
|
|
# Module-wide Redis manager shared by the helpers below.
# NOTE(review): the argument 0 is presumably the Redis DB index - confirm against RedisManager.
__redisManager = redis_manager.RedisManager(0)
|
|
|
def __get_redis():
    """Return the object produced by the module-level manager's ``getRedis()``.

    Callers in this module use the result as a Redis client (``get`` / ``setex``).
    """
    client = __redisManager.getRedis()
    return client
|
|
|
class XGBHotBlockDataManager:
    """Store hot-block snapshots in the ``xgb_hot_block`` table and cache them in memory.

    All state lives on the class itself; the methods are class/static methods.
    """

    # Rows returned by ``list_all`` at the end of the most recent ``save`` call.
    total_datas = []
    # Last ``code_info`` written per row id; lets ``save`` skip redundant UPDATEs.
    __last_datas = {}
    # The ``datas`` argument of the most recent ``save`` call.
    latest_datas = None

    @classmethod
    def save(cls, day, datas):
        """Upsert every (block, code) pair of *datas* for *day* and refresh the caches.

        As used here, each item of *datas* exposes the block name at index 0 and an
        iterable of per-code records at index 2; each record exposes the full code
        (text before the first "." is the bare code) at index 0, price at index 2,
        rate at index 3 and limit-up time at index 4.

        NOTE(review): all SQL below is built by string interpolation. If
        ``Mysqldb`` supports parameterized queries they should be preferred,
        since block names come from external data.
        """
        cls.latest_datas = datas
        db = mysql_data.Mysqldb()

        # Map every code to the set of block names it belongs to in this snapshot.
        blocks_by_code = {}
        for block in datas:
            for code_info in block[2]:
                blocks_by_code.setdefault(code_info[0].split(".")[0], set()).add(block[0])

        for block in datas:
            for code_info in block[2]:
                code = code_info[0].split(".")[0]
                _id = f"{day}_{block[0]}_{code}"

                existing = db.select_one("select * from xgb_hot_block where _id='{}'".format(_id))
                # A limit-up time of 6 characters or fewer is treated as "no limit-up time".
                limit_up_time = code_info[4]
                if len(limit_up_time) <= 6:
                    limit_up_time = ''

                if not existing:
                    db.execute(
                        f"insert into xgb_hot_block(_id,_day,_block_name,_code,_limit_up_time,_price,_rate,_update_time,_first_limit_up_time) values('{_id}','{day}','{block[0]}','{code}','{code_info[4]}','{code_info[2]}','{code_info[3]}',now(),'{limit_up_time}')")
                elif cls.__last_datas.get(_id) != code_info:
                    # Only touch the row when the record differs from the last one written.
                    db.execute(
                        f"update xgb_hot_block set _limit_up_time='{code_info[4]}',_price='{code_info[2]}',_rate='{code_info[3]}',_update_time=now() where _id='{_id}'")
                    # Fill in the first limit-up time once, when the stored value is still empty.
                    stored_first_time = existing[8]
                    if (not stored_first_time or len(stored_first_time) <= 6) and len(limit_up_time) >= 6:
                        db.execute(
                            f"update xgb_hot_block set _first_limit_up_time='{limit_up_time}',_update_time=now() where _id='{_id}'")

                cls.__last_datas[_id] = code_info

                # Drop rows that place this code in a block it no longer belongs to.
                for old_row in XGBHotBlockDataManager.list_by_code(code, day) or ():
                    if old_row[2] not in blocks_by_code[code]:
                        db.execute(f"delete from xgb_hot_block where _id='{old_row[0]}'")

        # Reload the in-memory copy; note this uses today's date from ``tool``, not *day*.
        XGBHotBlockDataManager.total_datas = XGBHotBlockDataManager.list_all(tool.get_now_date_str())

    @staticmethod
    def list_all(day):
        """Return every ``xgb_hot_block`` row whose ``_day`` equals *day*."""
        sql = f"select * from xgb_hot_block where _day='{day}'"
        return mysql_data.Mysqldb().select_all(sql)

    @staticmethod
    def list_by_code(code, day):
        """Return the ``xgb_hot_block`` rows for *code* on *day*."""
        sql = f"select * from xgb_hot_block where _code='{code}' and _day='{day}'"
        return mysql_data.Mysqldb().select_all(sql)

    @staticmethod
    def list_by_block(block_name, day):
        """Return the ``xgb_hot_block`` rows for *block_name* on *day*."""
        sql = f"select * from xgb_hot_block where _block_name='{block_name}' and _day='{day}'"
        return mysql_data.Mysqldb().select_all(sql)
|
|
|
# Get the hot-block information for a code
def get_info(code):
    """Return ``(block, limit_up_count, break_size)`` for *code*, or ``None``.

    The target block is the first block recorded for *code* that is neither
    '公告' nor '其他'. ``None`` is returned when no such block exists.

    * ``limit_up_count`` - number of other codes in the target block whose
      limit-up time in the latest snapshot is longer than 6 characters.
    * ``break_size`` - number of other codes in the target block whose cached
      row has a first limit-up time but no current one, and which are not in
      the limit-up set above.

    NOTE(review): row indexes 2/3/4/8 are assumed to be block name, code,
    limit-up time and first limit-up time, matching the column order of the
    insert statement in ``XGBHotBlockDataManager.save`` - confirm against the
    table definition.
    """
    blocks = get_code_blocks(code)

    target_block = None
    for block in blocks or ():
        if block not in ('公告', '其他'):
            target_block = block
            break
    if not target_block:
        return None

    # Codes currently at limit-up inside the target block (latest snapshot).
    limit_up_codes_set = set()
    for block in XGBHotBlockDataManager.latest_datas or ():
        if block[0] != target_block:
            continue
        for code_data in block[2]:
            if len(code_data[4]) > 6:
                limit_up_codes_set.add(code_data[0].split('.')[0])
    limit_up_codes_set.discard(code)
    limit_up_count = len(limit_up_codes_set)

    # Codes that reached limit-up earlier but no longer carry a limit-up time.
    break_codes = set()
    for row in XGBHotBlockDataManager.total_datas:
        if row[2] != target_block:
            continue
        # A distinct name is used on purpose: reusing ``code`` here would
        # overwrite the argument and break the self-exclusion below.
        row_code = row[3]
        limit_up_time = row[4]
        first_limit_up_time = row[8]
        # A missing (None/empty) limit-up time counts as "not at limit-up".
        not_at_limit = not limit_up_time or len(limit_up_time) <= 6
        if not_at_limit and first_limit_up_time and len(first_limit_up_time) > 6:
            break_codes.add(row_code)

    # Exclude the queried code itself
    break_codes.discard(code)
    # Exclude codes that are currently at limit-up
    break_codes -= limit_up_codes_set
    # Number of broken limit-ups
    break_size = len(break_codes)
    return target_block, limit_up_count, break_size
|
|
|
# Save data
def save_datas(day, datas):
    """Persist *datas* for *day* and refresh the Redis code/block lookups.

    First delegates to ``XGBHotBlockDataManager.save``, then stores the
    block -> codes mapping and, per code, the set of blocks it belongs to.
    """
    XGBHotBlockDataManager.save(day, datas)

    blocks_by_code = {}
    codes_by_block = {}
    for block in datas:
        # Bare codes are the part of the full code before the first ".".
        bare_codes = [code_data[0].split(".")[0] for code_data in block[2]]
        for bare_code in bare_codes:
            blocks_by_code.setdefault(bare_code, set()).add(block[0])
        codes_by_block[block[0]] = bare_codes

    __save_block_codes(codes_by_block)
    for bare_code, block_names in blocks_by_code.items():
        __save_code_block(bare_code, block_names)
|
|
|
# Save the blocks a code belongs to
def __save_code_block(code, blocks):
    """Store *blocks* as a JSON list under ``code_blocks-{code}``.

    The expiry passed to ``setex`` comes from ``tool.get_expire()``.
    """
    client = __get_redis()
    key = f"code_blocks-{code}"
    expire = tool.get_expire()
    payload = json.dumps(list(blocks))
    client.setex(key, expire, payload)
|
|
|
# Save the codes under each block
def __save_block_codes(block_codes):
    """Store the whole *block_codes* mapping as JSON under ``blocks_codes``.

    The expiry passed to ``setex`` comes from ``tool.get_expire()``.
    """
    client = __get_redis()
    expire = tool.get_expire()
    payload = json.dumps(block_codes)
    client.setex("blocks_codes", expire, payload)
|
|
|
def __get_block_codes():
    """Return the decoded ``blocks_codes`` value, or ``None`` when the key is absent."""
    raw = __get_redis().get("blocks_codes")
    return None if raw is None else json.loads(raw)
|
|
|
# Get the blocks of a code
def get_code_blocks(code):
    """Return the decoded value stored under ``code_blocks-{code}``.

    Returns ``None`` when the key is absent.
    """
    raw = __get_redis().get(f"code_blocks-{code}")
    return None if raw is None else json.loads(raw)
|
|
|
# Get all codes under a block
def get_block_codes(block):
    """Return the codes stored for *block*, or ``None`` when unavailable.

    ``None`` is returned both when the ``blocks_codes`` mapping is missing or
    empty and when *block* is not a key in it.
    """
    block_codes = __get_block_codes()
    if not block_codes:
        return None
    # The lookup result must be returned; it was previously computed and discarded.
    return block_codes.get(block)
|
|
|
if __name__ == "__main__":
|
# XGBHotBlockDataManager.total_datas=XGBHotBlockDataManager.list_all("2023-03-23")
|
# get_info('002230')
|
codes = set([1, 2, 3, 4])
|
print(codes.difference(set([1, 2])))
|