diff --git a/processor/clients/__init__.py b/processor/clients/__init__.py index f56b939..3ec433b 100644 --- a/processor/clients/__init__.py +++ b/processor/clients/__init__.py @@ -3,6 +3,7 @@ from .base_client import SessionManager as SessionManager from .import_client import ImportClient as ImportClient from .import_client import ImportFile as ImportFile +from .packages_client import PackagesClient as PackagesClient from .timeseries_client import TimeSeriesClient as TimeSeriesClient from .workflow_client import WorkflowClient as WorkflowClient from .workflow_client import WorkflowInstance as WorkflowInstance diff --git a/processor/clients/packages_client.py b/processor/clients/packages_client.py new file mode 100644 index 0000000..114e4f8 --- /dev/null +++ b/processor/clients/packages_client.py @@ -0,0 +1,106 @@ +import json +import logging + +import requests + +from .base_client import BaseClient + +log = logging.getLogger() + + +class PackagesClient(BaseClient): + def __init__(self, api_host, session_manager): + super().__init__(session_manager) + + self.api_host = api_host + + @BaseClient.retry_with_refresh + def get_parent_package_id(self, package_id: str) -> str: + """ + Get the parent package ID for a given package. + + Args: + package_id: The package ID to query + + Returns: + str: The parent node ID + + Raises: + requests.HTTPError: If the API request fails + """ + url = f"{self.api_host}/packages/{package_id}?includeAncestors=true&startAtEpoch=false&limit=100&offset=0" + headers = { + "accept": "application/json", + "Authorization": f"Bearer {self.session_manager.session_token}", + } + + try: + log.info(f"Fetching parent package ID for package: {package_id}") + response = requests.get(url, headers=headers) + response.raise_for_status() + package_info = response.json() + parent_node_id = package_info["parent"]["content"]["nodeId"] + return parent_node_id + except requests.HTTPError as e: + log.error(f"failed to get parent package ID for {package_id}: {e}") + raise e + except json.JSONDecodeError as e: + log.error(f"failed to decode package response: {e}") + raise e + except Exception as e: + log.error(f"failed to get parent package ID: {e}") + raise e + + @BaseClient.retry_with_refresh + def update_properties(self, package_id: str, properties: list[dict]) -> None: + """ + Updates a package's properties on the Pennsieve API. + + Args: + package_id: The package (node) ID + properties: List of property dicts with keys: key, value, dataType, category, fixed, hidden + """ + url = f"{self.api_host}/packages/{package_id}?updateStorage=true" + + payload = {"properties": properties} + + headers = { + "accept": "*/*", + "content-type": "application/json", + "Authorization": f"Bearer {self.session_manager.session_token}", + } + + try: + response = requests.put(url, json=payload, headers=headers) + response.raise_for_status() + return None + except Exception as e: + log.error(f"failed to update package {package_id} properties: {e}") + raise e + + def set_timeseries_properties(self, package_id: str) -> None: + """ + Sets the time series viewer properties on a package. 
+
+        Args:
+            package_id: The package (node) ID
+        """
+        properties = [
+            {
+                "key": "subtype",
+                "value": "pennsieve_timeseries",
+                "dataType": "string",
+                "category": "Viewer",
+                "fixed": False,
+                "hidden": True,
+            },
+            {
+                "key": "icon",
+                "value": "timeseries",
+                "dataType": "string",
+                "category": "Pennsieve",
+                "fixed": False,
+                "hidden": True,
+            },
+        ]
+        return self.update_properties(package_id, properties)
diff --git a/processor/config.py b/processor/config.py
index cf33fc0..769498d 100644
--- a/processor/config.py
+++ b/processor/config.py
@@ -9,7 +9,6 @@ def __init__(self):
 
         self.INPUT_DIR = os.getenv("INPUT_DIR")
         self.OUTPUT_DIR = os.getenv("OUTPUT_DIR")
-        self.CHUNK_SIZE_MB = int(os.getenv("CHUNK_SIZE_MB", "1"))
 
         # continue to use INTEGRATION_ID environment variable until runner
diff --git a/processor/importer.py b/processor/importer.py
index 739bffd..5d72b32 100644
--- a/processor/importer.py
+++ b/processor/importer.py
@@ -5,10 +5,19 @@
 import uuid
 from concurrent.futures import ThreadPoolExecutor
 from multiprocessing import Lock, Value
+from typing import Optional
 
 import backoff
 import requests
-from clients import AuthenticationClient, ImportClient, ImportFile, SessionManager, TimeSeriesClient, WorkflowClient
+from clients import (
+    AuthenticationClient,
+    ImportClient,
+    ImportFile,
+    PackagesClient,
+    SessionManager,
+    TimeSeriesClient,
+    WorkflowClient,
+)
 from constants import TIME_SERIES_BINARY_FILE_EXTENSION, TIME_SERIES_METADATA_FILE_EXTENSION
 from timeseries_channel import TimeSeriesChannel
 
@@ -49,10 +58,15 @@ def import_timeseries(api_host, api2_host, api_key, api_secret, workflow_instanc
     workflow_client = WorkflowClient(api2_host, session_manager)
     workflow_instance = workflow_client.get_workflow_instance(workflow_instance_id)
 
-    # constraint until we implement (upstream) performing imports over directories
-    # and specifying how to group time series files together into an imported package
-    assert len(workflow_instance.package_ids) == 1, "NWB post processor only supports a single package for import"
-    package_id = workflow_instance.package_ids[0]
+    # fetch the target package for channel data and time series properties
+    packages_client = PackagesClient(api_host, session_manager)
+    package_id = determine_target_package(packages_client, workflow_instance.package_ids)
+    if not package_id:
+        log.error(f"dataset_id={workflow_instance.dataset_id} could not determine target time series package")
+        return None
+
+    packages_client.set_timeseries_properties(package_id)
+    log.info(f"updated package {package_id} with time series properties")
 
     log.info(f"dataset_id={workflow_instance.dataset_id} package_id={package_id} starting import of time series files")
 
@@ -140,3 +154,46 @@ def upload_timeseries_file(timeseries_file):
 
     log.info(f"import_id={import_id} uploaded {upload_counter.value} time series files")
     assert sum(successful_uploads) == len(import_files), "Failed to upload all time series files"
+
+
+def determine_target_package(packages_client: PackagesClient, package_ids: list[str]) -> Optional[str]:
+    """
+    Determine which package should receive the time series data and properties.
+
+    If there's only one package ID, use that package directly.
+    If there are multiple package IDs, find the first one with 'N:package:' prefix
+    and get its parent package ID. 
+ + Args: + packages_client: PackagesClient instance for API calls + package_ids: List of package IDs from the workflow instance + + Returns: + The package ID to update with properties, or None if unable to determine + """ + if not package_ids: + log.warning("No package IDs provided") + return None + + if len(package_ids) == 1: + log.info("Single package ID found, using it directly: %s", package_ids[0]) + return package_ids[0] + + first_package = None + for package_id in package_ids: + if package_id.startswith("N:package:"): + first_package = package_id + break + + if first_package is None: + log.warning("No package ID with 'N:package:' prefix found in: %s", package_ids) + return None + + log.info("Multiple package IDs found, getting parent of first package: %s", first_package) + try: + parent_id = packages_client.get_parent_package_id(first_package) + log.info("Parent package ID: %s", parent_id) + return parent_id + except Exception as e: + log.error("Failed to get parent package ID: %s", e) + return None diff --git a/processor/reader.py b/processor/reader.py index fd28404..061b424 100644 --- a/processor/reader.py +++ b/processor/reader.py @@ -43,14 +43,19 @@ def __init__(self, electrical_series, session_start_time): assert ( len(self.electrical_series.electrodes.table) == self.num_channels ), "Electrode channels do not align with data shape" + log.info(f"NWB file has {self.num_samples} samples") self._sampling_rate = None self._compute_sampling_rate() + log.info(f"NWB file has sampling rate: {self.sampling_rate} Hz") if self.has_explicit_timestamps: + log.info("NWB file has explicit timestamps") assert self.num_samples == len( self.electrical_series.timestamps ), "Differing number of sample and timestamp value" + else: + log.info("NWB file has implicit timestamps") self._channels = None @@ -68,7 +73,7 @@ def _compute_sampling_rate(self): TimeSeries objects but its worth handling this case by validating the sampling_rate against the timestamps if this case does somehow appear. 
""" - if self.electrical_series.rate is None and self.electrical_series.timestamps is None: + if self.electrical_series.rate is None and not self.has_explicit_timestamps: raise Exception("electrical series has no defined sampling rate or timestamp values") # if both the timestamps and rate properties are set on the electrical @@ -78,7 +83,7 @@ def _compute_sampling_rate(self): sampling_rate = self.electrical_series.rate sample_size = min(10000, self.num_samples) - sample_timestamps = self.electrical_series.timestamps[:sample_size] + sample_timestamps = self.get_timestamps(0, sample_size) inferred_sampling_rate = infer_sampling_rate(sample_timestamps) error = abs(inferred_sampling_rate - sampling_rate) * (1.0 / sampling_rate) @@ -97,7 +102,7 @@ def _compute_sampling_rate(self): # if only the timestamps are given, calculate the sampling rate using a sample of timestamps elif self.has_explicit_timestamps: sample_size = min(10000, self.num_samples) - sample_timestamps = self.electrical_series.timestamps[:sample_size] + sample_timestamps = self.get_timestamps(0, sample_size) self._sampling_rate = round(infer_sampling_rate(sample_timestamps)) def get_timestamp(self, index): @@ -199,7 +204,7 @@ def contiguous_chunks(self): for batch_start in range(0, self.num_samples, batch_size): batch_end = min(batch_start + batch_size, self.num_samples) - batch_timestamps = self.electrical_series.timestamps[batch_start:batch_end] + batch_timestamps = self.get_timestamps(batch_start, batch_end) # check gap between batches if prev_timestamp is not None: diff --git a/processor/writer.py b/processor/writer.py index 56cb688..589d3ae 100644 --- a/processor/writer.py +++ b/processor/writer.py @@ -89,8 +89,8 @@ def write_chunk(chunk, start_time, end_time, channel_index, output_dir): file_name = "channel-{}_{}_{}{}".format( "{index:05d}".format(index=channel_index), - round(start_time * 1e6), - round(end_time * 1e6), + int(start_time * 1e6), + int(end_time * 1e6), TIME_SERIES_BINARY_FILE_EXTENSION, ) file_path = os.path.join(output_dir, file_name) diff --git a/tests/test_importer.py b/tests/test_importer.py new file mode 100644 index 0000000..2990be1 --- /dev/null +++ b/tests/test_importer.py @@ -0,0 +1,171 @@ +from unittest.mock import Mock + +import responses +from clients.packages_client import PackagesClient +from importer import determine_target_package + + +class TestDetermineTargetPackage: + """Tests for determine_target_package function.""" + + def test_returns_none_for_empty_list(self, mock_session_manager): + """Test returns None when package_ids is empty.""" + packages_client = PackagesClient("https://api.test.com", mock_session_manager) + + result = determine_target_package(packages_client, []) + + assert result is None + + def test_returns_single_package_directly(self, mock_session_manager): + """Test returns the single package ID directly when only one exists.""" + packages_client = PackagesClient("https://api.test.com", mock_session_manager) + package_ids = ["N:package:single-123"] + + result = determine_target_package(packages_client, package_ids) + + assert result == "N:package:single-123" + + def test_returns_single_package_regardless_of_type(self, mock_session_manager): + """Test returns single package even if it's not N:package: prefixed.""" + packages_client = PackagesClient("https://api.test.com", mock_session_manager) + package_ids = ["N:collection:single-collection"] + + result = determine_target_package(packages_client, package_ids) + + assert result == "N:collection:single-collection" + + 
@responses.activate + def test_multiple_packages_returns_parent_of_first(self, mock_session_manager): + """Test with multiple packages returns parent of first N:package: package.""" + responses.add( + responses.GET, + "https://api.test.com/packages/N:package:first-123?includeAncestors=true&startAtEpoch=false&limit=100&offset=0", + json={ + "parent": {"content": {"nodeId": "N:collection:parent-folder"}}, + "content": {"nodeId": "N:package:first-123"}, + }, + status=200, + ) + + packages_client = PackagesClient("https://api.test.com", mock_session_manager) + package_ids = [ + "N:package:first-123", + "N:package:second-456", + "N:package:third-789", + ] + + result = determine_target_package(packages_client, package_ids) + + assert result == "N:collection:parent-folder" + + @responses.activate + def test_multiple_packages_finds_package_among_mixed_types(self, mock_session_manager): + """Test finds N:package: among mixed ID types.""" + responses.add( + responses.GET, + "https://api.test.com/packages/N:package:the-package?includeAncestors=true&startAtEpoch=false&limit=100&offset=0", + json={ + "parent": {"content": {"nodeId": "N:collection:parent-folder"}}, + "content": {"nodeId": "N:package:the-package"}, + }, + status=200, + ) + + packages_client = PackagesClient("https://api.test.com", mock_session_manager) + package_ids = [ + "N:collection:not-a-package", + "N:package:the-package", + "N:dataset:also-not-package", + ] + + result = determine_target_package(packages_client, package_ids) + + assert result == "N:collection:parent-folder" + + def test_multiple_packages_no_package_prefix_returns_none(self, mock_session_manager): + """Test returns None when multiple IDs but none have N:package: prefix.""" + packages_client = PackagesClient("https://api.test.com", mock_session_manager) + package_ids = [ + "N:collection:abc", + "N:dataset:def", + "some-other-id", + ] + + result = determine_target_package(packages_client, package_ids) + + assert result is None + + @responses.activate + def test_multiple_packages_api_error_returns_none(self, mock_session_manager): + """Test returns None when API call fails for parent package.""" + responses.add( + responses.GET, + "https://api.test.com/packages/N:package:first-123?includeAncestors=true&startAtEpoch=false&limit=100&offset=0", + json={"error": "Not found"}, + status=404, + ) + + packages_client = PackagesClient("https://api.test.com", mock_session_manager) + package_ids = [ + "N:package:first-123", + "N:package:second-456", + ] + + result = determine_target_package(packages_client, package_ids) + + assert result is None + + @responses.activate + def test_multiple_packages_uses_first_package_for_parent_lookup(self, mock_session_manager): + """Test that only the first N:package: is used for parent lookup.""" + # Only set up response for first package + responses.add( + responses.GET, + "https://api.test.com/packages/N:package:first?includeAncestors=true&startAtEpoch=false&limit=100&offset=0", + json={ + "parent": {"content": {"nodeId": "N:collection:parent"}}, + "content": {"nodeId": "N:package:first"}, + }, + status=200, + ) + + packages_client = PackagesClient("https://api.test.com", mock_session_manager) + package_ids = [ + "N:package:first", + "N:package:second", + "N:package:third", + ] + + result = determine_target_package(packages_client, package_ids) + + # Verify only one API call was made + assert len(responses.calls) == 1 + assert "N:package:first" in responses.calls[0].request.url + assert result == "N:collection:parent" + + +class 
TestDetermineTargetPackageIntegration: + """Integration-style tests for determine_target_package with mocked client.""" + + def test_with_mock_packages_client(self): + """Test using a fully mocked PackagesClient.""" + mock_client = Mock(spec=PackagesClient) + mock_client.get_parent_package_id.return_value = "N:collection:mocked-parent" + + package_ids = ["N:package:pkg1", "N:package:pkg2"] + + result = determine_target_package(mock_client, package_ids) + + assert result == "N:collection:mocked-parent" + mock_client.get_parent_package_id.assert_called_once_with("N:package:pkg1") + + def test_mock_client_raises_exception(self): + """Test handling when mocked client raises exception.""" + mock_client = Mock(spec=PackagesClient) + mock_client.get_parent_package_id.side_effect = Exception("API Error") + + package_ids = ["N:package:pkg1", "N:package:pkg2"] + + result = determine_target_package(mock_client, package_ids) + + assert result is None diff --git a/tests/test_packages_client.py b/tests/test_packages_client.py new file mode 100644 index 0000000..94b94d7 --- /dev/null +++ b/tests/test_packages_client.py @@ -0,0 +1,232 @@ +import pytest +import responses +from clients.packages_client import PackagesClient + + +class TestPackagesClientInit: + """Tests for PackagesClient initialization.""" + + def test_initialization(self, mock_session_manager): + """Test basic initialization.""" + client = PackagesClient("https://api.test.com", mock_session_manager) + + assert client.api_host == "https://api.test.com" + assert client.session_manager == mock_session_manager + + +class TestPackagesClientGetParentPackageId: + """Tests for PackagesClient.get_parent_package_id method.""" + + @responses.activate + def test_get_parent_package_id_success(self, mock_session_manager): + """Test successful parent package ID retrieval.""" + responses.add( + responses.GET, + "https://api.test.com/packages/N:package:child-123?includeAncestors=true&startAtEpoch=false&limit=100&offset=0", + json={ + "parent": { + "content": { + "nodeId": "N:collection:parent-456", + "name": "Parent Folder", + } + }, + "content": { + "nodeId": "N:package:child-123", + "name": "Child Package", + }, + }, + status=200, + ) + + client = PackagesClient("https://api.test.com", mock_session_manager) + parent_id = client.get_parent_package_id("N:package:child-123") + + assert parent_id == "N:collection:parent-456" + + @responses.activate + def test_get_parent_package_id_includes_auth_header(self, mock_session_manager): + """Test that authorization header is included.""" + responses.add( + responses.GET, + "https://api.test.com/packages/N:package:test-123?includeAncestors=true&startAtEpoch=false&limit=100&offset=0", + json={ + "parent": {"content": {"nodeId": "N:collection:parent-456"}}, + "content": {"nodeId": "N:package:test-123"}, + }, + status=200, + ) + + client = PackagesClient("https://api.test.com", mock_session_manager) + client.get_parent_package_id("N:package:test-123") + + assert responses.calls[0].request.headers["Authorization"] == "Bearer mock-token-12345" + + @responses.activate + def test_get_parent_package_id_raises_on_http_error(self, mock_session_manager): + """Test that HTTP errors are raised.""" + responses.add( + responses.GET, + "https://api.test.com/packages/N:package:not-found?includeAncestors=true&startAtEpoch=false&limit=100&offset=0", + json={"error": "Not found"}, + status=404, + ) + + client = PackagesClient("https://api.test.com", mock_session_manager) + + with pytest.raises(Exception): + 
client.get_parent_package_id("N:package:not-found") + + @responses.activate + def test_get_parent_package_id_retries_on_401(self, mock_session_manager): + """Test that get_parent_package_id retries after 401.""" + # First call returns 401 + responses.add( + responses.GET, + "https://api.test.com/packages/N:package:test-123?includeAncestors=true&startAtEpoch=false&limit=100&offset=0", + json={"error": "Unauthorized"}, + status=401, + ) + # Second call succeeds + responses.add( + responses.GET, + "https://api.test.com/packages/N:package:test-123?includeAncestors=true&startAtEpoch=false&limit=100&offset=0", + json={ + "parent": {"content": {"nodeId": "N:collection:parent-456"}}, + "content": {"nodeId": "N:package:test-123"}, + }, + status=200, + ) + + client = PackagesClient("https://api.test.com", mock_session_manager) + parent_id = client.get_parent_package_id("N:package:test-123") + + assert parent_id == "N:collection:parent-456" + mock_session_manager.refresh_session.assert_called_once() + + +class TestPackagesClientUpdateProperties: + """Tests for PackagesClient.update_properties method.""" + + @responses.activate + def test_update_properties_success(self, mock_session_manager): + """Test successful property update.""" + responses.add( + responses.PUT, + "https://api.test.com/packages/N:package:test-123?updateStorage=true", + json={"success": True}, + status=200, + ) + + client = PackagesClient("https://api.test.com", mock_session_manager) + properties = [{"key": "test_key", "value": "test_value", "dataType": "string"}] + + result = client.update_properties("N:package:test-123", properties) + + assert result is None + + @responses.activate + def test_update_properties_sends_correct_payload(self, mock_session_manager): + """Test that update_properties sends correct request body.""" + responses.add( + responses.PUT, + "https://api.test.com/packages/N:package:test-123?updateStorage=true", + json={"success": True}, + status=200, + ) + + client = PackagesClient("https://api.test.com", mock_session_manager) + properties = [ + {"key": "key1", "value": "value1", "dataType": "string"}, + {"key": "key2", "value": "value2", "dataType": "integer"}, + ] + + client.update_properties("N:package:test-123", properties) + + import json + + body = json.loads(responses.calls[0].request.body) + assert body["properties"] == properties + + @responses.activate + def test_update_properties_includes_auth_header(self, mock_session_manager): + """Test that authorization header is included.""" + responses.add( + responses.PUT, + "https://api.test.com/packages/N:package:test-123?updateStorage=true", + json={"success": True}, + status=200, + ) + + client = PackagesClient("https://api.test.com", mock_session_manager) + client.update_properties("N:package:test-123", []) + + assert responses.calls[0].request.headers["Authorization"] == "Bearer mock-token-12345" + + @responses.activate + def test_update_properties_raises_on_http_error(self, mock_session_manager): + """Test that HTTP errors are raised.""" + responses.add( + responses.PUT, + "https://api.test.com/packages/N:package:test-123?updateStorage=true", + json={"error": "Bad request"}, + status=400, + ) + + client = PackagesClient("https://api.test.com", mock_session_manager) + + with pytest.raises(Exception): + client.update_properties("N:package:test-123", []) + + +class TestPackagesClientSetTimeseriesProperties: + """Tests for PackagesClient.set_timeseries_properties method.""" + + @responses.activate + def test_set_timeseries_properties_success(self, 
mock_session_manager): + """Test successful timeseries properties update.""" + responses.add( + responses.PUT, + "https://api.test.com/packages/N:package:test-123?updateStorage=true", + json={"success": True}, + status=200, + ) + + client = PackagesClient("https://api.test.com", mock_session_manager) + result = client.set_timeseries_properties("N:package:test-123") + + assert result is None + + @responses.activate + def test_set_timeseries_properties_sends_correct_payload(self, mock_session_manager): + """Test that set_timeseries_properties sends the correct properties.""" + responses.add( + responses.PUT, + "https://api.test.com/packages/N:package:test-123?updateStorage=true", + json={"success": True}, + status=200, + ) + + client = PackagesClient("https://api.test.com", mock_session_manager) + client.set_timeseries_properties("N:package:test-123") + + import json + + body = json.loads(responses.calls[0].request.body) + properties = body["properties"] + + # Should have exactly 2 properties + assert len(properties) == 2 + + # Check subtype property + subtype_prop = next(p for p in properties if p["key"] == "subtype") + assert subtype_prop["value"] == "pennsieve_timeseries" + assert subtype_prop["dataType"] == "string" + assert subtype_prop["category"] == "Viewer" + assert subtype_prop["hidden"] is True + + # Check icon property + icon_prop = next(p for p in properties if p["key"] == "icon") + assert icon_prop["value"] == "timeseries" + assert icon_prop["dataType"] == "string" + assert icon_prop["category"] == "Pennsieve" + assert icon_prop["hidden"] is True diff --git a/tests/test_writer.py b/tests/test_writer.py index 67c00f2..d9c10e1 100644 --- a/tests/test_writer.py +++ b/tests/test_writer.py @@ -34,9 +34,9 @@ def test_write_chunk_creates_file(self, temp_output_dir): TimeSeriesChunkWriter.write_chunk(chunk, start_time, end_time, 0, temp_output_dir) - # Check file was created (use round() to match writer behavior) + # Check file was created (use int() to match writer behavior) expected_filename = ( - f"channel-00000_{round(start_time * 1e6)}_{round(end_time * 1e6)}{TIME_SERIES_BINARY_FILE_EXTENSION}" + f"channel-00000_{int(start_time * 1e6)}_{int(end_time * 1e6)}{TIME_SERIES_BINARY_FILE_EXTENSION}" ) file_path = os.path.join(temp_output_dir, expected_filename) assert os.path.exists(file_path)
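
Reviewer note: a minimal sketch of the call flow this change introduces, assuming `api_host` and an already-authenticated `session_manager` of the kind built inside `import_timeseries()`; the wrapper name `tag_timeseries_package` is hypothetical and not part of this diff.

```python
# Hypothetical glue code illustrating the new flow; only PackagesClient,
# determine_target_package, and set_timeseries_properties come from this change.
from clients import PackagesClient
from importer import determine_target_package


def tag_timeseries_package(api_host, session_manager, package_ids):
    packages_client = PackagesClient(api_host, session_manager)

    # Single ID: use it directly. Multiple IDs: resolve the parent of the
    # first "N:package:"-prefixed ID. Returns None if nothing qualifies.
    package_id = determine_target_package(packages_client, package_ids)
    if package_id is None:
        return None

    # Tag the package so the Pennsieve time series viewer recognizes it.
    packages_client.set_timeseries_properties(package_id)
    return package_id
```

The new tests also depend on a `mock_session_manager` fixture that is not part of this diff; from what the tests assert (`Bearer mock-token-12345`, `refresh_session.assert_called_once()`), it presumably looks roughly like the sketch below, though the real conftest may differ.

```python
# tests/conftest.py -- assumed shape only, inferred from the tests above.
from unittest.mock import Mock

import pytest


@pytest.fixture
def mock_session_manager():
    manager = Mock()
    manager.session_token = "mock-token-12345"  # matches the Bearer token the tests assert
    return manager  # Mock auto-provides refresh_session for the 401-retry test
```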