Skip to content

Commit

Permalink
INSPIRE harvester: add reader component
Browse files Browse the repository at this point in the history
* configure and register the corresponding invenio-job
* create the Reader component
* draft of the Transformer component
* closes #322
  • Loading branch information
anikachurilova committed Feb 5, 2025
1 parent 46a35f0 commit 5fe2b89
Show file tree
Hide file tree
Showing 7 changed files with 438 additions and 158 deletions.
322 changes: 164 additions & 158 deletions Pipfile.lock

Large diffs are not rendered by default.

11 changes: 11 additions & 0 deletions invenio.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ from cds_rdm.permissions import (
CDSRDMPreservationSyncPermissionPolicy,
)
from cds_rdm.files import storage_factory
from cds_rdm.inspire_harvester.reader import InspireHTTPReader
from cds_rdm.inspire_harvester.transformer import InspireJsonTransformer
from celery.schedules import crontab
from invenio_app_rdm.config import STATS_EVENTS as _APP_RDM_STATS_EVENTS, STATS_AGGREGATIONS as _APP_RDM_STATS_AGGREGATIONS, APP_RDM_ROUTES
from invenio_vocabularies.services.custom_fields import VocabularyCF
Expand Down Expand Up @@ -556,6 +558,15 @@ VOCABULARIES_NAMES_SCHEMES = {
}
"""Names allowed identifier schemes."""

# Maps the reader type name used in datastream configs to the INSPIRE
# harvester reader class imported at the top of this file.
VOCABULARIES_DATASTREAM_READERS = {"inspire-http-reader": InspireHTTPReader}
"""Data Streams readers."""

# Maps the transformer type name used in datastream configs to the INSPIRE
# harvester transformer class imported at the top of this file.
VOCABULARIES_DATASTREAM_TRANSFORMERS = {
    "inspire-json-transformer": InspireJsonTransformer
}
"""Data Streams transformers."""

# Invenio Stats
# =============
Expand Down
8 changes: 8 additions & 0 deletions site/cds_rdm/inspire_harvester/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2025 CERN.
#
# CDS-RDM is free software; you can redistribute it and/or modify it under
# the terms of the MIT License; see LICENSE file for more details.

"""INSPIRE to CDS harvester module."""
97 changes: 97 additions & 0 deletions site/cds_rdm/inspire_harvester/jobs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2025 CERN.
#
# CDS-RDM is free software; you can redistribute it and/or modify it under
# the terms of the MIT License; see LICENSE file for more details.

"""Jobs module."""

from datetime import timezone

from invenio_i18n import gettext as _
from invenio_vocabularies.jobs import ProcessDataStreamJob
from marshmallow import Schema, fields
from marshmallow_utils.fields import TZDateTime


def _utc_day_field(description):
    """Return a UTC ``TZDateTime`` field that is formatted as ``YYYY-MM-DD``.

    :param description: already-translated help text stored in the field's
        ``metadata["description"]``.
    """
    return TZDateTime(
        timezone=timezone.utc,
        format="%Y-%m-%d",
        metadata={"description": description},
    )


class InspireArgsSchema(Schema):
    """Schema of task input arguments.

    Declares the arguments accepted by the INSPIRE harvester job: three
    optional dates (``since``, ``until``, ``on``), an optional INSPIRE record
    id, and a hidden field carrying this schema's name.
    """

    # Start of the harvested period.
    since = _utc_day_field(
        _(
            "YYYY-MM-DD format. "
            "Leave field empty if it should continue since last successful run."
        )
    )

    # End of the harvested period.
    until = _utc_day_field(
        _(
            "YYYY-MM-DD format. "
            "End date of the date range. "
            "If this field is provided, then Since field is mandatory. "
            "Start/End date is included."
        )
    )

    # Single exact date to harvest.
    on = _utc_day_field(_("YYYY-MM-DD format. Harvest by exact date."))

    # Identifier of a single INSPIRE record to harvest.
    inspire_id = fields.String()

    # Hidden field that always carries the name of this schema.
    job_arg_schema = fields.String(
        load_default="InspireArgsSchema",
        dump_default="InspireArgsSchema",
        metadata={"type": "hidden"},
    )


class ProcessInspireHarvesterJob(ProcessDataStreamJob):
    """Process INSPIRE to CDS harvester registered task."""

    description = "Inspire to CDS records harvester"
    title = "Inspire harvester"
    id = "process_inspire"
    arguments_schema = InspireArgsSchema

    @classmethod
    def build_task_arguments(
        cls, job_obj, since=None, inspire_id=None, until=None, on=None, **kwargs
    ):
        """Build task arguments.

        Assembles the datastream configuration for one harvester run: a
        single ``inspire-http-reader`` that receives the search arguments,
        the ``inspire-json-transformer``, and an ``async`` writer.

        :param job_obj: the job object; not used here.
        :param since: start date forwarded to the reader.
        :param inspire_id: INSPIRE record id forwarded to the reader.
        :param until: end date forwarded to the reader.
        :param on: exact date forwarded to the reader.
        :returns: a dict with a single ``"config"`` key.
        """
        reader = {
            "args": {
                "since": since,
                "until": until,
                "on": on,
                "inspire_id": inspire_id,
            },
            "type": "inspire-http-reader",
        }

        # The wrapped writer type is still a placeholder in this draft.
        wrapped_writer = {"type": "TODO", "args": {}}
        writer = {"args": {"writer": wrapped_writer}, "type": "async"}

        transformer = {"type": "inspire-json-transformer"}

        config = {
            "readers": [reader],
            "writers": [writer],
            "transformers": [transformer],
        }
        return {"config": config}
135 changes: 135 additions & 0 deletions site/cds_rdm/inspire_harvester/reader.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2025 CERN.
#
# CDS-RDM is free software; you can redistribute it and/or modify it under
# the terms of the MIT License; see LICENSE file for more details.

"""Reader component."""
from datetime import datetime
from urllib.parse import urlencode

import requests
from invenio_vocabularies.datastreams.errors import ReaderError


class InspireHTTPReader:
    """INSPIRE HTTP Reader.

    Builds a search query for the INSPIRE literature API from the given
    arguments and yields the raw body of every result page.

    Exactly one search mode is used per run:

    * ``inspire_id`` alone -- a single INSPIRE record;
    * ``on`` alone -- records matching an exact date;
    * ``since`` together with ``until`` -- an inclusive date range;
    * ``since`` alone -- everything from that date onwards.

    All dates are expected as strings in ``YYYY-MM-DD`` format.
    """

    # Seconds to wait for INSPIRE before giving up. Without a timeout
    # ``requests`` can block indefinitely on a stalled connection.
    REQUEST_TIMEOUT = 60

    def __init__(
        self,
        since=None,
        until=None,
        on=None,
        inspire_id=None,
    ):
        """Constructor.

        :param since: start date of the search (``YYYY-MM-DD``).
        :param until: end date of the search (``YYYY-MM-DD``); needs ``since``.
        :param on: exact date to search for (``YYYY-MM-DD``).
        :param inspire_id: id of a single INSPIRE record to fetch.
        """
        self._since = since
        self._until = until
        self._on = on
        self._inspire_id = inspire_id

    def _validate_date_format(self, date):
        """Validate that an input date is in the correct format.

        :param date: date string expected in ``YYYY-MM-DD`` format.
        :returns: the parsed ``datetime``.
        :raises ValueError: if ``date`` is not a ``YYYY-MM-DD`` string.
        """
        try:
            return datetime.strptime(date, "%Y-%m-%d")
        except (TypeError, ValueError) as err:
            # TypeError covers non-string input (e.g. ``None``) so that every
            # malformed date is reported the same way.
            raise ValueError("Date must be in YYYY-MM-DD format.") from err

    def _validate_input_args(self):
        """Check that the input args describe exactly one search mode.

        :raises ReaderError: if mutually exclusive args are combined, if
            ``until`` is given without ``since``, or if no arg is given.
        :raises ValueError: if a date is malformed or ``since`` is after
            ``until``.
        """
        if self._inspire_id:
            if any([self._on, self._until, self._since]):
                raise ReaderError(
                    "When providing INSPIRE record ID for the search, all other args ('On', 'Since' and "
                    "'Until') are ignored. Please specify only inspire_id value."
                )
            return

        if self._on:
            # ``inspire_id`` is known to be empty at this point.
            if any([self._until, self._since]):
                raise ReaderError(
                    "When searching by exact date, all other args ('Inspire_id', 'Since' and "
                    "'Until') are ignored. Please specify only 'On' value."
                )
            self._validate_date_format(self._on)
            return

        if self._until:
            if not self._since:  # make sure the start date is also provided
                raise ReaderError(
                    "Only end date of the date range is provided. Please specify also 'Since' parameter."
                )

            since_date = self._validate_date_format(self._since)
            until_date = self._validate_date_format(self._until)

            if since_date > until_date:
                raise ValueError(
                    "The 'Since' date must be earlier than or equal to the 'Until' date."
                )
            return

        if not self._since:
            # Nothing to search by: fail with a clear message instead of
            # building a query around a missing date.
            raise ReaderError(
                "No search criteria provided. Please specify 'Inspire_id', 'On' or 'Since' "
                "(optionally together with 'Until')."
            )
        self._validate_date_format(self._since)

    def _build_query(self):
        """Return the INSPIRE search query string for the configured args."""
        document_type = "thesis"
        oai_set = "ForCDS"
        clauses = [f"_oai.sets:{oai_set}", f"document_type:{document_type}"]

        if self._inspire_id:
            # get by INSPIRE id
            clauses.append(f"id:{self._inspire_id}")
        elif self._on:
            # get by the exact date
            clauses.append(f"du:{self._on}")
        elif self._until:
            # get by the date range
            clauses.append(f"du >= {self._since}")
            clauses.append(f"du <= {self._until}")
        else:
            # get since specified date until now
            clauses.append(f"du >= {self._since}")

        return " AND ".join(clauses)

    def _iter(self, url, *args, **kwargs):
        """Yield the raw content of every result page, following "next" links.

        :param url: URL of the first result page.
        :raises ReaderError: if the request fails, the response status is not
            200, the body is not valid JSON, or the search has no hits.
        """
        headers = {"Accept": "application/json"}

        while url:  # Continue until there is no "next" link
            try:
                response = requests.get(
                    url, headers=headers, timeout=self.REQUEST_TIMEOUT
                )
            except requests.RequestException as err:
                raise ReaderError(
                    f"Error occurred while connecting to INSPIRE. Error message: {err}. See URL: {url}."
                ) from err

            # Check the status before parsing: error responses are not
            # guaranteed to carry a JSON body.
            if response.status_code != 200:
                raise ReaderError(
                    f"Error occurred while getting JSON data from INSPIRE. Error message: {response.text}. See URL: {url}."
                )

            try:
                data = response.json()
            except ValueError as err:
                raise ReaderError(
                    f"INSPIRE returned a response that is not valid JSON. See URL: {url}."
                ) from err

            if (
                data["hits"]["total"] == 0
            ):  # TODO make it a warning or info when we have proper logging
                raise ReaderError(
                    f"No results found when querying INSPIRE. See URL: {url}."
                )

            yield response.content

            # Get the next page URL if available
            url = data.get("links", {}).get("next")

    def read(self, item=None, *args, **kwargs):
        """Validate the input args, build the query and yield result pages.

        :param item: not used by this reader.
        :returns: a generator over the raw content of every result page.
        """
        self._validate_input_args()

        base_url = "https://inspirehep.net/api/literature"
        encoded_query = urlencode({"q": self._build_query()})
        url = f"{base_url}?{encoded_query}"

        # ``url`` is passed positionally so extra positional args cannot
        # collide with it.
        yield from self._iter(url, *args, **kwargs)
