import threading
import time
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from selenium.webdriver.support.wait import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
import comment_manager
import setting
from video_manager import VideoManger
class CommentManager:
def __init__(self):
self.options = Options()
self.options.add_argument("--disable-blink-features")
self.options.add_argument("--disable-blink-features=AutomationControlled")
# driver = webdriver.Chrome(options=options)
# 另外一种方式
# 谷歌浏览器位置
chrome_location = r'C:\Program Files (x86)\Google\Chrome\Application\chrome.exe'
# 谷歌浏览器驱动地址
chromedriver_path = r'chromedriver.exe'
self.options.binary_location = chrome_location # 指定chrome的路径
self.service = Service(chromedriver_path)
# 获取正则表达式
comment_template_str = ""
# 将中文替换为正则表达式支持的unicode编码
comment_template_str = comment_template_str.encode('unicode_escape').decode("utf-8")
self.comment_templates = [(x.split("#")[0], x.split("#")[1]) for x in comment_template_str.split("\n") if
x.find("#") >= 0]
self.driver = None
def __init(self):
if not self.driver:
self.driver = webdriver.Chrome(service=self.service, options=self.options)
def start_process_comment(self):
"""
开始处理评论
:return:
"""
self.__init()
self.driver.get(
"https://channels.weixin.qq.com/platform/comment?isImageMode=0")
# 等到左侧菜单出现后才能执行后续操作
wait = WebDriverWait(self.driver, 100) # 最多等待10秒
element = wait.until(EC.visibility_of_element_located((By.ID, "side-bar")))
threading.Thread(target=lambda: self.process_videos(self.driver), daemon=True).start()
def __process_like(self, video_name, video_date, comment_element):
"""
处理单条评论的赞
:param comment_content:
:param comment_element:
:return:
"""
user_name, comment_content, comment_time = self.__parse_comment_info(comment_element)
if not comment_manager.is_need_click_like(video_name, video_date, user_name, comment_content):
return
comment_actions = comment_element.find_element(By.CLASS_NAME, "action-list").find_elements(By.CLASS_NAME,
"action-item")
classes: str = comment_actions[0].find_element(By.TAG_NAME, "svg").get_attribute("class")
if classes and classes.find("fill-like") >= 0:
# 已经点过赞了
return
like = comment_element.find_element(By.CLASS_NAME, "like-action")
like.click()
# self.driver.implicitly_wait(2)
time.sleep(2)
def __parse_comment_info(self, comment_element):
"""
解析评论信息
:param comment_element:
:return:(评论人,评论内容,评论时间)
"""
user_name = comment_element.find_element(By.CLASS_NAME, "comment-user-name").get_attribute("innerHTML")
comment_time = comment_element.find_element(By.CLASS_NAME, "comment-time").get_attribute("innerHTML")
comment_content = comment_element.find_element(By.CLASS_NAME, "comment-content").get_attribute("innerHTML")
return user_name, comment_content, comment_time
def __parse_video_info(self, video_element):
"""
解析视频信息
:param comment_element:
:return:(视频名称,视频时间,评论数量)
"""
video_time = video_element.find_element(By.CLASS_NAME, "feed-time").get_attribute("innerHTML")
comment_count = video_element.find_element(By.CLASS_NAME, "feed-comment-total").find_element(By.XPATH,
"./span[2]").get_attribute(
"innerHTML")
video_name = video_element.find_element(By.CLASS_NAME, "comment-content").get_attribute("innerHTML")
return video_name, video_time, comment_count
def __process_reply(self, video_name, video_date, comment_element):
"""
处理单条评论回复
:param video_name:
:param video_date:
:param comment_element:
:return: 是否需要点赞
"""
def get_reply_comment(nick_name, content):
if not setting.is_reply_comment():
return None
# 内容是否符合标准
return comment_manager.get_replay_content(video_name, video_date, nick_name, content)
comment_user_name, comment_content, comment_time = self.__parse_comment_info(comment_element)
try:
comment_reply_list = comment_element.find_element(By.CLASS_NAME, "comment-reply-list")
if comment_reply_list.find_element(By.XPATH, ".//div[normalize-space()='作者']"):
print("已经回复。。。。")
return False
except:
print("还没回复。。。。")
replay_content = get_reply_comment(comment_user_name, comment_element)
if not replay_content:
# 不需要评论
print("不需要回复。。。。")
return True
# 需要回复
comment_actions = comment_element.find_element(By.CLASS_NAME, "action-list").find_elements(By.CLASS_NAME,
"action-item")
# 点击评论
comment_actions[1].click()
wait = WebDriverWait(self.driver, 5) # 最多等待10秒
wait.until(EC.visibility_of_element_located((By.CLASS_NAME, "comment-create-content")))
time.sleep(1)
self.driver.find_element(By.CLASS_NAME,
"comment-create-content").find_element(By.TAG_NAME,
"textarea").send_keys(replay_content)
self.driver.implicitly_wait(1)
self.driver.find_element(By.CLASS_NAME,
"comment-create-content").find_element(By.XPATH, "div[3]/div[2]").click()
time.sleep(2)
return False
def __process_comments(self, driver, start_index=0, video_info=None):
"""
:param driver:
:param start_index:
:param video_info:(视频内容, 视频日期, 评论次数)
:return:
"""
scroll_list = driver.find_element(By.CLASS_NAME, "feed-comment__wrp")
loadmore = scroll_list.find_element(By.CLASS_NAME, "loadmore__dot")
# 获取父控件
loadmore = loadmore.find_element(By.XPATH, "./..")
has_more = loadmore.value_of_css_property("display") == "none"
comments = scroll_list.find_elements(By.XPATH, "div[2]/div/div/div[contains(@class,'comment-item')]")
print("评论条数", len(comments))
for index in range(start_index, len(comments)):
comment = comments[index]
# 评论内容
user_name, comment_content, comment_time = self.__parse_comment_info(comment)
if not comment_content:
continue
print("=======评论内容:", comment_content)
need_like = self.__process_reply(video_info[0], video_info[1], comment)
if need_like:
print("需要点赞")
self.__process_like(video_info[0], video_info[1], comment)
else:
print("不需要点赞")
# 往下滑动
if has_more:
driver.execute_script(
"var __comment_scroll = document.getElementsByClassName('feed-comment__wrp')[0]; __comment_scroll.scrollTop = __comment_scroll.scrollHeight;")
print("往下滚动")
time.sleep(3)
self.__process_comments(driver, start_index=len(comments))
else:
print("没有更多评论了")
def process_videos(self, driver: webdriver.Chrome, start_index=0):
wait = WebDriverWait(driver, 100) # 最多等待10秒
scroll_list = wait.until(EC.visibility_of_element_located((By.CLASS_NAME, "feeds-container")))
time.sleep(1)
# 查找视频列表
videos_root = driver.find_element(By.CLASS_NAME, "feeds-container")
videos = videos_root.find_elements(By.CLASS_NAME, "comment-feed-wrap")
for i in range(start_index, len(videos)):
# 选择视频
video = videos[i]
# 解析视频内容
video_name, video_time, comment_count = self.__parse_video_info(video)
# 判断是否要点击进去
if not VideoManger().is_need_click(video_name, video_time, comment_count):
continue
video.click()
driver.implicitly_wait(2)
self.__process_comments(driver, video_info=(video_name, video_time, comment_count))
# 5s处理一个视频
time.sleep(5)
loadmore = scroll_list.find_element(By.CLASS_NAME, "loadmore__dot")
# 获取父控件
loadmore = loadmore.find_element(By.XPATH, "./..")
has_more = loadmore.value_of_css_property("display") == "none"
if has_more:
driver.execute_script(
"var __video_scroll = document.getElementsByClassName('feeds-container')[0]; __video_scroll.scrollTop = __video_scroll.scrollHeight;")
print("往下滚动")
time.sleep(3)
self.process_videos(driver, start_index=len(videos))
else:
print("没有更多视频了")
VideoManger().add_video_infos([self.__parse_video_info(v) for v in videos])
def close(self):
if self.driver:
try:
self.driver.close()
except:
pass
if __name__ == "__main__":
# CommentManager().start_process_comment()
s = "你好"
print(s.encode('unicode_escape').decode("utf-8"))
# if __name__ == "__main__":
# data = """
# 213![[呲牙]](https://res.wx.qq.com/mpres/zh_CN/htmledition/comm_htmledition/images/pic/common/pic_blank.gif)
![[呲牙]](https://res.wx.qq.com/mpres/zh_CN/htmledition/comm_htmledition/images/pic/common/pic_blank.gif)
# """
#
# result = re.compile(r'[\u4e00-\u9fa5]+').search(re.sub("", "", data))
# print(result)