diff --git a/plugins/spark/v3.5/integration/build.gradle.kts b/plugins/spark/v3.5/integration/build.gradle.kts
index f7c9892086..9a64ac4836 100644
--- a/plugins/spark/v3.5/integration/build.gradle.kts
+++ b/plugins/spark/v3.5/integration/build.gradle.kts
@@ -62,11 +62,50 @@ dependencies {
     exclude("org.apache.logging.log4j", "log4j-core")
     exclude("org.slf4j", "jul-to-slf4j")
   }
+
+  // Add spark-hive for Hudi integration - provides HiveExternalCatalog that Hudi needs
+  testRuntimeOnly("org.apache.spark:spark-hive_${scalaVersion}:${spark35Version}") {
+    // exclude log4j dependencies to match spark-sql exclusions
+    exclude("org.apache.logging.log4j", "log4j-slf4j2-impl")
+    exclude("org.apache.logging.log4j", "log4j-1.2-api")
+    exclude("org.apache.logging.log4j", "log4j-core")
+    exclude("org.slf4j", "jul-to-slf4j")
+    // exclude old slf4j 1.x to log4j 2.x bridge that conflicts with slf4j 2.x bridge
+    exclude("org.apache.logging.log4j", "log4j-slf4j-impl")
+  }
   // enforce the usage of log4j 2.24.3. This is for the log4j-api compatibility
   // of spark-sql dependency
   testRuntimeOnly("org.apache.logging.log4j:log4j-core:2.25.2")
 
   testImplementation("io.delta:delta-spark_${scalaVersion}:3.3.1")
+  testImplementation("org.apache.hudi:hudi-spark3.5-bundle_${scalaVersion}:1.1.0") {
+    // exclude log4j dependencies to match spark-sql exclusions and prevent version conflicts
+    exclude("org.apache.logging.log4j", "log4j-slf4j2-impl")
+    exclude("org.apache.logging.log4j", "log4j-1.2-api")
+    exclude("org.apache.logging.log4j", "log4j-core")
+    exclude("org.slf4j", "jul-to-slf4j")
+    exclude("org.slf4j", "slf4j-log4j12")
+    exclude("org.slf4j", "slf4j-reload4j")
+    exclude("ch.qos.reload4j", "reload4j")
+    exclude("log4j", "log4j")
+    // exclude old slf4j 1.x to log4j 2.x bridge that conflicts with slf4j 2.x bridge
+    exclude("org.apache.logging.log4j", "log4j-slf4j-impl")
+  }
+
+  // The hudi-spark-bundle includes most Hive libraries but excludes hive-exec to keep the
+  // bundle size manageable. This matches what the Spark 3.5 distribution provides
+  // (hive-exec-2.3.9-core.jar).
+  testImplementation("org.apache.hive:hive-exec:2.3.9:core") {
+    // Exclude conflicting dependencies to use Spark's versions
+    exclude("org.apache.hadoop", "*")
+    exclude("org.apache.commons", "*")
+    exclude("org.slf4j", "*")
+    exclude("log4j", "*")
+    exclude("org.apache.logging.log4j", "*")
+    exclude("org.pentaho", "*")
+    exclude("org.apache.calcite", "*")
+    exclude("org.apache.tez", "*")
+  }
 
   testImplementation(platform(libs.jackson.bom))
   testImplementation("com.fasterxml.jackson.jakarta.rs:jackson-jakarta-rs-json-provider")
