Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Kernel][Metrics][PR#6] Support TransactionReport to log metrics for a Transaction operation #4037

Merged
merged 8 commits into from
Jan 17, 2025
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,13 @@
import io.delta.kernel.internal.actions.*;
import io.delta.kernel.internal.data.TransactionStateRow;
import io.delta.kernel.internal.fs.Path;
import io.delta.kernel.internal.metrics.TransactionMetrics;
import io.delta.kernel.internal.metrics.TransactionReportImpl;
import io.delta.kernel.internal.replay.ConflictChecker;
import io.delta.kernel.internal.replay.ConflictChecker.TransactionRebaseState;
import io.delta.kernel.internal.rowtracking.RowTracking;
import io.delta.kernel.internal.util.*;
import io.delta.kernel.metrics.TransactionReport;
import io.delta.kernel.types.StructType;
import io.delta.kernel.utils.CloseableIterable;
import io.delta.kernel.utils.CloseableIterator;
Expand Down Expand Up @@ -138,9 +141,31 @@ public List<DomainMetadata> getDomainMetadatas() {
@Override
public TransactionCommitResult commit(Engine engine, CloseableIterable<Row> dataActions)
throws ConcurrentWriteException {
checkState(!closed, "Transaction is already attempted to commit. Create a new transaction.");
TransactionMetrics transactionMetrics = new TransactionMetrics();
try {
checkState(!closed, "Transaction is already attempted to commit. Create a new transaction.");
TransactionCommitResult result =
transactionMetrics.totalCommitTimer.time(
() -> commitWithRetry(engine, dataActions, transactionMetrics));
recordTransactionReport(
engine,
Optional.of(result.getVersion()) /* committedVersion */,
transactionMetrics,
Optional.empty() /* exception */);
return result;
} catch (Exception e) {
recordTransactionReport(
engine,
Optional.empty() /* committedVersion */,
transactionMetrics,
Optional.of(e) /* exception */);
throw e;
}
}

private TransactionCommitResult commitWithRetry(
Engine engine, CloseableIterable<Row> dataActions, TransactionMetrics transactionMetrics) {
try {
long commitAsVersion = readSnapshot.getVersion(engine) + 1;
// Generate the commit action with the inCommitTimestamp if ICT is enabled.
CommitInfo attemptCommitInfo = generateCommitAction(engine);
Expand All @@ -160,9 +185,11 @@ public TransactionCommitResult commit(Engine engine, CloseableIterable<Row> data

int numRetries = 0;
do {
transactionMetrics.commitAttemptsCounter.increment();
logger.info("Committing transaction as version = {}.", commitAsVersion);
try {
return doCommit(engine, commitAsVersion, attemptCommitInfo, dataActions);
return doCommit(
engine, commitAsVersion, attemptCommitInfo, dataActions, transactionMetrics);
} catch (FileAlreadyExistsException fnfe) {
logger.info(
"Concurrent write detected when committing as version = {}. "
Expand Down Expand Up @@ -230,7 +257,8 @@ private TransactionCommitResult doCommit(
Engine engine,
long commitAsVersion,
CommitInfo attemptCommitInfo,
CloseableIterable<Row> dataActions)
CloseableIterable<Row> dataActions,
TransactionMetrics transactionMetrics)
throws FileAlreadyExistsException {
List<Row> metadataActions = new ArrayList<>();
metadataActions.add(createCommitInfoSingleAction(attemptCommitInfo.toRow()));
Expand Down Expand Up @@ -270,14 +298,26 @@ private TransactionCommitResult doCommit(
}
}

// Counters may be partially incremented from previous tries, reset the action counters to 0
allisonport-db marked this conversation as resolved.
Show resolved Hide resolved
transactionMetrics.resetActionCounters();

// Write the staged data to a delta file
wrapEngineExceptionThrowsIO(
() -> {
engine
.getJsonHandler()
.writeJsonFileAtomically(
FileNames.deltaFile(logPath, commitAsVersion),
dataAndMetadataActions,
dataAndMetadataActions.map(
action -> {
transactionMetrics.totalActionsCounter.increment();
if (!action.isNullAt(ADD_FILE_ORDINAL)) {
transactionMetrics.addFilesCounter.increment();
} else if (!action.isNullAt(REMOVE_FILE_ORDINAL)) {
transactionMetrics.removeFilesCounter.increment();
}
return action;
}),
false /* overwrite */);
return null;
},
Expand Down Expand Up @@ -342,6 +382,23 @@ private Map<String, String> getOperationParameters() {
return Collections.emptyMap();
}

private void recordTransactionReport(
Engine engine,
Optional<Long> committedVersion,
TransactionMetrics transactionMetrics,
Optional<Exception> exception) {
TransactionReport transactionReport =
new TransactionReportImpl(
dataPath.toString() /* tablePath */,
operation.toString(),
engineInfo,
committedVersion,
transactionMetrics,
readSnapshot.getSnapshotReport(),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is the readSnapshot of a new table creation? is it just InitialSnapshot? with version = -1?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

exception);
engine.getMetricsReporters().forEach(reporter -> reporter.report(transactionReport));
}

/**
* Get the part of the schema of the table that needs the statistics to be collected per file.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public class SingleAction {

private static final int TXN_ORDINAL = FULL_SCHEMA.indexOf("txn");
public static final int ADD_FILE_ORDINAL = FULL_SCHEMA.indexOf("add");
private static final int REMOVE_FILE_ORDINAL = FULL_SCHEMA.indexOf("remove");
public static final int REMOVE_FILE_ORDINAL = FULL_SCHEMA.indexOf("remove");
private static final int METADATA_ORDINAL = FULL_SCHEMA.indexOf("metaData");
private static final int PROTOCOL_ORDINAL = FULL_SCHEMA.indexOf("protocol");
private static final int COMMIT_INFO_ORDINAL = FULL_SCHEMA.indexOf("commitInfo");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
import io.delta.kernel.metrics.MetricsReport;
import io.delta.kernel.metrics.SnapshotReport;
import io.delta.kernel.metrics.TransactionReport;

/** Defines JSON serializers for {@link MetricsReport} types */
public final class MetricsReportSerializers {
Expand All @@ -40,6 +41,16 @@ public static String serializeSnapshotReport(SnapshotReport snapshotReport)
return OBJECT_MAPPER.writeValueAsString(snapshotReport);
}

/**
* Serializes a {@link TransactionReport} to a JSON string
*
* @throws JsonProcessingException
*/
public static String serializeTransactionReport(TransactionReport transactionReport)
throws JsonProcessingException {
return OBJECT_MAPPER.writeValueAsString(transactionReport);
}

/////////////////////////////////
// Private fields and methods //
////////////////////////////////
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
* Copyright (2025) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.delta.kernel.internal.metrics;

import io.delta.kernel.metrics.TransactionMetricsResult;

/**
* Stores the metrics for an ongoing transaction. These metrics are updated and recorded throughout
* the transaction using this class.
*
* <p>At report time, we create an immutable {@link TransactionMetricsResult} from an instance of
* {@link TransactionMetrics} to capture the metrics collected during the transaction. The {@link
* TransactionMetricsResult} interface exposes getters for any metrics collected in this class.
*/
public class TransactionMetrics {

public final Timer totalCommitTimer = new Timer();

public final Counter commitAttemptsCounter = new Counter();

public final Counter addFilesCounter = new Counter();

public final Counter removeFilesCounter = new Counter();

public final Counter totalActionsCounter = new Counter();

/**
* Resets the action counters (addFilesCounter, removeFilesCounter and totalActionsCounter) to 0
allisonport-db marked this conversation as resolved.
Show resolved Hide resolved
*/
public void resetActionCounters() {
addFilesCounter.reset();
removeFilesCounter.reset();
totalActionsCounter.reset();
}

public TransactionMetricsResult captureTransactionMetricsResult() {
return new TransactionMetricsResult() {

final long totalCommitDuration = totalCommitTimer.totalDurationNs();
final long numCommitAttempts = commitAttemptsCounter.value();
final long numAddFiles = addFilesCounter.value();
final long numRemoveFiles = removeFilesCounter.value();
final long numTotalActions = totalActionsCounter.value();

@Override
public long getTotalCommitDuration() {
return totalCommitDuration;
}

@Override
public long getNumCommitAttempts() {
return numCommitAttempts;
}

@Override
public long getNumAddFiles() {
return numAddFiles;
}

@Override
public long getNumRemoveFiles() {
return numRemoveFiles;
}

@Override
public long getNumTotalActions() {
return numTotalActions;
}
};
}

@Override
public String toString() {
return String.format(
"TransactionMetrics(totalCommitTimer=%s, commitAttemptsCounter=%s, addFilesCounter=%s, "
+ "removeFilesCounter=%s, totalActionsCounter=%s)",
totalCommitTimer,
commitAttemptsCounter,
addFilesCounter,
removeFilesCounter,
totalActionsCounter);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
* Copyright (2025) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.delta.kernel.internal.metrics;

import static io.delta.kernel.internal.util.Preconditions.checkArgument;
import static java.util.Objects.requireNonNull;

import io.delta.kernel.metrics.SnapshotReport;
import io.delta.kernel.metrics.TransactionMetricsResult;
import io.delta.kernel.metrics.TransactionReport;
import java.util.Optional;
import java.util.UUID;

/** A basic POJO implementation of {@link TransactionReport} for creating them */
public class TransactionReportImpl extends DeltaOperationReportImpl implements TransactionReport {

private final String operation;
private final String engineInfo;
private final long snapshotVersion;
private final Optional<UUID> snapshotReportUUID;
private final Optional<Long> committedVersion;
private final TransactionMetricsResult transactionMetrics;

public TransactionReportImpl(
allisonport-db marked this conversation as resolved.
Show resolved Hide resolved
String tablePath,
String operation,
String engineInfo,
Optional<Long> committedVersion,
TransactionMetrics transactionMetrics,
SnapshotReport snapshotReport,
Optional<Exception> exception) {
super(tablePath, exception);
this.operation = requireNonNull(operation);
this.engineInfo = requireNonNull(engineInfo);
this.transactionMetrics = requireNonNull(transactionMetrics).captureTransactionMetricsResult();
this.committedVersion = committedVersion;
requireNonNull(snapshotReport);
checkArgument(
!snapshotReport.getException().isPresent(),
"Expected a successful SnapshotReport provided report has exception");
checkArgument(
snapshotReport.getVersion().isPresent(),
"Expected a successful SnapshotReport but missing version");
this.snapshotVersion = requireNonNull(snapshotReport).getVersion().get();
if (snapshotVersion < 0) {
// For a new table, no Snapshot is actually loaded and thus no SnapshotReport is emitted
allisonport-db marked this conversation as resolved.
Show resolved Hide resolved
this.snapshotReportUUID = Optional.empty();
} else {
this.snapshotReportUUID = Optional.of(snapshotReport.getReportUUID());
}
}

@Override
public String getOperation() {
return operation;
}

@Override
public String getEngineInfo() {
return engineInfo;
}

@Override
public long getSnapshotVersion() {
return snapshotVersion;
}

@Override
public Optional<UUID> getSnapshotReportUUID() {
return snapshotReportUUID;
}

@Override
public Optional<Long> getCommittedVersion() {
return committedVersion;
}

@Override
public TransactionMetricsResult getTransactionMetrics() {
return transactionMetrics;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@
*/
package io.delta.kernel.metrics;

import com.fasterxml.jackson.annotation.JsonPropertyOrder;
import java.util.Optional;

/** Stores the metrics results for a {@link SnapshotReport} */
@JsonPropertyOrder({"timestampToVersionResolutionDurationNs", "loadInitialDeltaActionsDurationNs"})
scottsand-db marked this conversation as resolved.
Show resolved Hide resolved
public interface SnapshotMetricsResult {

/**
Expand Down
Loading
Loading