multi-asset error with can_subset=True #26244

Open
jackson-burke opened this issue Dec 3, 2024 · 0 comments
Labels
area: dagster-pipes Related to Dagster Pipes type: bug Something isn't working


jackson-burke commented Dec 3, 2024

What's the issue?

I'm running a multi-asset materialization with dagster-pipes, with can_subset=True set on the multi-asset.

However, can_subset=True (combined with is_required=False in the outs) does not seem to work correctly.

When I run one of my five assets in the Dagster UI, I receive the following error message:

dagster._core.errors.DagsterInvariantViolationError: op 'ingest' did not yield or return 
expected outputs 'asset1', 'asset2 'asset3 'asset4'}. Did you forget to `yield from pipes_session.get_results()` 
or `return <PipesClient>.run(...).get_results`?

If I instead also return an empty Output for each of the assets that are not selected, I receive this error:

dagster._core.errors.DagsterInvariantViolationError: Core compute for op "ingest" 
returned an output "asset1" that is not selected. The selected outputs are {'asset5'}
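
Taken together, the two errors look contradictory. The sketch below is a plain-Python model of the two checks, inferred only from the error messages above. It is not Dagster's actual implementation: the function names are hypothetical, and the assumption that the first check compares against every declared output (rather than only the selected ones) comes from the list of outputs in the first error.

```python
# Hypothetical model of the two invariant checks, inferred from the error
# messages only. This is NOT Dagster's implementation.


def missing_outputs(expected: set[str], yielded: set[str]) -> list[str]:
    """Outputs the op was expected to produce but did not (first error)."""
    return sorted(expected - yielded)


def unselected_outputs(selected: set[str], yielded: set[str]) -> list[str]:
    """Outputs the op produced that were not selected (second error)."""
    return sorted(yielded - selected)


declared = {"asset1", "asset2", "asset3", "asset4", "asset5"}
selected = {"asset5"}

# Case 1: yield only the selected output.
# Assumption: the first check treats every declared output as expected.
case1_missing = missing_outputs(declared, yielded=selected)

# Case 2: yield an output for every declared asset instead.
case2_unselected = unselected_outputs(selected, yielded=declared)

print(case1_missing)     # outputs reported as "did not yield or return"
print(case2_unselected)  # outputs reported as "not selected"
```

Under this model, both cases flag the same four outputs (asset1 through asset4), so no set of yielded outputs can satisfy both checks when only asset5 is selected.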

What did you expect to happen?

I would expect to be able to run any number of our assets at one time and have the job finish successfully.

How to reproduce?

dagster_code.py

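from dagster import AssetExecutionContext, AssetOut, Nothing, Output, multi_asset

# read_config and PipesClient are project-specific helpers (definitions not shown).
config = read_config(__file__)


@multi_asset(
    group_name="name",
    # One optional output per configured asset, so that a subset can be materialized.
    outs={
        f"{c}": AssetOut(dagster_type=Nothing, is_required=False) for c in config.keys()
    },
    can_subset=True,
)
def ingest(
    context: AssetExecutionContext,
    config: dict = config,
):
    client = PipesClient()  # helper class to return custom PipesK8sClient or PipesDockerClient

    # Launch one external process per selected asset and yield only that asset's output.
    for asset in context.selected_asset_keys:
        asset_name = asset.path[-1]
        asset_config = config[asset_name]

        kwargs = {...}
        result = client.run(kwargs).get_materialize_result()
        yield Output(
            output_name=asset_name,
            value=None,
            metadata=result.metadata,
        )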

external_code.py

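from dagster_pipes import open_dagster_pipes


def ingest_pipe() -> None:
    # Open the Pipes session so the external process can communicate with Dagster.
    with open_dagster_pipes() as context:
        run_process()  # project-specific ingestion logic (definition not shown)


def main():
    ingest_pipe()


if __name__ == "__main__":
    main()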

Dagster version

1.9.3

Deployment type

None

Deployment details

No response

Additional information

No response

Message from the maintainers

Impacted by this issue? Give it a 👍! We factor engagement into prioritization.
By submitting this issue, you agree to follow Dagster's Code of Conduct.

@jackson-burke jackson-burke added the type: bug Something isn't working label Dec 3, 2024
@garethbrickman garethbrickman added the area: dagster-pipes Related to Dagster Pipes label Dec 3, 2024
2 participants