diff --git a/plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/SparkHudiIT.java b/plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/SparkHudiIT.java
new file mode 100644
index 0000000000..1093362db4
--- /dev/null
+++ b/plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/SparkHudiIT.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.spark.quarkus.it;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import io.quarkus.test.junit.QuarkusIntegrationTest;
+import java.io.File;
+import java.nio.file.Path;
+import java.util.List;
+import org.apache.commons.io.FileUtils;
+import org.apache.polaris.service.it.env.IntegrationTestsHelper;
+import org.apache.spark.sql.SparkSession;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+@QuarkusIntegrationTest
+public class SparkHudiIT extends SparkIntegrationBase {
+
+  @Override
+  protected SparkSession buildSparkSession() {
+    return SparkSession.builder()
+        .master("local[1]")
+        .config("spark.ui.showConsoleProgress", "false")
+        .config("spark.ui.enabled", "false")
+        .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
+        .config(
+            "spark.sql.catalog.spark_catalog", "org.apache.spark.sql.hudi.catalog.HoodieCatalog")
+        .config(
+            String.format("spark.sql.catalog.%s", catalogName),
+            "org.apache.polaris.spark.SparkCatalog")
+        .config("spark.sql.warehouse.dir", warehouseDir.toString())
+        .config(String.format("spark.sql.catalog.%s.type", catalogName), "rest")
+        .config(
+            String.format("spark.sql.catalog.%s.uri", catalogName),
+            endpoints.catalogApiEndpoint().toString())
+        .config(String.format("spark.sql.catalog.%s.warehouse", catalogName), catalogName)
+        .config(String.format("spark.sql.catalog.%s.scope", catalogName), "PRINCIPAL_ROLE:ALL")
+        .config(
+            String.format("spark.sql.catalog.%s.header.realm", catalogName), endpoints.realmId())
+        .config(String.format("spark.sql.catalog.%s.token", catalogName), sparkToken)
+        .config(String.format("spark.sql.catalog.%s.s3.access-key-id", catalogName), "fakekey")
+        .config(
+            String.format("spark.sql.catalog.%s.s3.secret-access-key", catalogName), "fakesecret")
+        .config(String.format("spark.sql.catalog.%s.s3.region", catalogName), "us-west-2")
+        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+        .config("spark.kryo.registrator", "org.apache.spark.HoodieSparkKryoRegistrar")
+        // disabled for the initial integration test; revisit enabling this in the future
+        .config("hoodie.metadata.enable", "false")
+        .getOrCreate();
+  }
+
+  private String defaultNs;
+  private String tableRootDir;
+
+  private String getTableLocation(String tableName) {
+    return String.format("%s/%s", tableRootDir, tableName);
+  }
+
+  private String getTableNameWithRandomSuffix() {
+    return generateName("huditb");
+  }
+
+  @BeforeEach
+  public void createDefaultResources(@TempDir Path tempDir) {
+    spark.sparkContext().setLogLevel("INFO");
+    defaultNs = generateName("hudi");
+    // create a default namespace
+    sql("CREATE NAMESPACE %s", defaultNs);
+    sql("USE NAMESPACE %s", defaultNs);
+    tableRootDir =
+        IntegrationTestsHelper.getTemporaryDirectory(tempDir).resolve(defaultNs).getPath();
+  }
+
+  @AfterEach
+  public void cleanupHudiData() {
+    // clean up hudi data
+    if (tableRootDir != null) {
+      File dirToDelete = new File(tableRootDir);
+      FileUtils.deleteQuietly(dirToDelete);
+    }
+    if (defaultNs != null) {
+      sql("DROP NAMESPACE %s", defaultNs);
+    }
+  }
+
+  @Test
+  public void testBasicTableOperations() {
+    // create a regular hudi table
+    String huditb1 = "huditb1";
+    sql(
+        "CREATE TABLE %s (id INT, name STRING) USING HUDI LOCATION '%s'",
+        huditb1, getTableLocation(huditb1));
+    sql("INSERT INTO %s VALUES (1, 'anna'), (2, 'bob')", huditb1);
+    List<Object[]> results = sql("SELECT id,name FROM %s WHERE id > 1 ORDER BY id DESC", huditb1);
+    assertThat(results.size()).isEqualTo(1);
+    assertThat(results.get(0)).isEqualTo(new Object[] {2, "bob"});
+
+    // create a hudi table with partition
+    String huditb2 = "huditb2";
+    sql(
+        "CREATE TABLE %s (name String, age INT, country STRING) USING HUDI PARTITIONED BY (country) LOCATION '%s'",
+        huditb2, getTableLocation(huditb2));
+    sql(
+        "INSERT INTO %s VALUES ('anna', 10, 'US'), ('james', 32, 'US'), ('yan', 16, 'CHINA')",
+        huditb2);
+    results = sql("SELECT name, country FROM %s ORDER BY age", huditb2);
+    assertThat(results.size()).isEqualTo(3);
+    assertThat(results.get(0)).isEqualTo(new Object[] {"anna", "US"});
+    assertThat(results.get(1)).isEqualTo(new Object[] {"yan", "CHINA"});
+    assertThat(results.get(2)).isEqualTo(new Object[] {"james", "US"});
+
+    // verify the partition dir is created
+    List<String> subDirs = listDirs(getTableLocation(huditb2));
+    assertThat(subDirs).contains(".hoodie", "country=CHINA", "country=US");
+
+    // test listTables
+    List<Object[]> tables = sql("SHOW TABLES");
+    assertThat(tables.size()).isEqualTo(2);
+    assertThat(tables)
+        .contains(
+            new Object[] {defaultNs, huditb1, false}, new Object[] {defaultNs, huditb2, false});
+
+    sql("DROP TABLE %s", huditb1);
+    sql("DROP TABLE %s", huditb2);
+    tables = sql("SHOW TABLES");
+    assertThat(tables.size()).isEqualTo(0);
+  }
+
+  @Test
+  public void testUnsupportedAlterTableOperations() {
+    String huditb = getTableNameWithRandomSuffix();
+    sql(
+        "CREATE TABLE %s (name String, age INT, country STRING) USING HUDI PARTITIONED BY (country) LOCATION '%s'",
+        huditb, getTableLocation(huditb));
+
+    // ALTER TABLE ... RENAME TO ... fails
+    assertThatThrownBy(() -> sql("ALTER TABLE %s RENAME TO new_hudi", huditb))
+        .isInstanceOf(UnsupportedOperationException.class);
+
+    // ALTER TABLE ... SET LOCATION ... fails
+    assertThatThrownBy(() -> sql("ALTER TABLE %s SET LOCATION '/tmp/new/path'", huditb))
+        .isInstanceOf(UnsupportedOperationException.class);
+
+    sql("DROP TABLE %s", huditb);
+  }
+
+  @Test
+  public void testUnsupportedTableCreateOperations() {
+    String huditb = getTableNameWithRandomSuffix();
+    // create hudi table with no location
+    assertThatThrownBy(() -> sql("CREATE TABLE %s (id INT, name STRING) USING HUDI", huditb))
+        .isInstanceOf(UnsupportedOperationException.class);
+
+    // CTAS fails
+    assertThatThrownBy(
+            () ->
+                sql(
+                    "CREATE TABLE %s USING HUDI LOCATION '%s' AS SELECT 1 AS id",
+                    huditb, getTableLocation(huditb)))
+        .isInstanceOf(IllegalArgumentException.class);
+  }
+}
diff --git a/plugins/spark/v3.5/spark/build.gradle.kts b/plugins/spark/v3.5/spark/build.gradle.kts
index 45af3b6f93..9e67e20e91 100644
--- a/plugins/spark/v3.5/spark/build.gradle.kts
+++ b/plugins/spark/v3.5/spark/build.gradle.kts
@@ -46,6 +46,7 @@ dependencies {
   // TODO: extract a polaris-rest module as a thin layer for
   // client to depends on.
   implementation(project(":polaris-core")) { isTransitive = false }
+  testImplementation("org.apache.hudi:hudi-spark3.5-bundle_${scalaVersion}:1.1.0")
   implementation(
     "org.apache.iceberg:iceberg-spark-runtime-${sparkMajorVersion}_${scalaVersion}:${icebergVersion}"