Administrator
2024-10-12 aacc6148dd43a9cffbff9a23a273a55b64bf3d8c
third_data/kpl_api.py
@@ -19,7 +19,7 @@
DABAN_TYPE_EVER_LIMIT_DOWN = 5
def __base_request(url, data, timeout=60):
def __base_request(url, data, timeout=10):
    DELEGATE = True
    if not DELEGATE:
        headers = {
@@ -34,7 +34,7 @@
            raise Exception("请求出错")
        return response.text
    else:
        fdata = middle_api_protocol.load_kpl(url, data)
        fdata = middle_api_protocol.load_kpl(url, data, timeout)
        return middle_api_protocol.request(fdata)
@@ -47,7 +47,7 @@
def __getLimitUpInfo(pidType, page, pageSize):
    """Request one page of the ``DailyLimitPerformance`` list.

    Args:
        pidType: Value sent as the ``PidType`` field of the request.
        page: Page number, where 1 is the first page; it is converted to the
            ``Index`` offset ``(page - 1) * pageSize``.
        pageSize: Number of rows requested per page (the ``st`` field).

    Returns:
        Whatever ``__base_request`` returns for this request; callers in this
        file feed it to ``json.loads``.
    """
    data = f"Order=0&a=DailyLimitPerformance&st={pageSize}&apiv=w35&Type=4&c=HomeDingPan&PhoneOSNew=1&DeviceID=a38adabd-99ef-3116-8bb9-6d893c846e24&VerSion=5.13.0.0&Index={(page - 1) * pageSize}&PidType={pidType}&"
    return __base_request("https://apphq.longhuvip.com/w1/api/index.php", data=data)
@@ -57,18 +57,22 @@
    fresults = []
    for pid_info in pids:
        results = []
        for i in range(10):
        for i in range(100):
            start_time = time.time()
            result = __getLimitUpInfo(pid_info[0], i + 1, 20)
            result = json.loads(result)
            datas = result["info"][0]
            results.extend(datas)
            day = result["info"][1]
            print(datas)
            if len(datas) < 20:
                break
        for r in results:
            if not r[18] and pid_info[1]:
                r[18] = pid_info[1]
            # 替换掉板块名称
            for i in range(len(r)):
                if type(r[i]) == str:
                    r[i] = kpl_util.filter_block(r[i])
        fresults.extend(results)
    return json.dumps({"errcode": 0, "list": fresults})
@@ -93,7 +97,7 @@
def getHistoryLimitUpInfo(day):
    fresults = []
    for i in range(0, 5):
    for i in range(0, 100):
        data = f"Order=1&a=HisDaBanList&st=20&c=HisHomeDingPan&PhoneOSNew=1&DeviceID=a38adabd-99ef-3116-8bb9-6d893c846e23&VerSion=5.8.0.2&Index={i * 20}&Is_st=1&PidType=1&apiv=w32&Type=6&FilterMotherboard=0&Filter=0&FilterTIB=0&Day={day}&FilterGem=0&"
        result = __base_request("https://apphis.longhuvip.com/w1/api/index.php", data=data)
        result = json.loads(result)
@@ -159,12 +163,20 @@
def getCodeJingXuanBlocks(code, jx=True):
    """Fetch the selected ("JingXuan") blocks of a stock code.

    Args:
        code: Stock code, sent as the ``StockID`` field of the request.
        jx: When true (the default), return the ``ListJX`` entries of the
            response, falling back to ``List`` when ``ListJX`` is missing or
            empty. When false, return the ``List`` entries followed by the
            ``ListJX`` entries.

    Returns:
        With ``jx=True``: the chosen list from the response, or ``None`` when
        neither key holds a value. With ``jx=False``: a new, possibly empty,
        list. Entry format: ``(block code, block name, change percentage)``.
    """
    data = f"a=GetStockIDPlate&apiv=w32&Type=2&c=StockL2Data&StockID={code}&PhoneOSNew=1&UserID=0&DeviceID=a38adabd-99ef-3116-8bb9-6d893c846e23&VerSion=5.8.0.2&Token=0&"
    result = __base_request("https://apphq.longhuvip.com/w1/api/index.php", data=data)
    result = json.loads(result)
    if jx:
        # Prefer the selected list; an empty or missing one falls back to "List".
        return result.get("ListJX") or result.get("List")
    merged = []
    for key in ("List", "ListJX"):
        entries = result.get(key)
        if entries:
            merged.extend(entries)
    return merged
# 获取自由流通市值
@@ -184,8 +196,8 @@
    result = json.loads(result)
    if result:
        if "List" in result:
            names = [x["CName"].replace("概念", "") for x in result["List"]]
            return names
            names = [kpl_util.filter_block(x["CName"]) for x in result["List"]]
            return names if len(names) < 3 else names[:2]
    return []
@@ -196,7 +208,7 @@
    result = json.loads(result)
    if result:
        if "List" in result:
            names = [x["CName"].replace("概念", "") for x in result["List"]]
            names = [kpl_util.filter_block(x["CName"]) for x in result["List"]]
            return names
    return []
@@ -205,18 +217,23 @@
def getCodeBlocks(code):
    """Return the de-duplicated block names of a stock code.

    The first two entries returned by ``getCodeJingXuanBlocks`` are preferred.
    The names from ``__getConceptBK`` are used only when that lookup yields
    nothing. Both lookups are best-effort: a failing request counts as
    "no blocks found" instead of propagating to the caller.

    Args:
        code: Stock code to look up.

    Returns:
        A list of unique block names in no particular order; empty when both
        lookups fail or return nothing.
    """
    blocks = []
    try:
        # The lookup may return None; only the first two entries are kept.
        jx_entries = getCodeJingXuanBlocks(code) or []
        # Index 1 of an entry is the block name. Build the list first so a
        # malformed entry cannot leave a partial result behind.
        names = [entry[1] for entry in jx_entries[:2]]
        blocks.extend(names)
    except Exception:
        # Best-effort: fall through to the concept lookup below.
        pass
    if not blocks:
        try:
            concept_names = __getConceptBK(code)
            if concept_names:
                blocks.extend(concept_names)
        except Exception:
            # Best-effort: an empty result is returned to the caller.
            pass
    return list(set(blocks))
@@ -252,7 +269,11 @@
# Manual smoke test; runs only when this module is executed as a script.
if __name__ == "__main__":
    # Request the first 20 rows (st=20, Index=0) of DailyLimitPerformance with
    # PidType=5 and print the raw response.
    data = f"Order=0&a=DailyLimitPerformance&st=20&apiv=w35&Type=4&c=HomeDingPan&PhoneOSNew=1&DeviceID=a38adabd-99ef-3116-8bb9-6d893c846e23&VerSion=5.13.0.0&Index=0&PidType={5}&"
    result = __base_request("https://apphq.longhuvip.com/w1/api/index.php", data=data)
    print(result)
    # Then print whatever getLimitUpInfoNew() returns.
    print(getLimitUpInfoNew())
    # Earlier ad-hoc checks, left disabled:
    # __getConceptBK("300564")
    # data = (getMarketJingXuanRealRankingInfo())
    # data=json.loads(data)
    # print(len(data["list"]))
    # data = json.loads(getCodesByPlate("801235"))
    # print(data)