Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Working on message bus subscriptions options #278

Open
wants to merge 3 commits into
base: main
Choose a base branch
from

Conversation

ejsmith
Copy link
Contributor

@ejsmith ejsmith commented Jul 13, 2022

  • concurrent dictionary per topic that has a channel of messages.

  • publish sends a message to the channel based on the topic for the message

  • by default the message type full name is the topic (1 topic per message type)

  • message publish topics can be overridden to publish multiple message types to a single topic

  • subscription specifies a topic which can have wildcards

  • in memory subscription will check all of the topics and add all of the matching topics

  • to the subscription object

  • for rabbit impl, on subscribe will ensure all exchanges are created for the message types that are being listened for

  • publish will ensure the topic the message is going to is created

  • queue groups can be specified on subscriptions and if multiple subscribers use the same queue group, they will round robin the messages

  • subscriptions have their own channel instances

  • read from topic channel, write to all matching subscriber channels which in turn call the message handlers with their instances

  • when writing to a subscriber channel and there is more than one matching subscriber, then all subscribers get a copy of the message

  • if multiple subscribers have the same queue group name then we round robin them

  • https://deniskyashif.com/2019/12/08/csharp-channels-part-1/

  • https://github.com/deniskyashif/trydotnet-channels

@@ -564,14 +565,16 @@ public abstract class QueueTestBase : TestWithLoggingBase, IDisposable {
Assert.InRange(sw.Elapsed, TimeSpan.FromMilliseconds(50), TimeSpan.FromMilliseconds(5000));

_ = Task.Run(async () => {
await SystemClock.SleepAsync(500);
await Task.Delay(500);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SystemClock.SleepAsync(500) should be doing this behind the scenes

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes I know that, but in this case we never want the sleep to be virtual.

@@ -553,6 +553,7 @@ public abstract class QueueTestBase : TestWithLoggingBase, IDisposable {
return;

try {
Log.MinimumLevel = LogLevel.Trace;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

probably should revert this and leave this up to the caller.

using System.Threading;
using System.Threading.Tasks;

namespace Foundatio.Messaging {
public interface IMessageSubscriber {
Task SubscribeAsync<T>(Func<T, CancellationToken, Task> handler, CancellationToken cancellationToken = default) where T : class;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wondering if we should create extension method overloads for this that just set a token on options. Will users know there is an implicit conversion happening here for cancellation token?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe, I am worried about us having overload conflicts since there are already a bunch of overloads for this method. I pretty much did this just for compatibility.

}

public ValueTask DisposeAsync() {
throw new NotImplementedException();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should the default interface implementation throw for dispose? I'd think this would do nothing.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As you cant see, these haven't been implemented yet. This PR is not done.

Comment on lines 120 to 124
_subscribers.TryRemove(subscriber.Id, out _);
if (_subscribers.Count == 0)
RemoveTopicSubscriptionAsync().GetAwaiter().GetResult();

return new ValueTask();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Kinda would be nice if this was async.

Comment on lines +28 to +29
string MessageType { get; set; }
IDictionary<string, string> Properties { get; }
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would this work for kafka? I'm dealing with public Func<ConsumeResult<string, byte[]>, string> ResolveMessageType { get; set; } so I can look at specific headers and not have to deserialize every single header to process the message.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well it can't be kafka specific. It has access to properties which should include the headers from the kafka message.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants