#!/usr/bin/env python3
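"""Identify the format of files attached to records using siegfried (sf).

Reads a YAML list of files per record, runs sf on each of them, collects the
identified PRONOM formats, and reports files whose format is marked as
endangered in the formats file.
"""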

import argparse
import dataclasses
import re
import subprocess
import sys
import threading

import progressbar
import yaml


@dataclasses.dataclass
class Format:
    puid: str
    name: str
    mime: str
    endangered: bool


@dataclasses.dataclass
class Result:
    filename: str
    record: str
    format: Format


# set up the argument parser
parser = argparse.ArgumentParser()
parser.add_argument(
    "--formats",
    "-f",
    default="formats.yml",
    help="list of known file formats and whether they're endangered; this file will be updated (default: formats.yml)",  # noqa
)
parser.add_argument(
    "--input",
    "-i",
    default="-",
    help="input file for files of per record to check (default: stdin)",
)
parser.add_argument(
    "--output",
    "-o",
    default="results.yml",
    help="file in which to store the identified format for each file (default: results.yml)",  # noqa
)
parser.add_argument(
    "--parallel",
    "-p",
    default=1,
    type=int,
    help="number of siegfried processes to run in parallel (default: 1)",
)
parser.add_argument(
    "--sf-binary",
    default="sf",
    help="name of the siegfried binary to call (default: sf)",
)
parser.add_argument(
    "--sf-error-log",
    default="sf.log",
    help="file in which to store sf error logs (default: sf.log)",
)
parser.add_argument(
    "--no-progressbar",
    "-B",
    default=False,
    action="store_true",
    help="disable the progress bar",
)
args = parser.parse_args()
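
# example invocation (file names illustrative):
#
#   ./formatscaper.py --input files.yml --parallel 4 --output results.yml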


# check the siegfried binary
try:
    sf_output = subprocess.check_output([args.sf_binary, "-v"], text=True)
    m = re.match(r"siegfried ((\d+\.?)+)", sf_output)
    if m and m.group(1):
        ver_nums = [int(num) for num in m.group(1).split(".")]
        if tuple(ver_nums[:2]) < (1, 10):
            print(f"WARN: siegfried version too old ({m.group(1)})", file=sys.stderr)
    else:
        print("ERROR: siegfried version could not be determined", file=sys.stderr)
        sys.exit(1)
except FileNotFoundError:
    print(
        f"ERROR: siegfried binary could not be found ({args.sf_binary})",
        file=sys.stderr,
    )
    sys.exit(1)


# parse the list of known formats
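# entries in that file are expected to look like this (values illustrative):
#
#   - puid: fmt/43
#     name: JPEG File Interchange Format
#     mime: image/jpeg
#     endangered: false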
formats = {}
try:
    with open(args.formats, "r") as formats_file:
        known_formats = yaml.safe_load(formats_file)
        for f in known_formats:
            format = Format(
                puid=f["puid"],
                name=f["name"],
                mime=f["mime"],
                endangered=f["endangered"],
            )
            formats[format.puid] = format
except FileNotFoundError:
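    # a missing formats file is fine; the set of known formats starts out empty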
    pass


# read the list of files to analyze; an empty input makes yaml return None,
# so fall back to an empty list in that case
record_files = []
if args.input == "-":
    record_files = yaml.safe_load(sys.stdin) or []
else:
    with open(args.input, "r") as input_file:
        record_files = yaml.safe_load(input_file) or []
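
# each entry has to provide the keys used below, e.g. (values illustrative):
#
#   - filename: sketch.zip
#     uri: /path/to/files/sketch.zip
#     record: abcde-12345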


# try to redirect the error logs from siegfried
try:
    sf_error_log = open(args.sf_error_log, "w")
except OSError as e:
    print(
        f"WARN: couldn't open sf log file, printing to stderr instead ({e})",
        file=sys.stderr,
    )
    sf_error_log = None


# set up variables required in the collection of results
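# the semaphore caps the number of concurrently running sf processes, while
# the mutex protects the shared result lists and the progress counter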
all_results = []
endangered_files = []
sem = threading.Semaphore(args.parallel)
mutex = threading.Lock()
completed_tasks = 0
progress_bar = progressbar.ProgressBar(
    max_value=len(record_files),
    widgets=[
        # fmt: off
        progressbar.Percentage(), " (", progressbar.SimpleProgress(), ") ", progressbar.Bar(), " ", progressbar.Timer(),  # noqa
        # fmt: on
    ],
)


def process_record_file(record_file):
    with sem:
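        # -z also inspects the contents of archive files; -multi 1 keeps each
        # sf process single-threaded, as parallelism is handled via threads here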
        try:
            sf_output = subprocess.check_output(
                [
                    args.sf_binary,
                    "-z",
                    "-multi",
                    "1",
                    "-name",
                    record_file["filename"],
                    record_file["uri"],
                ],
                stderr=sf_error_log,
            )
        except subprocess.CalledProcessError as e:
            # don't let a failed sf call kill the whole thread; just log it
            print(f"WARN: sf failed for '{record_file['uri']}' ({e})", file=sys.stderr)
            sf_output = ""

        # skip the first YAML document, which describes sf itself rather than a file
        file_infos = yaml.safe_load_all(sf_output)
        next(file_infos, None)

        # go through all the files analyzed by siegfried; there can be several
        # if the original input file was an archive
        for file_info in file_infos:
            if not file_info.get("errors", None) and file_info.get("matches", []):
                for match in file_info["matches"]:
                    if match["ns"] == "pronom":
                        format = Format(
                            name=match["format"],
                            puid=match["id"],
                            mime=match["mime"],
                            endangered=False,
                        )

                        # the storing of results needs to be mutually exclusive
                        with mutex:
                            format = formats.setdefault(format.puid, format)
                            result = Result(
                                filename=file_info["filename"],
                                record=record_file["record"],
                                format=format,
                            )
                            all_results.append(result)
                            if formats[format.puid].endangered:
                                endangered_files.append(result)

        # when the task ends, update the progress bar
        with mutex:
            global completed_tasks
            completed_tasks += 1
            if not args.no_progressbar:
                progress_bar.update(completed_tasks)


# analyze all the files in parallel, and create the summary after all threads complete
threads = []
for record_file in record_files:
    thread = threading.Thread(target=process_record_file, args=[record_file])
    threads.append(thread)
    thread.start()

for thread in threads:
    thread.join()

if sf_error_log is not None:
    sf_error_log.close()

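# print a YAML summary of the files whose format is considered endangered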
if endangered_files:
    print(yaml.dump([dataclasses.asdict(f) for f in endangered_files]))


# store the results to files
try:
    with open(args.output, "w") as output_file:
        yaml.dump([dataclasses.asdict(res) for res in all_results], output_file)

except OSError:
    print(f"WARN: couldn't store the results ({args.output})", file=sys.stderr)

try:
    updated_formats = [dataclasses.asdict(f) for f in formats.values()]
    with open(args.formats, "w") as formats_file:
        yaml.dump(updated_formats, formats_file)

except OSError:
    print(f"ERROR: couldn't update the formats file ({args.formats})", file=sys.stderr)