diff --git a/benchmarks/Makefile b/benchmarks/Makefile index ee9f0d2d..9fe0a3a1 100644 --- a/benchmarks/Makefile +++ b/benchmarks/Makefile @@ -17,6 +17,30 @@ # under the License. # +.PHONY: help +help: + @echo "Available commands:" + @echo "" + @echo "Benchmark Simulations:" + @echo " make s3-sign-request-simulation - Run S3 sign request simulation" + @echo " make create-dataset-simulation - Run create tree dataset simulation" + @echo " make read-simulation - Run read tree dataset simulation" + @echo " make read-update-simulation - Run read/update tree dataset simulation" + @echo " make create-commits-simulation - Run create commits simulation" + @echo " make weighted-workload-simulation - Run weighted workload simulation" + @echo "" + @echo "Reports:" + @echo " make reports-list - List all generated reports" + @echo " make reports-clean - Clean all generated reports" + @echo "" + @echo "Help:" + @echo " make help - Show this help message" + +.PHONY: s3-sign-request-simulation +s3-sign-request-simulation: + ./gradlew gatlingRun --simulation org.apache.polaris.benchmarks.simulations.S3SignRequest \ + -Dconfig.file=./application.conf + .PHONY: create-dataset-simulation create-dataset-simulation: ./gradlew gatlingRun --simulation org.apache.polaris.benchmarks.simulations.CreateTreeDataset \ @@ -54,7 +78,7 @@ reports-list: millis="$$(echo $$timestamp | cut -c15-17)"; \ formatted_date="$$(echo $$date | sed 's/\([0-9]\{4\}\)\([0-9]\{2\}\)\([0-9]\{2\}\)/\1-\2-\3/')"; \ formatted_time="$$(echo $$time | sed 's/\([0-9]\{2\}\)\([0-9]\{2\}\)\([0-9]\{2\}\)/\1:\2:\3/')"; \ - index_path="$${report}index.html"; \ + index_path="./$${report}index.html"; \ if [ -f "$$index_path" ]; then \ echo "$$name | $$formatted_date $$formatted_time.$$millis | $$index_path"; \ else \ diff --git a/benchmarks/README.md b/benchmarks/README.md index a3dc8681..b5160a42 100644 --- a/benchmarks/README.md +++ b/benchmarks/README.md @@ -76,6 +76,8 @@ auth { max-retries = 10 # Maximum number of retry attempts for authentication failures retryable-http-codes = [500] # HTTP status codes that should trigger a retry refresh-interval-seconds = 60 # Refresh interval for the authentication token in seconds + catalog-role = "catalog_admin" # Catalog role to grant privileges to + privileges = ["CATALOG_MANAGE_CONTENT"] # List of privileges to grant to the catalog role } ``` @@ -143,6 +145,9 @@ make create-commits-simulation # Weighted workload make weighted-workload-simulation + +# S3 sign request +make s3-sign-request-simulation ``` A message will show the location of the Gatling report: diff --git a/benchmarks/src/gatling/resources/benchmark-defaults.conf b/benchmarks/src/gatling/resources/benchmark-defaults.conf index 9caeec09..7e71dbde 100644 --- a/benchmarks/src/gatling/resources/benchmark-defaults.conf +++ b/benchmarks/src/gatling/resources/benchmark-defaults.conf @@ -42,6 +42,14 @@ auth { # HTTP status codes that should trigger a retry retryable-http-codes = [500] + + # Catalog role to grant privileges to + # Default: "catalog_admin" + catalog-role = "catalog_admin" + + # List of privileges to grant to the catalog role + # Default: ["CATALOG_MANAGE_CONTENT"] + privileges = ["CATALOG_MANAGE_CONTENT"] } # Dataset tree structure configuration @@ -217,4 +225,12 @@ workload { # Default: 5 duration-in-minutes = 5 } + + # Configuration for the S3SignRequest simulation + s3-sign-request { + # Number of table S3 sign operations to perform simultaneously + # This controls the concurrency level for table S3 signing operations + # Default: 20 + table-concurrency = 20 + } } diff --git a/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/actions/CatalogActions.scala b/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/actions/CatalogActions.scala index 1062b4d9..8f301221 100644 --- a/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/actions/CatalogActions.scala +++ b/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/actions/CatalogActions.scala @@ -129,4 +129,42 @@ case class CatalogActions( .header("Content-Type", "application/json") .check(status.is(200)) ) + + /** + * Grants a specified privilege to a catalog role for a catalog. This is a fire-and-forget + * operation that does not validate the response, allowing for maximum throughput. The catalog + * name, catalog role name, and privilege should be available in the session. + */ + val grantCatalogPrivilege: ChainBuilder = exec( + http("Grant catalog privilege") + .put("/api/management/v1/catalogs/#{catalogName}/catalog-roles/#{catalogRoleName}/grants") + .header("Authorization", "Bearer #{accessToken}") + .header("Content-Type", "application/json") + .body( + StringBody( + """{ + | "grant": { + | "type": "catalog", + | "privilege": "#{privilege}" + | } + |}""".stripMargin + ) + ) + ) + + /** + * Verifies that a specific privilege is granted to a catalog role for a catalog. The catalog + * name, catalog role name, and privilege should be available in the session. If the privilege is + * not found, the check will fail and the request will be marked as failed. + */ + val checkCatalogPrivilegeGranted: ChainBuilder = exec( + http("Check catalog privilege granted") + .get("/api/management/v1/catalogs/#{catalogName}/catalog-roles/#{catalogRoleName}/grants") + .header("Authorization", "Bearer #{accessToken}") + .header("Content-Type", "application/json") + .check(status.is(200)) + .check( + jsonPath("$.grants[?(@.type == 'catalog' && @.privilege == '#{privilege}')]").exists + ) + ) } diff --git a/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/actions/S3SignActions.scala b/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/actions/S3SignActions.scala new file mode 100644 index 00000000..6944a42e --- /dev/null +++ b/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/actions/S3SignActions.scala @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.benchmarks.actions + +import io.gatling.core.Predef._ +import io.gatling.core.structure.ChainBuilder +import io.gatling.http.Predef._ +import org.apache.polaris.benchmarks.parameters.{DatasetParameters, WorkloadParameters} +import org.slf4j.LoggerFactory +import play.api.libs.json.Json + +import java.util.concurrent.atomic.AtomicReference + +/** + * Actions for performance testing S3 sign requests. This class provides methods to sign S3 requests + * for table files. + * + * @param dp Dataset parameters controlling the dataset generation + * @param wp Workload parameters controlling the workload configuration + * @param accessToken Reference to the authentication token shared across actions + */ +case class S3SignActions( + dp: DatasetParameters, + wp: WorkloadParameters, + accessToken: AtomicReference[String] +) { + private val logger = LoggerFactory.getLogger(getClass) + + private val region: String = + try { + val json = Json.parse(dp.storageConfigInfo) + (json \ "region").asOpt[String].getOrElse("us-east-1") + } catch { + case _: Exception => "us-east-1" + } + + private val bucketName: String = dp.defaultBaseLocation.stripPrefix("s3://").split("/")(0) + private val basePath: String = dp.defaultBaseLocation.stripPrefix(s"s3://$bucketName/") + private val s3Domain: String = s"s3.$region.amazonaws.com" + + /** + * Sends a request to sign an S3 request for a table file. + */ + val signTableRequest: ChainBuilder = exec { session => + val catalogName = session("catalogName").as[String] + val parentNamespacePath = session("parentNamespacePath").as[Seq[String]] + val tableName = session("tableName").as[String] + val namespacePath = parentNamespacePath.mkString("/") + val fileUri = + s"https://$bucketName.$s3Domain/$basePath/$catalogName/$namespacePath/$tableName/metadata/00000-example.metadata.json" + + session + .set("region", region) + .set("method", "GET") + .set("uri", fileUri) + } + .exec { session => + val uri = session("uri").as[String] + logger.info(s"Signing S3 request for $uri") + session + } + .exec( + http("Sign S3 Request") + .post("/api/s3-sign/v1/#{catalogName}/namespaces/#{multipartNamespace}/tables/#{tableName}") + .header("Authorization", "Bearer #{accessToken}") + .header("Content-Type", "application/json") + .body( + StringBody( + """{ + | "region": "#{region}", + | "method": "#{method}", + | "uri": "#{uri}", + | "headers": {} + |}""".stripMargin + ) + ) + .check(status.is(200)) + .check(jsonPath("$.uri").exists) + .check(jsonPath("$.headers").exists) + ) +} diff --git a/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/parameters/AuthParameters.scala b/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/parameters/AuthParameters.scala index f6f87332..ce95cae0 100644 --- a/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/parameters/AuthParameters.scala +++ b/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/parameters/AuthParameters.scala @@ -32,7 +32,9 @@ case class AuthParameters( clientSecret: String, refreshIntervalSeconds: Int, maxRetries: Int, - retryableHttpCodes: Set[Int] + retryableHttpCodes: Set[Int], + catalogRole: String, + privileges: Seq[String] ) { require(clientId != null && clientId.nonEmpty, "Client ID cannot be null or empty") require(clientSecret != null && clientSecret.nonEmpty, "Client secret cannot be null or empty") diff --git a/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/parameters/BenchmarkConfig.scala b/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/parameters/BenchmarkConfig.scala index 2865768c..fe53566e 100644 --- a/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/parameters/BenchmarkConfig.scala +++ b/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/parameters/BenchmarkConfig.scala @@ -20,6 +20,8 @@ package org.apache.polaris.benchmarks.parameters import com.typesafe.config.{Config, ConfigFactory} +import scala.jdk.CollectionConverters._ + object BenchmarkConfig { val config: BenchmarkConfig = apply() @@ -40,7 +42,9 @@ object BenchmarkConfig { auth.getString("client-secret"), auth.getInt("refresh-interval-seconds"), auth.getInt("max-retries"), - auth.getIntList("retryable-http-codes").toArray.map(_.asInstanceOf[Int]).toSet + auth.getIntList("retryable-http-codes").toArray.map(_.asInstanceOf[Int]).toSet, + auth.getString("catalog-role"), + auth.getStringList("privileges").asScala.toSeq ) val workloadParams = { @@ -49,6 +53,7 @@ object BenchmarkConfig { val ctdConfig = workload.getConfig("create-tree-dataset") val rutdConfig = workload.getConfig("read-update-tree-dataset") val wwotdConfig = workload.getConfig("weighted-workload-on-tree-dataset") + val s3srConfig = workload.getConfig("s3-sign-request") WorkloadParameters( CreateCommitsParameters( @@ -74,6 +79,9 @@ object BenchmarkConfig { WeightedWorkloadOnTreeDatasetParameters.loadDistributionsList(wwotdConfig, "readers"), WeightedWorkloadOnTreeDatasetParameters.loadDistributionsList(wwotdConfig, "writers"), wwotdConfig.getInt("duration-in-minutes") + ), + S3SignRequestParameters( + s3srConfig.getInt("table-concurrency") ) ) } diff --git a/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/parameters/S3SignRequestParameters.scala b/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/parameters/S3SignRequestParameters.scala new file mode 100644 index 00000000..addd4868 --- /dev/null +++ b/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/parameters/S3SignRequestParameters.scala @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.benchmarks.parameters + +/** + * Case class to hold the parameters for the S3SignRequest simulation. + * + * @param tableThroughput The number of table S3 sign operations to perform simultaneously. + */ +case class S3SignRequestParameters( + tableThroughput: Int +) { + require(tableThroughput >= 0, "Table throughput cannot be negative") +} diff --git a/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/parameters/WorkloadParameters.scala b/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/parameters/WorkloadParameters.scala index 0ee96908..dccdd209 100644 --- a/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/parameters/WorkloadParameters.scala +++ b/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/parameters/WorkloadParameters.scala @@ -23,5 +23,6 @@ case class WorkloadParameters( readTreeDataset: ReadTreeDatasetParameters, createTreeDataset: CreateTreeDatasetParameters, readUpdateTreeDataset: ReadUpdateTreeDatasetParameters, - weightedWorkloadOnTreeDataset: WeightedWorkloadOnTreeDatasetParameters + weightedWorkloadOnTreeDataset: WeightedWorkloadOnTreeDatasetParameters, + s3SignRequest: S3SignRequestParameters ) {} diff --git a/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/simulations/S3SignRequest.scala b/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/simulations/S3SignRequest.scala new file mode 100644 index 00000000..a4035b5a --- /dev/null +++ b/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/simulations/S3SignRequest.scala @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.benchmarks.simulations + +import io.gatling.core.Predef._ +import io.gatling.http.Predef._ +import org.apache.polaris.benchmarks.actions._ +import org.apache.polaris.benchmarks.parameters.BenchmarkConfig.config +import org.apache.polaris.benchmarks.parameters.WorkloadParameters +import org.slf4j.LoggerFactory + +import java.util.concurrent.atomic.AtomicInteger + +/** + * This simulation signs S3 requests for table files in a pre-existing tree dataset. It is intended + * to be used against a Polaris instance with a pre-existing tree dataset. It has no side effect on + * the dataset and therefore can be executed multiple times without any issue. It signs each table + * file exactly once. + */ +class S3SignRequest extends Simulation { + private val logger = LoggerFactory.getLogger(getClass) + + private val cp = config.connectionParameters + private val ap = config.authParameters + private val dp = config.datasetParameters + val wp: WorkloadParameters = config.workloadParameters + + private val numNamespaces: Int = dp.nAryTree.numberOfNodes + private val setupActions = SetupActions(cp, ap) + private val catalogActions = CatalogActions(dp, setupActions.accessToken) + private val namespaceActions = NamespaceActions(dp, wp, setupActions.accessToken) + private val tableActions = TableActions(dp, wp, setupActions.accessToken) + private val s3SignActions = S3SignActions(dp, wp, setupActions.accessToken) + + private val grantedCatalogs = new AtomicInteger() + private val verifiedPrivileges = new AtomicInteger() + private val verifiedCatalogs = new AtomicInteger() + private val verifiedNamespaces = new AtomicInteger() + private val signedTables = new AtomicInteger() + + private val grantCatalogPrivileges = scenario("Grant privileges to catalog role") + .exec(setupActions.restoreAccessTokenInSession) + .asLongAs(session => + grantedCatalogs.getAndIncrement() < dp.numCatalogs && session.contains("accessToken") + )( + feed(catalogActions.feeder()) + .exec { session => + session.set("catalogRoleName", ap.catalogRole) + } + .foreach(ap.privileges, "privilege") { + exec(catalogActions.grantCatalogPrivilege) + } + ) + + private val verifyCatalogPrivileges = scenario("Verify catalog privileges are granted") + .exec(setupActions.restoreAccessTokenInSession) + .asLongAs(session => + verifiedPrivileges.getAndIncrement() < dp.numCatalogs && session.contains("accessToken") + )( + feed(catalogActions.feeder()) + .exec { session => + session.set("catalogRoleName", ap.catalogRole) + } + .foreach(ap.privileges, "privilege") { + exec(catalogActions.checkCatalogPrivilegeGranted) + } + ) + + private val verifyCatalogs = scenario("Verify catalogs using the Polaris Management REST API") + .exec(setupActions.restoreAccessTokenInSession) + .asLongAs(session => + verifiedCatalogs.getAndIncrement() < dp.numCatalogs && session.contains("accessToken") + )( + feed(catalogActions.feeder()) + .exec(catalogActions.fetchCatalog) + ) + + private val verifyNamespaces = scenario("Verify namespaces using the Iceberg REST API") + .exec(setupActions.restoreAccessTokenInSession) + .asLongAs(session => + verifiedNamespaces.getAndIncrement() < numNamespaces && session.contains("accessToken") + )( + feed(namespaceActions.namespaceFetchFeeder()) + .exec(namespaceActions.fetchAllChildrenNamespaces) + .exec(namespaceActions.checkNamespaceExists) + .exec(namespaceActions.fetchNamespace) + ) + + private val signTables = scenario("Sign S3 requests for table files") + .exec(setupActions.restoreAccessTokenInSession) + .asLongAs(session => + signedTables.getAndIncrement() < dp.numTables && session.contains("accessToken") + )( + feed(tableActions.tableFetchFeeder()) + .exec(tableActions.fetchAllTables) + .exec(tableActions.checkTableExists) + .exec(s3SignActions.signTableRequest) + ) + + private val httpProtocol = http + .baseUrl(cp.baseUrl) + .acceptHeader("application/json") + .contentTypeHeader("application/json") + .disableCaching + + private val tableThroughput = wp.s3SignRequest.tableThroughput + + setUp( + setupActions.continuouslyRefreshOauthToken().inject(atOnceUsers(1)).protocols(httpProtocol), + setupActions.waitForAuthentication + .inject(atOnceUsers(1)) + .andThen(verifyCatalogs.inject(atOnceUsers(1)).protocols(httpProtocol)) + .andThen(grantCatalogPrivileges.inject(atOnceUsers(1)).protocols(httpProtocol)) + .andThen(verifyCatalogPrivileges.inject(atOnceUsers(1)).protocols(httpProtocol)) + .andThen(verifyNamespaces.inject(atOnceUsers(dp.nsDepth)).protocols(httpProtocol)) + .andThen( + signTables.inject(atOnceUsers(tableThroughput)).protocols(httpProtocol) + ) + .andThen(setupActions.stopRefreshingToken.inject(atOnceUsers(1)).protocols(httpProtocol)) + ) +}