"""Selenium automation for the comment page at channels.weixin.qq.com.

The :class:`CommentManager` walks the video list on the comment-management
page, opens each selected video, and replies to and/or likes its comments.
"""
import threading
import time

from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from selenium.webdriver.support.wait import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC

import comment_manager
import setting
from video_manager import VideoManger


class CommentManager:
    """Drive a Chrome session that replies to and likes video comments."""

    def __init__(self):
        """
        Prepare the Chrome options, the chromedriver service and the templates.

        No browser is started here; the driver is created lazily on first use.
        """
        self.options = Options()
        self.options.add_argument("--disable-blink-features")
        self.options.add_argument("--disable-blink-features=AutomationControlled")
        # Location of the Chrome browser binary.
        chrome_location = r'C:\Program Files (x86)\Google\Chrome\Application\chrome.exe'
        # Location of the chromedriver executable.
        chromedriver_path = r'chromedriver.exe'
        self.options.binary_location = chrome_location
        self.service = Service(chromedriver_path)

        # Template text, one "<first>#<second>" entry per line (currently empty).
        # NOTE(review): the original comment calls these regular expressions;
        # nothing in this file consumes ``comment_templates`` yet.
        comment_template_str = ""
        self.comment_templates = self._parse_comment_templates(comment_template_str)
        self.driver = None

    @staticmethod
    def _parse_comment_templates(template_text):
        """
        Split the template text into ``(first, second)`` tuples.

        Lines are split on real newlines *before* escaping: ``unicode_escape``
        rewrites a newline as a literal backslash-n, so escaping the whole text
        first would leave nothing to split on.

        :param template_text: raw text, one ``first#second`` entry per line
        :return: list of ``(first, second)`` tuples; lines without ``#`` are skipped
        """
        templates = []
        for line in template_text.split("\n"):
            if "#" not in line:
                continue
            # Replace non-ASCII characters (e.g. Chinese) with \uXXXX escapes.
            escaped = line.encode('unicode_escape').decode("utf-8")
            fields = escaped.split("#")
            templates.append((fields[0], fields[1]))
        return templates

    def __init(self):
        """Start Chrome on first use; later calls reuse the existing driver."""
        if not self.driver:
            self.driver = webdriver.Chrome(service=self.service, options=self.options)

    def start_process_comment(self):
        """
        Open the comment page and start processing videos in the background.

        Blocks until the element with id ``side-bar`` is visible (at most 100
        seconds), then hands the work to a daemon thread and returns.

        :return: None
        """
        self.__init()
        self.driver.get(
            "https://channels.weixin.qq.com/platform/comment?isImageMode=0")
        # Nothing else can be done until the left-hand menu has appeared.
        WebDriverWait(self.driver, 100).until(
            EC.visibility_of_element_located((By.ID, "side-bar")))
        threading.Thread(target=self.process_videos, args=(self.driver,),
                         daemon=True).start()

    def __process_like(self, video_name, video_date, comment_element):
        """
        Like a single comment when required and not liked already.

        :param video_name: name of the video the comment belongs to
        :param video_date: date of the video
        :param comment_element: WebElement of the comment
        :return: None
        """
        user_name, comment_content, _ = self.__parse_comment_info(comment_element)
        if not comment_manager.is_need_click_like(video_name, video_date, user_name,
                                                  comment_content):
            return
        comment_actions = comment_element.find_element(
            By.CLASS_NAME, "action-list").find_elements(By.CLASS_NAME, "action-item")
        classes: str = comment_actions[0].find_element(
            By.TAG_NAME, "svg").get_attribute("class")
        if classes and "fill-like" in classes:
            # Already liked.
            return
        comment_element.find_element(By.CLASS_NAME, "like-action").click()
        time.sleep(2)

    def __parse_comment_info(self, comment_element):
        """
        Read the author, text and time of a comment from its element.

        :param comment_element: WebElement of the comment
        :return: (user name, comment content, comment time)
        """
        user_name = comment_element.find_element(
            By.CLASS_NAME, "comment-user-name").get_attribute("innerHTML")
        comment_time = comment_element.find_element(
            By.CLASS_NAME, "comment-time").get_attribute("innerHTML")
        comment_content = comment_element.find_element(
            By.CLASS_NAME, "comment-content").get_attribute("innerHTML")
        return user_name, comment_content, comment_time

    def __parse_video_info(self, video_element):
        """
        Read the name, time and comment count of a video from its element.

        :param video_element: WebElement of the video entry
        :return: (video name, video time, comment count)
        """
        video_time = video_element.find_element(
            By.CLASS_NAME, "feed-time").get_attribute("innerHTML")
        comment_count = video_element.find_element(
            By.CLASS_NAME, "feed-comment-total").find_element(
            By.XPATH, "./span[2]").get_attribute("innerHTML")
        video_name = video_element.find_element(
            By.CLASS_NAME, "comment-content").get_attribute("innerHTML")
        return video_name, video_time, comment_count

    def __process_reply(self, video_name, video_date, comment_element):
        """
        Reply to a single comment when a reply is configured for it.

        :param video_name: name of the video the comment belongs to
        :param video_date: date of the video
        :param comment_element: WebElement of the comment
        :return: True when the comment should be liked instead (no reply is
                 needed); False when it was already replied to or a reply has
                 just been posted
        """

        def get_reply_comment(nick_name, content):
            if not setting.is_reply_comment():
                return None
            # Ask the rules whether this content deserves a reply.
            return comment_manager.get_replay_content(video_name, video_date,
                                                      nick_name, content)

        comment_user_name, comment_content, _ = self.__parse_comment_info(comment_element)
        try:
            comment_reply_list = comment_element.find_element(
                By.CLASS_NAME, "comment-reply-list")
            if comment_reply_list.find_element(By.XPATH,
                                               ".//div[normalize-space()='作者']"):
                print("已经回复。。。。")
                return False
        except Exception:
            # find_element raises when there is no reply list or no author
            # reply; any lookup failure is treated as "not replied yet".
            print("还没回复。。。。")

        # Pass the comment *text*; the rules operate on the content, not on
        # the WebElement.
        replay_content = get_reply_comment(comment_user_name, comment_content)
        if not replay_content:
            # No reply needed.
            print("不需要回复。。。。")
            return True

        # A reply is needed: the second action item opens the reply box.
        comment_actions = comment_element.find_element(
            By.CLASS_NAME, "action-list").find_elements(By.CLASS_NAME, "action-item")
        comment_actions[1].click()
        # Wait at most 5 seconds for the reply box to appear.
        WebDriverWait(self.driver, 5).until(
            EC.visibility_of_element_located((By.CLASS_NAME, "comment-create-content")))
        time.sleep(1)
        self.driver.find_element(By.CLASS_NAME, "comment-create-content").find_element(
            By.TAG_NAME, "textarea").send_keys(replay_content)
        # NOTE(review): implicitly_wait does not pause; it sets the session-wide
        # element lookup timeout to 1 second.
        self.driver.implicitly_wait(1)
        self.driver.find_element(By.CLASS_NAME, "comment-create-content").find_element(
            By.XPATH, "div[3]/div[2]").click()
        time.sleep(2)
        return False

    def __process_comments(self, driver, start_index=0, video_info=None):
        """
        Reply to / like every loaded comment of the open video, page by page.

        :param driver: the Chrome driver showing the comment list
        :param start_index: index of the first comment not yet processed
        :param video_info: (video content, video date, comment count)
        :return: None
        """
        scroll_list = driver.find_element(By.CLASS_NAME, "feed-comment__wrp")
        # The state is read from the parent of the "load more" dot.
        loadmore = scroll_list.find_element(
            By.CLASS_NAME, "loadmore__dot").find_element(By.XPATH, "./..")
        # NOTE(review): "more to load" is taken to mean display == "none";
        # confirm this polarity against the page.
        has_more = loadmore.value_of_css_property("display") == "none"
        comments = scroll_list.find_elements(
            By.XPATH, "div[2]/div/div/div[contains(@class,'comment-item')]")
        print("评论条数", len(comments))
        for comment in comments[start_index:]:
            _, comment_content, _ = self.__parse_comment_info(comment)
            if not comment_content:
                continue
            print("=======评论内容:", comment_content)
            need_like = self.__process_reply(video_info[0], video_info[1], comment)
            if need_like:
                print("需要点赞")
                self.__process_like(video_info[0], video_info[1], comment)
            else:
                print("不需要点赞")

        if has_more:
            # Scroll to the bottom so the next batch of comments is loaded.
            driver.execute_script(
                "var __comment_scroll = document.getElementsByClassName('feed-comment__wrp')[0]; "
                "__comment_scroll.scrollTop = __comment_scroll.scrollHeight;")
            print("往下滚动")
            time.sleep(3)
            # Keep video_info on the next page; dropping it made the next
            # page fail on video_info[0].
            self.__process_comments(driver, start_index=len(comments),
                                    video_info=video_info)
        else:
            print("没有更多评论了")

    def process_videos(self, driver: webdriver.Chrome, start_index=0):
        """
        Walk the video list and process the comments of every selected video.

        :param driver: the Chrome driver showing the comment page
        :param start_index: index of the first video not yet processed
        :return: None
        """
        # Wait at most 100 seconds for the video list to become visible.
        scroll_list = WebDriverWait(driver, 100).until(
            EC.visibility_of_element_located((By.CLASS_NAME, "feeds-container")))
        time.sleep(1)
        # Look up the list of videos.
        videos_root = driver.find_element(By.CLASS_NAME, "feeds-container")
        videos = videos_root.find_elements(By.CLASS_NAME, "comment-feed-wrap")
        for video in videos[start_index:]:
            video_name, video_time, comment_count = self.__parse_video_info(video)
            # Decide whether this video should be opened at all.
            if not VideoManger().is_need_click(video_name, video_time, comment_count):
                continue
            video.click()
            # NOTE(review): implicitly_wait does not pause; it lets later
            # element lookups in this session wait up to 2 seconds.
            driver.implicitly_wait(2)
            self.__process_comments(driver,
                                    video_info=(video_name, video_time, comment_count))
            # Spend 5 seconds between videos.
            time.sleep(5)

        # The state is read from the parent of the "load more" dot.
        loadmore = scroll_list.find_element(
            By.CLASS_NAME, "loadmore__dot").find_element(By.XPATH, "./..")
        # NOTE(review): "more to load" is taken to mean display == "none";
        # confirm this polarity against the page.
        has_more = loadmore.value_of_css_property("display") == "none"
        if has_more:
            # Scroll to the bottom so the next batch of videos is loaded.
            driver.execute_script(
                "var __video_scroll = document.getElementsByClassName('feeds-container')[0]; "
                "__video_scroll.scrollTop = __video_scroll.scrollHeight;")
            print("往下滚动")
            time.sleep(3)
            self.process_videos(driver, start_index=len(videos))
        else:
            print("没有更多视频了")
            # Record every video seen once the end of the list is reached.
            VideoManger().add_video_infos([self.__parse_video_info(v) for v in videos])

    def close(self):
        """Close the current browser window, ignoring any failure to do so."""
        if self.driver:
            try:
                self.driver.close()
            except Exception:
                # Best effort: the window may already be gone.
                pass


if __name__ == "__main__":
    # CommentManager().start_process_comment()
    s = "你好"
    print(s.encode('unicode_escape').decode("utf-8"))