"""
|
板块辨识度票管理
|
"""
|
import constant
|
from db import mysql_data_delegate as mysql_data
|
from third_data.history_k_data_util import HistoryKDatasUtils
|
from utils import tool
|
|
|
@tool.singleton
class BlockSpecialCodesManager:
    """In-memory view of the ``block_special_codes`` table.

    The table is read when the instance is created and kept as a
    ``{block: {code, ...}}`` mapping; it is read again every time the table
    is rewritten through :meth:`set_block_codes_list`.
    """

    # Class-level fallback; every instance replaces it with its own dict in
    # __load_data, so this shared object is never mutated.
    __block_codes_dict = {}

    def __init__(self):
        self.mysql = mysql_data.Mysqldb()
        self.__load_data()

    def __load_data(self):
        """Rebuild the ``{block: {code, ...}}`` cache from the database."""
        rows = self.mysql.select_all("select _block,_code from block_special_codes")
        block_codes = {}
        for row in rows:
            block_codes.setdefault(row[0], set()).add(row[1])
        # Swap the finished mapping in with one assignment so readers never
        # observe a half-built cache.
        self.__block_codes_dict = block_codes

    def get_block_codes(self, block):
        """
        Look up the cached codes of one block.

        @param block: block name
        @return: the cached set of codes, or None when the block is unknown
        """
        return self.__block_codes_dict.get(block)

    def set_block_codes_list(self, datas):
        """
        Replace the whole table content with ``datas`` and refresh the cache.

        @param datas: [(block, code name, code, limit-up count, free-float market value)]
        @return: None
        """
        self.mysql.execute("delete from block_special_codes")
        for d in datas:
            # NOTE(review): values are interpolated straight into the SQL text, so a
            # quote inside a block or code name breaks the statement. Switch to a
            # parameterized call if Mysqldb.execute supports one - TODO confirm.
            sql = f"insert into block_special_codes(_block,_code,_code_name,_limit_up_count,_zyltgb,_create_time) values('{d[0]}', '{d[2]}', '{d[1]}', {d[3]}, {d[4]}, now())"
            self.mysql.execute(sql)
        # The table has just been rewritten; without this the cache would keep
        # serving the previous content.
        self.__load_data()
