Fix: Group transaction changes by table #3360
```diff
@@ -38,6 +38,7 @@
 import java.util.Arrays;
 import java.util.EnumSet;
 import java.util.HashSet;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
```
```diff
@@ -1042,57 +1043,71 @@ public void commitTransaction(CommitTransactionRequest commitTransactionRequest)
         new TransactionWorkspaceMetaStoreManager(diagnostics, metaStoreManager);
     ((IcebergCatalog) baseCatalog).setMetaStoreManager(transactionMetaStoreManager);
 
-    commitTransactionRequest.tableChanges().stream()
-        .forEach(
-            change -> {
-              Table table = baseCatalog.loadTable(change.identifier());
-              if (!(table instanceof BaseTable baseTable)) {
-                throw new IllegalStateException(
-                    "Cannot wrap catalog that does not produce BaseTable");
-              }
-              if (isCreate(change)) {
-                throw new BadRequestException(
-                    "Unsupported operation: commitTranaction with updateForStagedCreate: %s",
-                    change);
-              }
-
-              TableOperations tableOps = baseTable.operations();
-              TableMetadata currentMetadata = tableOps.current();
-
-              // Validate requirements; any CommitFailedExceptions will fail the overall request
-              change.requirements().forEach(requirement -> requirement.validate(currentMetadata));
-
-              // Apply changes
-              TableMetadata.Builder metadataBuilder = TableMetadata.buildFrom(currentMetadata);
-              change.updates().stream()
-                  .forEach(
-                      singleUpdate -> {
-                        // Note: If location-overlap checking is refactored to be atomic, we could
-                        // support validation within a single multi-table transaction as well, but
-                        // will need to update the TransactionWorkspaceMetaStoreManager to better
-                        // expose the concept of being able to read uncommitted updates.
-                        if (singleUpdate instanceof MetadataUpdate.SetLocation setLocation) {
-                          if (!currentMetadata.location().equals(setLocation.location())
-                              && !realmConfig.getConfig(
-                                  FeatureConfiguration.ALLOW_NAMESPACE_LOCATION_OVERLAP)) {
-                            throw new BadRequestException(
-                                "Unsupported operation: commitTransaction containing SetLocation"
-                                    + " for table '%s' and new location '%s'",
-                                change.identifier(),
-                                ((MetadataUpdate.SetLocation) singleUpdate).location());
-                          }
-                        }
-
-                        // Apply updates to builder
-                        singleUpdate.applyTo(metadataBuilder);
-                      });
-
-              // Commit into transaction workspace we swapped the baseCatalog to use
-              TableMetadata updatedMetadata = metadataBuilder.build();
-              if (!updatedMetadata.changes().isEmpty()) {
-                tableOps.commit(currentMetadata, updatedMetadata);
-              }
-            });
+    // Group all changes by table identifier to handle them atomically.
+    // This prevents conflicts when multiple changes target the same table entity.
+    // LinkedHashMap preserves insertion order for deterministic processing.
+    Map<TableIdentifier, List<UpdateTableRequest>> changesByTable = new LinkedHashMap<>();
+    for (UpdateTableRequest change : commitTransactionRequest.tableChanges()) {
+      if (isCreate(change)) {
+        throw new BadRequestException(
+            "Unsupported operation: commitTranaction with updateForStagedCreate: %s", change);
+      }
+      changesByTable.computeIfAbsent(change.identifier(), k -> new ArrayList<>()).add(change);
+    }
+
+    // Process each table's changes in order
+    changesByTable.forEach(
```
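The grouping step above is plain `Map#computeIfAbsent` bucketing. A minimal, self-contained sketch of the same pattern, with a hypothetical `Change` record standing in for `UpdateTableRequest` and a `String` key standing in for `TableIdentifier`:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class GroupByTableDemo {
  // Hypothetical stand-in for UpdateTableRequest: an identifier plus an update payload.
  record Change(String tableId, String update) {}

  // Same pattern as the PR: group changes by identifier; LinkedHashMap keeps
  // first-seen order so tables are processed deterministically.
  static Map<String, List<Change>> group(List<Change> changes) {
    Map<String, List<Change>> byTable = new LinkedHashMap<>();
    for (Change c : changes) {
      byTable.computeIfAbsent(c.tableId(), k -> new ArrayList<>()).add(c);
    }
    return byTable;
  }

  public static void main(String[] args) {
    Map<String, List<Change>> byTable =
        group(
            List.of(
                new Change("ns.t1", "add-column"),
                new Change("ns.t2", "set-location"),
                new Change("ns.t1", "rename-column")));
    System.out.println(byTable.keySet()); // [ns.t1, ns.t2]
    System.out.println(byTable.get("ns.t1").size()); // 2
  }
}
```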
Contributor: The new code logic looks correct to me. I think it's a worthy change to merge in its own right. However, as for the issue discussed in #3352 (comment), I think this fix is effective, but it's not apparent that it will work correctly. The basic problem is that Persistence is called with multiple entity objects for the same ID. This means that the I do believe that one
Contributor: Yeah, logic also looks correct to me. +1 to adding a comment on the subtlety though that we're coalescing all the updates for a given table into a single Polaris entity update, which is a slightly different behavior than if the caller expected the various Note that this issue was a known limitation, and referenced in a TODO in Line 84 in 8abf19a.

The alternative "fix" described there that is more general but more complex and probably has pitfalls is to really queue up the sequential mutations per entity in that "uncommitted persistence layer". The main implications would be that if we plug into the MetaStoreManager layer, we can intercept but inherit other relevant hooks, such as generating events, having entityVersion increments directly match actual update requests, etc. But I'm in favor of this more targeted change-coalescing fix here for now. We could either update/remove the TODO in
```diff
+        (tableIdentifier, changes) -> {
+          Table table = baseCatalog.loadTable(tableIdentifier);
+          if (!(table instanceof BaseTable baseTable)) {
+            throw new IllegalStateException("Cannot wrap catalog that does not produce BaseTable");
+          }
+
+          TableOperations tableOps = baseTable.operations();
+          TableMetadata baseMetadata = tableOps.current();
+
+          // Apply each change sequentially: validate requirements against current state,
+          // then apply updates. This ensures conflicts are detected (e.g., if two changes
+          // both expect schema ID 0, the second will fail after the first increments it).
+          TableMetadata currentMetadata = baseMetadata;
+          for (UpdateTableRequest change : changes) {
+            // Validate requirements against the current metadata state
+            final TableMetadata metadataForValidation = currentMetadata;
+            change
+                .requirements()
+                .forEach(requirement -> requirement.validate(metadataForValidation));
+
+            // Apply this change's updates
+            TableMetadata.Builder metadataBuilder = TableMetadata.buildFrom(currentMetadata);
+            for (MetadataUpdate singleUpdate : change.updates()) {
+              // Note: If location-overlap checking is refactored to be atomic, we could
+              // support validation within a single multi-table transaction as well, but
+              // will need to update the TransactionWorkspaceMetaStoreManager to better
+              // expose the concept of being able to read uncommitted updates.
+              if (singleUpdate instanceof MetadataUpdate.SetLocation setLocation) {
+                if (!currentMetadata.location().equals(setLocation.location())
+                    && !realmConfig.getConfig(
+                        FeatureConfiguration.ALLOW_NAMESPACE_LOCATION_OVERLAP)) {
+                  throw new BadRequestException(
+                      "Unsupported operation: commitTransaction containing SetLocation"
+                          + " for table '%s' and new location '%s'",
+                      change.identifier(), ((MetadataUpdate.SetLocation) singleUpdate).location());
+                }
+              }
```
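The validate-then-apply loop above is what lets conflicting changes to the same table fail cleanly: each change's requirements are checked against the metadata as already updated by the preceding changes in the group. A toy sketch of that pattern, using hypothetical `Metadata`/`Requirement` stand-ins that track only a version counter (the real Iceberg types validate schema IDs, snapshot refs, and more):

```java
import java.util.List;

public class SequentialValidationDemo {
  // Hypothetical miniature of TableMetadata: only a version counter.
  record Metadata(int version) {}

  // Hypothetical miniature of an update requirement: the version the caller expects.
  record Requirement(int expectedVersion) {
    void validate(Metadata current) {
      if (current.version() != expectedVersion) {
        throw new IllegalStateException(
            "Requirement failed: expected version " + expectedVersion
                + " but found " + current.version());
      }
    }
  }

  // Validate each change against the metadata as updated so far, mirroring the
  // per-table loop in the PR; "applying" a change here just bumps the version.
  static Metadata applyAll(Metadata base, List<Requirement> changes) {
    Metadata current = base;
    for (Requirement req : changes) {
      req.validate(current);
      current = new Metadata(current.version() + 1);
    }
    return current;
  }

  public static void main(String[] args) {
    // Two changes that chain correctly: the second expects the first's result.
    Metadata ok = applyAll(new Metadata(0), List.of(new Requirement(0), new Requirement(1)));
    System.out.println(ok.version()); // 2

    // Two changes that both expect version 0: the second fails after the first applied.
    try {
      applyAll(new Metadata(0), List.of(new Requirement(0), new Requirement(0)));
    } catch (IllegalStateException e) {
      System.out.println("conflict detected");
    }
  }
}
```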
Contributor: Maybe add a
```diff
+              // Apply updates to builder
+              singleUpdate.applyTo(metadataBuilder);
+            }
+
+            // Update currentMetadata to reflect this change for subsequent requirement validation
+            currentMetadata = metadataBuilder.build();
+          }
+
+          // Commit all accumulated changes for this table in a single atomic operation
+          if (!currentMetadata.changes().isEmpty()) {
+            tableOps.commit(baseMetadata, currentMetadata);
+          }
+        });
 
     // Commit the collected updates in a single atomic operation
     List<EntityWithPath> pendingUpdates = transactionMetaStoreManager.getPendingUpdates();
```
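The final step works because the `TransactionWorkspaceMetaStoreManager` buffers per-table commits instead of persisting them, so everything can be flushed to the metastore in one batch. A hypothetical sketch of that buffering idea (the real class's API is much richer; `Workspace` and its `String` payloads here are purely illustrative):

```java
import java.util.ArrayList;
import java.util.List;

public class PendingUpdatesDemo {
  // Hypothetical stand-in for the transaction workspace: it records would-be
  // commits instead of persisting them, so they can be flushed atomically later.
  static class Workspace {
    private final List<String> pending = new ArrayList<>();

    // Called once per table by the commit loop; buffered, not persisted.
    void commit(String update) {
      pending.add(update);
    }

    // Handed to the persistence layer as a single batch at the end.
    List<String> getPendingUpdates() {
      return List.copyOf(pending);
    }
  }

  public static void main(String[] args) {
    Workspace workspace = new Workspace();
    workspace.commit("ns.t1 -> metadata-v2");
    workspace.commit("ns.t2 -> metadata-v5");

    // All buffered updates are committed together in one atomic operation.
    List<String> pending = workspace.getPendingUpdates();
    System.out.println(pending.size()); // 2
  }
}
```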
Should we check and throw in case the table exists already? It'd also be nice to have a test case.
Does the Iceberg REST Catalog spec allow more than one table update in one `commitTransaction` operation?
The spec doesn't mention the uniqueness of the table identifier (polaris/spec/iceberg-rest-catalog-open-api.yaml, line 3499 in 70ad92f). Within one `CommitTableRequest`, multiple `updates` are allowed (polaris/spec/iceberg-rest-catalog-open-api.yaml, line 3478 in 70ad92f).

With that, I think it's undefined behavior whether the same table identifier could duplicate. We could explicitly disable it in Polaris though.
I'd rather not disable this in Polaris unless the spec explicitly flagged it as a disallowed use case (which it did not).

In the absence of an explicit spec, applying multiple updates in sequence is a fairly straightforward operation, since each update is self-contained and will be validated against the current metadata.
SGTM.