136 lines
4.0 KiB
Python
136 lines
4.0 KiB
Python
from argparse import ArgumentParser
|
|
from os import makedirs
|
|
from os.path import join, exists
|
|
from re import search
|
|
from sys import argv
|
|
from typing import List, Optional
|
|
|
|
from scrapthechan import VERSION
|
|
from scrapthechan.parser import Parser, ThreadNotFoundError
|
|
from scrapthechan.parsers import get_parser_by_url, get_parser_by_site, \
|
|
SUPPORTED_IMAGEBOARDS
|
|
from scrapthechan.scrapers.threadedscraper import ThreadedScraper
|
|
|
|
|
|
# Only the command-line entry point is part of this module's public API.
__all__ = ["main"]


# Help text printed by main() when no arguments are given, when -h/--help is
# passed, or when the thread location cannot be parsed. The list of supported
# imageboards is interpolated from SUPPORTED_IMAGEBOARDS at import time.
USAGE: str = f"""Usage: scrapthechan [OPTIONS] (URL | IMAGEBOARD BOARD THREAD)

Options:
\t-h,--help -- print this help and exit;
\t-v,--version -- print program's version and exit;
\t-o,--output-dir -- directory where to place scraped files. By default
\t following structure will be created in current directory:
\t <imageboard>/<board>/<thread>;
\t-N,--no-op -- by default OP's post will be written in !op.txt file. This
\t option disables this behaviour;
\t-S,--skip-posts <num> -- skip given number of posts.

Arguments:
\tURL -- URL of a thread;
\tIMAGEBOARD -- name of a imageboard. E.g. 4chan;
\tBOARD -- short name of a board. E.g. b;
\tTHREAD -- ID of a thread. E.g. 100500.

Supported imageboards: {', '.join(SUPPORTED_IMAGEBOARDS)}.
"""
|
|
|
|
|
|
def parse_common_arguments(args: str) -> Optional[dict]:
    """Detect the ``-h/--help`` and ``-v/--version`` flags in a command line.

    *args* is the command line joined into a single space-separated string.

    A flag is recognised only when it is a whole whitespace-delimited token,
    so text that merely contains ``-h`` or ``-v`` (for example an output
    directory named ``my-videos``) is not mistaken for a flag.

    Returns ``None`` when neither flag is present. Otherwise returns a dict
    with the boolean keys ``"help"`` and ``"version"`` describing the first
    flag found in the string.
    """
    pattern = (
        r"(?:^|\s)"  # flag must start the string or follow whitespace
        r"(?:(?P<help>-h|--help)|(?P<version>-v|--version))"
        r"(?=\s|$)"  # ...and must end at whitespace or the end of the string
    )
    found = search(pattern, args)
    if found is None:
        return None

    groups = found.groupdict()
    return {
        "help": groups["help"] is not None,
        "version": groups["version"] is not None,
    }
|
|
|
|
def parse_arguments(args: str) -> dict:
    """Parse the thread location and scraping options from a command line.

    *args* is the command line joined into a single space-separated string.
    It is split on whitespace; the recognised options are picked out as whole
    tokens wherever they appear (before or after the thread location), and
    the remaining tokens are matched as either a thread URL or an
    ``IMAGEBOARD BOARD THREAD`` triple.

    Returns a dict with the keys:

    * ``"site"``, ``"board"``, ``"thread"`` -- strings, or ``None`` for all
      three when no thread location could be recognised;
    * ``"skip-posts"`` -- ``int`` given to ``-S/--skip-posts``, else ``None``;
    * ``"no-op"`` -- ``True`` when ``-N/--no-op`` is present;
    * ``"output-dir"`` -- value given to ``-o/--output-dir``, else ``None``.

    When an option is repeated, the last occurrence wins.
    """
    rlink = r"^(https?:\/\/)?(?P<site>[\w.-]+)[ \/](?P<board>\w+)(\S+)?[ \/](?P<thread>\w+)"

    output_dir = None
    skip_posts = None
    no_op = False
    positional = []

    tokens = args.split()
    index = 0
    while index < len(tokens):
        token = tokens[index]
        following = tokens[index + 1] if index + 1 < len(tokens) else None
        index += 1

        if token in ("-o", "--output-dir"):
            if following is not None:
                output_dir = following
                index += 1  # the value belongs to the option, skip it
        elif token in ("-S", "--skip-posts"):
            # Only consume the next token when it really is a number, so a
            # malformed option does not swallow part of the thread location.
            if following is not None and following.isdecimal():
                skip_posts = int(following)
                index += 1
        elif token in ("-N", "--no-op"):
            no_op = True
        else:
            positional.append(token)

    # The pattern is anchored at the start, so it must only ever see the
    # positional tokens; otherwise leading options are parsed as the site.
    link = search(rlink, ' '.join(positional))
    location = None if link is None else link.groupdict()

    return {
        "site": None if location is None else location["site"],
        "board": None if location is None else location["board"],
        "thread": None if location is None else location["thread"],
        "skip-posts": skip_posts,
        "no-op": no_op,
        "output-dir": output_dir,
    }
