Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions persistence/relational-jdbc/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,16 @@ dependencies {

compileOnly(platform(libs.jackson.bom))
compileOnly("com.fasterxml.jackson.core:jackson-annotations")
compileOnly("com.fasterxml.jackson.core:jackson-databind")
compileOnly(libs.jakarta.annotation.api)
compileOnly(libs.jakarta.enterprise.cdi.api)
compileOnly(libs.jakarta.inject.api)

// Iceberg API for metrics report conversion
compileOnly(platform(libs.iceberg.bom))
compileOnly("org.apache.iceberg:iceberg-api")
compileOnly("org.apache.iceberg:iceberg-core")

implementation(libs.smallrye.common.annotation) // @Identifier
implementation(libs.postgresql)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,15 @@
import org.apache.polaris.core.storage.PolarisStorageIntegrationProvider;
import org.apache.polaris.core.storage.StorageLocation;
import org.apache.polaris.persistence.relational.jdbc.models.EntityNameLookupRecordConverter;
import org.apache.polaris.persistence.relational.jdbc.models.ModelCommitMetricsReport;
import org.apache.polaris.persistence.relational.jdbc.models.ModelCommitMetricsReportConverter;
import org.apache.polaris.persistence.relational.jdbc.models.ModelEntity;
import org.apache.polaris.persistence.relational.jdbc.models.ModelEvent;
import org.apache.polaris.persistence.relational.jdbc.models.ModelGrantRecord;
import org.apache.polaris.persistence.relational.jdbc.models.ModelPolicyMappingRecord;
import org.apache.polaris.persistence.relational.jdbc.models.ModelPrincipalAuthenticationData;
import org.apache.polaris.persistence.relational.jdbc.models.ModelScanMetricsReport;
import org.apache.polaris.persistence.relational.jdbc.models.ModelScanMetricsReportConverter;
import org.apache.polaris.persistence.relational.jdbc.models.SchemaVersion;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -315,6 +319,264 @@ public void writeEvents(@Nonnull List<PolarisEvent> events) {
}
}

/**
* Writes a scan metrics report to the database as a first-class entity.
*
* @param report the scan metrics report to persist
*/
public void writeScanMetricsReport(@Nonnull ModelScanMetricsReport report) {
try {
PreparedQuery pq =
QueryGenerator.generateInsertQueryWithoutRealmId(
ModelScanMetricsReport.ALL_COLUMNS,
ModelScanMetricsReport.TABLE_NAME,
report.toMap(datasourceOperations.getDatabaseType()).values().stream().toList());
int updated = datasourceOperations.executeUpdate(pq);
if (updated == 0) {
throw new SQLException("Scan metrics report was not inserted.");
}
} catch (SQLException e) {
throw new RuntimeException(
String.format("Failed to write scan metrics report due to %s", e.getMessage()), e);
}
}

/**
* Writes a commit metrics report to the database as a first-class entity.
*
* @param report the commit metrics report to persist
*/
public void writeCommitMetricsReport(@Nonnull ModelCommitMetricsReport report) {
try {
PreparedQuery pq =
QueryGenerator.generateInsertQueryWithoutRealmId(
ModelCommitMetricsReport.ALL_COLUMNS,
ModelCommitMetricsReport.TABLE_NAME,
report.toMap(datasourceOperations.getDatabaseType()).values().stream().toList());
int updated = datasourceOperations.executeUpdate(pq);
if (updated == 0) {
throw new SQLException("Commit metrics report was not inserted.");
}
} catch (SQLException e) {
throw new RuntimeException(
String.format("Failed to write commit metrics report due to %s", e.getMessage()), e);
}
}

