Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bulk-cdk: improve AirbyteConnectorRunner and CliRunner #45374

Merged
merged 2 commits into from
Sep 11, 2024

Conversation

postamar
Copy link
Contributor

@postamar postamar commented Sep 10, 2024

What

The CliRunner dumps all output into a temp file and re-reads it into a BufferingOutputConsumer. This works, but it requires an edge case in the CLI grammar defined in AirbyteConnectorRunner. @edgao found a much better way in #45113 involving RuntimeBeanDefinitions. This PR implements this change and removes the --output CLI argument support. It also proves a nice entry point for destination input messages in the CliRunner.

How

The CliRunner overrides the PrintStream that gets injected in the StdoutOutputConsumer. By default it's a PrintStream which wraps System.out but for integration tests we want the StdoutOutputConsumer to feed into a BufferingOutputConsumer.

Same goes for InputStream for destinations.

Review guide

n/a

User Impact

None, test-only change.

Can this PR be safely reverted and rolled back?

  • YES 💚
  • NO ❌

@postamar postamar requested review from a team as code owners September 10, 2024 21:24
Copy link

vercel bot commented Sep 10, 2024

The latest updates on your projects. Learn more about Vercel for Git ↗︎

1 Skipped Deployment
Name Status Preview Comments Updated (UTC)
airbyte-docs ⬜️ Ignored (Inspect) Visit Preview Sep 11, 2024 0:45am

@postamar postamar requested a review from edgao September 10, 2024 21:24
@octavia-squidington-iii octavia-squidington-iii added the CDK Connector Development Kit label Sep 10, 2024
Copy link
Contributor

@edgao edgao left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

had a question in CliRunner about how to use this interface, plus a few nits

@postamar
Copy link
Contributor Author

This is ready for another look. I added the missing functionality for async running.

Copy link
Contributor

@edgao edgao left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

checking my understanding: there's two ways to use this asyncly:

val dest = CliRunner.destination(pipeInput)
launch { dest.run() }
sendMessages(pipe)
inspect(dest.results.stuff())

or via callback

CliRunner.destination(byteArrayInput)
  .withCallback({ msg -> ... })
  .run()

lgtm assuming I'm on the right track!

@postamar
Copy link
Contributor Author

Yes indeed! The callback is required if you want to interrupt the sync sooner than normal operation.

Something which I didn't explore (because I felt it was out of scope for this PR) but which would make sense is to use channels instead of a PipeInputStream for destination.

@postamar postamar merged commit e4fec50 into master Sep 11, 2024
37 checks passed
@postamar postamar deleted the postamar/bean-definitions branch September 11, 2024 12:41
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
area/connectors Connector related issues CDK Connector Development Kit connectors/source/mysql-v2
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants