# Administrator
# 6 days ago 1c04204fcbc958a7bdef2394ff939063e56b6404
# third_data/kpl_api.py
# @@ -1,29 +1,142 @@
import json
import time
import requests
import constant
from third_data import kpl_util
from utils import middle_api_protocol, tool
def __base_request(url, data):
    """POST a form-encoded body to ``url`` and hand back the raw response.

    NOTE(review): ``__base_request`` is defined a second time further down in
    this module (with a ``timeout`` parameter); that later definition rebinds
    the name, so this version is shadowed once the module is loaded.

    @param url: endpoint to POST to
    @param data: request body, passed to ``requests.post`` unchanged
    @return: the ``requests`` response object
    """
    request_headers = {
        "Content-Type": "application/x-www-form-urlencoded; charset=UTF-8",
        "User-Agent": "Dalvik / 2.1.0(Linux;U;Android 6.0.1;MuMu Build/V417IR)"
    }
    # Proxies are explicitly disabled, otherwise the request would go through
    # the local proxy. (A fixed proxy {'https': '192.168.3.251:9002'} was tried here.)
    no_proxies = {"http": None, "https": None}
    return requests.post(url, data=data, headers=request_headers, proxies=no_proxies)
# DABAN_TYPE_* codes, listed by numeric value.
# DABAN_TYPE_LIMIT_UP is what getLimitUpInfo passes to daBanList as PidType.
DABAN_TYPE_LIMIT_UP = 1  # limit-up
DABAN_TYPE_OPEN_LIMIT_UP = 2  # limit-up that was opened again
DABAN_TYPE_LIMIT_DOWN = 3  # limit-down
DABAN_TYPE_EVER_LIMIT_DOWN = 5  # was limit-down at some point
DABAN_TYPE_BIDDING = 8  # call auction (bidding)
def __base_request(url, data, timeout=10):
    """Send one API request and return its response body.

    While the local ``DELEGATE`` switch is on (it is hard-coded to ``True``),
    the request is packed with ``middle_api_protocol.load_kpl`` and sent via
    ``middle_api_protocol.request``. Otherwise it is POSTed directly with
    ``requests``.

    @param url: endpoint URL
    @param data: form-encoded request body
    @param timeout: timeout forwarded to whichever transport is used
    @return: direct mode returns the response text; delegated mode returns
             whatever ``middle_api_protocol.request`` returns
    @raise Exception: direct mode only, when the HTTP status is not 200
    """
    DELEGATE = True
    if DELEGATE:
        packed = middle_api_protocol.load_kpl(url, data, timeout)
        return middle_api_protocol.request(packed)

    request_headers = {
        "Content-Type": "application/x-www-form-urlencoded; charset=UTF-8",
        "User-Agent": "Dalvik / 2.1.0(Linux;U;Android 6.0.1;MuMu Build/V417IR)"
    }
    # Proxies are explicitly disabled, otherwise the request would go through
    # the local proxy. (A fixed proxy {'https': '192.168.3.251:9002'} was tried here.)
    no_proxies = {"http": None, "https": None}
    response = requests.post(url, data=data, headers=request_headers, proxies=no_proxies,
                             timeout=timeout)
    if response.status_code == 200:
        return response.text
    raise Exception("请求出错")
def daBanList(pidType, page_size=50, index=0):
    """Request one page of the ``DaBanList`` listing.

    @param pidType: value sent as ``PidType`` (see the ``DABAN_TYPE_*`` constants)
    @param page_size: value sent as ``st``
    @param index: value sent as ``Index``
    @return: whatever ``__base_request`` returns for this request
    """
    payload = (
        f"Order=1&a=DaBanList&st={page_size}&c=HomeDingPan&PhoneOSNew=1&DeviceID=a38adabd-99ef-3116-8bb9-6d893c846e24"
        f"&VerSion=5.8.0.2&Index={index}&Is_st=1&PidType={pidType}&apiv=w32&Type=4&FilterMotherboard=0&Filter=0&FilterTIB=0"
        "&FilterGem=0"
    )
    return __base_request("https://apphq.longhuvip.com/w1/api/index.php", data=payload)
def __getLimitUpInfo(pidType, page, pageSize):
    """Request one page of the ``DailyLimitPerformance`` listing.

    @param pidType: value sent as ``PidType``
    @param page: 1-based page number; converted to the ``Index`` row offset
    @param pageSize: rows per page, sent as ``st``
    @return: whatever ``__base_request`` returns for this request
    """
    offset = (page - 1) * pageSize
    payload = f"Order=0&a=DailyLimitPerformance&st={pageSize}&apiv=w35&Type=4&c=HomeDingPan&PhoneOSNew=1&DeviceID=a38adabd-99ef-3116-8bb9-6d893c846e24&VerSion=5.13.0.0&Index={offset}&PidType={pidType}&"
    return __base_request("https://apphq.longhuvip.com/w1/api/index.php", data=payload)
def getLimitUpInfoNew():
    """Collect the ``DailyLimitPerformance`` rows of every PidType bucket.

    For each PidType 1..5 the listing is paged through ``__getLimitUpInfo``
    (at most 100 pages of 20 rows) until a short page is seen. Rows whose
    column 18 is empty get the bucket's label written into it, and every
    string column is passed through ``kpl_util.filter_block``.

    @return: JSON string ``{"errcode": 0, "list": <all rows>, "day": <info[1] of the last response>}``
    """
    page_size = 20
    max_pages = 100
    # (PidType, label written into column 18 when the row has none)
    pid_labels = [(1, "首板"), (2, "2连板"), (3, "3连板"), (4, "4连板"), (5, "")]
    all_rows = []
    day = ''
    for pid_type, label in pid_labels:
        rows = []
        for page in range(1, max_pages + 1):
            payload = json.loads(__getLimitUpInfo(pid_type, page, page_size))
            page_rows = payload["info"][0]
            rows.extend(page_rows)
            day = payload["info"][1]
            # A short page means the bucket is exhausted.
            if len(page_rows) < page_size:
                break
        for row in rows:
            if not row[18] and label:
                row[18] = label
            # Replace plate names in every string column.
            for col, value in enumerate(row):
                if isinstance(value, str):
                    row[col] = kpl_util.filter_block(value)
        all_rows.extend(rows)
    return json.dumps({"errcode": 0, "list": all_rows, "day": day})
def getLimitUpInfo():
    """Fetch the limit-up ``DaBanList`` pages and merge them into one response.

    Pages of 20 rows are requested (at most 10 requests) until a short page
    arrives or more than 150 rows have been collected. In both cases the
    returned document is the last response with its ``list`` replaced by all
    rows collected so far.

    @return: JSON string of the merged response, or ``None`` when the request
             budget runs out without hitting either stop condition
    """
    page_size = 20
    max_size = 150
    max_pages = 10
    rows = []
    for _ in range(max_pages):
        result = json.loads(daBanList(DABAN_TYPE_LIMIT_UP, page_size=page_size, index=len(rows)))
        page_rows = result["list"]
        rows += page_rows
        # Both stop conditions must return the accumulated rows, not just the
        # rows of the page that triggered the stop.
        if len(page_rows) < page_size or len(rows) > max_size:
            result["list"] = rows
            return json.dumps(result)
    return None
def getHistoryLimitUpInfo(day):
    """Collect the ``HisDaBanList`` rows of one day.

    Requests pages of 20 rows (offsets 0, 20, ... up to 100 pages) and stops
    at the first page that holds fewer than 20 rows.

    @param day: value sent as ``Day``
    @return: list with the ``list`` entries of all fetched pages
    """
    collected = []
    for offset in range(0, 100 * 20, 20):
        payload = f"Order=1&a=HisDaBanList&st=20&c=HisHomeDingPan&PhoneOSNew=1&DeviceID=a38adabd-99ef-3116-8bb9-6d893c846e23&VerSion=5.8.0.2&Index={offset}&Is_st=1&PidType=1&apiv=w32&Type=6&FilterMotherboard=0&Filter=0&FilterTIB=0&Day={day}&FilterGem=0&"
        raw = __base_request("https://apphis.longhuvip.com/w1/api/index.php", data=payload)
        page_rows = json.loads(raw)["list"]
        collected.extend(page_rows)
        if len(page_rows) < 20:
            break
    return collected