|
|
|
class AnalysisBlockSpecialCodesManager:
    """Work out, per block, which codes stand out in the limit-up history.

    Data comes from the ``kpl_limit_up_record`` table, a block-mapping text
    file and ``HistoryKDatasUtils``; nothing is written back by this class.
    """

    # Created once, when the class body is executed, and shared by all instances.
    __mysql = mysql_data.Mysqldb()

    def __get_code_blocks(self, min_day, max_day):
        """
        Count limit-up records per (block, code) strictly between two days.
        Codes starting with ``68`` are excluded by the query.

        @param min_day: exclusive lower bound for ``_day``
        @param max_day: exclusive upper bound for ``_day``
        @return: rows of (block name, code, number of records)
        """
        return self.__mysql.select_all(
            f"SELECT r.`_hot_block_name`, r.`_code`, COUNT(*) FROM kpl_limit_up_record r WHERE r.`_day`>'{min_day}' and r.`_day`<'{max_day}' and r._code not like '68%' group by r.`_hot_block_name`, r.`_code`")

    def __get_limit_up_info(self, min_day):
        """
        Collect per-code limit-up statistics for records after ``min_day``.

        @param min_day: exclusive lower bound for ``_day``
        @return: {code: (record count, code name, ``_zylt_val`` / 1e8, or 1 when it is null)}
        """
        sql = f"SELECT r.`_code`, COUNT(r.`_code`),r.`_code_name`,IF( r.`_zylt_val` is null, 1, r.`_zylt_val`/100000000 ) FROM (SELECT * FROM kpl_limit_up_record r ORDER BY r.`_create_time` DESC) r WHERE r.`_day`>'{min_day}' GROUP BY r.`_code`"
        rows = self.__mysql.select_all(sql)
        return {row[0]: (row[1], row[2], row[3]) for row in rows}

    def __get_top(self, block, min_day, max_day):
        """
        Return the codes of one block that rank high on both record count and
        ``_zylt_val``.

        At most 10 codes (and never more than half of the block's codes) are
        taken from each ranking; the result is the intersection of the two.

        @param block: block name (``_hot_block_name``)
        @param min_day: exclusive lower bound for ``_day``
        @param max_day: exclusive upper bound for ``_day``
        @return: set of codes
        """
        # NOTE(review): ``block`` is interpolated into the SQL text unescaped.
        sql = f"SELECT r.`_code`,r.`_code_name`, COUNT(*), IF( r.`_zylt_val` is null, 1, r.`_zylt_val`/100000000 ) FROM (SELECT * FROM kpl_limit_up_record r ORDER BY r.`_create_time` DESC) r WHERE r.`_hot_block_name`='{block}' AND r.`_day`>'{min_day}' and r.`_day`<'{max_day}' and r._code not like '68%' GROUP BY r.`_code`"
        rows = list(self.__mysql.select_all(sql))
        max_count = min(10, len(rows) // 2)
        by_count = sorted(rows, key=lambda x: x[2], reverse=True)
        # Sort the count-ordered list (stable sort) so ties on the value keep
        # their count order.
        by_value = sorted(by_count, key=lambda x: x[3], reverse=True)
        top_by_count = {x[0] for x in by_count[:max_count]}
        top_by_value = {x[0] for x in by_value[:max_count]}
        return top_by_count & top_by_value

    def __get_block_map(self):
        """
        Load the block mapping file.

        Each non-empty line is either ``parent1,parent2|child1,child2`` or a
        plain ``name1,name2,...`` list, in which case the first name acts as
        the parent. Full-width commas and the ``丨`` character are accepted
        in place of ``,`` and ``|``.

        @return: {block name: set of parent block names}
        """
        block_map = {}
        with open(f"{constant.get_path_prefix()}/板块对应.txt", encoding="utf-8") as f:
            for raw_line in f:
                line = raw_line.strip()
                if not line:
                    continue
                # "\uff0c" is the full-width comma; normalise both separators
                # to their ASCII forms before splitting.
                line = line.replace("\uff0c", ",").replace("丨", "|")
                parent_blocks = set()
                if "|" in line:
                    parts = line.split("|")
                    parent_blocks = {name.strip() for name in parts[0].split(",")}
                    parent_blocks.discard("")
                    line = parts[1]
                # Strip every name and drop empty ones (e.g. from a trailing comma).
                blocks = [name.strip() for name in line.split(",")]
                blocks = [name for name in blocks if name]
                if not blocks:
                    continue
                if not parent_blocks:
                    parent_blocks.add(blocks[0])
                for name in blocks:
                    block_map.setdefault(name, set()).update(parent_blocks)
                if len(parent_blocks) > 1:
                    # With several parents, each parent also maps to itself.
                    for parent in parent_blocks:
                        block_map.setdefault(parent, set()).add(parent)
        return block_map

    def test_block_map(self):
        """Print the parsed block mapping (manual debugging aid)."""
        print(self.__get_block_map())

    def get_block_special_codes(self):
        """
        Compute the distinctive codes of every block.

        @return: [(block, code name, code, limit-up count inside the block,
                 int of the ``_zylt_val`` / 1e8 value)], at most 10 per block
        """
        trading_dates = HistoryKDatasUtils.get_latest_trading_date(15)
        max_day = trading_dates[-1]
        # presumably the date 365 days before max_day - TODO confirm tool.date_sub
        min_day = tool.date_sub(max_day, 365)

        block_map = self.__get_block_map()

        # {code: {block: limit-up count}}; a record's block is replaced by its
        # mapped parent blocks when the mapping file knows it.
        code_block_dict = {}
        for block_name, code, count in self.__get_code_blocks(min_day, max_day):
            per_block = code_block_dict.setdefault(code, {})
            for target in block_map.get(block_name) or {block_name}:
                per_block[target] = per_block.get(target, 0) + count

        # {block: [(code, limit-up count)]}
        block_codes_dict = {}
        for code, per_block in code_block_dict.items():
            for block_name, count in per_block.items():
                block_codes_dict.setdefault(block_name, []).append((code, count))

        limit_up_info_map = self.__get_limit_up_info(min_day)
        fdatas = []
        for block_name, code_info_list in block_codes_dict.items():
            if block_name in constant.KPL_INVALID_BLOCKS:
                continue

            # NOTE(review): no ranking is applied before slicing, so the "first
            # third" follows the order in which the query returned the codes -
            # confirm this is intended.
            max_count = len(code_info_list) // 3
            candidate_codes = [info[0] for info in code_info_list[:max_count]]
            if not candidate_codes:
                continue

            juejin_results = HistoryKDatasUtils.get_gp_latest_info(candidate_codes, fields="sec_id,sec_level,upper_limit")
            if juejin_results is None:
                continue
            # {sec_id: (sec_level, upper_limit)}
            latest_info = {x['sec_id']: (x['sec_level'], x['upper_limit']) for x in juejin_results}

            accepted = 0
            for code in candidate_codes:
                count_in_block = code_block_dict[code][block_name]
                if count_in_block <= 3:
                    # Needs more than 3 accumulated limit-ups in this block.
                    continue
                info = latest_info.get(code)
                if info is None:
                    # No latest info available for this code.
                    continue
                sec_level, upper_limit = info
                if sec_level != 1:
                    # Any sec_level other than 1 is treated as an abnormal stock.
                    continue
                if upper_limit < 3:
                    # upper_limit below 3 (presumably a price under 3 yuan).
                    continue
                accepted += 1
                fdatas.append(
                    (block_name, limit_up_info_map[code][1], code, count_in_block,
                     int(float(limit_up_info_map[code][2]))))
                if accepted >= 10:
                    break
        # The result is only returned; nothing is persisted here.
        return fdatas
|
|
|
if __name__ == "__main__":
    # Manual run: compute the distinctive codes and dump them to stdout,
    # first one record per line, then the whole list at once.
    special_codes = AnalysisBlockSpecialCodesManager().get_block_special_codes()
    for record in special_codes:
        print(record)
    # Persisting is disabled for manual runs:
    # BlockSpecialCodesManager().set_block_codes_list(special_codes)
    print(special_codes)
|