""" 热门板块监听 """ import datetime import json import logging import socket import time from selenium import webdriver from selenium.webdriver.common.by import By from selenium.webdriver.chrome.options import Options def __parseData(driver): time_ele = driver.find_element(by=By.TAG_NAME, value="time") date_element = \ driver.find_element(by=By.ID, value="nuxt-layout-container").find_elements(by=By.TAG_NAME, value="time")[0] date_elements = date_element.find_elements(by=By.TAG_NAME, value="span") month = date_elements[0].text.split(' ')[0][:-1] day = date_elements[1].text # 获取当前的年 year = datetime.datetime.now().strftime("%Y") day_str = "{0}-{1:0>2}-{2:0>2}".format(year, int(month), int(day)) items = driver.find_element(by=By.ID, value="nuxt-layout-container").find_element(by=By.CLASS_NAME, value="topgainer-content-left").find_elements( by=By.CLASS_NAME, value="topgainer-tag") data_list = [] for item in items: print("----------------------") header = item.find_element(by=By.TAG_NAME, value="section").find_element(by=By.TAG_NAME, value="header") title = header.find_element(by=By.TAG_NAME, value="h3").text total_rate = None try: total_rate = header.find_element(by=By.TAG_NAME, value="span").text except: pass print(title, total_rate) contents = item.find_element(by=By.TAG_NAME, value="div").find_element(by=By.TAG_NAME, value="tbody").find_elements( by=By.TAG_NAME, value="tr") codes_list = [] for content in contents: tds = content.find_elements(by=By.TAG_NAME, value="td") code_name = tds[0].find_elements(by=By.TAG_NAME, value="span")[0].text code = tds[0].find_elements(by=By.TAG_NAME, value="span")[1].text limit_up_info = tds[1].text price = tds[2].text rate = tds[3].text limit_up_time = tds[4].text huanshou = tds[5].text ltsz = tds[6].text codes_list.append(((code_name, code), limit_up_info, price, rate, limit_up_time, huanshou, ltsz)) data_list.append((title, total_rate, codes_list)) print("----------------------") return day_str, data_list # 获取热门板块 def get_hot_block(callback): # 先启动浏览器 options = Options() chrome_path = "res/chromedriver.exe" options.add_argument("--disable-blink-features") options.add_argument("--disable-blink-features=AutomationControlled") driver = webdriver.Chrome(chrome_path, options=options) driver.get("https://xuangubao.cn/top-gainer") time.sleep(5) while True: time.sleep(3) time_str = datetime.datetime.now().strftime("%H%M%S") # 交易时间才识别 # if int(time_str) < int("092500") or int(time_str) > int("150000"): # continue # 每天9点25到9点26刷新 if int("092500") < int(time_str) < int("092700"): driver.refresh() # if int("113000") < int(time_str) < int("130000"): # continue try: day, result = __parseData(driver) callback(day, result) except Exception as e: logging.exception(e) def upload_data(day, datas): client = socket.socket() # 生成socket,连接server ip_port = ("192.168.3.252", 9001) # server地址和端口号(最好是10000以后) client.connect(ip_port) data = {"type": 70, "day": day, "data": datas} client.send(json.dumps(data).encode("gbk")) result = client.recv(1024) client.close() # 打包命令 # cd D:\workspace\trade\third_data # C:\Users\Administrator\AppData\Roaming\Python\Python37\Scripts\pyinstaller.exe hot_block.spec if __name__ == "__main__": def callback(day, result): upload_data(day, result) pass get_hot_block(callback)