QUEUED is not an expected state on EMR Serverless Pipes #27217

Closed
maduxi opened this issue Jan 20, 2025 · 0 comments · Fixed by #27221
Labels
integration: aws Related to dagster-aws type: bug Something isn't working

maduxi commented Jan 20, 2025

What's the issue?

When executing EMR Serverless jobs with the PipesEMRServerlessClient, the execution fails if the job run is in the QUEUED state. The issue is that QUEUED is not among the states the client expects when it waits for completion (the raise happens in `_wait_for_completion` in `dagster_aws/pipes/clients/emr_serverless.py`, as the stack trace shows).

This is the list of possible states for an EMR Serverless job: https://docs.aws.amazon.com/emr/latest/EMR-Serverless-UserGuide/job-states.html
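
For reference, here is a minimal sketch of those states as a Python enum, with a helper that separates finished runs from runs that are still in flight. The names `JobRunState`, `TERMINAL_STATES`, and `is_terminal` are hypothetical and are not part of dagster-aws; the state names reflect my reading of the AWS page linked above, which remains the authoritative list.

```python
from enum import Enum


class JobRunState(str, Enum):
    """EMR Serverless job run states (assumed from the AWS user guide)."""

    SUBMITTED = "SUBMITTED"
    PENDING = "PENDING"
    SCHEDULED = "SCHEDULED"
    QUEUED = "QUEUED"  # the state this issue is about
    RUNNING = "RUNNING"
    CANCELLING = "CANCELLING"
    SUCCESS = "SUCCESS"
    FAILED = "FAILED"
    CANCELLED = "CANCELLED"


# Assumption: only these three states are final; every other state means "keep polling".
TERMINAL_STATES = frozenset(
    {JobRunState.SUCCESS, JobRunState.FAILED, JobRunState.CANCELLED}
)


def is_terminal(state: str) -> bool:
    """Return True if the run has finished; raise ValueError for an unknown state."""
    return JobRunState(state) in TERMINAL_STATES
```

With this classification, a QUEUED run is simply one more non-terminal state, so a poller would keep waiting instead of failing.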

Stacktrace:

dagster._core.errors.DagsterInvariantViolationError: Unexpected state for AWS EMR Serverless job XXXX: QUEUED
  File "/Users/madhava/Documents/belvo/data-platform-datasets/.venv/lib/python3.11/site-packages/dagster/_core/execution/plan/execute_plan.py", line 245, in dagster_event_sequence_for_step
    yield from check.generator(step_events)
  File "/Users/madhava/Documents/belvo/data-platform-datasets/.venv/lib/python3.11/site-packages/dagster/_core/execution/plan/execute_step.py", line 501, in core_dagster_event_sequence_for_step
    for user_event in _step_output_error_checked_user_event_sequence(
  File "/Users/madhava/Documents/belvo/data-platform-datasets/.venv/lib/python3.11/site-packages/dagster/_core/execution/plan/execute_step.py", line 184, in _step_output_error_checked_user_event_sequence
    for user_event in user_event_sequence:
  File "/Users/madhava/Documents/belvo/data-platform-datasets/.venv/lib/python3.11/site-packages/dagster/_core/execution/plan/execute_step.py", line 88, in _process_asset_results_to_events
    for user_event in user_event_sequence:
  File "/Users/madhava/Documents/belvo/data-platform-datasets/.venv/lib/python3.11/site-packages/dagster/_core/execution/plan/compute.py", line 190, in execute_core_compute
    for step_output in _yield_compute_results(step_context, inputs, compute_fn, compute_context):
  File "/Users/madhava/Documents/belvo/data-platform-datasets/.venv/lib/python3.11/site-packages/dagster/_core/execution/plan/compute.py", line 159, in _yield_compute_results
    for event in iterate_with_context(
  File "/Users/madhava/Documents/belvo/data-platform-datasets/.venv/lib/python3.11/site-packages/dagster/_utils/__init__.py", line 480, in iterate_with_context
    next_output = next(iterator)
                  ^^^^^^^^^^^^^^
  File "/Users/madhava/Documents/belvo/data-platform-datasets/.venv/lib/python3.11/site-packages/dagster/_core/execution/plan/compute_generator.py", line 127, in _coerce_op_compute_fn_to_iterator
    result = invoke_compute_fn(
             ^^^^^^^^^^^^^^^^^^
  File "/Users/madhava/Documents/belvo/data-platform-datasets/.venv/lib/python3.11/site-packages/dagster/_core/execution/plan/compute_generator.py", line 115, in invoke_compute_fn
    return fn(context, **args_to_pass) if context_arg_provided else fn(**args_to_pass)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/madhava/Documents/belvo/data-platform-datasets/belvo_etl/assets/belvo_dbt.py", line 85, in optimize_tables
    return pipes_emr_serverless_client.run(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/madhava/Documents/belvo/data-platform-datasets/.venv/lib/python3.11/site-packages/dagster_aws/pipes/clients/emr_serverless.py", line 114, in run
    completion_response = self._wait_for_completion(context, start_response)
                          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/madhava/Documents/belvo/data-platform-datasets/.venv/lib/python3.11/site-packages/dagster_aws/pipes/clients/emr_serverless.py", line 226, in _wait_for_completion
    raise DagsterInvariantViolationError(

What did you expect to happen?

The run should not fail while the job is queued. QUEUED can be added to the expected states, specifically on the line linked in the original issue.
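
A rough sketch of what the proposed behavior could look like, assuming a polling loop that keeps waiting on any in-progress state and raises only on states it does not recognize. This is not the dagster-aws implementation: `wait_for_completion`, `get_state`, and the state sets are hypothetical names for illustration, and the state names are assumed from the AWS page linked above.

```python
import time
from typing import Callable

# Assumption: state names follow the AWS job-states page; QUEUED is treated as in progress.
IN_PROGRESS_STATES = {"SUBMITTED", "PENDING", "SCHEDULED", "QUEUED", "RUNNING", "CANCELLING"}
SUCCESS_STATES = {"SUCCESS"}
FAILURE_STATES = {"FAILED", "CANCELLED"}


def wait_for_completion(get_state: Callable[[], str], poll_interval: float = 5.0) -> str:
    """Poll get_state() until the job run reaches a terminal state (hypothetical helper)."""
    while True:
        state = get_state()
        if state in SUCCESS_STATES:
            return state
        if state in FAILURE_STATES:
            raise RuntimeError(f"Job run ended in state {state}")
        if state not in IN_PROGRESS_STATES:
            # Analogous to the error in this issue: an unrecognized state aborts the wait.
            raise RuntimeError(f"Unexpected state: {state}")
        time.sleep(poll_interval)
```

In a real setup, `get_state` could wrap a boto3 `emr-serverless` client call such as `get_job_run(applicationId=..., jobRunId=...)["jobRun"]["state"]`; passing it in as a callable keeps the loop easy to exercise with a canned sequence of states.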

How to reproduce?

Set a low limit for concurrent jobs in the EMR Serverless application and run several jobs at once so that some of them end up in the QUEUED state.

Dagster version

dagster, version 1.9.9

Deployment type

Other Docker-based deployment

Deployment details

No response

Additional information

No response

Message from the maintainers

Impacted by this issue? Give it a 👍! We factor engagement into prioritization.

@maduxi maduxi added the type: bug Something isn't working label Jan 20, 2025
@garethbrickman garethbrickman added the integration: aws Related to dagster-aws label Jan 20, 2025
danielgafni added a commit that referenced this issue Jan 21, 2025
## Summary & Motivation

Resolve #27217

## How I Tested These Changes

Checked https://docs.aws.amazon.com/emr/latest/EMR-Serverless-UserGuide/job-states.html again

## Changelog

[dagster-aws] fix PipesEMRServerlessClient failing with QUEUED job state
marijncv pushed a commit to marijncv/dagster that referenced this issue Jan 21, 2025
…o#27221)