import json
import os
import random
import threading
import time

from selenium import webdriver
from selenium.common.exceptions import WebDriverException
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.wait import WebDriverWait

import comment_manager
import constant
import setting
from video_manager import VideoManger
|
|
|
class CommentManager:
    """Drive a Chrome session over the WeChat Channels comment console.

    The manager opens the platform's comment page, walks the video list and,
    for every comment of a selected video, either posts a reply or clicks
    "like", as decided by the ``comment_manager``, ``setting`` and
    ``VideoManger`` helpers.

    The stop flag is stored in ``_break_requested``. It used to be stored in
    an instance attribute named ``break_excute``, which shadowed the
    ``break_excute()`` method and made that method impossible to call.
    """

    def __init__(self):
        """Prepare Chrome options and the driver service; no browser starts here."""
        self.options = Options()
        self.options.add_argument("--disable-blink-features")
        self.options.add_argument("--disable-blink-features=AutomationControlled")

        # Use the Chrome binary and chromedriver configured in ``setting``.
        self.options.binary_location = setting.get_chrome_path()
        self.service = Service(setting.get_chromedriver_path())

        # Reply templates are "<left>#<right>" lines. The template source is
        # currently an empty string, so the resulting list is always empty.
        comment_template_str = ""
        # Escape non-ASCII text into \uXXXX sequences (intended for regex use).
        comment_template_str = comment_template_str.encode('unicode_escape').decode("utf-8")
        self.comment_templates = []
        for line in comment_template_str.split("\n"):
            if "#" in line:
                parts = line.split("#")
                self.comment_templates.append((parts[0], parts[1]))

        self.driver = None
        self._break_requested = False  # set by break_excute(), polled by the loops

    def break_excute(self):
        """Ask the running pass to stop at its next checkpoint.

        The flag is polled before each video and before each comment. It is
        cleared again when the next pass starts.

        :return: None
        """
        print("=====中断执行=====")
        self._break_requested = True

    def __init(self):
        """Reset the stop flag and open (first pass) or refresh the comment page."""
        self._break_requested = False
        if not self.driver:
            self.driver = webdriver.Chrome(service=self.service, options=self.options)
            self.driver.get(
                "https://channels.weixin.qq.com/platform/comment?isImageMode=0")
        else:
            self.driver.refresh()

    def start_process_comment(self):
        """Run processing passes forever, pausing between passes.

        A failing pass is reported and the loop continues. Only ``Exception``
        is caught, so ``KeyboardInterrupt`` / ``SystemExit`` still stop the
        loop.

        :return: never returns normally
        """
        while True:
            try:
                self.__start_excute_task()
            except Exception as exc:
                # Top-level boundary: report the failure and retry next pass.
                print("=====任务执行异常=====", repr(exc))
            time.sleep(constant.REFRESH_TIME_SPACE)

    def __start_excute_task(self):
        """Run one pass: load the page, wait for the side bar, process videos.

        :return: None
        """
        self.__init()
        # Nothing can be done before the left-hand menu is visible. The
        # timeout is 1,000,000 seconds, i.e. effectively unbounded —
        # presumably to leave time for a manual login; TODO confirm.
        WebDriverWait(self.driver, 1000000).until(
            EC.visibility_of_element_located((By.ID, "side-bar")))
        self.process_videos(self.driver)

    @staticmethod
    def _has_more(scroll_list):
        """Tell whether ``scroll_list`` reports more items to load.

        NOTE(review): the list is treated as having more items when the
        parent of the ``loadmore__dot`` element has ``display: none``; this
        mirrors the original checks — confirm against the page's behavior.

        :param scroll_list: scroll container element to inspect
        :return: False when the loader cannot be located or inspected
        """
        try:
            dot = scroll_list.find_element(By.CLASS_NAME, "loadmore__dot")
            loader = dot.find_element(By.XPATH, "./..")  # parent element
            return loader.value_of_css_property("display") == "none"
        except WebDriverException:
            return False

    def __process_like(self, video_name, video_date, comment_element):
        """Click "like" on one comment when required and not already liked.

        :param video_name: name of the video the comment belongs to
        :param video_date: date of that video
        :param comment_element: element of the comment
        :return: None
        """
        user_name, comment_content, _ = self.__parse_comment_info(comment_element)
        if not comment_manager.is_need_click_like(video_name, video_date, user_name, comment_content):
            return

        comment_actions = comment_element.find_element(
            By.CLASS_NAME, "action-list").find_elements(By.CLASS_NAME, "action-item")
        classes = comment_actions[0].find_element(By.TAG_NAME, "svg").get_attribute("class")
        if classes and "fill-like" in classes:
            # Already liked.
            return
        comment_element.find_element(By.CLASS_NAME, "like-action").click()
        time.sleep(2)

    def __parse_comment_info(self, comment_element):
        """Read author, text and time of one comment.

        :param comment_element: element of the comment
        :return: (user name, comment content, comment time); the content is
            '' when the content element cannot be read
        """
        user_name = comment_element.find_element(By.CLASS_NAME, "comment-user-name").get_attribute("innerHTML")
        comment_time = comment_element.find_element(By.CLASS_NAME, "comment-time").get_attribute("innerHTML")
        try:
            comment_content = comment_element.find_element(By.CLASS_NAME, "comment-content").get_attribute("innerHTML")
        except WebDriverException:
            comment_content = ''
        return user_name, comment_content, comment_time

    def __parse_video_info(self, video_element):
        """Read the descriptive fields of one video entry.

        :param video_element: element of the video entry
        :return: (video name, video time, comment count, video title) where
            the title is the name truncated at its first '#'
        """
        video_time = video_element.find_element(By.CLASS_NAME, "feed-time").get_attribute("innerHTML")
        comment_count = video_element.find_element(
            By.CLASS_NAME, "feed-comment-total").find_element(By.XPATH, "./span[2]").get_attribute("innerHTML")
        video_name = video_element.find_element(By.CLASS_NAME, "feed-title").get_attribute("innerHTML")
        video_title = video_name.split("#", 1)[0]
        return video_name, video_time, comment_count, video_title

    def __process_reply(self, video_info, comment_element):
        """Reply to one comment when a reply is due.

        :param video_info: (video_name, video_date, comment_count, video_title)
        :param comment_element: element of the comment
        :return: True when no reply is sent (the caller then considers a
            like); False when an author reply already exists or a reply was
            just posted
        """
        comment_user_name, comment_content, _ = self.__parse_comment_info(comment_element)

        try:
            reply_list = comment_element.find_element(By.CLASS_NAME, "comment-reply-list")
            # Raises when no reply in the list carries the author badge.
            reply_list.find_element(By.XPATH, ".//div[normalize-space()='作者']")
        except WebDriverException:
            print("还没回复。。。。")
        else:
            print("已经回复。。。。")
            return False

        replay_content = None
        if setting.is_reply_comment():
            # Let comment_manager decide whether/what to answer.
            replay_content = comment_manager.get_replay_content(video_info, comment_user_name, comment_content)
        if not replay_content:
            # Nothing to reply with.
            print("不需要回复。。。。")
            return True

        # A reply is due: open the reply box through the second action item.
        comment_actions = comment_element.find_element(
            By.CLASS_NAME, "action-list").find_elements(By.CLASS_NAME, "action-item")
        comment_actions[1].click()
        wait = WebDriverWait(self.driver, 10)  # wait at most 10 seconds
        wait.until(EC.visibility_of_element_located((By.CLASS_NAME, "comment-create-content")))
        time.sleep(1)
        self.driver.find_element(
            By.CLASS_NAME, "comment-create-content").find_element(
            By.TAG_NAME, "textarea").send_keys(replay_content)
        # NOTE(review): implicitly_wait() changes the session-wide element
        # lookup timeout; it does not pause. Kept as in the original.
        self.driver.implicitly_wait(1)
        self.driver.find_element(
            By.CLASS_NAME, "comment-create-content").find_element(By.XPATH, "div[3]/div[2]").click()
        time.sleep(2)
        # Random pause between replies.
        time.sleep(random.randint(constant.COMMENT_REPLY_SPACE_TIME_MIN, constant.COMMENT_REPLY_SPACE_TIME_MAX))
        return False

    def __process_comments(self, driver, start_index=0, video_info=None):
        """Process the comments of the open video, scrolling for more.

        :param driver: the web driver
        :param start_index: index of the first comment not yet handled
        :param video_info: (video name, video date, comment count, video title)
        :return: None
        """
        scroll_list = driver.find_element(By.CLASS_NAME, "feed-comment__wrp")
        has_more = self._has_more(scroll_list)
        comments = scroll_list.find_elements(By.XPATH, "div[2]/div/div/div[contains(@class,'comment-item')]")
        print("评论条数", len(comments))

        for comment in comments[start_index:]:
            if self._break_requested:
                return
            _, comment_content, _ = self.__parse_comment_info(comment)
            print("=======评论内容:", comment_content or '')
            need_like = self.__process_reply(video_info, comment)
            if need_like:
                print("需要点赞")
                self.__process_like(video_info[0], video_info[1], comment)
            else:
                print("不需要点赞")

        if has_more:
            # Scroll the comment list to its bottom, then continue with the
            # comments that were not handled yet.
            driver.execute_script(
                "var __comment_scroll = document.getElementsByClassName('feed-comment__wrp')[0]; __comment_scroll.scrollTop = __comment_scroll.scrollHeight;")
            print("往下滚动")
            time.sleep(3)
            self.__process_comments(driver, start_index=len(comments), video_info=video_info)
        else:
            print("没有更多评论了, 评论数量:", len(comments))

    def process_videos(self, driver: "webdriver.Chrome", start_index=0):
        """Walk the video list and process the comments of selected videos.

        :param driver: the web driver
        :param start_index: index of the first video not yet handled
        :return: None
        """
        wait = WebDriverWait(driver, 100)  # wait at most 100 seconds
        scroll_list = wait.until(EC.visibility_of_element_located((By.CLASS_NAME, "feeds-container")))
        time.sleep(1)
        # Collect the video entries currently present in the list.
        videos_root = driver.find_element(By.CLASS_NAME, "feeds-container")
        videos = videos_root.find_elements(By.CLASS_NAME, "comment-feed-wrap")

        for video in videos[start_index:]:
            if self._break_requested:
                return
            video_info = self.__parse_video_info(video)
            # Let VideoManger decide whether this video has to be opened.
            if not VideoManger().is_need_click(video_info):
                continue
            video.click()
            # NOTE(review): sets the session-wide implicit wait to 2 seconds;
            # it is not a pause. Kept as in the original.
            driver.implicitly_wait(2)
            self.__process_comments(driver, video_info=video_info)
            # Pause after each processed video.
            time.sleep(constant.VIDEO_CLICK_SPACE_TIME)

        if self._has_more(scroll_list):
            driver.execute_script(
                "var __video_scroll = document.getElementsByClassName('feeds-container')[0]; __video_scroll.scrollTop = __video_scroll.scrollHeight;")
            print("往下滚动")
            time.sleep(3)
            self.process_videos(driver, start_index=len(videos))
        else:
            print("没有更多视频了")
            # End of the list: hand every listed video to VideoManger.
            VideoManger().add_video_infos([self.__parse_video_info(v) for v in videos])

    def close(self):
        """Shut the browser session down; safe to call when none is open.

        ``quit()`` is used instead of ``close()`` so the whole driver session
        ends, not just the current window. The handle is cleared so that a
        later pass starts a fresh browser.
        """
        if not self.driver:
            return
        try:
            self.driver.quit()
        except Exception as exc:
            # Best-effort cleanup: report, never raise.
            print("=====关闭浏览器失败=====", repr(exc))
        finally:
            self.driver = None

    def __save_cookie(self):
        """Dump the session's cookies to ``datas/cookies.json``."""
        cookies = self.driver.get_cookies()
        os.makedirs("datas", exist_ok=True)  # the target folder may not exist yet
        with open("datas/cookies.json", "w") as f:
            json.dump(cookies, f)

    def __fill_cookie(self):
        """Load cookies from ``datas/cookies.json`` into the session, if the file exists."""
        if not os.path.exists("datas/cookies.json"):
            return
        with open("datas/cookies.json", "r") as f:
            cookies = json.load(f)
        for cookie in cookies:
            self.driver.add_cookie(cookie)
|
|
|
if __name__ == "__main__":
    # Scratch check: show how Chinese text looks after unicode_escape encoding.
    # The real entry point is left disabled:
    # CommentManager().start_process_comment()
    sample = "你好"
    escaped = sample.encode('unicode_escape').decode("utf-8")
    print(escaped)
|
|
# if __name__ == "__main__":
|
# data = """
|
# 213<img src="https://res.wx.qq.com/mpres/zh_CN/htmledition/comm_htmledition/images/pic/common/pic_blank.gif" alt="[呲牙]" class="we-emoji we-emoji__Grin"><img src="https://res.wx.qq.com/mpres/zh_CN/htmledition/comm_htmledition/images/pic/common/pic_blank.gif" alt="[呲牙]" class="we-emoji we-emoji__Grin"><img src="https://res.wx.qq.com/mpres/zh_CN/htmledition/comm_htmledition/images/pic/common/pic_blank.gif" alt="[呲牙]" class="we-emoji we-emoji__Grin">
|
# """
|
#
|
# result = re.compile(r'[\u4e00-\u9fa5]+').search(re.sub("<img.*?>", "", data))
|
# print(result)
|