"""
|
热门板块监听
|
"""
|
import datetime
|
import json
|
import logging
|
import socket
|
import time
|
from selenium import webdriver
|
from selenium.webdriver.common.by import By
|
from selenium.webdriver.chrome.options import Options
|
|
|
def __parseData(driver):
    """Scrape the displayed day and the hot-sector tables from the current page.

    Args:
        driver: Selenium WebDriver whose current page contains the
            ``nuxt-layout-container`` element.

    Returns:
        A ``(day_str, data_list)`` tuple.

        ``day_str`` is ``YYYY-MM-DD``: month and day are read from the first
        ``<time>`` element inside the container, the year is the current
        local year.

        ``data_list`` holds one ``(title, total_rate, codes_list)`` tuple per
        ``topgainer-tag`` element. ``total_rate`` is ``None`` when the sector
        header has no ``<span>``. Each ``codes_list`` entry is
        ``((code_name, code), limit_up_info, price, rate, limit_up_time,
        huanshou, ltsz)``, all taken as text from the first seven cells of a
        table row (``huanshou`` / ``ltsz`` are presumably turnover rate and
        free-float market value, judging by the pinyin names -- confirm).

    Raises:
        Selenium lookup errors when a required element is missing, and
        ``IndexError`` / ``ValueError`` when the page layout or the date text
        is not what is expected.
    """
    # Look the container up once; both the date and the sector list live in it.
    container = driver.find_element(by=By.ID, value="nuxt-layout-container")

    date_element = container.find_elements(by=By.TAG_NAME, value="time")[0]
    date_spans = date_element.find_elements(by=By.TAG_NAME, value="span")
    # First span: keep the text before the first space and drop its last
    # character (presumably a month suffix) so that int() can parse it.
    month = date_spans[0].text.split(' ')[0][:-1]
    day = date_spans[1].text
    # The page supplies no year, so the current local year is used.
    # NOTE(review): this yields a wrong (future) date if the page still shows
    # a December day after the local clock has rolled into January -- confirm
    # whether that case matters.
    year = datetime.datetime.now().strftime("%Y")
    day_str = "{0}-{1:0>2}-{2:0>2}".format(year, int(month), int(day))

    items = container.find_element(
        by=By.CLASS_NAME, value="topgainer-content-left"
    ).find_elements(by=By.CLASS_NAME, value="topgainer-tag")

    data_list = []
    for item in items:
        print("----------------------")
        header = item.find_element(by=By.TAG_NAME, value="section").find_element(
            by=By.TAG_NAME, value="header")
        title = header.find_element(by=By.TAG_NAME, value="h3").text

        # The overall rate span is optional. find_elements returns an empty
        # list when it is absent, so no exception has to be swallowed.
        rate_spans = header.find_elements(by=By.TAG_NAME, value="span")
        total_rate = rate_spans[0].text if rate_spans else None
        print(title, total_rate)

        rows = item.find_element(by=By.TAG_NAME, value="div").find_element(
            by=By.TAG_NAME, value="tbody").find_elements(by=By.TAG_NAME, value="tr")

        codes_list = []
        for row in rows:
            tds = row.find_elements(by=By.TAG_NAME, value="td")
            # One span lookup serves both the name and the code.
            name_spans = tds[0].find_elements(by=By.TAG_NAME, value="span")
            code_name = name_spans[0].text
            code = name_spans[1].text
            # Cells 1..6: limit_up_info, price, rate, limit_up_time,
            # huanshou, ltsz (same order as the original tuple).
            fields = tuple(tds[i].text for i in range(1, 7))
            codes_list.append(((code_name, code),) + fields)

        data_list.append((title, total_rate, codes_list))
        print("----------------------")

    return day_str, data_list
|
|
|
def get_hot_block(callback):
    """Open the top-gainer page in Chrome and poll it until interrupted.

    After an initial 5-second wait, the page is parsed with ``__parseData``
    every 3 seconds and the result is passed to ``callback``. Errors raised
    while parsing or inside the callback are logged and polling continues.

    Args:
        callback: Callable invoked as ``callback(day, result)`` with the two
            values returned by ``__parseData``.

    This function never returns normally. When an exception escapes the loop
    (e.g. ``KeyboardInterrupt`` or a failed page refresh) the browser is shut
    down before the exception propagates.
    """
    # Start the browser first.
    options = Options()
    chrome_path = "res/chromedriver.exe"
    options.add_argument("--disable-blink-features")
    options.add_argument("--disable-blink-features=AutomationControlled")
    driver = webdriver.Chrome(chrome_path, options=options)
    try:
        driver.get("https://xuangubao.cn/top-gainer")
        time.sleep(5)

        refresh_after = int("092500")
        refresh_before = int("092700")
        while True:
            time.sleep(3)
            time_str = datetime.datetime.now().strftime("%H%M%S")
            # NOTE: trading-hours gating (skip before 09:25, after 15:00 and
            # during the 11:30-13:00 break) is currently disabled.

            # Reload the page on every poll between 09:25:00 and 09:27:00
            # (exclusive bounds).
            if refresh_after < int(time_str) < refresh_before:
                driver.refresh()

            try:
                day, result = __parseData(driver)
                callback(day, result)
            except Exception as e:
                # Keep polling; one bad parse or upload must not stop the monitor.
                logging.exception(e)
    finally:
        # Release the browser and the chromedriver process however the loop ends.
        driver.quit()
|
|
|
def upload_data(day, datas, ip_port=("192.168.3.252", 9001)):
    """Send one snapshot to the collector over TCP and wait for its reply.

    The payload is the JSON object ``{"type": 70, "day": day, "data": datas}``
    encoded as GBK. After sending, up to 1024 bytes of reply are read and
    discarded before the connection is closed.

    Args:
        day: Day string to put under the ``"day"`` key.
        datas: JSON-serialisable data to put under the ``"data"`` key.
        ip_port: ``(host, port)`` of the server; defaults to the address the
            script has always used.

    Raises:
        OSError: If connecting, sending or receiving fails.
        TypeError: If ``datas`` cannot be serialised by ``json.dumps``.
    """
    payload = json.dumps({"type": 70, "day": day, "data": datas}).encode("gbk")
    # The context manager closes the socket even when connect/send/recv raises.
    with socket.socket() as client:
        client.connect(ip_port)
        # sendall() keeps writing until the whole payload is out; send() may
        # stop after a partial write on large payloads.
        client.sendall(payload)
        # Block until the server answers (or closes); the reply is not used.
        client.recv(1024)
|
|
|
# Packaging commands:
#   cd D:\workspace\trade\third_data
#   C:\Users\Administrator\AppData\Roaming\Python\Python37\Scripts\pyinstaller.exe hot_block.spec
if __name__ == "__main__":
    # Every parsed snapshot is forwarded straight to the upload routine.
    get_hot_block(upload_data)
|