Administrator
2023-12-13 11a349588bd3a277ef87ff186cb338093c7287f3
third_data/kpl_api.py
@@ -1,27 +1,87 @@
import json
import time
import requests
import constant
from utils import middle_api_protocol
def __base_request(url, data, timeout=60):
    """POST form-encoded ``data`` to ``url`` and return the raw response.

    NOTE: a later definition of ``__base_request`` in this module rebinds the
    name, so this version is only in effect until that definition executes.

    Args:
        url: Endpoint to post to.
        data: Request body, already URL-encoded.
        timeout: Seconds passed to ``requests.post`` as its timeout; without
            one a stalled connection would block the caller indefinitely.

    Returns:
        The ``requests.Response`` object returned by ``requests.post``.
    """
    headers = {
        "Content-Type": "application/x-www-form-urlencoded; charset=UTF-8",
        "User-Agent": "Dalvik / 2.1.0(Linux;U;Android 6.0.1;MuMu Build/V417IR)"
    }
    # Disable proxies explicitly, otherwise the request goes through the local proxy.
    return requests.post(url, data=data, headers=headers,
                         proxies={"http": None, "https": None},
                         timeout=timeout)
# Board-type codes. DABAN_TYPE_LIMIT_UP is passed as ``pidType`` to daBanList
# in this module; the others presumably serve the same purpose - confirm.
# Call auction (bidding)
DABAN_TYPE_BIDDING = 8
# Limit-up
DABAN_TYPE_LIMIT_UP = 1
# Limit-up that was broken open
DABAN_TYPE_OPEN_LIMIT_UP = 2
# Limit-down
DABAN_TYPE_LIMIT_DOWN = 3
# Was limit-down at some point
DABAN_TYPE_EVER_LIMIT_DOWN = 5
def __base_request(url, data, timeout=60):
    """Send a KPL API request and return the response body.

    The request is currently always routed through ``middle_api_protocol``
    (the local switch below is hard-wired to True); the direct HTTP path is
    kept for when that switch is flipped.

    Args:
        url: Endpoint to post to.
        data: Request body, already URL-encoded.
        timeout: Seconds allowed for the direct HTTP request. It is not
            forwarded on the delegated path.

    Returns:
        On the delegated path, whatever ``middle_api_protocol.request``
        returns; on the direct path, the response text.

    Raises:
        Exception: On the direct path, when the HTTP status is not 200.
    """
    use_delegate = True
    if use_delegate:
        payload = middle_api_protocol.load_kpl(url, data)
        return middle_api_protocol.request(payload)

    request_headers = {
        "Content-Type": "application/x-www-form-urlencoded; charset=UTF-8",
        "User-Agent": "Dalvik / 2.1.0(Linux;U;Android 6.0.1;MuMu Build/V417IR)"
    }
    # Disable proxies explicitly, otherwise the request goes through the local proxy.
    no_proxy = {"http": None, "https": None}
    resp = requests.post(url, data=data, headers=request_headers, proxies=no_proxy, timeout=timeout)
    if resp.status_code == 200:
        return resp.text
    raise Exception("请求出错")
def daBanList(pidType, page_size=50, index=0):
    """Request one page of the ``DaBanList`` listing.

    Args:
        pidType: Value sent as the ``PidType`` field.
        page_size: Value sent as the ``st`` field.
        index: Value sent as the ``Index`` field.

    Returns:
        Whatever ``__base_request`` returns for the request.
    """
    # The trailing space after FilterGem=0 is part of the original payload
    # and is preserved as-is.
    payload = (
        f"Order=1&a=DaBanList&st={page_size}&c=HomeDingPan&PhoneOSNew=1&DeviceID=a38adabd-99ef-3116-8bb9-6d893c846e23"
        f"&VerSion=5.8.0.2&Index={index}&Is_st=1&PidType={pidType}&apiv=w32&Type=4&FilterMotherboard=0&Filter=0&FilterTIB=0"
        "&FilterGem=0 "
    )
    return __base_request("https://apphq.longhuvip.com/w1/api/index.php", data=payload)
def getLimitUpInfo():
    """Fetch the limit-up listing page by page and merge the pages.

    Pages are requested with ``daBanList`` using the number of entries
    collected so far as the next index. Fetching stops when a short page is
    received or once more than ``MAX_SIZE`` entries have been collected.

    Returns:
        A JSON string: the last response with its ``"list"`` replaced by all
        entries collected across pages, or None if ten pages were fetched
        without meeting either stop condition.
    """
    list_ = []
    page_size = 50
    MAX_SIZE = 150
    for _ in range(10):
        result_str = daBanList(DABAN_TYPE_LIMIT_UP, page_size=page_size, index=len(list_))
        result = json.loads(result_str)
        temp_list = result["list"]
        list_ += temp_list
        if len(temp_list) < page_size or len(list_) > MAX_SIZE:
            # Always hand back the accumulated entries; previously the
            # size-cap exit returned only the final page.
            result["list"] = list_
            return json.dumps(result)
    return None
# Market quotes - industry
def getMarketIndustryRealRankingInfo(orderJingE_DESC=True):
    """Request the ``RealRankingInfo`` ranking with ``ZSType=4`` and ``st=20``.

    NOTE: a later definition of the same name in this module rebinds it.

    Args:
        orderJingE_DESC: When truthy ``Order=1`` is sent, otherwise
            ``Order=0``.

    Returns:
        Whatever ``__base_request`` returns for the request.
    """
    order_flag = 1 if orderJingE_DESC else 0
    payload = f"Order={order_flag}&a=RealRankingInfo&st=20&apiv=w32&Type=5&c=ZhiShuRanking&PhoneOSNew=1&DeviceID=a38adabd-99ef-3116-8bb9-6d893c846e23&VerSion=5.8.0.2&Index=0&ZSType=4&"
    return __base_request("https://apphq.longhuvip.com/w1/api/index.php", data=payload)
# Market quotes - featured (JingXuan)
def getMarketJingXuanRealRankingInfo(orderJingE_DESC=True):
    """Request the ``RealRankingInfo`` ranking with ``ZSType=7`` and ``st=20``.

    NOTE: a later definition of the same name in this module rebinds it.

    Args:
        orderJingE_DESC: When truthy ``Order=1`` is sent, otherwise
            ``Order=0``.

    Returns:
        Whatever ``__base_request`` returns for the request.
    """
    order_flag = 1 if orderJingE_DESC else 0
    payload = f"Order={order_flag}&a=RealRankingInfo&st=20&apiv=w32&Type=5&c=ZhiShuRanking&PhoneOSNew=1&DeviceID=a38adabd-99ef-3116-8bb9-6d893c846e23&VerSion=5.8.0.2&Index=0&ZSType=7&"
    return __base_request("https://apphq.longhuvip.com/w1/api/index.php", data=payload)
# Get the plates of a stock code
def getStockIDPlate(code):
    """Request ``GetStockIDPlate_New`` for ``code`` and parse the JSON reply.

    ``__base_request`` (as defined with a ``timeout`` parameter in this
    module) already returns the response body and raises on a bad status, so
    the request is issued once and its result parsed directly.

    Args:
        code: Stock code sent as the ``StockID`` field.

    Returns:
        None when the reply's ``errcode`` is non-zero.
    """
    data = f"a=GetStockIDPlate_New&apiv=w32&c=StockL2Data&StockID={code}&PhoneOSNew=1&UserID=0&DeviceID=a38adabd-99ef-3116-8bb9-6d893c846e23&VerSion=5.8.0.2&Token=0&"
    result = __base_request("https://apphq.longhuvip.com/w1/api/index.php", data=data)
    result = json.loads(result)
    if int(result["errcode"]) != 0:
        return None
    # NOTE(review): the success path visible here ends without an explicit
    # return, so it also yields None; the remainder of the body may be
    # missing from this view - confirm against the full file.
@@ -31,66 +91,103 @@
# Get the stock codes of a concept plate
def getCodesByPlate(plate_code):
    """Request ``ZhiShuStockList_W8`` for the given plate.

    ``__base_request`` (as defined with a ``timeout`` parameter in this
    module) already returns the response body and raises on a bad status, so
    its result is returned directly instead of being treated as a response
    object.

    Args:
        plate_code: Plate identifier sent as the ``PlateID`` field.

    Returns:
        Whatever ``__base_request`` returns for the request.
    """
    data = f"Order=1&a=ZhiShuStockList_W8&st=30&c=ZhiShuRanking&PhoneOSNew=1&old=1&DeviceID=a38adabd-99ef-3116-8bb9-6d893c846e23&VerSion=5.8.0.2&IsZZ=0&Token=0&Index=0&apiv=w32&Type=6&IsKZZType=0&UserID=0&PlateID={plate_code}&"
    return __base_request("https://apphq.longhuvip.com/w1/api/index.php",
                          data=data)
