From fbedadb3edb37b8be0ff652a140450a14e216929 Mon Sep 17 00:00:00 2001 From: Ruslan Iushchenko Date: Thu, 23 Oct 2025 11:45:32 +0200 Subject: [PATCH 1/3] #648 Kafka Source: Group kafka generated columns into a configurable struct column. --- README.md | 7 ++++ .../absa/pramen/api/offset/OffsetValue.scala | 3 -- .../core/bookkeeper/OffsetManagerUtils.scala | 12 +++--- .../pipeline/IncrementalIngestionJob.scala | 13 +++--- .../absa/pramen/core/utils/SparkUtils.scala | 32 ++++++++++++++ .../bookkeeper/OffsetManagerUtilsSuite.scala | 8 +++- .../pramen/extras/sink/KafkaAvroSink.scala | 10 ++++- .../extras/source/KafkaAvroSource.scala | 42 +++++++++++++------ 8 files changed, 99 insertions(+), 28 deletions(-) diff --git a/README.md b/README.md index 3aa7e8d8..1d4df1a7 100644 --- a/README.md +++ b/README.md @@ -939,6 +939,10 @@ pramen.sources = [ name = "kafka_source" factory.class = "za.co.absa.pramen.extras.source.KafkaAvroSource" + # [Optional] Set name for the struct field that Kafka record metadata + #custom.kafka.column = "kafka" + # [Optional] Set name for the Kafka key column + #key.column.name = "kafka_key" kafka { bootstrap.servers = "mybroker1:9092,mybroker2:9092" @@ -1010,6 +1014,9 @@ pramen.sinks = [ name = "kafka_avro" factory.class = "za.co.absa.pramen.extras.sink.KafkaAvroSink" + # [Optional] Set name for the struct field that Kafka record metadata. This column will be dropped if exists before sending data to Kafka. + #custom.kafka.column = "kafka" + kafka { bootstrap.servers = "mybroker1:9092,mybroker2:9092" diff --git a/pramen/api/src/main/scala/za/co/absa/pramen/api/offset/OffsetValue.scala b/pramen/api/src/main/scala/za/co/absa/pramen/api/offset/OffsetValue.scala index 627bde18..9a0ed16f 100644 --- a/pramen/api/src/main/scala/za/co/absa/pramen/api/offset/OffsetValue.scala +++ b/pramen/api/src/main/scala/za/co/absa/pramen/api/offset/OffsetValue.scala @@ -32,9 +32,6 @@ sealed trait OffsetValue extends Comparable[OffsetValue] { } object OffsetValue { - val KAFKA_PARTITION_FIELD = "kafka_partition" - val KAFKA_OFFSET_FIELD = "kafka_offset" - case class DateTimeValue(t: Instant) extends OffsetValue { override val dataType: OffsetType = OffsetType.DateTimeType diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/OffsetManagerUtils.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/OffsetManagerUtils.scala index 9c496df5..20d097a7 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/OffsetManagerUtils.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/OffsetManagerUtils.scala @@ -19,7 +19,6 @@ package za.co.absa.pramen.core.bookkeeper import org.apache.spark.sql.DataFrame import org.apache.spark.sql.functions.{col, max, min} import org.apache.spark.sql.types.StringType -import za.co.absa.pramen.api.offset.OffsetValue.{KAFKA_OFFSET_FIELD, KAFKA_PARTITION_FIELD} import za.co.absa.pramen.api.offset.{KafkaPartition, OffsetType, OffsetValue} import za.co.absa.pramen.api.sql.SqlGeneratorBase @@ -28,12 +27,15 @@ object OffsetManagerUtils { if (df.isEmpty) { None } else { + val kafkaOffsetFieldName = s"$offsetColumn.offset" + val kafkaPartitionFieldName = s"$offsetColumn.partition" + if (offsetType == OffsetType.KafkaType) { - val aggregatedDf = df.groupBy(col(KAFKA_PARTITION_FIELD)) + val aggregatedDf = df.groupBy(col(kafkaPartitionFieldName)) .agg( - min(col(KAFKA_OFFSET_FIELD)).as("min_offset"), - max(col(KAFKA_OFFSET_FIELD)).as("max_offset") - ).orderBy(KAFKA_PARTITION_FIELD) + 
min(col(kafkaOffsetFieldName)).as("min_offset"), + max(col(kafkaOffsetFieldName)).as("max_offset") + ).orderBy(kafkaPartitionFieldName) val minValue = OffsetValue.KafkaValue(aggregatedDf.collect().map(row => KafkaPartition(row.getAs[Int](0), row.getAs[Long](1))).toSeq) val maxValue = OffsetValue.KafkaValue(aggregatedDf.collect().map(row => KafkaPartition(row.getAs[Int](0), row.getAs[Long](2))).toSeq) diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/IncrementalIngestionJob.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/IncrementalIngestionJob.scala index 59699cca..a39aa30d 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/IncrementalIngestionJob.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/IncrementalIngestionJob.scala @@ -22,7 +22,6 @@ import org.apache.spark.sql.types._ import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode, SparkSession} import za.co.absa.pramen.api.jobdef.SourceTable import za.co.absa.pramen.api.offset.DataOffset.UncommittedOffset -import za.co.absa.pramen.api.offset.OffsetValue.{KAFKA_OFFSET_FIELD, KAFKA_PARTITION_FIELD} import za.co.absa.pramen.api.offset.{OffsetInfo, OffsetType} import za.co.absa.pramen.api.status.{DependencyWarning, TaskRunReason} import za.co.absa.pramen.api.{Reason, Source} @@ -31,6 +30,7 @@ import za.co.absa.pramen.core.bookkeeper.{Bookkeeper, OffsetManager, OffsetManag import za.co.absa.pramen.core.metastore.Metastore import za.co.absa.pramen.core.metastore.model.{MetaTable, ReaderMode} import za.co.absa.pramen.core.runner.splitter.{ScheduleStrategy, ScheduleStrategyIncremental} +import za.co.absa.pramen.core.utils.SparkUtils import za.co.absa.pramen.core.utils.SparkUtils._ import java.time.{Instant, LocalDate} @@ -307,16 +307,19 @@ class IncrementalIngestionJob(operationDef: OperationDef, s"But only string type is supported for offset type '${offsetInfo.offsetType.dataTypeString}'.") } case OffsetType.KafkaType => - val offsetField = df.schema(KAFKA_OFFSET_FIELD) - val partitionField = df.schema(KAFKA_PARTITION_FIELD) + val kafkaFieldName = field.name + val kafkaOffsetFieldName = s"$kafkaFieldName.offset" + val kafkaPartitionFieldName = s"$kafkaFieldName.partition" + val offsetField = SparkUtils.getNestedField(df.schema, kafkaOffsetFieldName) + val partitionField = SparkUtils.getNestedField(df.schema, kafkaPartitionFieldName) if (offsetField.dataType != LongType) { - throw new IllegalArgumentException(s"Kafka offset column '$KAFKA_OFFSET_FIELD' has type '${offsetField.dataType}'. " + + throw new IllegalArgumentException(s"Kafka offset column '$kafkaOffsetFieldName' has type '${offsetField.dataType}'. " + s"But only '${LongType.typeName}' is supported for offset type '${offsetInfo.offsetType.dataTypeString}'.") } if (partitionField.dataType != IntegerType) { - throw new IllegalArgumentException(s"Kafka partition column '$KAFKA_PARTITION_FIELD' has type '${partitionField.dataType}'. " + + throw new IllegalArgumentException(s"Kafka partition column '$kafkaPartitionFieldName' has type '${partitionField.dataType}'. 
" + s"But only '${IntegerType.typeName}' is supported for offset type '${offsetInfo.offsetType.dataTypeString}'.") } } diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/SparkUtils.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/SparkUtils.scala index abcd1706..193e27a2 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/SparkUtils.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/SparkUtils.scala @@ -615,6 +615,38 @@ object SparkUtils { } } + def getNestedField(schema: StructType, fieldName: String): StructField = { + def getNestedFieldInArray(schema: StructType, fieldNames: Array[String]): StructField = { + val rootFieldName = fieldNames.head + val rootFieldOpt = schema.fields.find(_.name.equalsIgnoreCase(rootFieldName)) + + rootFieldOpt match { + case Some(field) => + if (fieldNames.length == 1) { + field + } else { + field.dataType match { + case struct: StructType => + getNestedFieldInArray(struct, fieldNames.drop(1)) + case other => + throw new IllegalArgumentException(s"Field '${field.name}' (of $fieldName) is of type '${other.typeName}', expected StructType.") + } + } + case None => + val fields = schema.fields.map(_.name).mkString("[ ", ", ", " ]") + throw new IllegalArgumentException(s"Field $rootFieldName (of '$fieldName') not found in the schema. Available fields: $fields") + } + } + + val fieldNames = fieldName.split('.') + + if (fieldNames.length < 1) { + throw new IllegalArgumentException(s"Field name '$fieldName' is not valid.") + } + + getNestedFieldInArray(schema, fieldNames) + } + private def getActualProcessingTimeUdf: UserDefinedFunction = { udf((_: Long) => Instant.now().getEpochSecond) } diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/OffsetManagerUtilsSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/OffsetManagerUtilsSuite.scala index 4dc7ef37..dc25c180 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/OffsetManagerUtilsSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/OffsetManagerUtilsSuite.scala @@ -16,7 +16,7 @@ package za.co.absa.pramen.core.tests.bookkeeper -import org.apache.spark.sql.functions.col +import org.apache.spark.sql.functions.{col, struct} import org.apache.spark.sql.types.TimestampType import org.scalatest.wordspec.AnyWordSpec import za.co.absa.pramen.api.offset.{KafkaPartition, OffsetType, OffsetValue} @@ -58,8 +58,12 @@ class OffsetManagerUtilsSuite extends AnyWordSpec with SparkTestBase { (2, 2L, "f"), (2, 3L, "g") ).toDF("kafka_partition", "kafka_offset", "field") + .withColumn("kafka", struct( + col("kafka_offset").as("offset"), + col("kafka_partition").as("partition") + )) - val (minValue, maxValue) = OffsetManagerUtils.getMinMaxValueFromData(df, "kafka_offset", OffsetType.KafkaType).get + val (minValue, maxValue) = OffsetManagerUtils.getMinMaxValueFromData(df, "kafka", OffsetType.KafkaType).get assert(minValue == OffsetValue.KafkaValue(Seq(KafkaPartition(0, 1), KafkaPartition(1, 3), KafkaPartition(2, 1)))) assert(maxValue == OffsetValue.KafkaValue(Seq(KafkaPartition(0, 2), KafkaPartition(1, 4), KafkaPartition(2, 3)))) diff --git a/pramen/extras/src/main/scala/za/co/absa/pramen/extras/sink/KafkaAvroSink.scala b/pramen/extras/src/main/scala/za/co/absa/pramen/extras/sink/KafkaAvroSink.scala index 089ecb7c..39416025 100644 --- a/pramen/extras/src/main/scala/za/co/absa/pramen/extras/sink/KafkaAvroSink.scala +++ 
b/pramen/extras/src/main/scala/za/co/absa/pramen/extras/sink/KafkaAvroSink.scala @@ -20,6 +20,8 @@ import com.typesafe.config.Config import org.apache.spark.sql.{DataFrame, SparkSession} import za.co.absa.pramen.api.{ExternalChannelFactory, MetastoreReader, Sink, SinkResult} import za.co.absa.pramen.extras.sink.KafkaAvroSink.TOPIC_NAME_KEY +import za.co.absa.pramen.extras.source.KafkaAvroSource.CUSTOM_KAFKA_COLUMN_KEY +import za.co.absa.pramen.extras.utils.ConfigUtils import za.co.absa.pramen.extras.writer.TableWriterKafka import za.co.absa.pramen.extras.writer.model.KafkaAvroWriterConfig @@ -37,6 +39,9 @@ import java.time.LocalDate * name = "kafka_avro" * factory.class = "za.co.absa.pramen.extras.sink.KafkaAvroSink" * + * # [Optional] Set name for the struct field that Kafka record metadata. This column will be dropped if exists before sending data to Kafka. + * custom.kafka.column = "kafka" + * * kafka { * bootstrap.servers = "mybroker1:9092,mybroker2:9092" * @@ -114,6 +119,7 @@ import java.time.LocalDate */ class KafkaAvroSink(sinkConfig: Config, val kafkaWriterConfig: KafkaAvroWriterConfig) extends Sink { + private val kafkaColumnName = ConfigUtils.getOptionString(sinkConfig, CUSTOM_KAFKA_COLUMN_KEY).getOrElse("kafka") override val config: Config = sinkConfig override def connect(): Unit = {} @@ -132,7 +138,9 @@ class KafkaAvroSink(sinkConfig: Config, val writer = new TableWriterKafka(topicName, kafkaWriterConfig) - SinkResult(writer.write(df, infoDate, None)) + val dfWithoutKafkaField = df.drop(kafkaColumnName) + + SinkResult(writer.write(dfWithoutKafkaField, infoDate, None)) } } diff --git a/pramen/extras/src/main/scala/za/co/absa/pramen/extras/source/KafkaAvroSource.scala b/pramen/extras/src/main/scala/za/co/absa/pramen/extras/source/KafkaAvroSource.scala index 7e2c67c1..184f5a12 100644 --- a/pramen/extras/src/main/scala/za/co/absa/pramen/extras/source/KafkaAvroSource.scala +++ b/pramen/extras/src/main/scala/za/co/absa/pramen/extras/source/KafkaAvroSource.scala @@ -18,13 +18,12 @@ package za.co.absa.pramen.extras.source import com.typesafe.config.Config import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.functions.col +import org.apache.spark.sql.functions.{col, struct} import za.co.absa.abris.avro.functions.from_avro import za.co.absa.abris.config.AbrisConfig -import za.co.absa.pramen.api.offset.OffsetValue.{KAFKA_OFFSET_FIELD, KAFKA_PARTITION_FIELD, KafkaValue} +import za.co.absa.pramen.api.offset.OffsetValue.KafkaValue import za.co.absa.pramen.api.offset.{OffsetInfo, OffsetType, OffsetValue} import za.co.absa.pramen.api.{ExternalChannelFactoryV2, Query, Source, SourceResult} -import za.co.absa.pramen.extras.source.KafkaAvroSource.KAFKA_TOKENS_TO_REDACT import za.co.absa.pramen.extras.utils.ConfigUtils import za.co.absa.pramen.extras.writer.model.KafkaAvroConfig @@ -46,6 +45,11 @@ import java.time.LocalDate * name = "kafka_avro" * factory.class = "za.co.absa.pramen.extras.source.KafkaAvroSource" * + * # [Optional] Set name for the struct field that Kafka record metadata + * custom.kafka.column = "kafka" + * # [Optional] Set name for the Kafka key column + * key.column.name = "kafka_key" + * * kafka { * bootstrap.servers = "mybroker1:9092,mybroker2:9092" * @@ -103,10 +107,15 @@ class KafkaAvroSource(sourceConfig: Config, workflowConfig: Config, val kafkaAvroConfig: KafkaAvroConfig) (implicit spark: SparkSession) extends Source { + import za.co.absa.pramen.extras.source.KafkaAvroSource._ + + private val kafkaColumnName = ConfigUtils.getOptionString(sourceConfig, 
CUSTOM_KAFKA_COLUMN_KEY).getOrElse("kafka") + private val keyColumnName = ConfigUtils.getOptionString(sourceConfig, KEY_COLUMN_KEY).getOrElse("kafka_key") + override def hasInfoDateColumn(query: Query): Boolean = false override def getOffsetInfo: Option[OffsetInfo] = { - Some(OffsetInfo("kafka_offset", OffsetType.KafkaType)) + Some(OffsetInfo(kafkaColumnName, OffsetType.KafkaType)) } override def getRecordCount(query: Query, infoDateBegin: LocalDate, infoDateEnd: LocalDate): Long = { @@ -187,24 +196,31 @@ class KafkaAvroSource(sourceConfig: Config, val df1 = dfRaw .withColumn("data", from_avro(col("value"), abrisValueConfig)) - .withColumn(KAFKA_PARTITION_FIELD, col("partition")) - .withColumn(KAFKA_OFFSET_FIELD, col("offset")) - .withColumn("kafka_timestamp", col("timestamp")) - .withColumn("kafka_timestamp_type", col("timestampType")) + .withColumn("tmp_pramen_kafka", struct( + col("partition"), + col("offset"), + col("timestamp"), + col("timestampType").as("timestamp_type") + )) val df2 = kafkaAvroConfig.keyNamingStrategy match { case Some(keyNamingStrategy) => val abrisKeyConfig = keyNamingStrategy .applyNamingStrategyToAbrisConfig(abrisValueBase, topic, isKey = true) .usingSchemaRegistry(schemaRegistryClientConfig) - df1.withColumn("kafka_key", from_avro(col("key"), abrisKeyConfig)) + df1.withColumn("tmp_pramen_kafka_key", from_avro(col("key"), abrisKeyConfig)) case None => - df1.withColumn("kafka_key", col("key")) + df1.withColumn("tmp_pramen_kafka_key", col("key")) } - // Put data fields to the root level of the schema: + // Put data fields to the root level of the schema, and if data struct already has kafka_key and kafka fields, + // drop them val dfFinal = df2 - .select(KAFKA_PARTITION_FIELD, KAFKA_OFFSET_FIELD, "kafka_timestamp", "kafka_timestamp_type", "kafka_key", "data.*") + .select("tmp_pramen_kafka_key", "data.*", "tmp_pramen_kafka") + .drop(kafkaColumnName) + .drop(keyColumnName) + .withColumnRenamed("tmp_pramen_kafka", kafkaColumnName) + .withColumnRenamed("tmp_pramen_kafka_key", keyColumnName) SourceResult(dfFinal) } @@ -214,6 +230,8 @@ class KafkaAvroSource(sourceConfig: Config, object KafkaAvroSource extends ExternalChannelFactoryV2[KafkaAvroSource] { val TOPIC_NAME_KEY = "topic.name" + val CUSTOM_KAFKA_COLUMN_KEY = "custom.kafka.column" + val KEY_COLUMN_KEY = "key.column.name" val KAFKA_TOKENS_TO_REDACT = Set("password", "jaas.config", "auth.user.info") From fdc5cb4f1d78fbd21bfda68da7017336940b4305 Mon Sep 17 00:00:00 2001 From: Ruslan Iushchenko Date: Thu, 23 Oct 2025 12:07:24 +0200 Subject: [PATCH 2/3] #648 Fix PR suggestions. --- README.md | 4 ++-- .../scala/za/co/absa/pramen/extras/sink/KafkaAvroSink.scala | 2 +- .../za/co/absa/pramen/extras/source/KafkaAvroSource.scala | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index 1d4df1a7..d04b3cc9 100644 --- a/README.md +++ b/README.md @@ -939,7 +939,7 @@ pramen.sources = [ name = "kafka_source" factory.class = "za.co.absa.pramen.extras.source.KafkaAvroSource" - # [Optional] Set name for the struct field that Kafka record metadata + # [Optional] Set name for the struct field that contains Kafka record metadata #custom.kafka.column = "kafka" # [Optional] Set name for the Kafka key column #key.column.name = "kafka_key" @@ -1014,7 +1014,7 @@ pramen.sinks = [ name = "kafka_avro" factory.class = "za.co.absa.pramen.extras.sink.KafkaAvroSink" - # [Optional] Set name for the struct field that Kafka record metadata. This column will be dropped if exists before sending data to Kafka. 
+ # [Optional] Set name for the struct field that contains Kafka record metadata. This column will be dropped if It exists before sending data to Kafka. #custom.kafka.column = "kafka" kafka { diff --git a/pramen/extras/src/main/scala/za/co/absa/pramen/extras/sink/KafkaAvroSink.scala b/pramen/extras/src/main/scala/za/co/absa/pramen/extras/sink/KafkaAvroSink.scala index 39416025..6404b8a7 100644 --- a/pramen/extras/src/main/scala/za/co/absa/pramen/extras/sink/KafkaAvroSink.scala +++ b/pramen/extras/src/main/scala/za/co/absa/pramen/extras/sink/KafkaAvroSink.scala @@ -39,7 +39,7 @@ import java.time.LocalDate * name = "kafka_avro" * factory.class = "za.co.absa.pramen.extras.sink.KafkaAvroSink" * - * # [Optional] Set name for the struct field that Kafka record metadata. This column will be dropped if exists before sending data to Kafka. + * # [Optional] Set name for the struct field that contains Kafka record metadata. This column will be dropped if It exists before sending data to Kafka. * custom.kafka.column = "kafka" * * kafka { diff --git a/pramen/extras/src/main/scala/za/co/absa/pramen/extras/source/KafkaAvroSource.scala b/pramen/extras/src/main/scala/za/co/absa/pramen/extras/source/KafkaAvroSource.scala index 184f5a12..230f561b 100644 --- a/pramen/extras/src/main/scala/za/co/absa/pramen/extras/source/KafkaAvroSource.scala +++ b/pramen/extras/src/main/scala/za/co/absa/pramen/extras/source/KafkaAvroSource.scala @@ -45,7 +45,7 @@ import java.time.LocalDate * name = "kafka_avro" * factory.class = "za.co.absa.pramen.extras.source.KafkaAvroSource" * - * # [Optional] Set name for the struct field that Kafka record metadata + * # [Optional] Set name for the struct field that contains Kafka record metadata * custom.kafka.column = "kafka" * # [Optional] Set name for the Kafka key column * key.column.name = "kafka_key" From cd10447aef6b9321cf683af3a1dd513e95320022 Mon Sep 17 00:00:00 2001 From: Ruslan Iushchenko Date: Thu, 23 Oct 2025 13:13:22 +0200 Subject: [PATCH 3/3] #648 Fix more PR suggestions. --- .../za/co/absa/pramen/extras/sink/KafkaAvroSink.scala | 11 +++++++++-- .../absa/pramen/extras/source/KafkaAvroSource.scala | 11 +++++++++++ 2 files changed, 20 insertions(+), 2 deletions(-) diff --git a/pramen/extras/src/main/scala/za/co/absa/pramen/extras/sink/KafkaAvroSink.scala b/pramen/extras/src/main/scala/za/co/absa/pramen/extras/sink/KafkaAvroSink.scala index 6404b8a7..bda402f8 100644 --- a/pramen/extras/src/main/scala/za/co/absa/pramen/extras/sink/KafkaAvroSink.scala +++ b/pramen/extras/src/main/scala/za/co/absa/pramen/extras/sink/KafkaAvroSink.scala @@ -18,6 +18,7 @@ package za.co.absa.pramen.extras.sink import com.typesafe.config.Config import org.apache.spark.sql.{DataFrame, SparkSession} +import org.slf4j.LoggerFactory import za.co.absa.pramen.api.{ExternalChannelFactory, MetastoreReader, Sink, SinkResult} import za.co.absa.pramen.extras.sink.KafkaAvroSink.TOPIC_NAME_KEY import za.co.absa.pramen.extras.source.KafkaAvroSource.CUSTOM_KAFKA_COLUMN_KEY @@ -39,7 +40,7 @@ import java.time.LocalDate * name = "kafka_avro" * factory.class = "za.co.absa.pramen.extras.sink.KafkaAvroSink" * - * # [Optional] Set name for the struct field that contains Kafka record metadata. This column will be dropped if It exists before sending data to Kafka. + * # [Optional] Set name for the struct field that contains Kafka record metadata. This column will be dropped if it exists before sending data to Kafka. 
* custom.kafka.column = "kafka" * * kafka { @@ -119,6 +120,7 @@ import java.time.LocalDate */ class KafkaAvroSink(sinkConfig: Config, val kafkaWriterConfig: KafkaAvroWriterConfig) extends Sink { + private val log = LoggerFactory.getLogger(this.getClass) private val kafkaColumnName = ConfigUtils.getOptionString(sinkConfig, CUSTOM_KAFKA_COLUMN_KEY).getOrElse("kafka") override val config: Config = sinkConfig @@ -138,7 +140,12 @@ class KafkaAvroSink(sinkConfig: Config, val writer = new TableWriterKafka(topicName, kafkaWriterConfig) - val dfWithoutKafkaField = df.drop(kafkaColumnName) + val dfWithoutKafkaField = if (df.schema.fields.exists(_.name.equalsIgnoreCase(kafkaColumnName))) { + log.warn(s"Dropping '$kafkaColumnName' field from the input DataFrame before sending to Kafka because the output topic has its own metadata.") + df.drop(kafkaColumnName) + } else { + df + } SinkResult(writer.write(dfWithoutKafkaField, infoDate, None)) } diff --git a/pramen/extras/src/main/scala/za/co/absa/pramen/extras/source/KafkaAvroSource.scala b/pramen/extras/src/main/scala/za/co/absa/pramen/extras/source/KafkaAvroSource.scala index 230f561b..3141afea 100644 --- a/pramen/extras/src/main/scala/za/co/absa/pramen/extras/source/KafkaAvroSource.scala +++ b/pramen/extras/src/main/scala/za/co/absa/pramen/extras/source/KafkaAvroSource.scala @@ -19,6 +19,7 @@ package za.co.absa.pramen.extras.source import com.typesafe.config.Config import org.apache.spark.sql.SparkSession import org.apache.spark.sql.functions.{col, struct} +import org.slf4j.LoggerFactory import za.co.absa.abris.avro.functions.from_avro import za.co.absa.abris.config.AbrisConfig import za.co.absa.pramen.api.offset.OffsetValue.KafkaValue @@ -109,6 +110,8 @@ class KafkaAvroSource(sourceConfig: Config, (implicit spark: SparkSession) extends Source { import za.co.absa.pramen.extras.source.KafkaAvroSource._ + private val log = LoggerFactory.getLogger(this.getClass) + private val kafkaColumnName = ConfigUtils.getOptionString(sourceConfig, CUSTOM_KAFKA_COLUMN_KEY).getOrElse("kafka") private val keyColumnName = ConfigUtils.getOptionString(sourceConfig, KEY_COLUMN_KEY).getOrElse("kafka_key") @@ -213,6 +216,14 @@ class KafkaAvroSource(sourceConfig: Config, df1.withColumn("tmp_pramen_kafka_key", col("key")) } + val payloadFields = df2.select("data.*").schema.fieldNames.toSet + if (payloadFields.contains(kafkaColumnName)) { + log.warn(s"Payload field '$kafkaColumnName' conflicts with Kafka metadata struct name and will be replaced.") + } + if (payloadFields.contains(keyColumnName)) { + log.warn(s"Payload field '$keyColumnName' conflicts with Kafka key column name and will be replaced.") + } + // Put data fields to the root level of the schema, and if data struct already has kafka_key and kafka fields, // drop them val dfFinal = df2
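Not part of the patch series above — a minimal sketch of how a downstream Spark job would read the regrouped metadata, assuming the default column names "kafka" and "kafka_key" and a hypothetical metastore table "my_table":

// Illustrative only: after this change, Kafka record metadata is exposed as one
// struct column (default name "kafka") instead of flat kafka_* columns, and the
// decoded key lands in "kafka_key". Both names are configurable via
// custom.kafka.column and key.column.name.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

object KafkaMetadataReadExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("kafka-metadata-read-example").getOrCreate()

    // Placeholder table populated by KafkaAvroSource; the Avro payload fields sit at the root level.
    val df = spark.table("my_table")

    // Consumers now reference the metadata through nested struct fields.
    df.select(
        col("kafka.partition"),
        col("kafka.offset"),
        col("kafka.timestamp"),
        col("kafka.timestamp_type"),
        col("kafka_key")
      )
      .show(10, truncate = false)
  }
}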