Skip to content

Commit

Permalink
Create topics when getting the first message in the Migrator output
Browse files Browse the repository at this point in the history
Signed-off-by: Mihai Todor <[email protected]>
  • Loading branch information
mihaitodor committed Jan 15, 2025
1 parent 9f67fea commit 11a3078
Showing 1 changed file with 26 additions and 42 deletions.
68 changes: 26 additions & 42 deletions internal/impl/kafka/enterprise/redpanda_migrator_output.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"fmt"
"slices"
"sync"
"time"

"github.com/twmb/franz-go/pkg/kgo"
franz_sr "github.com/twmb/franz-go/pkg/sr"
Expand Down Expand Up @@ -177,7 +176,7 @@ func init() {
// Stores the source to destination SchemaID mapping.
var schemaIDCache sync.Map
var topicCache sync.Map
createdTopicsOnConnect := false
var runOnce sync.Once
output, err = kafka.NewFranzWriterFromConfig(
conf,
kafka.NewFranzWriterHooks(
Expand All @@ -192,19 +191,25 @@ func init() {
}
}

if err := fn(&kafka.FranzSharedClientInfo{Client: client, ConnDetails: connDetails}); err != nil {
return err
}
return fn(&kafka.FranzSharedClientInfo{Client: client, ConnDetails: connDetails})
}).WithYieldClientFn(
func(context.Context) error {
clientMut.Lock()
defer clientMut.Unlock()

if createdTopicsOnConnect {
if client == nil {
return nil
}

// Make multiple attempts until the input connects in the background.
// TODO: It would be nicer to somehow get notified when the input is ready.
loop:
for {
if err = kafka.FranzSharedClientUse(inputResource, mgr, func(details *kafka.FranzSharedClientInfo) error {
client.Close()
client = nil
return nil
}).WithWriteHookFn(
func(ctx context.Context, client *kgo.Client, records []*kgo.Record) error {
// Try to create all topics which the input `redpanda_migrator` resource is configured to read
// from when we receive the first message.
runOnce.Do(func() {
err := kafka.FranzSharedClientUse(inputResource, mgr, func(details *kafka.FranzSharedClientInfo) error {
inputClient := details.Client
outputClient := client
topics := inputClient.GetConsumeTopics()
Expand All @@ -215,13 +220,13 @@ func init() {
topicCache.Store(topic, struct{}{})
mgr.Logger().Debugf("Topic %q already exists", topic)
} else {
// This may be a topic which doesn't have any messages in it, so if we
// failed to create it now, we log an error and continue. If it does contain
// messages, we'll attempt to create it again anyway when receiving a
// message from it.
mgr.Logger().Errorf("Failed to create topic %q and ACLs: %s", topic, err)
}

// This may be a topic which doesn't have any messages in it, so if we can't create
// it here, we just log an error and carry on. If it does contain messages, we'll
// attempt to create it again anyway during WriteBatch and we'll raise another error
// there if we can't.
continue
}

Expand All @@ -234,34 +239,13 @@ func init() {
topicCache.Store(topic, struct{}{})
}

createdTopicsOnConnect = true
return nil
}); err == nil {
break
})
if err == nil {
mgr.Logger().Errorf("Failed to fetch topics from input %q: %s", inputResource, err)
}
})

select {
case <-time.After(100 * time.Millisecond):
case <-ctx.Done():
break loop
}
}

return nil
}).WithYieldClientFn(
func(context.Context) error {
clientMut.Lock()
defer clientMut.Unlock()

if client == nil {
return nil
}

client.Close()
client = nil
return nil
}).WithWriteHookFn(
func(ctx context.Context, client *kgo.Client, records []*kgo.Record) error {
if translateSchemaIDs {
if res, ok := mgr.GetGeneric(schemaRegistryOutputResource); ok {
srOutput := res.(*schemaRegistryOutput)
Expand Down Expand Up @@ -299,8 +283,8 @@ func init() {

}

// Once we get here, the input should already be initialised and its pre-flight hook should have
// been called already. Thus, we don't need to loop until the input is ready.
// The current record may be coming from a topic which was created later during runtime, so we
// need to try and create it if we haven't done so already.
if err := kafka.FranzSharedClientUse(inputResource, mgr, func(details *kafka.FranzSharedClientInfo) error {
for _, record := range records {
if _, ok := topicCache.Load(record.Topic); !ok {
Expand Down

0 comments on commit 11a3078

Please sign in to comment.