/**
* Retrieves scan metrics reports for a specific table within a time range.
*
* @param catalogName the catalog name
* @param namespace the namespace
* @param tableName the table name
* @param startTimeMs start of time range (inclusive), or null for no lower bound
* @param endTimeMs end of time range (exclusive), or null for no upper bound
* @param limit maximum number of results to return
* @return list of scan metrics reports matching the criteria
*/
@Nonnull
public List<ModelScanMetricsReport> queryScanMetricsReports(
@Nonnull String catalogName,
@Nonnull String namespace,
@Nonnull String tableName,
@Nullable Long startTimeMs,
@Nullable Long endTimeMs,
int limit) {
try {
StringBuilder whereClause = new StringBuilder();
whereClause.append("realm_id = ? AND catalog_name = ? AND namespace = ? AND table_name = ?");
List<Object> values = new ArrayList<>(List.of(realmId, catalogName, namespace, tableName));

if (startTimeMs != null) {
whereClause.append(" AND timestamp_ms >= ?");
values.add(startTimeMs);
}
if (endTimeMs != null) {
whereClause.append(" AND timestamp_ms < ?");
values.add(endTimeMs);
}

String sql =
"SELECT * FROM "
+ QueryGenerator.getFullyQualifiedTableName(ModelScanMetricsReport.TABLE_NAME)
+ " WHERE "
+ whereClause
+ " ORDER BY timestamp_ms DESC LIMIT "
+ limit;

PreparedQuery query = new PreparedQuery(sql, values);
var results =
datasourceOperations.executeSelect(query, new ModelScanMetricsReportConverter());
return results == null ? Collections.emptyList() : results;
} catch (SQLException e) {
throw new RuntimeException(
String.format("Failed to query scan metrics reports due to %s", e.getMessage()), e);
}
}

/**
* Retrieves commit metrics reports for a specific table within a time range.
*
* @param catalogName the catalog name
* @param namespace the namespace
* @param tableName the table name
* @param startTimeMs start of time range (inclusive), or null for no lower bound
* @param endTimeMs end of time range (exclusive), or null for no upper bound
* @param limit maximum number of results to return
* @return list of commit metrics reports matching the criteria
*/
@Nonnull
public List<ModelCommitMetricsReport> queryCommitMetricsReports(
@Nonnull String catalogName,
@Nonnull String namespace,
@Nonnull String tableName,
@Nullable Long startTimeMs,
@Nullable Long endTimeMs,
int limit) {
try {
List<Object> values = new ArrayList<>(List.of(realmId, catalogName, namespace, tableName));

StringBuilder whereClause = new StringBuilder();
whereClause.append("realm_id = ? AND catalog_name = ? AND namespace = ? AND table_name = ?");

if (startTimeMs != null) {
whereClause.append(" AND timestamp_ms >= ?");
values.add(startTimeMs);
}
if (endTimeMs != null) {
whereClause.append(" AND timestamp_ms < ?");
values.add(endTimeMs);
}

String sql =
"SELECT * FROM "
+ QueryGenerator.getFullyQualifiedTableName(ModelCommitMetricsReport.TABLE_NAME)
+ " WHERE "
+ whereClause
+ " ORDER BY timestamp_ms DESC LIMIT "
+ limit;

PreparedQuery query = new PreparedQuery(sql, values);
var results =
datasourceOperations.executeSelect(query, new ModelCommitMetricsReportConverter());
return results == null ? Collections.emptyList() : results;
} catch (SQLException e) {
throw new RuntimeException(
String.format("Failed to query commit metrics reports due to %s", e.getMessage()), e);
}
}

/**
* Retrieves scan metrics reports by OpenTelemetry trace ID.
*
* @param traceId the OpenTelemetry trace ID
* @return list of scan metrics reports with the given trace ID
*/
@Nonnull
public List<ModelScanMetricsReport> queryScanMetricsReportsByTraceId(@Nonnull String traceId) {
try {
String sql =
"SELECT * FROM "
+ QueryGenerator.getFullyQualifiedTableName(ModelScanMetricsReport.TABLE_NAME)
+ " WHERE realm_id = ? AND otel_trace_id = ? ORDER BY timestamp_ms DESC";

PreparedQuery query = new PreparedQuery(sql, List.of(realmId, traceId));
var results =
datasourceOperations.executeSelect(query, new ModelScanMetricsReportConverter());
return results == null ? Collections.emptyList() : results;
} catch (SQLException e) {
throw new RuntimeException(
String.format(
"Failed to query scan metrics reports by trace ID due to %s", e.getMessage()),
e);
}
}

