-
Notifications
You must be signed in to change notification settings - Fork 196
Fix Delta to Iceberg not working on column mapping enabled Delta source table #766
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
|
@xr-chen thank you for the bug report and contribution! There is a test case called ITConversionController that converts between the formats. Is it possible to update this test so it will trigger the same issue that you saw? |
| : field.getFieldId()) | ||
| field -> { | ||
| int id = | ||
| field.getFieldId() == null |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If the fieldId is set, we need to use that ID. That is how we are able to handle renames in the schema. The reader will lookup the column based on the ID.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It still uses the ID in the schema, if the fieldId is set. Here, I just updated the fieldIdTracker such that it won't return any ID that was already used
| .name("testRecord") | ||
| .dataType(InternalType.RECORD) | ||
| .isNullable(false) | ||
| .fields( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's add another field that comes after the list to ensure the next field ID is chosen properly. For example, the field after the list is going to have ID 3. We want to make sure that this carries through to the Iceberg schema.
Let's make sure the Map case is also tested here to ensure there is no regression in the future.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does this field to be added come with an ID or not?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will test the Map case, that's a good point
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Map case and more fields after the list were added
|
Thanks @the-other-tim-brown for reviewing the changes. You mean adding a new test in that controller for column mapping enabled Delta tables, maybe kind of like this one but for Delta? incubator-xtable/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java Line 712 in 64f38b8
|
…onversion process
|
The problem is actually more complicated than fixing the fieldId generation; the data file written by a column mapping enabled Delta table doesn't follow the table schema at all. We need to somehow map the columns in the data files to the columns in the schema. mapping info in the delta table |
| } | ||
|
|
||
| @Test | ||
| public void testColumnMappingEnabledDeltaToIceberg() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@xr-chen to answer your question, yes this is exactly what I was looking for.
Do you think we should also do some minor schema evolution in this case?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@the-other-tim-brown Yes, I think so, but the code actually can't pass this test case now, so it probably won't work on any rename column type of schema change. It seems to me that only populating fieldId doesn't work, and the converted Iceberg doesn't know which 'physical' column in the data file to read data from for a 'logic' column name in the table schema, and it returns null values for all columns if we read from the generated Iceberg table. This issue is probably due to:
- We don't extract the
delta.columnMapping.physicalNamefrom the Delta table's schema in, so we don't know where the column is actually storedincubator-xtable/xtable-core/src/main/java/org/apache/xtable/delta/DeltaSchemaExtractor.java
Line 56 in 64f38b8
public class DeltaSchemaExtractor { - In the converted Iceberg, it doesn't have a name mapping to recognize which Parquet column corresponds to a given Iceberg field ID
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I added delta's physical column names into the Icerberg's schema.name-mapping.default, and the converted table could read data from the correct place now and could return the same content as the original delta table. But I got a weird issue during testing,
- All tests could pass with this new test disabled when running
mvn verify - This test could pass when running it independently
- When running this test with all other tests together,
ITConversionController.testVariousOperationswill fail
@the-other-tim-brown, is there anything shared among the test cases?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The filesystem is shared between the tests along with the hadoop and spark configurations
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ahh, it's due to the idToStorageName field I added to the IcebergSchemaExtractor, the field wasn't reset before a new sync run, and the schema extractor was used as a singleton, so previous extraction results were carried over to the next run, which fails the test. Now the mvn clean verify passed on my end
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catch on this, thanks for digging into the issue. I added a comment on that class.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Now that you have this working, should we add some schema evolution here?
I think we should at least add a second commit as a sanity check that everything works as expected.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there an existing function I can use to change the schema of the source delta table for testing, or I should implement it by myself?
By a second commit, you mean inserting more records by insertRows and syncing the table again to make sure it works in incremental sync mode as well?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We already have some helpers for this. Earlier in this test class you will see a test case that uses GenericTable.getInstanceWithAdditionalColumns. This has some helpers for creating the evolved Delta table schema under the hood that you should be able to build off of.
And yes, just inserting or updating some more records is fine. I just want to ensure there isn't some unexpected side-effect when we set this table property multiple times for Iceberg.
| // The id field for the field. This is used to identify the field in the schema even after | ||
| // renames. | ||
| Integer fieldId; | ||
| @Getter String storageName; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you add a comment for this field?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sure, will add
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Comment added.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@xr-chen "name mapping" is a delta specific concept. The comment should describe more generally what is happening here. Something like The name of the column in the data file used to store this field when it differs from the name in table's definition
The comment should also describe whether this will be null when the names are the same or if the string is expected to be populated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
updated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@xr-chen will this value be null if it is not different?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, it will be null
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you add that detail to the comment?
| } | ||
|
|
||
| private MappedFields updateNameMapping(MappedFields mapping, Map<Integer, String> updates) { | ||
| if (mapping == null) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can mapping ever be null? If null is returned will NameMapping.of not throw an exception?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's mainly used to end the recursive call; all nested fields of a given field were processed using this function as well. In that case, having null as a nested field actually makes sense. And for the mapping we want to update, I believe MappingUtil.create won't return us a null
| public void syncSchema(InternalSchema schema) { | ||
| Schema latestSchema = schemaExtractor.toIceberg(schema); | ||
| if (!schemaExtractor.getIdToStorageName().isEmpty()) { | ||
| NameMapping mapping = MappingUtil.create(latestSchema); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The IcebergTableManager is also setting the default name mapping, should we remove that and just rely on this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it makes sense; it's better to update the name mapping in a single place
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I attempted moving the code for setting default name mapping in IcebergTableManager to here, but some test cases failed; there might be some reasons the map should be initialized there?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is no special reason. It is just there from when I first set this up. The mapping needs to be there to handle the case where the field ID is not set in the file schema as you have seen.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see, I've moved the name mapping code to the schema sync function now
| private static final String MAP_KEY_FIELD_NAME = "key"; | ||
| private static final String MAP_VALUE_FIELD_NAME = "value"; | ||
| private static final String LIST_ELEMENT_FIELD_NAME = "element"; | ||
| @Getter private final Map<Integer, String> idToStorageName = new HashMap<>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@xr-chen This adds state to the class so we have to decide if we want to make an instance of this class per conversion or remove this state.
Removing the state would require you to return the map as part of the response for the toIceberg.
I don't have a strong opinion either way, but I would prefer that over the clear call in the toIceberg method.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good suggestion, this will avoid unexpected outcomes caused by adding states to a singleton object. I will update the code
Important Read
What is the purpose of the pull request
Currently, a column mapping enabled Delta table with array/map columns can't be converted into an Iceberg table using xTable because
fieIdIdTrackare already used in the schema, which violates the field ID requirements of Iceberg's NestFiled type.For example, the schema of a Delta table with a string column
nameand an array columnscoreswould look like:In the above schema, there wasn't a
delta.columnMapping.idfor elements in the array column. Similarly, map columns don't have field IDs for their key value.Brief change log
Update the
fieldIdTrackervariable to be the latest field ID every time we get a field ID either from the source table schema or fromfieldIdTrackeritself, such thatfieldIdTrackerwon't return any ID already in the source table schema.Verify this pull request
testToIcebergWithPartialFieldIdsSetto verify the change.