"""Base class for all scrapers that will actually do the job."""

from base64 import b64encode
from os import remove, stat
from os.path import exists, join, getsize
import re
from typing import List, Callable, Optional, Tuple
from urllib.request import urlretrieve, URLopener, HTTPError
import hashlib

from scrapthechan import USER_AGENT
from scrapthechan.fileinfo import FileInfo


__all__ = ["Scraper"]


# Matches a name stem that ends with an incremental counter, e.g. "image(3)".
_COUNTER_SUFFIX_RE = re.compile(r"(.*)\(([0-9]+)\)", re.DOTALL)

# How many times a download is attempted before giving up on a file.
_DOWNLOAD_ATTEMPTS = 3


class Scraper:
    """Base class for all scrapers that will actually do the job.

    Arguments:
    save_directory -- a path to a directory where file will be saved;
    files -- a list of FileInfo objects;
    download_progress_callback -- a callback function that will be called
    for each file started downloading. This base class only stores it as
    self._progress_callback; it is never invoked by the methods defined here.
    """

    def __init__(self, save_directory: str, files: List[FileInfo],
            download_progress_callback: Optional[Callable[[int], None]] = None
            ) -> None:
        self._save_directory = save_directory
        self._files = files
        # NOTE(review): URLopener is a deprecated legacy interface of
        # urllib.request; kept here because retrieve() is relied upon below.
        self._url_opener = URLopener()
        self._url_opener.addheaders = [('User-Agent', USER_AGENT)]
        self._url_opener.version = USER_AGENT
        self._progress_callback = download_progress_callback

    def run(self):
        """Start scraping. Must be implemented by a subclass."""
        raise NotImplementedError

    def _same_filename(self, filename: str, path: str) -> str:
        """Return a name for `filename` that does not exist yet in `path`.

        If a file with the same name already exists, an incremental number
        enclosed in brackets is appended to the name (before the extension,
        if there is one): "a.jpg" -> "a(1).jpg" -> "a(2).jpg", "a" -> "a(1)".
        The name is returned unchanged when nothing in `path` collides with it.
        """
        newname = filename
        while exists(join(path, newname)):
            stem, dot, extension = newname.rpartition(".")
            if not dot:
                # No extension: rpartition() leaves the whole name in the
                # last element, so move it back into the stem.
                stem, extension = newname, ""
            counter = _COUNTER_SUFFIX_RE.fullmatch(stem)
            if counter:
                stem = f"{counter.group(1)}({int(counter.group(2)) + 1})"
            else:
                # Either no brackets at all or brackets that are not a
                # trailing counter; in both cases start a fresh counter so
                # the name always changes and the loop always progresses.
                stem = f"{stem}(1)"
            newname = f"{stem}{dot}{extension}"
        return newname

    def _hash_file(self, filepath: str, hash_algorithm: str = "md5",
            blocksize: int = 1048576) -> Tuple[str, str]:
        """Compute hash of a file.

        The file is read in chunks of `blocksize` bytes. Returns a tuple of
        the hexadecimal digest and the base64-encoded raw digest.
        """
        hash_func = hashlib.new(hash_algorithm)
        with open(filepath, 'rb') as f:
            for chunk in iter(lambda: f.read(blocksize), b""):
                hash_func.update(chunk)
        return hash_func.hexdigest(), b64encode(hash_func.digest()).decode()

    def _check_file(self, f: FileInfo, filepath: str) -> bool:
        """Check if a file exists and isn't broken.

        A file is considered fine when its size matches `f.size` and its hash
        (in either hex or base64 form) equals `f.hash_value`.
        """
        if not exists(filepath):
            return False
        computed_size = getsize(filepath)
        # The expected size is accepted either as an exact byte count or as
        # the byte count divided by 1024 and rounded.
        # NOTE(review): presumably some sources report size in KiB - confirm.
        size_matches = f.size == computed_size \
            or f.size == round(computed_size / 1024)
        if not size_matches:
            return False
        hexdig, dig = self._hash_file(filepath, f.hash_algorithm)
        return f.hash_value == hexdig or f.hash_value == dig

    def _remove_if_exists(self, filepath: str) -> None:
        """Remove a file if it is present; used to drop partial downloads."""
        if exists(filepath):
            remove(filepath)

    def _download_file(self, f: FileInfo):
        """Download a single file.

        Nothing is done if a valid copy is already saved. If a different file
        with the same name exists, the download goes to a new name produced
        by _same_filename(). A download that fails verification is removed
        and retried; HTTP and connection errors are reported to stdout and
        any partially written file is removed.
        """
        filepath = join(self._save_directory, f.name)
        orig_filepath = filepath
        if self._check_file(f, filepath):
            return
        is_same_filename = exists(filepath)
        if is_same_filename:
            filepath = join(self._save_directory,
                self._same_filename(f.name, self._save_directory))
        try:
            downloaded = False
            for _ in range(_DOWNLOAD_ATTEMPTS):
                self._url_opener.retrieve(f.download_url, filepath)
                if self._check_file(f, filepath):
                    downloaded = True
                    break
                print(filepath, f.size, f.hash_value)
                self._remove_if_exists(filepath)
            # Only compare against the pre-existing file when there really is
            # a freshly downloaded one; after exhausted retries the new file
            # has been removed and hashing it would fail.
            if downloaded and is_same_filename:
                f1_hexdig, f1_dig = self._hash_file(orig_filepath,
                    f.hash_algorithm)
                f2_hexdig, f2_dig = self._hash_file(filepath,
                    f.hash_algorithm)
                if f1_hexdig == f2_hexdig or f1_dig == f2_dig:
                    # The new file is a duplicate of the one already saved.
                    remove(filepath)
        except HTTPError as e:
            print("HTTP Error", e.code, e.reason, f.download_url)
            self._remove_if_exists(filepath)
        except ConnectionResetError:
            print("Connection reset for", f.download_url)
            self._remove_if_exists(filepath)
        except ConnectionRefusedError:
            print("Connection refused for", f.download_url)
            self._remove_if_exists(filepath)
        except ConnectionAbortedError:
            print("Connection aborted for", f.download_url)
            self._remove_if_exists(filepath)