"""
|
开盘啦板块工具
|
"""
|
|
# 是否是强势板块
|
# current_limit_up_datas:实时涨停数据 (代码, 名称, 首次涨停时间, 最近涨停时间, 几板, 涨停原因, 板块, 实际流通, 主力净额,涨停原因代码,涨停原因代码数量)
|
import datetime
|
import time
|
|
import constant
|
from utils import tool
|
|
|
# 是否主板开1
|
# limit_up_record_datas 今日历史涨停
|
def get_shsz_open_limit_up_codes(code, block, limit_up_record_datas, code_block_dict):
|
# 获取今日9:30的时间戳
|
time_str = datetime.datetime.now().strftime("%Y-%m-%d") + " 09:30:00"
|
timestamp = time.mktime(time.strptime(time_str, '%Y-%m-%d %H:%M:%S'))
|
limit_up_codes = set()
|
if limit_up_record_datas:
|
for k in limit_up_record_datas:
|
if code_block_dict.get(k[3]) == block:
|
if int(k[5]) < timestamp:
|
limit_up_codes.add(k[3])
|
return limit_up_codes
|
|
|
# 获取主板开1且目前是涨停的代码
|
def get_shsz_open_limit_up_codes_current(code, block, current_limit_up_datas):
|
# 获取今日9:30的时间戳
|
time_str = datetime.datetime.now().strftime("%Y-%m-%d") + " 09:30:00"
|
timestamp = time.mktime(time.strptime(time_str, '%Y-%m-%d %H:%M:%S'))
|
limit_up_codes = set()
|
for k in current_limit_up_datas:
|
if k[5] == block:
|
if int(k[2]) < timestamp:
|
limit_up_codes.add(k[0])
|
return limit_up_codes
|
|
|
# 代码是否是后排
|
def is_back_row(code, block, current_limit_up_datas):
|
codes = set()
|
for k in current_limit_up_datas:
|
if k[5] == block:
|
codes.add(k[0])
|
codes.discard(code)
|
if len(codes) == 0:
|
return False
|
else:
|
return True
|
|
|
# 是否是前几的板块
|
# 板块中有主板涨停的才参与排序(排序时间按照板块中的涨停时间来排序)
|
|
def __is_top_block(block, block_codes_infos, topn):
|
block_limit_up_dict = {}
|
for b in block_codes_infos:
|
if b[1] not in block_limit_up_dict:
|
block_limit_up_dict[b[1]] = []
|
block_limit_up_dict[b[1]].append(b)
|
# 剔除只有非主板涨停的板块
|
invalid_blocks = []
|
for k in block_limit_up_dict:
|
has_shsz = False
|
for b in block_limit_up_dict[k]:
|
if tool.is_can_buy_code(b[0]):
|
has_shsz = True
|
break
|
if not has_shsz:
|
invalid_blocks.append(k)
|
for k in invalid_blocks:
|
block_limit_up_dict.pop(k)
|
|
# 每个板块涨停时间排序
|
invalid_blocks = []
|
for k in block_limit_up_dict:
|
# 删除宽泛概念
|
if k in constant.KPL_INVALID_BLOCKS:
|
invalid_blocks.append(k)
|
continue
|
block_limit_up_dict[k].sort(key=lambda x: x[2])
|
|
for k in invalid_blocks:
|
block_limit_up_dict.pop(k)
|
|
block_codes_infos = [block_limit_up_dict[k][0] for k in block_limit_up_dict]
|
block_codes_infos.sort(key=lambda x: x[2])
|
# 去除通用涨停原因
|
index = 0
|
for d in block_codes_infos:
|
if d[1] == block:
|
if index + 1 <= topn:
|
return True, block_codes_infos[:topn]
|
else:
|
return False, block_codes_infos[:topn]
|
index += 1
|
if index <= topn:
|
return True, block_codes_infos[:topn]
|
return False, block_codes_infos[:topn]
|
|
|
def is_record_top_block(code, block, limit_up_record_datas, yesterday_current_limit_up_codes, topn):
|
block_codes_infos = []
|
limit_up_time = time.time()
|
for k in limit_up_record_datas:
|
# 判断是否是首板
|
if k[0] in yesterday_current_limit_up_codes:
|
continue
|
|
if k[3] != code:
|
block_codes_infos.append((k[3], k[2], int(k[5])))
|
else:
|
limit_up_time = int(k[5])
|
block_codes_infos.append((code, block, limit_up_time))
|
# 排序
|
return __is_top_block(block, block_codes_infos, topn)
|
|
|
def is_current_top_block(code, block, current_limit_up_datas, yesterday_current_limit_up_codes, topn):
|
block_codes_infos = []
|
limit_up_time = time.time()
|
for k in current_limit_up_datas:
|
# 判断是否是首板
|
if k[0] in yesterday_current_limit_up_codes:
|
continue
|
if k[0] != code:
|
block_codes_infos.append((k[0], k[5], int(k[2])))
|
else:
|
limit_up_time = int(k[2])
|
# 排序
|
block_codes_infos.append((code, block, limit_up_time))
|
# 排序
|
return __is_top_block(block, block_codes_infos, topn)
|
|
|
# 获取当日历史身位
|
# shsz:是否主板
|
def get_code_record_rank(code, block, limit_up_record_datas, code_limit_up_reason_dict,
|
yesterday_current_limit_up_codes, shsz=True):
|
block_codes_infos = []
|
limit_up_time = time.time()
|
for k in limit_up_record_datas:
|
if k[3] == code:
|
# 获取当前代码涨停时间
|
limit_up_time = int(k[5])
|
if shsz and not tool.is_can_buy_code(k[3]):
|
continue
|
# 剔除高位板
|
if k[3] in yesterday_current_limit_up_codes:
|
continue
|
if code_limit_up_reason_dict.get(k[3]) == block:
|
if k[3] != code:
|
block_codes_infos.append((k[3], int(k[5])))
|
|
block_codes_infos.append((code, limit_up_time))
|
block_codes_infos.sort(key=lambda x: x[1])
|
front_codes = []
|
for i in range(0, len(block_codes_infos)):
|
if block_codes_infos[i][0] == code:
|
return i, front_codes
|
else:
|
front_codes.append(block_codes_infos[i][0])
|
return 0, []
|
|
|
# 获取当日实时身位
|
# before_blocks_dict格式位{"代码":set("板块")}
|
def get_code_current_rank(code, block, current_limit_up_datas, code_limit_up_reasons_dict,
|
yesterday_current_limit_up_codes, exclude_codes, open_limit_up_count, shsz=False,
|
limit_up_time=time.time()):
|
block_codes_infos = []
|
for k in current_limit_up_datas:
|
if k[0] == code:
|
# 获取当前代码涨停时间
|
limit_up_time = int(k[2])
|
if shsz and not tool.is_can_buy_code(k[0]):
|
continue
|
# 剔除高位板
|
if k[0] in yesterday_current_limit_up_codes:
|
continue
|
if code_limit_up_reasons_dict.get(k[0]) and block in code_limit_up_reasons_dict.get(k[0]):
|
if k[0] != code:
|
# 代码.涨停时间
|
block_codes_infos.append((k[0], int(k[2])))
|
block_codes_infos.append((code, limit_up_time))
|
block_codes_infos.sort(key=lambda x: x[1])
|
front_codes = []
|
first_count = 0
|
for i in range(0, len(block_codes_infos)):
|
if i == open_limit_up_count and exclude_codes and block_codes_infos[i][0] in exclude_codes:
|
# 非开1老大被排除
|
first_count += 1
|
continue
|
if block_codes_infos[i][0] == code:
|
return i - first_count, front_codes
|
else:
|
front_codes.append(block_codes_infos[i][0])
|
return 0, []
|
|
|
# 开1时间范围
|
open_limit_up_time_range = time.mktime(
|
time.strptime(tool.get_now_date_str() + " 09:25:00", '%Y-%m-%d %H:%M:%S')), time.mktime(
|
time.strptime(tool.get_now_date_str() + " 09:30:00", '%Y-%m-%d %H:%M:%S'))
|
|
if __name__ == "__main__":
|
print(open_limit_up_time_range)
|