Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Producing to inexistent topics with kgo.AllowAutoTopicCreation() and kgo.ConsumeRegex() options #906

Open
Andrey-Metelyov opened this issue Feb 10, 2025 · 1 comment
Labels
bug Something isn't working

Comments

@Andrey-Metelyov
Copy link

Problem Description

If destination topic does not exist, and client configured with kgo.AllowAutoTopicCreation() and kgo.ConsumeRegex() options, client.ProduceSync hangs with logs:

[DEBUG] wrote Fetch v16 broker: 1 bytes_written: 145 write_wait: 0s time_to_write: 0s err: <nil>
[INFO] producing to a new topic for the first time, fetching metadata to learn its partitions topic: result.topic.b
[INFO] immediate metadata update triggered why: forced load because we are producing to a topic for the first time
[DEBUG] wrote Metadata v12 broker: 1 bytes_written: 22 write_wait: 332µs time_to_write: 152.5µs err: <nil>
[INFO] producing to a new topic for the first time, fetching metadata to learn its partitions topic: result.topic.a
[DEBUG] read Metadata v12 broker: 1 bytes_read: 1997 read_wait: 1.0064ms time_to_read: 2.803ms err: <nil>
[DEBUG] metadata refresh has identical topic partition data topic: topic.a partition: 0 leader: 1 leader_epoch: 0
[DEBUG] metadata refresh has identical topic partition data topic: topic.b partition: 0 leader: 1 leader_epoch: 0
[DEBUG] immediate metadata update had inner errors, re-updating errors: topic_missing{} update_after: 250ms
.
.
.
[DEBUG] wrote Metadata v12 broker: 1 bytes_written: 22 write_wait: 0s time_to_write: 276.8µs err: <nil>
[DEBUG] read Metadata v12 broker: 1 bytes_read: 1997 read_wait: 2.0099ms time_to_read: 2.7625ms err: <nil>
[DEBUG] metadata refresh has identical topic partition data topic: topic.b partition: 0 leader: 1 leader_epoch: 0
[DEBUG] metadata refresh has identical topic partition data topic: topic.a partition: 0 leader: 1 leader_epoch: 0
[DEBUG] immediate metadata update had inner errors, re-updating errors: topic_missing{} update_after: 250ms

How to reproduce

Start Kafka in Docker with docker-compose.yml like this:

services:
  kafka:
    image: apache/kafka
    ports:
      - "9092:9092"

Execute this in container:

cd /opt/kafka/bin
echo "message 1" | ./kafka-console-producer.sh --bootstrap-server localhost:9092 --topic topic.a
echo "message 2" | ./kafka-console-producer.sh --bootstrap-server localhost:9092 --topic topic.b

To check available topics in kafka:

./kafka-topics.sh --bootstrap-server localhost:9092 --list

To delete topic:

./kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic result.topic.a

Start application:

package main

import (
	"context"
	"fmt"
	"github.com/twmb/franz-go/pkg/kgo"
)

type logger struct {
}

func (l *logger) Level() kgo.LogLevel {
	return kgo.LogLevelDebug
}

func (l *logger) Log(level kgo.LogLevel, msg string, keyvals ...any) {
	fmt.Printf("[%s] %s", level, msg)
	for i := 0; i < len(keyvals); i += 2 {
		fmt.Printf(" %s: %v", keyvals[i], keyvals[i+1])
	}
	fmt.Println("")
}

func main() {
	topics := []string{"^topic\\..*"}
	// uncomment this and comment kgo.ConsumeRegex() for second case
	//topics := []string{"topic.a", "topic.b"}
	client, err := kgo.NewClient(
		kgo.SeedBrokers("localhost"),
		kgo.ConsumeTopics(topics...),
		kgo.ConsumeRegex(),
		kgo.AllowAutoTopicCreation(),
		kgo.WithLogger(&logger{}),
	)
	if err != nil {
		fmt.Println("error", err)
		return
	}
	ctx := context.Background()

	fmt.Println("Starting loop")
	for {
		fetches := client.PollRecords(ctx, 10)
		fmt.Printf("Read %d messages\n", len(fetches))

		var records []*kgo.Record
		iter := fetches.RecordIter()
		for !iter.Done() {
			record := iter.Next()
			topic := fmt.Sprintf("result.%s", record.Topic)
			fmt.Printf("Prepare message for topic: %s\n", topic)
			records = append(records, &kgo.Record{
				Topic: topic,
				Key:   record.Key,
				Value: record.Value,
			})
		}

		fmt.Printf("Sending %d messages\n", len(records))
		produceResults := client.ProduceSync(ctx, records...)
		for _, result := range produceResults {
			fmt.Printf("result: %+v\n", result)
		}
	}
}

When using list of topics without regexp (see comment) and remove kgo.ConsumeRegex() option in kgo.NewClient everything works. Topics result.topic.a and result.topic.b will be successfully created.

@twmb
Copy link
Owner

twmb commented Feb 18, 2025

Ack, will look into it.

@twmb twmb added the bug Something isn't working label Feb 18, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bug Something isn't working
Projects
None yet
Development

No branches or pull requests

2 participants