7 changes: 7 additions & 0 deletions README.md
@@ -939,6 +939,10 @@ pramen.sources = [
name = "kafka_source"
factory.class = "za.co.absa.pramen.extras.source.KafkaAvroSource"

# [Optional] Set name for the struct field that contains Kafka record metadata
#custom.kafka.column = "kafka"
# [Optional] Set name for the Kafka key column
#key.column.name = "kafka_key"

kafka {
bootstrap.servers = "mybroker1:9092,mybroker2:9092"
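
With the optional `custom.kafka.column` / `key.column.name` settings above, the source groups Kafka record metadata (partition, offset, timestamp, timestamp type) under a single struct column instead of flat `kafka_partition`/`kafka_offset` columns. A minimal sketch of how downstream code can address those nested fields, assuming the default struct name `kafka` and a hand-built stand-in for the ingested table:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, struct}

object KafkaMetadataStructExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("kafka-metadata-struct").getOrCreate()
    import spark.implicits._

    // Stand-in for a table ingested via KafkaAvroSource: payload fields at the root,
    // Kafka metadata grouped under the configured struct column ("kafka" by default).
    val df = Seq(
      (0, 1L, "a"),
      (1, 3L, "b")
    ).toDF("partition", "offset", "field")
      .withColumn("kafka", struct(col("partition"), col("offset")))
      .drop("partition", "offset")

    // Nested metadata fields are addressed with dot notation.
    df.select(col("field"), col("kafka.partition"), col("kafka.offset")).show()

    spark.stop()
  }
}
```
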
@@ -1010,6 +1014,9 @@ pramen.sinks = [
name = "kafka_avro"
factory.class = "za.co.absa.pramen.extras.sink.KafkaAvroSink"

# [Optional] Set name for the struct field that contains Kafka record metadata. This column will be dropped if it exists before sending data to Kafka.
#custom.kafka.column = "kafka"

kafka {
bootstrap.servers = "mybroker1:9092,mybroker2:9092"

@@ -32,9 +32,6 @@ sealed trait OffsetValue extends Comparable[OffsetValue] {
}

object OffsetValue {
val KAFKA_PARTITION_FIELD = "kafka_partition"
val KAFKA_OFFSET_FIELD = "kafka_offset"

case class DateTimeValue(t: Instant) extends OffsetValue {
override val dataType: OffsetType = OffsetType.DateTimeType

@@ -19,7 +19,6 @@ package za.co.absa.pramen.core.bookkeeper
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, max, min}
import org.apache.spark.sql.types.StringType
import za.co.absa.pramen.api.offset.OffsetValue.{KAFKA_OFFSET_FIELD, KAFKA_PARTITION_FIELD}
import za.co.absa.pramen.api.offset.{KafkaPartition, OffsetType, OffsetValue}
import za.co.absa.pramen.api.sql.SqlGeneratorBase

@@ -28,12 +27,15 @@ object OffsetManagerUtils {
if (df.isEmpty) {
None
} else {
val kafkaOffsetFieldName = s"$offsetColumn.offset"
val kafkaPartitionFieldName = s"$offsetColumn.partition"

if (offsetType == OffsetType.KafkaType) {
val aggregatedDf = df.groupBy(col(KAFKA_PARTITION_FIELD))
val aggregatedDf = df.groupBy(col(kafkaPartitionFieldName))
.agg(
min(col(KAFKA_OFFSET_FIELD)).as("min_offset"),
max(col(KAFKA_OFFSET_FIELD)).as("max_offset")
).orderBy(KAFKA_PARTITION_FIELD)
min(col(kafkaOffsetFieldName)).as("min_offset"),
max(col(kafkaOffsetFieldName)).as("max_offset")
).orderBy(kafkaPartitionFieldName)

val minValue = OffsetValue.KafkaValue(aggregatedDf.collect().map(row => KafkaPartition(row.getAs[Int](0), row.getAs[Long](1))).toSeq)
val maxValue = OffsetValue.KafkaValue(aggregatedDf.collect().map(row => KafkaPartition(row.getAs[Int](0), row.getAs[Long](2))).toSeq)
@@ -22,7 +22,6 @@ import org.apache.spark.sql.types._
import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode, SparkSession}
import za.co.absa.pramen.api.jobdef.SourceTable
import za.co.absa.pramen.api.offset.DataOffset.UncommittedOffset
import za.co.absa.pramen.api.offset.OffsetValue.{KAFKA_OFFSET_FIELD, KAFKA_PARTITION_FIELD}
import za.co.absa.pramen.api.offset.{OffsetInfo, OffsetType}
import za.co.absa.pramen.api.status.{DependencyWarning, TaskRunReason}
import za.co.absa.pramen.api.{Reason, Source}
@@ -31,6 +30,7 @@ import za.co.absa.pramen.core.bookkeeper.{Bookkeeper, OffsetManager, OffsetManag
import za.co.absa.pramen.core.metastore.Metastore
import za.co.absa.pramen.core.metastore.model.{MetaTable, ReaderMode}
import za.co.absa.pramen.core.runner.splitter.{ScheduleStrategy, ScheduleStrategyIncremental}
import za.co.absa.pramen.core.utils.SparkUtils
import za.co.absa.pramen.core.utils.SparkUtils._

import java.time.{Instant, LocalDate}
@@ -307,16 +307,19 @@ class IncrementalIngestionJob(operationDef: OperationDef,
s"But only string type is supported for offset type '${offsetInfo.offsetType.dataTypeString}'.")
}
case OffsetType.KafkaType =>
val offsetField = df.schema(KAFKA_OFFSET_FIELD)
val partitionField = df.schema(KAFKA_PARTITION_FIELD)
val kafkaFieldName = field.name
val kafkaOffsetFieldName = s"$kafkaFieldName.offset"
val kafkaPartitionFieldName = s"$kafkaFieldName.partition"
val offsetField = SparkUtils.getNestedField(df.schema, kafkaOffsetFieldName)
val partitionField = SparkUtils.getNestedField(df.schema, kafkaPartitionFieldName)

if (offsetField.dataType != LongType) {
throw new IllegalArgumentException(s"Kafka offset column '$KAFKA_OFFSET_FIELD' has type '${offsetField.dataType}'. " +
throw new IllegalArgumentException(s"Kafka offset column '$kafkaOffsetFieldName' has type '${offsetField.dataType}'. " +
s"But only '${LongType.typeName}' is supported for offset type '${offsetInfo.offsetType.dataTypeString}'.")
}

if (partitionField.dataType != IntegerType) {
throw new IllegalArgumentException(s"Kafka partition column '$KAFKA_PARTITION_FIELD' has type '${partitionField.dataType}'. " +
throw new IllegalArgumentException(s"Kafka partition column '$kafkaPartitionFieldName' has type '${partitionField.dataType}'. " +
s"But only '${IntegerType.typeName}' is supported for offset type '${offsetInfo.offsetType.dataTypeString}'.")
}
}
@@ -615,6 +615,38 @@ object SparkUtils {
}
}

def getNestedField(schema: StructType, fieldName: String): StructField = {
def getNestedFieldInArray(schema: StructType, fieldNames: Array[String]): StructField = {
val rootFieldName = fieldNames.head
val rootFieldOpt = schema.fields.find(_.name.equalsIgnoreCase(rootFieldName))

rootFieldOpt match {
case Some(field) =>
if (fieldNames.length == 1) {
field
} else {
field.dataType match {
case struct: StructType =>
getNestedFieldInArray(struct, fieldNames.drop(1))
case other =>
throw new IllegalArgumentException(s"Field '${field.name}' (of $fieldName) is of type '${other.typeName}', expected StructType.")
}
}
case None =>
val fields = schema.fields.map(_.name).mkString("[ ", ", ", " ]")
throw new IllegalArgumentException(s"Field $rootFieldName (of '$fieldName') not found in the schema. Available fields: $fields")
}
}

val fieldNames = fieldName.split('.')

if (fieldNames.length < 1) {
throw new IllegalArgumentException(s"Field name '$fieldName' is not valid.")
}

getNestedFieldInArray(schema, fieldNames)
}

private def getActualProcessingTimeUdf: UserDefinedFunction = {
udf((_: Long) => Instant.now().getEpochSecond)
}
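
A minimal usage sketch for the new `SparkUtils.getNestedField` helper added above; the schema here is a hand-built assumption mirroring the Kafka metadata struct, and lookups are case-insensitive, one dot-separated segment at a time:

```scala
import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructField, StructType}
import za.co.absa.pramen.core.utils.SparkUtils

object GetNestedFieldExample {
  def main(args: Array[String]): Unit = {
    // Assumed schema: a payload field plus a "kafka" struct with partition and offset.
    val schema = StructType(Seq(
      StructField("field", StringType),
      StructField("kafka", StructType(Seq(
        StructField("partition", IntegerType),
        StructField("offset", LongType)
      )))
    ))

    // Resolves the dotted path to the nested StructField.
    val offsetField = SparkUtils.getNestedField(schema, "kafka.offset")
    println(s"${offsetField.name}: ${offsetField.dataType.typeName}") // offset: long

    // A missing segment throws IllegalArgumentException listing the available fields:
    // SparkUtils.getNestedField(schema, "kafka.timestamp") // would throw
  }
}
```

This is the lookup `IncrementalIngestionJob` relies on to validate that `<kafka column>.offset` is a long and `<kafka column>.partition` is an integer.
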
@@ -16,7 +16,7 @@

package za.co.absa.pramen.core.tests.bookkeeper

import org.apache.spark.sql.functions.col
import org.apache.spark.sql.functions.{col, struct}
import org.apache.spark.sql.types.TimestampType
import org.scalatest.wordspec.AnyWordSpec
import za.co.absa.pramen.api.offset.{KafkaPartition, OffsetType, OffsetValue}
@@ -58,8 +58,12 @@ class OffsetManagerUtilsSuite extends AnyWordSpec with SparkTestBase {
(2, 2L, "f"),
(2, 3L, "g")
).toDF("kafka_partition", "kafka_offset", "field")
.withColumn("kafka", struct(
col("kafka_offset").as("offset"),
col("kafka_partition").as("partition")
))

val (minValue, maxValue) = OffsetManagerUtils.getMinMaxValueFromData(df, "kafka_offset", OffsetType.KafkaType).get
val (minValue, maxValue) = OffsetManagerUtils.getMinMaxValueFromData(df, "kafka", OffsetType.KafkaType).get

assert(minValue == OffsetValue.KafkaValue(Seq(KafkaPartition(0, 1), KafkaPartition(1, 3), KafkaPartition(2, 1))))
assert(maxValue == OffsetValue.KafkaValue(Seq(KafkaPartition(0, 2), KafkaPartition(1, 4), KafkaPartition(2, 3))))
@@ -18,8 +18,11 @@ package za.co.absa.pramen.extras.sink

import com.typesafe.config.Config
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.slf4j.LoggerFactory
import za.co.absa.pramen.api.{ExternalChannelFactory, MetastoreReader, Sink, SinkResult}
import za.co.absa.pramen.extras.sink.KafkaAvroSink.TOPIC_NAME_KEY
import za.co.absa.pramen.extras.source.KafkaAvroSource.CUSTOM_KAFKA_COLUMN_KEY
import za.co.absa.pramen.extras.utils.ConfigUtils
import za.co.absa.pramen.extras.writer.TableWriterKafka
import za.co.absa.pramen.extras.writer.model.KafkaAvroWriterConfig

@@ -37,6 +40,9 @@ import java.time.LocalDate
* name = "kafka_avro"
* factory.class = "za.co.absa.pramen.extras.sink.KafkaAvroSink"
*
* # [Optional] Set name for the struct field that contains Kafka record metadata. This column will be dropped if it exists before sending data to Kafka.
* custom.kafka.column = "kafka"
*
* kafka {
* bootstrap.servers = "mybroker1:9092,mybroker2:9092"
*
@@ -114,6 +120,8 @@ import java.time.LocalDate
*/
class KafkaAvroSink(sinkConfig: Config,
val kafkaWriterConfig: KafkaAvroWriterConfig) extends Sink {
private val log = LoggerFactory.getLogger(this.getClass)
private val kafkaColumnName = ConfigUtils.getOptionString(sinkConfig, CUSTOM_KAFKA_COLUMN_KEY).getOrElse("kafka")

override val config: Config = sinkConfig
override def connect(): Unit = {}
@@ -132,7 +140,14 @@

val writer = new TableWriterKafka(topicName, kafkaWriterConfig)

SinkResult(writer.write(df, infoDate, None))
val dfWithoutKafkaField = if (df.schema.fields.exists(_.name.equalsIgnoreCase(kafkaColumnName))) {
log.warn(s"Dropping '$kafkaColumnName' field from the input DataFrame before sending to Kafka because the output topic has its own metadata.")
df.drop(kafkaColumnName)
} else {
df
}

SinkResult(writer.write(dfWithoutKafkaField, infoDate, None))
}
}

@@ -18,13 +18,13 @@ package za.co.absa.pramen.extras.source

import com.typesafe.config.Config
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.functions.{col, struct}
import org.slf4j.LoggerFactory
import za.co.absa.abris.avro.functions.from_avro
import za.co.absa.abris.config.AbrisConfig
import za.co.absa.pramen.api.offset.OffsetValue.{KAFKA_OFFSET_FIELD, KAFKA_PARTITION_FIELD, KafkaValue}
import za.co.absa.pramen.api.offset.OffsetValue.KafkaValue
import za.co.absa.pramen.api.offset.{OffsetInfo, OffsetType, OffsetValue}
import za.co.absa.pramen.api.{ExternalChannelFactoryV2, Query, Source, SourceResult}
import za.co.absa.pramen.extras.source.KafkaAvroSource.KAFKA_TOKENS_TO_REDACT
import za.co.absa.pramen.extras.utils.ConfigUtils
import za.co.absa.pramen.extras.writer.model.KafkaAvroConfig

@@ -46,6 +46,11 @@ import java.time.LocalDate
* name = "kafka_avro"
* factory.class = "za.co.absa.pramen.extras.source.KafkaAvroSource"
*
* # [Optional] Set name for the struct field that contains Kafka record metadata
* custom.kafka.column = "kafka"
* # [Optional] Set name for the Kafka key column
* key.column.name = "kafka_key"
*
* kafka {
* bootstrap.servers = "mybroker1:9092,mybroker2:9092"
*
@@ -103,10 +108,17 @@ class KafkaAvroSource(sourceConfig: Config,
workflowConfig: Config,
val kafkaAvroConfig: KafkaAvroConfig)
(implicit spark: SparkSession) extends Source {
import za.co.absa.pramen.extras.source.KafkaAvroSource._

private val log = LoggerFactory.getLogger(this.getClass)

private val kafkaColumnName = ConfigUtils.getOptionString(sourceConfig, CUSTOM_KAFKA_COLUMN_KEY).getOrElse("kafka")
private val keyColumnName = ConfigUtils.getOptionString(sourceConfig, KEY_COLUMN_KEY).getOrElse("kafka_key")

override def hasInfoDateColumn(query: Query): Boolean = false

override def getOffsetInfo: Option[OffsetInfo] = {
Some(OffsetInfo("kafka_offset", OffsetType.KafkaType))
Some(OffsetInfo(kafkaColumnName, OffsetType.KafkaType))
}

override def getRecordCount(query: Query, infoDateBegin: LocalDate, infoDateEnd: LocalDate): Long = {
@@ -187,24 +199,39 @@

val df1 = dfRaw
.withColumn("data", from_avro(col("value"), abrisValueConfig))
.withColumn(KAFKA_PARTITION_FIELD, col("partition"))
.withColumn(KAFKA_OFFSET_FIELD, col("offset"))
.withColumn("kafka_timestamp", col("timestamp"))
.withColumn("kafka_timestamp_type", col("timestampType"))
.withColumn("tmp_pramen_kafka", struct(
col("partition"),
col("offset"),
col("timestamp"),
col("timestampType").as("timestamp_type")
))

val df2 = kafkaAvroConfig.keyNamingStrategy match {
case Some(keyNamingStrategy) =>
val abrisKeyConfig = keyNamingStrategy
.applyNamingStrategyToAbrisConfig(abrisValueBase, topic, isKey = true)
.usingSchemaRegistry(schemaRegistryClientConfig)
df1.withColumn("kafka_key", from_avro(col("key"), abrisKeyConfig))
df1.withColumn("tmp_pramen_kafka_key", from_avro(col("key"), abrisKeyConfig))
case None =>
df1.withColumn("kafka_key", col("key"))
df1.withColumn("tmp_pramen_kafka_key", col("key"))
}

val payloadFields = df2.select("data.*").schema.fieldNames.toSet
if (payloadFields.contains(kafkaColumnName)) {
log.warn(s"Payload field '$kafkaColumnName' conflicts with Kafka metadata struct name and will be replaced.")
}
if (payloadFields.contains(keyColumnName)) {
log.warn(s"Payload field '$keyColumnName' conflicts with Kafka key column name and will be replaced.")
}

// Put data fields to the root level of the schema:
// Put data fields to the root level of the schema, and if data struct already has kafka_key and kafka fields,
// drop them
val dfFinal = df2
.select(KAFKA_PARTITION_FIELD, KAFKA_OFFSET_FIELD, "kafka_timestamp", "kafka_timestamp_type", "kafka_key", "data.*")
.select("tmp_pramen_kafka_key", "data.*", "tmp_pramen_kafka")
.drop(kafkaColumnName)
.drop(keyColumnName)
.withColumnRenamed("tmp_pramen_kafka", kafkaColumnName)
.withColumnRenamed("tmp_pramen_kafka_key", keyColumnName)

SourceResult(dfFinal)
}
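
A simplified sketch of the reshaping above, using literal column names in place of the configurable `kafkaColumnName`/`keyColumnName` and a hand-built `data` struct standing in for the Avro-decoded payload; the two `drop` calls are no-ops unless the payload carries conflicting fields:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, struct}

object FlattenKafkaPayloadExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("flatten-kafka-payload").getOrCreate()
    import spark.implicits._

    // Stand-in for the decoded Kafka DataFrame: raw metadata columns plus a payload
    // already deserialized into the "data" struct.
    val df = Seq((0, 10L, "key-1", "some-value"))
      .toDF("partition", "offset", "key", "payload_field")
      .withColumn("data", struct(col("payload_field")))
      .withColumn("tmp_pramen_kafka", struct(col("partition"), col("offset")))
      .withColumn("tmp_pramen_kafka_key", col("key"))

    // Same flattening as the source: payload fields go to the root level, conflicting
    // payload fields are dropped, then the temporary columns get the configured names.
    val dfFinal = df
      .select("tmp_pramen_kafka_key", "data.*", "tmp_pramen_kafka")
      .drop("kafka")
      .drop("kafka_key")
      .withColumnRenamed("tmp_pramen_kafka", "kafka")
      .withColumnRenamed("tmp_pramen_kafka_key", "kafka_key")

    dfFinal.printSchema() // kafka_key, payload_field, kafka: struct<partition, offset>
    spark.stop()
  }
}
```
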
@@ -214,6 +241,8 @@

object KafkaAvroSource extends ExternalChannelFactoryV2[KafkaAvroSource] {
val TOPIC_NAME_KEY = "topic.name"
val CUSTOM_KAFKA_COLUMN_KEY = "custom.kafka.column"
val KEY_COLUMN_KEY = "key.column.name"

val KAFKA_TOKENS_TO_REDACT = Set("password", "jaas.config", "auth.user.info")
