
Commit df51328

Feature/hudi scd2 writer (#297)
SCD2 HUDI writer
1 parent 0db8d47 commit df51328

71 files changed (+871, -472 lines)


.sonarcloud.properties

Lines changed: 2 additions & 0 deletions
@@ -12,3 +12,5 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
+
+sonar.cpd.exclusions=**/TestDeltaCDCToSCD2Writer.scala,**/TestHudiCDCToSCD2Writer.scala

README.md

Lines changed: 29 additions & 2 deletions
@@ -46,6 +46,7 @@ The data ingestion pipeline of Hyperdrive consists of four components: readers,
 - `KafkaStreamWriter` - writes to a Kafka topic.
 - `DeltaCDCToSnapshotWriter` - writes the DataFrame in Delta format. It expects CDC events, performs merge logic and creates the latest snapshot table.
 - `DeltaCDCToSCD2Writer` - writes the DataFrame in Delta format. It expects CDC events, performs merge logic and creates an SCD2 table.
+- `HudiCDCToSCD2Writer` - writes the DataFrame in Hudi format. It expects CDC events, performs merge logic and creates an SCD2 table.
 
 ### Custom components
 Custom components can be implemented using the [Component Archetype](component-archetype) following the API defined in the package `za.co.absa.hyperdrive.ingestor.api`
@@ -345,7 +346,7 @@ Any additional properties for the `DataStreamWriter` can be added with the prefi
 
 **Example**
 
-- `component.writer=za.co.absa.hyperdrive.compatibility.impl.writer.delta.snapshot.DeltaCDCToSnapshotWriter`
+- `component.writer=za.co.absa.hyperdrive.compatibility.impl.writer.cdc.delta.snapshot.DeltaCDCToSnapshotWriter`
 - `writer.deltacdctosnapshot.destination.directory=/tmp/destination`
 - `writer.deltacdctosnapshot.key.column=key`
 - `writer.deltacdctosnapshot.operation.column=ENTTYP`
@@ -370,7 +371,7 @@ Any additional properties for the `DataStreamWriter` can be added with the prefi
 Any additional properties for the `DataStreamWriter` can be added with the prefix `writer.deltacdctoscd2.options`, e.g. `writer.deltacdctoscd2.options.key=value`
 
 **Example**
-- `component.writer=za.co.absa.hyperdrive.compatibility.impl.writer.delta.scd2.DeltaCDCToSCD2Writer`
+- `component.writer=za.co.absa.hyperdrive.compatibility.impl.writer.cdc.delta.scd2.DeltaCDCToSCD2Writer`
 - `writer.deltacdctoscd2.destination.directory=/tmp/destination`
 - `writer.deltacdctoscd2.key.column=key`
 - `writer.deltacdctoscd2.timestamp.column=TIMSTAMP`
@@ -379,6 +380,32 @@ Any additional properties for the `DataStreamWriter` can be added with the prefi
 - `writer.deltacdctoscd2.precombineColumns=ENTTYP`
 - `writer.deltacdctoscd2.precombineColumns.customOrder.ENTTYP=PT,FI,RR,UB,UP,DL,FD`
 
+##### HudiCDCToSCD2Writer
+| Property Name | Required | Description |
+|:---|:---:|:---|
+| `writer.hudicdctoscd2.destination.directory` | Yes | Destination path of the sink. Equivalent to the Spark property `path` of the `DataStreamWriter`. |
+| `writer.hudicdctoscd2.partition.columns` | No | Comma-separated list of columns to partition by. |
+| `writer.hudicdctoscd2.key.column` | Yes | A column with a unique entity identifier. |
+| `writer.hudicdctoscd2.timestamp.column` | Yes | A column with the event timestamp. |
+| `writer.hudicdctoscd2.operation.column` | Yes | A column whose value marks the operation of a record. |
+| `writer.hudicdctoscd2.operation.deleted.values` | Yes | Values of the operation column that mark a record for deletion. |
+| `writer.hudicdctoscd2.precombineColumns` | Yes | When two records have the same key and timestamp, the record with the largest values in the precombine columns is kept. The columns are evaluated in the provided order. |
+| `writer.hudicdctoscd2.precombineColumns.customOrder` | No | A custom ascending order for a precombine column's values. |
+| `writer.common.trigger.type` | No | See [Common writer properties](#common-writer-properties) |
+| `writer.common.trigger.processing.time` | No | See [Common writer properties](#common-writer-properties) |
+
+Any additional properties for the `DataStreamWriter` can be added with the prefix `writer.hudicdctoscd2.options`, e.g. `writer.hudicdctoscd2.options.key=value`
+
+**Example**
+- `component.writer=za.co.absa.hyperdrive.compatibility.impl.writer.cdc.hudi.scd2.HudiCDCToSCD2Writer`
+- `writer.hudicdctoscd2.destination.directory=/tmp/destination`
+- `writer.hudicdctoscd2.key.column=key`
+- `writer.hudicdctoscd2.timestamp.column=TIMSTAMP`
+- `writer.hudicdctoscd2.operation.column=ENTTYP`
+- `writer.hudicdctoscd2.operation.deleted.values=DL,FD`
+- `writer.hudicdctoscd2.precombineColumns=ENTTYP`
+- `writer.hudicdctoscd2.precombineColumns.customOrder.ENTTYP=PT,FI,RR,UB,UP,DL,FD`
+
 #### Common writer properties
 
 | Property Name | Required | Description |
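
The `precombineColumns` and `customOrder` properties documented above act as a tie-breaker: when two CDC events share the same key and timestamp, the event whose value appears later in the custom order wins. Below is a minimal standalone sketch of that behaviour, mirroring the `array_position`/window trick used in `CDCUtil` further down in this commit. The sample rows, session setup and column names (`key`, `TIMSTAMP`, `ENTTYP`) follow the example configuration and are illustrative only, not code from this commit.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().master("local[*]").appName("precombine-demo").getOrCreate()
import spark.implicits._

// writer.hudicdctoscd2.precombineColumns.customOrder.ENTTYP=PT,FI,RR,UB,UP,DL,FD
val customOrder = Seq("PT", "FI", "RR", "UB", "UP", "DL", "FD")

// Two events for the same key and timestamp: an update (UP) and a delete (DL).
val batch = Seq(
  ("key1", "2023-01-01 10:00:00", "UP"),
  ("key1", "2023-01-01 10:00:00", "DL")
).toDF("key", "TIMSTAMP", "ENTTYP")

// Map ENTTYP to its position in the custom order and keep, per (key, TIMSTAMP),
// the row with the highest position, i.e. the value furthest down the list.
val ranked = batch
  .withColumn("_custom_order", lit(customOrder.toArray))
  .withColumn("_sort", expr("array_position(_custom_order, ENTTYP)"))
val window = Window.partitionBy("key", "TIMSTAMP").orderBy(col("_sort").desc)

ranked
  .withColumn("_rn", row_number().over(window))
  .where("_rn = 1")
  .drop("_custom_order", "_sort", "_rn")
  .show() // only the DL event remains
```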

compatibility_spark-3/pom.xml

Lines changed: 5 additions & 0 deletions
@@ -60,5 +60,10 @@
       <groupId>io.delta</groupId>
       <artifactId>delta-core_${scala.compat.version}</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hudi</groupId>
+      <artifactId>hudi-spark3.2-bundle_${scala.compat.version}</artifactId>
+      <scope>provided</scope>
+    </dependency>
   </dependencies>
 </project>
Lines changed: 5 additions & 5 deletions
@@ -13,13 +13,13 @@
  * limitations under the License.
  */
 
-package za.co.absa.hyperdrive.compatibility.impl.writer.delta.scd2
+package za.co.absa.hyperdrive.compatibility.impl.writer.cdc
 
 import za.co.absa.hyperdrive.ingestor.api.writer.StreamWriterCommonAttributes
 import za.co.absa.hyperdrive.ingestor.api.{HasComponentAttributes, PropertyMetadata}
 
-trait DeltaCDCToSCD2WriterAttributes extends HasComponentAttributes {
-  private val rootFactoryConfKey = "writer.deltacdctoscd2"
+trait CDCToSCD2WriterAttributes extends HasComponentAttributes {
+  val rootFactoryConfKey: String
   val KEY_DESTINATION_DIRECTORY = s"$rootFactoryConfKey.destination.directory"
   val KEY_EXTRA_CONFS_ROOT = s"$rootFactoryConfKey.options"
   val KEY_PARTITION_COLUMNS = s"$rootFactoryConfKey.partition.columns"
@@ -30,9 +30,9 @@ trait DeltaCDCToSCD2WriterAttributes extends HasComponentAttributes {
   val KEY_PRECOMBINE_COLUMNS = s"$rootFactoryConfKey.precombineColumns"
   val KEY_PRECOMBINE_COLUMNS_CUSTOM_ORDER = s"$rootFactoryConfKey.precombineColumns.customOrder"
 
-  override def getName: String = "Delta Stream Writer"
+  override def getName: String
 
-  override def getDescription: String = "This writer saves ingested data in Delta format on a filesystem (e.g. HDFS)"
+  override def getDescription: String
 
   override def getProperties: Map[String, PropertyMetadata] = Map(
     KEY_DESTINATION_DIRECTORY -> PropertyMetadata("Destination directory", Some("A path to a directory"), required = true),
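
This change generalises the former Delta-specific attributes trait so the Delta and Hudi SCD2 writers can share their property definitions, leaving `rootFactoryConfKey`, `getName` and `getDescription` abstract. A sketch of how an implementor might fill them in; the object below is hypothetical and is not the actual `HudiCDCToSCD2Writer` companion from this commit.

```scala
package za.co.absa.hyperdrive.compatibility.impl.writer.cdc.hudi.scd2

import za.co.absa.hyperdrive.compatibility.impl.writer.cdc.CDCToSCD2WriterAttributes

// Hypothetical implementor, for illustration only.
object HudiCDCToSCD2WriterAttributesSketch extends CDCToSCD2WriterAttributes {
  // lazy so the value is available when the trait initialises its derived KEY_* vals
  override lazy val rootFactoryConfKey: String = "writer.hudicdctoscd2"
  override def getName: String = "Hudi SCD2 Stream Writer"
  override def getDescription: String = "This writer saves ingested CDC data as an SCD2 table in Hudi format"
}
```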
Lines changed: 184 additions & 0 deletions
@@ -0,0 +1,184 @@
/*
 * Copyright 2018 ABSA Group Limited
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package za.co.absa.hyperdrive.compatibility.impl.writer.cdc

import org.apache.hadoop.fs.FileSystem
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{BooleanType, StructField, StructType, TimestampType}
import org.apache.spark.sql.{DataFrame, SparkSession}
import za.co.absa.hyperdrive.shared.utils.FileUtils

import java.net.URI

object CDCUtil {

  private val StartDateColumn = "_start_date"
  private val EndDateColumn = "_end_date"
  private val IsCurrentColumn = "_is_current"
  private val IsOldDataColumn = "_is_old_data"
  private val SortFieldPrefix = "_tmp_hyperdrive_"
  private val OldData = "_old_data"
  private val NewData = "_new_data"
  private val SortFieldCustomOrderColumn = "_tmp_hyperdrive_sort_field_custom_order_"

  private[hyperdrive] case class SCD2Fields(keyColumn: String,
                                            timestampColumn: String,
                                            operationColumn: String,
                                            operationDeleteValues: Seq[String],
                                            precombineColumns: Seq[String],
                                            precombineColumnsCustomOrder: Map[String, Seq[String]])

  private[hyperdrive] def getStagedDataForSCD2(history: DataFrame, input: DataFrame, scd2Fields: SCD2Fields): DataFrame = {
    val uniqueChangesForEachKeyAndTimestamp = removeDuplicates(input, scd2Fields)
    val previousEvents = getPreviousEvents(history, uniqueChangesForEachKeyAndTimestamp, scd2Fields)
    val nextEvents = getNextEvents(history, uniqueChangesForEachKeyAndTimestamp, scd2Fields)

    val union = previousEvents.union(nextEvents).distinct().union(
      uniqueChangesForEachKeyAndTimestamp
        .withColumn(StartDateColumn, col(scd2Fields.timestampColumn))
        .withColumn(EndDateColumn, lit(null))
        .withColumn(IsCurrentColumn, lit(false))
        .withColumn(IsOldDataColumn, lit(false))
        .selectExpr(
          Seq(StartDateColumn, EndDateColumn, IsCurrentColumn) ++
            uniqueChangesForEachKeyAndTimestamp.columns ++
            Seq(IsOldDataColumn): _*
        )
    )

    val uniqueEvents = removeDuplicates(union, scd2Fields)
    setSCD2Fields(uniqueEvents, scd2Fields).drop(IsOldDataColumn)
  }

  private[hyperdrive] def getDataFrameWithSortColumns(dataFrame: DataFrame, sortFieldsPrefix: String, precombineColumns: Seq[String], precombineColumnsCustomOrder: Map[String, Seq[String]]): DataFrame = {
    precombineColumns.foldLeft(dataFrame) { (df, precombineColumn) =>
      val order = precombineColumnsCustomOrder.getOrElse(precombineColumn, Seq.empty[String])
      order match {
        case o if o.isEmpty =>
          df.withColumn(s"$sortFieldsPrefix$precombineColumn", col(precombineColumn))
        case o =>
          df
            .withColumn(SortFieldCustomOrderColumn, lit(o.toArray))
            .withColumn(
              s"$sortFieldsPrefix$precombineColumn",
              expr(s"""array_position($SortFieldCustomOrderColumn,$precombineColumn)""")
            ).drop(SortFieldCustomOrderColumn)
      }
    }
  }

  private[hyperdrive] def getSchemaWithSCD2Fields(input: DataFrame): StructType = {
    StructType(
      Seq(
        StructField(StartDateColumn, TimestampType, nullable = false),
        StructField(EndDateColumn, TimestampType, nullable = true),
        StructField(IsCurrentColumn, BooleanType, nullable = false)
      ).toArray ++ input.schema.fields
    )
  }

  private[hyperdrive] def isDirEmptyOrDoesNotExist(spark: SparkSession, destination: String): Boolean = {
    implicit val fs: FileSystem = FileSystem.get(new URI(destination), spark.sparkContext.hadoopConfiguration)
    if (FileUtils.exists(destination)) {
      if (FileUtils.isDirectory(destination)) {
        FileUtils.isEmpty(destination)
      } else {
        false
      }
    } else {
      true
    }
  }

  private def removeDuplicates(input: DataFrame, scd2Fields: SCD2Fields): DataFrame = {
    val dataFrameWithSortColumns = getDataFrameWithSortColumns(input, SortFieldPrefix, scd2Fields.precombineColumns, scd2Fields.precombineColumnsCustomOrder)
    val sortColumnsWithPrefix = dataFrameWithSortColumns.schema.fieldNames.filter(_.startsWith(SortFieldPrefix))
    val window = Window
      .partitionBy(s"${scd2Fields.keyColumn}", s"${scd2Fields.timestampColumn}")
      .orderBy(sortColumnsWithPrefix.map(col(_).desc): _*)
    dataFrameWithSortColumns
      .withColumn("rank", row_number().over(window))
      .where("rank == 1")
      .drop("rank")
      .drop(sortColumnsWithPrefix: _*)
  }

  private def getPreviousEvents(history: DataFrame, uniqueChangesForEachKeyAndTimestamp: DataFrame, scd2Fields: SCD2Fields): DataFrame = {
    history.as(OldData).join(
      uniqueChangesForEachKeyAndTimestamp.as(NewData),
      col(s"$NewData.${scd2Fields.keyColumn}").equalTo(col(s"$OldData.${scd2Fields.keyColumn}"))
        .and(col(s"$NewData.${scd2Fields.timestampColumn}").>=(col(s"$OldData.$StartDateColumn")))
        .and(col(s"$NewData.${scd2Fields.timestampColumn}").<=(col(s"$OldData.$EndDateColumn")))
        .or(
          col(s"$NewData.${scd2Fields.keyColumn}").equalTo(col(s"$OldData.${scd2Fields.keyColumn}"))
            .and(col(s"$NewData.${scd2Fields.timestampColumn}").>=(col(s"$OldData.$StartDateColumn")))
            .and(col(s"$OldData.$IsCurrentColumn").equalTo(true))
        )
    ).select(s"$OldData.*").withColumn(s"$IsOldDataColumn", lit(true))
  }

  private def getNextEvents(history: DataFrame, uniqueChangesForEachKeyAndTimestamp: DataFrame, scd2Fields: SCD2Fields): DataFrame = {
    val window = Window
      .partitionBy(col(s"$OldData.${scd2Fields.keyColumn}"), col(s"$NewData.${scd2Fields.timestampColumn}"))
      .orderBy(col(s"$OldData.$StartDateColumn").asc, col(s"$OldData.${scd2Fields.timestampColumn}").asc)

    history.as(OldData).join(
      uniqueChangesForEachKeyAndTimestamp.as(NewData),
      col(s"$NewData.${scd2Fields.keyColumn}").equalTo(col(s"$OldData.${scd2Fields.keyColumn}"))
        .and(col(s"$NewData.${scd2Fields.timestampColumn}").<(col(s"$OldData.$StartDateColumn")))
    ).select(s"$OldData.*", s"$NewData.${scd2Fields.timestampColumn}")
      .withColumn("rank", row_number().over(window))
      .where("rank == 1")
      .drop("rank")
      .select(s"$OldData.*")
      .withColumn(s"$IsOldDataColumn", lit(true))
  }

  private def setSCD2Fields(dataFrame: DataFrame, scd2Fields: SCD2Fields): DataFrame = {
    val idWindowDesc = org.apache.spark.sql.expressions.Window
      .partitionBy(scd2Fields.keyColumn)
      .orderBy(col(scd2Fields.timestampColumn).desc, col(IsOldDataColumn).desc)
    dataFrame
      .withColumn(
        EndDateColumn,
        when(
          col(IsOldDataColumn).equalTo(true).and(
            lag(scd2Fields.keyColumn, 1, null).over(idWindowDesc).isNull
          ),
          col(EndDateColumn)
        ).when(
          col(IsOldDataColumn).equalTo(true).and(
            lag(IsOldDataColumn, 1, false).over(idWindowDesc).equalTo(true)
          ),
          col(EndDateColumn)
        ).otherwise(
          lag(StartDateColumn, 1, null).over(idWindowDesc)
        )
      )
      .withColumn(
        EndDateColumn,
        when(col(scd2Fields.operationColumn).isInCollection(scd2Fields.operationDeleteValues), col(StartDateColumn))
          .when(!col(scd2Fields.operationColumn).isInCollection(scd2Fields.operationDeleteValues), col(EndDateColumn))
          .otherwise(null)
      )
      .withColumn(
        IsCurrentColumn,
        when(col(EndDateColumn).isNull, lit(true)).otherwise(lit(false))
      )
  }

}
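
`CDCUtil` above is the format-agnostic core of this commit: `getStagedDataForSCD2` deduplicates the incoming microbatch, joins it against the overlapping slice of the existing SCD2 table and recomputes `_start_date`, `_end_date` and `_is_current`. Below is a hedged sketch of how a writer could call it, for instance from a `foreachBatch` sink; the package, object and column names are assumptions for illustration and not code from this commit (the helpers are `private[hyperdrive]`, so the sketch stays inside that namespace).

```scala
package za.co.absa.hyperdrive.compatibility.impl.writer.cdc.example

import org.apache.spark.sql.DataFrame
import za.co.absa.hyperdrive.compatibility.impl.writer.cdc.CDCUtil

object StagingSketch {
  // history: the SCD2 table as currently stored; input: one CDC microbatch
  def stage(history: DataFrame, input: DataFrame): DataFrame = {
    val fields = CDCUtil.SCD2Fields(
      keyColumn = "key",
      timestampColumn = "TIMSTAMP",
      operationColumn = "ENTTYP",
      operationDeleteValues = Seq("DL", "FD"),
      precombineColumns = Seq("ENTTYP"),
      precombineColumnsCustomOrder = Map("ENTTYP" -> Seq("PT", "FI", "RR", "UB", "UP", "DL", "FD"))
    )
    // Result: affected history rows plus the new events, with _start_date,
    // _end_date and _is_current recomputed; the caller merges it into the sink.
    CDCUtil.getStagedDataForSCD2(history, input, fields)
  }
}
```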
Lines changed: 44 additions & 0 deletions
@@ -0,0 +1,44 @@
/*
 * Copyright 2018 ABSA Group Limited
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package za.co.absa.hyperdrive.compatibility.impl.writer.cdc.delta

import io.delta.tables.DeltaTable
import org.apache.spark.sql.{Row, SaveMode, SparkSession}
import org.apache.spark.sql.types.StructType
import org.slf4j.LoggerFactory
import za.co.absa.hyperdrive.compatibility.impl.writer.cdc.CDCUtil.isDirEmptyOrDoesNotExist

object DeltaUtil {
  private val logger = LoggerFactory.getLogger(this.getClass)

  private[hyperdrive] def createDeltaTableIfNotExists(sparkSession: SparkSession, destination: String, schema: StructType, partitionColumns: Seq[String]): Unit = {
    if (!DeltaTable.isDeltaTable(sparkSession, destination)) {
      if (isDirEmptyOrDoesNotExist(sparkSession, destination)) {
        logger.info(s"Destination: $destination is not a delta table. Creating new delta table.")
        sparkSession
          .createDataFrame(sparkSession.sparkContext.emptyRDD[Row], schema)
          .write
          .format("delta")
          .mode(SaveMode.Overwrite)
          .option("overwriteSchema", "true")
          .partitionBy(partitionColumns: _*)
          .save(destination)
      } else {
        throw new IllegalArgumentException(s"Could not create new delta table. Directory $destination is not empty!")
      }
    }
  }
}
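
`DeltaUtil.createDeltaTableIfNotExists` bootstraps an empty, partitioned Delta table only when the destination directory is empty or missing, and fails fast on a non-empty, non-Delta directory. A minimal sketch of that first-run step combined with `CDCUtil.getSchemaWithSCD2Fields`; the object name, destination path and empty partition list are assumptions for illustration, not code from this commit.

```scala
package za.co.absa.hyperdrive.compatibility.impl.writer.cdc.example

import org.apache.spark.sql.{DataFrame, SparkSession}
import za.co.absa.hyperdrive.compatibility.impl.writer.cdc.CDCUtil
import za.co.absa.hyperdrive.compatibility.impl.writer.cdc.delta.DeltaUtil

object DeltaInitSketch {
  def initialise(spark: SparkSession, cdcInput: DataFrame): Unit = {
    // Prepend _start_date, _end_date, _is_current to the CDC input schema.
    val scd2Schema = CDCUtil.getSchemaWithSCD2Fields(cdcInput)
    // Creates an empty Delta table only if the path is empty or missing;
    // otherwise throws before any streaming query is started.
    DeltaUtil.createDeltaTableIfNotExists(spark, "/tmp/destination", scd2Schema, Seq.empty)
  }
}
```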
