Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -175,3 +175,4 @@ cython_debug/

# ignore generated files
tests/test_resources/temp_dirs/*
*/temp_dirs/cache/*
3 changes: 2 additions & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@ repos:
- id: trailing-whitespace
- id: end-of-file-fixer
- id: check-yaml
- id: check-added-large-files
- id: check-merge-conflict
- id: detect-aws-credentials
- id: detect-private-key
- repo: https://github.com/psf/black
rev: 25.1.0
hooks:
Expand Down
40 changes: 40 additions & 0 deletions accelerator_core/sample_filters.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
# Sample queries by filter


### All records that came from cedar

```json

{"technical_metadata.original_source_link":"cedar"}

```

### All records that came from cedar that were disseminated to cafe

```json

{
"technical_metadata.original_source_link": "cedar",
"technical_metadata.dissemination_endpoints.endpoint_type": "cafe"
}

```

### All records that came from cedar that were not disseminated to cafe

```json

{
"technical_metadata.original_source_link": "cedar",
"technical_metadata.dissemination_endpoints": {
"$not": {
"$elemMatch": {
"endpoint_type": "cafe"
}
}
}
}



```
24 changes: 24 additions & 0 deletions accelerator_core/service_impls/accel_db_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,27 @@ def __init__(self, accelerator_config: AcceleratorConfig):

def start_session(self):
    """Start and return a new client session on the underlying Mongo client."""
    client = self.db.client
    return client.start_session()

def build_collection_reference(
    self, db, document_type: str, temp_doc: bool = False
):
    """
    Find the correct collection for the given document type, using the
    type matrix held by the AcceleratorConfig.

    :param db: Mongo database handle
    :param document_type: type of the document, per the type matrix
    :param temp_doc: True to resolve the temporary collection for the type
    :return: the mongo collection
    :raises Exception: if the document type is not in the type matrix
    """
    type_matrix_info = self.accelerator_config.find_type_matrix_info_for_type(
        document_type
    )
    if not type_matrix_info:
        raise Exception(f"unknown type {document_type}")

    # resolve temp vs permanent collection name from the type matrix entry
    coll_name = (
        type_matrix_info.temp_collection
        if temp_doc
        else type_matrix_info.collection
    )
    return db[coll_name]
16 changes: 5 additions & 11 deletions accelerator_core/service_impls/mongo_accession.py
Original file line number Diff line number Diff line change
def build_collection_reference(
    self, db, document_type: str, temp_doc: bool = False
):
    """
    Find the correct collection based on the document type information
    in the type matrix.

    N.B. the lookup logic was refactored into AccelDbContext; this method
    simply delegates so existing callers keep working.

    :param db: Mongo db
    :param document_type: type of the document, per the type matrix
    :param temp_doc: bool is True if this is a temporary document
    :return: the mongo collection
    """
    return self.accel_db_context.build_collection_reference(
        db, document_type, temp_doc
    )

def check_if_insert_or_update(
self, ingest_payoad: IngestPayload, temp_doc: bool = False
Expand Down
50 changes: 46 additions & 4 deletions accelerator_core/service_impls/mongo_dissemination.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@

import logging
from io import UnsupportedOperation
from typing import List

from accelerator_core.utils.data_utils import generate_guid

from accelerator_core.schema.models.base_model import (
create_timestamped_log,
Expand All @@ -13,7 +16,6 @@
Dissemination,
DisseminationDescriptor,
DisseminationPayload,
DisseminationFilter,
)
from accelerator_core.utils.accel_database_utils import AccelDatabaseUtils
from accelerator_core.utils.accelerator_config import AcceleratorConfig
Expand Down Expand Up @@ -159,17 +161,57 @@ def disseminate_by_original_source_and_id(

def disseminate_by_filter(
    self,
    dissemination_request: DisseminationDescriptor,
) -> List[DisseminationPayload]:
    """
    Apply the filter carried in the dissemination request to build the set
    of documents to be disseminated to a target.

    Note that this method does not update the database indicating a
    dissemination, and it returns multiple documents. In normal usage
    patterns, the caller would use the result payloads to expand into
    individual dissemination tasks, and in those individual task calls, the
    caller would invoke the disseminate-by-id method to disseminate each
    document.

    @param dissemination_request: DisseminationDescriptor that describes the
        type, version, filter, and other information. Must carry the desired
        filter.
    @return: list of DisseminationPayload, one per matching document
    @raise Exception: if the request or its dissemination_filter is missing
    """

    if not dissemination_request:
        raise Exception(
            "no dissemination_request provided to disseminate_by_filter, cannot continue"
        )

    if not dissemination_request.dissemination_filter:
        raise Exception(
            "no dissemination_filter provided to disseminate_by_filter, cannot continue"
        )

    docs = self.accel_database_utils.find_by_filter(
        dissemination_request.ingest_type,
        dissemination_request.dissemination_filter,
        dissemination_request.temp_collection,
    )
    # find_by_filter returns a list (never None), so test for emptiness —
    # checking `is None` would mean the warning could never fire
    if not docs:
        logger.warning(
            "No documents found for filter: %s",
            dissemination_request.dissemination_filter,
        )

    payloads = []

    for doc in docs:
        # clone the request so each payload carries its own descriptor
        item_descriptor = DisseminationDescriptor.from_dict(
            dissemination_request.to_dict()
        )
        item_descriptor.dissemination_item_id = str(doc["_id"])
        dissemination_payload = DisseminationPayload(item_descriptor)
        logger.info("item_id: %s", item_descriptor.dissemination_item_id)
        self.report_individual_dissemination(
            dissemination_payload, item_descriptor.dissemination_item_id, doc
        )
        dissemination_payload.dissemination_successful = True
        payloads.append(dissemination_payload)

    return payloads

def report_individual_dissemination(
self, dissemination_payload: DisseminationPayload, item_id: str, item: dict
Expand Down
15 changes: 7 additions & 8 deletions accelerator_core/services/dissemination.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,12 @@

"""

from typing import List

from accelerator_core.utils.accelerator_config import AcceleratorConfig
from accelerator_core.utils.xcom_utils import XcomPropsResolver
from accelerator_core.workflow.accel_data_models import (
DisseminationDescriptor,
DisseminationFilter,
DisseminationPayload,
)
from accelerator_core.workflow.accel_workflow_task import AcceleratorWorkflowTask
Expand Down Expand Up @@ -77,15 +78,13 @@ def disseminate_by_original_source_and_id(

def disseminate_by_filter(
    self,
    dissemination_request: DisseminationDescriptor,
) -> List[DisseminationPayload]:
    """
    Apply the filter carried by the dissemination request to select the set
    of documents to be disseminated to a target, so each dag run may spawn
    multiple dissemination tasks.

    @param dissemination_request: DisseminationRequest that describes the
        type, version, and other information. The DisseminationRequest must
        contain the desired filter information, or an exception will be
        thrown.
    @return: array of documents as DisseminationPayload.
    """
    pass
43 changes: 41 additions & 2 deletions accelerator_core/utils/accel_database_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
"""

import logging
from typing import Optional
from typing import Optional, List

from bson import ObjectId
from pymongo.synchronous.client_session import ClientSession
Expand Down Expand Up @@ -126,6 +126,45 @@ def find_doc_by_original_source_identifier(
doc = coll.find_one(query, session=session)
return doc

def find_by_filter(
    self,
    ingest_type: str,
    filter: dict,
    temp_doc: bool = False,
    session: Optional[ClientSession] = None,
) -> List[dict]:
    """
    Find all documents in the collection for the given ingest type that
    match the supplied Mongo query filter.

    Parameters:
        ingest_type: str
            String identifier of the database collection to search. This is
            the same as the type matrix entry for the document type.
        filter: dict
            Mongo query filter to apply. (Kept named ``filter`` for
            interface stability, though it shadows the builtin.)
        temp_doc: bool, optional
            Flag indicating whether to search in temporary data storage.
            Defaults to False.
        session: ClientSession, optional
            Mongo session to scope the query to, if any.

    Returns:
        List[dict] with the matching documents, JSON-converted; empty list
        when nothing matches.
    """

    coll = self.build_collection_reference(ingest_type, temp_doc=temp_doc)

    return_docs = []
    for doc in coll.find(filter, session=session):
        converted = convert_doc_to_json(doc)
        logger.debug(f"Found doc {converted['_id']} in collection {ingest_type}")
        return_docs.append(converted)

    return return_docs

def connect_to_db(self):
    """Return the Mongo database handle held by the shared db context."""
    database = self.accel_db_context.db
    return database

Expand Down Expand Up @@ -154,7 +193,7 @@ def build_collection_reference(self, document_type: str, temp_doc: bool = False)

@staticmethod
def extract_id_from_doc(doc: dict) -> str:
return str(doc["_id"])
return str(doc["_id"]["$oid"])

def log_document_event(
self,
Expand Down
10 changes: 10 additions & 0 deletions accelerator_core/utils/data_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import hashlib
import json
import uuid


def sanitize_boolean(val) -> bool:
Expand Down Expand Up @@ -54,3 +55,12 @@ def checksum_data(data) -> str:
"""
stringified_data = json.dumps(data, sort_keys=True, separators=(",", ":"))
return hashlib.sha256(stringified_data.encode("utf-8")).hexdigest()


def generate_guid() -> str:
    """
    Generate a new random GUID string (UUID4).

    Useful when generating unique file names for xcom run folders.

    :return: str holding a freshly generated UUID
    """
    return str(uuid.uuid4())
Loading
Loading