"""
|
热门板块数据处理
|
"""
|
import datetime
|
import json
|
|
from utils import tool
|
from db import redis_manager
|
from db import mysql_data
|
from code_attribute import limit_up_time_manager, gpcode_manager
|
from l2 import code_price_manager
|
from third_data.history_k_data_util import HistoryKDatasUtils
|
|
# Module-wide Redis manager. The argument 0 is passed straight to RedisManager
# (presumably the Redis database index -- TODO confirm).
__redisManager = redis_manager.RedisManager(0)

# Block names that are never chosen as a code's target block and that are
# excluded when looking up a code's most recent blocks.
INVALID_BLOCKS = ["其他", "ST股", "ST摘帽", "业绩增长", "业绩预增", "公告", "次新股"]


def __get_redis():
    """Return the Redis client handed out by the module-wide manager."""
    redis_client = __redisManager.getRedis()
    return redis_client
|
|
|
class XGBHotBlockDataManager:
    """Persist hot-block snapshots in the ``xgb_hot_block`` table and cache them.

    Rows returned by ``select *`` are accessed positionally in this module:
    index 0 is ``_id``, 2 the block name, 3 the code, 4 the limit-up time,
    8 the first limit-up time and 10 the open-limit-up time.

    NOTE(review): every SQL statement here is built by string interpolation
    from upstream data (block names, code names). A value containing a quote
    would break the statement; switch to parameterized queries if
    ``Mysqldb.execute`` supports them.
    """

    # Today's rows of ``xgb_hot_block``; refreshed at the end of every ``save``.
    total_datas = []
    # Last ``code_info`` seen per row id, used to skip redundant UPDATEs.
    __last_datas = {}
    # The ``datas`` payload of the most recent ``save`` call (None before that).
    latest_datas = None

    @staticmethod
    def _extract_code(code_info):
        """Return the part of ``code_info[0][1]`` before the first '.'."""
        return code_info[0][1].split(".")[0]

    @classmethod
    def save(cls, day, datas):
        """Write one hot-block snapshot to MySQL and refresh the caches.

        Args:
            day: Day value stored in ``_day`` and used as the row-id prefix.
            datas: Re-iterable sequence of block entries. ``entry[0]`` is the
                block name and ``entry[2]`` the list of ``code_info`` items.
                For each item ``[0][0]`` is the code name, ``[0][1]`` the code
                with a dotted suffix, ``[2]`` the price, ``[3]`` the rate and
                ``[4]`` the limit-up time string.

        A limit-up time only counts as set when the string is longer than
        6 characters; shorter values are treated as "no limit-up".
        """
        mysqldb = mysql_data.Mysqldb()

        # Map every code of this snapshot to the set of blocks listing it.
        code_block_dict = {}
        for data in datas:
            for code_info in data[2]:
                code_block_dict.setdefault(cls._extract_code(code_info), set()).add(data[0])

        # A code that was in the previous snapshot, is missing now, and has a
        # first limit-up time recorded is marked as having opened its limit-up.
        if cls.latest_datas:
            latest_codes = {
                cls._extract_code(code_info)
                for data in cls.latest_datas
                for code_info in data[2]
            }
            for code in latest_codes.difference(code_block_dict):
                results = cls.list_by_code(code, tool.get_now_date())
                if not results:
                    continue
                first_limit_up_time = results[0][8]
                # The column may be NULL/empty (see the update branch below),
                # so check truthiness before measuring its length.
                if not first_limit_up_time or len(first_limit_up_time) <= 6:
                    continue
                for result in results:
                    mysqldb.execute(
                        f"update xgb_hot_block set _open_limit_up_time='{tool.get_now_time_str()}',_update_time=now() where _id='{result[0]}'")
        cls.latest_datas = datas

        for data in datas:
            for code_info in data[2]:
                code = cls._extract_code(code_info)
                _id = f"{day}_{data[0]}_{code}"

                result = mysqldb.select_one("select * from xgb_hot_block where _id='{}'".format(_id))
                limit_up_time = code_info[4]
                if len(limit_up_time) <= 6:
                    limit_up_time = ''
                # Only codes starting with 00 or 60 forward their limit-up time.
                if limit_up_time and code.startswith(("00", "60")):
                    code_price_manager.Buy1PriceManager.set_limit_up_time(code, limit_up_time)

                if not result:
                    mysqldb.execute(
                        f"insert into xgb_hot_block(_id,_day,_block_name,_code,_limit_up_time,_price,_rate,_update_time,_first_limit_up_time,_code_name) values('{_id}','{day}','{data[0]}','{code}','{code_info[4]}','{code_info[2]}','{code_info[3]}',now(),'{limit_up_time}','{code_info[0][0]}')")
                elif cls.__last_datas.get(_id) != code_info:
                    # Only touch the row when the payload differs from last time.
                    mysqldb.execute(
                        f"update xgb_hot_block set _limit_up_time='{code_info[4]}',_price='{code_info[2]}',_rate='{code_info[3]}',_update_time=now(),_code_name='{code_info[0][0]}' where _id='{_id}'")
                    # ``limit_up_time`` is '' unless it is longer than 6 chars,
                    # so truthiness is the "has a limit-up time" test here.
                    if (not result[8] or len(result[8]) <= 6) and limit_up_time:
                        # No first limit-up time stored yet: record it now.
                        mysqldb.execute(
                            f"update xgb_hot_block set _first_limit_up_time='{limit_up_time}',_update_time=now() where _id='{_id}'")
                cls.__last_datas[_id] = code_info

                # Drop rows that still list this code under a block it is no
                # longer part of in the current snapshot.
                for d in cls.list_by_code(code, day) or ():
                    if d[2] not in code_block_dict[code]:
                        mysqldb.execute(f"delete from xgb_hot_block where _id='{d[0]}'")

        # Reload today's rows into memory.
        XGBHotBlockDataManager.total_datas = XGBHotBlockDataManager.list_all(tool.get_now_date_str())

    @staticmethod
    def list_all(day):
        """Return every ``xgb_hot_block`` row whose ``_day`` equals ``day``."""
        mysqldb = mysql_data.Mysqldb()
        return mysqldb.select_all(f"select * from xgb_hot_block where _day='{day}'")

    @staticmethod
    def list_by_code(code, day):
        """Return the rows of ``code`` on ``day`` (one row per block)."""
        mysqldb = mysql_data.Mysqldb()
        return mysqldb.select_all(f"select * from xgb_hot_block where _code='{code}' and _day='{day}'")

    @staticmethod
    def list_by_block(block_name, day):
        """Return the rows of ``block_name`` on ``day``."""
        mysqldb = mysql_data.Mysqldb()
        return mysqldb.select_all(f"select * from xgb_hot_block where _block_name='{block_name}' and _day='{day}'")

    @staticmethod
    def list_blocks_with_day(days):
        """Return distinct ``(_block_name, _day)`` pairs for the given days.

        Args:
            days: Iterable of day values; ``None`` or an empty iterable yields
                an empty list without touching the database.
        """
        wheres = [f"_day = '{day}'" for day in days or ()]
        if not wheres:
            # An empty WHERE clause would be invalid SQL, and nothing could
            # match anyway.
            return []
        sql = "select _block_name,_day from xgb_hot_block where "
        sql += " or ".join(wheres)
        sql += " group by _block_name,_day"
        mysqldb = mysql_data.Mysqldb()
        return mysqldb.select_all(sql)

    @staticmethod
    def get_latest_blocks(code):
        """Return the comma-joined block names of ``code`` on its latest day.

        Blocks listed in ``INVALID_BLOCKS`` are excluded. The result is
        whatever ``select_one`` returns for a single-column query; callers in
        this module read the joined names from index 0.
        """
        wheres = " and ".join(f"hb.`_block_name` != '{b}'" for b in INVALID_BLOCKS)
        sql = f"SELECT GROUP_CONCAT(_block_name) FROM (SELECT hb.`_block_name`,hb.`_day` FROM `xgb_hot_block` hb WHERE hb.`_code`='{code}' AND {wheres} ORDER BY hb.`_day` DESC LIMIT 10) a GROUP BY a._day ORDER BY a._day DESC LIMIT 1"
        mysqldb = mysql_data.Mysqldb()
        return mysqldb.select_one(sql)