# Market quotes - industry
def getMarketIndustryRealRankingInfo(orderJingE_DESC=True):
    """Request the ``RealRankingInfo`` ranking with ``ZSType=4`` and ``st=20``.

    NOTE(review): a function with this name is defined again further down in
    this module (using ``st=80``); the later definition is the one that stays bound.

    @param orderJingE_DESC: sends ``Order=1`` when truthy, ``Order=0`` otherwise
    @return: whatever ``__base_request`` returns for this request
    """
    order_flag = 1 if orderJingE_DESC else 0
    payload = f"Order={order_flag}&a=RealRankingInfo&st=20&apiv=w32&Type=5&c=ZhiShuRanking&PhoneOSNew=1&DeviceID=a38adabd-99ef-3116-8bb9-6d893c846e23&VerSion=5.8.0.2&Index=0&ZSType=4&"
    return __base_request("https://apphq.longhuvip.com/w1/api/index.php", data=payload)
# Market quotes - curated (JingXuan)
def getMarketJingXuanRealRankingInfo(orderJingE_DESC=True):
    """Request the ``RealRankingInfo`` ranking with ``ZSType=7`` and ``st=20``.

    NOTE(review): a function with this name is defined again further down in
    this module (using ``st=80``); the later definition is the one that stays bound.

    @param orderJingE_DESC: sends ``Order=1`` when truthy, ``Order=0`` otherwise
    @return: whatever ``__base_request`` returns for this request
    """
    order_flag = 1 if orderJingE_DESC else 0
    payload = f"Order={order_flag}&a=RealRankingInfo&st=20&apiv=w32&Type=5&c=ZhiShuRanking&PhoneOSNew=1&DeviceID=a38adabd-99ef-3116-8bb9-6d893c846e23&VerSion=5.8.0.2&Index=0&ZSType=7&"
    return __base_request("https://apphq.longhuvip.com/w1/api/index.php", data=payload)
def getMarketJingXuanRealRankingInfoByTimeRange(startTime, endTime, date, orderJingE_DESC=True):
    """Request the ``ZSType=7`` ``RealRankingInfo`` ranking for a time window.

    @param startTime: value sent as ``RStart``
    @param endTime: value sent as ``REnd``
    @param date: value sent as ``Date``
    @param orderJingE_DESC: sends ``Order=1`` when truthy, ``Order=0`` otherwise
    @return: whatever ``__base_request`` returns for this request
    """
    order_flag = 1 if orderJingE_DESC else 0
    payload = f"Order={order_flag}&st=30&a=RealRankingInfo&apiv=w35&Type=5&Index=0&RStart={startTime}&c=ZhiShuRanking&VerSion=5.13.0.0&REnd={endTime}&Date={date}&PhoneOSNew=1&ZSType=7&DeviceID=d6f20ce9-fa08-31c9-a493-536ebb8e9773&"
    return __base_request("https://apphq.longhuvip.com/w1/api/index.php", data=payload)
# Get the plates of a stock code
def getStockIDPlate(code):
    """Request ``GetStockIDPlate_New`` for a stock and return its plate list.

    @param code: stock code, sent as ``StockID``
    @return: the ``ListJX`` entry when it is present and non-empty, otherwise
             the ``List`` entry; ``None`` when ``errcode`` is non-zero
    """
    data = f"a=GetStockIDPlate_New&apiv=w32&c=StockL2Data&StockID={code}&PhoneOSNew=1&UserID=0&DeviceID=a38adabd-99ef-3116-8bb9-6d893c846e23&VerSion=5.8.0.2&Token=0&"
    # __base_request hands back the response body (callers in this module
    # parse it with json.loads), so it is parsed directly: there is no
    # response object to inspect, and the request is sent only once.
    result = json.loads(__base_request("https://apphq.longhuvip.com/w1/api/index.php", data=data))
    if int(result["errcode"]) != 0:
        return None
    return result.get("ListJX") or result.get("List")
