import json
import os

import constant
import tool

# KPL (开盘啦) historical limit-up data management
from db import mysql_data
from l2 import code_price_manager
from third_data import kpl_util

# Block labels that carry no sector information and are ignored when querying a code's recent blocks
INVALID_BLOCKS = ["一季报增长", "二季报增长", "三季报增长", "四季报增长", "业绩增长", "中报增长", "年报增长", "年报预增", "无", "次新股", "ST摘帽", "超跌", "股权转让",
                  "并购重组"]


class KPLLimitUpDataRecordManager:
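    """Persists KPL (开盘啦) limit-up records to the kpl_limit_up_record table and offers query helpers."""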
    total_datas = None
    latest_datas = {}

    @classmethod
    def save_record(cls, day, records):
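        """Upserts one day's limit-up records into kpl_limit_up_record, removes rows whose block is no longer reported, and refreshes total_datas."""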
        # Collect the set of blocks (板块) each code belongs to
        code_block_dict = {}
        for data in records:
            blocks = set(data[5].split("、"))
            code = data[0]
            for b in blocks:
                if not code_block_dict.get(code):
                    code_block_dict[code] = set()
                code_block_dict[code].add(b)

        # Record the limit-up data
        mysqldb = mysql_data.Mysqldb()
        for d in records:
            # (code, name, first limit-up time, latest limit-up time, board count, limit-up reason, blocks, real float, main-force net inflow)
            code = d[0]
            _id = f"{day}_{code}_{d[5]}"

            result = mysqldb.select_one("select * from kpl_limit_up_record where _id='{}'".format(_id))
            if not result:
                mysqldb.execute(
                    f"insert into kpl_limit_up_record(_id,_day,_hot_block_name,_code,_code_name,_limit_up_time,_blocks,_latest_limit_up_time,_update_time,_create_time) values('{_id}','{day}','{d[5]}','{d[0]}','{d[1]}','{d[2]}','{d[6]}','{d[3]}',now(),now())")
            else:
                if _id in cls.latest_datas and json.dumps(cls.latest_datas.get(_id)) != json.dumps(d):
                    mysqldb.execute(
                        f"update kpl_limit_up_record set _latest_limit_up_time='{d[3]}',_limit_up_time='{d[2]}' ,_update_time=now() where _id='{_id}'")
                    cls.latest_datas[_id] = d
                cls.latest_datas[_id] = d

            # Fetch the blocks previously recorded for this code and delete the ones that are no longer correct
            old_datas = KPLLimitUpDataRecordManager.list_by_code(code, day)
            if old_datas:
                for dd in old_datas:
                    if dd[2] not in code_block_dict[code]:
                        mysqldb.execute(f"delete from kpl_limit_up_record where _id='{dd[0]}'")
                        if dd[0] in cls.latest_datas:
                            cls.latest_datas.pop(dd[0])
        cls.total_datas = KPLLimitUpDataRecordManager.list_all(tool.get_now_date_str())

    @classmethod
    def load_total_datas(cls):
        cls.total_datas = KPLLimitUpDataRecordManager.list_all(tool.get_now_date_str())

    @staticmethod
    def list_all(day):
        mysqldb = mysql_data.Mysqldb()
        return mysqldb.select_all(f"select * from kpl_limit_up_record where _day='{day}'")

    @staticmethod
    def list_by_code(code, day):
        mysqldb = mysql_data.Mysqldb()
        return mysqldb.select_all(f"select * from kpl_limit_up_record where _code='{code}' and _day='{day}'")

    @staticmethod
    def list_by_block(block_name, day):
        mysqldb = mysql_data.Mysqldb()
        return mysqldb.select_all(
            f"select * from kpl_limit_up_record where _hot_block_name='{block_name}' and _day='{day}'")

    @staticmethod
    def list_blocks_with_day(days):
        mysqldb = mysql_data.Mysqldb()
        sql = "select _hot_block_name,_day from kpl_limit_up_record where "
        wheres = []
        for day in days:
            wheres.append(f"_day = '{day}'")
        sql += " or ".join(wheres)
        sql += " group by _hot_block_name,_day"

        results = mysqldb.select_all(sql)
        return results

    @staticmethod
    def get_latest_blocks(code):
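        """Returns the concatenated block names of the code's most recent limit-up day, excluding INVALID_BLOCKS."""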
        wheres = []
        for b in INVALID_BLOCKS:
            wheres.append(f"hb.`_hot_block_name` != '{b}'")
        wheres = " and ".join(wheres)
        sql = f"SELECT GROUP_CONCAT(_hot_block_name) FROM (SELECT hb.`_hot_block_name`,hb.`_day` FROM `kpl_limit_up_record` hb WHERE hb.`_code`='{code}' AND {wheres} ORDER BY hb.`_day` DESC LIMIT 10) a GROUP BY a._day ORDER BY a._day DESC LIMIT 1"
        mysqldb = mysql_data.Mysqldb()
        return mysqldb.select_one(sql)

    # Get the code's most recent blocks; returns [(concatenated blocks, day)]
    @staticmethod
    def get_latest_infos(code, count):
        sql = f"SELECT GROUP_CONCAT(_hot_block_name),`_day` FROM (SELECT hb.`_hot_block_name`,hb.`_day` FROM `kpl_limit_up_record` hb WHERE hb.`_code`='{code}' ORDER BY hb.`_day` DESC LIMIT 10) a GROUP BY a._day ORDER BY a._day DESC LIMIT {count}"
        mysqldb = mysql_data.Mysqldb()
        return mysqldb.select_all(sql)


class KPLDataManager:
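    """Caches the latest KPL data pulls in memory and in per-day files under constant.CACHE_PATH."""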
    __latest_datas = {}

    def __save_in_file(self, key, datas):
        name = f"{tool.get_now_date_str()}_{key}.log"
        path = f"{constant.CACHE_PATH}/{name}"
        with open(path, 'w') as f:
            f.write(json.dumps(datas))

    def __get_from_file(self, key, day=None):
        # Default argument values are evaluated once at import time, so the
        # current date must be resolved at call time instead.
        if day is None:
            day = tool.get_now_date_str()
        name = f"{day}_{key}.log"
        path = f"{constant.CACHE_PATH}/{name}"
        if not os.path.exists(path):
            return None
        with open(path, 'r') as f:
            lines = f.readlines()
            if lines:
                return json.loads(lines[0])
        return None

    def get_from_file(self, type, day):
        name = f"{day}_{type.value}.log"
        path = f"{constant.CACHE_PATH}/{name}"
        if not os.path.exists(path):
            return None
        with open(path, 'r') as f:
            lines = f.readlines()
            if lines:
                return json.loads(lines[0])
        return None

    def save_data(self, type, datas):
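        """Caches the latest payload for the given data type and writes it to today's cache file."""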
        # get_data() looks entries up by the enum's value, so normalize the
        # key here as well when an enum member is passed in.
        if hasattr(type, "value"):
            type = type.value
        self.__latest_datas[type] = datas
        self.__save_in_file(type, datas)

    def get_data(self, type):
        type = type.value
        if type in self.__latest_datas:
            return self.__latest_datas[type]
        result = self.__get_from_file(type)
        if result is not None:
            self.__latest_datas[type] = result
        return result


def load_history_limit_up():
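    """Replays historical KPL limit-up snapshots (HisDaBanList_1.log files under D:/kpl/his) into the database."""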
    for file_name in os.listdir("D:/kpl/his"):
        if file_name.find("HisDaBanList_1.log") < 0:
            continue
        day = file_name[:10]
        with open(f"D:/kpl/his/{file_name}", 'r', encoding="utf-16") as f:
            lines = f.readlines()
            line = lines[0]
            result = json.loads(line)
            list_ = kpl_util.parseDaBanData(result, kpl_util.DABAN_TYPE_LIMIT_UP)
            KPLLimitUpDataRecordManager.save_record(day, list_)

        print(day, list_)


if __name__ == "__main__":
    print(KPLLimitUpDataRecordManager.get_latest_infos("000950", 4))
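    # Illustrative sketch only: round-trip a payload through KPLDataManager.
    # The enum below is hypothetical (the real project defines its own data
    # type constants), and constant.CACHE_PATH is assumed to be a writable
    # cache directory.
    from enum import Enum

    class _DemoKPLType(Enum):
        LIMIT_UP = "limit_up"

    os.makedirs(constant.CACHE_PATH, exist_ok=True)
    manager = KPLDataManager()
    manager.save_data(_DemoKPLType.LIMIT_UP, [["000001", "平安银行"]])
    print(manager.get_data(_DemoKPLType.LIMIT_UP))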