
feat: add SubscriptionOnStartHandler #2059


Open · wants to merge 122 commits into base: topic/streams-v1

Conversation

@alepane21 (Contributor) commented Jul 17, 2025

Summary by CodeRabbit

  • New Features

    • Introduced a flexible hooks system for stream subscriptions, enabling custom authorization, initial event emission, event filtering, and data mapping across NATS, Kafka, and Redis providers.
    • Added support for executing custom logic at subscription start, including emitting initial events and controlling subscription lifecycle.
    • Unified event and configuration abstractions for consistent handling across different pubsub providers.
    • Added a new module to facilitate implementing subscription start hooks with logging and callbacks.
  • Bug Fixes

    • Improved consistency and structure in event data handling for NATS, Kafka, and Redis integrations.
  • Tests

    • Added extensive tests covering subscription start hooks, event emission, authorization checks, and subscription lifecycle management.
  • Chores

    • Updated dependencies, removed unused indirect packages, and enhanced mock generation for subscription event updaters across pubsub providers.

Depends on engine PR: wundergraph/graphql-go-tools#1243

How to use the new hook?

The hook implemented in this PR is the SubscriptionOnStartHandler, defined as:

type SubscriptionOnStartHandler interface {
	SubscriptionOnStart(ctx SubscriptionOnStartHookContext) (bool, error)
}
Example

Query

On the demo graph, we will create a module that adds some logic to the following subscription field:

employeeUpdatedMyKafka(employeeID: $employeeID)

Module code that implements the new hook

As an example, this module will emit a message if the subscription is started with employeeID == 1.

const myModuleID = "startSubscriptionModule"

type StartSubscriptionModule struct {}

func (m *StartSubscriptionModule) SubscriptionOnStart(ctx core.SubscriptionOnStartHookContext) (bool, error) {
	// Only act on the employeeUpdatedMyKafka subscription root field
	if ctx.SubscriptionEventConfiguration().RootFieldName() != "employeeUpdatedMyKafka" {
		return false, nil
	}
	// Read the employeeID variable from the client's operation
	employeeID := ctx.RequestContext().Operation().Variables().GetInt64("employeeID")
	if employeeID != 1 {
		return false, nil
	}
	// Emit an initial event to this client before the first upstream message arrives
	ctx.WriteEvent(&kafka.Event{
		Data: []byte(`{"id": 1, "__typename": "Employee"}`),
	})
	return false, nil
}

func (m *StartSubscriptionModule) Module() core.ModuleInfo {
	return core.ModuleInfo{
		// This is the ID of your module, it must be unique
		ID: myModuleID,
		// The priority of your module, lower numbers are executed first
		Priority: 1,
		New: func() core.Module {
			return &StartSubscriptionModule{}
		},
	}
}
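To see the module's decision flow in isolation, here is a self-contained sketch that mirrors its logic against local stand-in types. `hookContext` and `subscriptionOnStart` are hypothetical stand-ins defined here for illustration; they are not the router's real `core` types.

```go
package main

import "fmt"

// hookContext is a local stand-in for the router's hook context, holding only
// the pieces the example module actually reads and writes.
type hookContext struct {
	rootFieldName string
	employeeID    int64
	written       [][]byte
}

func (c *hookContext) WriteEvent(data []byte) { c.written = append(c.written, data) }

// subscriptionOnStart mirrors the example module's decision flow: only the
// employeeUpdatedMyKafka field with employeeID == 1 gets an initial event.
func subscriptionOnStart(ctx *hookContext) (bool, error) {
	if ctx.rootFieldName != "employeeUpdatedMyKafka" {
		return false, nil
	}
	if ctx.employeeID != 1 {
		return false, nil
	}
	ctx.WriteEvent([]byte(`{"id": 1, "__typename": "Employee"}`))
	return false, nil
}

func main() {
	match := &hookContext{rootFieldName: "employeeUpdatedMyKafka", employeeID: 1}
	subscriptionOnStart(match)
	fmt.Println(len(match.written)) // the matching subscription gets one initial event

	other := &hookContext{rootFieldName: "somethingElse", employeeID: 1}
	subscriptionOnStart(other)
	fmt.Println(len(other.written)) // other fields are left untouched
}
```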

Checklist

  • I have discussed my proposed changes in an issue and have received approval to proceed.
  • I have followed the coding standards of the project.
  • Tests or benchmarks have been added or updated.
  • Documentation has been updated on https://github.com/wundergraph/cosmo-docs.
  • I have read the Contributors Guide.

alepane21 and others added 30 commits July 10, 2025 11:40
@alepane21 alepane21 requested a review from Noroth July 30, 2025 18:05

type SubscriptionOnStartHookContext interface {
// the request context
RequestContext() RequestContext
Contributor:
Whatever we expose here needs to be tested. E.g., the ResponseWriter wouldn't work in subscriptions. Ideally, in our first iteration, we are purely use-case driven and expose only the functionality and data necessary to enable those cases.

Contributor (author):
Mmm, I see. So you'd prefer an ad-hoc version (I'm thinking of a SubscriptionRequestContext) exposing only the things that make sense?

// Check if the error is a StreamHookError and should close the connection
var streamHookErr *StreamHookError
close := false
if errors.As(err, &streamHookErr) {
Contributor:

I think any error should close the subscription. Why do we want to do it selectively?

@alepane21 (author) commented Aug 19, 2025:

The idea was to let the customer customize the client's behaviour: if an error with closeConnection set to false was returned, the client could just write something to the UI (like "no initial data found, waiting for the first real data") and then, when a message with data arrived, read it.

In the real world, the client will simply close the subscription on the first error it receives, so this behaviour makes no sense :(

I'll remove it to avoid the complexity and confusion.
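For reference, the selective-close pattern discussed above (and since dropped) can be sketched with stdlib errors alone. `StreamHookError` and its `CloseConnection` field are local stand-ins based on this thread, not the final router API.

```go
package main

import (
	"errors"
	"fmt"
)

// StreamHookError sketches the error type under discussion: it carries a flag
// telling the router whether the subscription should be closed.
type StreamHookError struct {
	Message         string
	CloseConnection bool
}

func (e *StreamHookError) Error() string { return e.Message }

// shouldClose applies the selective-close rule: only a StreamHookError with
// CloseConnection set asks the router to terminate the subscription.
func shouldClose(err error) bool {
	var streamHookErr *StreamHookError
	if errors.As(err, &streamHookErr) {
		return streamHookErr.CloseConnection
	}
	return false
}

func main() {
	soft := fmt.Errorf("hook failed: %w", &StreamHookError{Message: "no initial data", CloseConnection: false})
	hard := &StreamHookError{Message: "unauthorized", CloseConnection: true}
	fmt.Println(shouldClose(soft), shouldClose(hard)) // false true
}
```

Note that errors.As also finds the StreamHookError through fmt.Errorf's %w wrapping, which is why the pattern composes with intermediate error layers.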

@@ -88,31 +87,44 @@ func (p *ProviderAdapter) topicPoller(ctx context.Context, client *kgo.Client, u
r := iter.Next()

p.logger.Debug("subscription update", zap.String("topic", r.Topic), zap.ByteString("data", r.Value))
updater.Update(r.Value)
headers := make(map[string][]byte)
Contributor:

Do we already have the requirement for this? If not, I'd reduce the implementation to the absolute minimum while allowing for further extensions in the future.

Contributor (author):

Yes, one of the requirements is to read/write Kafka headers.
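The header-copying step from the poller diff above can be shown in isolation. The `record`/`recordHeader` types below are minimal local mirrors of franz-go's `kgo.Record`/`kgo.RecordHeader` shapes, defined here so the sketch runs without a broker.

```go
package main

import "fmt"

// recordHeader mirrors the Key/Value shape of a Kafka record header.
type recordHeader struct {
	Key   string
	Value []byte
}

// record mirrors the parts of a consumed Kafka record the poller uses.
type record struct {
	Value   []byte
	Headers []recordHeader
}

// headersToMap flattens a record's headers into the map[string][]byte form the
// adapter forwards with the event. Duplicate keys keep the last value here;
// the real adapter may choose differently.
func headersToMap(r *record) map[string][]byte {
	headers := make(map[string][]byte, len(r.Headers))
	for _, h := range r.Headers {
		headers[h.Key] = h.Value
	}
	return headers
}

func main() {
	r := &record{
		Value: []byte(`{"id":1}`),
		Headers: []recordHeader{
			{Key: "trace-id", Value: []byte("abc123")},
			{Key: "source", Value: []byte("employees")},
		},
	}
	fmt.Println(string(headersToMap(r)["trace-id"])) // abc123
}
```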

func (c *EngineDataSourceFactory) ResolveDataSourceSubscription() (datasource.SubscriptionDataSource, error) {
return datasource.NewPubSubSubscriptionDataSource[*SubscriptionEventConfiguration](
c.RedisAdapter,
func(ctx *resolve.Context, input []byte, xxh *xxhash.Digest) error {
Contributor:

Why did we move this outside of the provider specific implementation?

Contributor (author):

Because the hook is managed inside the PubSubSubscriptionDataSource. Having it in the datasource allows the implemented providers (and future ones!) to just return the PubSubSubscriptionDataSource in order to implement the hooks.
An alternative solution I considered was to embed this struct inside another one that implements the hooks, but that looked overly complicated.
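The design choice can be sketched with Go generics: a shared datasource wrapper owns the hook dispatch, so each provider only plugs in its own configuration type. All names below (`subscriptionDataSource`, `onStartHook`, `redisSubscriptionConfig`) are illustrative stand-ins, not the actual router types.

```go
package main

import "fmt"

// onStartHook is the hook slot the shared datasource invokes at start.
type onStartHook[C any] func(cfg C) error

// subscriptionDataSource is a generic stand-in for the shared wrapper: the
// hook call lives here once, instead of in every provider adapter.
type subscriptionDataSource[C any] struct {
	cfg     C
	onStart onStartHook[C]
}

func (d *subscriptionDataSource[C]) Start() error {
	if d.onStart != nil {
		// Shared hook dispatch: every provider gets this behaviour for free.
		return d.onStart(d.cfg)
	}
	return nil
}

// A Redis-flavoured configuration type; Kafka/NATS would plug in their own.
type redisSubscriptionConfig struct{ Channel string }

func main() {
	started := []string{}
	ds := &subscriptionDataSource[redisSubscriptionConfig]{
		cfg: redisSubscriptionConfig{Channel: "employeeUpdated"},
		onStart: func(cfg redisSubscriptionConfig) error {
			started = append(started, cfg.Channel)
			return nil
		},
	}
	_ = ds.Start()
	fmt.Println(started[0]) // employeeUpdated
}
```

With this shape, a new provider implements no hook plumbing at all; it only supplies a configuration type and an adapter, which is the reuse argument made above.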