# Get the strength of the sub-plates within a concept
def getSonPlate(plate_code):
    """Request ``SonPlate_Info`` for the given plate.

    ``__base_request`` (as defined with a ``timeout`` parameter in this
    module) already returns the response body and raises on a bad status, so
    its result is returned directly instead of being treated as a response
    object.

    Args:
        plate_code: Plate identifier sent as the ``PlateID`` field.

    Returns:
        Whatever ``__base_request`` returns for the request.
    """
    data = f"a=SonPlate_Info&apiv=w32&c=ZhiShuRanking&PhoneOSNew=1&DeviceID=a38adabd-99ef-3116-8bb9-6d893c846e23&VerSion=5.8.0.2&PlateID={plate_code}&"
    return __base_request("https://apphq.longhuvip.com/w1/api/index.php", data=data)
# Market quotes - industry
def getMarketIndustryRealRankingInfo(orderJingE_DESC=True):
    """Request the ``RealRankingInfo`` ranking with ``ZSType=4`` and ``st=80``.

    ``__base_request`` (as defined with a ``timeout`` parameter in this
    module) already returns the response body and raises on a bad status, so
    its result is returned directly instead of being treated as a response
    object.

    Args:
        orderJingE_DESC: When truthy ``Order=1`` is sent, otherwise
            ``Order=0``.

    Returns:
        Whatever ``__base_request`` returns for the request.
    """
    data = f"Order={1 if orderJingE_DESC else 0}&a=RealRankingInfo&st=80&apiv=w32&Type=5&c=ZhiShuRanking&PhoneOSNew=1&DeviceID=a38adabd-99ef-3116-8bb9-6d893c846e23&VerSion=5.8.0.2&Index=0&ZSType=4&"
    return __base_request("https://apphq.longhuvip.com/w1/api/index.php",
                          data=data)
# Market quotes - featured (JingXuan)
def getMarketJingXuanRealRankingInfo(orderJingE_DESC=True):
    """Request the ``RealRankingInfo`` ranking with ``ZSType=7`` and ``st=80``.

    ``__base_request`` (as defined with a ``timeout`` parameter in this
    module) already returns the response body and raises on a bad status, so
    its result is returned directly instead of being treated as a response
    object.

    Args:
        orderJingE_DESC: When truthy ``Order=1`` is sent, otherwise
            ``Order=0``.

    Returns:
        Whatever ``__base_request`` returns for the request.
    """
    data = f"Order={1 if orderJingE_DESC else 0}&a=RealRankingInfo&st=80&apiv=w32&Type=5&c=ZhiShuRanking&PhoneOSNew=1&DeviceID=a38adabd-99ef-3116-8bb9-6d893c846e23&VerSion=5.8.0.2&Index=0&ZSType=7&"
    return __base_request("https://apphq.longhuvip.com/w1/api/index.php",
                          data=data)
# Get the featured plates of a stock code
# Return format: [(plate code, plate name, gain percentage)]
def getCodeJingXuanBlocks(code):
    """Request ``GetStockIDPlate`` (``Type=2``) for ``code``.

    ``__base_request`` (as defined with a ``timeout`` parameter in this
    module) already returns the response body and raises on a bad status, so
    the request is issued once and its result parsed directly.

    Args:
        code: Stock code sent as the ``StockID`` field.

    Returns:
        The reply's ``"ListJX"`` entry when it is present and non-empty,
        otherwise its ``"List"`` entry (None if that is absent too).
    """
    data = f"a=GetStockIDPlate&apiv=w32&Type=2&c=StockL2Data&StockID={code}&PhoneOSNew=1&UserID=0&DeviceID=a38adabd-99ef-3116-8bb9-6d893c846e23&VerSion=5.8.0.2&Token=0&"
    result = __base_request("https://apphq.longhuvip.com/w1/api/index.php", data=data)
    result = json.loads(result)
    # Fall back to "List" when the featured list is missing or empty; an
    # earlier unconditional return used to make this fallback unreachable.
    return result.get("ListJX") or result.get("List")
