| | |
| | | import re |
| | | import json |
| | | import os |
| | | import random |
| | | import threading |
| | | import time |
| | | |
| | |
| | | from selenium.webdriver.support.wait import WebDriverWait |
| | | from selenium.webdriver.support import expected_conditions as EC |
| | | |
| | | import comment_manager |
| | | import constant |
| | | import setting |
| | | from video_manager import VideoManger |
| | | |
| | | |
| | | class CommentManager: |
| | |
| | | # driver = webdriver.Chrome(options=options) |
| | | # 另外一种方式 |
| | | # 谷歌浏览器位置 |
| | | chrome_location = r'C:\Program Files (x86)\Google\Chrome\Application\chrome.exe' |
| | | chrome_location = setting.get_chrome_path() |
| | | # 谷歌浏览器驱动地址 |
| | | chromedriver_path = r'chromedriver.exe' |
| | | chromedriver_path = setting.get_chromedriver_path() |
| | | self.options.binary_location = chrome_location # 指定chrome的路径 |
| | | self.service = Service(chromedriver_path) |
| | | # 获取正则表达式 |
| | |
| | | self.comment_templates = [(x.split("#")[0], x.split("#")[1]) for x in comment_template_str.split("\n") if |
| | | x.find("#") >= 0] |
| | | self.driver = None |
| | | self.break_excute = False # 中断执行 |
| | | |
def break_excute(self):
    """Request that the running comment/video loops stop.

    Prints a notice and raises the ``break_excute`` flag that the
    processing loops poll before handling each item.

    :return: None
    """
    # NOTE(review): the flag is stored under the same attribute name as this
    # method, so once it is assigned on the instance it shadows the method.
    notice = "=====中断执行====="
    print(notice)
    self.break_excute = True
| | | |
def __init(self):
    """Clear the interrupt flag and make sure a browser page is ready.

    When a driver already exists the current page is refreshed; otherwise a
    Chrome driver is created from ``self.service`` / ``self.options`` and
    pointed at the comment-management page.
    """
    self.break_excute = False
    if self.driver:
        # Reuse the existing session.
        self.driver.refresh()
        return
    self.driver = webdriver.Chrome(service=self.service, options=self.options)
    comment_page = "https://channels.weixin.qq.com/platform/comment?isImageMode=0"
    self.driver.get(comment_page)
| | | |
def start_process_comment(self):
    """Run the comment-processing task in an endless polling loop.

    Each pass delegates to ``__start_excute_task`` and is followed by a
    pause of ``constant.REFRESH_TIME_SPACE`` seconds, whether or not the
    pass succeeded. This method does not return.

    :return: never returns normally
    """
    while True:
        try:
            self.__start_excute_task()
        except Exception as exc:
            # Best effort: one failed pass must not end the polling loop, but
            # the failure is reported instead of being silently discarded.
            # Catching Exception (not a bare except) lets Ctrl+C stop the loop.
            print("=====任务执行异常=====", exc)
        finally:
            time.sleep(constant.REFRESH_TIME_SPACE)
| | | |
def __start_excute_task(self):
    """Run one full processing pass over the video list.

    Prepares the browser via ``__init``, waits for the left-hand side bar
    to become visible, then walks the videos with ``process_videos``.

    :return: None
    """
    self.__init()
    # Nothing can be done until the side bar is visible. The timeout is
    # effectively unbounded (1,000,000 s) -- presumably to leave time for a
    # manual login; TODO confirm.
    side_bar_visible = EC.visibility_of_element_located((By.ID, "side-bar"))
    WebDriverWait(self.driver, 1000000).until(side_bar_visible)
    self.process_videos(self.driver)
| | | |
| | | def __process_like(self, video_name, video_date, comment_element): |
| | | """ |
| | | 处理单条评论的赞 |
| | | :param comment_content: |
| | | :param comment_element: |
| | | :return: |
| | | """ |
| | | comment_pattern = re.compile(r'[\u4e00-\u9fa5]+') |
| | | comment_content = re.sub("<img.*?>", "", comment_content) |
| | | if not comment_pattern.search(comment_content): |
| | | # 符合标准的评论 |
| | | user_name, comment_content, comment_time = self.__parse_comment_info(comment_element) |
| | | if not comment_manager.is_need_click_like(video_name, video_date, user_name, comment_content): |
| | | return |
| | | |
| | | comment_actions = comment_element.find_element(By.CLASS_NAME, "action-list").find_elements(By.CLASS_NAME, |
| | | "action-item") |
| | | classes: str = comment_actions[0].find_element(By.TAG_NAME, "svg").get_attribute("class") |
| | |
| | | return |
| | | like = comment_element.find_element(By.CLASS_NAME, "like-action") |
| | | like.click() |
| | | self.driver.implicitly_wait(2) |
| | | # self.driver.implicitly_wait(2) |
| | | time.sleep(2) |
| | | |
def __parse_comment_info(self, comment_element):
    """Read the author, text and time of one comment element.

    :param comment_element: element wrapping a single comment
    :return: ``(user_name, comment_content, comment_time)`` taken from the
        ``innerHTML`` of the matching child elements; ``comment_content``
        is ``''`` when the content element cannot be read
    """
    user_name = comment_element.find_element(
        By.CLASS_NAME, "comment-user-name").get_attribute("innerHTML")
    comment_time = comment_element.find_element(
        By.CLASS_NAME, "comment-time").get_attribute("innerHTML")
    try:
        comment_content = comment_element.find_element(
            By.CLASS_NAME, "comment-content").get_attribute("innerHTML")
    except Exception:
        # A comment without a readable content node is treated as empty text.
        comment_content = ''
    return user_name, comment_content, comment_time
