From 2ea98995372d4a3efdcb0f5dc6fee4226b35de36 Mon Sep 17 00:00:00 2001
From: Scott Sandre
Date: Tue, 14 Jan 2025 09:58:07 -0800
Subject: [PATCH 1/3] Add CC table properties to table config

---
 .../io/delta/kernel/internal/TableConfig.java | 40 +++++++++++++++++++
 1 file changed, 40 insertions(+)

diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TableConfig.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TableConfig.java
index aa82b284560..dbfaaf94a84 100644
--- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TableConfig.java
+++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TableConfig.java
@@ -34,6 +34,46 @@ public class TableConfig {
   // TableConfigs //
   //////////////////
 
+  /**
+   * The commit coordinator name for this table. If this property is not set, the table will be
+   * considered a file system table and commits will be done via atomically publishing the commit
+   * file.
+   */
+  public static final TableConfig<Optional<String>> COMMIT_COORDINATOR_NAME =
+      new TableConfig<>(
+          "delta.coordinatedCommits.commitCoordinator-preview",
+          null, /* default value */
+          Optional::ofNullable,
+          value -> true,
+          "Needs to be a string.",
+          true);
+
+  /**
+   * The configuration properties for the commit coordinator which is needed to build the commit
+   * coordinator client.
+   */
+  public static final TableConfig<Map<String, String>> COMMIT_COORDINATOR_CONF =
+      new TableConfig<>(
+          "delta.coordinatedCommits.commitCoordinatorConf-preview",
+          null, /* default values */
+          JsonUtils::parseJSONKeyValueMap,
+          value -> true,
+          "Needs to be a string-to-string map of configuration properties.",
+          true);
+
+  /**
+   * The properties used to uniquely identify and describe this Delta table to the commit
+   * coordinator.
+   */
+  public static final TableConfig<Map<String, String>> COORDINATED_COMMITS_TABLE_CONF =
+      new TableConfig<>(
+          "delta.coordinatedCommits.tableConf-preview",
+          null, /* default values */
+          JsonUtils::parseJSONKeyValueMap,
+          value -> true,
+          "Needs to be a string-to-string map of properties.",
+          true);
+
   /**
    * The shortest duration we have to keep logically deleted data files around before deleting them
    * physically.

From a69ee338eb7dccbc8fb1c812bd939b72233fd13c Mon Sep 17 00:00:00 2001
From: Scott Sandre
Date: Tue, 14 Jan 2025 09:58:19 -0800
Subject: [PATCH 2/3] Create TableDescriptor.java

---
 .../coordinatedcommits/TableDescriptor.java | 92 +++++++++++++++++++
 1 file changed, 92 insertions(+)
 create mode 100644 kernel/kernel-api/src/main/java/io/delta/kernel/coordinatedcommits/TableDescriptor.java

diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/coordinatedcommits/TableDescriptor.java b/kernel/kernel-api/src/main/java/io/delta/kernel/coordinatedcommits/TableDescriptor.java
new file mode 100644
index 00000000000..36dc3efad22
--- /dev/null
+++ b/kernel/kernel-api/src/main/java/io/delta/kernel/coordinatedcommits/TableDescriptor.java
@@ -0,0 +1,92 @@
+/*
+ * Copyright (2024) The Delta Lake Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.delta.kernel.coordinatedcommits;
+
+import static java.util.Objects.requireNonNull;
+
+import io.delta.kernel.TableIdentifier;
+import io.delta.kernel.annotation.Evolving;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * The complete descriptor of a Coordinated Commits (CC) Delta table, including its logPath, table
+ * identifier, and table CC configuration.
+ *
+ * @since 3.4.0
+ */
+@Evolving
+public class TableDescriptor {
+
+  private final String logPath;
+  private final Optional<TableIdentifier> tableId;
+  private final Map<String, String> tableConf;
+
+  public TableDescriptor(
+      String logPath, Optional<TableIdentifier> tableId, Map<String, String> tableConf) {
+    this.logPath = requireNonNull(logPath, "logPath is null");
+    this.tableId = requireNonNull(tableId, "tableId is null");
+    this.tableConf = requireNonNull(tableConf, "tableConf is null");
+  }
+
+  /** Returns the Delta log path of the table. */
+  public String getLogPath() {
+    return logPath;
+  }
+
+  /** Returns the optional table identifier of the table, e.g. $catalog / $schema / $tableName */
+  public Optional<TableIdentifier> getTableId() {
+    return tableId;
+  }
+
+  /**
+   * Returns the Coordinated Commits table configuration.
+   *
+   * <p>This is the parsed value of the Delta table property {@link
+   * io.delta.kernel.internal.TableConfig#COORDINATED_COMMITS_TABLE_CONF} and represents the
+   * properties for describing the Delta table to the commit-coordinator.
+   */
+  public Map<String, String> getTableConf() {
+    return tableConf;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    final TableDescriptor that = (TableDescriptor) o;
+    return getLogPath().equals(that.getLogPath())
+        && tableId.equals(that.tableId)
+        && getTableConf().equals(that.getTableConf());
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(getLogPath(), tableId, getTableConf());
+  }
+
+  @Override
+  public String toString() {
+    return String.format(
+        "TableDescriptor{logPath='%s', tableId=%s, tableConf=%s}", logPath, tableId, tableConf);
+  }
+}

From 4846b7a205c5db8a98ca11709c35fca1fe521e10 Mon Sep 17 00:00:00 2001
From: Scott Sandre
Date: Tue, 14 Jan 2025 09:58:22 -0800
Subject: [PATCH 3/3] Create TableDescriptorSuite.scala

---
 .../TableDescriptorSuite.scala | 86 +++++++++++++++++++
 1 file changed, 86 insertions(+)
 create mode 100644 kernel/kernel-api/src/test/scala/io/delta/kernel/coordinatedcommits/TableDescriptorSuite.scala

diff --git a/kernel/kernel-api/src/test/scala/io/delta/kernel/coordinatedcommits/TableDescriptorSuite.scala b/kernel/kernel-api/src/test/scala/io/delta/kernel/coordinatedcommits/TableDescriptorSuite.scala
new file mode 100644
index 00000000000..c79e53af28a
--- /dev/null
+++ b/kernel/kernel-api/src/test/scala/io/delta/kernel/coordinatedcommits/TableDescriptorSuite.scala
@@ -0,0 +1,86 @@
+/*
+ * Copyright (2024) The Delta Lake Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.delta.kernel.coordinatedcommits
+
+import org.scalatest.funsuite.AnyFunSuite
+import io.delta.kernel.TableIdentifier
+import java.util.Optional
+
+import scala.collection.JavaConverters._
+
+class TableDescriptorSuite extends AnyFunSuite {
+
+  test("TableDescriptor should throw NullPointerException for null constructor arguments") {
+    assertThrows[NullPointerException] {
+      new TableDescriptor(null, Optional.empty(), Map.empty[String, String].asJava)
+    }
+    assertThrows[NullPointerException] {
+      new TableDescriptor("/delta/logPath", null, Map.empty[String, String].asJava)
+    }
+    assertThrows[NullPointerException] {
+      new TableDescriptor("/delta/logPath", Optional.empty(), null)
+    }
+  }
+
+  test("TableDescriptor should return the correct logPath, tableId, and tableConf") {
+    val logPath = "/delta/logPath"
+    val tableId = Optional.of(new TableIdentifier(Array("catalog", "schema"), "table"))
+    val tableConf = Map("key1" -> "value1", "key2" -> "value2").asJava
+
+    val tableDescriptor = new TableDescriptor(logPath, tableId, tableConf)
+
+    assert(tableDescriptor.getLogPath == logPath)
+    assert(tableDescriptor.getTableId == tableId)
+    assert(tableDescriptor.getTableConf == tableConf)
+  }
+
+  test("TableDescriptors with the same values should be equal") {
+    val logPath = "/delta/logPath"
+    val tableId = Optional.of(new TableIdentifier(Array("catalog", "schema"), "table"))
+    val tableConf = Map("key1" -> "value1", "key2" -> "value2").asJava
+
+    val tableDescriptor1 = new TableDescriptor(logPath, tableId, tableConf)
+    val tableDescriptor2 = new TableDescriptor(logPath, tableId, tableConf)
+
+    assert(tableDescriptor1 == tableDescriptor2)
+    assert(tableDescriptor1.hashCode == tableDescriptor2.hashCode)
+  }
+
+  test("TableDescriptors with different values should not be equal") {
+    val logPath = "/delta/logPath"
+    val tableId = Optional.of(new TableIdentifier(Array("catalog", "schema"), "table"))
+    val tableConf1 = Map("key1" -> "value1").asJava
+    val tableConf2 = Map("key1" -> "value2").asJava
+
+    val tableDescriptor1 = new TableDescriptor(logPath, tableId, tableConf1)
+    val tableDescriptor2 = new TableDescriptor(logPath, tableId, tableConf2)
+
+    assert(tableDescriptor1 != tableDescriptor2)
+  }
+
+  test("TableDescriptor toString format") {
+    val logPath = "/delta/logPath"
+    val tableId = Optional.of(new TableIdentifier(Array("catalog", "schema"), "table"))
+    val tableConf = Map("key1" -> "value1").asJava
+
+    val tableDescriptor = new TableDescriptor(logPath, tableId, tableConf)
+    val expectedString = "TableDescriptor{logPath='/delta/logPath', " +
+      "tableId=Optional[TableIdentifier{catalog.schema.table}], " +
+      "tableConf={key1=value1}}"
+    assert(tableDescriptor.toString == expectedString)
+  }
+}