97 lines
3.7 KiB
Python
97 lines
3.7 KiB
Python
"""Base class for all scrapers that will actually do the job."""
|
|
|
|
import hashlib
import re
from base64 import b64encode
from os import remove, stat
from os.path import exists, getsize, join
from typing import Callable, List, Optional, Tuple
from urllib.request import URLopener, urlretrieve

from scrapthechan import USER_AGENT
from scrapthechan.fileinfo import FileInfo
|
|
|
|
__all__ = ["Scraper"]
|
|
|
|
|
|
class Scraper:
    """Base class for all scrapers that will actually do the job.

    Subclasses are expected to override `run`; the protected helpers of
    this class take care of downloading a file, of checking an already
    downloaded one and of picking a free file name.

    Arguments:
    save_directory -- a path to a directory where files will be saved;
    files -- a list of FileInfo objects;
    download_progress_callback -- a callback function taking an int. It is
    only stored here (as `_progress_callback`); this base class never
    calls it itself.
    """

    def __init__(
        self,
        save_directory: str,
        files: List[FileInfo],
        download_progress_callback: Optional[Callable[[int], None]] = None,
    ) -> None:
        self._save_directory = save_directory
        self._files = files
        # NOTE(review): URLopener belongs to the legacy urllib interface and
        # has been deprecated since Python 3.3 (newer interpreters drop it).
        # It is kept so the `_url_opener` attribute stays available to
        # subclasses; plan a migration to urllib.request.build_opener().
        self._url_opener = URLopener()
        # URLopener sends its `version` attribute as the User-Agent header.
        self._url_opener.version = USER_AGENT
        self._progress_callback = download_progress_callback

    def run(self):
        """Do the scraping. Must be overridden by a subclass.

        Raises:
        NotImplementedError -- always, in this base class.
        """
        raise NotImplementedError

    def _same_filename(self, filename: str, path: str) -> str:
        """Return a file name that is not taken yet inside `path`.

        If `filename` is free it is returned unchanged. Otherwise an
        incremental number enclosed in brackets is added right before the
        extension (or at the very end when there is no extension), e.g.
        `pic.jpg` -> `pic(1).jpg` -> `pic(2).jpg`, `file` -> `file(1)`.

        Arguments:
        filename -- a desired name of a file;
        path -- a directory the name is checked against.
        """
        newname = filename
        while exists(join(path, newname)):
            # Everything after the last dot is treated as an extension.
            stem, dot, extension = newname.rpartition(".")
            if not dot:
                # No dot at all: rpartition() puts the whole name last.
                stem, extension = newname, ""
            suffix = f".{extension}" if dot else ""
            # A counter is only recognised when the stem really ends with
            # "(<ASCII digits>)", so names like "img(12.jpg" stay intact.
            counter = re.fullmatch(r"(.*)\(([0-9]+)\)", stem, re.DOTALL)
            if counter:
                base, num = counter.group(1), int(counter.group(2))
                newname = f"{base}({num + 1}){suffix}"
            else:
                newname = f"{stem}(1){suffix}"
        return newname

    def _hash_file(self, filename: str, hash_algo: str = "md5",
                   blocksize: int = 1048576) -> Tuple[str, bytes]:
        """Compute hash of a file.

        Arguments:
        filename -- a path to a file to hash;
        hash_algo -- an algorithm name passed to hashlib.new();
        blocksize -- how many bytes are read from the file at once.

        Returns a tuple of the digest as a hexadecimal string and the same
        digest as raw bytes.
        """
        hash_func = hashlib.new(hash_algo)
        with open(filename, 'rb') as f:
            # Read block by block so the whole file is never held in memory.
            for buf in iter(lambda: f.read(blocksize), b""):
                hash_func.update(buf)
        return hash_func.hexdigest(), hash_func.digest()

    def _is_file_ok(self, f: FileInfo, filepath: str) -> bool:
        """Check if a file exists and isn't broken.

        A file is considered fine when both its size and its hash match
        the ones stored in `f`.

        Arguments:
        f -- a FileInfo whose `size`, `hash_algo` and `hash_value` are read;
        filepath -- a path to the file on a disk.
        """
        if not exists(filepath):
            return False
        computed_size = getsize(filepath)
        # The size matches either exactly or after dividing the real size
        # by 1024 and rounding (presumably some sources report sizes in
        # KiB -- verify against the FileInfo producers).
        is_size_match = f.size == computed_size \
            or f.size == round(computed_size / 1024)
        if not is_size_match:
            # No point in reading the whole file just to reject it anyway.
            return False
        hexdig, dig = self._hash_file(filepath, f.hash_algo)
        # The expected hash may be given in a hex or in a base64 form.
        return f.hash_value == hexdig \
            or f.hash_value == b64encode(dig).decode()

    def _download_file(self, f: FileInfo) -> Optional[bool]:
        """Download a single file.

        Nothing is downloaded if a valid copy already exists under
        `f.name` in the save directory. If a file with that name exists
        but does not pass the check, it is left in place and the new one
        is saved under a free name (see `_same_filename`).

        Returns True if the download was skipped because a valid copy is
        already there, and None after a download was performed.
        """
        filepath = join(self._save_directory, f.name)
        if self._is_file_ok(f, filepath):
            return True
        if exists(filepath):
            filepath = join(
                self._save_directory,
                self._same_filename(f.name, self._save_directory))
        self._url_opener.retrieve(f.dlurl, filepath)
|