|
|
|
class XGBDataUtil:
    """Helpers for inspecting a hot-block snapshot."""

    @classmethod
    def is_has_high_code(cls, block_name, latest_datas):
        """Look for a code with a board count of 2 or more inside a block.

        Args:
            block_name: Name of the block to inspect.
            latest_datas: Snapshot of block entries; ``entry[0]`` is the block
                name and ``entry[2]`` the list of code entries, whose index 1
                holds a label such as "3天2板".

        Returns:
            ``(True, code_data)`` for the first code whose label has a number
            of at least 2 directly before "板", otherwise ``(False, None)``.
            The result is always a 2-tuple so callers can unpack it safely.
        """
        import re

        if not latest_datas:
            return False, None
        for block in latest_datas:
            if block[0] != block_name:
                continue
            for code_data in block[2]:
                label = code_data[1]
                if not label:
                    continue
                # Read the full number before "板" so multi-digit counts work;
                # labels without such a number (e.g. "首板") are skipped
                # instead of raising ValueError.
                match = re.search(r"(\d+)板", label)
                if match and int(match.group(1)) >= 2:
                    return True, code_data
        return False, None
|
|
|
# Cache of get_latest_block results, keyed by the current date string.
__blocks_dict = {}


def get_latest_block(day_count=15):
    """Return the (block name, day) pairs of the most recent trading days.

    The trading days are taken from a window that ends at the trading day
    before today and starts ``day_count * 2 + 10`` calendar days back; the
    last ``day_count`` of them are queried. The result is cached under
    today's date string, so later calls on the same day reuse it.
    """
    today = tool.get_now_date_str()
    try:
        return __blocks_dict[today]
    except KeyError:
        pass

    lookback = datetime.timedelta(days=(day_count * 2 + 10))
    window_start = (datetime.datetime.now() - lookback).strftime("%Y-%m-%d")
    window_end = HistoryKDatasUtils.get_previous_trading_date(tool.get_now_date_str())
    trading_days = HistoryKDatasUtils.get_trading_dates(window_start, window_end)
    recent_days = trading_days[-day_count:]

    block_day_pairs = XGBHotBlockDataManager.list_blocks_with_day(recent_days)
    __blocks_dict[today] = block_day_pairs
    return block_day_pairs
