Skip to content

Commit

Permalink
Address PR comments
Browse files Browse the repository at this point in the history
  • Loading branch information
qiyuandong-db committed Jan 17, 2025
1 parent 67e6392 commit 70ab1a7
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -179,14 +179,16 @@ private TransactionCommitResult commitWithRetry(
domainMetadatas =
RowTracking.updateRowIdHighWatermarkIfNeeded(
readSnapshot,
protocol,
Optional.empty() /* winningTxnRowIdHighWatermark */,
dataActions,
domainMetadatas);
dataActions =
RowTracking.assignBaseRowIdAndDefaultRowCommitVersion(
readSnapshot,
protocol,
Optional.empty() /* winningTxnRowIdHighWatermark */,
Optional.empty() /*prevCommitVersion*/,
Optional.empty() /* prevCommitVersion */,
commitAsVersion,
dataActions);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,12 +148,14 @@ public TransactionRebaseState resolveConflicts(Engine engine) throws ConcurrentW
updatedDomainMetadatas =
RowTracking.updateRowIdHighWatermarkIfNeeded(
snapshot,
transaction.getProtocol(),
lastWinningRowIdHighWatermark,
attemptDataActions,
transaction.getDomainMetadatas());
updatedDataActions =
RowTracking.assignBaseRowIdAndDefaultRowCommitVersion(
snapshot,
transaction.getProtocol(),
lastWinningRowIdHighWatermark,
Optional.of(attemptVersion),
lastWinningVersion + 1,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,12 @@
*/
package io.delta.kernel.internal.rowtracking;

import static io.delta.kernel.internal.util.Preconditions.checkArgument;

import io.delta.kernel.data.Row;
import io.delta.kernel.internal.DeltaErrors;
import io.delta.kernel.internal.SnapshotImpl;
import io.delta.kernel.internal.TableFeatures;
import io.delta.kernel.internal.actions.*;
import io.delta.kernel.utils.CloseableIterable;
import io.delta.kernel.utils.CloseableIterator;
Expand Down Expand Up @@ -48,6 +51,7 @@ private RowTracking() {
* </ol>
*
* @param snapshot the snapshot of the table that this transaction is reading from
* @param protocol the (updated, if any) protocol that will result from this txn
* @param winningTxnRowIdHighWatermark the latest row ID high watermark from the winning
* transactions. Should be empty for initial assignment and present for conflict resolution.
* @param prevCommitVersion the commit version used by this transaction in the previous commit
Expand All @@ -59,10 +63,16 @@ private RowTracking() {
*/
public static CloseableIterable<Row> assignBaseRowIdAndDefaultRowCommitVersion(
SnapshotImpl snapshot,
Protocol protocol,
Optional<Long> winningTxnRowIdHighWatermark,
Optional<Long> prevCommitVersion,
long commitVersion,
CloseableIterable<Row> dataActions) {
checkArgument(
TableFeatures.isRowTrackingSupported(protocol),
"Base row ID and default row commit version are assigned "
+ "only when feature 'rowTracking' is supported.");

return new CloseableIterable<Row>() {
@Override
public void close() throws IOException {
Expand All @@ -80,6 +90,14 @@ public CloseableIterator<Row> iterator() {
final AtomicLong currRowIdHighWatermark =
new AtomicLong(winningTxnRowIdHighWatermark.orElse(prevRowIdHighWatermark));

// The row ID high watermark must increase monotonically, so the winning transaction's high
// watermark must be greater than or equal to the high watermark from the current
// transaction's read snapshot.
checkArgument(
currRowIdHighWatermark.get() >= prevRowIdHighWatermark,
"The current row ID high watermark must be greater than or equal to "
+ "the high watermark from the transaction's read snapshot");

return dataActions
.iterator()
.map(
Expand Down Expand Up @@ -122,6 +140,7 @@ public CloseableIterator<Row> iterator() {
* ID assignment or conflict resolution to reflect the change to the row ID high watermark.
*
* @param snapshot the snapshot of the table that this transaction is reading at
* @param protocol the (updated, if any) protocol that will result from this txn
* @param winningTxnRowIdHighWatermark the latest row ID high watermark from the winning
* transaction. Should be empty for initial assignment and present for conflict resolution.
* @param dataActions the data actions that this losing transaction was trying to commit
Expand All @@ -130,12 +149,16 @@ public CloseableIterator<Row> iterator() {
*/
public static List<DomainMetadata> updateRowIdHighWatermarkIfNeeded(
SnapshotImpl snapshot,
Protocol protocol,
Optional<Long> winningTxnRowIdHighWatermark,
CloseableIterable<Row> dataActions,
List<DomainMetadata> domainMetadatas) {
checkArgument(
TableFeatures.isRowTrackingSupported(protocol),
"Row ID high watermark is updated only when feature 'rowTracking' is supported.");

// Filter out existing row tracking domainMetadata action, if any
List<DomainMetadata> newDomainMetadatas =
List<DomainMetadata> nonRowTrackingDomainMetadatas =
domainMetadatas.stream()
.filter(dm -> !dm.getDomain().equals(RowTrackingMetadataDomain.DOMAIN_NAME))
.collect(Collectors.toList());
Expand All @@ -149,6 +172,14 @@ public static List<DomainMetadata> updateRowIdHighWatermarkIfNeeded(
final AtomicLong currRowIdHighWatermark =
new AtomicLong(winningTxnRowIdHighWatermark.orElse(prevRowIdHighWatermark));

// The row ID high watermark must increase monotonically, so the winning transaction's high
// watermark (if present) must be greater than or equal to the high watermark from the
// current transaction's read snapshot.
checkArgument(
currRowIdHighWatermark.get() >= prevRowIdHighWatermark,
"The current row ID high watermark must be greater than or equal to "
+ "the high watermark from the transaction's read snapshot");

dataActions.forEach(
row -> {
if (!row.isNullAt(SingleAction.ADD_FILE_ORDINAL)) {
Expand All @@ -161,11 +192,11 @@ public static List<DomainMetadata> updateRowIdHighWatermarkIfNeeded(
});

if (currRowIdHighWatermark.get() != prevRowIdHighWatermark) {
newDomainMetadatas.add(
nonRowTrackingDomainMetadatas.add(
new RowTrackingMetadataDomain(currRowIdHighWatermark.get()).toDomainMetadata());
}

return newDomainMetadatas;
return nonRowTrackingDomainMetadatas;
}

/**
Expand Down

0 comments on commit 70ab1a7

Please sign in to comment.