# @@ -32,49 +145,225 @@
# Get the stock codes of a concept plate
def getCodesByPlate(plate_code):
    """Request the ``ZhiShuStockList_W8`` listing (``Type=6``) of a plate.

    @param plate_code: plate code, sent as ``PlateID``
    @return: whatever ``__base_request`` returns for this request
             (``request_new_blocks_codes`` parses it with ``json.loads``)
    """
    data = f"Order=1&a=ZhiShuStockList_W8&st=30&c=ZhiShuRanking&PhoneOSNew=1&old=1&DeviceID=a38adabd-99ef-3116-8bb9-6d893c846e23&VerSion=5.8.0.2&IsZZ=0&Token=0&Index=0&apiv=w32&Type=6&IsKZZType=0&UserID=0&PlateID={plate_code}&"
    # __base_request returns the body itself, so there is no response object
    # to inspect here.
    return __base_request("https://apphq.longhuvip.com/w1/api/index.php",
                          data=data)
def getCodesByPlateOrderByLZCS(plate_code):
    """
    Request the stock list of a plate, ordered by the number of times a stock
    led the gains (the request is sent with ``Type=27``).
    @param plate_code: plate code, sent as ``PlateID``
    @return: whatever ``__base_request`` returns for this request
    """
    payload = f"Order=1&a=ZhiShuStockList_W8&st=30&c=ZhiShuRanking&PhoneOSNew=1&old=1&DeviceID=a38adabd-99ef-3116-8bb9-6d893c846e23&VerSion=5.8.0.2&IsZZ=0&Token=0&Index=0&apiv=w32&Type=27&IsKZZType=0&UserID=0&PlateID={plate_code}&"
    endpoint = "https://apphq.longhuvip.com/w1/api/index.php"
    return __base_request(endpoint, data=payload)
def getHistoryCodesByPlateOrderByLZCS(plate_code, date, time_str):
    """
    Request the historical stock list of a plate for the window from 0925 to
    ``time_str`` on ``date``. The original note describes the result as ordered
    by the number of times a stock led the gains (the request uses ``Type=1``).
    @param date: sent as ``Date``, e.g. 2025-05-25
    @param time_str: sent as ``REnd``, e.g. 1025
    @param plate_code: sent as ``PlateID``, e.g. 110505
    @return: whatever ``__base_request`` returns for this request
    """
    payload = f"Order=1&a=ZhiShuStockList_W8&st=30&c=ZhiShuRanking&PhoneOSNew=1&RStart=0925&old=1&DeviceID=b692e51c-1bc4-3e8c-a01b-620aa6240e28&VerSion=5.8.0.4&IsZZ=0&Token=0&Index=0&Date={date}&REnd={time_str}&apiv=w33&Type=1&IsKZZType=0&UserID=0&PlateID={plate_code}&"
    endpoint = "https://apphq.longhuvip.com/w1/api/index.php"
    return __base_request(endpoint, data=payload)
# Get the strength of the plates inside a concept
def getSonPlate(plate_code):
    """Request ``SonPlate_Info`` for a plate.

    @param plate_code: plate code, sent as ``PlateID``
    @return: whatever ``__base_request`` returns for this request
    """
    data = f"a=SonPlate_Info&apiv=w32&c=ZhiShuRanking&PhoneOSNew=1&DeviceID=a38adabd-99ef-3116-8bb9-6d893c846e23&VerSion=5.8.0.2&PlateID={plate_code}&"
    # __base_request returns the body itself, so there is no response object
    # to inspect here.
    return __base_request("https://apphq.longhuvip.com/w1/api/index.php", data=data)
