
Conversation

@yruslan
Collaborator

@yruslan yruslan commented Oct 21, 2025

Closes #644

Summary by CodeRabbit

  • New Features

    • Avro-based Kafka source and sink and per-partition Kafka offset tracking for incremental runs
  • Improvements

    • Consolidated/flattened Kafka & Schema Registry configuration; improved redaction and naming-strategy support
    • Increased max offset string length; updated Avro library
  • Documentation

    • Expanded Kafka Avro examples and config guidance
  • Tests

    • Added tests for Kafka offsets, config utilities, writers and DB fixtures
  • Chores

    • Project/version and dependency updates

@coderabbitai

coderabbitai bot commented Oct 21, 2025

Walkthrough

Adds Kafka Avro integration: new Kafka offset types and tests, a KafkaAvroSource for incremental Avro reads, refactors writer config into KafkaAvroConfig/KafkaAvroWriterConfig, updates TableWriterKafka/KafkaAvroSink, adjusts bookkeeper offset handling and DB schema, updates docs, tests, and BOMs.

Changes

Cohort / File(s) — Summary

• API: Kafka offset types
  Files: pramen/api/src/main/scala/za/co/absa/pramen/api/offset/KafkaPartition.scala, pramen/api/src/main/scala/za/co/absa/pramen/api/offset/OffsetType.scala, pramen/api/src/main/scala/za/co/absa/pramen/api/offset/OffsetValue.scala
  Add KafkaPartition, KafkaType, KafkaValue; constants for partition/offset fields; parsing, serialization, comparison, increment, and OffsetType.fromString mapping.
• API Tests
  Files: pramen/api/src/test/scala/za/co/absa/pramen/api/offset/OffsetValueSuite.scala
  Add tests for KafkaValue creation, serialization, Spark literal mapping, compare semantics and error cases.
• Core: bookkeeper & offsets
  Files: pramen/core/src/main/scala/.../OffsetManagerJdbc.scala, pramen/core/src/main/scala/.../OffsetManagerUtils.scala, pramen/core/src/main/scala/.../bookkeeper/model/OffsetRecords.scala
  Extract min/max helper; add Kafka-specific grouping to produce per-partition KafkaValue; remove fixed length constraints on min/max DB columns.
• Core: ingestion & notifications
  Files: pramen/core/src/main/scala/.../IncrementalIngestionJob.scala, pramen/core/src/main/scala/.../PipelineNotificationBuilderHtml.scala
  Validate Kafka offset/partition columns and types; add Succeeded-case record-count rendering for Sink tasks.
• Core Tests & utils
  Files: pramen/core/src/test/scala/.../OffsetManagerJdbcSuite.scala, pramen/core/src/test/scala/.../OffsetManagerUtilsSuite.scala, pramen/core/src/test/scala/.../OffsetRecordFactory.scala
  Add tests and a test factory for Kafka offset handling and getMinMaxOffsets behavior.
• Extras: Kafka Avro source & sink
  Files: pramen/extras/src/main/scala/.../source/KafkaAvroSource.scala, pramen/extras/src/main/scala/.../sink/KafkaAvroSink.scala
  New KafkaAvroSource (incremental Kafka read + Avro decode via ABRiS/schema registry); KafkaAvroSink updated to use KafkaAvroWriterConfig.
• Extras: writer refactor
  Files: pramen/extras/src/main/scala/.../writer/TableWriterKafka.scala, pramen/extras/src/main/scala/.../writer/model/KafkaWriterConfig.scala (removed), pramen/extras/src/main/scala/.../writer/model/KafkaAvroConfig.scala, pramen/extras/src/main/scala/.../writer/model/KafkaAvroWriterConfig.scala
  Remove legacy KafkaWriterConfig; add KafkaAvroConfig and KafkaAvroWriterConfig; adapt TableWriterKafka and the factory to the new config shape and centralized writer config.
• Extras: naming strategy & config utils
  Files: pramen/extras/src/main/scala/.../writer/model/NamingStrategy.scala, pramen/extras/src/main/scala/.../utils/ConfigUtils.scala
  Expose naming strategy constants and add an ABRiS config application helper; add renderExtraOptions and change logExtraOptions to use redaction tokens and consolidated rendering.
• Extras: tests, fixtures & mocks
  Files: pramen/extras/src/test/scala/.../mocks/DummyKafkaConfigFactory.scala, pramen/extras/src/test/scala/.../tests/writer/model/KafkaAvroWriterConfigSuite.scala, pramen/extras/src/test/scala/.../utils/ConfigUtilsSuite.scala, pramen/extras/src/test/scala/.../fixtures/RelationalDbFixture.scala, pramen/extras/src/test/scala/.../tests/writer/TableWriterKafkaSuite.scala
  Tests updated for the new config shape; added ConfigUtils tests; added an HSQLDB fixture; updated TableWriterKafka tests for flattened kafka / schema.registry keys.
• Build & versions
  Files: pramen/pom.xml, pramen/api/pom.xml, pramen/core/pom.xml, pramen/extras/pom.xml, pramen/project/Versions.scala
  Bump project to 1.12.8-SNAPSHOT; upgrade ABRiS to 6.4.1 for Spark 3.x; remove kafka-avro-serializer; add HSQLDB test dependency.
• Docs
  Files: README.md
  Add Kafka Avro source/sink documentation and examples showing the new config structure and schema registry usage.

Sequence Diagram(s)

sequenceDiagram
    participant User
    participant Source as KafkaAvroSource
    participant OffMgr as OffsetManager
    participant Kafka as Kafka Cluster
    participant Registry as Schema Registry (ABRiS)

    User->>Source: readIncremental(start?, end?)
    Source->>OffMgr: validate/resolve offsets (KafkaValue)
    OffMgr-->>Source: resolved KafkaValue

    rect rgba(220,240,250,0.8)
        Source->>Kafka: read(topic, startingOffsets, endingOffsets)
        Kafka-->>Source: records (key, value, partition, offset, timestamp)
    end

    rect rgba(220,250,240,0.8)
        Source->>Registry: fetch schema & decode Avro (value ± key)
        Registry-->>Source: decoded columns
    end

    Source->>Source: flatten columns + attach metadata
    Source-->>User: SourceResult(DataFrame)
sequenceDiagram
    participant Utils as OffsetManagerUtils
    participant DF as Spark DataFrame
    participant OffMgr as OffsetManager
    participant DB as Offsets DB

    OffMgr->>DF: groupBy(kafka_partition).agg(min(kafka_offset), max(kafka_offset))
    DF-->>Utils: per-partition min/max rows
    Utils->>Utils: build KafkaValue(Seq[KafkaPartition(part, offset)])
    Utils-->>OffMgr: (minKafkaValue, maxKafkaValue)
    OffMgr->>DB: persist min_offset / max_offset (serialized)
    DB-->>OffMgr: last stored offsets

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Poem

🐰 I hopped through configs, avro and stream,
Partitions counted, offsets in a dream.
Schemas fetched, decoded — fields set free,
Tests and fixtures stitched each boundary.
A tiny rabbit cheers — Kafka flows with glee! 🥕

Pre-merge checks and finishing touches

❌ Failed checks (1 warning)
• Out of Scope Changes Check — ⚠️ Warning
  Explanation: While the primary objective of adding a Kafka/Avro source is well-scoped and implemented, the pull request includes several changes that extend beyond the stated issue #644 requirements. The extensive refactoring of KafkaAvroSink and sink-related configuration classes (replacing KafkaWriterConfig with KafkaAvroWriterConfig, TableWriterKafka refactoring) addresses sinks rather than sources, which is not explicitly requested in the issue. Additionally, the PipelineNotificationBuilderHtml change to display record counts for sinks appears tangential to source functionality. While the sink refactoring uses shared Kafka/Avro infrastructure, it represents a scope expansion beyond the stated objective of adding a source.
  Resolution: Consider separating sink-related changes into a separate PR or issue if they are not explicitly required for the source functionality. If the sink refactoring is intentional and part of a broader Kafka/Avro integration effort, the PR description and/or linked issues should be clarified to reflect this scope. Specifically, evaluate whether the KafkaAvroSink refactoring, TableWriterKafka changes, and PipelineNotificationBuilderHtml modifications are necessary dependencies for the source implementation, or if they should be deferred to a follow-up PR.
✅ Passed checks (4 passed)
• Description Check — ✅ Passed: Check skipped - CodeRabbit’s high-level summary is enabled.
• Title Check — ✅ Passed: The title "#644 Add Kafka Avro batch source" directly and clearly describes the main change in this pull request. It specifically identifies the new feature being added (Kafka Avro source) and includes a helpful issue reference. The title is concise, specific, and avoids vague terminology. While the PR contains additional related changes beyond just the source implementation, the title appropriately captures the primary objective and is consistent with the issue being closed.
• Linked Issues Check — ✅ Passed: The code changes effectively implement the core requirements from issue #644 to add an experimental Kafka/Avro source. The implementation includes the KafkaAvroSource class, Kafka offset tracking infrastructure (KafkaPartition, KafkaType, KafkaValue), offset management utilities, and Avro schema registry integration using Abris. This aligns with the second proposed solution approach in the issue: "Use kafka batch and save offsets in the offsets bookkeeping table." The source properly handles incremental ingestion, partition offsets, and Avro decoding, meeting the coding objectives required by the linked issue.
• Docstring Coverage — ✅ Passed: No functions found in the changes. Docstring coverage check skipped.

📜 Recent review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 17d3c63 and 3537be2.

📒 Files selected for processing (1)
  • pramen/extras/src/main/scala/za/co/absa/pramen/extras/writer/model/NamingStrategy.scala (2 hunks)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (8)
  • GitHub Check: Test Spark 3.4.4 on Scala 2.12.20
  • GitHub Check: Test Spark 3.3.4 on Scala 2.13.16
  • GitHub Check: Test Spark 3.5.5 on Scala 2.13.16
  • GitHub Check: Test Spark 3.4.4 on Scala 2.13.16
  • GitHub Check: Test Spark 2.4.8 on Scala 2.11.12
  • GitHub Check: Test Spark 3.3.4 on Scala 2.12.20
  • GitHub Check: Test Spark 3.5.5 on Scala 2.12.20
  • GitHub Check: Test Coverage on Scala 2.12.18
🔇 Additional comments (5)
pramen/extras/src/main/scala/za/co/absa/pramen/extras/writer/model/NamingStrategy.scala (5)

21-21: LGTM: Import additions support new ABRIS integration.

The ABRIS config fragment imports are necessary for the new applyNamingStrategyToAbrisConfig method.


47-60: LGTM: Subject resolution logic is correct and well-documented.

The pattern matching correctly handles all naming strategies. The isKey parameter is appropriately used only for TOPIC_NAME strategy, as record-based strategies in ABRIS derive subjects from record name/namespace rather than key/value distinction.
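For context, this is roughly how the three Confluent naming strategies resolve subjects. The helper below is an illustration only (it is not the NamingStrategy code under review), but it shows why isKey matters only for topic.name:

// Illustration of Confluent subject-naming strategies (hypothetical helper).
// Only topic.name distinguishes key vs. value subjects.
def subjectName(namingStrategy: String,
                topic: String,
                recordName: String,
                recordNamespace: String,
                isKey: Boolean): String =
  namingStrategy match {
    case "topic.name"        => if (isKey) s"$topic-key" else s"$topic-value"
    case "record.name"       => s"$recordNamespace.$recordName"
    case "topic.record.name" => s"$topic-$recordNamespace.$recordName"
    case other               => throw new IllegalArgumentException(s"Unknown naming strategy: '$other'")
  }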


81-103: LGTM: Previous bug fix confirmed, excellent error context.

The critical bug from the previous review has been properly fixed—isKey is now correctly passed through on line 85. The addition of partStr on line 82 provides valuable context in error messages, clearly indicating whether the issue is with key or value configuration.


105-108: LGTM: Clean validation helper with clear error messages.

The helper properly validates required configuration and provides descriptive error messages when values are missing.


112-118: LGTM: Appropriate visibility change for external use.

Making these constants public enables external code to reference the naming strategy values consistently. The addition of SCHEMA_RECORD_NAMESPACE completes the configuration key set.




@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 7

🧹 Nitpick comments (24)
pramen/extras/src/test/scala/za/co/absa/pramen/extras/utils/ConfigUtilsSuite.scala (1)

83-103: Remove unnecessary stripMargin calls.

Both test cases use .stripMargin on strings that contain no | character. While harmless, this is unnecessary and slightly misleading.

Apply this diff to remove the unnecessary calls:

       val expected =
-        """option1 = "value1";password = "pwd"""".stripMargin
+        """option1 = "value1";password = "pwd""""

       assert(rendered.mkString(";") == expected)
     }

     "should redact sensitive tokens" in {
       val rendered = ConfigUtils.renderExtraOptions(Map(
         "option1" -> "value1",
         "some.password" -> "pwd"), Set("password"))
       val expected =
-        """option1 = "value1";some.password = [redacted]""".stripMargin
+        """option1 = "value1";some.password = [redacted]"""

       assert(rendered.mkString(";") == expected)
     }
pramen/core/src/main/scala/za/co/absa/pramen/core/notify/pipeline/PipelineNotificationBuilderHtml.scala (1)

584-584: Special handling for Sink jobs looks correct.

The logic appropriately distinguishes Sink jobs to display only the raw record count without difference indicators, which makes semantic sense for sink operations.

For more idiomatic Scala, consider using pattern matching instead of isInstanceOf:

       task.runStatus match {
         case s: Succeeded        =>
           s.recordCount match {
-            case Some(recordCount) if task.taskDef.jobType.isInstanceOf[JobType.Sink] => renderDifference(recordCount, None, None)
-            case Some(recordCount) => renderDifference(recordCount, s.recordCountOld, s.recordsAppended)
+            case Some(recordCount) =>
+              task.taskDef.jobType match {
+                case _: JobType.Sink => renderDifference(recordCount, None, None)
+                case _ => renderDifference(recordCount, s.recordCountOld, s.recordsAppended)
+              }
             case None => "-"
           }
pramen/extras/src/test/scala/za/co/absa/pramen/extras/fixtures/RelationalDbFixture.scala (1)

45-49: Simplify afterAll - unnecessary connection creation.

The afterAll method creates a new connection just to close it immediately. For an in-memory HSQLDB database, this is unnecessary as the database will be cleaned up when the JVM exits. Consider removing this or only closing if you're maintaining a persistent connection field.

Apply this diff to simplify:

   override protected def afterAll(): Unit = {
-    val connection = getConnection
-    if (connection != null) connection.close()
     super.afterAll()
   }
pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/OffsetRecordFactory.scala (1)

21-41: Add a tiny helper to build Kafka offset JSON for tests (reduces string typos).

Constructing strings like {"0":099,"1":01} inline is error‑prone. A small helper keeps tests DRY and deterministic.

Apply this addition:

 object OffsetRecordFactory {
+  /** Builds a canonical Kafka offset JSON string from (partition -> offset) pairs. */
+  def kafkaOffsetJson(parts: Seq[(Int, Long)]): String =
+    parts.sortBy(_._1).map { case (p, o) => s""""$p":$o""" }.mkString("{", ",", "}")
+
   def getOffsetRecord(pramenTableName: String = "table1",
pramen/api/src/main/scala/za/co/absa/pramen/api/sql/SqlGeneratorBase.scala (1)

219-219: 512-char limit: OK for now; make message precise and consider configurability.

  • Error says “bytes” but check is on characters; adjust wording.
  • If topics can have many partitions, consider a config knob rather than a hard-coded cap.

Apply this wording tweak:

-          throw new IllegalArgumentException(s"Offset value '$s' is bigger than $MAX_STRING_OFFSET_CHARACTERS bytes")
+          throw new IllegalArgumentException(s"Offset value '$s' is longer than $MAX_STRING_OFFSET_CHARACTERS characters")
pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/OffsetManagerJdbcSuite.scala (1)

411-426: Nice coverage for Kafka min/max; consider clarifying intent of leading zeros.

Either add a brief comment that leading zeros test normalization, or build these strings via a helper (see OffsetRecordFactory.kafkaOffsetJson).
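For example, with the kafkaOffsetJson helper sketched in the OffsetRecordFactory comment above, the test data could be built like this (usage sketch only):

// The proposed helper sorts entries by partition and renders canonical JSON
// without leading zeros.
val minOffsets = OffsetRecordFactory.kafkaOffsetJson(Seq(0 -> 99L, 1 -> 1L))    // {"0":99,"1":1}
val maxOffsets = OffsetRecordFactory.kafkaOffsetJson(Seq(1 -> 150L, 0 -> 200L)) // {"0":200,"1":150}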

pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/OffsetManagerJdbc.scala (1)

178-181: Good extraction; add pre-validation for Kafka partition layouts to fail fast.

Refactor is clean. For Kafka offsets, consider validating that all records have the same partition set before computing min/max to avoid deep compare errors.

Example addition (outside this hunk) near validateOffsets:

// Inside validateOffsets(...)
if (inconsistentOffsets.length == 1 && inconsistentOffsets.head == "kafka") {
  val partsSets = offsets.flatMap(or => OffsetValue.fromString("kafka", or.minOffset))
    .map(_.asInstanceOf[OffsetValue.KafkaValue].value.map(_.partition).sorted.distinct.mkString(","))
    .distinct
  if (partsSets.length > 1)
    throw new RuntimeException(s"Inconsistent Kafka partition layouts found for $table at $infoDate.")
}

Also applies to: 183-189

pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/IncrementalIngestionJob.scala (1)

304-321: Kafka column validation should use case-insensitive lookup and friendly errors.

Using df.schema(name) is case-sensitive and throws a generic error if missing. Align with earlier checks by using equalsIgnoreCase and explicit messages.

Apply this diff:

-      case OffsetType.KafkaType    =>
-        val offsetField = df.schema(KAFKA_OFFSET_FIELD)
-        val partitionField = df.schema(KAFKA_PARTITION_FIELD)
-
-        if (offsetField.dataType != LongType) {
+      case OffsetType.KafkaType    =>
+        val offsetFieldOpt = df.schema.fields.find(_.name.equalsIgnoreCase(KAFKA_OFFSET_FIELD))
+        val partitionFieldOpt = df.schema.fields.find(_.name.equalsIgnoreCase(KAFKA_PARTITION_FIELD))
+        val offsetField = offsetFieldOpt.getOrElse(
+          throw new IllegalArgumentException(s"Kafka offset column '$KAFKA_OFFSET_FIELD' not found in the output table '${outputTable.name}'.")
+        )
+        val partitionField = partitionFieldOpt.getOrElse(
+          throw new IllegalArgumentException(s"Kafka partition column '$KAFKA_PARTITION_FIELD' not found in the output table '${outputTable.name}'.")
+        )
+
+        if (offsetField.dataType != LongType) {
           throw new IllegalArgumentException(s"Kafka offset column '$KAFKA_OFFSET_FIELD' has type '${offsetField.dataType}'. " +
             s"But only '${LongType.typeName}' is supported for offset type '${offsetInfo.offsetType.dataTypeString}'.")
         }
 
-        if (partitionField.dataType != IntegerType) {
+        if (partitionField.dataType != IntegerType) {
           throw new IllegalArgumentException(s"Kafka partition column '$KAFKA_PARTITION_FIELD' has type '${partitionField.dataType}'. " +
             s"But only '${IntegerType.typeName}' is supported for offset type '${offsetInfo.offsetType.dataTypeString}'.")
         }
pramen/api/src/test/scala/za/co/absa/pramen/api/offset/OffsetValueSuite.scala (2)

84-89: Add parsing robustness tests (whitespace and invalid tokens).

Strengthen fromString("kafka", ...) by testing inputs with spaces (e.g., { "0" : 10 , "1" : 20 }) and invalid offsets (e.g., {"0":ten}) expecting IllegalArgumentException. If needed, trim tokens in parser.
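A sketch of what such tests could look like (suite name and ScalaTest style are assumptions; the whitespace case passes only once the parser trims tokens as suggested):

import org.scalatest.wordspec.AnyWordSpec
import za.co.absa.pramen.api.offset.{KafkaPartition, OffsetValue}

// Hypothetical robustness tests for Kafka offset parsing.
class KafkaOffsetParsingRobustnessSuite extends AnyWordSpec {
  """OffsetValue.fromString("kafka", ...)""" should {
    "tolerate whitespace around partitions and offsets" in {
      val parsed = OffsetValue.fromString("kafka", """{ "0" : 10 , "1" : 20 }""")
      assert(parsed.contains(OffsetValue.KafkaValue(Seq(KafkaPartition(0, 10L), KafkaPartition(1, 20L)))))
    }

    "reject a non-numeric offset" in {
      val ex = intercept[IllegalArgumentException](OffsetValue.fromString("kafka", """{"0":ten}"""))
      assert(ex.getMessage.contains("Unexpected Kafka offset"))
    }
  }
}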


143-176: Consider testing KafkaValue.increment and non-sorted partition inputs.

  • Add a test that increment bumps all partition offsets by 1.
  • Add a test that constructing KafkaValue with partitions out of order still yields the same valueString ordering.
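Sketch of the two suggested cases. It assumes increment takes no arguments and that valueString orders partitions; both may require the implementation changes discussed elsewhere in this review:

import org.scalatest.wordspec.AnyWordSpec
import za.co.absa.pramen.api.offset.{KafkaPartition, OffsetValue}

// Hypothetical tests for KafkaValue increment and ordering behavior.
class KafkaValueBehaviorSuite extends AnyWordSpec {
  "KafkaValue" should {
    "increment every partition offset by one" in {
      val v = OffsetValue.KafkaValue(Seq(KafkaPartition(0, 10L), KafkaPartition(1, 20L)))
      assert(v.increment == OffsetValue.KafkaValue(Seq(KafkaPartition(0, 11L), KafkaPartition(1, 21L))))
    }

    "render the same valueString regardless of input partition order" in {
      val sorted   = OffsetValue.KafkaValue(Seq(KafkaPartition(0, 10L), KafkaPartition(1, 20L)))
      val shuffled = OffsetValue.KafkaValue(Seq(KafkaPartition(1, 20L), KafkaPartition(0, 10L)))
      assert(shuffled.valueString == sorted.valueString)
    }
  }
}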
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/OffsetManagerUtils.scala (2)

31-42: Avoid double collect; use named access and assert required columns.

Minor perf/robustness: collect once, read by column name, and fail fast if Kafka columns missing.

-      if (offsetType == OffsetType.KafkaType) {
-        val aggregatedDf = df.groupBy(col(KAFKA_PARTITION_FIELD))
-          .agg(
-            min(col(KAFKA_OFFSET_FIELD)).as("min_offset"),
-            max(col(KAFKA_OFFSET_FIELD)).as("max_offset")
-          ).orderBy(KAFKA_PARTITION_FIELD)
-
-        val minValue = OffsetValue.KafkaValue(aggregatedDf.collect().map(row => KafkaPartition(row.getAs[Int](0), row.getAs[Long](1))).toSeq)
-        val maxValue = OffsetValue.KafkaValue(aggregatedDf.collect().map(row => KafkaPartition(row.getAs[Int](0), row.getAs[Long](2))).toSeq)
-
-        Some(minValue, maxValue)
-      } else {
+      if (offsetType == OffsetType.KafkaType) {
+        require(df.columns.contains(KAFKA_PARTITION_FIELD) && df.columns.contains(KAFKA_OFFSET_FIELD),
+          s"Kafka offset computation requires columns '$KAFKA_PARTITION_FIELD' and '$KAFKA_OFFSET_FIELD'.")
+
+        val aggregatedDf = df.groupBy(col(KAFKA_PARTITION_FIELD))
+          .agg(
+            min(col(KAFKA_OFFSET_FIELD)).as("min_offset"),
+            max(col(KAFKA_OFFSET_FIELD)).as("max_offset")
+          )
+          .orderBy(col(KAFKA_PARTITION_FIELD))
+
+        val parts = aggregatedDf.collect()
+        val minValue = OffsetValue.KafkaValue(parts.map(r =>
+          KafkaPartition(r.getAs[Int](KAFKA_PARTITION_FIELD), r.getAs[Long]("min_offset"))).toSeq)
+        val maxValue = OffsetValue.KafkaValue(parts.map(r =>
+          KafkaPartition(r.getAs[Int](KAFKA_PARTITION_FIELD), r.getAs[Long]("max_offset"))).toSeq)
+
+        Some(minValue, maxValue)
+      } else {

Please confirm df.isEmpty is acceptable for your Spark version and data sizes; it triggers a job.


50-52: Optional: add basic validation for Kafka offsets.

Non‑Kafka values are validated; consider asserting non‑negative Kafka offsets and non‑empty partition set upstream or here.

pramen/api/src/main/scala/za/co/absa/pramen/api/offset/OffsetValue.scala (2)

83-92: KafkaValue implementation is coherent; consider invariants.

Enforce unique partitions and non‑negative offsets at construction to fail fast on bad data.

 case class KafkaValue(value: Seq[KafkaPartition]) extends OffsetValue {
   override val dataType: OffsetType = OffsetType.KafkaType
+  require(value.nonEmpty, "KafkaValue must contain at least one partition.")
+  private val partitions = value.map(_.partition)
+  require(partitions.distinct.size == partitions.size, s"Duplicate Kafka partitions in value: $partitions")
+  require(value.forall(_.offset >= 0), s"Kafka offsets must be non-negative: ${value.map(_.offset)}")

Also applies to: 95-123, 125-128


139-153: Make Kafka offset parsing whitespace‑tolerant and safer.

Trim tokens and guard split arity to avoid surprises with user‑supplied strings like { "0": 123 }.

-        case KAFKA_TYPE_STR =>
-          try {
-            Some(KafkaValue(
-              value
-                .replaceAll("[{}\"]", "")
-                .split(",")
-                .filter(_.nonEmpty)
-                .map { part =>
-                  val Array(partStr, offsetStr) = part.split(":")
-                  KafkaPartition(partStr.toInt, offsetStr.toLong)
-                }.toSeq
-            ))
-          } catch {
-            case NonFatal(ex) => throw new IllegalArgumentException(s"Unexpected Kafka offset: '$value'. Expected a JSON mapping from partition to offset.", ex)
-          }
+        case KAFKA_TYPE_STR =>
+          try {
+            val parts = value
+              .replaceAll("[{}\"]", "")
+              .split(",")
+              .iterator
+              .map(_.trim)
+              .filter(_.nonEmpty)
+              .toSeq
+            Some(KafkaValue(
+              parts.map { part =>
+                val kv = part.split(":").map(_.trim)
+                require(kv.length == 2, s"Unexpected Kafka offset entry: '$part'. Expected '<partition>:<offset>'.")
+                KafkaPartition(kv(0).toInt, kv(1).toLong)
+              }
+            ))
+          } catch {
+            case NonFatal(ex) =>
+              throw new IllegalArgumentException(s"Unexpected Kafka offset: '$value'. Expected a JSON mapping from partition to offset.", ex)
+          }
pramen/extras/src/main/scala/za/co/absa/pramen/extras/sink/KafkaAvroSink.scala (1)

133-135: Behavior change: dropped table-level writer overrides.

Previously, extra writer options merged with per-table options. Now only KafkaAvroWriterConfig is used and “options” (except topic) aren’t passed to TableWriterKafka. Confirm this is intentional and not breaking existing pipelines that set, e.g., limit.records or schema IDs per table.

If you want to support light per-table overrides without expanding the API, consider merging a safe subset before constructing the writer.
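One possible shape for that, as a sketch only: the allowed keys and helper name are illustrative, and it assumes per-table options arrive as a Map[String, String] and that KafkaAvroWriterConfig.fromConfig accepts a Typesafe Config.

import com.typesafe.config.{Config, ConfigValueFactory}

// Overlay a safe subset of per-table options onto the sink config before
// building the writer config. The keys listed here are examples only.
def withTableOverrides(sinkConf: Config, tableOptions: Map[String, String]): Config = {
  val allowed = Set("limit.records", "key.schema.id", "value.schema.id")
  tableOptions
    .filter { case (key, _) => allowed.contains(key) }
    .foldLeft(sinkConf) { case (conf, (key, value)) =>
      conf.withValue(key, ConfigValueFactory.fromAnyRef(value))
    }
}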

pramen/extras/src/main/scala/za/co/absa/pramen/extras/writer/model/KafkaAvroWriterConfig.scala (1)

39-47: Validation messages: fix grammar for user-facing errors.

Minor text polish to reduce confusion.

-      throw new IllegalArgumentException(s"If key strategy is defined, column names must be define too. " +
+      throw new IllegalArgumentException(s"If key strategy is defined, column names must be defined too. " +
         s"Please, define '<job>.key.column.names'")
...
-      throw new IllegalArgumentException(s"If key columns are defined, naming strategy for keys need to be defined too. " +
+      throw new IllegalArgumentException(s"If key columns are defined, the naming strategy for keys needs to be defined too. " +
         s"Please, define '<job>.schema.registry.key.naming.strategy'")
pramen/extras/src/test/scala/za/co/absa/pramen/extras/mocks/DummyKafkaConfigFactory.scala (1)

28-36: Minor: name the last argument for clarity.

The final argument is positional; naming it improves readability and future-proofing.

-    KafkaAvroConfig(
+    KafkaAvroConfig(
       brokers = brokers,
       schemaRegistryUrl = schemaRegistryUrl,
       keyNamingStrategy = keyNamingStrategy,
       valueNamingStrategy = valueNamingStrategy,
       extraOptions = extraOptions,
-      schemaRegistryExtraOptions
+      schemaRegistryExtraOptions = schemaRegistryExtraOptions
     )
pramen/extras/src/main/scala/za/co/absa/pramen/extras/source/KafkaAvroSource.scala (2)

49-58: Docs: “Producer” → “Consumer”.

These are consumer options for reads.

-  *      # Arbitrary options for creating a Kafka Producer
+  *      # Arbitrary options for creating a Kafka Consumer

59-74: Docs: fix typo in schema registry URL.

“regictry” → “registry”.

-  *      url = "https://my.schema.regictry:8081"
+  *      url = "https://my.schema.registry:8081"
pramen/extras/src/test/scala/za/co/absa/pramen/extras/tests/writer/model/KafkaAvroWriterConfigSuite.scala (2)

35-44: “Minimalistic config” still includes key strategy/columns due to legacy paths; update keys and assertions.

You’re removing writer.kafka.* (old), not the new keys. As a result the test isn’t exercising the minimal path (value strategy only, no key strategy/columns).

Apply this to actually test the minimal config and expectations:

-      val kafkaConfig = fromConfig(conf
-        .withoutPath("writer.kafka.schema.registry.key.naming.strategy")
-      .withoutPath("writer.kafka.key.column.names"))
+      val kafkaConfig = fromConfig(
+        conf
+          .withoutPath("schema.registry.key.naming.strategy")
+          .withoutPath("key.column.names")
+      )

-      assert(kafkaConfig.kafkaAvroConfig.brokers == "localhost:9092")
-      assert(kafkaConfig.kafkaAvroConfig.schemaRegistryUrl == "localhost:8081")
-      assert(kafkaConfig.kafkaAvroConfig.valueNamingStrategy.namingStrategy == "topic.name")
-      assert(kafkaConfig.keyColumns == Seq("a", "b"))
+      assert(kafkaConfig.kafkaAvroConfig.brokers == "localhost:9092")
+      assert(kafkaConfig.kafkaAvroConfig.schemaRegistryUrl == "localhost:8081")
+      assert(kafkaConfig.kafkaAvroConfig.valueNamingStrategy.namingStrategy == "topic.name")
+      assert(kafkaConfig.kafkaAvroConfig.keyNamingStrategy.isEmpty)
+      assert(kafkaConfig.keyColumns.isEmpty)

72-73: Tighten exception assertions and fix grammar (“defined”).

Current messages use “define” instead of “defined”. Suggest fixing messages in KafkaAvroWriterConfig.fromConfig and adjusting assertions to be robust.

Tests:

-        assert(ex.getMessage.contains("column names must be define too"))
+        assert(ex.getMessage.toLowerCase.contains("column names must be defined"))
-      assert(ex.getMessage.contains("naming strategy for keys need to be defined too"))
+      assert(ex.getMessage.toLowerCase.contains("naming strategy for keys must be defined"))

And in production (KafkaAvroWriterConfig.scala):

-      throw new IllegalArgumentException(s"If key strategy is defined, column names must be define too. " +
+      throw new IllegalArgumentException(s"If key strategy is defined, column names must be defined too. " +
         s"Please, define '<job>.key.column.names'")
...
-      throw new IllegalArgumentException(s"If key columns are defined, naming strategy for keys need to be defined too. " +
+      throw new IllegalArgumentException(s"If key columns are defined, a naming strategy for keys must be defined. " +
         s"Please, define '<job>.schema.registry.key.naming.strategy'")

I can raise a tiny follow-up PR to apply the wording change and adjust tests.

Also applies to: 81-82

pramen/extras/src/main/scala/za/co/absa/pramen/extras/writer/TableWriterKafka.scala (3)

27-30: Avoid cross‑module coupling for redaction tokens.

Importing KAFKA_TOKENS_TO_REDACT from the source package couples writer to reader internals. Move tokens to a shared constants object (e.g., extras.security.RedactionTokens) and use from both places.

- import za.co.absa.pramen.extras.source.KafkaAvroSource.KAFKA_TOKENS_TO_REDACT
+ import za.co.absa.pramen.extras.security.RedactionTokens.KafkaTokensToRedact
...
-        KAFKA_TOKENS_TO_REDACT
+        KafkaTokensToRedact
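The shared object could be as small as this. Package, object name, and token values are illustrative; the real KAFKA_TOKENS_TO_REDACT contents should be moved over verbatim:

package za.co.absa.pramen.extras.security

// Sketch of the proposed shared constants object. Tokens are matched as
// substrings of option keys when deciding what to redact in logs.
object RedactionTokens {
  val KafkaTokensToRedact: Set[String] = Set("password", "jaas.config", "auth.user.info")
}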

39-51: Double action on DataFrame (count() then write); consider caching to avoid recomputation.

Counting before write causes a second full evaluation on save(). Optional optimization:

-    val count = dfOut.count()
+    val cached = dfOut.persist()
+    val count = cached.count()
...
-      dfOut.write
+      cached.write
         .format("kafka")
         .option("topic", topicName)
         .option("kafka.bootstrap.servers", writerConf.kafkaAvroConfig.brokers)
         .options(writerConf.kafkaAvroConfig.extraOptions)
         .save()
+      cached.unpersist()

88-94: Safe access to keyNamingStrategy already guarded; consider small readability tweak.

You pattern‑match earlier, so .get is safe. Optional: bind the Option value in the match to avoid re‑fetching it here.

-    val dfOut = writerConf.kafkaAvroConfig.keyNamingStrategy match {
-      case Some(_) => getKeyValueDataFrame(df)
+    val dfOut = writerConf.kafkaAvroConfig.keyNamingStrategy match {
+      case Some(_) => getKeyValueDataFrame(df)
       case None    => getValueDataFrame(df)
     }

Or pass the NamingStrategy explicitly in the Some(_) branch for clarity:

writerConf.kafkaAvroConfig.keyNamingStrategy match {
  case Some(ns) => getKeyValueDataFrame(df) // and use ns instead of .get below
  case None     => getValueDataFrame(df)
}
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 9c85c87 and 097ae65.

📒 Files selected for processing (30)
  • README.md (2 hunks)
  • pramen/api/pom.xml (1 hunks)
  • pramen/api/src/main/scala/za/co/absa/pramen/api/offset/KafkaPartition.scala (1 hunks)
  • pramen/api/src/main/scala/za/co/absa/pramen/api/offset/OffsetType.scala (2 hunks)
  • pramen/api/src/main/scala/za/co/absa/pramen/api/offset/OffsetValue.scala (4 hunks)
  • pramen/api/src/main/scala/za/co/absa/pramen/api/sql/SqlGeneratorBase.scala (1 hunks)
  • pramen/api/src/test/scala/za/co/absa/pramen/api/offset/OffsetValueSuite.scala (3 hunks)
  • pramen/core/pom.xml (1 hunks)
  • pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/OffsetManagerJdbc.scala (1 hunks)
  • pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/OffsetManagerUtils.scala (1 hunks)
  • pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/OffsetRecords.scala (1 hunks)
  • pramen/core/src/main/scala/za/co/absa/pramen/core/notify/pipeline/PipelineNotificationBuilderHtml.scala (1 hunks)
  • pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/IncrementalIngestionJob.scala (2 hunks)
  • pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/OffsetManagerJdbcSuite.scala (1 hunks)
  • pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/OffsetManagerUtilsSuite.scala (2 hunks)
  • pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/OffsetRecordFactory.scala (1 hunks)
  • pramen/extras/pom.xml (2 hunks)
  • pramen/extras/src/main/scala/za/co/absa/pramen/extras/sink/KafkaAvroSink.scala (5 hunks)
  • pramen/extras/src/main/scala/za/co/absa/pramen/extras/source/KafkaAvroSource.scala (1 hunks)
  • pramen/extras/src/main/scala/za/co/absa/pramen/extras/utils/ConfigUtils.scala (1 hunks)
  • pramen/extras/src/main/scala/za/co/absa/pramen/extras/writer/TableWriterKafka.scala (6 hunks)
  • pramen/extras/src/main/scala/za/co/absa/pramen/extras/writer/model/KafkaAvroConfig.scala (1 hunks)
  • pramen/extras/src/main/scala/za/co/absa/pramen/extras/writer/model/KafkaAvroWriterConfig.scala (1 hunks)
  • pramen/extras/src/main/scala/za/co/absa/pramen/extras/writer/model/KafkaWriterConfig.scala (0 hunks)
  • pramen/extras/src/test/scala/za/co/absa/pramen/extras/fixtures/RelationalDbFixture.scala (1 hunks)
  • pramen/extras/src/test/scala/za/co/absa/pramen/extras/mocks/DummyKafkaConfigFactory.scala (1 hunks)
  • pramen/extras/src/test/scala/za/co/absa/pramen/extras/tests/writer/model/KafkaAvroWriterConfigSuite.scala (4 hunks)
  • pramen/extras/src/test/scala/za/co/absa/pramen/extras/utils/ConfigUtilsSuite.scala (1 hunks)
  • pramen/pom.xml (3 hunks)
  • pramen/project/Versions.scala (1 hunks)
💤 Files with no reviewable changes (1)
  • pramen/extras/src/main/scala/za/co/absa/pramen/extras/writer/model/KafkaWriterConfig.scala
🧰 Additional context used
🧬 Code graph analysis (18)
pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/OffsetRecordFactory.scala (3)
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/OffsetManagerJdbc.scala (5)
  • core (147-158)
  • core (160-167)
  • core (169-181)
  • core (183-189)
  • core (197-202)
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/OffsetRecord.scala (1)
  • OffsetRecord (19-28)
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/OffsetRecords.scala (4)
  • dataType (24-24)
  • minOffset (25-25)
  • maxOffset (26-26)
  • batchId (27-27)
pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/OffsetManagerJdbcSuite.scala (3)
pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/OffsetRecordFactory.scala (2)
  • OffsetRecordFactory (21-42)
  • getOffsetRecord (22-41)
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/OffsetRecords.scala (3)
  • dataType (24-24)
  • minOffset (25-25)
  • maxOffset (26-26)
pramen/api/src/main/scala/za/co/absa/pramen/api/offset/OffsetValue.scala (4)
  • valueString (41-41)
  • valueString (56-56)
  • valueString (71-71)
  • valueString (86-91)
pramen/api/src/test/scala/za/co/absa/pramen/api/offset/OffsetValueSuite.scala (3)
pramen/api/src/main/scala/za/co/absa/pramen/api/offset/OffsetValue.scala (15)
  • OffsetValue (34-157)
  • KafkaValue (83-128)
  • valueString (41-41)
  • valueString (56-56)
  • valueString (71-71)
  • valueString (86-91)
  • getSparkLit (43-43)
  • getSparkLit (58-58)
  • getSparkLit (73-73)
  • getSparkLit (93-93)
  • fromString (131-156)
  • compareTo (45-50)
  • compareTo (60-65)
  • compareTo (75-80)
  • compareTo (95-123)
pramen/api/src/main/scala/za/co/absa/pramen/api/offset/KafkaPartition.scala (1)
  • KafkaPartition (19-19)
pramen/api/src/main/scala/za/co/absa/pramen/api/offset/OffsetType.scala (7)
  • OffsetType (29-67)
  • KafkaType (54-58)
  • getSparkCol (39-39)
  • getSparkCol (45-45)
  • getSparkCol (51-51)
  • getSparkCol (57-57)
  • fromString (60-66)
pramen/extras/src/test/scala/za/co/absa/pramen/extras/utils/ConfigUtilsSuite.scala (1)
pramen/extras/src/main/scala/za/co/absa/pramen/extras/utils/ConfigUtils.scala (2)
  • ConfigUtils (24-205)
  • renderExtraOptions (97-110)
pramen/core/src/main/scala/za/co/absa/pramen/core/notify/pipeline/PipelineNotificationBuilderHtml.scala (4)
pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/job/JobSpy.scala (1)
  • taskDef (57-57)
pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/JobBase.scala (1)
  • taskDef (44-46)
pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/job/JobBaseDummy.scala (1)
  • jobType (37-37)
pramen/api/src/main/scala/za/co/absa/pramen/api/status/JobType.scala (1)
  • JobType (26-64)
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/OffsetManagerJdbc.scala (4)
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/OffsetRecords.scala (4)
  • minOffset (25-25)
  • maxOffset (26-26)
  • infoDate (23-23)
  • dataType (24-24)
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/DataOffsetAggregated.scala (1)
  • DataOffsetAggregated (23-29)
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/OffsetRecordConverter.scala (2)
  • OffsetRecordConverter (23-50)
  • toDataOffset (24-49)
pramen/api/src/main/scala/za/co/absa/pramen/api/offset/OffsetValue.scala (2)
  • OffsetValue (34-157)
  • fromString (131-156)
pramen/extras/src/main/scala/za/co/absa/pramen/extras/sink/KafkaAvroSink.scala (3)
pramen/extras/src/main/scala/za/co/absa/pramen/extras/writer/TableWriterKafka.scala (6)
  • pramen (66-66)
  • pramen (68-78)
  • pramen (80-86)
  • pramen (88-98)
  • pramen (100-118)
  • pramen (120-138)
pramen/extras/src/main/scala/za/co/absa/pramen/extras/writer/model/KafkaAvroWriterConfig.scala (3)
  • KafkaAvroWriterConfig (22-28)
  • KafkaAvroWriterConfig (30-57)
  • fromConfig (36-56)
pramen/extras/src/main/scala/za/co/absa/pramen/extras/writer/model/KafkaAvroConfig.scala (1)
  • fromConfig (41-73)
pramen/extras/src/main/scala/za/co/absa/pramen/extras/writer/model/KafkaAvroWriterConfig.scala (2)
pramen/extras/src/main/scala/za/co/absa/pramen/extras/writer/TableWriterKafka.scala (6)
  • pramen (66-66)
  • pramen (68-78)
  • pramen (80-86)
  • pramen (88-98)
  • pramen (100-118)
  • pramen (120-138)
pramen/extras/src/main/scala/za/co/absa/pramen/extras/writer/model/KafkaAvroConfig.scala (3)
  • fromConfig (41-73)
  • KafkaAvroConfig (22-29)
  • KafkaAvroConfig (31-74)
pramen/api/src/main/scala/za/co/absa/pramen/api/offset/OffsetValue.scala (2)
pramen/api/src/main/scala/za/co/absa/pramen/api/offset/OffsetType.scala (2)
  • OffsetType (29-67)
  • KafkaType (54-58)
pramen/api/src/main/scala/za/co/absa/pramen/api/offset/KafkaPartition.scala (1)
  • KafkaPartition (19-19)
pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/OffsetManagerUtilsSuite.scala (4)
pramen/api/src/main/scala/za/co/absa/pramen/api/offset/KafkaPartition.scala (1)
  • KafkaPartition (19-19)
pramen/api/src/main/scala/za/co/absa/pramen/api/offset/OffsetType.scala (2)
  • OffsetType (29-67)
  • KafkaType (54-58)
pramen/api/src/main/scala/za/co/absa/pramen/api/offset/OffsetValue.scala (2)
  • OffsetValue (34-157)
  • KafkaValue (83-128)
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/OffsetManagerUtils.scala (2)
  • OffsetManagerUtils (26-57)
  • getMinMaxValueFromData (27-56)
pramen/extras/src/main/scala/za/co/absa/pramen/extras/writer/TableWriterKafka.scala (4)
pramen/extras/src/main/scala/za/co/absa/pramen/extras/source/KafkaAvroSource.scala (2)
  • KafkaAvroSource (102-214)
  • KafkaAvroSource (216-225)
pramen/extras/src/main/scala/za/co/absa/pramen/extras/utils/ConfigUtils.scala (3)
  • ConfigUtils (24-205)
  • logExtraOptions (79-87)
  • getExtraOptions (59-66)
pramen/extras/src/main/scala/za/co/absa/pramen/extras/writer/model/KafkaAvroWriterConfig.scala (3)
  • KafkaAvroWriterConfig (22-28)
  • KafkaAvroWriterConfig (30-57)
  • fromConfig (36-56)
pramen/extras/src/main/scala/za/co/absa/pramen/extras/writer/model/KafkaAvroConfig.scala (1)
  • fromConfig (41-73)
pramen/extras/src/test/scala/za/co/absa/pramen/extras/mocks/DummyKafkaConfigFactory.scala (4)
pramen/extras/src/main/scala/za/co/absa/pramen/extras/writer/TableWriterKafka.scala (6)
  • pramen (66-66)
  • pramen (68-78)
  • pramen (80-86)
  • pramen (88-98)
  • pramen (100-118)
  • pramen (120-138)
pramen/extras/src/main/scala/za/co/absa/pramen/extras/writer/model/KafkaAvroConfig.scala (2)
  • KafkaAvroConfig (22-29)
  • KafkaAvroConfig (31-74)
pramen/extras/src/main/scala/za/co/absa/pramen/extras/writer/model/KafkaAvroWriterConfig.scala (2)
  • KafkaAvroWriterConfig (22-28)
  • KafkaAvroWriterConfig (30-57)
pramen/extras/src/test/scala/za/co/absa/pramen/extras/mocks/DummyNamingStrategyFactory.scala (2)
  • DummyNamingStrategyFactory (21-30)
  • getDummyNamingStrategy (22-29)
pramen/extras/src/test/scala/za/co/absa/pramen/extras/tests/writer/model/KafkaAvroWriterConfigSuite.scala (3)
pramen/extras/src/main/scala/za/co/absa/pramen/extras/writer/TableWriterKafka.scala (6)
  • pramen (66-66)
  • pramen (68-78)
  • pramen (80-86)
  • pramen (88-98)
  • pramen (100-118)
  • pramen (120-138)
pramen/extras/src/main/scala/za/co/absa/pramen/extras/writer/model/KafkaAvroWriterConfig.scala (3)
  • KafkaAvroWriterConfig (22-28)
  • KafkaAvroWriterConfig (30-57)
  • fromConfig (36-56)
pramen/extras/src/main/scala/za/co/absa/pramen/extras/writer/model/KafkaAvroConfig.scala (1)
  • fromConfig (41-73)
pramen/extras/src/main/scala/za/co/absa/pramen/extras/writer/model/KafkaAvroConfig.scala (4)
pramen/extras/src/main/scala/za/co/absa/pramen/extras/writer/TableWriterKafka.scala (6)
  • pramen (66-66)
  • pramen (68-78)
  • pramen (80-86)
  • pramen (88-98)
  • pramen (100-118)
  • pramen (120-138)
pramen/extras/src/main/scala/za/co/absa/pramen/extras/utils/ConfigUtils.scala (2)
  • ConfigUtils (24-205)
  • getExtraOptions (59-66)
pramen/extras/src/main/scala/za/co/absa/pramen/extras/writer/model/KafkaAvroWriterConfig.scala (1)
  • fromConfig (36-56)
pramen/extras/src/main/scala/za/co/absa/pramen/extras/writer/model/NamingStrategy.scala (1)
  • fromConfigOpt (56-79)
pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/IncrementalIngestionJob.scala (2)
pramen/api/src/main/scala/za/co/absa/pramen/api/offset/OffsetValue.scala (1)
  • OffsetValue (34-157)
pramen/api/src/main/scala/za/co/absa/pramen/api/offset/OffsetType.scala (3)
  • OffsetType (29-67)
  • StringType (48-52)
  • KafkaType (54-58)
pramen/api/src/main/scala/za/co/absa/pramen/api/offset/OffsetType.scala (2)
pramen/api/src/main/scala/za/co/absa/pramen/api/offset/OffsetValue.scala (1)
  • fromString (131-156)
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/OffsetRecords.scala (1)
  • dataType (24-24)
pramen/extras/src/main/scala/za/co/absa/pramen/extras/source/KafkaAvroSource.scala (5)
pramen/api/src/main/scala/za/co/absa/pramen/api/offset/OffsetValue.scala (7)
  • OffsetValue (34-157)
  • KafkaValue (83-128)
  • increment (125-127)
  • valueString (41-41)
  • valueString (56-56)
  • valueString (71-71)
  • valueString (86-91)
pramen/api/src/main/scala/za/co/absa/pramen/api/Query.scala (1)
  • Query (28-56)
pramen/api/src/main/scala/za/co/absa/pramen/api/SourceResult.scala (1)
  • SourceResult (21-25)
pramen/extras/src/main/scala/za/co/absa/pramen/extras/utils/ConfigUtils.scala (2)
  • ConfigUtils (24-205)
  • logExtraOptions (79-87)
pramen/extras/src/main/scala/za/co/absa/pramen/extras/writer/model/KafkaAvroConfig.scala (3)
  • KafkaAvroConfig (22-29)
  • KafkaAvroConfig (31-74)
  • fromConfig (41-73)
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/OffsetManagerUtils.scala (4)
pramen/api/src/main/scala/za/co/absa/pramen/api/offset/OffsetValue.scala (3)
  • OffsetValue (34-157)
  • KafkaValue (83-128)
  • fromString (131-156)
pramen/api/src/main/scala/za/co/absa/pramen/api/offset/KafkaPartition.scala (1)
  • KafkaPartition (19-19)
pramen/api/src/main/scala/za/co/absa/pramen/api/offset/OffsetType.scala (3)
  • OffsetType (29-67)
  • KafkaType (54-58)
  • StringType (48-52)
pramen/api/src/main/scala/za/co/absa/pramen/api/sql/SqlGeneratorBase.scala (1)
  • validateOffsetValue (239-249)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (8)
  • GitHub Check: Test Spark 2.4.8 on Scala 2.11.12
  • GitHub Check: Test Spark 3.4.4 on Scala 2.13.16
  • GitHub Check: Test Spark 3.4.4 on Scala 2.12.20
  • GitHub Check: Test Spark 3.5.5 on Scala 2.13.16
  • GitHub Check: Test Spark 3.3.4 on Scala 2.13.16
  • GitHub Check: Test Spark 3.3.4 on Scala 2.12.20
  • GitHub Check: Test Spark 3.5.5 on Scala 2.12.20
  • GitHub Check: Test Coverage on Scala 2.12.18
🔇 Additional comments (26)
README.md (1)

1042-1079: Kafka sink operation example is comprehensive and consistent.

The operation example properly demonstrates:

  • Dependencies configuration (lines 1049-1054)
  • Table definition with input/output mapping (lines 1056-1077)
  • Optional date range specification (lines 1065-1068)
  • Schema transformations, filters, and column selection (lines 1069-1075)

The structure aligns well with other operation examples in the document and provides sufficient detail for users to understand how to configure Kafka sink operations.

pramen/extras/src/main/scala/za/co/absa/pramen/extras/utils/ConfigUtils.scala (2)

68-87: LGTM! Cleaner consolidated logging approach.

The refactored logExtraOptions now delegates rendering to renderExtraOptions and outputs a single consolidated log message. The parameter rename from redactedKeys to redactedTokens better reflects the substring-matching behavior.


89-110: LGTM! Well-structured redaction helper.

The renderExtraOptions method correctly:

  • Sorts entries by key for deterministic output
  • Performs case-insensitive token matching by converting keys to lowercase
  • Quotes non-redacted values consistently
  • Returns a stable sequence format
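A minimal illustration of the behavior described above (not the actual ConfigUtils implementation):

// Sorted keys, case-insensitive substring matching of redaction tokens,
// quoted values for everything that is not redacted.
def renderExtraOptions(options: Map[String, String], redactTokens: Set[String]): Seq[String] =
  options.toSeq.sortBy(_._1).map { case (key, value) =>
    val redact = redactTokens.exists(token => key.toLowerCase.contains(token.toLowerCase))
    if (redact) s"$key = [redacted]" else s"""$key = "$value""""
  }
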
pramen/pom.xml (2)

25-25: LGTM: Version bump to 1.12.8-SNAPSHOT.

The version update is consistent with the multi-module project structure and aligns with other module POM updates.


116-116: LGTM: Abris version updated to 6.4.1.

The update aligns with the corresponding change in pramen/project/Versions.scala (line 130) and is consistent across the build system.

pramen/project/Versions.scala (1)

130-130: LGTM: Abris version updated to 6.4.1.

The update is consistent with the Maven POM change (pramen/pom.xml line 116) and maintains alignment across build systems.

pramen/extras/src/test/scala/za/co/absa/pramen/extras/fixtures/RelationalDbFixture.scala (2)

67-82: LGTM: Proper resource management in withQuery.

The nested try-finally blocks ensure that ResultSet, Statement, and Connection are properly closed in the correct order, even if exceptions occur.


30-30: LGTM: Appropriate JDBC configuration for tests.

The sql.enforce_size=false parameter is appropriate for test fixtures as it disables VARCHAR length constraints, allowing tests to run without strict size validation.
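For illustration, such a fixture connection could look like this (database name and credentials are assumptions; SA with an empty password is HSQLDB's default admin account):

import java.sql.DriverManager

object HsqldbFixtureExample {
  // In-memory HSQLDB with VARCHAR size enforcement disabled for tests.
  val url = "jdbc:hsqldb:mem:unit_tests;sql.enforce_size=false"
  val connection = DriverManager.getConnection(url, "SA", "")
}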

pramen/core/pom.xml (1)

30-30: LGTM: Version bump to 1.12.8-SNAPSHOT.

The parent version update is consistent with the multi-module project structure and aligns with pramen/pom.xml.

pramen/api/pom.xml (1)

30-30: LGTM: Version bump to 1.12.8-SNAPSHOT.

The parent version update is consistent with the multi-module project structure and aligns with pramen/pom.xml.

pramen/extras/pom.xml (2)

30-30: LGTM: Version bump to 1.12.8-SNAPSHOT.

The parent version update is consistent with the multi-module project structure and aligns with pramen/pom.xml.


83-88: LGTM: HSQLDB test dependency added.

The test-scoped HSQLDB dependency with jdk8 classifier aligns with the new RelationalDbFixture test utility and maintains Java 8 compatibility.

pramen/api/src/main/scala/za/co/absa/pramen/api/offset/KafkaPartition.scala (1)

17-19: LGTM — minimal and correct Kafka partition holder.

pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/OffsetManagerUtilsSuite.scala (1)

51-66: KafkaType aggregation test looks good and exercises per-partition min/max correctly.

pramen/api/src/test/scala/za/co/absa/pramen/api/offset/OffsetValueSuite.scala (1)

51-57: LGTM — KafkaValue construction/assertions are clear and correct.

pramen/api/src/main/scala/za/co/absa/pramen/api/offset/OffsetType.scala (1)

34-35: Kafka offset type addition looks good.

Type string, mapping, and getSparkCol are consistent with usage elsewhere.

Confirm docs/config accept "kafka" in source definitions to avoid case/alias mismatches.

Also applies to: 54-59, 64-65

pramen/extras/src/main/scala/za/co/absa/pramen/extras/sink/KafkaAvroSink.scala (1)

142-145: LGTM on config wiring.

Factory now sources KafkaAvroWriterConfig.fromConfig(conf); consistent with writer changes.

pramen/extras/src/main/scala/za/co/absa/pramen/extras/writer/model/KafkaAvroWriterConfig.scala (1)

49-56: Config surface looks cohesive.

Consistent with TableWriterKafka usage (recordsLimit, key/value schema IDs, keyColumns).

pramen/extras/src/test/scala/za/co/absa/pramen/extras/mocks/DummyKafkaConfigFactory.scala (1)

38-51: LGTM: mock helpers align with new config model.

Defaults make tests straightforward; trailing comma is fine on 2.13+.

If the project targets Scala < 2.13, confirm trailing commas are enabled or drop them.

pramen/extras/src/main/scala/za/co/absa/pramen/extras/source/KafkaAvroSource.scala (3)

135-161: Offsets handling: logic reads (from, to] correctly; LGTM.

  • start: inclusive; increment when open-ended to avoid reprocessing.
  • end: exclusive; increment to read up to the provided inclusive bound.

Please confirm tests cover:

  • no offsets (earliest→latest),
  • only from,
  • from+to across multiple partitions. Based on learnings.
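For reference, a sketch of how a stored KafkaValue can map onto Spark's per-partition startingOffsets JSON under the (from, to] semantics above. The helper name is hypothetical and the actual KafkaAvroSource logic may differ:

import za.co.absa.pramen.api.offset.OffsetValue

// Builds the value for .option("startingOffsets", ...) of a Kafka batch read.
// Committed offsets are already processed, so reading starts one past them;
// with no bookkeeping yet, fall back to "earliest".
def startingOffsetsJson(topic: String, committed: Option[OffsetValue.KafkaValue]): String =
  committed match {
    case Some(kafkaValue) =>
      val parts = kafkaValue.value.map(p => s""""${p.partition}":${p.offset + 1}""").mkString(",")
      s"""{"$topic":{$parts}}"""
    case None =>
      "earliest"
  }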

167-171: Good: options logging with redaction.

Token set should catch sasl.jaas.config and basic.auth.user.info via substring; OK.


216-225: Factory wiring looks correct.

Config is sourced via KafkaAvroConfig.fromConfig; consistent with the rest.

pramen/extras/src/main/scala/za/co/absa/pramen/extras/writer/model/KafkaAvroConfig.scala (1)

41-73: LGTM: config parsing and namespacing.

  • Enforces value naming strategy.
  • Properly prefixes extra options.
  • Matches TableWriterKafka and KafkaAvroSource expectations.
pramen/extras/src/main/scala/za/co/absa/pramen/extras/writer/TableWriterKafka.scala (2)

52-57: No action needed—extraOptions keys are correctly prefixed with "kafka."

The code already handles the key transformation correctly. In KafkaAvroConfig.scala (lines 56–58), extra options extracted from the config are mapped to include the "kafka." prefix:

.map { case (k, v) => (s"$KAFKA_EXTRA_OPTIONS.$k", v) }

where KAFKA_EXTRA_OPTIONS = "kafka". This transforms keys like acks into kafka.acks before they are stored in the extraOptions field and passed to Spark's Kafka sink at line 56 of TableWriterKafka.scala. Spark receives keys in the expected kafka.* format.

Likely an incorrect or invalid review comment.


124-129: Verification of how schemaRegistryClientConfig is used confirms the original review comment's concern. ABRiS expects unprefixed configuration keys like "basic.auth.credentials.source" and "basic.auth.user.info" when passed to usingSchemaRegistry(). However, the codebase prefixes schemaRegistryExtraOptions with "schema.registry." during configuration parsing, which creates a mismatch.

The original review comment's concern about key prefix compatibility is valid. The suggested normalization (stripping the "schema.registry." prefix before passing to ABRiS) is a reasonable approach, though the actual implementation should be verified against how this config is consumed downstream.

Verify the actual usage of schemaRegistryClientConfig in ToAvroConfig and how it's passed to ABRiS methods.

The code currently builds schemaRegistryClientConfig at lines 124-126 but the review comment requires confirmation that this map (with prefixed keys) is indeed being passed directly to ABRiS without any intermediate normalization. If there's existing normalization logic elsewhere, the suggested fix may be unnecessary or redundant.
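If normalization does turn out to be needed, a minimal sketch (assuming the keys in schemaRegistryExtraOptions carry the "schema.registry." prefix described above):

// Strip the parsing-time prefix so ABRiS receives bare client keys such as
// "basic.auth.user.info".
def normalizeRegistryOptions(options: Map[String, String]): Map[String, String] =
  options.map { case (key, value) => key.stripPrefix("schema.registry.") -> value }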

pramen/extras/src/test/scala/za/co/absa/pramen/extras/tests/writer/model/KafkaAvroWriterConfigSuite.scala (1)

46-55: Verify the exact configuration structure for extra options and update test assertions accordingly.

The review comment correctly identifies that the test's "extra params" case doesn't validate anything meaningful. The added key "writer.kafka.schema.registry.key.naming" lacks the proper nesting structure and won't be extracted by getExtraOptions(), which looks for nested configs under "kafka" and "schema.registry" prefixes.

The suggested fix is sound: using "kafka.acks" and "schema.registry.extra.options.basic.auth.user.info" will create the correct nested structures in HOCON and be properly extracted. However, verify the exact paths match your system's configuration conventions before applying.

Update the test to:

  1. Replace the ineffectual extra param with actual keys under kafka and schema.registry hierarchies
  2. Add assertions confirming these keys propagate into the extraOptions and schemaRegistryExtraOptions maps
  3. Remove the now-unnecessary withoutPath() calls for removed test scenarios

Comment on lines +25 to +26
def minOffset = column[String]("min_offset")
def maxOffset = column[String]("max_offset")

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

Schema change requires migrations and cross‑DB validation.

Dropping O.Length(128) makes sense for Kafka offsets, but it alters the DDL. Ensure forward migrations exist for all supported DBs (H2 for tests, plus prod DBs) and that the resulting type is appropriate (e.g., TEXT/large VARCHAR) to avoid silent truncation or performance issues.

Run to locate DDL/migrations touching offsets:

I can propose Flyway/Liquibase ALTERs for Postgres/MySQL/MSSQL if you confirm the target DBs.


🏁 Script executed:

#!/bin/bash
rg -nP 'CREATE\s+TABLE\s+offsets|ALTER\s+TABLE\s+offsets|min_offset|max_offset|OffsetRecords' -C3

Length of output: 18246


Comment on lines 40 to 64
* kafka {
* bootstrap.servers = "mybroker1:9092,mybroker2:9092"
*
* # Arbitrary options for creating a Kafka Producer
* # sasl.jaas.config = "..."
* # sasl.mechanism = "..."
* # security.protocol = "..."
* # ...
* }
*
* schema.registry {
* url = "https://my.schema.regictry:8081"
*
* # Can be one of: topic.name, record.name, topic.record.name
* schema.registry.value.naming.strategy = "topic.name"
* value.naming.strategy = "topic.name"
*
* # Arbitrary options for creating a Kafka Producer
* option {
* kafka.sasl.jaas.config = "..."
* kafka.sasl.mechanism = "..."
* kafka.security.protocol = "..."
* # ...
* }
* # If you want to force the specific schema id. Otherwise, the latest schema id will be used.
* # key.schema.id =
* # value.schema.id =
*
* # Arbitrary options for Schema registry
* schema.registry.option {
* basic.auth.credentials.source = "..."
* basic.auth.user.info = "..."
* # ...
* }
* # Arbitrary options for the schema registry
* # basic.auth.credentials.source = "..."
* # basic.auth.user.info = "..."
* # ...
* }

⚠️ Potential issue | 🟡 Minor

Docs: fix typos and clarity.

  • “schema.regictry” → “schema.registry”.
  • Consider clarifying that options under “kafka { … }” are producer options for the sink.
-  *    schema.registry {
-  *      url = "https://my.schema.regictry:8081"
+  *    schema.registry {
+  *      url = "https://my.schema.registry:8081"
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
* kafka {
* bootstrap.servers = "mybroker1:9092,mybroker2:9092"
*
* # Arbitrary options for creating a Kafka Producer
* # sasl.jaas.config = "..."
* # sasl.mechanism = "..."
* # security.protocol = "..."
* # ...
* }
*
* schema.registry {
* url = "https://my.schema.regictry:8081"
*
* # Can be one of: topic.name, record.name, topic.record.name
* schema.registry.value.naming.strategy = "topic.name"
* value.naming.strategy = "topic.name"
*
* # Arbitrary options for creating a Kafka Producer
* option {
* kafka.sasl.jaas.config = "..."
* kafka.sasl.mechanism = "..."
* kafka.security.protocol = "..."
* # ...
* }
* # If you want to force the specific schema id. Otherwise, the latest schema id will be used.
* # key.schema.id =
* # value.schema.id =
*
* # Arbitrary options for Schema registry
* schema.registry.option {
* basic.auth.credentials.source = "..."
* basic.auth.user.info = "..."
* # ...
* }
* # Arbitrary options for the schema registry
* # basic.auth.credentials.source = "..."
* # basic.auth.user.info = "..."
* # ...
* }
* kafka {
* bootstrap.servers = "mybroker1:9092,mybroker2:9092"
*
* # Arbitrary options for creating a Kafka Producer
* # sasl.jaas.config = "..."
* # sasl.mechanism = "..."
* # security.protocol = "..."
* # ...
* }
*
* schema.registry {
* url = "https://my.schema.registry:8081"
*
* # Can be one of: topic.name, record.name, topic.record.name
* value.naming.strategy = "topic.name"
*
* # If you want to force the specific schema id. Otherwise, the latest schema id will be used.
* # key.schema.id =
* # value.schema.id =
*
* # Arbitrary options for the schema registry
* # basic.auth.credentials.source = "..."
* # basic.auth.user.info = "..."
* # ...
* }
🤖 Prompt for AI Agents
In
pramen/extras/src/main/scala/za/co/absa/pramen/extras/sink/KafkaAvroSink.scala
around lines 40 to 64, fix the typo "schema.regictry" -> "schema.registry" and
update the comments to clarify that the options inside the "kafka { … }" block
are Kafka Producer configuration options used by the sink (e.g.,
bootstrap.servers and optional producer settings), and optionally add a brief
note that schema registry options apply to the Avro serializer; keep wording
concise and correct the misspelling.

Comment on lines 181 to 205
val abrisValueConfig = AbrisConfig
.fromConfluentAvro
.downloadReaderSchemaByLatestVersion
.andTopicNameStrategy(topic, isKey = false)
.usingSchemaRegistry(schemaRegistryClientConfig)

val df1 = dfRaw
.withColumn("data", from_avro(col("value"), abrisValueConfig))
.withColumn(KAFKA_PARTITION_FIELD, col("partition"))
.withColumn(KAFKA_OFFSET_FIELD, col("offset"))
.withColumn("kafka_timestamp", col("timestamp"))
.withColumn("kafka_timestamp_type", col("timestampType"))

val df2 = kafkaAvroConfig.keyNamingStrategy match {
case Some(keyNamingStrategy) =>
val abrisKeyConfig = AbrisConfig
.fromConfluentAvro
.downloadReaderSchemaByLatestVersion
.andTopicNameStrategy(topic, isKey = true)
.usingSchemaRegistry(schemaRegistryClientConfig)
df1.withColumn("kafka_key", from_avro(col("key"), abrisKeyConfig))
case None =>
df1.withColumn("kafka_key", col("key"))
}


⚠️ Potential issue | 🔴 Critical

Bug: ignores configured naming strategy; always uses TopicName strategy.

Both value and key readers should honor KafkaAvroConfig.valueNamingStrategy / keyNamingStrategy. Otherwise, record.name or topic.record.name subjects won’t resolve and decoding will fail.

Apply this diff:

-    val abrisValueConfig = AbrisConfig
-      .fromConfluentAvro
-      .downloadReaderSchemaByLatestVersion
-      .andTopicNameStrategy(topic, isKey = false)
-      .usingSchemaRegistry(schemaRegistryClientConfig)
+    val abrisValueBase = AbrisConfig.fromConfluentAvro.downloadReaderSchemaByLatestVersion
+    val abrisValueConfig = kafkaAvroConfig.valueNamingStrategy.namingStrategy match {
+      case za.co.absa.pramen.extras.writer.model.NamingStrategy.NAMING_STRATEGY_TOPIC_NAME =>
+        abrisValueBase.andTopicNameStrategy(topic, isKey = false)
+      case za.co.absa.pramen.extras.writer.model.NamingStrategy.NAMING_STRATEGY_RECORD_NAME =>
+        abrisValueBase.andRecordNameStrategy(
+          kafkaAvroConfig.valueNamingStrategy.recordName.get,
+          kafkaAvroConfig.valueNamingStrategy.recordNamespace.get
+        )
+      case za.co.absa.pramen.extras.writer.model.NamingStrategy.NAMING_STRATEGY_TOPIC_RECORD_NAME =>
+        abrisValueBase.andTopicRecordNameStrategy(
+          topic,
+          kafkaAvroConfig.valueNamingStrategy.recordName.get,
+          kafkaAvroConfig.valueNamingStrategy.recordNamespace.get
+        )
+      case other =>
+        throw new IllegalArgumentException(s"Unsupported value naming strategy: $other")
+    }.usingSchemaRegistry(schemaRegistryClientConfig)
@@
-      case Some(keyNamingStrategy) =>
-        val abrisKeyConfig = AbrisConfig
-          .fromConfluentAvro
-          .downloadReaderSchemaByLatestVersion
-          .andTopicNameStrategy(topic, isKey = true)
-          .usingSchemaRegistry(schemaRegistryClientConfig)
-        df1.withColumn("kafka_key", from_avro(col("key"), abrisKeyConfig))
+      case Some(keyNamingStrategy) =>
+        val abrisKeyBase = AbrisConfig.fromConfluentAvro.downloadReaderSchemaByLatestVersion
+        val abrisKeyConfig = keyNamingStrategy.namingStrategy match {
+          case za.co.absa.pramen.extras.writer.model.NamingStrategy.NAMING_STRATEGY_TOPIC_NAME =>
+            abrisKeyBase.andTopicNameStrategy(topic, isKey = true)
+          case za.co.absa.pramen.extras.writer.model.NamingStrategy.NAMING_STRATEGY_RECORD_NAME =>
+            abrisKeyBase.andRecordNameStrategy(
+              keyNamingStrategy.recordName.get,
+              keyNamingStrategy.recordNamespace.get
+            )
+          case za.co.absa.pramen.extras.writer.model.NamingStrategy.NAMING_STRATEGY_TOPIC_RECORD_NAME =>
+            abrisKeyBase.andTopicRecordNameStrategy(
+              topic,
+              keyNamingStrategy.recordName.get,
+              keyNamingStrategy.recordNamespace.get
+            )
+          case other =>
+            throw new IllegalArgumentException(s"Unsupported key naming strategy: $other")
+        }.usingSchemaRegistry(schemaRegistryClientConfig)
+        df1.withColumn("kafka_key", from_avro(col("key"), abrisKeyConfig))

Optionally, extract this into a small helper to reduce duplication.

Committable suggestion skipped: line range outside the PR's diff.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
pramen/extras/src/test/scala/za/co/absa/pramen/extras/tests/writer/TableWriterKafkaSuite.scala (2)

30-45: Re-enable or remove commented-out test assertions.

The test successfully constructs a TableWriterKafka instance but all assertions validating the output DataFrame structure are commented out (lines 40-44). Tests without assertions provide no validation and can create false confidence in the codebase.

Either uncomment and update the assertions to match the refactored API, or remove this test if the functionality is not yet ready for validation.


47-67: Re-enable or remove commented-out test assertions.

Similar to the first test, all assertions validating the key-value DataFrame structure are commented out (lines 60-66). This test case covers an important scenario (Kafka messages with both keys and values) but provides no validation of the actual behavior.

Please uncomment and update these assertions to validate the refactored implementation, or remove the test if it's not ready.
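
If the tests are kept, the re-enabled checks could start as simple shape assertions — a sketch only, written against a generic DataFrame because the accessor the refactored TableWriterKafka exposes for the encoded output is not assumed here:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types.BinaryType

// 'df' stands for whatever DataFrame the writer test obtains for the encoded output.
def assertKafkaShape(df: DataFrame, expectKey: Boolean): Unit = {
  val fields = df.schema.fields.map(f => f.name -> f.dataType).toMap
  assert(fields.get("value").contains(BinaryType), "Avro-encoded value column expected")
  if (expectKey)
    assert(fields.get("key").contains(BinaryType), "Avro-encoded key column expected")
  else
    assert(!fields.contains("key"), "no key column expected for value-only topics")
}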

🧹 Nitpick comments (1)
pramen/api/src/main/scala/za/co/absa/pramen/api/offset/OffsetType.scala (1)

54-58: Verify that returning the column unchanged is the intended behavior.

The getSparkCol implementation returns the input column without any type casting or transformation, which differs from all other offset types that perform explicit casts (DateTimeType → LongType, IntegralType → LongType, StringType → SparkStringType). Based on the context from OffsetValue.scala, Kafka offsets appear to be stored as JSON-like strings.

Consider casting to SparkStringType for consistency and type safety:

-    override def getSparkCol(c: Column): Column = c
+    override def getSparkCol(c: Column): Column = c.cast(SparkStringType)

This would ensure type consistency and provide early validation if the column is not in the expected string format.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 097ae65 and 2629c04.

📒 Files selected for processing (2)
  • pramen/api/src/main/scala/za/co/absa/pramen/api/offset/OffsetType.scala (2 hunks)
  • pramen/extras/src/test/scala/za/co/absa/pramen/extras/tests/writer/TableWriterKafkaSuite.scala (3 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
pramen/api/src/main/scala/za/co/absa/pramen/api/offset/OffsetType.scala (2)
pramen/api/src/main/scala/za/co/absa/pramen/api/offset/OffsetValue.scala (1)
  • fromString (131-156)
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/OffsetRecords.scala (1)
  • dataType (24-24)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (8)
  • GitHub Check: Test Spark 3.3.4 on Scala 2.13.16
  • GitHub Check: Test Spark 3.5.5 on Scala 2.12.20
  • GitHub Check: Test Spark 2.4.8 on Scala 2.11.12
  • GitHub Check: Test Spark 3.5.5 on Scala 2.13.16
  • GitHub Check: Test Spark 3.4.4 on Scala 2.12.20
  • GitHub Check: Test Spark 3.4.4 on Scala 2.13.16
  • GitHub Check: Test Spark 3.3.4 on Scala 2.12.20
  • GitHub Check: Test Coverage on Scala 2.12.18
🔇 Additional comments (4)
pramen/extras/src/test/scala/za/co/absa/pramen/extras/tests/writer/TableWriterKafkaSuite.scala (2)

69-88: LGTM!

This test properly validates that extra Kafka options (SASL configuration) are correctly extracted and accessible via getExtraOptions. The configuration structure aligns with the refactored API and the assertions provide meaningful validation.


32-36: Configuration refactoring looks correct.

The updated configuration structure consistently uses the flat kafka.* and schema.registry.* keys across all test cases, properly aligning with the refactored KafkaAvroWriterConfig API. The transition from the nested configuration model to the new flat structure is clean and consistent.

Also applies to: 49-56, 71-79

pramen/api/src/main/scala/za/co/absa/pramen/api/offset/OffsetType.scala (2)

34-34: LGTM!

The constant declaration follows the established pattern and the string "kafka" fits within the database column length constraint of 20 characters.


65-65: LGTM!

The fromString update correctly maps KAFKA_TYPE_STR to KafkaType, following the established pattern for other offset types.

@github-actions
Copy link

github-actions bot commented Oct 21, 2025

Unit Test Coverage

Overall Project: 83.8% (-1.95%) 🍏
Files changed: 27.9%

Module Coverage
  • pramen:core (Jacoco Report): 85.47% (-0.17%)
  • pramen-extras (Jacoco Report): 68.64% (-18.1%)

File Coverage
  pramen:core (Jacoco Report):
    • OffsetRecords.scala: 100% 🍏
    • OffsetManagerUtils.scala: 98.05% (-1.95%) 🍏
    • PipelineNotificationBuilderHtml.scala: 90.01% (-0.22%)
    • OffsetManagerJdbc.scala: 87.82% (-0.84%) 🍏
    • IncrementalIngestionJob.scala: 75.57% (-7.09%)
  pramen-extras (Jacoco Report):
    • KafkaAvroWriterConfig.scala: 100%
    • KafkaAvroConfig.scala: 90.78%
    • ConfigUtils.scala: 68.84% (-8.9%)
    • NamingStrategy.scala: 29.86% (-65.02%)
    • TableWriterKafka.scala: 8.93% (-24.78%)
    • KafkaAvroSink.scala: 0% (-24.44%)
    • KafkaAvroSource.scala: 0%

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 4

♻️ Duplicate comments (1)
pramen/extras/src/test/scala/za/co/absa/pramen/extras/fixtures/RelationalDbFixture.scala (1)

51-51: Typo already fixed.

The typo identified in the previous review ("list if tables" → "list of tables") has been corrected in the current code.

🧹 Nitpick comments (6)
pramen/extras/src/test/scala/za/co/absa/pramen/extras/fixtures/RelationalDbFixture.scala (1)

25-83: Consider extracting shared test fixture to avoid duplication.

This trait is nearly identical to pramen/core/src/test/scala/za/co/absa/pramen/core/fixtures/RelationalDbFixture.scala. The duplication creates maintenance overhead if either fixture needs updates.

Consider one of these approaches:

  • Create a shared test-utils module containing common test fixtures
  • Publish core's test fixtures as a test-jar and depend on it from extras
  • If module independence is critical, document this as intentional duplication
pramen/extras/src/main/scala/za/co/absa/pramen/extras/writer/model/NamingStrategy.scala (1)

47-57: Avoid Option.get in subject resolution.

Defensive checks would yield clearer errors than NoSuchElementException on missing names/namespaces (especially for programmatic construction).

-    } else if (namingStrategy == NAMING_STRATEGY_RECORD_NAME) {
-      SchemaSubject.usingRecordNameStrategy(recordName.get, recordNamespace.get)
+    } else if (namingStrategy == NAMING_STRATEGY_RECORD_NAME) {
+      val (name, ns) = (recordName.getOrElse(throw new IllegalArgumentException("Record name missing.")),
+                        recordNamespace.getOrElse(throw new IllegalArgumentException("Record namespace missing.")))
+      SchemaSubject.usingRecordNameStrategy(name, ns)
     } else if (namingStrategy == NAMING_STRATEGY_TOPIC_RECORD_NAME) {
-      SchemaSubject.usingTopicRecordNameStrategy(topicName, recordName.get, recordNamespace.get)
+      val (name, ns) = (recordName.getOrElse(throw new IllegalArgumentException("Record name missing.")),
+                        recordNamespace.getOrElse(throw new IllegalArgumentException("Record namespace missing.")))
+      SchemaSubject.usingTopicRecordNameStrategy(topicName, name, ns)
pramen/extras/src/main/scala/za/co/absa/pramen/extras/sink/KafkaAvroSink.scala (2)

56-64: Scaladoc: use actual config keys for schema IDs.

Align with KafkaAvroWriterConfig constants.

-  *      # key.schema.id =
-  *      # value.schema.id =
+  *      # schema.registry.key.schema.id =
+  *      # schema.registry.value.schema.id =

40-49: Clarify producer options in example.

These are Kafka Producer options for the sink.

-  *      # Arbitrary options for creating a Kafka Producer
+  *      # Arbitrary Kafka Producer options used by the sink
pramen/extras/src/main/scala/za/co/absa/pramen/extras/source/KafkaAvroSource.scala (2)

52-57: Docs: consumer vs producer.

For source, these are consumer/reader options.

-  *      # Arbitrary options for creating a Kafka Producer
+  *      # Arbitrary options for the Kafka consumer/reader

135-161: Add edge-case tests for Kafka offset semantics; the implementation logic itself is correct.

The offset handling in lines 135-161 implements the semantics correctly:

  • Starting offset: inclusive when to is defined (closed interval: min ≤ offset ≤ max); incremented by 1 when to is undefined (open interval: min < offset) to avoid reprocessing.
  • Ending offset: incremented by 1 to be exclusive, matching Spark's semantics.

However, no dedicated test suite exists for the KafkaAvroSource offset logic. Add tests covering the following (a sketch follows this list):

  • Single-partition and multi-partition offset handling
  • Closed interval (to defined) vs. open interval (to undefined)
  • Empty ranges and boundary conditions
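
A minimal sketch of such tests, using hypothetical pure helpers instead of the real source (exercising KafkaAvroSource end-to-end needs Kafka and a schema registry); the helpers only mirror the boundary arithmetic described above:

import org.scalatest.wordspec.AnyWordSpec

class KafkaOffsetBoundarySuite extends AnyWordSpec {
  // Hypothetical helpers, not the production code: the start is inclusive when 'to'
  // is defined, min + 1 when only 'from' is known; Spark's ending offsets are exclusive.
  def startingOffset(min: Long, toIsDefined: Boolean): Long =
    if (toIsDefined) min else min + 1

  def endingOffsetExclusive(max: Long): Long = max + 1

  "offset boundaries" should {
    "include the minimum when 'to' is defined (closed interval)" in {
      assert(startingOffset(10L, toIsDefined = true) == 10L)
    }
    "skip the already committed minimum when 'to' is undefined (open interval)" in {
      assert(startingOffset(10L, toIsDefined = false) == 11L)
    }
    "make the ending offset exclusive so the maximum is still read" in {
      assert(endingOffsetExclusive(20L) == 21L)
    }
    "produce an empty range when nothing new arrived on a partition" in {
      assert(startingOffset(5L, toIsDefined = false) == endingOffsetExclusive(5L))
    }
  }
}
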
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 2629c04 and c6df10e.

📒 Files selected for processing (8)
  • README.md (2 hunks)
  • pramen/core/src/test/scala/za/co/absa/pramen/core/fixtures/RelationalDbFixture.scala (1 hunks)
  • pramen/extras/src/main/scala/za/co/absa/pramen/extras/sink/KafkaAvroSink.scala (5 hunks)
  • pramen/extras/src/main/scala/za/co/absa/pramen/extras/source/KafkaAvroSource.scala (1 hunks)
  • pramen/extras/src/main/scala/za/co/absa/pramen/extras/writer/model/NamingStrategy.scala (3 hunks)
  • pramen/extras/src/test/scala/za/co/absa/pramen/extras/fixtures/RelationalDbFixture.scala (1 hunks)
  • pramen/extras/src/test/scala/za/co/absa/pramen/extras/mocks/DummyKafkaConfigFactory.scala (1 hunks)
  • pramen/pom.xml (2 hunks)
✅ Files skipped from review due to trivial changes (1)
  • pramen/core/src/test/scala/za/co/absa/pramen/core/fixtures/RelationalDbFixture.scala
🚧 Files skipped from review as they are similar to previous changes (1)
  • pramen/pom.xml
🧰 Additional context used
🧬 Code graph analysis (4)
pramen/extras/src/test/scala/za/co/absa/pramen/extras/fixtures/RelationalDbFixture.scala (1)
pramen/core/src/test/scala/za/co/absa/pramen/core/fixtures/RelationalDbFixture.scala (5)
  • getConnection (35-36)
  • beforeAll (38-43)
  • afterAll (45-49)
  • getTables (52-65)
  • withQuery (67-82)
pramen/extras/src/main/scala/za/co/absa/pramen/extras/sink/KafkaAvroSink.scala (3)
pramen/extras/src/main/scala/za/co/absa/pramen/extras/writer/TableWriterKafka.scala (6)
  • pramen (66-66)
  • pramen (68-78)
  • pramen (80-86)
  • pramen (88-98)
  • pramen (100-118)
  • pramen (120-138)
pramen/extras/src/main/scala/za/co/absa/pramen/extras/writer/model/KafkaAvroWriterConfig.scala (3)
  • KafkaAvroWriterConfig (22-28)
  • KafkaAvroWriterConfig (30-57)
  • fromConfig (36-56)
pramen/extras/src/main/scala/za/co/absa/pramen/extras/writer/model/KafkaAvroConfig.scala (1)
  • fromConfig (41-73)
pramen/extras/src/main/scala/za/co/absa/pramen/extras/source/KafkaAvroSource.scala (4)
pramen/api/src/main/scala/za/co/absa/pramen/api/offset/OffsetValue.scala (7)
  • OffsetValue (34-157)
  • KafkaValue (83-128)
  • increment (125-127)
  • valueString (41-41)
  • valueString (56-56)
  • valueString (71-71)
  • valueString (86-91)
pramen/extras/src/main/scala/za/co/absa/pramen/extras/utils/ConfigUtils.scala (2)
  • ConfigUtils (24-205)
  • logExtraOptions (79-87)
pramen/extras/src/main/scala/za/co/absa/pramen/extras/writer/model/KafkaAvroConfig.scala (3)
  • KafkaAvroConfig (22-29)
  • KafkaAvroConfig (31-74)
  • fromConfig (41-73)
pramen/extras/src/main/scala/za/co/absa/pramen/extras/writer/model/NamingStrategy.scala (1)
  • applyNamingStrategyToAbrisConfig (78-106)
pramen/extras/src/test/scala/za/co/absa/pramen/extras/mocks/DummyKafkaConfigFactory.scala (5)
pramen/extras/src/main/scala/za/co/absa/pramen/extras/writer/TableWriterKafka.scala (6)
  • pramen (66-66)
  • pramen (68-78)
  • pramen (80-86)
  • pramen (88-98)
  • pramen (100-118)
  • pramen (120-138)
pramen/extras/src/main/scala/za/co/absa/pramen/extras/writer/model/KafkaAvroConfig.scala (2)
  • KafkaAvroConfig (22-29)
  • KafkaAvroConfig (31-74)
pramen/extras/src/main/scala/za/co/absa/pramen/extras/writer/model/KafkaAvroWriterConfig.scala (2)
  • KafkaAvroWriterConfig (22-28)
  • KafkaAvroWriterConfig (30-57)
pramen/extras/src/main/scala/za/co/absa/pramen/extras/writer/model/NamingStrategy.scala (2)
  • NamingStrategy (23-107)
  • NamingStrategy (109-142)
pramen/extras/src/test/scala/za/co/absa/pramen/extras/mocks/DummyNamingStrategyFactory.scala (2)
  • DummyNamingStrategyFactory (21-30)
  • getDummyNamingStrategy (22-29)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (8)
  • GitHub Check: Test Spark 3.4.4 on Scala 2.13.16
  • GitHub Check: Test Spark 3.3.4 on Scala 2.12.20
  • GitHub Check: Test Spark 2.4.8 on Scala 2.11.12
  • GitHub Check: Test Spark 3.5.5 on Scala 2.13.16
  • GitHub Check: Test Spark 3.4.4 on Scala 2.12.20
  • GitHub Check: Test Spark 3.5.5 on Scala 2.12.20
  • GitHub Check: Test Spark 3.3.4 on Scala 2.13.16
  • GitHub Check: Test Coverage on Scala 2.12.18
🔇 Additional comments (1)
pramen/extras/src/test/scala/za/co/absa/pramen/extras/mocks/DummyKafkaConfigFactory.scala (1)

22-36: LGTM: handy test helpers map 1:1 with production configs.

Also applies to: 38-50

Comment on lines 195 to 203
val df2 = kafkaAvroConfig.keyNamingStrategy match {
case Some(keyNamingStrategy) =>
val abrisKeyConfig = kafkaAvroConfig.valueNamingStrategy
.applyNamingStrategyToAbrisConfig(abrisValueBase, topic, isKey = true)
.usingSchemaRegistry(schemaRegistryClientConfig)
df1.withColumn("kafka_key", from_avro(col("key"), abrisKeyConfig))
case None =>
df1.withColumn("kafka_key", col("key"))
}

⚠️ Potential issue | 🔴 Critical

Bug: key decoding uses value naming strategy.

Use the configured key naming strategy (when present).

Apply this diff:

     val df2 = kafkaAvroConfig.keyNamingStrategy match {
       case Some(keyNamingStrategy) =>
-        val abrisKeyConfig = kafkaAvroConfig.valueNamingStrategy
-          .applyNamingStrategyToAbrisConfig(abrisValueBase, topic, isKey = true)
+        val abrisKeyConfig = keyNamingStrategy
+          .applyNamingStrategyToAbrisConfig(abrisValueBase, topic, isKey = true)
           .usingSchemaRegistry(schemaRegistryClientConfig)
         df1.withColumn("kafka_key", from_avro(col("key"), abrisKeyConfig))
       case None =>
         df1.withColumn("kafka_key", col("key"))
     }

Note: This also relies on fixing isKey handling in NamingStrategy.applyNamingStrategyToAbrisConfig.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
val df2 = kafkaAvroConfig.keyNamingStrategy match {
case Some(keyNamingStrategy) =>
val abrisKeyConfig = kafkaAvroConfig.valueNamingStrategy
.applyNamingStrategyToAbrisConfig(abrisValueBase, topic, isKey = true)
.usingSchemaRegistry(schemaRegistryClientConfig)
df1.withColumn("kafka_key", from_avro(col("key"), abrisKeyConfig))
case None =>
df1.withColumn("kafka_key", col("key"))
}
val df2 = kafkaAvroConfig.keyNamingStrategy match {
case Some(keyNamingStrategy) =>
val abrisKeyConfig = keyNamingStrategy
.applyNamingStrategyToAbrisConfig(abrisValueBase, topic, isKey = true)
.usingSchemaRegistry(schemaRegistryClientConfig)
df1.withColumn("kafka_key", from_avro(col("key"), abrisKeyConfig))
case None =>
df1.withColumn("kafka_key", col("key"))
}
🤖 Prompt for AI Agents
In
pramen/extras/src/main/scala/za/co/absa/pramen/extras/source/KafkaAvroSource.scala
around lines 195 to 203, the code wrongly uses the value naming strategy to
build the Abris config for the key; change it to use
kafkaAvroConfig.keyNamingStrategy to call applyNamingStrategyToAbrisConfig on
the key base (not the value base), mark isKey = true, chain
usingSchemaRegistry(schemaRegistryClientConfig) and pass that config into
from_avro for the "kafka_key" column; also ensure
NamingStrategy.applyNamingStrategyToAbrisConfig correctly respects the isKey
flag when constructing configs.

Comment on lines 942 to 951
kafka {
bootstrap.servers = "mybroker1:9092,mybroker2:9092"
# Arbitrary options for creating a Kafka Producer
sasl.jaas.config = "..."
sasl.mechanism = "..."
security.protocol = "..."
}

⚠️ Potential issue | 🟡 Minor

Docs: say “Kafka consumer/reader options” for source.

This block configures reader/consumer options, not a producer.

-      # Arbitrary options for creating a Kafka Producer
+      # Arbitrary options for the Kafka consumer/reader
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
kafka {
bootstrap.servers = "mybroker1:9092,mybroker2:9092"
# Arbitrary options for creating a Kafka Producer
sasl.jaas.config = "..."
sasl.mechanism = "..."
security.protocol = "..."
}
kafka {
bootstrap.servers = "mybroker1:9092,mybroker2:9092"
# Arbitrary options for the Kafka consumer/reader
sasl.jaas.config = "..."
sasl.mechanism = "..."
security.protocol = "..."
}
🤖 Prompt for AI Agents
In README.md around lines 942 to 951, the kafka config block is described as
Producer options but it actually documents consumer/reader settings; update the
wording to say “Kafka consumer/reader options” (or similar) at the heading or
preceding sentence and, if present, replace or supplement producer-specific
example keys with consumer-appropriate keys (e.g., group.id, enable.auto.commit,
auto.offset.reset) so the description and examples consistently reflect
reader/consumer configuration.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

♻️ Duplicate comments (1)
pramen/extras/src/main/scala/za/co/absa/pramen/extras/writer/model/NamingStrategy.scala (1)

81-102: Fix: Inconsistent error message wording at line 100.

The previous review correctly identified that most issues were fixed (isKey passing, bound variables), but the error message at line 100 still says "Unsupported value naming strategy" when it should just say "Unsupported naming strategy" to be consistent with the getSubject method (line 58) and because this method applies to both key and value strategies.

Apply this diff to fix the wording:

       case other =>
-        throw new IllegalArgumentException(s"Unsupported value naming strategy: $other")
+        throw new IllegalArgumentException(s"Unsupported naming strategy: $other")
🧹 Nitpick comments (6)
pramen/core/src/test/scala/za/co/absa/pramen/core/fixtures/RelationalDbFixture.scala (1)

47-55: LGTM! Proper HSQLDB shutdown and resource cleanup.

The shutdown logic ensures the connection is closed in a finally block and gracefully handles cases where the database is already shutdown. This prevents resource leaks in test cleanup.

Note: This identical logic appears in both pramen/core/.../RelationalDbFixture.scala and pramen/extras/.../RelationalDbFixture.scala. Consider extracting this common test fixture to a shared test utilities module to avoid duplication.

pramen/extras/src/main/scala/za/co/absa/pramen/extras/source/KafkaAvroSource.scala (1)

108-110: Suggest using constant for offset column name.

Line 109 hardcodes "kafka_offset" but the code uses KAFKA_OFFSET_FIELD constant elsewhere (e.g., line 191). For consistency and maintainability, use the imported constant.

Apply this diff:

   override def getOffsetInfo: Option[OffsetInfo] = {
-    Some(OffsetInfo("kafka_offset", OffsetType.KafkaType))
+    Some(OffsetInfo(KAFKA_OFFSET_FIELD, OffsetType.KafkaType))
   }
pramen/extras/src/test/scala/za/co/absa/pramen/extras/fixtures/RelationalDbFixture.scala (4)

45-57: Always run super.afterAll(); guard cleanup with finally

If getConnection/SHUTDOWN throws, super.afterAll() won’t run. Wrap it in finally and drop the redundant null check.

-  override protected def afterAll(): Unit = {
-    val connection = getConnection
-    if (connection != null) {
-      try {
-        connection.createStatement().execute("SHUTDOWN")
-      } catch {
-        case _: SQLException => // Already shutdown or in-memory DB
-      } finally {
-        connection.close()
-      }
-    }
-    super.afterAll()
-  }
+  override protected def afterAll(): Unit = {
+    try {
+      var connection: Connection = null
+      try {
+        connection = getConnection
+        connection.createStatement().execute("SHUTDOWN")
+      } catch {
+        case _: SQLException => () // already shutdown or in‑memory
+      } finally {
+        if (connection != null) connection.close()
+      }
+    } finally {
+      super.afterAll()
+    }
+  }

60-73: Make getTables exception‑safe (reuse withQuery or add try/finally)

Current code leaks resources on exceptions. Mirror withQuery’s pattern or reuse it directly.

Option A — safe try/finally:

 def getTables: Seq[String] = {
-    val conn = getConnection
-    val st: Statement = conn.createStatement()
-    val rs: ResultSet = st.executeQuery("SELECT TABLE_NAME FROM INFORMATION_SCHEMA.SYSTEM_TABLES WHERE TABLE_TYPE='TABLE'")
-
-    val tables = new ListBuffer[String]
-    while(rs.next()) {
-      tables += rs.getString(1)
-    }
-    rs.close()
-    st.close()
-    conn.close()
-    tables.toSeq
+    val conn = getConnection
+    val st: Statement = conn.createStatement()
+    try {
+      val rs: ResultSet = st.executeQuery("SELECT TABLE_NAME FROM INFORMATION_SCHEMA.SYSTEM_TABLES WHERE TABLE_TYPE='TABLE'")
+      try {
+        val tables = new ListBuffer[String]
+        while (rs.next()) {
+          tables += rs.getString(1)
+        }
+        tables.toSeq
+      } finally {
+        rs.close()
+      }
+    } finally {
+      st.close()
+      conn.close()
+    }
 }

Option B — reuse withQuery:

 def getTables: Seq[String] = {
-  ...
-  -- current implementation
+  val buf = new ListBuffer[String]
+  withQuery("SELECT TABLE_NAME FROM INFORMATION_SCHEMA.SYSTEM_TABLES WHERE TABLE_TYPE='TABLE'") { rs =>
+    while (rs.next()) buf += rs.getString(1)
+  }
+  buf.toSeq
 }

63-68: Optionally filter system schemas or use DatabaseMetaData for portability

HSQLDB’s SYSTEM_TABLES may include non‑user tables. Consider restricting to public schema or using JDBC metadata.

Examples:

  • SQL filter:
-"SELECT TABLE_NAME FROM INFORMATION_SCHEMA.SYSTEM_TABLES WHERE TABLE_TYPE='TABLE'"
+"SELECT TABLE_NAME FROM INFORMATION_SCHEMA.SYSTEM_TABLES WHERE TABLE_TYPE='TABLE' AND TABLE_SCHEMA = 'PUBLIC'"
  • Or via JDBC:
val md = conn.getMetaData
val rs = md.getTables(null, "PUBLIC", "%", Array("TABLE"))

38-43: Restore the original default timezone after the suite

Setting a global default timezone can leak into other suites. Capture and restore it in afterAll().

 trait RelationalDbFixture extends BeforeAndAfterAll {
@@
+  private var originalTimeZone: TimeZone = _
   override protected def beforeAll(): Unit = {
-    TimeZone.setDefault(TimeZone.getTimeZone("Africa/Johannesburg"))
+    originalTimeZone = TimeZone.getDefault
+    TimeZone.setDefault(TimeZone.getTimeZone("Africa/Johannesburg"))
@@
-  override protected def afterAll(): Unit = {
-    val connection = getConnection
+  override protected def afterAll(): Unit = {
-    ...
-    super.afterAll()
+    try {
+      // existing SHUTDOWN logic (see prior diff)
+    } finally {
+      if (originalTimeZone != null) TimeZone.setDefault(originalTimeZone)
+      super.afterAll()
+    }
   }

Also applies to: 45-57

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between c6df10e and 17d3c63.

📒 Files selected for processing (6)
  • README.md (2 hunks)
  • pramen/core/src/test/scala/za/co/absa/pramen/core/fixtures/RelationalDbFixture.scala (1 hunks)
  • pramen/extras/src/main/scala/za/co/absa/pramen/extras/sink/KafkaAvroSink.scala (5 hunks)
  • pramen/extras/src/main/scala/za/co/absa/pramen/extras/source/KafkaAvroSource.scala (1 hunks)
  • pramen/extras/src/main/scala/za/co/absa/pramen/extras/writer/model/NamingStrategy.scala (2 hunks)
  • pramen/extras/src/test/scala/za/co/absa/pramen/extras/fixtures/RelationalDbFixture.scala (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • pramen/extras/src/main/scala/za/co/absa/pramen/extras/sink/KafkaAvroSink.scala
🧰 Additional context used
🧬 Code graph analysis (3)
pramen/extras/src/test/scala/za/co/absa/pramen/extras/fixtures/RelationalDbFixture.scala (1)
pramen/core/src/test/scala/za/co/absa/pramen/core/fixtures/RelationalDbFixture.scala (5)
  • getConnection (35-36)
  • beforeAll (38-43)
  • afterAll (45-57)
  • getTables (60-73)
  • withQuery (75-90)
pramen/core/src/test/scala/za/co/absa/pramen/core/fixtures/RelationalDbFixture.scala (1)
pramen/extras/src/test/scala/za/co/absa/pramen/extras/fixtures/RelationalDbFixture.scala (1)
  • afterAll (45-57)
pramen/extras/src/main/scala/za/co/absa/pramen/extras/source/KafkaAvroSource.scala (8)
pramen/api/src/main/scala/za/co/absa/pramen/api/offset/OffsetValue.scala (7)
  • OffsetValue (34-157)
  • KafkaValue (83-128)
  • increment (125-127)
  • valueString (41-41)
  • valueString (56-56)
  • valueString (71-71)
  • valueString (86-91)
pramen/api/src/main/scala/za/co/absa/pramen/api/offset/OffsetInfo.scala (1)
  • OffsetInfo (19-19)
pramen/api/src/main/scala/za/co/absa/pramen/api/offset/OffsetType.scala (2)
  • OffsetType (29-68)
  • KafkaType (54-58)
pramen/api/src/main/scala/za/co/absa/pramen/api/Query.scala (1)
  • Query (28-56)
pramen/api/src/main/scala/za/co/absa/pramen/api/SourceResult.scala (1)
  • SourceResult (21-25)
pramen/extras/src/main/scala/za/co/absa/pramen/extras/utils/ConfigUtils.scala (2)
  • ConfigUtils (24-205)
  • logExtraOptions (79-87)
pramen/extras/src/main/scala/za/co/absa/pramen/extras/writer/model/KafkaAvroConfig.scala (3)
  • KafkaAvroConfig (22-29)
  • KafkaAvroConfig (31-74)
  • fromConfig (41-73)
pramen/extras/src/main/scala/za/co/absa/pramen/extras/writer/model/NamingStrategy.scala (1)
  • applyNamingStrategyToAbrisConfig (81-102)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (8)
  • GitHub Check: Test Spark 3.5.5 on Scala 2.13.16
  • GitHub Check: Test Spark 3.4.4 on Scala 2.12.20
  • GitHub Check: Test Spark 2.4.8 on Scala 2.11.12
  • GitHub Check: Test Spark 3.4.4 on Scala 2.13.16
  • GitHub Check: Test Spark 3.5.5 on Scala 2.12.20
  • GitHub Check: Test Spark 3.3.4 on Scala 2.13.16
  • GitHub Check: Test Spark 3.3.4 on Scala 2.12.20
  • GitHub Check: Test Coverage on Scala 2.12.18
🔇 Additional comments (11)
pramen/core/src/test/scala/za/co/absa/pramen/core/fixtures/RelationalDbFixture.scala (1)

59-59: LGTM! Comment typo corrected.

Fixed grammatical error in the comment.

pramen/extras/src/main/scala/za/co/absa/pramen/extras/writer/model/NamingStrategy.scala (2)

47-60: LGTM! Well-structured subject resolution.

The method correctly handles all three naming strategies with proper validation and error handling. The use of the getNameAndNamespace helper ensures consistent validation for strategies requiring record name and namespace.


104-107: LGTM! Good defensive validation helper.

The helper provides clear error messages when record name or namespace is missing for strategies that require them.

pramen/extras/src/main/scala/za/co/absa/pramen/extras/source/KafkaAvroSource.scala (5)

135-161: LGTM! Correct Kafka offset interval handling.

The offset logic correctly handles (see the sketch after this list):

  • Inclusive starting offsets (with increment when open interval)
  • Exclusive ending offsets (with increment as Spark requires)
  • Clear error messages for type mismatches
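
For reference, this is roughly how such per-partition boundaries translate into Spark's batch Kafka options (the JSON shape is Spark's startingOffsets/endingOffsets format; the broker, topic, and offset numbers here are made up):

// In Spark batch reads from Kafka, startingOffsets is inclusive and endingOffsets is exclusive.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("kafka-offset-range-example").getOrCreate()

val df = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "mybroker1:9092")
  .option("subscribe", "my.topic")
  .option("startingOffsets", """{"my.topic":{"0":11,"1":7}}""")   // start reading at offsets 11 and 7
  .option("endingOffsets", """{"my.topic":{"0":21,"1":10}}""")    // reads up to and including 20 and 9
  .load()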

172-193: LGTM! Value decoding correctly honors naming strategy.

The value decoding properly uses kafkaAvroConfig.valueNamingStrategy.applyNamingStrategyToAbrisConfig with isKey = false, fixing the past issue where naming strategies were ignored.


195-203: LGTM! Key decoding correctly uses key naming strategy.

The key decoding properly uses keyNamingStrategy.applyNamingStrategyToAbrisConfig with isKey = true, fixing the past issue where the value strategy was incorrectly used for keys. The fallback to raw key when no key strategy is defined is a sensible default.


205-210: LGTM! Correct final DataFrame structure.

The final selection flattens the Avro "data" structure while preserving Kafka metadata fields, producing a clean schema for downstream processing.


215-224: LGTM! Well-structured factory with proper security.

The companion object provides a clean factory interface and correctly redacts sensitive tokens (password, jaas.config, auth.user.info) from logs.
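
As a generic illustration of that redaction approach (a sketch, not the project's actual helper): any option whose key contains one of the sensitive tokens is masked before it is logged.

// Sketch: mask values for keys containing one of the sensitive tokens.
val redactTokens = Seq("password", "jaas.config", "auth.user.info")

def redact(options: Map[String, String]): Map[String, String] =
  options.map { case (key, value) =>
    if (redactTokens.exists(token => key.toLowerCase.contains(token)))
      key -> "[redacted]"
    else
      key -> value
  }

// Example: the JAAS config is masked, bootstrap.servers is logged as-is.
redact(Map(
  "bootstrap.servers" -> "mybroker1:9092",
  "sasl.jaas.config"  -> "org.apache.kafka... password=\"secret\";"
))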

README.md (2)

929-987: LGTM! Clear and accurate Kafka Avro source documentation.

The documentation correctly describes the source capabilities, shows complete configuration examples, and provides a working pipeline operation example. The comment fixes (consumer/reader, schema.registry typo) from previous reviews are applied.


1000-1080: LGTM! Well-structured Kafka Avro sink documentation.

The sink documentation correctly describes the functionality (note: "Kafka Producer" at line 1016 is accurate for a sink, unlike the source which uses consumer/reader). The typo fixes from previous reviews (Confluent, schema.registry) are applied. The operation example demonstrates optional transformations, filters, and column selection.

pramen/extras/src/test/scala/za/co/absa/pramen/extras/fixtures/RelationalDbFixture.scala (1)

75-90: withQuery helper is tidy and resource‑safe

Good use of nested try/finally to ensure ResultSet/Statement/Connection are closed.

@yruslan yruslan merged commit 68971be into main Oct 21, 2025
9 checks passed
@yruslan yruslan deleted the feature/644-kafka-batch-source branch October 21, 2025 12:17


Development

Successfully merging this pull request may close these issues.

Add experimental Kafka/Avro source

1 participant