The purpose of this repository is to explain step by step how to build producer and consumer applications that interact using Confluent Kafka.
-
If you don't have the docker installed, please do install it here.
-
Let's set the confluent platform to run locally, download this confluent platform all-in-one docker compose file.
-
Then run the following command
docker-compose up -d
. This will start the confluent platform, it might take some minutes. -
Go to http://localhost:9021 and create a topic.
-
You are ready! If not, click here for more details.
- To instantiate a producer you need a
ProducerConfig
. In this example, we will only define theBootstrapServers
but you can define theBatchSize
,CompressionType
, and many others.
ProducerConfig config = new()
{
BootstrapServers = this.Options.Server
};
- Building the
IProducer
.
using IProducer<string, TMessage> producer = new ProducerBuilder<string, TMessage>(config).Build();
- Producing a message.
string eventId = Guid.NewGuid().ToString();
Message<string, TMessage> @event = new()
{
Key = eventId,
Value = msg,
};
await producer.ProduceAsync(topic, @event, cancellationToken).ConfigureAwait(false);
- To instantiate a consumer you need a
ConsumerConfig
. In this, example we define theBootstrapServers
, theGroupId
, and theAutoOffsetReset
but you can define many others. TheAutoOffReset
is set toEarliest
which means that it will automatically reset the offset to the earliest offset.
ConsumerConfig config = new()
{
BootstrapServers = this.Options.Server,
GroupId = $"{topic}-{Guid.NewGuid()}",
AutoOffsetReset = AutoOffsetReset.Earliest
};
- Building the
IConsumer
.
using IConsumer<string, TMessage> consumer = new ConsumerBuilder<string, TMessage>(config).Build();
- Subscribing a topic.
consumer.Subscribe(topic);
- Consuming a message.
var message = consumer.Consume(cancellationToken);
This is a wrapper around the confluent Kafka implementation, the goal was to simplify!
The IKafkaService
has two methods:
- ProduceAsync, all the logic above about producing messages to a topic is wrapped within this method.
- ConsumeAsync, all the logic above about consuming messages from a topic is wrapped within this method.
The KafkaServiceOptions
gather all the necessary configurations for the producer and consumer.
In your application add the service as follows:
services.Configure<KafkaServiceOptions>(this.Configuration.GetSection(nameof(KafkaServiceOptions)));
services.AddKafkaService();
Set the KafkaServiceOptions
in your appsettings
json file as follows:
{
"KafkaServiceOptions": {
"Server": "localhost:9092",
"Topics": [ "myTopic" ]
}
}
try
{
int count = 1;
while (!stoppingToken.IsCancellationRequested)
{
await this.kafkaService.ProduceAsync($"Hi! I'm the event number {count}", "myTopic", stoppingToken).ConfigureAwait(false);
count++;
}
}
catch (Exception ex)
{
this.Logger.LogError($"Exception: {ex.GetType().FullName} | " + $"Message: {ex.Message}");
}
try
{
MessageHandlerDelegate<string> handler = this.HandleAsync;
await this.kafkaService.ConsumeAsync<string>("myTopic", handler, stoppingToken).ConfigureAwait(false);
}
catch (Exception ex)
{
this.Logger.LogError($"Exception: {ex.GetType().FullName} | " + $"Message: {ex.Message}");
}
Notice, that a handler is passed, it's in this method where all the messages will be delivered.
private Task HandleAsync(string @event)
{
this.Logger.LogInformation($"Event received: {@event}");
return Task.CompletedTask;
}
Notice that this a demo, there is a lot of aspects that are not covered here.