@@ -43,6 +43,9 @@ public class InternalField {
// The id field for the field. This is used to identify the field in the schema even after
// renames.
Integer fieldId;
// The name of the column in the data file used to store this field if it differs from the name in
// the table's definition; otherwise, null
@Getter String storageName;
Contributor: Can you add a comment for this field?

Author: Sure, will add.

Author: Comment added.

Contributor: @xr-chen "name mapping" is a Delta-specific concept. The comment should describe more generally what is happening here, something like: "The name of the column in the data file used to store this field when it differs from the name in the table's definition." The comment should also say whether this will be null when the names are the same or whether the string is always populated.

Author: Updated.

Contributor: @xr-chen will this value be null if it is not different?

Author: Yes, it will be null.

Contributor: Can you add that detail to the comment?
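
A minimal sketch of how the new field ends up populated, assuming a hypothetical column that was renamed while its data files keep the original physical name (names and id are illustrative):

InternalField field =
    InternalField.builder()
        .name("scores") // name in the table's definition
        .storageName("double_field") // physical column name in the data files
        .fieldId(3)
        .build();
// For a column whose physical and logical names match, storageName stays null.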


// represents the fully qualified path to the field (dot separated)
@Getter(lazy = true)
@@ -55,6 +55,7 @@
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class DeltaSchemaExtractor {
private static final String DELTA_COLUMN_MAPPING_ID = "delta.columnMapping.id";
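// Field metadata key under which Delta column mapping records a column's physical name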
private static final String DELTA_COLUMN_MAPPING_NAME = "delta.columnMapping.physicalName";
private static final DeltaSchemaExtractor INSTANCE = new DeltaSchemaExtractor();
// Timestamps in Delta are microsecond precision by default
private static final Map<InternalSchema.MetadataKey, Object>
@@ -136,6 +137,10 @@ private InternalSchema toInternalSchema(
field.metadata().contains(DELTA_COLUMN_MAPPING_ID)
? (int) field.metadata().getLong(DELTA_COLUMN_MAPPING_ID)
: null;
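// Physical name of the column in the data files, if Delta column mapping provides one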
String storageName =
field.metadata().contains(DELTA_COLUMN_MAPPING_NAME)
? field.metadata().getString(DELTA_COLUMN_MAPPING_NAME)
: null;
String fieldComment =
field.getComment().isDefined() ? field.getComment().get() : null;
InternalSchema schema =
Expand All @@ -148,6 +153,7 @@ private InternalSchema toInternalSchema(
return InternalField.builder()
.name(field.name())
.fieldId(fieldId)
.storageName(storageName)
.parentPath(parentPath)
.schema(schema)
.defaultValue(
@@ -20,9 +20,12 @@

import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

import lombok.extern.log4j.Log4j2;

@@ -39,6 +42,11 @@
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.NotFoundException;
import org.apache.iceberg.mapping.MappedField;
import org.apache.iceberg.mapping.MappedFields;
import org.apache.iceberg.mapping.MappingUtil;
import org.apache.iceberg.mapping.NameMapping;
import org.apache.iceberg.mapping.NameMappingParser;

import org.apache.xtable.conversion.TargetTable;
import org.apache.xtable.model.InternalTable;
@@ -161,12 +169,42 @@ private void initializeTableIfRequired(InternalTable internalTable) {
}
}

private MappedFields updateNameMapping(MappedFields mapping, Map<Integer, String> updates) {
if (mapping == null) {
Contributor: Can mapping ever be null? If null is returned, won't NameMapping.of throw an exception?

Author: It's mainly there to end the recursion; the nested fields of a given field are processed by this same function, so a null nested mapping for a leaf field makes sense. For the mapping we actually update, I believe MappingUtil.create won't return null.
return null;
}
List<MappedField> fieldResults = new ArrayList<>();
for (MappedField field : mapping.fields()) {
Set<String> fieldNames = new HashSet<>(field.names());
if (updates.containsKey(field.id())) {
fieldNames.add(updates.get(field.id()));
}
MappedFields nestedMapping = updateNameMapping(field.nestedMapping(), updates);
fieldResults.add(MappedField.of(field.id(), fieldNames, nestedMapping));
}
return MappedFields.of(fieldResults);
}
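// Illustrative sketch of the effect (field id and physical name are hypothetical): for a column
// with id 3 renamed to "scores" while its data files still use "col-a1b2c3", the entry in
// schema.name-mapping.default goes from
//   {"field-id": 3, "names": ["scores"]}
// to
//   {"field-id": 3, "names": ["scores", "col-a1b2c3"]}
// so readers can resolve the Parquet column that was written under the physical name.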

@Override
public void syncSchema(InternalSchema schema) {
Schema latestSchema = schemaExtractor.toIceberg(schema);
String mappingJson = transaction.table().properties().get(TableProperties.DEFAULT_NAME_MAPPING);
boolean hasFieldIds =
schema.getAllFields().stream().anyMatch(field -> field.getFieldId() != null);
// Recreate the name mapping when field IDs are provided in the source schema to ensure every
// field in the mapping is assigned the same ID as in the source schema
NameMapping mapping =
mappingJson == null || hasFieldIds
? MappingUtil.create(latestSchema)
: NameMappingParser.fromJson(mappingJson);
mapping =
NameMapping.of(
updateNameMapping(mapping.asMappedFields(), schemaExtractor.getIdToStorageName()));
transaction
.updateProperties()
.set(TableProperties.DEFAULT_NAME_MAPPING, NameMappingParser.toJson(mapping))
.commit();
if (!transaction.table().schema().sameSchema(latestSchema)) {
boolean hasFieldIds =
schema.getAllFields().stream().anyMatch(field -> field.getFieldId() != null);
if (hasFieldIds) {
// There is no clean way to sync the schema with the provided field IDs using the
// transaction API so we commit the current transaction and interact directly with
@@ -32,6 +32,7 @@
import java.util.stream.Collectors;

import lombok.AccessLevel;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.extern.log4j.Log4j2;

@@ -53,18 +54,63 @@
@Log4j2
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class IcebergSchemaExtractor {
private static final IcebergSchemaExtractor INSTANCE = new IcebergSchemaExtractor();
private static final String MAP_KEY_FIELD_NAME = "key";
private static final String MAP_VALUE_FIELD_NAME = "value";
private static final String LIST_ELEMENT_FIELD_NAME = "element";
@Getter private final Map<Integer, String> idToStorageName = new HashMap<>();
Contributor: @xr-chen This adds state to the class, so we have to decide whether to create an instance of this class per conversion or remove this state.
Removing the state would require returning the map as part of the response from toIceberg.
I don't have a strong opinion either way, but I would prefer that over the clear call in the toIceberg method.

Author: Good suggestion; this will avoid unexpected outcomes caused by adding state to a singleton object. I will update the code.
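
A minimal sketch of the alternative the reviewer describes, assuming a hypothetical result holder (the class below is not part of the codebase):

import java.util.Map;
import lombok.Value;
import org.apache.iceberg.Schema;

// Hypothetical: toIceberg would return the converted schema together with the
// id-to-storage-name map instead of keeping the map as extractor state.
@Value
public class IcebergConversionResult {
  Schema schema; // converted Iceberg schema
  Map<Integer, String> idToStorageName; // Iceberg field id -> physical column name in the data files
}

The PR instead has getInstance() return a fresh IcebergSchemaExtractor per call, which keeps the toIceberg signature unchanged while avoiding shared state.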


public static IcebergSchemaExtractor getInstance() {
return INSTANCE;
return new IcebergSchemaExtractor();
}

private void initializeFieldIdTracker(InternalSchema schema, AtomicInteger fieldIdTracker) {
schema.getFields().stream()
.forEach(
field -> {
if (field.getFieldId() != null)
fieldIdTracker.accumulateAndGet(field.getFieldId(), Math::max);
initializeFieldIdTracker(field, fieldIdTracker);
});
}

private void initializeFieldIdTracker(InternalField field, AtomicInteger fieldIdTracker) {
switch (field.getSchema().getDataType()) {
case RECORD:
initializeFieldIdTracker(field.getSchema(), fieldIdTracker);
return;
case MAP:
field.getSchema().getFields().stream()
.filter(
mapField ->
InternalField.Constants.MAP_KEY_FIELD_NAME.equals(mapField.getName())
|| InternalField.Constants.MAP_VALUE_FIELD_NAME.equals(mapField.getName()))
.forEach(
mapField -> {
if (mapField.getFieldId() != null)
fieldIdTracker.accumulateAndGet(mapField.getFieldId(), Math::max);
initializeFieldIdTracker(mapField, fieldIdTracker);
});
return;
case LIST:
field.getSchema().getFields().stream()
.filter(
arrayField ->
InternalField.Constants.ARRAY_ELEMENT_FIELD_NAME.equals(arrayField.getName()))
.forEach(
arrayField -> {
if (arrayField.getFieldId() != null)
fieldIdTracker.accumulateAndGet(arrayField.getFieldId(), Math::max);
initializeFieldIdTracker(arrayField, fieldIdTracker);
});
}
}

public Schema toIceberg(InternalSchema internalSchema) {
// if field IDs are not assigned in the source, just use an incrementing integer
AtomicInteger fieldIdTracker = new AtomicInteger(0);
// traverse the schema before conversion to ensure fieldIdTracker won't return any
// fieldIds that are already present in the schema
initializeFieldIdTracker(internalSchema, fieldIdTracker);
List<Types.NestedField> nestedFields = convertFields(internalSchema, fieldIdTracker);
List<InternalField> recordKeyFields = internalSchema.getRecordKeyFields();
boolean recordKeyFieldsAreNotRequired =
@@ -154,6 +200,9 @@ private List<Types.NestedField> convertFields(
List<Types.NestedField> nestedFields = new ArrayList<>(schema.getFields().size());
for (int i = 0; i < schema.getFields().size(); i++) {
InternalField field = schema.getFields().get(i);
if (field.getStorageName() != null) {
idToStorageName.put(ids.get(i), field.getStorageName());
}
nestedFields.add(
Types.NestedField.of(
ids.get(i),
@@ -36,13 +36,10 @@
import org.apache.iceberg.Table;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableOperations;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.mapping.MappingUtil;
import org.apache.iceberg.mapping.NameMappingParser;

@AllArgsConstructor(staticName = "of")
@Log4j2
@@ -88,14 +85,14 @@ Table getOrCreateTable(
new Schema(),
PartitionSpec.unpartitioned(),
basePath,
getDefaultMappingProperties(schema)))
Collections.emptyMap()))
.orElseGet(
() ->
getHadoopTables()
.create(
new Schema(),
PartitionSpec.unpartitioned(),
getDefaultMappingProperties(schema),
Collections.emptyMap(),
basePath));
// set the schema with the provided field IDs
TableOperations operations = ((BaseTable) tableWithEmptySchema).operations();
Expand All @@ -112,11 +109,6 @@ Table getOrCreateTable(
}
}

private Map<String, String> getDefaultMappingProperties(Schema schema) {
return Collections.singletonMap(
TableProperties.DEFAULT_NAME_MAPPING, NameMappingParser.toJson(MappingUtil.create(schema)));
}

private Optional<Catalog> getCatalog(IcebergCatalogConfig catalogConfig) {
if (catalogConfig == null) {
return Optional.empty();
@@ -746,6 +746,48 @@ public void testIcebergCorruptedSnapshotRecovery() throws Exception {
}
}

@Test
public void testColumnMappingEnabledDeltaToIceberg() {
Contributor: @xr-chen to answer your question, yes this is exactly what I was looking for.
Do you think we should also do some minor schema evolution in this case?

Author: @the-other-tim-brown Yes, I think so, but the code can't actually pass this test case right now, so it probably won't work on any rename-column type of schema change. Only populating fieldId doesn't seem to be enough: the converted Iceberg table doesn't know which "physical" column in the data file to read for a "logical" column name in the table schema, and it returns null values for all columns when we read from the generated Iceberg table. This is probably because:

  1. We don't extract delta.columnMapping.physicalName from the Delta table's schema, so we don't know where the column is actually stored.
  2. The converted Iceberg table has no name mapping to recognize which Parquet column corresponds to a given Iceberg field ID.

Author: I added Delta's physical column names to Iceberg's schema.name-mapping.default, and the converted table now reads data from the correct place and returns the same content as the original Delta table. But I hit a weird issue during testing:

  • All tests pass with this new test disabled when running mvn verify.
  • This test passes when run independently.
  • When this test runs together with all the other tests, ITConversionController.testVariousOperations fails.
    @the-other-tim-brown, is there anything shared among the test cases?

Contributor: The filesystem is shared between the tests, along with the Hadoop and Spark configurations.

Author: Ahh, it's the idToStorageName field I added to IcebergSchemaExtractor: it wasn't reset before a new sync run, and the schema extractor was used as a singleton, so previous extraction results carried over to the next run and failed the test. mvn clean verify now passes on my end.

Contributor: Good catch on this, thanks for digging into the issue. I added a comment on that class.

Contributor: Now that you have this working, should we add some schema evolution here?
I think we should at least add a second commit as a sanity check that everything works as expected.

Author: Is there an existing function I can use to change the schema of the source Delta table for testing, or should I implement it myself?
By a second commit, do you mean inserting more records with insertRows and syncing the table again to make sure it also works in incremental sync mode?

Contributor: We already have some helpers for this. Earlier in this test class you will see a test case that uses GenericTable.getInstanceWithAdditionalColumns. It has some helpers for creating the evolved Delta table schema under the hood that you should be able to build off of.
And yes, just inserting or updating some more records is fine. I just want to ensure there isn't some unexpected side effect when we set this table property multiple times for Iceberg.

String tableName = getTableName();
ConversionSourceProvider<?> conversionSourceProvider = getConversionSourceProvider(DELTA);
try (TestSparkDeltaTable table =
TestSparkDeltaTable.forColumnMappingEnabled(tableName, tempDir, sparkSession, null)) {
table.insertRows(20);
ConversionController conversionController =
new ConversionController(jsc.hadoopConfiguration());
ConversionConfig conversionConfig =
getTableSyncConfig(
DELTA,
SyncMode.INCREMENTAL,
tableName,
table,
Collections.singletonList(ICEBERG),
null,
null);
conversionController.sync(conversionConfig, conversionSourceProvider);
table.insertRows(10);
conversionController.sync(conversionConfig, conversionSourceProvider);
table.insertRows(10);
conversionController.sync(conversionConfig, conversionSourceProvider);
checkDatasetEquivalence(DELTA, table, Collections.singletonList(ICEBERG), 40);

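// Exercise schema evolution: drop, rename, and add a column, syncing and validating after each change.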
table.dropColumn("long_field");
table.insertRows(10);
conversionController.sync(conversionConfig, conversionSourceProvider);
checkDatasetEquivalence(DELTA, table, Collections.singletonList(ICEBERG), 50);

table.renameColumn("double_field", "scores");
table.insertRows(10);
conversionController.sync(conversionConfig, conversionSourceProvider);
checkDatasetEquivalence(DELTA, table, Collections.singletonList(ICEBERG), 60);

table.addColumn();
table.insertRows(10);
conversionController.sync(conversionConfig, conversionSourceProvider);
checkDatasetEquivalence(DELTA, table, Collections.singletonList(ICEBERG), 70);
}
}

@Test
public void testMetadataRetention() throws Exception {
String tableName = getTableName();
@@ -69,20 +69,36 @@ public static TestSparkDeltaTable forSchemaWithAdditionalColumnsAndPartitioning(
return new TestSparkDeltaTable(tableName, tempDir, sparkSession, partitionField, true);
}

public static TestSparkDeltaTable forColumnMappingEnabled(
String tableName, Path tempDir, SparkSession sparkSession, String partitionField) {
return new TestSparkDeltaTable(tableName, tempDir, sparkSession, partitionField, true, true);
}

public TestSparkDeltaTable(
String name,
Path tempDir,
SparkSession sparkSession,
String partitionField,
boolean includeAdditionalColumns) {
this(name, tempDir, sparkSession, partitionField, includeAdditionalColumns, false);
}

public TestSparkDeltaTable(
String name,
Path tempDir,
SparkSession sparkSession,
String partitionField,
boolean includeAdditionalColumns,
boolean enableColumnMapping) {
try {
this.tableName = name;
this.basePath = initBasePath(tempDir, tableName);
this.sparkSession = sparkSession;
this.partitionField = partitionField;
this.includeAdditionalColumns = includeAdditionalColumns;
this.testDeltaHelper =
TestDeltaHelper.createTestDataHelper(partitionField, includeAdditionalColumns);
TestDeltaHelper.createTestDataHelper(
partitionField, includeAdditionalColumns, enableColumnMapping);
testDeltaHelper.createTable(sparkSession, tableName, basePath);
this.deltaLog = DeltaLog.forTable(sparkSession, basePath);
this.deltaTable = DeltaTable.forPath(sparkSession, basePath);
@@ -260,4 +276,20 @@ public List<String> getColumnsToSelect() {
.filter(columnName -> !columnName.equals("yearOfBirth"))
.collect(Collectors.toList());
}

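// Schema evolution helpers: each updates TestDeltaHelper's schema; dropColumn and renameColumn
// also apply the change to the Delta table through Spark SQL.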
public void dropColumn(String colName) {
testDeltaHelper.dropColumn(colName);
sparkSession.sql(String.format("ALTER TABLE delta.`%s` DROP COLUMN %s", basePath, colName));
}

public void renameColumn(String colName, String newColName) {
testDeltaHelper.renameColumn(colName, newColName);
sparkSession.sql(
String.format(
"ALTER TABLE delta.`%s` RENAME COLUMN %s TO %s", basePath, colName, newColName));
}

public void addColumn() {
testDeltaHelper.addColumn();
}
}