|
|
|
# Cache of previously recorded blocks, keyed by code.
__before_block_dict = {}


def __load_before_block(code):
    """Return the block names recorded for ``code`` on its latest day.

    The lookup result is cached per code. When the query yields a row, its
    first column is split on ',' into a list of names; otherwise the falsy
    query result itself is cached and returned.
    """
    if code in __before_block_dict:
        return __before_block_dict[code]

    row = XGBHotBlockDataManager.get_latest_blocks(code)
    names = row[0].split(',') if row else row
    __before_block_dict[code] = names
    return names
|
|
|
def __get_code_from_code_info(code_info):
    """Return the part of ``code_info[0][1]`` that precedes the first '.'."""
    dotted_code = code_info[0][1]
    return dotted_code.partition(".")[0]
|
|
|
# Collect statistics about the block a code belongs to
def get_info(code):
    """Build summary statistics for the target block of ``code``.

    The target block is the first of the code's blocks that is not listed in
    ``INVALID_BLOCKS``. Blocks come from Redis, falling back to the most
    recent blocks stored in MySQL.

    Returns:
        ``None`` when no target block can be determined, otherwise a dict with:

        * ``target_block_info``: ``(name, block rate, number of recent
          (block, day) pairs carrying that name)``.
        * ``limit_up_index``: 0-based position of ``code`` among the block's
          limit-up codes ordered by limit-up time; equals the number of
          limit-up codes when ``code`` is not among them.
        * ``limit_up_codes_count``: number of other codes in the block.
        * ``block_codes_rates_info``: ``(sum of rates, count)`` over the other
          codes in the block.
        * ``break_size``: number of other codes that had a first limit-up
          time but currently have no limit-up time and are not limit-up in
          the latest snapshot.
        * ``re_limit_up_size``: number of codes that currently have a
          limit-up time and also an open-limit-up time.
        * ``high_block_infos``: ``(code_data[0], code_data[1])`` of the high
          board code found for each of the code's blocks.
    """
    blocks = get_code_blocks(code)
    if not blocks:
        blocks = __load_before_block(code)
    latest_blocks_info = get_latest_block(15)

    # Pick the target block: the first one that is not blacklisted.
    target_block = next((b for b in blocks or () if b not in INVALID_BLOCKS), None)
    if not target_block:
        return None

    # Read the snapshot once so every statistic below sees the same data.
    latest_datas = XGBHotBlockDataManager.latest_datas

    # Sum the rates of the other codes in the target block.
    total_rates = 0
    total_count = 0
    target_block_rate = 0
    for b in latest_datas or ():
        if b[0] != target_block:
            continue
        if b[1]:
            target_block_rate = float(b[1].strip().replace("%", ''))
        for code_info in b[2]:
            code__ = __get_code_from_code_info(code_info)
            rate = float(code_info[3].strip().replace("%", ''))
            if code__ != code:
                total_rates += rate
                total_count += 1

    # Count how often the target block shows up in the recent history.
    target_block_histry_count = sum(
        1 for block_info in latest_blocks_info or () if block_info[0] == target_block
    )

    # Collect the high board code of each of the code's blocks.
    high_block_infos = []
    if latest_datas:
        for block in blocks:
            if block == '公告' or block == '其他':
                continue
            has_high, high_code_info = XGBDataUtil.is_has_high_code(block, latest_datas)
            if has_high:
                high_block_infos.append((high_code_info[0], high_code_info[1]))

    # Gather the codes of the target block and, separately, those with a
    # limit-up time (a string longer than 6 characters).
    limit_up_codes_info_set = set()
    block_codes_set = set()
    for block in latest_datas or ():
        if block[0] != target_block:
            continue
        for code_data in block[2]:
            code_ = code_data[0][1].split('.')[0]
            if len(code_data[4]) > 6:
                limit_up_codes_info_set.add((code_, code_data[4]))
            block_codes_set.add(code_)

    # Order by limit-up time; the code is a tie-breaker so that equal times
    # no longer depend on set iteration order.
    limit_up_codes_info_list = sorted(limit_up_codes_info_set, key=lambda x: (x[1], x[0]))
    limit_up_index = len(limit_up_codes_info_list)
    for i, info in enumerate(limit_up_codes_info_list):
        if info[0] == code:
            limit_up_index = i

    limit_up_codes_set = {info[0] for info in limit_up_codes_info_list}
    limit_up_codes_set.discard(code)
    block_codes_set.discard(code)
    # NOTE(review): this counts every other code in the block, not only those
    # with a limit-up time, although the key says "limit_up" -- confirm intent.
    limit_up_codes_count = len(block_codes_set)

    break_codes = set()
    re_limit_codes = set()
    for data in XGBHotBlockDataManager.total_datas or ():
        if data[2] != target_block:
            continue
        # Use a separate name: rebinding ``code`` here made the "exclude self"
        # step below discard the last row's code instead of the queried one.
        row_code = data[3]
        limit_up_time = data[4] or ''
        first_limit_up_time = data[8]
        open_limit_up_time = data[10]
        if len(limit_up_time) <= 6 and first_limit_up_time and len(first_limit_up_time) > 6:
            break_codes.add(row_code)
        if len(limit_up_time) > 6 and open_limit_up_time and len(open_limit_up_time) > 6:
            re_limit_codes.add(row_code)

    # Exclude the queried code itself.
    break_codes.discard(code)
    # Exclude codes that are limit-up in the latest snapshot.
    break_codes = break_codes.difference(limit_up_codes_set)
    break_size = len(break_codes)
    re_limit_up_size = len(re_limit_codes)

    fresult = {
        # Target block info (block name, block rate, historical occurrences)
        "target_block_info": (target_block, target_block_rate, target_block_histry_count),
        # Limit-up order
        "limit_up_index": limit_up_index,
        # Number of other codes in the block
        "limit_up_codes_count": limit_up_codes_count,
        # Rate information of the block's codes
        "block_codes_rates_info": (total_rates, total_count),
        # Number of codes whose limit-up opened
        "break_size": break_size,
        # Number of codes that re-sealed after opening
        "re_limit_up_size": re_limit_up_size,
        # High board information
        "high_block_infos": high_block_infos,
    }

    return fresult