/**
* Retrieves commit metrics reports by OpenTelemetry trace ID.
*
* @param traceId the OpenTelemetry trace ID
* @return list of commit metrics reports with the given trace ID
*/
@Nonnull
public List<ModelCommitMetricsReport> queryCommitMetricsReportsByTraceId(
@Nonnull String traceId) {
try {
String sql =
"SELECT * FROM "
+ QueryGenerator.getFullyQualifiedTableName(ModelCommitMetricsReport.TABLE_NAME)
+ " WHERE realm_id = ? AND otel_trace_id = ? ORDER BY timestamp_ms DESC";

PreparedQuery query = new PreparedQuery(sql, List.of(realmId, traceId));
var results =
datasourceOperations.executeSelect(query, new ModelCommitMetricsReportConverter());
return results == null ? Collections.emptyList() : results;
} catch (SQLException e) {
throw new RuntimeException(
String.format(
"Failed to query commit metrics reports by trace ID due to %s", e.getMessage()),
e);
}
}

/**
* Deletes scan metrics reports older than the specified timestamp.
*
* @param olderThanMs timestamp in milliseconds; reports with timestamp_ms less than this will be
* deleted
* @return the number of reports deleted
*/
public int deleteScanMetricsReportsOlderThan(long olderThanMs) {
try {
String sql =
"DELETE FROM "
+ QueryGenerator.getFullyQualifiedTableName(ModelScanMetricsReport.TABLE_NAME)
+ " WHERE realm_id = ? AND timestamp_ms < ?";

PreparedQuery query = new PreparedQuery(sql, List.of(realmId, olderThanMs));
return datasourceOperations.executeUpdate(query);
} catch (SQLException e) {
throw new RuntimeException(
String.format("Failed to delete old scan metrics reports due to %s", e.getMessage()), e);
}
}

/**
* Deletes commit metrics reports older than the specified timestamp.
*
* @param olderThanMs timestamp in milliseconds; reports with timestamp_ms less than this will be
* deleted
* @return the number of reports deleted
*/
public int deleteCommitMetricsReportsOlderThan(long olderThanMs) {
try {
String sql =
"DELETE FROM "
+ QueryGenerator.getFullyQualifiedTableName(ModelCommitMetricsReport.TABLE_NAME)
+ " WHERE realm_id = ? AND timestamp_ms < ?";

PreparedQuery query = new PreparedQuery(sql, List.of(realmId, olderThanMs));
return datasourceOperations.executeUpdate(query);
} catch (SQLException e) {
throw new RuntimeException(
String.format("Failed to delete old commit metrics reports due to %s", e.getMessage()),
e);
}
}

/**
* Deletes all metrics reports (both scan and commit) older than the specified timestamp.
*
* @param olderThanMs timestamp in milliseconds; reports with timestamp_ms less than this will be
* deleted
* @return the total number of reports deleted (scan + commit)
*/
public int deleteAllMetricsReportsOlderThan(long olderThanMs) {
int scanDeleted = deleteScanMetricsReportsOlderThan(olderThanMs);
int commitDeleted = deleteCommitMetricsReportsOlderThan(olderThanMs);
return scanDeleted + commitDeleted;
}

@Override
public void deleteEntity(@Nonnull PolarisCallContext callCtx, @Nonnull PolarisBaseEntity entity) {
ModelEntity modelEntity = ModelEntity.fromEntity(entity, schemaVersion);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,30 @@ public static PreparedQuery generateInsertQuery(
return new PreparedQuery(sql, finalValues);
}

/**
* Generates an INSERT query for a given table without appending realm_id. Use this when the
* columns already include realm_id.
*
* @param allColumns Columns to insert values into (should already include realm_id if needed).
* @param tableName Target table name.
* @param values Values for each column (must match order of columns).
* @return INSERT query with value bindings.
*/
public static PreparedQuery generateInsertQueryWithoutRealmId(
@Nonnull List<String> allColumns, @Nonnull String tableName, List<Object> values) {
String columns = String.join(", ", allColumns);
String placeholders = allColumns.stream().map(c -> "?").collect(Collectors.joining(", "));
String sql =
"INSERT INTO "
+ getFullyQualifiedTableName(tableName)
+ " ("
+ columns
+ ") VALUES ("
+ placeholders
+ ")";
return new PreparedQuery(sql, values);
}

/**
* Builds an UPDATE query.
*
Expand Down Expand Up @@ -317,7 +341,7 @@ public static PreparedQuery generateOverlapQuery(
return new PreparedQuery(query.sql(), where.parameters());
}

private static String getFullyQualifiedTableName(String tableName) {
static String getFullyQualifiedTableName(String tableName) {
// TODO: make schema name configurable.
return "POLARIS_SCHEMA." + tableName;
}
Expand Down
Loading