
EventHub component delivers out of order messages (pubsub, binding) #3568

Open
oising opened this issue Oct 16, 2024 · 11 comments
Labels: good first issue (Good for newcomers), kind/bug (Something isn't working), P0
Milestone: v1.15

Comments

oising commented Oct 16, 2024

Expected Behavior

When using Event Hubs as a pubsub or binding, messages should be delivered in the order they were posted (assuming a partition key is set when publishing/posting so that related messages land on the same partition, where Event Hubs guarantees ordering).

Actual Behavior

In the pubsub case, the sidecar delivers new events before the subscriber has completed handling the previous one. This causes major problems when trying to ensure order-sensitive work is executed correctly (e.g. starting a workflow to process subsequent events).

Steps to Reproduce the Problem

We're publishing to our topic like this (dotnet sdk):

await dapr.PublishEventAsync(
        WellKnownPubSubNames.Percept,
        WellKnownPubSubTopics.RecordingSessionFileEventV1, 
        data,
        new Dictionary<string, string>
        {
            { "partitionKey", data.DeviceId }
        }
    );

and receiving like this:

pubsub.MapPost(
    WellKnownPubSubTopics.RecordingSessionFileEventV1,
    [Topic(WellKnownPubSubNames.Percept, WellKnownPubSubTopics.RecordingSessionFileEventV1)]
    async (
        [FromServices] ILogger<Program> logger,
        [FromServices] IRecordingSessionRepository recordingSessionRepository,
        HttpContext HttpContext,
        [FromBody] RecordingSessionFileEventV1 data
    ) => {
        try {
            var messageId = HttpContext.Request.Headers["Message-Id"];

            logger.LogInformation("Start message id: {messageId}", messageId!);

            await Task.Delay(500);

            logger.LogInformation("Stop message id: {messageId}", messageId!);

            return Results.Ok();
        }
        catch (Exception ex) {
            logger.LogError(ex, "Error handling event");
            return Results.Problem(ex.Message);
        }
    }
);

The problem is clear when watching the logs: instead of a constant start/stop/start/stop alternating sequence of log events, we're seeing start/stop/start/start/stop/stop interleaving. The sidecar should not send another event until the current one has completed processing, i.e. until it receives an HTTP 200 (in this case).

The same issue likely occurs for the binding since the common code is the problem (according to @yaron2):

It appears our implementation is using goroutines for each message, which would cause out-of-order delivery: https://github.com/dapr/components-contrib/blob/main/common/component/azure/eventhubs/eventhubs.go#L293. We do have a concurrency mechanism for components, it just hasn't been applied to Event Hubs. This should be an easy change - can you please open an issue in components-contrib for this? I'll add it to the v1.15 milestone
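
For illustration, here is a minimal runnable sketch (not the actual component code; handleEvent and processPartition are hypothetical stand-ins) of why dispatching each message in its own goroutine loses ordering within a partition, and how calling the handler synchronously preserves it:

package main

import (
	"fmt"
	"sync"
)

// handleEvent stands in for the subscriber handler call; in the sidecar this is an
// HTTP/gRPC call that only returns once the app has finished processing the message.
func handleEvent(partition string, seq int) {
	fmt.Printf("partition %s: handled event %d\n", partition, seq)
}

// processPartition delivers the events of a single partition.
func processPartition(partition string, events []int) {
	for _, seq := range events {
		// Out-of-order behavior: launching each event in its own goroutine means the
		// next event can be delivered before this one has been handled:
		//
		//     go handleEvent(partition, seq)
		//
		// Order-preserving behavior: call the handler synchronously so the next event
		// is not dispatched until the previous one has completed.
		handleEvent(partition, seq)
	}
}

func main() {
	// Partitions can still be processed in parallel; ordering only needs to hold
	// within each partition (i.e. per partition key).
	var wg sync.WaitGroup
	for _, p := range []string{"0", "1"} {
		wg.Add(1)
		go func(p string) {
			defer wg.Done()
			processPartition(p, []int{1, 2, 3})
		}(p)
	}
	wg.Wait()
}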

Release Note

PubSub and Binding components using ordered delivery (with a partition key) would interleave event deliveries to a subscriber. Now the sidecar will wait until the handler returns before sending the next event.

oising added the kind/bug (Something isn't working) label on Oct 16, 2024
yaron2 added this to the v1.15 milestone on Oct 16, 2024
yaron2 added the good first issue (Good for newcomers) label on Oct 16, 2024

yaron2 (Member) commented Oct 16, 2024

Triaged

olitomlinson commented Oct 18, 2024

Imo this is a P0 as it fundamentally breaks the underlying FIFO ordering that one would expect from EventHubs when processing each message individually via Dapr PubSub

yaron2 added the P0 label on Oct 18, 2024

yaron2 (Member) commented Oct 18, 2024

Imo this is a P0 as it fundamentally breaks the underlying FIFO ordering that one would expect from EventHubs when processing each message individually via Dapr PubSub

Agree

oising (Author) commented Oct 18, 2024

Imo this is a P0 as it fundamentally breaks the underlying FIFO ordering that one would expect from EventHubs when processing each message individually via Dapr PubSub

And also the EH binding! They share the same code AFAICT.

oising (Author) commented Oct 18, 2024

Hey @olitomlinson @yaron2 - After a crash course in golang, I don't think the issue is at https://github.com/dapr/components-contrib/blob/main/common/component/azure/eventhubs/eventhubs.go?rgh-link-date=2024-10-16T14%3A11%3A56Z#L293 as this starts a goroutine for the partition which seems perfectly fine. Partitions should be handled in parallel. However, looking at

go aeh.handleAsync(subscribeCtx, config.Topic, events, config.Handler)

I can see that messages within the partition are dispatched as goroutines, whereas I would expect blocking calls here to ensure correct dispatch ordering, no?

yaron2 (Member) commented Oct 18, 2024

Hey @olitomlinson @yaron2 - After a crash course in golang, I don't think the issue is at https://github.com/dapr/components-contrib/blob/main/common/component/azure/eventhubs/eventhubs.go?rgh-link-date=2024-10-16T14%3A11%3A56Z#L293 as this starts a goroutine for the partition which seems perfectly fine. Partitions should be handled in parallel. However, looking at

go aeh.handleAsync(subscribeCtx, config.Topic, events, config.Handler)

I can see that messages within the partition are dispatched as goroutines, whereas I would expect blocking calls here to ensure correct dispatch ordering, no?

That seems correct, yes

oising (Author) commented Oct 18, 2024

So removing the go prefix should be enough? I should probably rename handleAsync to something like handleEvents, too -- it's interesting to me that whether the method runs asynchronously is decided entirely at the call site (the go keyword), so its name has no bearing on its body. Quite simple!

If this really is a one-line fix, would you expect unit tests? They would be entirely beyond me at this point in my golang career :D

Also, as a P0 bug, would this warrant making it into 1.14.5?
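
For concreteness, a rough before/after sketch of the change being discussed, built around the line quoted earlier in the thread; the surrounding loop and the receiveBatch helper are illustrative placeholders, not the actual eventhubs.go source:

for {
	// Receive the next batch of events for this partition (paraphrased).
	events := receiveBatch()

	// Before: the handler runs in its own goroutine, so the loop immediately
	// dispatches the next batch and deliveries interleave:
	//
	//     go aeh.handleAsync(subscribeCtx, config.Topic, events, config.Handler)

	// After (proposed): run the handler synchronously so the next batch is not
	// dispatched until this one has been fully handled.
	aeh.handleAsync(subscribeCtx, config.Topic, events, config.Handler)
}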

olitomlinson commented Oct 18, 2024

Hey @olitomlinson @yaron2 - After a crash course in golang, I don't think the issue is at https://github.com/dapr/components-contrib/blob/main/common/component/azure/eventhubs/eventhubs.go?rgh-link-date=2024-10-16T14%3A11%3A56Z#L293 as this starts a goroutine for the partition which seems perfectly fine. Partitions should be handled in parallel. However, looking at

go aeh.handleAsync(subscribeCtx, config.Topic, events, config.Handler)

I can see that messages within the partition are dispatched as goroutines, whereas I would expect blocking calls here to ensure correct dispatch ordering, no?

This is exactly what I said in Discord :)

oising (Author) commented Oct 18, 2024

This is exactly what I said in Discord :)

I obviously misread or missed that -- but it's good that we agree! :) I will submit the two-line PR as a draft and link it, and we can go from there. Given this is a blocker for our solution, I would really like to see this make a point release rather than wait for 1.15...

olitomlinson commented Oct 18, 2024

So removing the go prefix should be enough? I should probably rename handleAsync to something like handleEvents, too -- it's interesting to me that whether the method runs asynchronously is decided entirely at the call site (the go keyword), so its name has no bearing on its body. Quite simple!

If this really is a one-line fix, would you expect unit tests? They would be entirely beyond me at this point in my golang career :D

Also, as a P0 bug, would this warrant making it into 1.14.5?

It could be as simple as a one-liner, but it needs thorough testing to make sure that checkpointing is done correctly after each message completes.

My one reservation about fixing this quickly is that there may be users out there in the wild with high-throughput use-cases that depend on the throughput currently afforded by this incorrect implementation. Until a fix is in place, it's hard to quantify what the performance degradation of checkpointing on each message may be.

The real solution here is to use Bulk Subscriptions for high-throughput use-cases, but this is not Stable yet.

Idea: the fix could be put behind an opt-in feature flag in the component metadata so it doesn't impact people whose expectations come from the existing (incorrect) implementation:

name: enableInOrderMessageDelivery
value: "false"

Then, when Bulk Subscriptions does graduate to Stable, the feature flag could be removed and replaced with an opt-in flag that reverts the behavior back to the broken implementation, and users with high-throughput expectations would be encouraged to migrate to Bulk Subscriptions (or opt back in to the previous broken implementation, for a window of supported releases):

name: enableLegacyMessageDelivery
value: "true"
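
For illustration, if such a flag were added, it would presumably be set in the Event Hubs pubsub component definition roughly as sketched below. The flag itself is only a proposal from this thread and does not exist today; the component name and values are placeholders, and other required metadata (consumer group, checkpoint storage account, etc.) is omitted for brevity.

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: percept-pubsub            # placeholder component name
spec:
  type: pubsub.azure.eventhubs
  version: v1
  metadata:
  - name: connectionString        # existing Event Hubs metadata field
    value: "<Event Hubs connection string>"
  # Proposed opt-in flag from this thread; not an existing metadata field.
  - name: enableInOrderMessageDelivery
    value: "true"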

oising (Author) commented Oct 18, 2024

Hmm, I'm not going to be competent enough in the language to fix this in the window that my project requires. If you could collaborate with me, then I may learn enough to address my other feature requests for event hubs myself. How busy are you, lol
