"""
|
kpl API数据获取与处理
|
"""
|
import json
|
import os.path
|
import time
|
import datetime
|
|
import dask
|
import requests
|
|
import constant
|
from log_module import async_log_util
|
from log_module.log import logger_common, logger_kpl_jingxuan_in, logger_Overall_market_strength_score, \
|
logger_stock_of_markets_plate
|
# import requests
|
from strategy import data_cache
|
from strategy import basic_methods
|
from strategy.index_market_trend_strategy import index_trend_expectation
|
|
from trade import middle_api_protocol
|
from utils import hx_qc_value_util, tool
|
|
# 获取logger实例
|
logger = logger_common
|
|
now = time.time()
|
print(f"kpl_api开始运行--{now}")
|
|
# 竞价
|
DABAN_TYPE_BIDDING = 8
|
# 涨停
|
DABAN_TYPE_LIMIT_UP = 1
|
# 炸板
|
DABAN_TYPE_OPEN_LIMIT_UP = 2
|
# 跌停
|
DABAN_TYPE_LIMIT_DOWN = 3
|
# 曾跌停
|
DABAN_TYPE_EVER_LIMIT_DOWN = 5
|
|
|
def __base_request(url, data, timeout=10):
|
DELEGATE = True
|
if not DELEGATE:
|
headers = {
|
"Content-Type": "application/x-www-form-urlencoded; charset=UTF-8",
|
"User-Agent": "Dalvik / 2.1.0(Linux;U;Android 6.0.1;MuMu Build/V417IR)"
|
}
|
# proxies={'https': '192.168.3.251:9002'}
|
# 禁止代理,不然会走本地代理
|
response = requests.post(url, data=data, headers=headers, proxies={"http": None, "https": None},
|
timeout=timeout)
|
if response.status_code != 200:
|
raise Exception("请求出错")
|
return response.text
|
else:
|
fdata = middle_api_protocol.load_kpl(url, data, timeout)
|
return middle_api_protocol.request(fdata)
|
|
|
def daBanList(pidType):
|
data = "Order=1&a=DaBanList&st=100&c=HomeDingPan&PhoneOSNew=1&DeviceID=a38adabd-99ef-3116-8bb9-6d893c846e23" \
|
f"&VerSion=5.8.0.2&Index=0&Is_st=1&PidType={pidType}&apiv=w32&Type=4&FilterMotherboard=0&Filter=0&FilterTIB=0" \
|
"&FilterGem=0 "
|
result = __base_request("https://apphq.longhuvip.com/w1/api/index.php", data=data)
|
return result
|
|
|
# 市场行情-行业 主力净额由高到低排序 实时获取
|
def getMarketIndustryRealRankingInfo(orderJingE_DESC=True):
|
data = f"Order={1 if orderJingE_DESC else 0}&a=RealRankingInfo&st=20&apiv=w32&Type=5&c=ZhiShuRanking&PhoneOSNew=1&DeviceID=a38adabd-99ef-3116-8bb9-6d893c846e23&VerSion=5.8.0.2&Index=0&ZSType=4&"
|
result = __base_request("https://apphq.longhuvip.com/w1/api/index.php",
|
data=data)
|
return result
|
|
|
# 市场行情-精选 主力净额由高到低排序 实时获取 st=20 请求地址中 st=20 代表获取前20(排序方式为主力净额降序排列)
|
# def getMarketJingXuanRealRankingInfo(orderJingE_DESC=True):
|
# data = f"Order={1 if orderJingE_DESC else 0}&a=RealRankingInfo&st=10&apiv=w32&Type=5&c=ZhiShuRanking&PhoneOSNew=1&DeviceID=a38adabd-99ef-3116-8bb9-6d893c846e23&VerSion=5.8.0.2&Index=0&ZSType=7&"
|
# result = __base_request("https://apphq.longhuvip.com/w1/api/index.php",
|
# data=data)
|
# return result
|
# def getMarketJingXuanRealRankingInfo(orderJingE_DESC=True):
|
# data = f"Order={1 if orderJingE_DESC else 0}&a=RealRankingInfo&st=3&apiv=w32&Type=5&c=ZhiShuRanking&PhoneOSNew=1&DeviceID=a38adabd-99ef-3116-8bb9-6d893c846e23&VerSion=5.8.0.2&Index=0&ZSType=7&"
|
# result = __base_request("https://apphq.longhuvip.com/w1/api/index.php",
|
# data=data)
|
# return result
|
|
# #市场行情-精选 (排序方式为强度降序排列) st=20 请求地址中 st=20 代表获取前20
|
def getMarketJingXuanRealRankingInfo():
|
data = f"Order=1&a=RealRankingInfo&st=20&a=RealRankingInfo&apiv=w35&Type=1&Index=0&c=ZhiShuRanking&VerSion=5.13.0.0&Order=1&PhoneOSNew=1&ZSType=7&DeviceID=d6f20ce9-fa08-31c9-a493-536ebb8e9773&"
|
result = __base_request("https://apphq.longhuvip.com/w1/api/index.php",
|
data=data)
|
return result
|
|
|
# 获取代码的板块
|
def getStockIDPlate(code):
|
data = f"a=GetStockIDPlate_New&apiv=w32&c=StockL2Data&StockID={code}&PhoneOSNew=1&UserID=0&DeviceID=a38adabd-99ef-3116-8bb9-6d893c846e23&VerSion=5.8.0.2&Token=0&"
|
result = __base_request("https://apphq.longhuvip.com/w1/api/index.php", data=data)
|
result = json.loads(result)
|
if int(result["errcode"]) != 0:
|
return None
|
return result["ListJX"] if result["ListJX"] else result["List"]
|
|
|
# IDPlate = getStockIDPlate(600550)
|
# print(f"IDPlate===={IDPlate}")
|
|
# 获取代码的精选板块
|
# 返回格式:[(板块代码,板块名称,涨幅百分比)]
|
def getCodeJingXuanBlocks(code):
|
data = f"a=GetStockIDPlate&apiv=w32&Type=2&c=StockL2Data&StockID={code}&PhoneOSNew=1&UserID=0&DeviceID=a38adabd-99ef-3116-8bb9-6d893c846e23&VerSion=5.8.0.2&Token=0&"
|
result = __base_request("https://apphq.longhuvip.com/w1/api/index.php", data=data)
|
result = json.loads(result)
|
return result.get("ListJX")
|
|
|
# 获取该概念下的个股代码及其他
|
def getCodesByPlate(plate_code):
|
data = f"Order=1&a=ZhiShuStockList_W8&st=30&c=ZhiShuRanking&PhoneOSNew=1&old=1&DeviceID=a38adabd-99ef-3116-8bb9-6d893c846e23&VerSion=5.8.0.2&IsZZ=0&Token=0&Index=0&apiv=w32&Type=6&IsKZZType=0&UserID=0&PlateID={plate_code}&"
|
return __base_request("https://apphq.longhuvip.com/w1/api/index.php", data=data)
|
|
|
# 获取概念中的板块中的子板块
|
def getSonPlate(plate_code):
|
data = f"a=SonPlate_Info&apiv=w32&c=ZhiShuRanking&PhoneOSNew=1&DeviceID=a38adabd-99ef-3116-8bb9-6d893c846e23&VerSion=5.8.0.2&PlateID={plate_code}&"
|
return __base_request("https://apphq.longhuvip.com/w1/api/index.php", data=data)
|
|
|
# 市场行情-行业
|
# def getMarketIndustryRealRankingInfo(orderJingE_DESC=True):
|
# data = f"Order={1 if orderJingE_DESC else 0}&a=RealRankingInfo&st=80&apiv=w32&Type=5&c=ZhiShuRanking&PhoneOSNew=1&DeviceID=a38adabd-99ef-3116-8bb9-6d893c846e23&VerSion=5.8.0.2&Index=0&ZSType=4&"
|
# return __base_request("https://apphq.longhuvip.com/w1/api/index.php",
|
# data=data)
|
#
|
# # 市场行情-精选
|
# def getMarketJingXuanRealRankingInfo(orderJingE_DESC=True):
|
# data = f"Order={1 if orderJingE_DESC else 0}&a=RealRankingInfo&st=80&apiv=w32&Type=5&c=ZhiShuRanking&PhoneOSNew=1&DeviceID=a38adabd-99ef-3116-8bb9-6d893c846e23&VerSion=5.8.0.2&Index=0&ZSType=7&"
|
# return __base_request("https://apphq.longhuvip.com/w1/api/index.php",
|
# data=data)
|
|
|
# 获取自由流通市值
|
def getZYLTAmount(code):
|
data = f"a=GetStockPanKou_Narrow&apiv=w32&c=StockL2Data&VerSion=5.8.0.2&State=1&PhoneOSNew=1&DeviceID=a38adabd-99ef-3116-8bb9-6d893c846e23&StockID={code}&"
|
result = __base_request("https://apphq.longhuvip.com/w1/api/index.php", data=data)
|
result = json.loads(result)
|
if "real" in result:
|
return result["real"].get("actualcirculation_value")
|
return None
|
|
|
# 获取涨停列表
|
def __getLimitUpInfo(pidType, page, pageSize):
|
data = f"Order=0&a=DailyLimitPerformance&st={pageSize}&apiv=w35&Type=4&c=HomeDingPan&PhoneOSNew=1&DeviceID=a38adabb-99ef-3116-8bb9-6d893c846e24&VerSion=5.13.0.0&Index={(page - 1) * pageSize}&PidType={pidType}&"
|
result = __base_request("https://apphq.longhuvip.com/w1/api/index.php", data=data)
|
return result
|
|
|
# 整理涨停列表的数据
|
def getLimitUpInfoNew():
|
pids = [(1, "首板"), (2, "2连板"), (3, "3连板"), (4, "4连板"), (5, "")]
|
fresults = []
|
for pid_info in pids:
|
results = []
|
for i in range(10):
|
# start_time = time.time()
|
result = __getLimitUpInfo(pid_info[0], i + 1, 20)
|
# print("请求用时", time.time() - start_time)
|
result = json.loads(result)
|
datas = result["info"][0]
|
results.extend(datas)
|
# day = result["info"][1] # 连板天数?
|
if len(datas) < 20:
|
break
|
for r in results:
|
if not r[18] and pid_info[1]:
|
r[18] = pid_info[1]
|
# # 替换掉板块名称 机器人概念/机器人
|
# for i in range(len(r)):
|
# if type(r[i]) == str:
|
# r[i] = kpl_util.filter_block(r[i])
|
fresults.extend(results)
|
|
return json.dumps({"errcode": 0, "list": fresults})
|
|
|
# if __name__ == "__main__":
|
|
|
# print(f"打板列表t(pidType)====={daBanList(2)}")
|
# print(f"获取个股代码的板块==={getStockIDPlate('002766')}")
|
# print((f"获取个股代码的精选板块==={getCodeJingXuanBlocks('002878')}"))
|
# print(f"获取该概念下的个股代码及其他====={getCodesByPlate(885500)}") 《《《《《《《《《《
|
# print(f"获取概念中的板块中的子板块====={json.loads(getSonPlate(801085))}")
|
|
# print(f"获取概念中的板块强度====={getSonPlate(getCodesByPlate(getCodeJingXuanBlocks('002452')[2][0]))}")
|
# print(f"市场行情-行业板块 数==={len(getMarketIndustryRealRankingInfo(True))}")
|
# print(f"市场行情-行业板块==={json.loads(getMarketIndustryRealRankingInfo(True))}")
|
|
# 返回格式:['板块ID','板块名称','强度','涨幅','未知','成交额','''''''''强度','未知']
|
# print(f"市场行情-精选板块 数==={getMarketJingXuanRealRankingInfo(True)}")
|
# print(f"市场行情-精选板块==={json.loads(getMarketJingXuanRealRankingInfo(True))}")
|
# print(f"股票代码:{Market_situation_selected_sectors_No1[0]}")
|
# jingxuanbankuai = json.loads(getMarketJingXuanRealRankingInfo(True))
|
# print(f"jingxuanbankuai==={type(jingxuanbankuai)}")
|
# print(f"板块代码:{jingxuanbankuai['list'][0][0]},板块名称:{jingxuanbankuai['list'][0][1]},强度:{jingxuanbankuai['list'][0][2]},涨幅:{jingxuanbankuai['list'][0][3]},未知:{jingxuanbankuai['list'][0][4]},成交额:{round(jingxuanbankuai['list'][0][5]/100000000)}亿,主力净额:{round(jingxuanbankuai['list'][0][6]/100000000,2)}亿,主买:{round(jingxuanbankuai['list'][0][7]/100000000,2)}亿,主卖:{round(jingxuanbankuai['list'][0][8]/100000000,2)}亿,未知:{jingxuanbankuai['list'][0][9]},流通值:{round(jingxuanbankuai['list'][0][10]/100000000,2)}亿,未知/或为最大涨跌幅:{round(jingxuanbankuai['list'][0][11],2)},未知:{round(jingxuanbankuai['list'][0][12]/100000000,2)}亿,总市值:{round(jingxuanbankuai['list'][0][13]/100000000,2)}亿,第一季度机构持仓:{round(jingxuanbankuai['list'][0][14]/100000000,2)}亿,未知:{round(jingxuanbankuai['list'][0][15],2)},未知:{round(jingxuanbankuai['list'][0][16],2)},强度:{round(jingxuanbankuai['list'][0][17],2)}")
|
# # 部分板块没有子板块
|
# print(f"获取概念中的板块中的子板块====={json.loads(getSonPlate(801248))}")
|
|
# print(f"自由流通市值==={getZYLTAmount('603319')}")
|
# print((f"获取个股代码的精选板块列表==={getCodeJingXuanBlocks('002452')}"))
|
# print((f"获取个股代码的精选第一板块代码==={getCodeJingXuanBlocks('002452')[0][0]}"))
|
# print(f"获取该概念下的个股代码及其他====={json.loads(getCodesByPlate(getCodeJingXuanBlocks('002452')[0][0]))}")
|
# print(f"获取该概念下的个股代码及其他dddddd====={json.loads(its_strongest_sector_situation)}")
|
# print(f"涨停列表及概念板块={json.loads(getLimitUpInfoNew())['list']}")
|
|
########################################################################################################################################################################################################################
|
|
# 获取行情精选板块 强度排名
|
def get_market_sift_plate_its_stock_power():
|
@dask.delayed
|
def batch_get_plate_codes(fs):
|
return fs
|
|
@dask.delayed
|
def request_plate_codes(i):
|
plate_name = i[1]
|
log_data = None
|
# 尝试过滤掉无意义的概念板块(plate_name not in ['科创板', '北交所', '次新股', '无', 'ST板块', 'ST摘帽', '并购重组', '国企改革','超跌', '壳资源', '股权转让', '送转填权']) and '增长' in plate_name
|
if (plate_name not in ['科创板', '北交所', '次新股', '无', 'ST板块', 'ST摘帽', '并购重组', '国企改革', '超跌',
|
'壳资源', '股权转让', '送转填权']) or ('增长' in plate_name):
|
|
# print(f"{i[1]} 强度:{i[2]}")
|
# 通过板块ID获取其下面的个股强度列表
|
# print(f"======={i[0]}=======")
|
its_stock = json.loads(getCodesByPlate(i[0]))
|
log_data = {plate_name: its_stock['list']}
|
|
# its_stock_list_info = its_stock['list']
|
# logger.info(f"its_stock_list_info==={its_stock_list_info}")
|
# 将板块强度下面对应的个股列表打印到日志中
|
# for i in its_stock_list_info:
|
# if i[0] != 1:
|
# logger.info(
|
# f"l === 个股代码:{i[0]},公司名称:{i[1]},主力资金推测:{i[2]},未知0值:{i[3]},概念:{i[4]},最新价:{i[5]},当日当时涨幅:{i[6]}%,"
|
# f"成交额:{round(i[7] / 100000000, 2)} 亿,实际换手率:{i[8]}%,未知0值:{i[9]},实际流通:{round(i[10] / 100000000, 2)}亿,"
|
# f"主力买:{round(i[11] / 100000000, 2)}亿,"
|
# f"主力卖:{round(i[12] / 100000000, 2)}亿,"
|
# f"主力净额:{round(i[13] / 10000, 2)}万,买成占比:{i[14]}%,卖成占比:{i[15]}%,净成占比:{i[16]}%,买流占比:{i[17]}%,卖流占比:{i[18]}%,净流占比:{i[19]}%,"
|
# f"区间涨幅:{i[20]}%,量比:{i[21]},未知0:{i[22]},上板情况:{i[23]},上板排名:{i[24]},换手率:{i[25]}%,"
|
# f"未知空值:{i[26]},未知零值:{i[27]},收盘封单:{i[28]},最大封单:{i[29]},未知空值?:{i[30]},"
|
# f"?:{i[30]}%,?:{i[31]},??:{i[32]},振幅:{i[33]}%,未知0????:{i[34]},未知0?????:{i[35]},"
|
# f"?=:{i[36]},?总市值:{i[37]},?流通市值:{i[38]},最终归属概念(收盘后出数据?):{i[39]},领涨次数:{i[40]},"
|
# f"41未知1值:{i[41]},第三季度机构持仓【str数据勿用运算符】:{i[42]}万,?年预测净利润:{i[43]},上年预测净利润:{i[44]},年内预测净利润:{i[45]}"
|
# )
|
|
# 初始化股票强度列表
|
stock_power_list = []
|
for s in its_stock['list']:
|
# 过滤掉涨幅大于 and s[6] < 6.5 且小于0%的 和 名称中包含ST的 和 涨速小于等于0%的 和 只要昨日未涨停 和 上证或深证的正股 and s[9] > 0.0025
|
if s[6] > 0 and s[1].find("ST") < 0 and s[1].find("XD") < 0 and s[23].find("板") < 0 and s[24].find(
|
"板") < 0 and (s[0].startswith('60') or s[0].startswith('00')) and s[9] > 1:
|
# print(f"{s[1]},个股代码:{s[0]}, 涨幅:{s[6]}% 涨速:{s[9]}% 概念:{s[4]} 主力资金推测:{s[2]} 领涨次数:{s[40]} 今日第几板:{s[23]} 是否破版{s[24]}")
|
# 对个股强度 主要 属性列表进行装填
|
its_stock_power = [s[1], s[0], s[6], s[9], s[4], s[2], s[40]]
|
# 逐个选择性添加its_stock中的元素到个股强度列表中
|
# print(f"its_stock_power===={its_stock_power}")
|
# 整体将添加完善的个股强度列表添加到股票列表中
|
stock_power_list.append(its_stock_power)
|
# print(f"stock_power_list===={stock_power_list}")
|
# 过滤掉没有瞬时高强度个股的空概念
|
if len(stock_power_list) != 0:
|
# 将对应板块的股票强度列表新建一个字典
|
stock_power_item = {i[1]: stock_power_list}
|
# 并更新到精选板块个股字典中
|
market_sift_plate_stock_dict.update(stock_power_item)
|
return log_data
|
|
data = (getMarketJingXuanRealRankingInfo())
|
market_sift_plate = json.loads(data)
|
# logger_kpl_jingxuan_in 打印的日志专用于开盘了数据的存储分析,不能轻易删除
|
# print(f"market_sift_plate 数 ======{len(market_sift_plate['list'])}")
|
# 行情》精选板块》排名前20中》对应个股》符合条件的个股
|
# logger.info(f"market_sift_plate['list']======{market_sift_plate['list']}")
|
# logger.info(f"market_sift_plate['list'][0] ======{market_sift_plate['list'][0]}")
|
# 初始化精选板块对应个股字典
|
market_sift_plate_stock_dict = {}
|
if 'list' in market_sift_plate:
|
ds = []
|
for d in market_sift_plate['list']:
|
ds.append(request_plate_codes(d))
|
dask_result = batch_get_plate_codes(ds)
|
compute_results = dask_result.compute()
|
log_datas = {}
|
for r in compute_results:
|
if not r:
|
continue
|
for b in r:
|
log_datas[b] = r[b]
|
now_time = tool.get_now_time_str()
|
if data_cache.L1_DATA_START_TIME < now_time < data_cache.CLOSING_TIME:
|
# logger.info(f"精选板块股票强度数据更新 == {market_sift_plate_stock_dict}")
|
# 只在盘中时间获取
|
async_log_util.info(logger_stock_of_markets_plate, f"{(market_sift_plate['list'], log_datas)}")
|
|
return market_sift_plate_stock_dict
|
|
|
# 调用一下获取精选板块股票强度数据函数 【本模块内使用时调用】
|
# get_market_sift_plate_its_stock_power()
|
|
def get_market_sift_plate_its_stock_power_process(callback):
|
while True:
|
try:
|
# now = time.time()
|
# print(f"kpl_limit_up_process开始了{now}")
|
start_time = time.time()
|
now_time = tool.get_now_time_str()
|
if data_cache.L1_DATA_START_TIME < now_time < data_cache.CLOSING_TIME:
|
its_stock_power = get_market_sift_plate_its_stock_power()
|
time_str = datetime.datetime.now().strftime("%H%M%S")
|
if 92900 < int(time_str) < 95000:
|
logger_kpl_jingxuan_in.info(f"耗时:{time.time() - start_time} 数据:{its_stock_power}")
|
callback(its_stock_power)
|
# print(f"精选板块拉升个股更新===={its_stock_power}")
|
except Exception as e:
|
logger.error(f"开盘啦板块强度线程报错An error occurred: {e}")
|
finally:
|
time.sleep(2)
|
|
|
# 获取涨停信息数据
|
def get_limit_up_info():
|
# 获取涨停信息列表
|
limit_up_info = json.loads(getLimitUpInfoNew())['list']
|
return limit_up_info
|
|
|
# 获取市场行情情绪综合强度
|
def get_market_strong():
|
"""
|
获取市场强度
|
:return:
|
"""
|
result = __base_request("https://apphwhq.longhuvip.com/w1/api/index.php",
|
f"a=DiskReview&apiv=w35&c=HomeDingPan&VerSion=5.13.0.0&PhoneOSNew=1&DeviceID=d6f20ce9-fa08-31c9-a493-536ebb8e9773&")
|
data = json.loads(result)
|
return int(data["info"]["strong"])
|
|
|
# market_strong = get_market_strong()
|
# print(f"market_strong==={market_strong}")
|
|
|
# 获取涨停板块名称列表并存储本地的函数
|
def get_limit_up_block_names():
|
# 设定当前时间点
|
now_time = tool.get_now_time_str()
|
# print(f"now_time===={now_time}")
|
if data_cache.SERVER_RESTART_TIME < now_time < data_cache.UPDATE_DATA_TIME:
|
# print(f"在时间内使用--------------------------")
|
# 获取涨停信息列表
|
limit_up_info = get_limit_up_info()
|
# print(f"limit_up_info=={limit_up_info}")
|
data_cache.limit_up_info = get_limit_up_info()
|
# 提取涨停列表中的板块名称
|
limit_up_block_names = []
|
# 循环添加涨停概念
|
for i in limit_up_info:
|
limit_up_block_names.append(i[5])
|
# print(f"limit_up_block_names==={limit_up_block_names}")
|
# return limit_up_block_names
|
# # 使用Counter计算每个元素的出现次数
|
# counter = Counter(limit_up_block_names)
|
# # 找出出现次数最多的元素及其次数
|
# most_common_element, most_common_count = counter.most_common(1)[0]
|
# # 打印出现次数最多的元素
|
# print(f"主线概念:{most_common_element},出现了 {most_common_count} 次")
|
|
return limit_up_block_names
|
|
|
# 为开盘啦接口获取的涨停列表概念板块单独开一个进程 形参(callback)
|
def kpl_limit_up_process(callback):
|
while True:
|
try:
|
# now = time.time()
|
# print(f"kpl_limit_up_process开始了{now}")
|
limit_up_block_names = get_limit_up_block_names()
|
callback(limit_up_block_names)
|
# logger.info(f"涨停更新===={limit_up_block_names}")
|
# print(f"涨停更新数量===={len(limit_up_block_names)}")
|
# print(f"kpl_limit_up_process完成一下{now}")
|
except Exception as e:
|
logger.error(f"开盘啦涨停板块概念线程报错An error occurred: {e}")
|
finally:
|
time.sleep(1.5)
|
|
|
# kpl_limit_up_process()
|
|
|
# 构建涨停信息读写对象
|
class DailyLimitUpInfoStorageManager:
|
# 初始化文件路径
|
def __init__(self, file_path=constant.KPL_LIMIT_UP_DATA_PATH):
|
self.file_path = file_path
|
|
# 添加单日涨停信息数据到文件中的一行 函数
|
def append_data_to_file(self, data_to_append):
|
# print(f"data_to_append=={data_to_append}")
|
# 读取所有行并解析为 JSON 对象列表
|
if os.path.exists(self.file_path):
|
with open(self.file_path, 'r', encoding='utf-8') as file:
|
# 获取当前日期并格式化
|
current_date = datetime.datetime.now().strftime('%Y-%m-%d')
|
lines = [json.loads(line.strip()) for line in file if line.strip()]
|
# print(f"lines type=={type(lines)}")
|
# print(f"lines=={lines}")
|
# 检查当前日期是否已存在于文件中
|
if lines: # 如果读取到的行文件列表不为空(为真)
|
if lines[-1].get(current_date) is None: # 如果列表中的倒数最后一行获取不到当日的日期(最后一行的键 为 当日日期)
|
# 将日期和data_to_append转换为JSON格式的字符串
|
json_line = json.dumps({current_date: data_to_append}, ensure_ascii=False) + '\n'
|
# 打开文件并追加JSON行
|
with open(self.file_path, 'a', encoding='utf-8') as file:
|
file.write(json_line)
|
else:
|
logger.info(f"(当日日期已存在于文件的最后一行了,不再重复追加写入)")
|
else:
|
json_line = json.dumps({current_date: data_to_append}, ensure_ascii=False) + '\n'
|
# 打开文件并追加JSON行
|
with open(self.file_path, 'a', encoding='utf-8') as file:
|
file.write(json_line)
|
|
# 清理多余数据函数
|
def check_and_remove_oldest_entry(self, max_entries):
|
# 读取所有行并解析为 JSON 对象列表
|
if os.path.exists(self.file_path):
|
with open(self.file_path, 'r', encoding='utf-8') as file:
|
lines = [json.loads(line.strip()) for line in file if line.strip()]
|
else:
|
lines = []
|
|
# 如果行数超过限制,移除最早的一些行
|
if len(lines) >= max_entries:
|
# 截断列表,只保留最新的 max_entries 个对象
|
lines = lines[-max_entries:]
|
# 重新打开文件以写入模式,并写入截断后的对象列表为 JSON Lines
|
with open(self.file_path, 'w', encoding='utf-8') as file:
|
for obj in lines:
|
file.write(json.dumps(obj, ensure_ascii=False) + '\n')
|
# file.write(json.dumps(obj, ensure_ascii=False))
|
|
# 隔行整理数据并合并装入一个字典数据中调用时返回这个字典数据 函数
|
def arrange_limit_up_info(self):
|
limit_info = {}
|
# 创建一个列表来存储所有解析的 JSON 对象
|
if os.path.exists(self.file_path):
|
with open(self.file_path, 'r', encoding='utf-8') as file:
|
for line in file:
|
# 去除每行末尾的换行符(如果有的话)
|
line = line.rstrip('\n')
|
# 将每行解析为一个 JSON 对象
|
info = json.loads(line)
|
# 假设每行都是一个字典数据,且只有一个键值对,其中键是日期
|
if isinstance(info, dict) and len(info) == 1:
|
date, data = list(info.items())[0]
|
limit_info[date] = data
|
return limit_info
|
|
|
# 构建一个获取读写存储本地的并整理涨停数据的函数
|
def get_arrange_limit_up_info():
|
# 实例化每日涨停信息整理方法
|
manager = DailyLimitUpInfoStorageManager()
|
manager.append_data_to_file(get_limit_up_info())
|
manager.check_and_remove_oldest_entry(max_entries=1000)
|
|
|
# 构建一个处理历史涨停涨停信息数据的函数
|
def get_handling_limit_up_info():
|
# 实例化每日涨停信息整理方法
|
history_limit_up_info = DailyLimitUpInfoStorageManager()
|
data_cache.daily_limit_up_info = history_limit_up_info.arrange_limit_up_info()
|
# logger.info(f"读本地的日更的历史涨停数据=={data_cache.daily_limit_up_info}")
|
|
# print(f"daily_limit_up_info 类型==={type(data_cache.daily_limit_up_info)}")
|
# 统计每日主线
|
daily_limit_up_info_len = len(data_cache.daily_limit_up_info)
|
# print(f"daily_limit_up_info_len==={daily_limit_up_info_len}")
|
|
historical_transaction_date_list = []
|
date_of_the_day = data_cache.DataCache().today_date
|
for i in range(daily_limit_up_info_len):
|
pre_date = hx_qc_value_util.get_previous_trading_date(date_of_the_day) # 获取前一个交易日API
|
# target_date_str = basic_methods.pre_num_trading_day(data_cache.today_date, daily_limit_up_info_len)
|
date_format = "%Y-%m-%d"
|
target_date = datetime.datetime.strptime(pre_date, date_format).strftime("%Y-%m-%d")
|
historical_transaction_date_list.append(target_date)
|
date_of_the_day = pre_date
|
|
# print(f"historical_transaction_date_list={historical_transaction_date_list}")
|
history_sorted_plate_ranking_list = []
|
for key, value in data_cache.daily_limit_up_info.items():
|
# print(f"key=={key}")
|
for i in historical_transaction_date_list:
|
# print(f"i======={i}")
|
# 找到每上一个交易日对应的本地数据的信息
|
if key == i:
|
# print(f"{key}===找到了!value={value}")
|
#
|
plate_ranking_list = []
|
# 遍历交易日每一个涨停股的信息
|
for v in value:
|
# print(f"v =={v}")
|
# 将每一个涨停股的涨停概念和同班级数量 汇编为一个字典
|
plate_limit_up_num_dict = {
|
v[5]: v[20]
|
}
|
# 将这个字典数据不重复的添加到概念排名列表中
|
if plate_limit_up_num_dict not in plate_ranking_list:
|
plate_ranking_list.append(plate_limit_up_num_dict)
|
# plate_ranking_set.add(v[20])
|
# print(f"plate_ranking_list={plate_ranking_list}")
|
# 使用sorted函数和lambda表达式来根据字典的值进行排序
|
# 这里我们确保不修改原始字典,仅通过list(x.values())[0]来获取值
|
sorted_plate_ranking_list = sorted(plate_ranking_list, key=lambda x: list(x.values())[0], reverse=True)
|
# logger.info(f"{key}=====>>>>{sorted_plate_ranking_list}")
|
history_sorted_plate_ranking_list.append(sorted_plate_ranking_list)
|
|
# print(f"history_sorted_plate_ranking_list={history_sorted_plate_ranking_list}")
|
# for ranking_list in history_sorted_plate_ranking_list:
|
# print(f"ranking_list={ranking_list}")
|
# for i in ranking_list:
|
# print(f"i={i}")
|
|
# 计算历史涨停概念的连续出现次数函数
|
def count_key_occurrences(list_of_dicts_lists):
|
# 创建一个字典来存储每个键的总出现次数
|
key_counts = {}
|
# 遍历列表中的每个字典列表
|
for sublist in list_of_dicts_lists:
|
# 遍历当前字典列表中的每个字典
|
for dict_item in sublist:
|
# 遍历字典中的每个键
|
for key in dict_item:
|
# 如果键不在key_counts中,则初始化计数为0
|
if key not in key_counts:
|
key_counts[key] = 0
|
# 增加当前键的计数
|
key_counts[key] += 1
|
# 打印结果
|
for key, count in key_counts.items():
|
if count > 1:
|
logger.info(f"'{key}' 连续出现 {count} 次")
|
|
# 调用函数,传入整个列表
|
# count_key_occurrences(history_sorted_plate_ranking_list)
|
|
# daily_limit_up_info_list = list(reversed(daily_limit_up_info_list))
|
# print(f"daily_limit_up_info_list==={daily_limit_up_info_list}")
|
|
# 获取昨日涨停代码 (以便与K线对比)
|
pre_trading_day_limit_up_info = data_cache.daily_limit_up_info.get(data_cache.DataCache().pre_trading_day)
|
if pre_trading_day_limit_up_info is not None:
|
yesterday_limit_up_code_list = []
|
for i in pre_trading_day_limit_up_info:
|
symbol_code = basic_methods.format_stock_symbol(i[0])
|
limit_up_code = symbol_code
|
yesterday_limit_up_code_list.append(limit_up_code)
|
data_cache.yesterday_limit_up_code_list = yesterday_limit_up_code_list
|
logger.info(f"昨日涨停股票数量=={len(data_cache.yesterday_limit_up_code_list)}")
|
logger.info(f"昨日涨停代码列表=={yesterday_limit_up_code_list}")
|
|
# code = pre_trading_day_limit_up_info[0][0]
|
# logger.info(f"股票代码=={code}")
|
# cor_name = pre_trading_day_limit_up_info[0][1]
|
# logger.info(f"公司名称=={cor_name}")
|
# unknown_zero_2 = pre_trading_day_limit_up_info[0][2]
|
# logger.info(f"未知零值2=={unknown_zero_2}")
|
# none_data = pre_trading_day_limit_up_info[0][3]
|
# logger.info(f"空数据=={none_data}")
|
# # 总市值(万)?
|
# total_market_value = round((pre_trading_day_limit_up_info[0][4] / 10000), 2)
|
# logger.info(f"总市值=={total_market_value}(万)?")
|
# # 最相关概念
|
# the_most_relevant_plate = pre_trading_day_limit_up_info[0][5]
|
# logger.info(f"最相关概念=={the_most_relevant_plate}")
|
# # 收盘封单金额(万)
|
# closing_amount = round((pre_trading_day_limit_up_info[0][6] / 10000), 2)
|
# logger.info(f"收盘封单金额=={closing_amount}(万)")
|
# # 最大封单金额(万)
|
# maximum_blocked_amount = round((pre_trading_day_limit_up_info[0][7] / 10000), 2)
|
# logger.info(f"最大封单金额=={maximum_blocked_amount}(万)")
|
# # 主力净额
|
# main_net_amount = round((pre_trading_day_limit_up_info[0][8] / 10000), 2)
|
# logger.info(f"主力净额=={main_net_amount}(万)")
|
# # 主力买
|
# main_buyers = round((pre_trading_day_limit_up_info[0][9] / 10000), 2)
|
# logger.info(f"主力买=={main_buyers}(万)")
|
# # 主力卖
|
# main_sellers = round((pre_trading_day_limit_up_info[0][10] / 10000), 2)
|
# logger.info(f"主力卖=={main_sellers}(万)")
|
# # 成交额
|
# transaction_amount = round((pre_trading_day_limit_up_info[0][11] / 10000), 2)
|
# logger.info(f"成交额=={transaction_amount}(万)")
|
# # 所属精选板块
|
# selected_plate = pre_trading_day_limit_up_info[0][12]
|
# logger.info(f"所属精选板块=={selected_plate}")
|
# # 实际流通
|
# actual_circulation = round((pre_trading_day_limit_up_info[0][11] / 100000000), 2)
|
# logger.info(f"实际流通=={actual_circulation}(亿)")
|
# # 量比?(不是,不知道是什么)
|
# equivalent_ratio = pre_trading_day_limit_up_info[0][14]
|
# logger.info(f"量比?=={equivalent_ratio}")
|
# # 领涨次数
|
# leading_increases_times = pre_trading_day_limit_up_info[0][15]
|
# logger.info(f"领涨次数=={leading_increases_times}")
|
# # 未知零值
|
# unknown_zero_16 = pre_trading_day_limit_up_info[0][16]
|
# logger.info(f"未知零值16=={unknown_zero_16}")
|
# # 未知零值
|
# unknown_zero_17 = pre_trading_day_limit_up_info[0][17]
|
# logger.info(f"未知零值17=={unknown_zero_17}")
|
# # 第几板(连续涨停天数)
|
# continuous_limit_up_days = pre_trading_day_limit_up_info[0][18]
|
# logger.info(f"第几板=={continuous_limit_up_days}")
|
# # 最相关概念的代码
|
# the_most_relevant_plate_code = pre_trading_day_limit_up_info[0][19]
|
# logger.info(f"最相关概念的代码=={the_most_relevant_plate_code}")
|
# # 同班级的数量(同概念涨停数量)
|
# the_same_class_amount = pre_trading_day_limit_up_info[0][20]
|
# logger.info(f"同概念涨停数量=={the_same_class_amount}")
|
|
|
# get_handling_limit_up_info()
|
|
|
# 获取全部个股的板块并存储的函数
|
def get_all_stocks_plate_dict(stocks_list):
|
all_stocks_plate_dict = {}
|
# 逐个获取个股精选板块概念和自由市值等,并整体放入一个新创建的字典中然后添加到数据中
|
for i in stocks_list:
|
try:
|
code = i.split('.')[1]
|
# print(f"i==={i}")
|
# 获取个股的自由市值
|
free_market_value = getZYLTAmount(code)
|
# 获取个股的板块列表
|
selected_blocks = getStockIDPlate(code)
|
# 提取精选板块中的板块名称
|
selected_plate_list = [block[1] for block in selected_blocks]
|
# print(f"selected_block_names==={selected_block_list}")
|
block_data = {
|
# 添加自由市值
|
'free_market_value': free_market_value,
|
# 添加精选板块
|
'plate': selected_plate_list
|
}
|
# 将code作为键,stocks_selected_block_data作为值添加到stocks_block_data字典中
|
all_stocks_plate_dict[code] = block_data
|
# print(f"all_stocks_plate_dict==={all_stocks_plate_dict}")
|
except Exception as e:
|
print(f"获取全部个股的板块并存储的函数 An error occurred: {e}")
|
finally:
|
pass
|
# return stocks_plate_data
|
# print(f"all_stocks_plate_dict==={len(all_stocks_plate_dict)}")
|
# 将获取到的范围票概念板块转JSON格式并存储在本地文件夹中
|
# 将字典转换为JSON格式的字符串
|
json_data = json.dumps(all_stocks_plate_dict)
|
# 写入文件
|
with open(constant.ALL_STOCKS_PLATE_PATH, 'w', encoding='utf-8') as f:
|
f.write(json_data)
|
now_time = datetime.datetime.now() # 获取本机时间
|
logger.info(f"写入所有个股板块文件完成!::{now_time}")
|
|
|
# 计算开盘啦昨日拉取的概念数据中为空的股票数量函数
|
def get_have_no_plate_num():
|
# 初始化无概念数量
|
have_no_plate_num = 0
|
plate_are_null_list = []
|
for k, v in data_cache.all_stocks_plate_dict.items():
|
pass
|
# print(f"i==={i} T==={t}")
|
if len(v['plate']) == 0:
|
have_no_plate_num += 1
|
# print(f"{k}的概念为空")
|
# logger.info(f"{k}的概念为空")
|
# 股票代码格式转化为掘金格式
|
symbol = basic_methods.format_stock_symbol(k)
|
sec_name = data_cache.all_stocks_all_K_line_property_dict.get(symbol)
|
if sec_name is not None:
|
plate_are_null_list.append(sec_name)
|
logger.info(f"有{have_no_plate_num}只股票概念为空")
|
logger.info(f"个股有历史K线但概念为空的有:{plate_are_null_list}")
|
|
|
# 获取全部个股的精选板块并存储的函数
|
def stocks_list_selected_blocks(min_stocks):
|
stocks_selected_block_data = []
|
# 逐个获取个股精选板块概念和自由市值等,并整体放入一个新创建的字典中然后添加到数据中
|
for i in min_stocks:
|
try:
|
code = i.split('.')[1]
|
# 获取个股的自由市值
|
free_market_value = getZYLTAmount(code)
|
# 获取个股的精选板块列表
|
# selected_blocks = getCodeJingXuanBlocks('000021')
|
selected_blocks = getCodeJingXuanBlocks(code)
|
# 提取精选板块中的板块名称
|
selected_block_list = [block[1] for block in selected_blocks]
|
# print(f"selected_block_names==={selected_block_list}")
|
stocks_selected_block_dict = {
|
# 添加股票代码
|
'code': code,
|
# 添加自由市值
|
'free_market_value': free_market_value,
|
# 添加精选板块
|
'selected_block': selected_block_list
|
}
|
stocks_selected_block_data.append(stocks_selected_block_dict)
|
# print(f"stocks_selected_block_data==={stocks_selected_block_dict}")
|
except Exception as e:
|
logger.error(f"获取全部个股的精选板块并存储的函数 An error occurred: {e}")
|
|
# print(f"stocks_selected_block_data==={len(stocks_selected_block_data)}")
|
# 将获取到的范围票概念板块转JSON格式并存储在本地文件夹中
|
# 将字典转换为JSON格式的字符串
|
json_data = json.dumps(stocks_selected_block_data)
|
# 写入文件
|
with open('local_storage_data/stocks_selected_block_data.json', 'w', encoding='utf-8') as f:
|
f.write(json_data)
|
now_time = datetime.datetime.now() # 获取本机时间
|
print(f"写入精选板块文件完成!::{now_time}")
|
|
|
# 获取实时大盘行情情绪综合强度 [分数] 函数 并 计算当日计划持仓数量
|
def get_real_time_market_strong():
|
while True:
|
try:
|
if data_cache.position_automatic_management_switch is True:
|
now_time = tool.get_now_time_str()
|
if data_cache.L1_DATA_START_TIME < now_time < data_cache.UPDATE_DATA_TIME:
|
# 获取大盘综合强度分数
|
data_cache.real_time_market_strong = get_market_strong()
|
# data_cache.time_sharing_market_strong_dirt = time_sharing_market_strong_dirt.update({now: data_cache.real_time_market_strong})
|
# 该logger.info的的日志不再需要打印,后续将转入到GUI客户端上直接显示,该数据的打印交由下方的打印机制异步执行单独存储,以便后续可视化呈现后进行更高效的数据分析
|
# logger.info(f"大盘行情情绪综合强度 [分数]==={data_cache.real_time_market_strong}分")
|
# 大盘综合强度分数 的 异步日志
|
# logger_Overall_market_strength_score.info(data_cache.real_time_market_strong)
|
async_log_util.info(logger_Overall_market_strength_score, f"{data_cache.real_time_market_strong}")
|
|
usefulMoney = data_cache.account_finance_dict[0].get('usefulMoney', 0)
|
logger.info(f"账户可用资金==={usefulMoney}元")
|
# 低迷情绪比例
|
low_emotion_mood_ratio = 1
|
# 33分是个两级分化阶梯不好,目前不好拿捏,暂时不用
|
# if data_cache.real_time_market_strong <= 33:
|
if data_cache.real_time_market_strong < 30:
|
low_emotion_mood_ratio = 0.1
|
if data_cache.real_time_market_strong <= 10:
|
low_emotion_mood_ratio = 0
|
logger.info(f"极端低迷情绪比例===={low_emotion_mood_ratio * 100}%")
|
|
data_cache.index_trend_expectation_score = index_trend_expectation()
|
logger.info(f"大盘指数情绪预期分数==={data_cache.index_trend_expectation_score}分")
|
# 目前大盘指数情绪预期分数 尚不科学 强制设置为初始0值
|
index_trend_expectation_score = 0
|
|
# 获取计算今天新增的持仓数量
|
addition_position_number = len(data_cache.addition_position_symbols_set)
|
# 定义一个今日的剩余新增持仓数量的变量
|
Unfinished_opening_plan_number = 3 - addition_position_number
|
if Unfinished_opening_plan_number != 0:
|
# 如果GUI看盘上没有手动设置具体的下单金额,就按照评分策略的金额下单,否则就按照GUI设置的金额下单。
|
if data_cache.BUY_MONEY_PER_CODE < 0:
|
# 根据账户可用金额 计算今日计划下单金额
|
# 账户可用金额 默认乘以0.9,永远留一点钱,一方面也冗余一些计算误差
|
# ((大盘综合强度分数 + 大盘指数情绪预期分数) * 0.01) * (账户可用金额 * 0.9 * 极端低迷情绪比例 / 今日最大新增持仓票数)
|
# data_cache.today_planned_order_amount = ((data_cache.real_time_market_strong + index_trend_expectation_score) * 0.01) * (
|
# usefulMoney * 0.9 * low_emotion_mood_ratio / Unfinished_opening_plan_number)
|
data_cache.today_planned_order_amount = (index_trend_expectation_score * 0.01) * (usefulMoney * 0.9 * low_emotion_mood_ratio / Unfinished_opening_plan_number)
|
else:
|
data_cache.today_planned_order_amount = data_cache.BUY_MONEY_PER_CODE
|
|
except Exception as error:
|
logger.error(f"获取实时大盘行情情绪综合强度[分数] 函数报错: {error}")
|
finally:
|
time.sleep(3)
|
|
|
# kpl_stocks_list_selected_blocks_process() #在 kpl_api.py中可以调用
|
# stocks_list_selected_blocks(min_stocks) #在 kpl_api.py中可以调用
|
# list = ['SHSE.600805','SHSE.600804']
|
#
|
# all_stocks_plate_dict(list)
|
|
|
if __name__ == "__main__":
|
# start_time = time.time()
|
# get_market_sift_plate_its_stock_power()
|
# print("耗时:", time.time() - start_time)
|
get_market_sift_plate_its_stock_power()
|
# get_market_sift_plate_its_stock_power_process(None)
|