1
0
Fork 0
ScrapTheChan/scrapthechan/cli/scraper.py

136 lines
4.0 KiB
Python

from argparse import ArgumentParser
from os import makedirs
from os.path import join, exists
from re import search
from sys import argv
from typing import List, Optional
from scrapthechan import VERSION
from scrapthechan.parser import Parser, ThreadNotFoundError
from scrapthechan.parsers import get_parser_by_url, get_parser_by_site, \
SUPPORTED_IMAGEBOARDS
from scrapthechan.scrapers.threadedscraper import ThreadedScraper
# Only the command line entry point is exported from this module.
__all__ = ["main"]
# Help text printed by main() for -h/--help, when no arguments are given and
# when the thread location cannot be parsed. It is an f-string, so the list of
# supported imageboards is interpolated once, when the module is imported.
USAGE: str = \
f"""Usage: scrapthechan [OPTIONS] (URL | IMAGEBOARD BOARD THREAD)
Options:
\t-h,--help -- print this help and exit;
\t-v,--version -- print program's version and exit;
\t-o,--output-dir -- directory where to place scraped files. By default
\t following structure will be created in current directory:
\t <imageboard>/<board>/<thread>;
\t-N,--no-op -- by default OP's post will be written in !op.txt file. This
\t option disables this behaviour;
\t-S,--skip-posts <num> -- skip given number of posts.
Arguments:
\tURL -- URL of a thread;
\tIMAGEBOARD -- name of a imageboard. E.g. 4chan;
\tBOARD -- short name of a board. E.g. b;
\tTHREAD -- ID of a thread. E.g. 100500.
Supported imageboards: {', '.join(SUPPORTED_IMAGEBOARDS)}.
"""
def parse_common_arguments(args: str) -> Optional[dict]:
    """Detect the help and version flags in a command line.

    Flags are recognised only as whole whitespace-separated tokens, so a
    dash sequence inside another argument (for example the ``-h`` in
    ``-o out-here`` or the ``-v`` in a URL) is not mistaken for a flag.

    Args:
        args: command line arguments joined into a single string.

    Returns:
        ``None`` when neither flag is present, otherwise a dict with the
        boolean keys ``"help"`` and ``"version"``. Both may be true when
        both flags are given.
    """
    tokens = args.split()
    wants_help = any(token in ("-h", "--help") for token in tokens)
    wants_version = any(token in ("-v", "--version") for token in tokens)
    if not (wants_help or wants_version):
        return None
    return {"help": wants_help, "version": wants_version}
def parse_arguments(args: str) -> dict:
    """Parse scraper options and the thread location from a command line.

    The command line is split on whitespace. Options (and the values of
    ``-o`` and ``-S``) are consumed first, wherever they appear, and the
    thread location is then looked up in the remaining positional tokens
    only. This lets options come before the location, as the usage text
    documents, and keeps a dash sequence inside a value (e.g. the ``-N`` in
    ``-o My-Notes``) from being read as a flag.

    Args:
        args: command line arguments joined into a single string.

    Returns:
        A dict with the keys:
        ``"site"``, ``"board"``, ``"thread"`` -- strings, or ``None`` when no
        thread location could be recognised;
        ``"skip-posts"`` -- int, or ``None`` when not given;
        ``"no-op"`` -- bool;
        ``"output-dir"`` -- string, or ``None`` when not given.
    """
    # Matches either a thread URL or the "IMAGEBOARD BOARD THREAD" triple.
    rlink = r"^(https?:\/\/)?(?P<site>[\w.-]+)[ \/](?P<board>\w+)(\S+)?[ \/](?P<thread>\w+)"

    positional = []
    out_dir = None
    skip_posts = None
    no_op = False

    tokens = args.split()
    idx = 0
    while idx < len(tokens):
        token = tokens[idx]
        value = tokens[idx + 1] if idx + 1 < len(tokens) else None
        idx += 1
        if token in ("-N", "--no-op"):
            no_op = True
        elif token in ("-o", "--output-dir") and value is not None:
            # The first occurrence wins when the option is repeated.
            if out_dir is None:
                out_dir = value
            idx += 1
        elif token in ("-S", "--skip-posts") \
                and value is not None and value.isdecimal():
            if skip_posts is None:
                skip_posts = int(value)
            idx += 1
        elif not token.startswith("-"):
            # Anything that does not look like an option is part of the
            # thread location; unrecognised options are ignored.
            positional.append(token)

    link = search(rlink, ' '.join(positional))
    if link is not None:
        link = link.groupdict()

    return {
        "site": None if link is None else link["site"],
        "board": None if link is None else link["board"],
        "thread": None if link is None else link["thread"],
        "skip-posts": skip_posts,
        "no-op": no_op,
        "output-dir": out_dir}
def main() -> None:
    """Entry point of the ``scrapthechan`` command line tool.

    Reads the command line from ``sys.argv``, handles the help and version
    flags, resolves a parser for the requested thread, optionally writes
    the OP's post to ``!op.txt`` and then runs a ``ThreadedScraper`` over
    the parser's files.

    Raises:
        SystemExit: with status 0 after printing help or the version, and
            with status 1 when the thread location is missing, the
            imageboard is not supported or the thread is not found.
    """
    if len(argv) == 1:
        print(USAGE)
        raise SystemExit(0)

    cmdline = ' '.join(argv[1:])

    cargs = parse_common_arguments(cmdline)
    if cargs is not None:
        if cargs["help"]:
            print(USAGE)
            raise SystemExit(0)
        elif cargs["version"]:
            print(VERSION)
            raise SystemExit(0)

    args = parse_arguments(cmdline)
    # The location keys are always present in the parsed dict, so their
    # values have to be checked, not merely their presence.
    if args is None \
            or any(args.get(key) is None
                   for key in ("site", "board", "thread")):
        print(USAGE)
        raise SystemExit(1)

    # Pass skip-posts only when it was given, so the parser factory's own
    # default applies otherwise.
    parser_args = [args["site"], args["board"], args["thread"]]
    if args["skip-posts"] is not None:
        parser_args.append(args["skip-posts"])

    try:
        parser = get_parser_by_site(*parser_args)
    except NotImplementedError as ex:
        print(f"{str(ex)}.")
        print(f"Supported image boards are {', '.join(SUPPORTED_IMAGEBOARDS)}")
        raise SystemExit(1)
    except ThreadNotFoundError as e:
        print(f"Thread {args['site']}/{args['board']}/{args['thread']} " \
            f"not found. Reason: {e.reason}")
        raise SystemExit(1)

    files_count = len(parser.files)

    if args["output-dir"] is not None:
        save_dir = args["output-dir"]
    else:
        save_dir = join(parser.imageboard, parser.board, parser.thread)

    print(f"{files_count} files in " \
        f"{args['site']}/{args['board']}/{args['thread']}. " \
        f"They're going to {save_dir}. ", end="")

    makedirs(save_dir, exist_ok=True)

    if not args["no-op"]:
        op_path = join(save_dir, "!op.txt")
        if parser.op is None:
            print("OP's empty.")
        elif not exists(op_path):
            with open(op_path, 'w', encoding='utf-8') as opf:
                opf.write(f"{parser.op}\n")
            print("OP's written.")
        else:
            print("OP exists.")
    else:
        # Terminate the summary line; otherwise the "\r" progress counter
        # below would overwrite it.
        print()

    # NOTE(review): the callback is presumably invoked with the number of
    # files processed so far -- confirm against ThreadedScraper.
    scraper = ThreadedScraper(save_dir, parser.files, \
        lambda i: print(f"{i}/{files_count}", end="\r"))
    scraper.run()
# Run the command line interface only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()