import json import time import requests import constant from third_data import kpl_util from utils import middle_api_protocol, tool # 竞价 DABAN_TYPE_BIDDING = 8 # 涨停 DABAN_TYPE_LIMIT_UP = 1 # 炸板 DABAN_TYPE_OPEN_LIMIT_UP = 2 # 跌停 DABAN_TYPE_LIMIT_DOWN = 3 # 曾跌停 DABAN_TYPE_EVER_LIMIT_DOWN = 5 def __base_request(url, data, timeout=10): DELEGATE = True if not DELEGATE: headers = { "Content-Type": "application/x-www-form-urlencoded; charset=UTF-8", "User-Agent": "Dalvik / 2.1.0(Linux;U;Android 6.0.1;MuMu Build/V417IR)" } # proxies={'https': '192.168.3.251:9002'} # 禁止代理,不然会走本地代理 response = requests.post(url, data=data, headers=headers, proxies={"http": None, "https": None}, timeout=timeout) if response.status_code != 200: raise Exception("请求出错") return response.text else: fdata = middle_api_protocol.load_kpl(url, data, timeout) return middle_api_protocol.request(fdata) def daBanList(pidType, page_size=50, index=0): data = f"Order=1&a=DaBanList&st={page_size}&c=HomeDingPan&PhoneOSNew=1&DeviceID=a38adabd-99ef-3116-8bb9-6d893c846e24" \ f"&VerSion=5.8.0.2&Index={index}&Is_st=1&PidType={pidType}&apiv=w32&Type=4&FilterMotherboard=0&Filter=0&FilterTIB=0" \ "&FilterGem=0" result = __base_request("https://apphq.longhuvip.com/w1/api/index.php", data=data) return result def __getLimitUpInfo(pidType, page, pageSize): data = f"Order=0&a=DailyLimitPerformance&st={pageSize}&apiv=w35&Type=4&c=HomeDingPan&PhoneOSNew=1&DeviceID=a38adabd-99ef-3116-8bb9-6d893c846e24&VerSion=5.13.0.0&Index={(page - 1) * pageSize}&PidType={pidType}&" result = __base_request("https://apphq.longhuvip.com/w1/api/index.php", data=data) return result def getLimitUpInfoNew(): pids = [(1, "首板"), (2, "2连板"), (3, "3连板"), (4, "4连板"), (5, "")] fresults = [] day = '' for pid_info in pids: results = [] for i in range(100): start_time = time.time() result = __getLimitUpInfo(pid_info[0], i + 1, 20) result = json.loads(result) datas = result["info"][0] results.extend(datas) day = result["info"][1] if len(datas) < 20: break for r in results: if not r[18] and pid_info[1]: r[18] = pid_info[1] # 替换掉板块名称 for i in range(len(r)): if type(r[i]) == str: r[i] = kpl_util.filter_block(r[i]) fresults.extend(results) return json.dumps({"errcode": 0, "list": fresults, "day": day}) def getLimitUpInfo(): list_ = [] page_size = 20 MAX_SIZE = 150 for i in range(0, 10): result_str = daBanList(DABAN_TYPE_LIMIT_UP, page_size=page_size, index=len(list_)) result = json.loads(result_str) temp_list = result["list"] list_ += temp_list if len(temp_list) < page_size: result['list'] = list_ return json.dumps(result) elif len(list_) > MAX_SIZE: return json.dumps(result) return None def getHistoryLimitUpInfo(day): fresults = [] for i in range(0, 100): data = f"Order=1&a=HisDaBanList&st=20&c=HisHomeDingPan&PhoneOSNew=1&DeviceID=a38adabd-99ef-3116-8bb9-6d893c846e23&VerSion=5.8.0.2&Index={i * 20}&Is_st=1&PidType=1&apiv=w32&Type=6&FilterMotherboard=0&Filter=0&FilterTIB=0&Day={day}&FilterGem=0&" result = __base_request("https://apphis.longhuvip.com/w1/api/index.php", data=data) result = json.loads(result) result = result["list"] fresults.extend(result) if len(result) < 20: break return fresults # 市场行情-行业 def getMarketIndustryRealRankingInfo(orderJingE_DESC=True): data = f"Order={1 if orderJingE_DESC else 0}&a=RealRankingInfo&st=20&apiv=w32&Type=5&c=ZhiShuRanking&PhoneOSNew=1&DeviceID=a38adabd-99ef-3116-8bb9-6d893c846e23&VerSion=5.8.0.2&Index=0&ZSType=4&" result = __base_request("https://apphq.longhuvip.com/w1/api/index.php", data=data) return result # 市场行情-精选 def getMarketJingXuanRealRankingInfo(orderJingE_DESC=True): data = f"Order={1 if orderJingE_DESC else 0}&a=RealRankingInfo&st=20&apiv=w32&Type=5&c=ZhiShuRanking&PhoneOSNew=1&DeviceID=a38adabd-99ef-3116-8bb9-6d893c846e23&VerSion=5.8.0.2&Index=0&ZSType=7&" result = __base_request("https://apphq.longhuvip.com/w1/api/index.php", data=data) return result def getMarketJingXuanRealRankingInfoByTimeRange(startTime, endTime, date, orderJingE_DESC=True): data = f"Order={1 if orderJingE_DESC else 0}&st=30&a=RealRankingInfo&apiv=w35&Type=5&Index=0&RStart={startTime}&c=ZhiShuRanking&VerSion=5.13.0.0&REnd={endTime}&Date={date}&PhoneOSNew=1&ZSType=7&DeviceID=d6f20ce9-fa08-31c9-a493-536ebb8e9773&" result = __base_request("https://apphq.longhuvip.com/w1/api/index.php", data=data) return result # 获取代码的板块 def getStockIDPlate(code): data = f"a=GetStockIDPlate_New&apiv=w32&c=StockL2Data&StockID={code}&PhoneOSNew=1&UserID=0&DeviceID=a38adabd-99ef-3116-8bb9-6d893c846e23&VerSion=5.8.0.2&Token=0&" result = __base_request("https://apphq.longhuvip.com/w1/api/index.php", data=data) result = json.loads(result) if int(result["errcode"]) != 0: return None return result["ListJX"] if result["ListJX"] else result["List"] # 获取概念代码 def getCodesByPlate(plate_code): data = f"Order=1&a=ZhiShuStockList_W8&st=30&c=ZhiShuRanking&PhoneOSNew=1&old=1&DeviceID=a38adabd-99ef-3116-8bb9-6d893c846e23&VerSion=5.8.0.2&IsZZ=0&Token=0&Index=0&apiv=w32&Type=6&IsKZZType=0&UserID=0&PlateID={plate_code}&" return __base_request("https://apphq.longhuvip.com/w1/api/index.php", data=data) def getCodesByPlateOrderByLZCS(plate_code): """ 根据领涨次数排序 @param plate_code: @return: """ data = f"Order=1&a=ZhiShuStockList_W8&st=30&c=ZhiShuRanking&PhoneOSNew=1&old=1&DeviceID=a38adabd-99ef-3116-8bb9-6d893c846e23&VerSion=5.8.0.2&IsZZ=0&Token=0&Index=0&apiv=w32&Type=27&IsKZZType=0&UserID=0&PlateID={plate_code}&" return __base_request("https://apphq.longhuvip.com/w1/api/index.php", data=data) def getHistoryCodesByPlateOrderByLZCS(plate_code, date, time_str): """ 根据领涨次数排序 @param date: 2025-05-25 @param time_str: 1025 @param plate_code: 110505 @return: """ data = f"Order=1&a=ZhiShuStockList_W8&st=30&c=ZhiShuRanking&PhoneOSNew=1&RStart=0925&old=1&DeviceID=b692e51c-1bc4-3e8c-a01b-620aa6240e28&VerSion=5.8.0.4&IsZZ=0&Token=0&Index=0&Date={date}&REnd={time_str}&apiv=w33&Type=1&IsKZZType=0&UserID=0&PlateID={plate_code}&" return __base_request("https://apphq.longhuvip.com/w1/api/index.php", data=data) # 获取概念中的板块强度 def getSonPlate(plate_code): data = f"a=SonPlate_Info&apiv=w32&c=ZhiShuRanking&PhoneOSNew=1&DeviceID=a38adabd-99ef-3116-8bb9-6d893c846e23&VerSion=5.8.0.2&PlateID={plate_code}&" return __base_request("https://apphq.longhuvip.com/w1/api/index.php", data=data) # 市场行情-行业 def getMarketIndustryRealRankingInfo(orderJingE_DESC=True): data = f"Order={1 if orderJingE_DESC else 0}&a=RealRankingInfo&st=80&apiv=w32&Type=5&c=ZhiShuRanking&PhoneOSNew=1&DeviceID=a38adabd-99ef-3116-8bb9-6d893c846e23&VerSion=5.8.0.2&Index=0&ZSType=4&" return __base_request("https://apphq.longhuvip.com/w1/api/index.php", data=data) # 市场行情-精选 def getMarketJingXuanRealRankingInfo(orderJingE_DESC=True): data = f"Order={1 if orderJingE_DESC else 0}&a=RealRankingInfo&st=80&apiv=w32&Type=5&c=ZhiShuRanking&PhoneOSNew=1&DeviceID=a38adabd-99ef-3116-8bb9-6d893c846e23&VerSion=5.8.0.2&Index=0&ZSType=7&" return __base_request("https://apphq.longhuvip.com/w1/api/index.php", data=data) # 获取代码的精选板块 # 返回格式:[(板块代码,板块名称,涨幅百分比)] def getCodeJingXuanBlocks(code, jx=True): data = f"a=GetStockIDPlate&apiv=w32&Type=2&c=StockL2Data&StockID={code}&PhoneOSNew=1&UserID=0&DeviceID=a38adabd-99ef-3116-8bb9-6d893c846e23&VerSion=5.8.0.2&Token=0&" result = __base_request("https://apphq.longhuvip.com/w1/api/index.php", data=data) result = json.loads(result) # print(result) if jx: return result.get("ListJX") if result.get("ListJX") else result.get("List") else: fresults = [] if result.get("List"): fresults.extend(result.get("List")) if result.get("ListJX"): fresults.extend(result.get("ListJX")) return fresults # 获取自由流通市值 def getZYLTAmount(code): data = f"a=GetStockPanKou_Narrow&apiv=w32&c=StockL2Data&VerSion=5.8.0.2&State=1&PhoneOSNew=1&DeviceID=a38adabd-99ef-3116-8bb9-6d893c846e23&StockID={code}&" result = __base_request("https://apphq.longhuvip.com/w1/api/index.php", data=data, timeout=3) result = json.loads(result) if "real" in result: return result["real"].get("actualcirculation_value") return None # 获取F10中的精选板块 def __getConceptJXBK(code): data = f"a=GetConceptJXBKw23&apiv=w32&c=StockF10Basic&StockID={code}&PhoneOSNew=1&DeviceID=a38adabd-99ef-3116-8bb9-6d893c846e23&VerSion=5.8.0.2&" result = __base_request("https://apparticle.longhuvip.com/w1/api/index.php", data=data, timeout=3) result = json.loads(result) if result: if "List" in result: names = [kpl_util.filter_block(x["CName"]) for x in result["List"]] return names if len(names) < 3 else names[:2] return [] # 获取F10常规板块 def __getConceptBK(code): data = f"a=GetConceptw23&apiv=w32&c=StockF10Basic&StockID={code}&PhoneOSNew=1&DeviceID=a38adabd-99ef-3116-8bb9-6d893c846e23&VerSion=5.8.0.2&" result = __base_request("https://apparticle.longhuvip.com/w1/api/index.php", data=data, timeout=3) result = json.loads(result) if result: if "List" in result: names = [kpl_util.filter_block(x["CName"]) for x in result["List"]] return names return [] # 获取代码的板块 def getCodeBlocks(code): blocks = [] try: _bks = getCodeJingXuanBlocks(code) # 取前2个 if _bks and len(_bks) > 2: _bks = _bks[:2] _bks = [x[1] for x in _bks] if _bks: blocks.extend(_bks) except: pass if not blocks: try: _bks = __getConceptBK(code) print(_bks) if _bks: blocks.extend(_bks) except: pass return list(set(blocks)) # 获取F10里面的大事件 def __getF10BigReminders(code): data = f"a=BigReminder&st=25&apiv=w32&c=StockF10Basic&StockID={code}&PhoneOSNew=1&DeviceID=a38adabd-99ef-3116-8bb8-6d893c846e23&VerSion=5.8.0.2&Index=0&" result = __base_request("https://apparticle.longhuvip.com/w1/api/index.php", data=data, timeout=3) result = json.loads(result) if int(result["errcode"]) == 0: return result["info"] return None # 获取股票减持日期 def getCodeReductionDate(code): infos = __getF10BigReminders(code) if not infos: return None keys = ["终止", "结束", "结果", "不减持", "完成"] for info in infos: if info.get("type") == 5: title = str(info.get("title")) if title.find("减持") < 0: continue wrong = False for k in keys: if title.find(k) >= 0: wrong = True if wrong: continue return info.get("date").split(" ")[0] return None def getZLJECodesRank(index): """ 获取主力金额的排行 @return: """ data = f"Order=1&a=RealRankingInfo_W8&st=50&c=NewStockRanking&PhoneOSNew=1&RStart=0925&DeviceID=d6f20ce9-fa08-31c9-a493-536ebb8e9773&VerSion=5.13.0.0&Isst=0&index={index}&Date=&REnd=1420&apiv=w35&Type=1&FilterMotherboard=0&Filter=0&Ratio=6&FilterTIB=1&FilterGem=0&" result = __base_request("https://apphwhq.longhuvip.com/w1/api/index.php", data=data, timeout=3) result = json.loads(result) return result def getMarketStrong(): """ 获取市场强度 :return: """ result = __base_request("https://apphwhq.longhuvip.com/w1/api/index.php", data=f"a=DiskReview&apiv=w35&c=HomeDingPan&VerSion=5.13.0.0&PhoneOSNew=1&DeviceID=d6f20ce9-fa08-31c9-a493-536ebb8e9773&", timeout=3) data = json.loads(result) return int(data["info"]["strong"]) def request_new_blocks_codes(blocks_info): """ 请求新板块的代码 @param blocks_info:[(板块名称,板块代码)] @return: """ yesterday_codes = set() for bi in blocks_info: result = getCodesByPlate(bi[1]) result = json.loads(result) code_info_list = [] for d in result["list"]: if d[0] in yesterday_codes: continue # 涨幅要大于5% rate = d[6] / int(round((tool.get_limit_up_rate(d[0]) - 1) * 10)) if rate < 5: continue # 格式:(代码,涨幅) code_info_list.append((d[0], d[6])) if code_info_list: # 将代码加入新题材 for x in code_info_list: print("添加", x) if __name__ == "__main__": # result = getCodesByPlateOrderByLZCS("801074") # result = json.loads(result) # result = result["list"] # for r in result: # print(r) # result = getCodeJingXuanBlocks("000756", True) # for x in result: result = getHistoryCodesByPlateOrderByLZCS("801074", "2025-05-16", "1025") print(result) # request_new_blocks_codes([("机器人", "801159")]) # result = getCodesByPlate("801159") # getHistoryLimitUpInfo("2024-02-19") # result = json.loads(result) # for d in result["list"]: # print(d) # # print(result)