"""Selenium automation for the WeChat Channels comment-management page.

Opens https://channels.weixin.qq.com/platform/comment in Chrome, walks the
video list, and for every comment either posts a reply or clicks "like",
according to the decisions made by the project modules ``comment_manager``,
``setting`` and ``video_manager``.
"""
import json
import os
import random
import threading
import time
import traceback

from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.wait import WebDriverWait

import comment_manager
import constant
import setting
from video_manager import VideoManger


class CommentManager:
    """Drive a Chrome session that replies to / likes comments on each video.

    Attributes:
        options: Chrome options (automation-detection flag disabled, custom
            browser binary taken from ``setting.get_chrome_path()``).
        service: chromedriver service built from
            ``setting.get_chromedriver_path()``.
        comment_templates: list of ``(pattern, reply)`` pairs parsed from a
            ``pattern#reply`` template string (currently empty).
        driver: the live ``webdriver.Chrome`` instance, or ``None`` when no
            browser session is open.
    """

    def __init__(self):
        self.options = Options()
        self.options.add_argument("--disable-blink-features")
        self.options.add_argument("--disable-blink-features=AutomationControlled")
        # Alternative: webdriver.Chrome(options=options) with the binaries
        # discoverable on PATH.
        chrome_location = setting.get_chrome_path()  # Chrome browser binary
        chromedriver_path = setting.get_chromedriver_path()  # chromedriver binary
        self.options.binary_location = chrome_location
        self.service = Service(chromedriver_path)

        # Reply templates, one "pattern#reply" entry per line.
        comment_template_str = ""
        # Escape non-ASCII text to \uXXXX form so it can be used in a regex.
        comment_template_str = comment_template_str.encode('unicode_escape').decode("utf-8")
        self.comment_templates = [
            (parts[0], parts[1])
            for parts in (line.split("#") for line in comment_template_str.split("\n"))
            if len(parts) > 1
        ]

        self.driver = None
        # Stop flag. It must NOT be named ``break_excute``: an instance
        # attribute with that name would shadow the ``break_excute()`` method
        # and make it uncallable.
        self._break_requested = False

    def break_excute(self):
        """Request that the pass currently in progress stops.

        The flag is checked before each video and before each comment; it is
        cleared again when the next pass starts.
        """
        print("=====中断执行=====")
        self._break_requested = True

    def __init(self):
        """Prepare a pass: clear the stop flag and open or refresh the page."""
        self._break_requested = False
        if self.driver is None:
            self.driver = webdriver.Chrome(service=self.service, options=self.options)
            self.driver.get(
                "https://channels.weixin.qq.com/platform/comment?isImageMode=0")
        else:
            self.driver.refresh()

    def start_process_comment(self):
        """Run processing passes forever, pausing between passes.

        A failing pass is reported and the loop continues after
        ``constant.REFRESH_TIME_SPACE`` seconds. ``KeyboardInterrupt`` and
        ``SystemExit`` are deliberately not caught so the loop can be stopped.
        """
        while True:
            try:
                self.__start_excute_task()
            except Exception:
                # Best-effort loop: report the failure instead of hiding it.
                traceback.print_exc()
            time.sleep(constant.REFRESH_TIME_SPACE)

    def __start_excute_task(self):
        """Run a single pass over all videos."""
        self.__init()
        # Block until the left-hand side bar is visible. The timeout is
        # 1,000,000 seconds, i.e. effectively unbounded.
        # NOTE(review): presumably this gives the user time to log in - confirm.
        wait = WebDriverWait(self.driver, 1000000)
        wait.until(EC.visibility_of_element_located((By.ID, "side-bar")))
        self.process_videos(self.driver)

    @staticmethod
    def _has_more(container):
        """Return whether ``container`` reports that more items can be loaded.

        The parent of the ``loadmore__dot`` element is inspected; "more" is
        reported when its CSS ``display`` is ``"none"``. A missing indicator
        (or any lookup failure) is treated as "nothing more".

        NOTE(review): ``display == "none"`` meaning "has more" is kept exactly
        as in the original code - confirm against the real page.
        """
        try:
            indicator = container.find_element(By.CLASS_NAME, "loadmore__dot")
            indicator_parent = indicator.find_element(By.XPATH, "./..")
            return indicator_parent.value_of_css_property("display") == "none"
        except Exception:
            return False

    def __process_like(self, video_name, video_date, comment_element):
        """Click "like" on one comment when the rules ask for it.

        :param video_name: name of the video the comment belongs to
        :param video_date: publication time text of that video
        :param comment_element: WebElement of the comment item
        """
        user_name, comment_content, comment_time = self.__parse_comment_info(comment_element)
        if not comment_manager.is_need_click_like(video_name, video_date, user_name, comment_content):
            return

        comment_actions = comment_element.find_element(
            By.CLASS_NAME, "action-list").find_elements(By.CLASS_NAME, "action-item")
        classes: str = comment_actions[0].find_element(By.TAG_NAME, "svg").get_attribute("class")
        if classes and classes.find("fill-like") >= 0:
            # Already liked.
            return

        comment_element.find_element(By.CLASS_NAME, "like-action").click()
        time.sleep(2)

    def __parse_comment_info(self, comment_element):
        """Extract the visible fields of one comment.

        :param comment_element: WebElement of the comment item
        :return: ``(user_name, comment_content, comment_time)``;
            ``comment_content`` is ``''`` when the content node cannot be read
        """
        user_name = comment_element.find_element(
            By.CLASS_NAME, "comment-user-name").get_attribute("innerHTML")
        comment_time = comment_element.find_element(
            By.CLASS_NAME, "comment-time").get_attribute("innerHTML")
        try:
            comment_content = comment_element.find_element(
                By.CLASS_NAME, "comment-content").get_attribute("innerHTML")
        except Exception:
            comment_content = ''
        return user_name, comment_content, comment_time

    def __parse_video_info(self, video_element):
        """Extract the visible fields of one video entry.

        :param video_element: WebElement of the video item
        :return: ``(video_name, video_time, comment_count, video_title)`` where
            ``video_title`` is ``video_name`` cut at the first ``#``
        """
        video_time = video_element.find_element(
            By.CLASS_NAME, "feed-time").get_attribute("innerHTML")
        comment_count = video_element.find_element(
            By.CLASS_NAME, "feed-comment-total").find_element(
            By.XPATH, "./span[2]").get_attribute("innerHTML")
        video_name = video_element.find_element(
            By.CLASS_NAME, "feed-title").get_attribute("innerHTML")
        video_title = video_name.split("#", 1)[0]
        return video_name, video_time, comment_count, video_title

    def __process_reply(self, video_info, comment_element):
        """Reply to one comment if required.

        :param video_info: ``(video_name, video_date, comment_count, video_title)``
        :param comment_element: WebElement of the comment item
        :return: ``True`` when no reply was needed, so the caller should
            consider liking the comment instead; ``False`` when the comment
            already had an author reply or a reply was just posted
        """
        comment_user_name, comment_content, _ = self.__parse_comment_info(comment_element)

        # A reply list containing a node whose text is exactly '作者' means the
        # author has already answered this comment.
        try:
            reply_list = comment_element.find_element(By.CLASS_NAME, "comment-reply-list")
            reply_list.find_element(By.XPATH, ".//div[normalize-space()='作者']")
        except Exception:
            print("还没回复。。。。")
        else:
            print("已经回复。。。。")
            return False

        # Ask the rule engine for the reply text (only when replying is enabled).
        replay_content = None
        if setting.is_reply_comment():
            replay_content = comment_manager.get_replay_content(
                video_info, comment_user_name, comment_content)
        if not replay_content:
            print("不需要回复。。。。")
            return True

        # Open the reply box: the second action item is the "comment" button.
        comment_actions = comment_element.find_element(
            By.CLASS_NAME, "action-list").find_elements(By.CLASS_NAME, "action-item")
        comment_actions[1].click()
        wait = WebDriverWait(self.driver, 10)  # wait at most 10 seconds
        wait.until(EC.visibility_of_element_located((By.CLASS_NAME, "comment-create-content")))
        time.sleep(1)

        self.driver.find_element(By.CLASS_NAME, "comment-create-content").find_element(
            By.TAG_NAME, "textarea").send_keys(replay_content)
        # NOTE(review): implicitly_wait() does not pause; it sets the driver's
        # implicit element-lookup timeout. Kept to preserve existing behaviour.
        self.driver.implicitly_wait(1)
        self.driver.find_element(By.CLASS_NAME, "comment-create-content").find_element(
            By.XPATH, "div[3]/div[2]").click()
        time.sleep(2)
        # Random pause between replies.
        time.sleep(random.randint(constant.COMMENT_REPLY_SPACE_TIME_MIN,
                                  constant.COMMENT_REPLY_SPACE_TIME_MAX))
        return False

    def __process_comments(self, driver, start_index=0, video_info=None):
        """Process the comments of the currently opened video.

        Handles the comments from ``start_index`` on, then scrolls the comment
        panel and recurses while more comments are reported.

        :param driver: the Chrome driver
        :param start_index: index of the first comment not yet processed
        :param video_info: ``(video_name, video_date, comment_count, video_title)``
        """
        scroll_list = driver.find_element(By.CLASS_NAME, "feed-comment__wrp")
        has_more = self._has_more(scroll_list)

        comments = scroll_list.find_elements(
            By.XPATH, "div[2]/div/div/div[contains(@class,'comment-item')]")
        print("评论条数", len(comments))

        for comment in comments[start_index:]:
            if self._break_requested:
                return
            _, comment_content, _ = self.__parse_comment_info(comment)
            if not comment_content:
                comment_content = ''
            print("=======评论内容:", comment_content)

            if self.__process_reply(video_info, comment):
                print("需要点赞")
                self.__process_like(video_info[0], video_info[1], comment)
            else:
                print("不需要点赞")

        if has_more:
            # Scroll the comment panel to the bottom to trigger loading.
            driver.execute_script(
                "var __comment_scroll = document.getElementsByClassName('feed-comment__wrp')[0]; __comment_scroll.scrollTop = __comment_scroll.scrollHeight;")
            print("往下滚动")
            time.sleep(3)
            self.__process_comments(driver, start_index=len(comments), video_info=video_info)
        else:
            print("没有更多评论了, 评论数量:", len(comments))

    def process_videos(self, driver: webdriver.Chrome, start_index=0):
        """Process every video in the list, loading further pages as needed.

        :param driver: the Chrome driver
        :param start_index: index of the first video not yet processed
        """
        wait = WebDriverWait(driver, 100)  # wait at most 100 seconds
        scroll_list = wait.until(
            EC.visibility_of_element_located((By.CLASS_NAME, "feeds-container")))
        time.sleep(1)

        # Collect the video entries currently present in the list.
        videos_root = driver.find_element(By.CLASS_NAME, "feeds-container")
        videos = videos_root.find_elements(By.CLASS_NAME, "comment-feed-wrap")

        for video in videos[start_index:]:
            if self._break_requested:
                return
            video_info = self.__parse_video_info(video)
            # Skip videos the video manager says need no visit.
            if not VideoManger().is_need_click(video_info):
                continue
            video.click()
            # NOTE(review): sets the implicit lookup timeout; it is not a pause.
            driver.implicitly_wait(2)
            self.__process_comments(driver, video_info=video_info)
            # Pause after finishing one video.
            time.sleep(constant.VIDEO_CLICK_SPACE_TIME)

        if self._has_more(scroll_list):
            # Scroll the video list to the bottom to trigger loading.
            driver.execute_script(
                "var __video_scroll = document.getElementsByClassName('feeds-container')[0]; __video_scroll.scrollTop = __video_scroll.scrollHeight;")
            print("往下滚动")
            time.sleep(3)
            self.process_videos(driver, start_index=len(videos))
        else:
            print("没有更多视频了")
            # NOTE(review): the source's indentation was lost; recording is
            # placed here, where the list is complete - confirm the intent.
            VideoManger().add_video_infos([self.__parse_video_info(v) for v in videos])

    def close(self):
        """Shut the browser session down (best effort).

        ``quit()`` is used rather than ``close()`` so the chromedriver session
        is ended too, and the handle is dropped so that a later pass opens a
        fresh browser instead of refreshing a dead one.
        """
        if self.driver is None:
            return
        try:
            self.driver.quit()
        except Exception:
            # Nothing useful can be done if the browser is already gone.
            pass
        finally:
            self.driver = None

    def __save_cookie(self):
        """Write the session cookies to ``datas/cookies.json``."""
        cookies = self.driver.get_cookies()
        with open("datas/cookies.json", "w", encoding="utf-8") as f:
            json.dump(cookies, f)

    def __fill_cookie(self):
        """Load cookies from ``datas/cookies.json`` into the session, if present."""
        if not os.path.exists("datas/cookies.json"):
            return
        with open("datas/cookies.json", "r", encoding="utf-8") as f:
            cookies = json.load(f)
        for cookie in cookies:
            self.driver.add_cookie(cookie)


if __name__ == "__main__":
    # To run the automation: CommentManager().start_process_comment()
    s = "你好"
    print(s.encode('unicode_escape').decode("utf-8"))

# Scratch experiment kept from the original file (never executed):
# if __name__ == "__main__":
#     data = """
#     213[呲牙][呲牙][呲牙]
#     """
#
#     result = re.compile(r'[\u4e00-\u9fa5]+').search(re.sub("", "", data))
#     print(result)