import datetime
import json
import logging
import time

import pandas as pd
import requests

from kpl import kpl_util
from utils import tool

# PidType values accepted by daBanList (names translated from the original comments).
# Call auction / bidding
DABAN_TYPE_BIDDING = 8
# Limit up
DABAN_TYPE_LIMIT_UP = 1
# Limit-up board opened again
DABAN_TYPE_OPEN_LIMIT_UP = 2
# Limit down
DABAN_TYPE_LIMIT_DOWN = 3
# Once hit limit down
DABAN_TYPE_EVER_LIMIT_DOWN = 5

# Endpoints used by the wrappers below.
_API_URL = "https://apphq.longhuvip.com/w1/api/index.php"
_API_URL_HW = "https://apphwhq.longhuvip.com/w1/api/index.php"

# Seconds to wait for the server before giving up; without a timeout
# requests.post can block forever on a stalled connection.
_REQUEST_TIMEOUT_SECONDS = 10

# Page size and maximum page count used when paging the limit-up list.
_LIMIT_UP_PAGE_SIZE = 20
_LIMIT_UP_MAX_PAGES = 10


def __base_request(url, data, timeout=_REQUEST_TIMEOUT_SECONDS):
    """POST a form-encoded payload and return the ``requests.Response``.

    :param url: endpoint to post to
    :param data: already-encoded form body
    :param timeout: seconds passed to ``requests.post``; a stalled request
        raises ``requests.exceptions.Timeout`` instead of hanging
    :return: the response object; the status code is NOT checked here
    """
    headers = {
        "Content-Type": "application/x-www-form-urlencoded; charset=UTF-8",
        "User-Agent": "Dalvik / 2.1.0(Linux;U;Android 6.0.1;MuMu Build/V417IR)"
    }
    # Proxies are disabled explicitly, otherwise the request would go
    # through the local proxy.
    return requests.post(url, data=data, headers=headers,
                         proxies={"http": None, "https": None},
                         timeout=timeout)


def __request_text(url, data):
    """POST ``data`` to ``url`` and return the response body text.

    :raises Exception: when the HTTP status code is not 200
    """
    response = __base_request(url, data=data)
    if response.status_code != 200:
        raise Exception("请求出错")
    return response.text


def daBanList(pidType):
    """Request the ``DaBanList`` action for the given ``PidType``.

    :param pidType: one of the ``DABAN_TYPE_*`` constants
    :return: raw response text
    :raises Exception: when the HTTP status code is not 200
    """
    # NOTE(review): the payload ends with a space in the original; kept as-is.
    data = (
        "Order=1&a=DaBanList&st=100&c=HomeDingPan&PhoneOSNew=1&DeviceID=a38adabd-99ef-3116-8bb9-6d893c846e23"
        f"&VerSion=5.8.0.2&Index=0&Is_st=1&PidType={pidType}&apiv=w32&Type=4&FilterMotherboard=0&Filter=0&FilterTIB=0"
        "&FilterGem=0 "
    )
    return __request_text(_API_URL, data)


def getFengKouList(day):
    """Request the ``GetFengKList`` action for ``day``.

    :param day: value interpolated into the ``Day`` form field
    :return: raw response text
    :raises Exception: when the HTTP status code is not 200
    """
    # NOTE(review): the payload ends with a space in the original; kept as-is.
    data = (
        f"c=StockFengKData&a=GetFengKList&Index=0&st=500&Order=17&Day={day}&Time=&DeviceID=a38adabd-99ef-3116"
        "-8bb9-6d893c846e23 "
    )
    return __request_text(
        "https://apphq.longhuvip.com/w1/api/index.php?apiv=w32&PhoneOSNew=1&VerSion=5.8.0.2",
        data)


def getFengXiangBiao():
    """Request ``ZhiShuStockList_W8`` for the fixed plate id 801225.

    :return: raw response text
    :raises Exception: when the HTTP status code is not 200
    """
    data = "Order=1&a=ZhiShuStockList_W8&st=30&c=ZhiShuRanking&PhoneOSNew=1&old=1&DeviceID=a38adabd-99ef-3116-8bb9-6d893c846e23&VerSion=5.8.0.2&IsZZ=0&Token=0&Index=0&apiv=w32&Type=6&IsKZZType=0&UserID=0&PlateID=801225&"
    return __request_text(_API_URL, data)


def getWeiTuo_W14(code, index):
    """Request the ``GetWeiTuo_W14`` action for a stock.

    :param code: value sent as ``StockID``
    :param index: value sent as ``Index``
    :return: raw response text
    :raises Exception: when the HTTP status code is not 200
    """
    data = f"a=GetWeiTuo_W14&st=200&c=StockL2Data&PhoneOSNew=1&DeviceID=a38adabd-99ef-3116-8bb9-6d893c846e23&VerSion=5.8.0.2&Token=0&Index={index}&Tur=30&apiv=w32&Type=3&Vol=500&StockID={code}&VType=1&UserID=0&VOrder=0&"
    return __request_text(_API_URL, data)