# Get the free-float market value
def getZYLTAmount(code):
    """Request ``GetStockPanKou_Narrow`` for ``code`` and extract one value.

    ``__base_request`` (as defined with a ``timeout`` parameter in this
    module) already returns the response body and raises on a bad status, so
    a single request with a 3-second timeout is issued and parsed directly.

    Args:
        code: Stock code sent as the ``StockID`` field.

    Returns:
        ``result["real"]["actualcirculation_value"]`` when the reply has a
        ``"real"`` entry (None if that key is absent inside it); None when
        there is no ``"real"`` entry.
    """
    data = f"a=GetStockPanKou_Narrow&apiv=w32&c=StockL2Data&VerSion=5.8.0.2&State=1&PhoneOSNew=1&DeviceID=a38adabd-99ef-3116-8bb9-6d893c846e23&StockID={code}&"
    result = __base_request("https://apphq.longhuvip.com/w1/api/index.php", data=data, timeout=3)
    result = json.loads(result)
    if "real" in result:
        return result["real"].get("actualcirculation_value")
    return None
# Get the featured plates from F10
def __getConceptJXBK(code):
    """Request ``GetConceptJXBKw23`` for ``code`` and collect the names.

    Args:
        code: Stock code sent as the ``StockID`` field.

    Returns:
        The ``"CName"`` of every entry in the reply's ``"List"``, or an empty
        list when the reply is empty or has no ``"List"``.
    """
    payload = f"a=GetConceptJXBKw23&apiv=w32&c=StockF10Basic&StockID={code}&PhoneOSNew=1&DeviceID=a38adabd-99ef-3116-8bb9-6d893c846e23&VerSion=5.8.0.2&"
    raw = __base_request("https://apparticle.longhuvip.com/w1/api/index.php", data=payload, timeout=3)
    parsed = json.loads(raw)
    if not parsed or "List" not in parsed:
        return []
    return [entry["CName"] for entry in parsed["List"]]
# Get the regular plates from F10
def __getConceptBK(code):
    """Request ``GetConceptw23`` for ``code`` and collect the names.

    Args:
        code: Stock code sent as the ``StockID`` field.

    Returns:
        The ``"CName"`` of every entry in the reply's ``"List"``, or an empty
        list when the reply is empty or has no ``"List"``.
    """
    payload = f"a=GetConceptw23&apiv=w32&c=StockF10Basic&StockID={code}&PhoneOSNew=1&DeviceID=a38adabd-99ef-3116-8bb9-6d893c846e23&VerSion=5.8.0.2&"
    raw = __base_request("https://apparticle.longhuvip.com/w1/api/index.php", data=payload, timeout=3)
    parsed = json.loads(raw)
    if not parsed or "List" not in parsed:
        return []
    return [entry["CName"] for entry in parsed["List"]]
# Get the plates of a stock code
def getCodeBlocks(code):
    """Collect the plate names of ``code`` from both F10 lookups.

    Each lookup is best-effort: if one fails, the names from the other are
    still returned.

    Args:
        code: Stock code passed to ``__getConceptJXBK`` and
            ``__getConceptBK``.

    Returns:
        A list of the distinct names, in first-seen order (featured lookup
        first). Empty when both lookups fail or return nothing.
    """
    import logging

    blocks = []
    for fetch in (__getConceptJXBK, __getConceptBK):
        try:
            names = fetch(code)
            if names:
                blocks.extend(names)
        except Exception:
            # Catch Exception rather than everything so KeyboardInterrupt and
            # SystemExit still propagate; keep a trace of what went wrong.
            logging.getLogger(__name__).debug(
                "%s failed for %s", fetch.__name__, code, exc_info=True)
    # dict.fromkeys de-duplicates while keeping a deterministic order.
    return list(dict.fromkeys(blocks))
if __name__ == "__main__":
    # Manual smoke test; every call below hits the remote API.
    print(getZYLTAmount("000333"))
    blocks = getCodeBlocks("600713")
    print(blocks)
    # The sort/filter below indexes each entry as (plate code, plate name,
    # gain percentage), which is the documented shape of
    # getCodeJingXuanBlocks results - not the plain names returned by
    # getCodeBlocks - so it operates on blocks1.
    blocks1 = getCodeJingXuanBlocks("600713") or []
    if len(blocks1) > 2:
        # Sort by gain, highest first
        blocks1.sort(key=lambda x: x[2])
        blocks1.reverse()
        datas = [b for b in blocks1
                 if b[2] > 0 and b[1] not in constant.KPL_INVALID_BLOCKS]
        print(datas)