
Commit

Merge remote-tracking branch 'delta-io/master' into metrics-7
allisonport-db committed Jan 30, 2025

2 parents 952b245 + f32539d commit 7e52e17
Showing 94 changed files with 3,322 additions and 1,239 deletions.
4 changes: 3 additions & 1 deletion PROTOCOL.md
@@ -488,6 +488,8 @@ That means specifically that for any commit…
- it is **legal** for the same `path` to occur in an `add` action and a `remove` action, but with two different `dvId`s.
- it is **legal** for the same `path` to be added and/or removed and also occur in a `cdc` action.
- it is **illegal** for the same `path` to occur twice with different `dvId`s within each set of `add` or `remove` actions.
- it is **illegal** for a `path` to occur in an `add` action that already occurs with a different `dvId` in the list of `add` actions from the snapshot of the version immediately preceding the commit, unless the commit also contains a remove for the latter combination.
- it is **legal** to commit an existing `path` and `dvId` combination again (this allows metadata updates).

The `dataChange` flag on either an `add` or a `remove` can be set to `false` to indicate that an action when combined with other actions in the same atomic version only rearranges existing data or adds new statistics.
For example, streaming queries that are tailing the transaction log can use this flag to skip actions that would not affect the final results.
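
As an illustration of how a log tailer might consume this flag, here is a minimal hypothetical Java sketch; the `Action` type and its `isAdd`/`isRemove`/`isDataChange` accessors are illustrative stand-ins, not a Delta Kernel API:

```java
import java.util.List;

// Illustrative only: returns true if the commit carries new data for a
// streaming query, judging solely by the dataChange flag on add/remove actions.
static boolean commitChangesResults(List<Action> commitActions) {
  return commitActions.stream()
      .filter(a -> a.isAdd() || a.isRemove())   // only add/remove carry the flag
      .anyMatch(Action::isDataChange);
}
```
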
@@ -825,7 +827,7 @@ A given snapshot of the table can be computed by replaying the events committed
- A single `metaData` action
- A collection of `txn` actions with unique `appId`s
- A collection of `domainMetadata` actions with unique `domain`s.
- A collection of `add` actions with unique `(path, deletionVector.uniqueId)` keys.
- A collection of `add` actions with unique `path` keys, corresponding to the newest `(path, deletionVector.uniqueId)` pair encountered for each path (see the sketch after this list).
- A collection of `remove` actions with unique `(path, deletionVector.uniqueId)` keys. The intersection of the primary keys in the `add` collection and `remove` collection must be empty. That means a logical file cannot exist in both the `remove` and `add` collections at the same time; however, the same *data file* can exist with *different* DVs in the `remove` collection, as logically they represent different content. The `remove` actions act as _tombstones_, and only exist for the benefit of the VACUUM command. Snapshot reads only return `add` actions on the read path.
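
As a rough illustration of the replay rule for `add` actions (keyed by `path`, newest pair wins), here is a hedged Java sketch; `Action`, `AddAction`, `RemoveAction`, and `actionsOldestFirst` are hypothetical stand-ins rather than Delta Kernel classes:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical replay sketch: apply actions in commit order, oldest first.
Map<String, AddAction> liveFiles = new HashMap<>();   // path -> newest add action
for (Action action : actionsOldestFirst) {
  if (action instanceof AddAction add) {
    // Adds are keyed by path alone, so a newer (path, dvId) pair replaces an older one.
    liveFiles.put(add.path(), add);
  } else if (action instanceof RemoveAction remove) {
    // A real implementation matches the full (path, dvId) pair; elided for brevity.
    // The remove itself survives only as a tombstone for VACUUM.
    liveFiles.remove(remove.path());
  }
}
```
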

To achieve the requirements above, related actions from different delta files need to be reconciled with each other:
2 changes: 1 addition & 1 deletion build/sbt-config/repositories
@@ -2,7 +2,7 @@
local
local-preloaded-ivy: file:///${sbt.preloaded-${sbt.global.base-${user.home}/.sbt}/preloaded/}, [organization]/[module]/[revision]/[type]s/[artifact](-[classifier]).[ext]
local-preloaded: file:///${sbt.preloaded-${sbt.global.base-${user.home}/.sbt}/preloaded/}
gcs-maven-central-mirror: https://maven-central.storage-download.googleapis.com/repos/central/data/
gcs-maven-central-mirror: https://maven-central.storage-download.googleapis.com/maven2/
maven-central
typesafe-ivy-releases: https://repo.typesafe.com/typesafe/ivy-releases/, [organization]/[module]/[revision]/[type]s/[artifact](-[classifier]).[ext], bootOnly
sbt-ivy-snapshots: https://repo.scala-sbt.org/scalasbt/ivy-snapshots/, [organization]/[module]/[revision]/[type]s/[artifact](-[classifier]).[ext], bootOnly
50 changes: 30 additions & 20 deletions build/sbt-launch-lib.bash
@@ -36,42 +36,52 @@ dlog () {
[[ $debug ]] && echoerr "$@"
}

download_sbt () {
local url=$1
local output=$2
local temp_file="${output}.part"

if [ $(command -v curl) ]; then
curl --fail --location --silent ${url} > "${temp_file}" &&\
mv "${temp_file}" "${output}"
elif [ $(command -v wget) ]; then
wget --quiet ${url} -O "${temp_file}" &&\
mv "${temp_file}" "${output}"
else
printf "You do not have curl or wget installed, unable to downlaod ${url}\n"
exit -1
fi
}


acquire_sbt_jar () {
SBT_VERSION=`awk -F "=" '/sbt\.version/ {print $2}' ./project/build.properties`

# Download sbt from mirror URL if the environment variable is provided
# Set primary and fallback URLs
if [[ "${SBT_VERSION}" == "0.13.18" ]] && [[ -n "${SBT_MIRROR_JAR_URL}" ]]; then
URL1="${SBT_MIRROR_JAR_URL}"
elif [[ "${SBT_VERSION}" == "1.5.5" ]] && [[ -n "${SBT_1_5_5_MIRROR_JAR_URL}" ]]; then
URL1="${SBT_1_5_5_MIRROR_JAR_URL}"
else
URL1=${DEFAULT_ARTIFACT_REPOSITORY:-https://repo1.maven.org/maven2/}org/scala-sbt/sbt-launch/${SBT_VERSION}/sbt-launch-${SBT_VERSION}.jar
URL1=${DEFAULT_ARTIFACT_REPOSITORY:-https://maven-central.storage-download.googleapis.com/maven2/}org/scala-sbt/sbt-launch/${SBT_VERSION}/sbt-launch-${SBT_VERSION}.jar
fi
BACKUP_URL="https://repo1.maven.org/maven2/org/scala-sbt/sbt-launch/${SBT_VERSION}/sbt-launch-${SBT_VERSION}.jar"

JAR=build/sbt-launch-${SBT_VERSION}.jar
sbt_jar=$JAR

if [[ ! -f "$sbt_jar" ]]; then
# Download sbt launch jar if it hasn't been downloaded yet
if [ ! -f "${JAR}" ]; then
# Download
printf 'Attempting to fetch sbt from %s\n' "${URL1}"
JAR_DL="${JAR}.part"
if [ $(command -v curl) ]; then
curl --fail --location --silent ${URL1} > "${JAR_DL}" &&\
mv "${JAR_DL}" "${JAR}"
elif [ $(command -v wget) ]; then
wget --quiet ${URL1} -O "${JAR_DL}" &&\
mv "${JAR_DL}" "${JAR}"
else
printf "You do not have curl or wget installed, please install sbt manually from https://www.scala-sbt.org/\n"
exit -1
fi
download_sbt "${URL1}" "${JAR}"

if [[ ! -f "${JAR}" ]]; then
printf 'Download from %s failed. Retrying from %s\n' "${URL1}" "${BACKUP_URL}"
download_sbt "${BACKUP_URL}" "${JAR}"
fi
if [ ! -f "${JAR}" ]; then
# We failed to download
printf "Our attempt to download sbt locally to ${JAR} failed. Please install sbt manually from https://www.scala-sbt.org/\n"
exit -1

if [[ ! -f "${JAR}" ]]; then
printf "Failed to download sbt. Please install sbt manually from https://www.scala-sbt.org/\n"
exit 1
fi
printf "Launching sbt from ${JAR}\n"
fi
@@ -83,7 +83,7 @@ class KernelDeltaLogDelegator(
kernelSnapshotWrapper,
hadoopConf,
logPath,
kernelSnapshot.getVersion(engine), // note: engine isn't used
kernelSnapshot.getVersion(),
this,
standaloneDeltaLog
))
@@ -73,9 +73,7 @@ public Metadata getMetadata() {
*/
@Override
public long getVersion() {
// WARNING: getVersion in SnapshotImpl currently doesn't use the engine so we can
// pass null, but if this changes this code could break
return kernelSnapshot.getVersion(null);
return kernelSnapshot.getVersion();
}

/**
15 changes: 5 additions & 10 deletions docs/source/delta-kernel-java.md
@@ -99,14 +99,14 @@ Snapshot mySnapshot = myTable.getLatestSnapshot(myEngine);
Now that we have a consistent snapshot view of the table, we can query more details about the table. For example, you can get the version and schema of this snapshot.

```java
long version = mySnapshot.getVersion(myEngine);
StructType tableSchema = mySnapshot.getSchema(myEngine);
long version = mySnapshot.getVersion();
StructType tableSchema = mySnapshot.getSchema();
```

Next, to read the table data, we have to *build* a [`Scan`](https://delta-io.github.io/delta/snapshot/kernel-api/java/io/delta/kernel/Scan.html) object. In order to build a `Scan` object, create a [`ScanBuilder`](https://delta-io.github.io/delta/snapshot/kernel-api/java/io/delta/kernel/ScanBuilder.html) object which optionally allows selecting a subset of columns to read or setting a query filter. For now, ignore these optional settings.

```java
Scan myScan = mySnapshot.getScanBuilder(myEngine).build()
Scan myScan = mySnapshot.getScanBuilder().build()

// Common information about scanning for all data files to read.
Row scanState = myScan.getScanState(myEngine)
@@ -224,9 +224,7 @@ Predicate filter = new Predicate(
"=",
Arrays.asList(new Column("columnX"), Literal.ofInt(1)));

Scan myFilteredScan = mySnapshot.buildScan(engine)
.withFilter(myEngine, filter)
.build()
Scan myFilteredScan = mySnapshot.getScanBuilder().withFilter(filter).build()

// Subset of the given filter that is not guaranteed to be satisfied by
// Delta Kernel when it returns data. This filter is used by Delta Kernel
@@ -845,10 +843,7 @@ import io.delta.kernel.types.*;
StructType readSchema = ... ; // convert engine schema
Predicate filterExpr = ... ; // convert engine filter expression

Scan myScan = mySnapshot.buildScan(engine)
.withFilter(myEngine, filterExpr)
.withReadSchema(myEngine, readSchema)
.build()
Scan myScan = mySnapshot.getScanBuilder().withFilter(filterExpr).withReadSchema(readSchema).build();

```

4 changes: 2 additions & 2 deletions docs/source/delta-kernel.md
@@ -22,8 +22,8 @@ Here is an example of a simple table scan with a filter:
Engine myEngine = DefaultEngine.create(); // define an engine (more details below)
Table myTable = Table.forPath("/delta/table/path"); // define what table to scan
Snapshot mySnapshot = myTable.getLatestSnapshot(myEngine); // define which version of table to scan
Scan myScan = mySnapshot.getScanBuilder(myEngine) // specify the scan details
.withFilters(myEngine, scanFilter)
Scan myScan = mySnapshot.getScanBuilder() // specify the scan details
.withFilters(scanFilter)
.build();
CloseableIterator<ColumnarBatch> physicalData = // read the Parquet data files
.. read from Parquet data files ...
13 changes: 12 additions & 1 deletion docs/source/delta-storage.md
@@ -133,6 +133,17 @@ This mode supports concurrent writes to S3 from multiple clusters and has to be
#### Requirements (S3 multi-cluster)
- All of the requirements listed in [_](#requirements-s3-single-cluster) section
- In addition to S3 credentials, you also need DynamoDB operating permissions
- To ensure proper coordination across clusters, it's crucial to maintain consistency in how tables are referenced. Always use the same scheme (e.g., all clusters refer to the table path with either s3a:// or s3://, but not a combination of the two) and maintain case sensitivity when referring to a table from different clusters. For example, use s3a://mybucket/mytable consistently across all clusters. This consistency is vital because DynamoDB relies on the table path as a key to achieve put-if-absent semantics, and inconsistent references can lead to coordination issues. If the table is registered in a catalog, verify (using `DESCRIBE FORMATTED` or an equivalent command) that the registered path matches the path used for writes from other clusters. By adhering to these guidelines, you can minimize the risk of coordination problems and ensure smooth operation across multiple clusters.
- If the table is referred to using the `s3` scheme with the Delta-Spark connector, the following configs are needed:


```
"spark.delta.logStore.s3.impl" = "io.delta.storage.S3DynamoDBLogStore"
"spark.io.delta.storage.S3DynamoDBLogStore.ddb.region" = "<region>"
"spark.io.delta.storage.S3DynamoDBLogStore.ddb.tableName" = "<dynamodb_table_name>"
"spark.hadoop.fs.s3.impl"="org.apache.hadoop.fs.s3a.S3AFileSystem"
# and any other config key name that has `s3a` in it should be changed to `s3`
```
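
For reference, a hedged sketch of applying these settings when building a Spark session in Java; the application name, region, and DynamoDB table name below are placeholders, and the config keys are the ones from the block above:

```java
import org.apache.spark.sql.SparkSession;

SparkSession spark = SparkSession.builder()
    .appName("delta-s3-multi-cluster")  // placeholder application name
    .config("spark.delta.logStore.s3.impl", "io.delta.storage.S3DynamoDBLogStore")
    .config("spark.io.delta.storage.S3DynamoDBLogStore.ddb.region", "us-west-2")     // placeholder region
    .config("spark.io.delta.storage.S3DynamoDBLogStore.ddb.tableName", "delta_log")  // placeholder table
    .config("spark.hadoop.fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .getOrCreate();
```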

#### Quickstart (S3 multi-cluster)

@@ -554,4 +565,4 @@ spark.read.format("delta").load("cos://<your-cos-bucket>.service/<path-to-delta-
```

.. <Delta> replace:: Delta Lake
.. <AS> replace:: Apache Spark
.. <AS> replace:: Apache Spark
@@ -67,9 +67,9 @@ class HudiConverter(spark: SparkSession)
// Save an atomic reference of the snapshot being converted, and the txn that
// resulted in the specified snapshot
protected val currentConversion =
new AtomicReference[(Snapshot, OptimisticTransactionImpl)]()
new AtomicReference[(Snapshot, DeltaTransaction)]()
protected val standbyConversion =
new AtomicReference[(Snapshot, OptimisticTransactionImpl)]()
new AtomicReference[(Snapshot, DeltaTransaction)]()

// Whether our async converter thread is active. We may already have an alive thread that is
// about to shutdown, but in such cases this value should return false.
@@ -88,7 +88,7 @@ class HudiConverter(spark: SparkSession)
*/
override def enqueueSnapshotForConversion(
snapshotToConvert: Snapshot,
txn: OptimisticTransactionImpl): Unit = {
txn: DeltaTransaction): Unit = {
if (!UniversalFormat.hudiEnabled(snapshotToConvert.metadata)) {
return
}
@@ -138,7 +138,7 @@ class HudiConverter(spark: SparkSession)
}

// Get a snapshot to convert from the hudiQueue. Sets the queue to null after.
private def getNextSnapshot: (Snapshot, OptimisticTransactionImpl) =
private def getNextSnapshot: (Snapshot, DeltaTransaction) =
asyncThreadLock.synchronized {
val potentialSnapshotAndTxn = standbyConversion.get()
currentConversion.set(potentialSnapshotAndTxn)
@@ -189,7 +189,7 @@ class HudiConverter(spark: SparkSession)
* @return Converted Delta version and commit timestamp
*/
override def convertSnapshot(
snapshotToConvert: Snapshot, txn: OptimisticTransactionImpl): Option[(Long, Long)] = {
snapshotToConvert: Snapshot, txn: DeltaTransaction): Option[(Long, Long)] = {
if (!UniversalFormat.hudiEnabled(snapshotToConvert.metadata)) {
return None
}
@@ -207,7 +207,7 @@ class HudiConverter(spark: SparkSession)
*/
private def convertSnapshot(
snapshotToConvert: Snapshot,
txnOpt: Option[OptimisticTransactionImpl],
txnOpt: Option[DeltaTransaction],
catalogTable: Option[CatalogTable]): Option[(Long, Long)] =
recordFrameProfile("Delta", "HudiConverter.convertSnapshot") {
val log = snapshotToConvert.deltaLog
@@ -119,7 +119,8 @@ object IcebergPartitionUtil {
def getPartitionFields(partSpec: PartitionSpec, schema: Schema): Seq[StructField] = {
// Skip removed partition fields due to partition evolution.
partSpec.fields.asScala.toSeq.collect {
case partField if !partField.transform().isInstanceOf[VoidTransform[_]] =>
case partField if !partField.transform().isInstanceOf[VoidTransform[_]] &&
!partField.transform().isInstanceOf[Bucket[_]] =>
val sourceColumnName = schema.findColumnName(partField.sourceId())
val sourceField = schema.findField(partField.sourceId())
val sourceType = sourceField.`type`()
@@ -190,4 +191,8 @@ object IcebergPartitionUtil {
*/
private def icebergNumericTruncateExpression(colName: String, width: Long): String =
s"$colName - (($colName % $width) + $width) % $width"

def hasBucketPartition(partSpec: PartitionSpec): Boolean = {
partSpec.fields.asScala.toSeq.exists(spec => spec.transform().isInstanceOf[Bucket[_]])
}
}
@@ -31,7 +31,8 @@ import org.apache.spark.sql.types.StructType
class IcebergFileManifest(
spark: SparkSession,
table: Table,
partitionSchema: StructType) extends ConvertTargetFileManifest with LoggingShims {
partitionSchema: StructType,
convertStats: Boolean = true) extends ConvertTargetFileManifest with LoggingShims {

// scalastyle:off sparkimplicits
import spark.implicits._
@@ -106,6 +107,8 @@ class IcebergFileManifest(
null
}

val shouldConvertStats = convertStats

val manifestFiles = localTable
.currentSnapshot()
.dataManifests(localTable.io())
@@ -125,12 +128,16 @@ class IcebergFileManifest(
),
partitionValues = if (shouldConvertPartition) {
Some(convertPartition.toDelta(dataFile.partition()))
} else None,
stats = if (shouldConvertStats) {
IcebergStatsUtils.icebergStatsToDelta(localTable.schema, dataFile)
} else None
)
}
.cache()
}


override def close(): Unit = {
fileSparkResults.map(_.unpersist())
fileSparkResults = None
@@ -55,7 +55,8 @@ object IcebergPartitionConverter {
def physicalNameToPartitionField(
table: Table, partitionSchema: StructType): Map[String, PartitionField] =
table.spec().fields().asScala.collect {
case field if field.transform().toString != "void" =>
case field if field.transform().toString != "void" &&
!field.transform().toString.contains("bucket") =>
DeltaColumnMapping.getPhysicalName(partitionSchema(field.name)) -> field
}.toMap
}
@@ -20,6 +20,7 @@ import org.apache.spark.sql.delta.DeltaColumnMapping
import org.apache.spark.sql.delta.schema.SchemaMergingUtils
import org.apache.iceberg.Schema
import org.apache.iceberg.spark.SparkSchemaUtil
import org.apache.iceberg.types.TypeUtil

import org.apache.spark.sql.types.{MetadataBuilder, StructType}

@@ -29,12 +30,18 @@ object IcebergSchemaUtils {
* Given an iceberg schema, convert it to a Spark schema. This conversion will keep the Iceberg
* column IDs (used to read Parquet files) in the field metadata
*
* @param icebergSchema
* @return StructType for the converted schema
* @param icebergSchema Iceberg schema
* @param castTimeType cast Iceberg TIME type to Spark Long
* @return Spark schema converted from Iceberg schema
*/
def convertIcebergSchemaToSpark(icebergSchema: Schema): StructType = {
def convertIcebergSchemaToSpark(icebergSchema: Schema,
castTimeType: Boolean = false): StructType = {
// Convert from Iceberg schema to Spark schema but without the column IDs
val baseConvertedSchema = SparkSchemaUtil.convert(icebergSchema)
val baseConvertedSchema = if (castTimeType) {
TypeUtil.visit(icebergSchema, new TypeToSparkTypeWithCustomCast()).asInstanceOf[StructType]
} else {
SparkSchemaUtil.convert(icebergSchema)
}

// For each field, find the column ID (fieldId) and add to the StructField metadata
SchemaMergingUtils.transformColumns(baseConvertedSchema) { (path, field, _) =>