| | | |
def __parse_video_info(self, video_element):
    """Read the descriptive fields of one video entry.

    :param video_element: element wrapping a single video in the feed list
    :return: ``(video_name, video_time, comment_count, video_title)`` where
        the first three are ``innerHTML`` strings and ``video_title`` is
        ``video_name`` cut off before its first ``#``
    """

    def inner_html(element):
        return element.get_attribute("innerHTML")

    video_time = inner_html(video_element.find_element(By.CLASS_NAME, "feed-time"))
    total_element = video_element.find_element(By.CLASS_NAME, "feed-comment-total")
    comment_count = inner_html(total_element.find_element(By.XPATH, "./span[2]"))
    video_name = inner_html(video_element.find_element(By.CLASS_NAME, "feed-title"))
    # Everything from the first '#' onward is dropped; without a '#' the
    # title equals the full name.
    video_title = video_name.partition("#")[0]
    return video_name, video_time, comment_count, video_title
| | | |
def __process_reply(self, video_info, comment_element):
    """Reply to one comment when a reply is wanted and none exists yet.

    :param video_info: ``(video_name, video_date, comment_count, video_title)``
    :param comment_element: element wrapping the comment
    :return: whether the caller should go on to like the comment --
        ``True`` when no reply text was produced, ``False`` when the author
        has already replied or a reply has just been posted
    """

    def get_reply_comment(nick_name, content):
        """Return the reply text for ``content``, or ``None`` if replies are disabled."""
        if not setting.is_reply_comment():
            return None
        # A matching template wins over the generated reply.
        for pattern, template in self.comment_templates:
            if re.match(pattern, content):
                return template.replace("[昵称]", nick_name)
        return comment_manager.get_replay_content(video_info, nick_name, content)

    comment_user_name, comment_content, _ = self.__parse_comment_info(comment_element)

    try:
        comment_reply_list = comment_element.find_element(By.CLASS_NAME, "comment-reply-list")
        # find_element raises when no reply is tagged with the author marker.
        comment_reply_list.find_element(By.XPATH, ".//div[normalize-space()='作者']")
    except Exception:
        print("还没回复。。。。")
    else:
        print("已经回复。。。。")
        return False

    replay_content = get_reply_comment(comment_user_name, comment_content)
    if not replay_content:
        # No reply needed.
        print("不需要回复。。。。")
        return True

    # A reply is needed: the second action item opens the reply editor.
    comment_actions = comment_element.find_element(
        By.CLASS_NAME, "action-list").find_elements(By.CLASS_NAME, "action-item")
    comment_actions[1].click()

    # Wait up to 10 seconds for the editor to appear.
    wait = WebDriverWait(self.driver, 10)
    wait.until(EC.visibility_of_element_located((By.CLASS_NAME, "comment-create-content")))
    time.sleep(1)
    self.driver.find_element(By.CLASS_NAME, "comment-create-content").find_element(
        By.TAG_NAME, "textarea").send_keys(replay_content)

    # Click the element at div[3]/div[2] inside the editor to send the reply.
    self.driver.find_element(By.CLASS_NAME, "comment-create-content").find_element(
        By.XPATH, "div[3]/div[2]").click()
    # Random pause between replies.
    time.sleep(random.randint(constant.COMMENT_REPLY_SPACE_TIME_MIN,
                              constant.COMMENT_REPLY_SPACE_TIME_MAX))
    return False