|
|
|
# Persist a snapshot and refresh the Redis indexes
def save_datas(day, datas):
    """Save a hot-block snapshot and rebuild the code/block indexes in Redis.

    The snapshot is first written through ``XGBHotBlockDataManager.save``.
    For codes in the first record that carry a limit-up time (longer than
    6 characters) and have none stored yet, the time is saved. Finally the
    block -> codes map and each code -> blocks set are written to Redis.
    """
    XGBHotBlockDataManager.save(day, datas)

    blocks_by_code = {}
    codes_by_block = {}
    for entry in datas:
        block_name = entry[0]
        member_codes = []
        for item in entry[2]:
            code = item[0][1].split(".")[0]
            if gpcode_manager.FirstCodeManager.is_in_first_record(code):
                limit_up_time = item[4]
                has_time = limit_up_time and len(limit_up_time) > 6
                if has_time and not limit_up_time_manager.get_limit_up_time(code):
                    limit_up_time_manager.save_limit_up_time(code, limit_up_time)
            blocks_by_code.setdefault(code, set()).add(block_name)
            member_codes.append(code)
        codes_by_block[block_name] = member_codes

    __save_block_codes(codes_by_block)
    for code, names in blocks_by_code.items():
        __save_code_block(code, names)
|
|
|
# Store the blocks a code belongs to
def __save_code_block(code, blocks):
    """Write ``blocks`` as a JSON list under ``code_blocks-<code>`` with an expiry."""
    redis_key = f"code_blocks-{code}"
    payload = json.dumps(list(blocks))
    __get_redis().setex(redis_key, tool.get_expire(), payload)