def getIndustryRealRankingInfo():
    """Get the industry gain ranking (``RealRankingInfo``, Type=2, ZSType=4).

    :return: raw response text
    :raises Exception: when the HTTP status code is not 200
    """
    data = "Order=1&a=RealRankingInfo&st=20&apiv=w32&Type=2&c=ZhiShuRanking&PhoneOSNew=1&DeviceID=a38adabd-99ef-3116-8bb9-6d893c846e23&VerSion=5.8.0.2&Index=0&ZSType=4&"
    return __request_text(_API_URL, data)


def getMarketIndustryRealRankingInfo(orderJingE_DESC=True):
    """Market quotes - industry (``RealRankingInfo``, Type=5, ZSType=4).

    :param orderJingE_DESC: sends ``Order=1`` when truthy, ``Order=0`` otherwise
    :return: raw response text
    :raises Exception: when the HTTP status code is not 200
    """
    data = f"Order={1 if orderJingE_DESC else 0}&a=RealRankingInfo&st=20&apiv=w32&Type=5&c=ZhiShuRanking&PhoneOSNew=1&DeviceID=a38adabd-99ef-3116-8bb9-6d893c846e23&VerSion=5.8.0.2&Index=0&ZSType=4&"
    return __request_text(_API_URL, data)


def getMarketJingXuanRealRankingInfo(page=1, orderJingE_DESC=True):
    """Market quotes - selected (``RealRankingInfo``, Type=5, ZSType=7).

    :param page: 1-based page number; sent to the server as ``Index=page - 1``
    :param orderJingE_DESC: sends ``Order=1`` when truthy, ``Order=0`` otherwise
    :return: raw response text
    :raises Exception: when the HTTP status code is not 200
    """
    data = f"Order={1 if orderJingE_DESC else 0}&a=RealRankingInfo&st=50&apiv=w32&Type=5&c=ZhiShuRanking&PhoneOSNew=1&DeviceID=a38adabd-99ef-3116-8bb9-6d893c846e23&VerSion=5.8.0.2&Index={page - 1}&ZSType=7&"
    return __request_text(_API_URL, data)


def getStockIDPlate(code):
    """Get the concepts of a stock code (``GetStockIDPlate_New``).

    :param code: value sent as ``StockID``
    :return: raw response text
    :raises Exception: when the HTTP status code is not 200
    """
    data = f"a=GetStockIDPlate_New&apiv=w32&c=StockL2Data&StockID={code}&PhoneOSNew=1&UserID=0&DeviceID=a38adabd-99ef-3116-8bb9-6d893c846e23&VerSion=5.8.0.2&Token=0&"
    return __request_text(_API_URL, data)


def getCodesByPlate(plate):
    """Get the codes belonging to a concept (``ZhiShuStockList_W8``).

    :param plate: value sent as ``PlateID``
    :return: raw response text
    :raises Exception: when the HTTP status code is not 200
    """
    data = f"Order=1&a=ZhiShuStockList_W8&st=100&c=ZhiShuRanking&PhoneOSNew=1&old=1&DeviceID=a38adabd-99ef-3116-8bb9-6d893c846e23&VerSion=5.8.0.2&IsZZ=0&Token=0&Index=0&apiv=w32&Type=6&IsKZZType=0&UserID=0&PlateID={plate}&"
    return __request_text(_API_URL, data)


def getSonPlate(plate):
    """Get the block strength within a concept (``SonPlate_Info``).

    :param plate: value sent as ``PlateID``
    :return: raw response text
    :raises Exception: when the HTTP status code is not 200
    """
    data = f"a=SonPlate_Info&apiv=w32&c=ZhiShuRanking&PhoneOSNew=1&DeviceID=a38adabd-99ef-3116-8bb9-6d893c846e23&VerSion=5.8.0.2&PlateID={plate}&"
    return __request_text(_API_URL, data)


def get_jingxuan_block(code):
    """Request ``GetStockIDPlate`` (Type=2) for a stock code.

    The original comment above this function reads "get the gain ranking";
    NOTE(review): that does not obviously match the action name - confirm.

    :param code: value sent as ``StockID``
    :return: raw response text
    :raises Exception: when the HTTP status code is not 200
    """
    data = f"a=GetStockIDPlate&apiv=w32&Type=2&c=StockL2Data&StockID={code}&PhoneOSNew=1&UserID=0&DeviceID=a38adabd-99ef-3116-8bb9-6d893c846e23&VerSion=5.8.0.2&Token=0&"
    return __request_text(_API_URL, data)


