*** Wartungsfenster jeden ersten Mittwoch vormittag im Monat ***

Skip to content
Snippets Groups Projects
Commit 4d2e8c48 authored by Moser, Maximilian
Browse files

Rework auto API into a set of functions

* these functions only act as wrappers for actual (one-shot) services
  for simple use-cases and don't need state handling
* remove the `archive_data()` function for now, as that's currently not
  a relevant use-case
parent d5e6b6ae
No related branches found
No related tags found
1 merge request: !5 v0.2.0
"""Library for connecting VRE of TU Wien with the research data repositories."""
from .api import DBRepo, InvenioRDM
from .auto import AutoAPI
from .auto import download, suggest_repository, upload
# Public API of the package.  ``__all__`` has to list the exported names as
# strings; listing the objects themselves makes ``from <package> import *``
# fail with a ``TypeError``.
__all__ = [
    "AutoAPI",
    "DBRepo",
    "InvenioRDM",
    "download",
    "suggest_repository",
    "upload",
]
......@@ -3,151 +3,143 @@
# Copyright (C) 2024 TU Wien.
#
"""Helpers for interacting automatically with InvenioRDM and DBRepo."""
"""Helpers for interacting automatically with various repository types."""
import pathlib
from typing import Optional, Union
from urllib.parse import urlparse, urlunparse
from typing import Optional, Tuple
from urllib.parse import urlparse
import pandas as pd
import requests
from .api.base import BaseWrapper
from .api.dbrepo import DBRepo
from .api.inveniordm import InvenioRDM
from .utils import url_regex
# Host names of the repository instances we know about, mapped to the wrapper
# class used to talk to them.  Insertion order matters: the first host listed
# for a wrapper class becomes that class's entry in ``DEFAULT_URLS``.
KNOWN_INSTANCES = {
    "test.dbrepo.tuwien.ac.at": DBRepo,
    "dbrepo1.ec.tuwien.ac.at": DBRepo,
    "zenodo.org": InvenioRDM,
    "researchdata.tuwien.ac.at": InvenioRDM,
    "researchdata.tuwien.at": InvenioRDM,
    "test.researchdata.tuwien.at": InvenioRDM,
    "s168.dl.hpc.tuwien.ac.at": InvenioRDM,
}

# Default instance per wrapper class (the first matching entry above).
# This has to be a mapping keyed by the wrapper class -- it is looked up as
# ``DEFAULT_URLS[DBRepo]`` / ``DEFAULT_URLS[InvenioRDM]`` -- so build a dict;
# a set comprehension here would not be subscriptable.
# NOTE(review): the values are bare host names without a scheme; confirm that
# the wrapper constructors accept them in that form.
DEFAULT_URLS = {
    repo: next(host for host, cls in KNOWN_INSTANCES.items() if cls == repo)
    for repo in set(KNOWN_INSTANCES.values())
}
def _resolve_service(host: str, full_url: str) -> Optional[BaseWrapper]:
    """Look up the repository wrapper registered for ``host``.

    Only the static ``KNOWN_INSTANCES`` table is consulted; ``full_url`` is
    accepted but not used yet.  The result is whatever the table holds for
    the host, or ``None`` for hosts that are not listed.
    """
    # TODO maybe we could utilize re3data here, or deduce the repository type
    #      from a hint in the HTML
    if host in KNOWN_INSTANCES:
        return KNOWN_INSTANCES[host]
    return None
def _follow_redirects(url: str, timeout: float = 30.0) -> str:
    """Follow HTTP redirects and return the final URL.

    A ``HEAD`` request is sent to ``url`` with redirects enabled, and the URL
    of the final response is returned.

    ``timeout`` is the number of seconds to wait for the server; without one,
    ``requests`` would wait indefinitely on an unresponsive host.

    Raises ``ValueError`` if the request fails.  The underlying
    ``requests.RequestException`` is kept as the cause of that error.
    """
    try:
        response = requests.head(url, allow_redirects=True, timeout=timeout)
    except requests.RequestException as e:
        # chain the original error so the traceback shows what went wrong
        raise ValueError(f"Following redirect for '{url}' failed: {e}") from e

    return response.url
def suggest_repository(url: str) -> Optional[BaseWrapper]:
"""Return the suggested repository system according to the URL provided.
The returned repository wrapper isn't authenticated yet, as not all operations
require authentication.
If the URL doesn't provide sufficient hints to deduce a repository type,
``None`` will be returned.
"""
if match := url_regex.match(url):
scheme, rest = match.group(2) or "https", match.group(3)
url = f"{scheme}://{rest}"
host = urlparse(url).netloc
else:
raise ValueError(f"invalid url: {url}")
class AutoAPI:
"""Utility class which decides on which system to connect."""
def __init__(self, url: str, auth_token_or_credentials: str = None):
    """Store the target URL and credentials and set up the host lookup table."""
    dbrepo_hosts = ("dbrepo1.ec.tuwien.ac.at", "test.dbrepo.tuwien.ac.at")
    inveniordm_hosts = (
        "researchdata.tuwien.ac.at",
        "researchdata.tuwien.at",
        "test.researchdata.tuwien.at",
        "s168.dl.hpc.tuwien.ac.at",
    )

    self.url = url
    self.auth_token_or_credentials = auth_token_or_credentials
    # maps a repository host name to the name of the service running there
    self.known_instances = {
        **dict.fromkeys(dbrepo_hosts, "DBRepo"),
        **dict.fromkeys(inveniordm_hosts, "InvenioRDM"),
    }