# Market quotes - industry
def getMarketIndustryRealRankingInfo(orderJingE_DESC=True):
    """Request the ``RealRankingInfo`` ranking with ``ZSType=4`` and ``st=80``.

    NOTE(review): an earlier function with this name (using ``st=20``) exists
    in this module; this later definition is the one that stays bound.

    @param orderJingE_DESC: sends ``Order=1`` when truthy, ``Order=0`` otherwise
    @return: whatever ``__base_request`` returns for this request
    """
    data = f"Order={1 if orderJingE_DESC else 0}&a=RealRankingInfo&st=80&apiv=w32&Type=5&c=ZhiShuRanking&PhoneOSNew=1&DeviceID=a38adabd-99ef-3116-8bb9-6d893c846e23&VerSion=5.8.0.2&Index=0&ZSType=4&"
    # __base_request returns the body itself, so there is no response object
    # to inspect here.
    return __base_request("https://apphq.longhuvip.com/w1/api/index.php",
                          data=data)
# Market quotes - curated (JingXuan)
def getMarketJingXuanRealRankingInfo(orderJingE_DESC=True):
    """Request the ``RealRankingInfo`` ranking with ``ZSType=7`` and ``st=80``.

    NOTE(review): an earlier function with this name (using ``st=20``) exists
    in this module; this later definition is the one that stays bound.

    @param orderJingE_DESC: sends ``Order=1`` when truthy, ``Order=0`` otherwise
    @return: whatever ``__base_request`` returns for this request
    """
    data = f"Order={1 if orderJingE_DESC else 0}&a=RealRankingInfo&st=80&apiv=w32&Type=5&c=ZhiShuRanking&PhoneOSNew=1&DeviceID=a38adabd-99ef-3116-8bb9-6d893c846e23&VerSion=5.8.0.2&Index=0&ZSType=7&"
    # __base_request returns the body itself, so there is no response object
    # to inspect here.
    return __base_request("https://apphq.longhuvip.com/w1/api/index.php",
                          data=data)
# Get the curated (JingXuan) plates of a stock code
# Return format: [(plate code, plate name, change percentage)]
def getCodeJingXuanBlocks(code, jx=True):
    """Request ``GetStockIDPlate`` for a stock and return its plate entries.

    @param code: stock code, sent as ``StockID``
    @param jx: when truthy, return ``ListJX`` and fall back to ``List`` only
               if ``ListJX`` is missing or empty; when falsy, return the
               entries of ``List`` followed by those of ``ListJX``
    @return: list of plate entries; with ``jx`` truthy this is ``None`` when
             neither key holds data
    """
    data = f"a=GetStockIDPlate&apiv=w32&Type=2&c=StockL2Data&StockID={code}&PhoneOSNew=1&UserID=0&DeviceID=a38adabd-99ef-3116-8bb9-6d893c846e23&VerSion=5.8.0.2&Token=0&"
    # __base_request returns the body itself, so it is parsed directly.
    result = json.loads(__base_request("https://apphq.longhuvip.com/w1/api/index.php", data=data))
    curated = result.get("ListJX")
    regular = result.get("List")
    if jx:
        return curated if curated else regular
    fresults = []
    if regular:
        fresults.extend(regular)
    if curated:
        fresults.extend(curated)
    return fresults
# Get the free-float market value
def getZYLTAmount(code):
    """Request ``GetStockPanKou_Narrow`` and read the free-float value.

    @param code: stock code, sent as ``StockID``
    @return: ``real.actualcirculation_value`` from the response, or ``None``
             when the response has no ``real`` entry
    """
    payload = f"a=GetStockPanKou_Narrow&apiv=w32&c=StockL2Data&VerSion=5.8.0.2&State=1&PhoneOSNew=1&DeviceID=a38adabd-99ef-3116-8bb9-6d893c846e23&StockID={code}&"
    raw = __base_request("https://apphq.longhuvip.com/w1/api/index.php", data=payload, timeout=3)
    parsed = json.loads(raw)
    if "real" not in parsed:
        return None
    return parsed["real"].get("actualcirculation_value")
