Moser, Maximilian authored
* this should help cut down runtime if there are large files involved (especially archives)
formatscaper.py 6.35 KiB
#!/usr/bin/env python3
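"""Identify the formats of record files with siegfried and flag endangered ones.

For each input file, the format is determined via siegfried, matched against
the list of known formats, and reported; files whose format is marked as
endangered are additionally printed to stdout as a YAML summary.

Example invocation (a sketch; flags as defined by the argument parser below):

    ./formatscaper.py -i files.yml -p 4
"""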
import argparse
import dataclasses
import re
import subprocess
import sys
import threading
import progressbar
import yaml
@dataclasses.dataclass
class Format:
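    """A file format as identified by its PRONOM ID (PUID), plus an "endangered" flag."""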
puid: str
name: str
mime: str
endangered: bool
@dataclasses.dataclass
class Result:
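    """The identified format for a single file of a record."""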
filename: str
record: str
format: Format
# set up the argument parser
parser = argparse.ArgumentParser()
parser.add_argument(
"--formats",
"--f",
default="formats.yml",
help="list of known file formats and if they're endangered; this file will be updated (default: formats.yml)", # noqa
)
parser.add_argument(
"--input",
"-i",
default="-",
help="input file for files of per record to check (default: stdin)",
)
parser.add_argument(
"--output",
"-o",
default="results.yml",
help="file in which to store the identified format for each file (default: results.yml)", # noqa
)
parser.add_argument(
"--parallel",
"-p",
default=1,
type=int,
help="number of siegfried processes to run in parallel (default: 1)",
)
parser.add_argument(
"--sf-binary",
default="sf",
help="name of the siegfried binary to call (default: sf)",
)
parser.add_argument(
"--sf-error-log",
default="sf.log",
help="file in which to store sf error logs (default: sf.log)",
)
parser.add_argument(
"--no-progressbar",
"-B",
default=False,
action="store_true",
help="disable the progress bar",
)
args = parser.parse_args()
# check the siegfried binary
try:
sf_output = subprocess.check_output([args.sf_binary, "-v"], text=True)
m = re.match(r"siegfried ((\d+\.?)+)", sf_output)
if m and m.group(1):
ver_nums = [int(num) for num in m.group(1).split(".")]
        # compare version tuples, so that e.g. version 2.0 isn't flagged as old
        if tuple(ver_nums[:2]) < (1, 10):
print(f"WARN: siegfried version too old ({m.group(1)})", file=sys.stderr)
else:
print("ERROR: siegfried version could not be determined", file=sys.stderr)
sys.exit(1)
except FileNotFoundError:
print(
f"ERROR: siegfried binary could not be found ({args.sf_binary})",
file=sys.stderr,
)
sys.exit(1)
# parse the list of known formats
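# the file is expected to hold a list of entries mirroring the Format
# dataclass above, e.g. (a hypothetical entry):
#
#   - puid: fmt/43
#     name: JPEG File Interchange Format
#     mime: image/jpeg
#     endangered: false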
formats = {}
try:
    with open(args.formats, "r") as formats_file:
        known_formats = yaml.safe_load(formats_file)

    # an empty file is parsed as None rather than as an empty list
    for f in known_formats or []:
        fmt = Format(
            puid=f["puid"],
            name=f["name"],
            mime=f["mime"],
            endangered=f["endangered"],
        )
        formats[fmt.puid] = fmt
except FileNotFoundError:
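    # no formats file yet; it will be created when the results are stored below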
pass
# read the list of files to analyze
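# each entry is expected to provide the keys used further below,
# e.g. (hypothetical values):
#
#   - record: example-record-id
#     filename: data.zip
#     uri: /path/to/data.zip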
record_files = []
if args.input == "-":
record_files = yaml.safe_load(sys.stdin)
else:
with open(args.input, "r") as input_file:
record_files = yaml.safe_load(input_file)
# try to redirect the error logs from siegfried
try:
sf_error_log = open(args.sf_error_log, "w")
except OSError as e:
print(
f"WARN: couldn't open sf log file, printing to stderr instead ({e})",
file=sys.stderr,
)
sf_error_log = None
# set up variables required in the collection of results
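# the semaphore caps the number of concurrent siegfried calls at --parallel,
# while the mutex protects the shared result lists and the progress counter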
all_results = []
endangered_files = []
sem = threading.Semaphore(args.parallel)
mutex = threading.Lock()
completed_tasks = 0
progress_bar = progressbar.ProgressBar(
max_value=len(record_files),
widgets=[
# fmt: off
progressbar.Percentage(), " (", progressbar.SimpleProgress(), ") ", progressbar.Bar(), " ", progressbar.Timer(), # noqa
# fmt: on
],
)
def process_record_file(record_file):
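    """Run siegfried over all files of the record and collect the results."""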
with sem:
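        # "-z" also inspects the contents of archive files, and "-multi 1"
        # keeps each sf process single-threaded, as parallelism is already
        # handled via the semaphore; "-name" supplies the original file name
        # to be reported for the scanned target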
sf_output = subprocess.check_output(
[
args.sf_binary,
"-z",
"-multi",
"1",
"-name",
record_file["filename"],
record_file["uri"],
],
stderr=sf_error_log,
)
# skip the sf info part
file_infos = yaml.safe_load_all(sf_output)
next(file_infos)
        # go through all the files analyzed by siegfried,
        # which can be several if the original input file was an archive
        for file_info in file_infos:
            if not file_info.get("errors", None) and file_info.get("matches", []):
                for match in file_info["matches"]:
                    if match["ns"] == "pronom":
                        fmt = Format(
                            name=match["format"],
                            puid=match["id"],
                            mime=match["mime"],
                            endangered=False,
                        )

                        # the storing of results needs to be mutually exclusive;
                        # setdefault() keeps any format object already known from
                        # the formats file, so a manually set "endangered" flag
                        # there wins over the fresh default above
                        with mutex:
                            fmt = formats.setdefault(fmt.puid, fmt)
                            result = Result(
                                filename=file_info["filename"],
                                record=record_file["record"],
                                format=fmt,
                            )
                            all_results.append(result)
                            if fmt.endangered:
                                endangered_files.append(result)
# when the task ends, update the progress bar
with mutex:
global completed_tasks
completed_tasks += 1
if not args.no_progressbar:
progress_bar.update(completed_tasks)
# analyze all the files in parallel, and create the summary after all threads complete
threads = []
for record_file in record_files or []:
thread = threading.Thread(target=process_record_file, args=[record_file])
threads.append(thread)
thread.start()
for thread in threads:
thread.join()
if sf_error_log is not None:
sf_error_log.close()
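# print a YAML summary of the endangered files to stdout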
if endangered_files:
print(yaml.dump([dataclasses.asdict(f) for f in endangered_files]))
# store the results to files
try:
with open(args.output, "w") as output_file:
yaml.dump([dataclasses.asdict(res) for res in all_results], output_file)
except OSError:
print(f"WARN: couldn't store the results ({args.output})", file=sys.stderr)
try:
updated_formats = [dataclasses.asdict(f) for f in formats.values()]
with open(args.formats, "w") as formats_file:
yaml.dump(updated_formats, formats_file)
except OSError:
print(f"ERROR: couldn't update the formats file ({args.formats})", file=sys.stderr)