22 changes: 22 additions & 0 deletions site/cds_rdm/inspire_harvester/transformer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2025 CERN.
#
# CDS-RDM is free software; you can redistribute it and/or modify it under
# the terms of the MIT License; see LICENSE file for more details.

"""Transformer module."""

from invenio_vocabularies.datastreams.transformers import BaseTransformer


class InspireJsonTransformer(BaseTransformer):
    """INSPIRE JSON transformer."""

    def __init__(self, root_element=None, *args, **kwargs):
        """Initializes the transformer.

        :param root_element: stored on the instance as ``root_element``; not
            used by ``apply`` yet.
        """
        self.root_element = root_element
        super().__init__(*args, **kwargs)

    def apply(self, stream_entry, **kwargs):
        """Apply the transformation to the stream entry.

        The INSPIRE to CDS mapping is not implemented yet, so the entry is
        passed through unchanged.

        :param stream_entry: the entry produced by the reader.
        :returns: the same ``stream_entry``.
        """
        # TODO: map the INSPIRE JSON payload to the CDS record format.
        # Return the entry rather than ``None`` so later datastream steps
        # still receive it.
        return stream_entry
1 change: 1 addition & 0 deletions site/setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ invenio_jobs.jobs =
sync_cern_groups = cds_rdm.jobs:SyncGroups
sync_local_accounts_to_names = cds_rdm.jobs:SyncLocalAccounts
merge_duplicate_names = cds_rdm.jobs:MergeDuplicateNames
process_inspire = cds_rdm.inspire_harvester.jobs:ProcessInspireHarvesterJob
invenio_pidstore.minters =
legacy = cds_rdm.minters:legacy_recid_minter
idutils.custom_schemes =
Expand Down

0 comments on commit 5fe2b89

Please sign in to comment.