# Get the curated (JingXuan) plates from the F10 data
def __getConceptJXBK(code):
    """Request ``GetConceptJXBKw23`` and return at most two plate names.

    @param code: stock code, sent as ``StockID``
    @return: the first two ``CName`` values of ``List`` (each passed through
             ``kpl_util.filter_block``); ``[]`` when the response is empty or
             has no ``List``
    """
    payload = f"a=GetConceptJXBKw23&apiv=w32&c=StockF10Basic&StockID={code}&PhoneOSNew=1&DeviceID=a38adabd-99ef-3116-8bb9-6d893c846e23&VerSion=5.8.0.2&"
    raw = __base_request("https://apparticle.longhuvip.com/w1/api/index.php", data=payload, timeout=3)
    parsed = json.loads(raw)
    if not parsed or "List" not in parsed:
        return []
    names = [kpl_util.filter_block(item["CName"]) for item in parsed["List"]]
    # Keep at most the first two names.
    return names[:2]
# Get the regular plates from the F10 data
def __getConceptBK(code):
    """Request ``GetConceptw23`` and return the plate names.

    @param code: stock code, sent as ``StockID``
    @return: the ``CName`` values of ``List`` (each passed through
             ``kpl_util.filter_block``); ``[]`` when the response is empty or
             has no ``List``
    """
    payload = f"a=GetConceptw23&apiv=w32&c=StockF10Basic&StockID={code}&PhoneOSNew=1&DeviceID=a38adabd-99ef-3116-8bb9-6d893c846e23&VerSion=5.8.0.2&"
    raw = __base_request("https://apparticle.longhuvip.com/w1/api/index.php", data=payload, timeout=3)
    parsed = json.loads(raw)
    if not parsed or "List" not in parsed:
        return []
    return [kpl_util.filter_block(item["CName"]) for item in parsed["List"]]
# Get the plate names of a stock code
def getCodeBlocks(code):
    """Return the plate names of a stock, best effort.

    The first two entries of ``getCodeJingXuanBlocks`` are tried first. Only
    when that yields nothing are the names from ``__getConceptBK`` used.
    Failures of either lookup are swallowed on purpose so that callers always
    get a list.

    @param code: stock code
    @return: list of unique plate names in first-seen order (possibly empty)
    """
    blocks = []
    try:
        jx_blocks = getCodeJingXuanBlocks(code) or []
        # Only the first two entries are used; index 1 of an entry is its name.
        blocks.extend([entry[1] for entry in jx_blocks[:2]])
    except Exception:
        # Best effort: fall through to the regular F10 plates below.
        pass
    if not blocks:
        try:
            blocks.extend(__getConceptBK(code) or [])
        except Exception:
            # Best effort: an empty result is acceptable here.
            pass
    # De-duplicate while keeping a deterministic order.
    return list(dict.fromkeys(blocks))
# Get the major events (BigReminder) from the F10 data
def __getF10BigReminders(code):
    """Request ``BigReminder`` for a stock.

    @param code: stock code, sent as ``StockID``
    @return: the ``info`` entry of the response, or ``None`` when ``errcode``
             is non-zero
    """
    payload = f"a=BigReminder&st=25&apiv=w32&c=StockF10Basic&StockID={code}&PhoneOSNew=1&DeviceID=a38adabd-99ef-3116-8bb8-6d893c846e23&VerSion=5.8.0.2&Index=0&"
    raw = __base_request("https://apparticle.longhuvip.com/w1/api/index.php", data=payload, timeout=3)
    parsed = json.loads(raw)
    if int(parsed["errcode"]) != 0:
        return None
    return parsed["info"]