def _inject_api_route(self, url: str) -> str:
"""Injects '/api' after the host if not included."""
parsed_url = urlparse(url)
new_path = (
f"/api{parsed_url.path}"
if not parsed_url.path.startswith("/api")
else parsed_url.path
)
return urlunparse(
(
parsed_url.scheme,
parsed_url.netloc,
new_path,
parsed_url.params,
parsed_url.query,
parsed_url.fragment,
)
)
def _resolve_service(self, host: str) -> Optional[str]:
"""Resolves service based on known URLs."""
return self.known_instances.get(host)
def _follow_redirects(self, url: str):
"""Follows HTTP redirects and returns the final URL."""
try:
response = requests.head(url, allow_redirects=True)
return response.url
# if we get a DOI, we need to resolve it
if host == "doi.org":
return suggest_repository(_follow_redirects(url))
except requests.RequestException as e:
raise ValueError(f"Error following redirect {e}")
# resolve the service and return an instance
service_cls = _resolve_service(host, url)
if service_cls is None:
return None
def check_or_set_api_token(self) -> None:
    """Prompt for an InvenioRDM access token unless one is already stored."""
    if self.auth_token_or_credentials is not None:
        # a token (or credentials) was supplied earlier, nothing to ask for
        return

    prompt = "Input your access token for InvenioRDM: "
    self.auth_token_or_credentials = input(prompt)
return service_cls(url)
def suggest_repository(self, url) -> Union[DBRepo, InvenioRDM, None]:
"""Returns the suggested repository system according to the URL provided."""
host = urlparse(url).netloc
if host == "doi.org":
return self.suggest_repository(self._follow_redirects(url))
def download(url: str, all: bool = False, interactive: bool = True) -> Optional[str]:
"""Download file automatically based on the URL."""
if (service := suggest_repository(url)) is None:
return None
# fish out the container & file from the URL
cid, fid = service.url_to_parts(url)
if cid is None and fid is None:
cid, fid = service.url_to_parts(_follow_redirects(url))
elif self._resolve_service(host) == "DBRepo":
dbrepo = DBRepo(url)
# if we couldn't determine a file, ask the user in interactive mode
if fid is None:
if all:
# TODO get a list of all files
pass
elif interactive:
# TODO ask the user which file to download
pass
else:
return None
username = input("Username for DBRepo: ")
password = input("Password for DBRepo: ")
dbrepo.authenticate(username, password)
try:
return service.download(cid, fid)
return dbrepo
except Exception:
# if the download didn't work, we can try again with authentication
if interactive:
service.authenticate_interactive()
return service.download(cid, fid)
elif self._resolve_service(host) == "InvenioRDM":
finally:
service.clear_auth()
self.check_or_set_api_token()
return InvenioRDM(
api_token=self.auth_token_or_credentials,
url=urlparse(url).scheme + "://" + host,
)
print("Unknown host for repository suggestion.")
def upload(
file_path: str, url: Optional[str] = None
) -> Tuple[Optional[str], Optional[str], Optional[str]]:
"""Upload the file to an auto-selected repository.
def upload_data(self, metadata: dict, file_path: str, **kwargs) -> Optional[str]:
"""Inspects the specified file and decides in which repository system to upload it to."""
service = None
Return a triple with the repository's URL, the container ID and the file ID.
"""
service = None
if url is None:
# if no URL has been specified, base the choice on the file's content
try:
pd.read_csv(file_path)
# If successful, it's tabular data
service = DBRepo(self.url)
username = input("Username for DBRepo: ")
password = input("Password for DBRepo: ")
service.authenticate(username, password)
return service.upload(
file_path, kwargs.get("database_id"), kwargs.get("table_name")
)
except pd.errors.EmptyDataError:
return "Invalid file: Empty data"
url = DEFAULT_URLS[DBRepo]
service = DBRepo(url)
except pd.errors.ParserError:
# File is not tabular or has parsing issues
self.check_or_set_api_token()
service = InvenioRDM(
self.auth_token_or_credentials,
urlparse(self.url).scheme + "://" + urlparse(self.url).netloc,
)
draft_id = service.create(metadata)
return service.upload(file_path, draft_id, pathlib.Path(file_path).name)
except Exception as e:
return f"Error: {e}"
finally:
if service is not None:
service.clear_auth()
def download_data(self, file_name: str, **kwargs) -> Optional[str]:
"""Downloads file automatically based on the URL."""
service = self.suggest_repository(self.url)
if service is not None:
modified_url = self._inject_api_route(self.url)
return service.download(modified_url, file_name)
url = DEFAULT_URLS[InvenioRDM]
service = InvenioRDM(url)
def archive_data(
self, metadata: dict, file_name: str, file_path: str
) -> Optional[str]:
"""Sends data to InvenioRDM for all file types."""
service = self.suggest_repository(self.url)
else:
service = suggest_repository(url)
try:
service.authenticate_interactive()
return url, *service.upload(file_path)
finally:
if service is not None:
if isinstance(service, InvenioRDM):
draft_id = service.create(metadata)
return service.upload(draft_id, file_name, file_path)
else:
print(
"Archive data operation is only supported for InvenioRDM instances."
)
service.clear_auth()
return None, None, None
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment