#648 Kafka Source: Group Kafka generated columns into a configurable struct column. #650
Conversation
Pull Request Overview
This PR implements issue #648 by restructuring how Kafka-generated columns are organized in the data schema. The changes group Kafka metadata (partition, offset, timestamp, timestamp_type) into a configurable struct column and allow customization of column names for both the Kafka metadata struct and the key column.
Key changes:
- Kafka metadata fields are now grouped into a configurable struct column (default: "kafka") instead of individual top-level columns
- Column names for Kafka metadata and key are now configurable via the `custom.kafka.column` and `key.column.name` configuration options (see the configuration sketch below)
- Updated offset management utilities to work with nested struct fields instead of flat columns
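For illustration, a minimal sketch of reading these options with Typesafe Config. The option keys come from this PR; the surrounding code, values, and default handling are illustrative assumptions, not the actual implementation:

```scala
import com.typesafe.config.ConfigFactory

// Hypothetical job configuration exercising the new options (values are illustrative).
val conf = ConfigFactory.parseString(
  """custom.kafka.column = "kafka_meta"  # struct holding partition, offset, timestamp, timestamp_type
    |key.column.name = "record_key"      # column holding the Kafka record key
    |""".stripMargin)

// Per the PR description, the defaults are "kafka" and "kafka_key" when the options are absent.
val kafkaColumnName =
  if (conf.hasPath("custom.kafka.column")) conf.getString("custom.kafka.column") else "kafka"
val keyColumnName =
  if (conf.hasPath("key.column.name")) conf.getString("key.column.name") else "kafka_key"
```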
Reviewed Changes
Copilot reviewed 8 out of 8 changed files in this pull request and generated 6 comments.
| File | Description |
|---|---|
| KafkaAvroSource.scala | Restructured to create a struct column for Kafka metadata and made column names configurable |
| KafkaAvroSink.scala | Added logic to drop the configurable Kafka metadata column before writing to Kafka |
| OffsetManagerUtilsSuite.scala | Updated test to create Kafka metadata as a struct field |
| SparkUtils.scala | Added getNestedField utility method to retrieve nested struct fields from schema |
| IncrementalIngestionJob.scala | Updated to validate nested Kafka offset and partition fields within struct |
| OffsetManagerUtils.scala | Modified to work with nested Kafka metadata fields using dot notation |
| OffsetValue.scala | Removed now-unused constant field name definitions |
| README.md | Added documentation for new configuration options |
Resolved review comments (outdated):
- pramen/extras/src/main/scala/za/co/absa/pramen/extras/source/KafkaAvroSource.scala
- pramen/extras/src/main/scala/za/co/absa/pramen/extras/sink/KafkaAvroSink.scala
- pramen/extras/src/main/scala/za/co/absa/pramen/extras/sink/KafkaAvroSink.scala
@coderabbitai full review

✅ Actions performed: Full review triggered.
Note: Other AI code review bot(s) detected. CodeRabbit has detected other AI code review bot(s) in this pull request and will avoid duplicating their findings in the review comments. This may lead to a less comprehensive review.

Walkthrough

Refactors Kafka metadata handling: removes two hardcoded Kafka field constants, adds a nested-schema resolver, makes the Kafka metadata column name configurable in source and sink, wraps metadata into a configurable struct on read, and drops that metadata column before writing to Kafka. Tests and utilities are updated to use dynamic field names.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Kafka as Kafka Topic
    participant Source as KafkaAvroSource
    participant Config as Config
    participant Schema as Schema Builder
    participant DF as DataFrame
    participant Sink as KafkaAvroSink

    Config->>Source: read custom.kafka.column / key.column.name
    Source->>Kafka: subscribe & read records
    Kafka-->>Source: record + partition, offset, timestamp, key
    Source->>Schema: build payload fields
    Source->>DF: create tmp_pramen_kafka (partition, offset, timestamp, ...)
    Source->>DF: create tmp_pramen_kafka_key (key)
    Source->>DF: project tmp_pramen_kafka_key, data.*, tmp_pramen_kafka
    Source->>DF: drop original configured metadata columns (if conflict) and rename temps to configured names
    DF-->>Sink: dataframe with configured nested kafka column
    Sink->>Sink: if configured kafka column exists, drop it (log)
    Sink->>Kafka: write records (without kafka metadata column)
```
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~25 minutes
Pre-merge checks and finishing touches: ✅ Passed checks (5 passed)
✅ Actions performed: Review triggered.
Actionable comments posted: 0
♻️ Duplicate comments (4)
pramen/extras/src/main/scala/za/co/absa/pramen/extras/sink/KafkaAvroSink.scala (1)
42-43: Fix grammar in documentation comment. The comment contains grammatical issues that were flagged in previous reviews:
- "struct field that Kafka record metadata" should be "struct field that contains Kafka record metadata"
- "if It exists" should be "if it exists"
Apply this diff:
```diff
- * # [Optional] Set name for the struct field that contains Kafka record metadata. This column will be dropped if It exists before sending data to Kafka.
+ * # [Optional] Set name for the struct field that contains Kafka record metadata. This column will be dropped if it exists before sending data to Kafka.
```

README.md (2)
942-946: Fix grammar in documentation comments. The comments contain grammatical issues flagged in previous reviews:
- Line 942: "struct field that Kafka record metadata" should be "struct field that contains Kafka record metadata"
Apply this diff:
```diff
- # [Optional] Set name for the struct field that Kafka record metadata
+ # [Optional] Set name for the struct field that contains Kafka record metadata
```
1017-1019: Fix grammar in documentation comments. The comments contain grammatical issues flagged in previous reviews:
- Line 1017: "struct field that Kafka record metadata" should be "struct field that contains Kafka record metadata"
- "if It exists" should be "if it exists"
Apply this diff:
```diff
- # [Optional] Set name for the struct field that contains Kafka record metadata. This column will be dropped if It exists before sending data to Kafka.
+ # [Optional] Set name for the struct field that contains Kafka record metadata. This column will be dropped if it exists before sending data to Kafka.
```

pramen/extras/src/main/scala/za/co/absa/pramen/extras/source/KafkaAvroSource.scala (1)
48-51: Grammar in documentation comment. A previous review flagged "struct field that Kafka record metadata" as missing a verb. The current text at line 48 already reads "struct field that contains Kafka record metadata", so the grammar is correct and no change is needed here.
🧹 Nitpick comments (1)
pramen/extras/src/main/scala/za/co/absa/pramen/extras/source/KafkaAvroSource.scala (1)
199-224: LGTM: Kafka metadata grouped into configurable struct at schema end. The implementation correctly:

- Creates a temporary `tmp_pramen_kafka` struct containing partition, offset, timestamp, and timestamp_type (lines 199-204)
- Uses a temporary `tmp_pramen_kafka_key` column for key data (lines 211, 213)
- Projects data fields first (`data.*`), then Kafka metadata (line 219), placing Kafka fields at the end of the schema per PR objectives
- Drops conflicting payload fields (lines 220-221) before renaming temporaries (lines 222-223)
However, the PR objectives state: "If the chosen Kafka struct field name conflicts with an existing payload field, the Kafka field should replace the payload field and a warning should be generated."
Currently, lines 220-221 silently drop the conflicting columns without logging a warning. Consider adding a check and warning:
```scala
// Before line 219
val payloadFields = df2.select("data.*").schema.fieldNames.toSet
if (payloadFields.contains(kafkaColumnName)) {
  log.warn(s"Payload field '$kafkaColumnName' conflicts with Kafka metadata struct name and will be replaced.")
}
if (payloadFields.contains(keyColumnName)) {
  log.warn(s"Payload field '$keyColumnName' conflicts with Kafka key column name and will be replaced.")
}
```

Would you like me to open an issue to track this enhancement?
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (8)
- README.md (2 hunks)
- pramen/api/src/main/scala/za/co/absa/pramen/api/offset/OffsetValue.scala (0 hunks)
- pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/OffsetManagerUtils.scala (1 hunks)
- pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/IncrementalIngestionJob.scala (2 hunks)
- pramen/core/src/main/scala/za/co/absa/pramen/core/utils/SparkUtils.scala (1 hunks)
- pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/OffsetManagerUtilsSuite.scala (2 hunks)
- pramen/extras/src/main/scala/za/co/absa/pramen/extras/sink/KafkaAvroSink.scala (4 hunks)
- pramen/extras/src/main/scala/za/co/absa/pramen/extras/source/KafkaAvroSource.scala (5 hunks)
💤 Files with no reviewable changes (1)
- pramen/api/src/main/scala/za/co/absa/pramen/api/offset/OffsetValue.scala
🔇 Additional comments (12)
pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/OffsetManagerUtilsSuite.scala (2)
19-19: LGTM: Import added for struct function. The addition of `struct` to the imports is necessary for creating the nested Kafka metadata structure in the test.
61-66: LGTM: Test updated to reflect new nested Kafka struct approach. The test now correctly creates a nested "kafka" struct containing offset and partition fields, and passes "kafka" to `getMinMaxValueFromData`. This aligns with the PR's objective of grouping Kafka metadata into a configurable struct field.
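For illustration, a minimal sketch of building a DataFrame with this nested shape in a test. It assumes an active SparkSession named `spark`; the flat column set and values are illustrative:

```scala
// Assumes an active SparkSession named `spark`.
import spark.implicits._
import org.apache.spark.sql.functions.{col, struct}

// Wrap flat Kafka columns into a nested struct named "kafka",
// mirroring the shape the updated suite builds.
val exampleDf = Seq((0, 10L, "a"), (0, 11L, "b"), (1, 5L, "c"))
  .toDF("partition", "offset", "value")
  .withColumn("kafka", struct(col("partition"), col("offset")))
  .drop("partition", "offset")
// Resulting schema: value: string, kafka: struct<partition: int, offset: bigint>
```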
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/OffsetManagerUtils.scala (2)

30-31: LGTM: Dynamic Kafka field names support nested structures. The construction of `kafkaOffsetFieldName` and `kafkaPartitionFieldName` using dot notation enables access to nested struct fields (e.g., "kafka.offset", "kafka.partition"). This aligns with the PR's goal of grouping Kafka metadata into a configurable struct.
34-41: LGTM: Aggregation logic correctly uses dynamic field names. The updated groupBy, aggregation, and ordering operations now reference the dynamically constructed field names. The positional access pattern (`getAs[Int](0)`, `getAs[Long](1)`, `getAs[Long](2)`) in lines 40-41 correctly extracts partition (position 0), min_offset (position 1), and max_offset (position 2) from the aggregated results.
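A minimal sketch of that dot-notation aggregation, assuming the configured struct name arrives as `offsetFieldName` (the exact aliases and collection step in OffsetManagerUtils may differ):

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, max, min}

def minMaxOffsetsPerPartition(df: DataFrame, offsetFieldName: String): Array[(Int, Long, Long)] = {
  // Dot notation resolves nested struct fields, e.g. "kafka.offset".
  val kafkaOffsetFieldName    = s"$offsetFieldName.offset"
  val kafkaPartitionFieldName = s"$offsetFieldName.partition"

  df.select(col(kafkaPartitionFieldName).as("partition"), col(kafkaOffsetFieldName).as("offset"))
    .groupBy("partition")
    .agg(min("offset").as("min_offset"), max("offset").as("max_offset"))
    .orderBy("partition")
    .collect()
    // Positional access: 0 = partition, 1 = min_offset, 2 = max_offset.
    .map(row => (row.getAs[Int](0), row.getAs[Long](1), row.getAs[Long](2)))
}
```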
pramen/extras/src/main/scala/za/co/absa/pramen/extras/sink/KafkaAvroSink.scala (3)

23-24: LGTM: Imports added for configuration handling. The imports enable access to the shared `CUSTOM_KAFKA_COLUMN_KEY` constant and configuration utilities, ensuring consistency with the source's configuration model.
122-122: LGTM: Configurable Kafka column name with sensible default. The `kafkaColumnName` is derived from the optional `custom.kafka.column` configuration, defaulting to "kafka". This aligns with the source's default and the PR's objective of making the Kafka struct field name configurable.
141-143: LGTM: Kafka metadata column dropped before writing to Kafka. Dropping the configured `kafkaColumnName` column before writing prevents name collisions when data is re-ingested from Kafka, addressing one of the PR's key objectives.

pramen/core/src/main/scala/za/co/absa/pramen/core/utils/SparkUtils.scala (1)
618-648: LGTM: Well-designed nested field resolution utility. The `getNestedField` method provides a clean API for resolving nested StructType fields using dot notation. Key strengths:

- Clear validation: Empty field names are rejected at lines 643-645
- Recursive traversal: The helper `getNestedFieldInArray` correctly handles nested StructTypes
- Case-insensitive matching: Root field lookup (line 621) uses `equalsIgnoreCase`, improving usability
- Error handling: Provides informative error messages listing available fields (lines 636-637) and reporting type mismatches (line 632)
The implementation supports the PR's dynamic Kafka field access pattern effectively.
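As a rough sketch of what such a resolver can look like (the actual `SparkUtils.getNestedField` signature, helper names, and error handling may differ):

```scala
import org.apache.spark.sql.types.{StructField, StructType}

// A hedged sketch of a dot-notation nested-field resolver like the one described above.
def getNestedFieldSketch(schema: StructType, fieldName: String): StructField = {
  require(fieldName.nonEmpty, "Field name cannot be empty.")

  def resolve(struct: StructType, parts: List[String]): StructField = parts match {
    case name :: Nil =>
      // Case-insensitive lookup, with an informative error listing available fields.
      struct.fields.find(_.name.equalsIgnoreCase(name)).getOrElse(
        throw new IllegalArgumentException(
          s"Field '$name' not found. Available fields: ${struct.fieldNames.mkString(", ")}"))
    case name :: rest =>
      struct.fields.find(_.name.equalsIgnoreCase(name)) match {
        case Some(StructField(_, inner: StructType, _, _)) => resolve(inner, rest)
        case Some(other) =>
          throw new IllegalArgumentException(s"Field '${other.name}' is not a struct.")
        case None =>
          throw new IllegalArgumentException(s"Field '$name' not found.")
      }
    case Nil =>
      throw new IllegalArgumentException("Field name cannot be empty.")
  }

  resolve(schema, fieldName.split('.').toList)
}
```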
pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/IncrementalIngestionJob.scala (2)
33-33: LGTM: Import added for nested field utilities. The `SparkUtils` import is necessary for accessing the `getNestedField` method used in Kafka offset validation.
310-324: LGTM: Kafka offset validation adapted for dynamic nested fields. The validation logic correctly:

- Constructs dynamic field names (`kafkaOffsetFieldName`, `kafkaPartitionFieldName`) using dot notation (lines 311-312)
- Retrieves nested fields via `SparkUtils.getNestedField` (lines 313-314)
- Validates data types remain unchanged (LongType for offset, IntegerType for partition)
- Provides clear error messages referencing the dynamic field names (lines 317, 322)
This implementation aligns with the PR's objective of supporting configurable Kafka struct field names.
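A hedged sketch of that validation flow, reusing the resolver sketch above; the real code in IncrementalIngestionJob and the `getNestedField` signature may differ:

```scala
import org.apache.spark.sql.types.{IntegerType, LongType, StructType}

// `offsetColumn` is the configured Kafka struct name (default "kafka").
def validateKafkaOffsetFields(schema: StructType, offsetColumn: String): Unit = {
  val kafkaOffsetFieldName    = s"$offsetColumn.offset"
  val kafkaPartitionFieldName = s"$offsetColumn.partition"

  val offsetField    = getNestedFieldSketch(schema, kafkaOffsetFieldName)
  val partitionField = getNestedFieldSketch(schema, kafkaPartitionFieldName)

  require(offsetField.dataType == LongType,
    s"Field '$kafkaOffsetFieldName' must be LongType, got ${offsetField.dataType}.")
  require(partitionField.dataType == IntegerType,
    s"Field '$kafkaPartitionFieldName' must be IntegerType, got ${partitionField.dataType}.")
}
```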
pramen/extras/src/main/scala/za/co/absa/pramen/extras/source/KafkaAvroSource.scala (2)
110-113: LGTM: Configurable Kafka metadata field names with sensible defaults. The introduction of `CUSTOM_KAFKA_COLUMN_KEY` and `KEY_COLUMN_KEY` as public constants enables configuration reuse across components. The defaults ("kafka" and "kafka_key") provide a good out-of-box experience.
118-119: LGTM: Offset info uses configurable field name. Using `kafkaColumnName` in `getOffsetInfo` ensures offset management tracks the user-configured struct field, maintaining consistency with the dynamic field naming approach used throughout the PR.
Actionable comments posted: 0
🧹 Nitpick comments (2)
pramen/extras/src/main/scala/za/co/absa/pramen/extras/sink/KafkaAvroSink.scala (1)
143-148: Consider using the exact field name from schema for clarity. While Spark's `drop()` is case-insensitive by default, the code would be more robust and self-documenting if it retrieved the actual field name from the schema:

```diff
-    val dfWithoutKafkaField = if (df.schema.fields.exists(_.name.equalsIgnoreCase(kafkaColumnName))) {
-      log.warn(s"Dropping '$kafkaColumnName' field from the input DataFrame before sending to Kafka because the output topic has its own metadata.")
-      df.drop(kafkaColumnName)
-    } else {
-      df
-    }
+    val dfWithoutKafkaField = df.schema.fields.find(_.name.equalsIgnoreCase(kafkaColumnName)) match {
+      case Some(field) =>
+        log.warn(s"Dropping '${field.name}' field from the input DataFrame before sending to Kafka because the output topic has its own metadata.")
+        df.drop(field.name)
+      case None =>
+        df
+    }
```

This avoids relying on Spark's case-sensitivity configuration and makes the intent explicit.
pramen/extras/src/main/scala/za/co/absa/pramen/extras/source/KafkaAvroSource.scala (1)
227-234: LGTM! Final DataFrame transformation correctly handles conflicts and ordering. The logic properly:
- Flattens the payload
- Drops conflicting fields from payload
- Renames temporary Kafka columns to configured names
- Ensures Kafka metadata appears at the end of the schema
Minor suggestion: Update the comment on lines 227-228 to reference the configured column names rather than hardcoded names:
```diff
- // Put data fields to the root level of the schema, and if data struct already has kafka_key and kafka fields,
- // drop them
+ // Put data fields to the root level of the schema, and if data struct already has fields
+ // matching the configured Kafka metadata column names, drop them
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
- pramen/extras/src/main/scala/za/co/absa/pramen/extras/sink/KafkaAvroSink.scala (4 hunks)
- pramen/extras/src/main/scala/za/co/absa/pramen/extras/source/KafkaAvroSource.scala (5 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
pramen/extras/src/main/scala/za/co/absa/pramen/extras/sink/KafkaAvroSink.scala (4)
- pramen/extras/src/main/scala/za/co/absa/pramen/extras/writer/TableWriterKafka.scala (7): pramen (66-66), pramen (68-78), pramen (80-86), pramen (88-98), pramen (100-118), pramen (120-138), write (39-64)
- pramen/api/src/main/scala/za/co/absa/pramen/api/SinkResult.scala (1): SinkResult (19-24)
- pramen/extras/src/main/scala/za/co/absa/pramen/extras/source/KafkaAvroSource.scala (2): KafkaAvroSource (107-240), KafkaAvroSource (242-253)
- pramen/extras/src/main/scala/za/co/absa/pramen/extras/utils/ConfigUtils.scala (2): ConfigUtils (24-205), getOptionString (43-49)
pramen/extras/src/main/scala/za/co/absa/pramen/extras/source/KafkaAvroSource.scala (3)
- pramen/api/src/main/scala/za/co/absa/pramen/api/offset/OffsetValue.scala (2): OffsetValue (34-154), KafkaValue (80-125)
- pramen/extras/src/main/scala/za/co/absa/pramen/extras/utils/ConfigUtils.scala (2): ConfigUtils (24-205), getOptionString (43-49)
- pramen/api/src/main/scala/za/co/absa/pramen/api/offset/OffsetInfo.scala (1): OffsetInfo (19-19)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (8)
- GitHub Check: Test Spark 3.5.5 on Scala 2.12.20
- GitHub Check: Test Spark 3.5.5 on Scala 2.13.16
- GitHub Check: Test Spark 3.3.4 on Scala 2.13.16
- GitHub Check: Test Spark 3.4.4 on Scala 2.13.16
- GitHub Check: Test Spark 3.3.4 on Scala 2.12.20
- GitHub Check: Test Spark 3.4.4 on Scala 2.12.20
- GitHub Check: Test Spark 2.4.8 on Scala 2.11.12
- GitHub Check: Test Coverage on Scala 2.12.18
🔇 Additional comments (12)
pramen/extras/src/main/scala/za/co/absa/pramen/extras/sink/KafkaAvroSink.scala (4)
21-21: LGTM! Imports are properly used. The new imports support logging and configuration-driven Kafka column handling.
Also applies to: 24-25
43-45: LGTM! Documentation is clear and accurate. The documentation properly describes the configurable Kafka metadata column and its handling behavior.
123-124: LGTM! Configuration initialization follows best practices. The logger and configurable column name are properly initialized with a sensible default.
150-150: LGTM! Write logic correctly uses the modified DataFrame. The implementation properly writes the DataFrame with Kafka metadata removed.
pramen/extras/src/main/scala/za/co/absa/pramen/extras/source/KafkaAvroSource.scala (8)
21-22: LGTM! Imports support the new struct-based metadata handling. The imports are properly used throughout the implementation.
Also applies to: 25-25
111-111: LGTM! Initialization follows best practices. The logger and configurable column names are properly initialized with sensible defaults.
Also applies to: 113-113, 115-116
121-121: LGTM! Offset column name is now configurable. The change aligns with the PR objective to make Kafka metadata configurable. Since the offset column is now a struct, ensure that the offset management system (OffsetManagerUtils) properly handles nested field access for partition and offset values.
49-53: LGTM! Documentation is clear and addresses previous feedback. The documentation properly describes both configurable parameters.
202-207: LGTM! Kafka metadata properly grouped into struct. The implementation correctly groups partition, offset, timestamp, and timestamp_type into a single struct, using a temporary name to avoid conflicts during transformation.
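A minimal sketch of that grouping step. The input column names follow Spark's Kafka source (`partition`, `offset`, `timestamp`, `timestampType`); the temporary name and final rename follow the review's description, but the exact code differs:

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, struct}

// Group Spark-Kafka metadata columns into one temporary struct, then rename it
// to the configured name (default "kafka") at the end of the transformation.
def groupKafkaMetadata(rawDf: DataFrame, kafkaColumnName: String): DataFrame =
  rawDf
    .withColumn("tmp_pramen_kafka", struct(
      col("partition"),
      col("offset"),
      col("timestamp"),
      col("timestampType").as("timestamp_type")))
    .drop("partition", "offset", "timestamp", "timestampType")
    .withColumnRenamed("tmp_pramen_kafka", kafkaColumnName)
```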
214-217: LGTM! Key column handling supports both Avro and raw formats. The implementation correctly handles both scenarios using a consistent temporary naming pattern.
219-225: LGTM! Excellent defensive programming with conflict warnings. The payload field conflict detection and warnings help users understand when their data fields will be replaced by Kafka metadata, directly addressing the PR objectives.
244-245: LGTM! New configuration constants promote consistency. The public constants centralize configuration key definitions and enable safe sharing with other components like KafkaAvroSink.
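For illustration, the shared constants might look roughly like this; the key names come from this PR, while the object name, layout, and default-value constant names are assumptions:

```scala
object KafkaAvroSourceKeys {
  // Configuration keys shared between KafkaAvroSource and KafkaAvroSink (per this PR).
  val CUSTOM_KAFKA_COLUMN_KEY = "custom.kafka.column"
  val KEY_COLUMN_KEY          = "key.column.name"

  // Hypothetical names for the documented defaults ("kafka" and "kafka_key").
  val DEFAULT_KAFKA_COLUMN = "kafka"
  val DEFAULT_KEY_COLUMN   = "kafka_key"
}
```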
Closes #648
Summary by CodeRabbit
- New Features
- Documentation