1029 create custom handling for hi goodtimes jobs #1034
subagonsouth wants to merge 15 commits into IMAP-Science-Operations-Center:dev from
Conversation
Pull request overview
This PR implements custom handling for Hi Goodtimes jobs to support multi-repoint dependencies. Hi Goodtimes ancillary processing requires data from multiple repoints (past and future), unlike typical jobs that only need data from a single repoint. The implementation allows the repoint parameter to accept either a single integer or a list of integers in dependency query functions, and adds special logic to expand repoint ranges for Hi Goodtimes jobs both when triggering jobs and when retrieving dependencies.
Key changes:
- Extended `get_upstream_dependency_inputs` and `get_files` functions to accept a list of repoint numbers
- Added Hi Goodtimes-specific logic to expand single repoints into ranges when querying dependencies
- Implemented multi-repoint job triggering when Hi L1B DE files arrive
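The int-or-list `repoint` parameter described above can be sketched as a small normalization helper, so query functions can always apply an `IN` filter. This is an illustrative sketch; `normalize_repoints` is a hypothetical name, not part of the actual SDSCode API:

```python
from typing import Optional, Union


def normalize_repoints(
    repoint: Union[int, list[int], None],
) -> Optional[list[int]]:
    """Return the repoint argument as a list (or None) so downstream
    queries can treat single-repoint and multi-repoint calls uniformly."""
    if repoint is None:
        return None
    if isinstance(repoint, int):
        return [repoint]
    return list(repoint)
```

A caller passing `repoint=5` and a caller passing `repoint=[1, 2]` then hit the same query path.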
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 9 comments.
| File | Description |
|---|---|
| sds_data_manager/lambda_code/SDSCode/pipeline_lambdas/dependency.py | Added configuration constants for Hi Goodtimes repoint ranges; modified get_upstream_dependency_inputs, get_files, and get_jobs to support list of repoints; added special handling to expand repoint ranges for Hi Goodtimes jobs |
| sds_data_manager/lambda_code/SDSCode/pipeline_lambdas/batch_starter.py | Refactored DependencyConfig import; added special handling in s3_processing_event to trigger multiple Hi Goodtimes jobs across a range of repoints when a single Hi L1B DE file arrives |
| tests/lambda_endpoints/test_dependency_api.py | Added fixture for Hi L1B DE files across multiple repoints; added tests for querying files with single and multiple repoints; added test for Hi Goodtimes multi-repoint dependency retrieval |
| tests/lambda_endpoints/test_batch_starter.py | Added test to verify Hi Goodtimes jobs are triggered for multiple repoints when a single Hi L1B DE file arrives |
```python
def test_hi_goodtimes_multi_repoint_trigger(
    mock_submit_all_jobs,
    mock_get_dependencies,
    session,
    s3_client,
    monkeypatch,
):
    """Test Hi Goodtimes multi-repoint trigger logic in s3_processing_event.

    When a Hi L1B DE file with repoint N arrives, it should trigger goodtimes
    jobs for repoints [N-M, N+M] where M is determined by the configuration.
    """
    # Monkeypatch configuration values for testing
    monkeypatch.setattr(dependency, "HI_GOODTIMES_NUM_PAST_REPOINTS", 1)
    monkeypatch.setattr(dependency, "HI_GOODTIMES_NUM_FUTURE_REPOINTS", 2)

    mock_get_dependencies.side_effect = [
        [
            {
                "data_source": "hi",
                "data_type": "ancillary",
                "descriptor": "45sensor-goodtimes",
                "relationship": "UPSTREAM",
            }
        ],
        [],
    ]

    # L1B DE and spice files are added to the DB table by the
    # hi_l1b_de_repoint_files fixture
```
The test function is missing the hi_l1b_de_repoint_files fixture parameter. The comment on line 2332 states that "L1B DE and spice files are added to the DB table by the hi_l1b_de_repoint_files fixture", but the fixture is not included in the function parameters. This means the Hi L1B DE files will not be present in the database when the test runs, causing the test to not properly validate the multi-repoint trigger logic. Add hi_l1b_de_repoint_files to the function parameters after monkeypatch.
```python
):
    repoint_param = list(
        range(
            repoint - HI_GOODTIMES_NUM_PAST_REPOINTS,
```
The range calculation can produce invalid negative or zero repoint numbers. When repoint - HI_GOODTIMES_NUM_PAST_REPOINTS is less than 1 (e.g., repoint=1 and NUM_PAST=1), the range will start at 0, which may be invalid if repoint numbers start at 1. Add validation to ensure the range starts at a valid repoint number, such as using max(1, repoint - HI_GOODTIMES_NUM_PAST_REPOINTS) to prevent querying for invalid repoints.
Suggested change:

```diff
-            repoint - HI_GOODTIMES_NUM_PAST_REPOINTS,
+            max(1, repoint - HI_GOODTIMES_NUM_PAST_REPOINTS),
```
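The suggested clamping can be sketched as a standalone helper. The constant values below (3 past, 3 future, giving the N=7 total mentioned in the change summary) are assumptions for illustration, and `expand_repoint_range` is a hypothetical name, not the actual implementation:

```python
# Assumed values for illustration; the real constants live in dependency.py.
HI_GOODTIMES_NUM_PAST_REPOINTS = 3
HI_GOODTIMES_NUM_FUTURE_REPOINTS = 3


def expand_repoint_range(repoint: int) -> list[int]:
    """Expand a single target repoint into the Hi Goodtimes query range,
    clamped so the range never includes repoint numbers below 1."""
    start = max(1, repoint - HI_GOODTIMES_NUM_PAST_REPOINTS)
    end = repoint + HI_GOODTIMES_NUM_FUTURE_REPOINTS
    return list(range(start, end + 1))
```

Near the start of the mission the window is simply truncated, e.g. `expand_repoint_range(1)` yields `[1, 2, 3, 4]` instead of dipping into invalid repoints.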
sds_data_manager/lambda_code/SDSCode/pipeline_lambdas/batch_starter.py (outdated, resolved)
```python
            target_repoint = call_args.args[4]
            repoints_submitted.add(target_repoint)

    # Verify we submitted for repoints 1, 2, 3
```
The comment says "Verify we submitted for repoints 1, 2, 3" but should include repoint 4 to match the assertion on line 2425.
Suggested change:

```diff
-    # Verify we submitted for repoints 1, 2, 3
+    # Verify we submitted for repoints 1, 2, 3, 4
```
```python
    lambda_handler(events, context)

    # Process the event
    # lambda_handler(events, {})
```
Remove the commented-out code. The lambda_handler is already called on line 2397, making this line redundant.
Suggested change:

```diff
-    # lambda_handler(events, {})
```
```python
    # the correct way to calculate the repoint affected by the new file
    # that triggered here.
    for target_repoint in range(
        repoint - dependency.HI_GOODTIMES_NUM_FUTURE_REPOINTS,
```
The range calculation can produce invalid negative or zero repoint numbers. When repoint - HI_GOODTIMES_NUM_FUTURE_REPOINTS is less than 1 (e.g., repoint=1 and NUM_FUTURE=2), the range will include repoint 0 or negative numbers, which are likely invalid. Add validation to ensure target_repoint values are >= 1, or use max(1, repoint - HI_GOODTIMES_NUM_FUTURE_REPOINTS) as the range start to prevent generating jobs for invalid repoints.
Suggested change:

```diff
-        repoint - dependency.HI_GOODTIMES_NUM_FUTURE_REPOINTS,
+        max(1, repoint - dependency.HI_GOODTIMES_NUM_FUTURE_REPOINTS),
```
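On the trigger side the window inverts: a job at target repoint T needs files in [T - NUM_PAST, T + NUM_FUTURE], so a newly arrived file at repoint N serves jobs with T in [N - NUM_FUTURE, N + NUM_PAST], which is why this loop starts at `repoint - NUM_FUTURE`. A minimal sketch with the suggested clamping applied; the function name is hypothetical:

```python
def goodtimes_target_repoints(
    repoint: int, num_past: int, num_future: int
) -> list[int]:
    """Target repoints whose Hi Goodtimes jobs would consume a file
    arriving at `repoint`, clamped to valid repoint numbers (>= 1)."""
    # A file at repoint N is a "future" input for jobs down to N - num_future
    # and a "past" input for jobs up to N + num_past.
    start = max(1, repoint - num_future)
    return list(range(start, repoint + num_past + 1))
```

With the monkeypatched test configuration (`num_past=1`, `num_future=2`), a file at repoint 3 fans out to targets 1 through 4, matching the four `submit_all_jobs` calls the test asserts.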
```python
        if call_args[0][1]["descriptor"] == "45sensor-goodtimes"
    ]

    # Should have 3 calls for repoints 1, 2, 3
```
The comment says "Should have 3 calls" but the assertion checks for 4 calls. Update the comment to match the assertion.
Suggested change:

```diff
-    # Should have 3 calls for repoints 1, 2, 3
+    # Should have 4 calls for repoints 1, 2, 3, 4
```
lacoak21 left a comment
Looks good! Just a couple of questions related to missing repointing data.
```python
        }
    ],
)
def test_get_jobs_hi_goodtimes_multi_repoint(
```
What would happen if you called it like

```python
science_files = get_files(
    session,
    dependency=dep,
    start_date=datetime(2024, 1, 1),
    end_date=datetime(2024, 1, 2),
    repoint=[1, 2],
)
```

and there was only data for repoint 1?
```python
    # Special handling for Hi Goodtimes - needs L1B DE from multiple repoints
    # Pass a list of repoints instead of a single repoint
    repoint_param = repoint
    if (
```
This is a duplicate if-statement from batch starter, is it possible to combine the checks into one place?
```python
    submit_all_jobs(
        session,
        job,
        trigger_start_time,
```
Do the start and end times need to be changed to match the repoint?
Not here. This is just submitting a potential job for each of the repoints that the trigger file would be used in. It is in the dependency code where the start/end time need to get updated when gathering upstream dependencies for a job.
Force-pushed from c52018c to dc8f6cc
Pull request overview
Copilot reviewed 4 out of 4 changed files in this pull request and generated 3 comments.
Comments suppressed due to low confidence (1)
sds_data_manager/lambda_code/SDSCode/pipeline_lambdas/dependency.py:1414
- Docstring says this function “replaces the L1B DE files with files from the extended repoint range”, but the implementation currently extends the existing L1B DE input (adds nearest files to the existing lists). Consider updating the docstring wording to reflect the actual behavior (extend/augment rather than replace) to avoid confusion for future maintainers.
> Hi Goodtimes jobs require L1B DE data from N repoints total (target plus
> N-1 nearest). This function takes an existing ProcessingInputCollection
> (from get_upstream_dependency_inputs) and replaces the L1B DE files with
> files from the extended repoint range.
sds_data_manager/lambda_code/SDSCode/pipeline_lambdas/dependency.py (outdated, resolved)
sds_data_manager/lambda_code/SDSCode/pipeline_lambdas/dependency.py (outdated, resolved)
```python
        trigger_is_hi_l1b_de
        and repoint is not None
        and job["data_source"] == "hi"
        and job["data_type"] == "l1c"
```
I think we decided that the ENA goodtimes would be l1b products.
Suggested change:

```diff
-        and job["data_type"] == "l1c"
+        and job["data_type"] == "l1b"
```
Pull request overview
Copilot reviewed 4 out of 4 changed files in this pull request and generated 3 comments.
sds_data_manager/lambda_code/SDSCode/pipeline_lambdas/dependency.py (outdated, resolved)
```python
    # Special handling for Hi Goodtimes - extend L1B DE to N nearest repoints
    if (
        data_type == "l1c"
```
We decided to make Goodtimes L1B
Suggested change:

```diff
-        data_type == "l1c"
+        data_type == "l1b"
```
```python
        f"Submitting Hi Goodtimes job for repoint {target_repoint} "
        f"(triggered by repoint {repoint} file)"
    )
    submit_all_jobs(
```
I think this answers my question from yesterday.
- Reprocess L1B DE specifically -> s3_processing_event fires and fan-out happens normally.
- Reprocess Goodtimes directly goes straight to submit_all_jobs, bypasses s3_processing_event entirely -> no fan-out.
lacoak21 left a comment
This looks great Tim. Have you tested in dev out of curiosity?
```python
    filter_dependencies,

    # Check if trigger file is Hi L1B DE
    trigger_is_hi_l1b_de = (
```
Is L1B DE the only dependency? I'm just wondering whether you needed to handle a case where you had to reprocess Goodtimes because a SPICE kernel got updated.
```python
    return records


def get_n_nearest_files_by_repoint(
```
@subagonsouth I reviewed part of this but had to stop here. I will resume in the morning.
```python
        repoint,
        skip_if_inprogress=True,
    )
    if l1b_de_records is None:
```
Uh oh, I think you are right that other jobs don't check for upstream jobs in progress. The CRID check only handles it when an upstream-upstream job is still processing. Wow, how could I have missed this :(. I'll make sure to keep this in mind when reviewing Tenzin's refactor.
```python
        return []

    distances = np.abs(other_repoints - repoint)
    sort_indices = np.lexsort((other_repoints, distances))
```
Whoa np.lexsort is so useful.
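For reference, the two-key sort above can be packaged as a small standalone helper. `np.lexsort` sorts by its *last* key first, so distance is the primary key and the repoint number breaks ties. This is a sketch that assumes ties favor the smaller repoint number, as the snippet implies; the helper name and return type are illustrative, not the actual `get_n_nearest_files_by_repoint` signature:

```python
import numpy as np


def n_nearest_repoints(other_repoints, repoint, n):
    """Return the n repoint numbers nearest to `repoint`.

    Primary sort key: absolute distance from `repoint`.
    Tie-break: smaller repoint number first (lexsort's earlier keys
    break ties left by later, higher-priority keys).
    """
    other_repoints = np.asarray(other_repoints)
    distances = np.abs(other_repoints - repoint)
    # np.lexsort sorts by the last key (distances) first, then by
    # other_repoints to resolve equal distances deterministically.
    sort_indices = np.lexsort((other_repoints, distances))
    return other_repoints[sort_indices[:n]].tolist()
```

For example, with candidates `[1, 2, 4, 5, 6]` and target repoint 4, the three nearest are 4 (distance 0), 5 (distance 1), and then 2 (distance 2, winning the tie with 6).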
```python
    if upstream_dependencies_output is None:
        logger.info(
            f"No dependencies found for {start_date=} - {end_date=}: {dependencies}"
    # Use a single session for all database operations
```
Change Summary
Overview
This PR implements custom handling for Hi Goodtimes jobs, which require L1B DE data from multiple repoints (N=7 total, including the target repoint).
Key Changes
New Configuration:
New Functions:
Modified Functions:
Hi Goodtimes Job Logic:
Test Coverage
Closes: #1029