1
0
Fork 0
ScrapTheChan/scrapthechan/scraper.py

112 lines
4.4 KiB
Python

"""Base class for all scrapers that will actually do the job."""
from base64 import b64encode
import hashlib
from os import remove, stat
from os.path import exists, join, getsize
import re
from typing import Callable, List, Optional, Tuple
from urllib.request import urlretrieve, URLopener, HTTPError

from scrapthechan import USER_AGENT
from scrapthechan.fileinfo import FileInfo
# Public API: the only name exported by a star-import of this module.
__all__ = ["Scraper"]
class Scraper:
    """Base class for all scrapers that will actually do the job.

    Subclasses are expected to override run(); the helpers defined here
    resolve file name clashes, verify already saved files and download a
    single file.

    Arguments:
    save_directory -- a path to a directory where files will be saved;
    files -- a list of FileInfo objects;
    download_progress_callback -- a callback function that will be called
    for each file started downloading.
    """

    def __init__(self, save_directory: str, files: List["FileInfo"],
            download_progress_callback: Optional[Callable[[int], None]] = None
            ) -> None:
        self._save_directory = save_directory
        self._files = files
        # NOTE: URLopener is a legacy interface (deprecated since Python 3.3).
        # It is kept because _download_file() relies on its retrieve() method.
        self._url_opener = URLopener()
        self._url_opener.addheaders = [('User-Agent', USER_AGENT)]
        self._url_opener.version = USER_AGENT
        self._progress_callback = download_progress_callback

    def run(self):
        """Start scraping. Must be implemented by a subclass."""
        raise NotImplementedError

    def _same_filename(self, filename: str, path: str) -> str:
        """Check if there is a file with same name. If so then add incremental
        number enclosed in brackets to a name of a new one.

        The counter is placed right before the last dot of the name, or at
        the very end when the name has no dot at all. A name that already
        ends with "(N)" gets N incremented instead of a second counter.

        Arguments:
        filename -- a desired file name (without directory);
        path -- a directory the name must be unique in.

        Returns the first name that does not exist in path; it is filename
        itself when there is no clash.
        """
        newname = filename
        while exists(join(path, newname)):
            stem, dot, extension = newname.rpartition(".")
            if not dot:
                # No dot at all: rpartition() put the whole name into the
                # last element, so the name itself is the stem.
                stem = newname
            # Only a stem that really ends with "(digits)" carries a counter;
            # anything else in brackets is a part of the name.
            numbered = re.fullmatch(r"(.*)\((\d+)\)", stem, re.DOTALL)
            if numbered:
                stem = f"{numbered.group(1)}({int(numbered.group(2)) + 1})"
            else:
                stem = f"{stem}(1)"
            newname = f"{stem}.{extension}" if dot else stem
        return newname

    def _hash_file(self, filepath: str, hash_algorithm: str = "md5",
            blocksize: int = 1048576) -> Tuple[str, str]:
        """Compute hash of a file.

        Arguments:
        filepath -- a path to a file to hash;
        hash_algorithm -- an algorithm name accepted by hashlib.new();
        blocksize -- how many bytes to read from the file at once.

        Returns a tuple of the digest as a hexadecimal string and the same
        digest encoded in Base64.
        """
        hash_func = hashlib.new(hash_algorithm)
        with open(filepath, 'rb') as f:
            # Read in blocks so a big file is never held in memory as a whole.
            for block in iter(lambda: f.read(blocksize), b""):
                hash_func.update(block)
        return hash_func.hexdigest(), b64encode(hash_func.digest()).decode()

    def _check_file(self, f: "FileInfo", filepath: str) -> bool:
        """Check if a file exist and isn't broken.

        Arguments:
        f -- an object describing the expected file; its size, hash_algorithm
        and hash_value attributes are used;
        filepath -- a path to a file on a disk.

        Returns True only if the file exists, its size matches and its hash
        matches.
        """
        if not exists(filepath):
            return False
        computed_size = getsize(filepath)
        # The expected size is accepted both as an exact number of bytes and
        # as a number of bytes divided by 1024 and rounded.
        if not (f.size == computed_size
                or f.size == round(computed_size / 1024)):
            return False
        # The expected hash may be either a hex string or a Base64 one.
        hexdig, dig = self._hash_file(filepath, f.hash_algorithm)
        return f.hash_value == hexdig or f.hash_value == dig

    def _download_file(self, f: "FileInfo") -> None:
        """Download a single file.

        Nothing is done if a valid copy is already saved under f.name. If
        another file occupies that name then the download goes to a name
        made by _same_filename() and is removed afterwards if it turns out
        to be a byte-for-byte copy of the occupying file.

        HTTP errors and connection resets are printed, not raised.
        """
        orig_filepath = join(self._save_directory, f.name)
        if self._check_file(f, orig_filepath):
            return
        filepath = orig_filepath
        is_same_filename = exists(orig_filepath)
        if is_same_filename:
            filepath = join(self._save_directory,
                self._same_filename(f.name, self._save_directory))
        try:
            self._url_opener.retrieve(f.download_url, filepath)
            if is_same_filename:
                # Guards the remove() below from deleting the original.
                assert filepath != orig_filepath, 'Filepaths are matching!'
                # Hex and Base64 forms come from the same digest, so comparing
                # one of them is enough.
                f1_hexdig, _ = self._hash_file(orig_filepath, f.hash_algorithm)
                f2_hexdig, _ = self._hash_file(filepath, f.hash_algorithm)
                if f1_hexdig == f2_hexdig:
                    remove(filepath)
        except HTTPError as e:
            print(e, f.download_url)
        except ConnectionResetError:
            print("Remote host reset connection for", f.download_url, \
                "Try again later.")
            # filepath never existed before this call, so whatever is there
            # now is a truncated download; drop it so it is not kept as a
            # broken file by later runs.
            if exists(filepath):
                remove(filepath)