| | | |
def __process_comments(self, driver, start_index=0, video_info=None):
    """Reply to and like the comments of the currently opened video.

    Handles the loaded comments from ``start_index`` on; when more can be
    loaded it scrolls the comment pane to the bottom and recurses for the
    newly loaded comments.

    :param driver: browser driver showing the comment pane
    :param start_index: index of the first comment not yet handled
    :param video_info: ``(video_name, video_date, comment_count, video_title)``
    :return: None
    """
    scroll_list = driver.find_element(By.CLASS_NAME, "feed-comment__wrp")

    try:
        loadmore = scroll_list.find_element(By.CLASS_NAME, "loadmore__dot")
        # Inspect the parent of the load-more indicator.
        loadmore = loadmore.find_element(By.XPATH, "./..")
        has_more = loadmore.value_of_css_property("display") == "none"
    except Exception:
        # No load-more indicator: treat the list as complete.
        has_more = False

    comments = scroll_list.find_elements(
        By.XPATH, "div[2]/div/div/div[contains(@class,'comment-item')]")
    print("评论条数", len(comments))

    for index in range(start_index, len(comments)):
        if self.break_excute:
            return
        comment = comments[index]
        _, comment_content, _ = self.__parse_comment_info(comment)
        if not comment_content:
            comment_content = ''
        print("=======评论内容:", comment_content)
        need_like = self.__process_reply(video_info, comment)
        if need_like:
            print("需要点赞")
            self.__process_like(video_info[0], video_info[1], comment)
        else:
            print("不需要点赞")

    if has_more:
        # Scroll to the bottom to load more, then continue after the
        # comments already handled.
        driver.execute_script(
            "var __comment_scroll = document.getElementsByClassName('feed-comment__wrp')[0]; __comment_scroll.scrollTop = __comment_scroll.scrollHeight;")
        print("往下滚动")
        time.sleep(3)
        self.__process_comments(driver, start_index=len(comments), video_info=video_info)
    else:
        print("没有更多评论了, 评论数量:", len(comments))
| | | |
def process_videos(self, driver: webdriver.Chrome, start_index=0):
    """Walk the video list and process the comments of each selected video.

    Handles the loaded videos from ``start_index`` on; when more can be
    loaded it scrolls the list to the bottom and recurses. When the list is
    exhausted, the info of all loaded videos is handed to ``VideoManger``.

    :param driver: browser driver showing the comment-management page
    :param start_index: index of the first video not yet handled
    :return: None
    """
    wait = WebDriverWait(driver, 100)  # wait up to 100 seconds
    scroll_list = wait.until(
        EC.visibility_of_element_located((By.CLASS_NAME, "feeds-container")))
    time.sleep(1)

    # Collect the videos currently loaded in the list.
    videos_root = driver.find_element(By.CLASS_NAME, "feeds-container")
    videos = videos_root.find_elements(By.CLASS_NAME, "comment-feed-wrap")
    for i in range(start_index, len(videos)):
        if self.break_excute:
            return
        video = videos[i]
        video_info = self.__parse_video_info(video)
        # Skip videos that VideoManger says need no visit.
        if not VideoManger().is_need_click(video_info):
            continue
        video.click()
        driver.implicitly_wait(2)
        self.__process_comments(driver, video_info=video_info)
        # Pause before moving on to the next video.
        time.sleep(constant.VIDEO_CLICK_SPACE_TIME)

    loadmore = scroll_list.find_element(By.CLASS_NAME, "loadmore__dot")
    # Inspect the parent of the load-more indicator.
    loadmore = loadmore.find_element(By.XPATH, "./..")
    has_more = loadmore.value_of_css_property("display") == "none"
    if has_more:
        driver.execute_script(
            "var __video_scroll = document.getElementsByClassName('feeds-container')[0]; __video_scroll.scrollTop = __video_scroll.scrollHeight;")
        print("往下滚动")
        time.sleep(3)
        self.process_videos(driver, start_index=len(videos))
    else:
        print("没有更多视频了")
        VideoManger().add_video_infos([self.__parse_video_info(v) for v in videos])
| | | |
| | | def close(self): |
| | | if self.driver: |
| | |
| | | except: |
| | | pass |
| | | |
def __save_cookie(self):
    """Write the browser's current cookies to ``datas/cookies.json`` as JSON."""
    current_cookies = self.driver.get_cookies()
    with open("datas/cookies.json", "w") as cookie_file:
        cookie_file.write(json.dumps(current_cookies))
| | | |
def __fill_cookie(self):
    """Load cookies from ``datas/cookies.json`` into the browser, if the file exists."""
    cookie_path = "datas/cookies.json"
    if not os.path.exists(cookie_path):
        # Nothing saved yet.
        return
    with open(cookie_path, "r") as cookie_file:
        saved_cookies = json.load(cookie_file)
        for entry in saved_cookies:
            self.driver.add_cookie(entry)
| | | |
| | | |
| | | if __name__ == "__main__": |
| | | # CommentManager().start_process_comment() |