# Get the share-reduction date of a stock
def getCodeReductionDate(code):
    """Find the date of the first qualifying share-reduction reminder.

    A reminder qualifies when its ``type`` is 5, its title contains "减持"
    and the title contains none of the excluded words (terminated, ended,
    result, no reduction, completed).

    @param code: stock code
    @return: the part of the reminder's ``date`` before the first space, or
             ``None`` when there are no reminders or none qualifies
    """
    reminders = __getF10BigReminders(code)
    if not reminders:
        return None
    excluded_words = ["终止", "结束", "结果", "不减持", "完成"]
    for reminder in reminders:
        if reminder.get("type") != 5:
            continue
        title = str(reminder.get("title"))
        if "减持" not in title:
            continue
        if any(word in title for word in excluded_words):
            continue
        return reminder.get("date").split(" ")[0]
    return None
def getZLJECodesRank(index):
    """
    Request the ranking by main-force capital amount.
    @param index: value sent as ``index``
    @return: the parsed JSON response
    """
    payload = f"Order=1&a=RealRankingInfo_W8&st=50&c=NewStockRanking&PhoneOSNew=1&RStart=0925&DeviceID=d6f20ce9-fa08-31c9-a493-536ebb8e9773&VerSion=5.13.0.0&Isst=0&index={index}&Date=&REnd=1420&apiv=w35&Type=1&FilterMotherboard=0&Filter=0&FilterTIB=1&FilterGem=0&"
    raw = __base_request("https://apphwhq.longhuvip.com/w1/api/index.php", data=payload, timeout=3)
    return json.loads(raw)
def getMarketStrong():
    """
    Request ``DiskReview`` and read the market strength.
    :return: ``info.strong`` of the response, converted to ``int``
    """
    payload = f"a=DiskReview&apiv=w35&c=HomeDingPan&VerSion=5.13.0.0&PhoneOSNew=1&DeviceID=d6f20ce9-fa08-31c9-a493-536ebb8e9773&"
    raw = __base_request("https://apphwhq.longhuvip.com/w1/api/index.php", data=payload, timeout=3)
    review = json.loads(raw)
    return int(review["info"]["strong"])
def request_new_blocks_codes(blocks_info):
    """
    Request the codes of new plates and print the ones that pass the gain filter.

    For every plate the stock list is fetched with ``getCodesByPlate``. A row
    is kept when column 6 divided by
    ``int(round((tool.get_limit_up_rate(code) - 1) * 10))`` is at least 5.
    Kept rows are only printed; nothing is returned.
    NOTE(review): ``yesterday_codes`` is never filled, so its skip check never
    fires; the divisor would be 0 if the limit-up rate rounds down to zero —
    confirm the range of ``tool.get_limit_up_rate``.
    @param blocks_info: [(plate name, plate code)]
    @return: None
    """
    yesterday_codes = set()
    for block in blocks_info:
        plate_rows = json.loads(getCodesByPlate(block[1]))["list"]
        code_info_list = []
        for row in plate_rows:
            if row[0] in yesterday_codes:
                continue
            # Gain filter: rows whose scaled gain is below 5 are dropped.
            scaled_gain = row[6] / int(round((tool.get_limit_up_rate(row[0]) - 1) * 10))
            if scaled_gain >= 5:
                # Format: (code, gain)
                code_info_list.append((row[0], row[6]))
        # Report the codes that would be added to the new theme.
        for code_info in code_info_list:
            print("添加", code_info)
if __name__ == "__main__":
    # Manual check: print the raw history listing of plate 801074 for the
    # 09:25-10:25 window of 2025-05-16.
    print(getHistoryCodesByPlateOrderByLZCS("801074", "2025-05-16", "1025"))
    # Other ad-hoc checks that were run from here at some point:
    #   json.loads(getCodesByPlateOrderByLZCS("801074"))["list"]  -> print each row
    #   getCodeJingXuanBlocks("000756", True)
    #   request_new_blocks_codes([(<plate name>, "801159")])
    #   json.loads(getCodesByPlate("801159"))["list"]  -> print each row
    #   getHistoryLimitUpInfo("2024-02-19")