def __getLimitUpInfo(pidType, page, pageSize):
    """Fetch one page of ``DailyLimitPerformance``.

    :param pidType: value sent as ``PidType``
    :param page: 1-based page number; sent as ``Index=(page - 1) * pageSize``
    :param pageSize: value sent as ``st``
    :return: raw response text
    :raises Exception: when the HTTP status code is not 200 (the original
        skipped this check, unlike the sibling wrappers)
    """
    data = f"Order=0&a=DailyLimitPerformance&st={pageSize}&apiv=w35&Type=4&c=HomeDingPan&PhoneOSNew=1&DeviceID=a38adabb-99ef-3116-8bb9-6d893c846e24&VerSion=5.13.0.0&Index={(page - 1) * pageSize}&PidType={pidType}&"
    return __request_text(_API_URL, data)


def getLimitUpInfoNew():
    """Collect ``DailyLimitPerformance`` rows for PidType 1..5.

    Each PidType is paged (20 rows per page, at most 10 pages) until a short
    page is returned. For every row, an empty column 18 is filled with the
    PidType's label when that label is non-empty, and every string column is
    passed through ``kpl_util.filter_block``.

    :return: JSON string ``{"errcode": 0, "list": [...]}`` with all rows
    :raises Exception: when any page request returns a non-200 status
    """
    pids = [(1, "首板"), (2, "2连板"), (3, "3连板"), (4, "4连板"), (5, "")]
    all_rows = []
    for pid, label in pids:
        rows = []
        for page in range(1, _LIMIT_UP_MAX_PAGES + 1):
            start_time = time.time()
            raw = __getLimitUpInfo(pid, page, _LIMIT_UP_PAGE_SIZE)
            print("请求用时", time.time() - start_time)
            page_rows = json.loads(raw)["info"][0]
            rows.extend(page_rows)
            # A short page means there is nothing left to fetch.
            if len(page_rows) < _LIMIT_UP_PAGE_SIZE:
                break
        for row in rows:
            if not row[18] and label:
                row[18] = label
            # Replace block names in every string column.
            for idx, value in enumerate(row):
                if isinstance(value, str):
                    row[idx] = kpl_util.filter_block(value)
        all_rows.extend(rows)
    return json.dumps({"errcode": 0, "list": all_rows})


def getMarketFelling():
    """Request ``ZhangFuDetail`` and return the ``info`` field of the JSON reply.

    Field legend (translated from the original comment):
    SJZT: actual limit-up, SJDT: actual limit-down, SZJS: number rising,
    ZT: limit-up, DT: limit-down, XDJS: number falling, sign: sentiment summary.

    :return: ``data["info"]`` of the decoded response
    :raises Exception: when the HTTP status code is not 200
    """
    text = __request_text(
        _API_URL_HW,
        "a=ZhangFuDetail&apiv=w35&c=HomeDingPan&PhoneOSNew=1&DeviceID=d6f20ce9-fa08-31c9-a493-536ebb8e9774&VerSion=5.13.0.0&")
    return json.loads(text)["info"]


def getMarketStrong():
    """Get the market strength (``DiskReview``).

    :return: ``info.strong`` of the decoded response, converted to ``int``
    :raises Exception: when the HTTP status code is not 200
    """
    text = __request_text(
        _API_URL_HW,
        "a=DiskReview&apiv=w35&c=HomeDingPan&VerSion=5.13.0.0&PhoneOSNew=1&DeviceID=d6f20ce9-fa08-31c9-a493-536ebb8e9773&")
    return int(json.loads(text)["info"]["strong"])


def changeStatistics():
    """Request ``ChangeStatistics`` and return the first element of ``info``.

    :return: ``data["info"][0]`` of the decoded response
    :raises Exception: when the HTTP status code is not 200
    """
    text = __request_text(
        _API_URL_HW,
        "a=ChangeStatistics&apiv=w35&c=HomeDingPan&PhoneOSNew=1&UserID=0&DeviceID=d6f20ce9-fa08-31c9-a493-536ebb8e9774&VerSion=5.13.0.0&Token=0&")
    return json.loads(text)["info"][0]


def test_l2():
    """Manual probe: poll ``getWeiTuo_W14`` forever for a fixed code.

    The ``Count`` from each reply is fed back as the next ``index``; a line is
    printed whenever ``Count`` changes. This never returns and does not sleep
    between requests.
    """
    code = "600981"
    count = 0
    while True:
        reply = json.loads(getWeiTuo_W14(code, count))
        new_count = reply["Count"]
        entries = reply["List"]
        if new_count != count:
            print(tool.get_now_time_str(), len(entries), reply)
        count = new_count


if __name__ == '__main__':
    print(changeStatistics())