|
|
|
# Store the codes listed under each block
def __save_block_codes(block_codes):
    """Write the block -> codes mapping as JSON under ``blocks_codes`` with an expiry."""
    payload = json.dumps(block_codes)
    __get_redis().setex("blocks_codes", tool.get_expire(), payload)
|
|
|
def __get_block_codes():
    """Return the decoded ``blocks_codes`` mapping from Redis, or None if unset."""
    raw = __get_redis().get("blocks_codes")
    return None if raw is None else json.loads(raw)
|
|
|
# Look up the blocks of a code
def get_code_blocks(code):
    """Return the block list stored under ``code_blocks-<code>``, or None if unset."""
    raw = __get_redis().get(f"code_blocks-{code}")
    return None if raw is None else json.loads(raw)
|
|
|
# Look up every code listed under a block
def get_block_codes(block):
    """Return the codes stored for ``block``.

    Returns:
        The list saved for ``block`` in the ``blocks_codes`` mapping, or
        ``None`` when the mapping is missing/empty or has no such block.
    """
    block_codes = __get_block_codes()
    if not block_codes:
        return None
    # The lookup result used to be discarded, so the function always
    # returned None; hand it back to the caller.
    return block_codes.get(block)
|
|
|
if __name__ == "__main__":
    # Manual check: print the previously recorded blocks of a sample code.
    # (Earlier experiments loaded total_datas for "2023-03-23" and called
    # get_info('002230').)
    sample_blocks = __load_before_block("603163")
    print(sample_blocks)
|