|
|
|
|
def main() -> None:
    """Command-line entry point: parse ``argv`` and scrape one thread.

    Steps, in order:

    1. With no arguments, or with the help flag, print USAGE and exit; with
       the version flag, print VERSION and exit.
    2. Parse the thread location and options; print USAGE and exit when no
       site/board/thread could be recognised.
    3. Obtain a parser for the site. An unsupported site
       (``NotImplementedError``) or a missing thread
       (``ThreadNotFoundError``) is reported and the program exits.
    4. Create the output directory, write the OP's post to ``!op.txt``
       unless disabled or already present, and run the threaded scraper
       over the parser's files while printing a progress counter.
    """
    if len(argv) == 1:
        print(USAGE)
        exit()

    command_line = ' '.join(argv[1:])

    cargs = parse_common_arguments(command_line)
    if cargs is not None:
        if cargs["help"]:
            print(USAGE)
            exit()
        elif cargs["version"]:
            print(VERSION)
            exit()

    args = parse_arguments(command_line)
    # parse_arguments always returns every key, so the presence of a key says
    # nothing; an unrecognised thread location shows up as None values.
    if args is None \
            or any(args.get(key) is None for key in ("site", "board", "thread")):
        print(USAGE)
        exit()

    try:
        if args["skip-posts"] is not None:
            parser = get_parser_by_site(args["site"], args["board"],
                                        args["thread"], args["skip-posts"])
        else:
            parser = get_parser_by_site(args["site"], args["board"],
                                        args["thread"])
    except NotImplementedError as ex:
        print(f"{str(ex)}.")
        print(f"Supported image boards are {', '.join(SUPPORTED_IMAGEBOARDS)}")
        exit()
    except ThreadNotFoundError as e:
        # Both literals need the f prefix; otherwise "{e.reason}" is printed
        # verbatim instead of the actual reason.
        print(f"Thread {args['site']}/{args['board']}/{args['thread']} "
              f"not found. Reason: {e.reason}")
        exit()

    files_count = len(parser.files)

    if args["output-dir"] is not None:
        save_dir = args["output-dir"]
    else:
        save_dir = join(parser.imageboard, parser.board, parser.thread)

    # No newline here: the OP status below finishes this line.
    print(f"{files_count} files in "
          f"{args['site']}/{args['board']}/{args['thread']}. "
          f"They're going to {save_dir}. ", end="")

    makedirs(save_dir, exist_ok=True)

    if not args["no-op"]:
        op_path = join(save_dir, "!op.txt")
        if parser.op is None:
            print("OP's empty.")
        elif not exists(op_path):
            with open(op_path, 'w', encoding='utf-8') as opf:
                opf.write(f"{parser.op}\n")
            print("OP's written.")
        else:
            print("OP exists.")
    else:
        # Terminate the summary line so the "\r" progress counter below does
        # not overwrite it.
        print()

    scraper = ThreadedScraper(
        save_dir, parser.files,
        lambda i: print(f"{i}/{files_count}", end="\r"))
    scraper.run()
|
|
|
|
|
|
# Run the command-line interface only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()
|