"""Base Scraper implementation.""" from base64 import b64encode from os import remove, stat from os.path import exists, join, getsize import re from typing import List, Callable from urllib.request import urlretrieve, URLopener import hashlib from scrapthechan import __version__ from scrapthechan.fileinfo import FileInfo __all__ = ["Scraper"] class Scraper: """Base scraper implementation. Arguments: save_directory -- a path to a directory where file will be saved; files -- a list of FileInfo objects; download_progress_callback -- a callback function that will be called for each file started downloading. """ def __init__(self, save_directory: str, files: List[FileInfo], download_progress_callback: Callable[[int], None] = None) -> None: self._save_directory = save_directory self._files = files self._url_opener = URLopener() self._url_opener.version = f"ScrapTheChan/{__version__}" self._progress_callback = download_progress_callback def run(self): raise NotImplementedError def _same_filename(self, filename: str, path: str) -> str: """Check if there is a file with same name. If so then add incremental number enclosed in brackets to a name of a new one.""" newname = filename while exists(join(path, newname)): has_extension = newname.rfind(".") != -1 if has_extension: l, r = newname.rsplit(".", 1) lbracket = l.rfind("(") if lbracket == -1: newname = f"{l}(1).{r}" else: num = l[lbracket+1:-1] if num.isnumeric(): newname = f"{l[:lbracket]}({int(num)+1}).{r}" else: newname = f"{l}(1).{r}" else: lbracket = l.rfind("(") if lbracket == -1: newname = f"{newname}(1)" else: num = newname[lbracket+1:-1] if num.isnumeric(): newname = f"{newname[:lbracket]}({int(num)+1})" return newname def _hash_file(self, filename: str, hash_algo: str = "md5", blocksize: int = 1048576) -> (str, str): """Compute hash of a file.""" hash_func = hashlib.new(hash_algo) with open(filename, 'rb') as f: buf = f.read(blocksize) while len(buf) > 0: hash_func.update(buf) buf = f.read(blocksize) return hash_func.hexdigest(), hash_func.digest() def _is_file_ok(self, f: FileInfo, filepath: str) -> bool: """Check if a file exist and isn't broken.""" if not exists(filepath): return False computed_size = getsize(filepath) is_size_match = f.size == computed_size \ or f.size == round(computed_size / 1024) hexdig, dig = self._hash_file(filepath, f.hash_algo) is_hash_match = f.hash_value == hexdig \ or f.hash_value == b64encode(dig).decode() return is_size_match and is_hash_match def _download_file(self, f: FileInfo): """Download a single file.""" filepath = join(self._save_directory, f.name) if self._is_file_ok(f, filepath): return True elif exists(filepath): filepath = join(self._save_directory, \ self._same_filename(f.name, self._save_directory)) self._url_opener.retrieve(f.dlurl, filepath)