From 08f9eb1eb18264e50dbea3fdaefad240aa846c9e Mon Sep 17 00:00:00 2001
From: Maximilian Moser <maximilian.moser@tuwien.ac.at>
Date: Wed, 26 Jul 2023 16:16:27 +0200
Subject: [PATCH] Add task for updating the names vocabulary with TISS data

* for now, we're limiting the updates to employees who have their ORCID
  identifier linked on their TISS profile, because that allows us to
  create perfect matches
---
 invenio_config_tuw/tasks.py         | 136 ++++++++++++++++++++++++++++
 invenio_config_tuw/tiss/__init__.py |  17 ++++
 invenio_config_tuw/tiss/models.py   |  86 ++++++++++++++++++
 invenio_config_tuw/tiss/utils.py    |  54 +++++++++++
 4 files changed, 293 insertions(+)
 create mode 100644 invenio_config_tuw/tasks.py
 create mode 100644 invenio_config_tuw/tiss/__init__.py
 create mode 100644 invenio_config_tuw/tiss/models.py
 create mode 100644 invenio_config_tuw/tiss/utils.py

diff --git a/invenio_config_tuw/tasks.py b/invenio_config_tuw/tasks.py
new file mode 100644
index 0000000..69e07f3
--- /dev/null
+++ b/invenio_config_tuw/tasks.py
@@ -0,0 +1,136 @@
+# -*- coding: utf-8 -*-
+#
+# Copyright (C) 2023 TU Wien.
+#
+# Invenio-Config-TUW is free software; you can redistribute it and/or modify
+# it under the terms of the MIT License; see LICENSE file for more details.
+
+"""Celery tasks running in the background."""
+
+import copy
+from typing import List, Optional
+
+import requests
+from celery import shared_task
+from flask import current_app
+from invenio_access.permissions import system_identity
+from invenio_db import db
+from invenio_records_resources.services.uow import UnitOfWork
+from invenio_vocabularies.contrib.names.api import Name
+
+from .tiss import Employee, fetch_tiss_data
+
+
+def get_tuw_ror_aliases():
+    """Fetch the aliases of TU Wien known to ROR."""
+    try:
+        response = requests.get("https://api.ror.org/organizations/04d836q62")
+        if response == 200:
+            tuw_ror = response.json()
+            tuw_ror_names = [tuw_ror["name"], *tuw_ror["acronyms"], *tuw_ror["aliases"]]
+            return tuw_ror_names
+
+    except Exception as e:
+        current_app.logger.warn(
+            f"Error while fetching TU Wien information from ROR: {e}"
+        )
+
+    return [
+        "TU Wien",
+        "TUW",
+        "Technische Universität Wien",
+        "Vienna University of Technology",
+    ]
+
+
+def find_orcid_match(employee: Employee, names: List[Name]) -> Optional[Name]:
+    """Find the name entry with the same ORCID as the given employee."""
+    if not employee.orcid:
+        return None
+
+    for name in names:
+        if {"scheme": "orcid", "identifier": employee.orcid} in name.get(
+            "identifiers", []
+        ):
+            return name
+
+    return None
+
+
+def update_name_data(
+    name: dict, employee: Employee, tuw_aliases: Optional[List[str]] = None
+) -> dict:
+    """Update the given name entry data with the information from the employee."""
+    tuw_aliases = tuw_aliases or ["TU Wien"]
+    name = copy.deepcopy(name)
+    name["given_name"] = employee.first_name
+    name["family_name"] = employee.last_name
+
+    # normalize & deduplicate affilations, and make sure that TU Wien is one of them
+    # NOTE: sorting is done to remove indeterminism and prevent unnecessary updates
+    affiliations = {
+        aff["name"] for aff in name["affiliations"] if aff["name"] not in tuw_aliases
+    }
+    affiliations.add("TU Wien")
+    name["affiliations"] = sorted(
+        [{"name": aff} for aff in affiliations], key=lambda aff: aff["name"]
+    )
+
+    # similar to above, add the ORCID mentioned in TISS and deduplicate
+    identifiers = {(id_["scheme"], id_["identifier"]) for id_ in name["identifiers"]}
+    if employee.orcid:
+        identifiers.add(("orcid", employee.orcid))
+
+    name["identifiers"] = sorted(
+        [{"scheme": scheme, "identifier": id_} for scheme, id_ in identifiers],
+        key=lambda id_: f'{id_["scheme"]}:{id_["identifier"]}',
+    )
+
+    return name
+
+
+@shared_task(ignore_result=True)
+def sync_names_from_tiss():
+    """Look up TU Wien employees via TISS and update the names vocabulary."""
+    results = {"created": 0, "updated": 0}
+    tuw_ror_aliases = get_tuw_ror_aliases()
+    svc = current_app.extensions["invenio-vocabularies"].names_service
+
+    all_names = [
+        svc.record_cls.get_record(model.id)
+        for model in svc.record_cls.model_cls.query.all()
+        if not model.is_deleted and model.data
+    ]
+
+    _, employees = fetch_tiss_data()
+    employees_with_orcid = [e for e in employees if not e.pseudoperson and e.orcid]
+
+    with UnitOfWork(db.session) as uow:
+        for employee in employees_with_orcid:
+            matching_name = find_orcid_match(employee, all_names)
+
+            if matching_name:
+                # if we found a match via ORCID, we update it according to the TISS data
+                name = svc.read(identity=system_identity, id_=matching_name["id"])
+                new_name_data = update_name_data(name.data, employee, tuw_ror_aliases)
+
+                # only update the entry if it actually differs somehow
+                if name.data != new_name_data:
+                    svc.update(
+                        identity=system_identity,
+                        id_=name.id,
+                        data=new_name_data,
+                        uow=uow,
+                    )
+                    results["updated"] += 1
+
+            else:
+                # if we couldn't find a match via ORCID, that's a new entry
+                svc.create(
+                    identity=system_identity, data=employee.to_name_entry(), uow=uow
+                )
+                results["created"] += 1
+
+        uow.commit()
+
+    return results
diff --git a/invenio_config_tuw/tiss/__init__.py b/invenio_config_tuw/tiss/__init__.py
new file mode 100644
index 0000000..2bcd3a5
--- /dev/null
+++ b/invenio_config_tuw/tiss/__init__.py
@@ -0,0 +1,17 @@
+# -*- coding: utf-8 -*-
+#
+# Copyright (C) 2023 TU Wien.
+#
+# Invenio-Config-TUW is free software; you can redistribute it and/or modify
+# it under the terms of the MIT License; see LICENSE file for more details.
+
+"""Utilities for integrating InvenioRDM with TISS."""
+
+from .models import Employee, OrgUnit
+from .utils import fetch_tiss_data
+
+__all__ = (
+    "Employee",
+    "OrgUnit",
+    "fetch_tiss_data",
+)
diff --git a/invenio_config_tuw/tiss/models.py b/invenio_config_tuw/tiss/models.py
new file mode 100644
index 0000000..0efad7b
--- /dev/null
+++ b/invenio_config_tuw/tiss/models.py
@@ -0,0 +1,86 @@
+# -*- coding: utf-8 -*-
+#
+# Copyright (C) 2023 TU Wien.
+#
+# Invenio-Config-TUW is free software; you can redistribute it and/or modify
+# it under the terms of the MIT License; see LICENSE file for more details.
+
+"""Data classes for representing information from TISS."""
+
+
+from dataclasses import dataclass
+from typing import List, Optional
+
+
+@dataclass
+class OrgUnit:
+    """An organizational unit at TU Wien."""
+
+    tiss_id: int
+    code: str
+    name_en: str
+    name_de: str
+    employees: List["Employee"]
+
+    @classmethod
+    def from_dict(cls, data: dict) -> "OrgUnit":
+        """Parse the organizational unit from the given dictionary."""
+        return cls(
+            tiss_id=data["tiss_id"],
+            code=data["code"],
+            name_de=data.get("name_de", ""),
+            name_en=data.get("name_en", ""),
+            employees=[Employee.from_dict(emp) for emp in data.get("employees")],
+        )
+
+    def __hash__(self):
+        """Use the TISS ID for hashing."""
+        return hash(self.tiss_id)
+
+
+@dataclass
+class Employee:
+    """An employee at TU Wien."""
+
+    tiss_id: int
+    orcid: Optional[str]
+    first_name: str
+    last_name: str
+    pseudoperson: bool
+    titles_pre: str
+    titles_post: str
+
+    @property
+    def full_name(self):
+        """Create the full name in the same style as InvenioRDM does."""
+        return f"{self.last_name}, {self.first_name}"
+
+    @classmethod
+    def from_dict(cls, data: dict) -> "Employee":
+        """Parse the employee from the given dictionary."""
+        return cls(
+            tiss_id=data["tiss_id"],
+            orcid=data.get("orcid", None),
+            first_name=data["first_name"],
+            last_name=data["last_name"],
+            pseudoperson=data.get("pseudoperson", False),
+            titles_pre=data.get("preceding_titles", ""),
+            titles_post=data.get("postpositioned_titles", ""),
+        )
+
+    def to_name_entry(self):
+        """Massage the employee into the shape of a name entry."""
+        ids = []
+        if self.orcid:
+            ids.append({"scheme": "orcid", "identifier": self.orcid})
+
+        return {
+            "given_name": self.first_name,
+            "family_name": self.last_name,
+            "identifiers": ids,
+            "affiliations": [{"name": "TU Wien"}],
+        }
+
+    def __hash__(self):
+        """Use the TISS ID for hashing."""
+        return hash(self.tiss_id)
diff --git a/invenio_config_tuw/tiss/utils.py b/invenio_config_tuw/tiss/utils.py
new file mode 100644
index 0000000..a788832
--- /dev/null
+++ b/invenio_config_tuw/tiss/utils.py
@@ -0,0 +1,54 @@
+# -*- coding: utf-8 -*-
+#
+# Copyright (C) 2023 TU Wien.
+#
+# Invenio-Config-TUW is free software; you can redistribute it and/or modify
+# it under the terms of the MIT License; see LICENSE file for more details.
+
+"""Functions for fetching information from TISS."""
+
+from typing import Optional, Set, Tuple
+
+import requests
+
+from .models import Employee, OrgUnit
+
+
+def _get_org_unit_dict(code: str) -> dict:
+    """Fetch the data about the org unit from TISS."""
+    response = requests.get(
+        f"https://tiss.tuwien.ac.at/api/orgunit/v22/code/{code}?persons=true"
+    )
+    # NOTE: some org units don't seem to have an OID
+    #       (e.g. "E366t1 - Institutsbibliothek"),
+    #       it seems to be safer to go through the 'code'
+    assert response.status_code == 200
+
+    org_unit = response.json()
+    return org_unit
+
+
+def _fetch_tiss_data(
+    org_unit: dict,
+    org_units: Optional[Set[OrgUnit]] = None,
+    employees: Optional[Set[Employee]] = None,
+) -> Tuple[Set[OrgUnit], Set[Employee]]:
+    """Fetch and parse the info about org units and employees from TISS."""
+    org_units = org_units if org_units is not None else set()
+    employees = employees if employees is not None else set()
+
+    unit = OrgUnit.from_dict(org_unit)
+    org_units.add(unit)
+    employees.update(set(unit.employees))
+
+    child_units = org_unit.get("children", org_unit.get("child_orgs_refs", []))
+    for child_unit in child_units:
+        child_unit_dict = _get_org_unit_dict(child_unit["code"])
+        _fetch_tiss_data(child_unit_dict, org_units, employees)
+
+    return org_units, employees
+
+
+def fetch_tiss_data() -> Tuple[Set[OrgUnit], Set[Employee]]:
+    """Fetch and parse the info about all org units and their employees from TISS."""
+    return _fetch_tiss_data(_get_org_unit_dict("E000"))
-- 
GitLab