From 11a3078b56ae8acde3d6bf5f0eb0cc797aa5984b Mon Sep 17 00:00:00 2001 From: Mihai Todor Date: Wed, 15 Jan 2025 15:33:12 +0000 Subject: [PATCH] Create topics when getting the first message in the Migrator output Signed-off-by: Mihai Todor --- .../enterprise/redpanda_migrator_output.go | 68 +++++++------------ 1 file changed, 26 insertions(+), 42 deletions(-) diff --git a/internal/impl/kafka/enterprise/redpanda_migrator_output.go b/internal/impl/kafka/enterprise/redpanda_migrator_output.go index e17f3bf856..a7f48a13ca 100644 --- a/internal/impl/kafka/enterprise/redpanda_migrator_output.go +++ b/internal/impl/kafka/enterprise/redpanda_migrator_output.go @@ -13,7 +13,6 @@ import ( "fmt" "slices" "sync" - "time" "github.com/twmb/franz-go/pkg/kgo" franz_sr "github.com/twmb/franz-go/pkg/sr" @@ -177,7 +176,7 @@ func init() { // Stores the source to destination SchemaID mapping. var schemaIDCache sync.Map var topicCache sync.Map - createdTopicsOnConnect := false + var runOnce sync.Once output, err = kafka.NewFranzWriterFromConfig( conf, kafka.NewFranzWriterHooks( @@ -192,19 +191,25 @@ func init() { } } - if err := fn(&kafka.FranzSharedClientInfo{Client: client, ConnDetails: connDetails}); err != nil { - return err - } + return fn(&kafka.FranzSharedClientInfo{Client: client, ConnDetails: connDetails}) + }).WithYieldClientFn( + func(context.Context) error { + clientMut.Lock() + defer clientMut.Unlock() - if createdTopicsOnConnect { + if client == nil { return nil } - // Make multiple attempts until the input connects in the background. - // TODO: It would be nicer to somehow get notified when the input is ready. 
- loop: - for { - if err = kafka.FranzSharedClientUse(inputResource, mgr, func(details *kafka.FranzSharedClientInfo) error { + client.Close() + client = nil + return nil + }).WithWriteHookFn( + func(ctx context.Context, client *kgo.Client, records []*kgo.Record) error { + // Try to create all topics which the input `redpanda_migrator` resource is configured to read + // from when we receive the first message. + runOnce.Do(func() { + err := kafka.FranzSharedClientUse(inputResource, mgr, func(details *kafka.FranzSharedClientInfo) error { inputClient := details.Client outputClient := client topics := inputClient.GetConsumeTopics() @@ -215,13 +220,13 @@ func init() { topicCache.Store(topic, struct{}{}) mgr.Logger().Debugf("Topic %q already exists", topic) } else { + // This may be a topic which doesn't have any messages in it, so if we + // failed to create it now, we log an error and continue. If it does contain + // messages, we'll attempt to create it again anyway when receiving a + // message from it. mgr.Logger().Errorf("Failed to create topic %q and ACLs: %s", topic, err) } - // This may be a topic which doesn't have any messages in it, so if we can't create - // it here, we just log an error and carry on. If it does contain messages, we'll - // attempt to create it again anyway during WriteBatch and we'll raise another error - // there if we can't. 
continue } @@ -234,34 +239,13 @@ func init() { topicCache.Store(topic, struct{}{}) } - createdTopicsOnConnect = true return nil - }); err == nil { - break + }) + if err != nil { + mgr.Logger().Errorf("Failed to fetch topics from input %q: %s", inputResource, err) } + }) - select { - case <-time.After(100 * time.Millisecond): - case <-ctx.Done(): - break loop - } - } - - return nil - }).WithYieldClientFn( - func(context.Context) error { - clientMut.Lock() - defer clientMut.Unlock() - - if client == nil { - return nil - } - - client.Close() - client = nil - return nil - }).WithWriteHookFn( - func(ctx context.Context, client *kgo.Client, records []*kgo.Record) error { if translateSchemaIDs { if res, ok := mgr.GetGeneric(schemaRegistryOutputResource); ok { srOutput := res.(*schemaRegistryOutput) @@ -299,8 +283,8 @@ func init() { } - // Once we get here, the input should already be initialised and its pre-flight hook should have - // been called already. Thus, we don't need to loop until the input is ready. + // The current record may be coming from a topic which was created later during runtime, so we + // need to try and create it if we haven't done so already. if err := kafka.FranzSharedClientUse(inputResource, mgr, func(details *kafka.FranzSharedClientInfo) error { for _, record := range records { if _, ok := topicCache.Load(record.Topic); !ok {