Skip to content

Commit feee6c0

Browse files
authored
Merge pull request #2894 from redpanda-data/mihaitodor-redpanda-migrator-fix
Fix redpanda-migrator topic creation logic
2 parents ddc9c67 + c07a590 commit feee6c0

File tree

2 files changed

+12
-5
lines changed

2 files changed

+12
-5
lines changed

internal/impl/kafka/enterprise/redpanda_migrator_input.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -557,13 +557,18 @@ func (r *RedpandaMigratorReader) ReadBatch(ctx context.Context) (service.Message
557557

558558
if output != nil {
559559
for _, topic := range topics {
560-
r.mgr.Logger().Infof("Creating topic %q", topic)
561-
562560
if err := createTopic(ctx, topic, r.replicationFactorOverride, r.replicationFactor, r.client, output.client); err != nil && err != errTopicAlreadyExists {
561+
// We could end up attempting to create a topic which doesn't have any messages in it, so if that
562+
// fails, we can just log an error and carry on. If it does contain messages, the output will
563+
// attempt to create it again anyway and will trigger and error if it can't.
564+
// The output `topicCache` could be populated here to avoid the redundant call to create topics, but
565+
// it's not worth the complexity.
563566
r.mgr.Logger().Errorf("Failed to create topic %q and ACLs: %s", topic, err)
564567
} else {
565568
if err == errTopicAlreadyExists {
566569
r.mgr.Logger().Debugf("Topic %q already exists", topic)
570+
} else {
571+
r.mgr.Logger().Infof("Created topic %q in output cluster", topic)
567572
}
568573
if err := createACLs(ctx, topic, r.client, output.client); err != nil {
569574
r.mgr.Logger().Errorf("Failed to create ACLs for topic %q: %s", topic, err)

internal/impl/kafka/enterprise/redpanda_migrator_output.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -449,17 +449,19 @@ func (w *RedpandaMigratorWriter) WriteBatch(ctx context.Context, b service.Messa
449449

450450
if input != nil {
451451
if _, ok := w.topicCache.Load(topic); !ok {
452-
w.mgr.Logger().Infof("Creating topic %q", topic)
453-
454452
if err := createTopic(ctx, topic, w.replicationFactorOverride, w.replicationFactor, input.client, w.client); err != nil && err != errTopicAlreadyExists {
455453
return fmt.Errorf("failed to create topic %q: %s", topic, err)
456454
} else {
457455
if err == errTopicAlreadyExists {
458-
w.mgr.Logger().Infof("Topic %q already exists", topic)
456+
w.mgr.Logger().Debugf("Topic %q already exists", topic)
457+
} else {
458+
w.mgr.Logger().Infof("Created topic %q", topic)
459459
}
460460
if err := createACLs(ctx, topic, input.client, w.client); err != nil {
461461
w.mgr.Logger().Errorf("Failed to create ACLs for topic %q: %s", topic, err)
462462
}
463+
464+
w.topicCache.Store(topic, struct{}{})
463465
}
464466
}
465467
}

0 commit comments

Comments
 (0)