184 changes: 123 additions & 61 deletions README.md
@@ -926,6 +926,66 @@ normal batch pipelines.
- Reruns are possible for full days to remove duplicates. But for incremental sinks, such as the Kafka sink, duplicates still
  might happen.

#### Kafka Avro source
The Kafka Avro source allows reading data from Kafka. It supports topics whose values are serialized as Avro records via the
Confluent libraries and Schema Registry. The source supports only the incremental schedule.

Here is an example of a Kafka Avro source definition:

```hocon
pramen.sources = [
  {
    # Define a name to reference from the pipeline:
    name = "kafka_source"
    factory.class = "za.co.absa.pramen.extras.source.KafkaAvroSource"

    kafka {
      bootstrap.servers = "mybroker1:9092,mybroker2:9092"

      # Arbitrary options for the Kafka consumer/reader
      sasl.jaas.config = "..."
      sasl.mechanism = "..."
      security.protocol = "..."
    }

    schema.registry {
      url = "https://my.schema.registry:8081"
      value.naming.strategy = "topic.name"

      # Arbitrary options for Schema registry
      basic.auth.credentials.source = "..."
      basic.auth.user.info = "..."
      ssl.truststore.location = "..."
      ssl.truststore.password = "..."
      ssl.truststore.type = "..."
    }
  }
]
```

The corresponding pipeline operation could look like this:
```hocon
pramen.operations = [
  {
    name = "Sourcing from a Kafka topic"
    type = "ingestion"
    schedule.type = "incremental"

    source = "kafka_source"

    info.date.expr = "@runDate"

    tables = [
      {
        input.table = "my_kafka_topic1"
        output.metastore.table = "table1"
      }
    ]
  }
]
```
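
Since the source supports only the incremental schedule, Pramen needs to remember how far each topic partition has been read between runs. Judging by the offset classes added elsewhere in this pull request (`KafkaPartition` and `OffsetValue.KafkaValue`), the committed offsets serialize to a JSON mapping from partition number to offset. The sketch below is illustrative only (the example object name is made up); it simply round-trips such a value through the new API:

```scala
import za.co.absa.pramen.api.offset.{KafkaPartition, OffsetValue}

// Hypothetical demo object, not part of Pramen itself.
object KafkaOffsetFormatExample {
  def main(args: Array[String]): Unit = {
    // Offsets read so far for a three-partition topic.
    val offsets = OffsetValue.KafkaValue(Seq(
      KafkaPartition(0, 100L),
      KafkaPartition(1, 250L),
      KafkaPartition(2, 75L)
    ))

    // Rendered as a JSON mapping from partition to offset: {"0":100,"1":250,"2":75}
    println(offsets.valueString)

    // Parsed back through the generic offset parser using the "kafka" offset type.
    val parsed = OffsetValue.fromString("kafka", offsets.valueString)
    println(parsed.map(_.valueString)) // Some({"0":100,"1":250,"2":75})
  }
}
```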

### Sinks
Sinks define a way data needs to be sent to a target system. Built-in sinks include:
- Kafka sink.
@@ -937,84 +997,86 @@
You can define your own sink by implementing the `Sink` trait and providing the corresponding class name in the pipeline configuration.

#### Kafka sink
A Kafka Avro sink allows sending data from a metastore table to a Kafka topic in Avro format, with Confluent Schema Registry
integration. You can define all endpoint and credential options in the sink definition. The output topic name should be defined
in the definition of the pipeline operation.

Here is an example of a Kafka sink definition:

```hocon
pramen.sinks = [
  {
    # Define a name to reference from the pipeline:
    name = "kafka_avro"
    factory.class = "za.co.absa.pramen.extras.sink.KafkaAvroSink"

    kafka {
      bootstrap.servers = "mybroker1:9092,mybroker2:9092"

      # Arbitrary options for creating a Kafka Producer
      sasl.jaas.config = "..."
      sasl.mechanism = "..."
      security.protocol = "..."
    }

    schema.registry {
      url = "https://my.schema.registry:8081"

      # Can be one of: topic.name, record.name, topic.record.name
      value.naming.strategy = "topic.name"

      # Arbitrary options for Schema registry
      basic.auth.credentials.source = "..."
      basic.auth.user.info = "..."
      ssl.truststore.location = "..."
      ssl.truststore.password = "..."
      ssl.truststore.type = "..."
    }
  }
]
```

The corresponding pipeline operation could look like this:
<details>
<summary>Click to expand</summary>

```hocon
pramen.operations = [
  {
    name = "Kafka sink"
    type = "sink"
    sink = "kafka_avro"
    schedule.type = "daily"
    # Optional dependencies
    dependencies = [
      {
        tables = [dependent_table]
        date.from = "@infoDate"
      }
    ]
    tables = [
      {
        input.metastore.table = "metastore_table"
        output.topic.name = "my.topic"

        # All following settings are OPTIONAL

        # Date range to read the source table for. By default the job information date is used.
        # But you can define an arbitrary expression based on the information date.
        # More: see the section of documentation regarding date expressions, and the list of functions allowed.
        date {
          from = "@infoDate"
          to = "@infoDate"
        }
        transformations = [
          {col = "col1", expr = "lower(some_string_column)"}
        ],
        filters = [
          "some_numeric_column > 100"
        ]
        columns = ["col1", "col2", "col2", "some_numeric_column"]
      }
    ]
  }
]
```
</details>

2 changes: 1 addition & 1 deletion pramen/api/pom.xml
@@ -27,7 +27,7 @@
<parent>
<groupId>za.co.absa.pramen</groupId>
<artifactId>pramen</artifactId>
- <version>1.12.7-SNAPSHOT</version>
+ <version>1.12.8-SNAPSHOT</version>
</parent>

<properties>
@@ -0,0 +1,19 @@
/*
* Copyright 2022 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.pramen.api.offset

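/** A single Kafka topic partition number together with the offset tracked for that partition. */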
case class KafkaPartition(partition: Int, offset: Long)
@@ -31,6 +31,7 @@ object OffsetType {
val INTEGRAL_TYPE_STR = "integral"
val INTEGRAL_TYPE_ALT_STR = "number" // Alternative name for 'integral' purely for compatibility with SqlColumnType
val STRING_TYPE_STR = "string"
val KAFKA_TYPE_STR = "kafka"

case object DateTimeType extends OffsetType {
override val dataTypeString: String = DATETIME_TYPE_STR
@@ -50,11 +51,18 @@
override def getSparkCol(c: Column): Column = c.cast(SparkStringType)
}

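// Kafka offsets are compound per-partition values (see OffsetValue.KafkaValue), so the Spark column is passed through without a cast.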
case object KafkaType extends OffsetType {
override val dataTypeString: String = KAFKA_TYPE_STR

override def getSparkCol(c: Column): Column = c
}

def fromString(dataType: String): OffsetType = dataType match {
case DATETIME_TYPE_STR => DateTimeType
case INTEGRAL_TYPE_STR => IntegralType
case INTEGRAL_TYPE_ALT_STR => IntegralType
case STRING_TYPE_STR => StringType
case KAFKA_TYPE_STR => KafkaType
case _ => throw new IllegalArgumentException(s"Unknown offset data type: $dataType")
}
}
@@ -18,9 +18,10 @@ package za.co.absa.pramen.api.offset

import org.apache.spark.sql.Column
import org.apache.spark.sql.functions._
-import za.co.absa.pramen.api.offset.OffsetType.{DATETIME_TYPE_STR, INTEGRAL_TYPE_STR, STRING_TYPE_STR}
+import za.co.absa.pramen.api.offset.OffsetType.{DATETIME_TYPE_STR, INTEGRAL_TYPE_STR, KAFKA_TYPE_STR, STRING_TYPE_STR}

import java.time.Instant
import scala.util.control.NonFatal

sealed trait OffsetValue extends Comparable[OffsetValue] {
def dataType: OffsetType
Expand All @@ -31,6 +32,9 @@ sealed trait OffsetValue extends Comparable[OffsetValue] {
}

object OffsetValue {
val KAFKA_PARTITION_FIELD = "kafka_partition"
val KAFKA_OFFSET_FIELD = "kafka_offset"

case class DateTimeValue(t: Instant) extends OffsetValue {
override val dataType: OffsetType = OffsetType.DateTimeType

@@ -76,6 +80,54 @@ object OffsetValue {
}
}

case class KafkaValue(value: Seq[KafkaPartition]) extends OffsetValue {
override val dataType: OffsetType = OffsetType.KafkaType

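// Renders the offsets as a JSON object mapping partition number to offset, e.g. {"0":100,"1":250}, sorted by partition.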
override def valueString: String = {
val q = "\""
value.sortBy(_.partition)
.map(p => s"$q${p.partition}$q:${p.offset}")
.mkString("{", ",", "}")
}

override def getSparkLit: Column = lit(valueString)

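// Kafka offsets are only partially ordered: both values must cover the same partitions, and the comparison fails if some partitions are ahead while others are behind.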
override def compareTo(other: OffsetValue): Int = {
other match {
case otherKafka@KafkaValue(otherValue) =>
if (value.length != otherValue.length) {
throw new IllegalArgumentException(s"Cannot compare Kafka offsets with different number of partitions: ${value.length} and ${otherValue.length} ($valueString vs ${otherKafka.valueString}).")
} else {
val comparisons = value.sortBy(_.partition).zip(otherValue.sortBy(_.partition)).map { case (v1, v2) =>
if (v1.partition != v2.partition) {
throw new IllegalArgumentException(s"Cannot compare Kafka offsets with different partition numbers: ${v1.partition} and ${v2.partition} ($valueString vs ${otherKafka.valueString}).")
} else {
v1.offset.compareTo(v2.offset)
}
}
val existPositive = comparisons.exists(_ > 0)
val existNegative = comparisons.exists(_ < 0)

if (existPositive && existNegative) {
throw new IllegalArgumentException(s"Some offsets are bigger, some are smaller when comparing partitions: $valueString vs ${otherKafka.valueString}.")
} else if (existPositive) {
1
} else if (existNegative) {
-1
} else {
0
}
}
case _ => throw new IllegalArgumentException(s"Cannot compare ${dataType.dataTypeString} with ${other.dataType.dataTypeString}")
}
}

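// Advances the offset of every partition by one.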
def increment: OffsetValue = {
KafkaValue(value.map(p => p.copy(offset = p.offset + 1)))
}
}


def fromString(dataType: String, value: String): Option[OffsetValue] = {
if (value == null || value.isEmpty) {
None
@@ -84,6 +136,21 @@ object OffsetValue {
case DATETIME_TYPE_STR => Some(DateTimeValue(Instant.ofEpochMilli(value.toLong)))
case INTEGRAL_TYPE_STR => Some(IntegralValue(value.toLong))
case STRING_TYPE_STR => Some(StringValue(value))
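// Parses the JSON mapping produced by KafkaValue.valueString, e.g. {"0":100,"1":250}.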
case KAFKA_TYPE_STR =>
try {
Some(KafkaValue(
value
.replaceAll("[{}\"]", "")
.split(",")
.filter(_.nonEmpty)
.map { part =>
val Array(partStr, offsetStr) = part.split(":")
KafkaPartition(partStr.toInt, offsetStr.toLong)
}.toSeq
))
} catch {
case NonFatal(ex) => throw new IllegalArgumentException(s"Unexpected Kafka offset: '$value'. Expected a JSON mapping from partition to offset.", ex)
}
case _ => throw new IllegalArgumentException(s"Unknown offset data type: $dataType")
}
}
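
The comparison semantics of `KafkaValue` above are worth a small illustration: Kafka offsets form only a partial order, so two snapshots can be compared only when no partition moves backwards while another moves forwards. The sketch below is illustrative only (the example object name is made up) and uses just the classes from this diff:

```scala
import za.co.absa.pramen.api.offset.KafkaPartition
import za.co.absa.pramen.api.offset.OffsetValue.KafkaValue

import scala.util.Try

// Hypothetical demo object, not part of Pramen itself.
object KafkaOffsetComparisonExample {
  def main(args: Array[String]): Unit = {
    val previous = KafkaValue(Seq(KafkaPartition(0, 100L), KafkaPartition(1, 200L)))
    val advanced = KafkaValue(Seq(KafkaPartition(0, 150L), KafkaPartition(1, 200L)))
    val mixed    = KafkaValue(Seq(KafkaPartition(0, 50L), KafkaPartition(1, 300L)))

    println(advanced.compareTo(previous)) // 1: at least one partition advanced and none went backwards
    println(previous.compareTo(previous)) // 0: identical offsets

    // Partition 0 is behind while partition 1 is ahead, so the comparison is rejected.
    println(Try(mixed.compareTo(previous)).isFailure) // true

    // increment advances every partition by one: {"0":101,"1":201}
    println(previous.increment.valueString)
  }
}
```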
@@ -216,7 +216,7 @@ abstract class SqlGeneratorBase(sqlConfig: SqlConfig) extends SqlGenerator {
}

object SqlGeneratorBase {
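// Note: raised from 128 to 512 in this change, presumably so that longer serialized offsets (such as per-partition Kafka offsets) still fit.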
-val MAX_STRING_OFFSET_CHARACTERS = 128
+val MAX_STRING_OFFSET_CHARACTERS = 512

val forbiddenCharacters = ";'\\"
val normalCharacters = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789_."