[pull] master from trinodb:master #284

Merged: 3 commits, Sep 18, 2024

==== DeltaLakeMergeResult.java ====
@@ -13,6 +13,8 @@
  */
 package io.trino.plugin.deltalake;
 
+import io.trino.plugin.deltalake.transactionlog.DeletionVectorEntry;
+
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
@@ -24,6 +26,7 @@
 public record DeltaLakeMergeResult(
         List<String> partitionValues,
         Optional<String> oldFile,
+        Optional<DeletionVectorEntry> oldDeletionVector,
         Optional<DataFileInfo> newFile)
 {
     public DeltaLakeMergeResult
@@ -32,7 +35,9 @@ public record DeltaLakeMergeResult(
         // noinspection Java9CollectionFactory
         partitionValues = unmodifiableList(new ArrayList<>(requireNonNull(partitionValues, "partitionValues is null")));
         requireNonNull(oldFile, "oldFile is null");
+        requireNonNull(oldDeletionVector, "oldDeletionVector is null");
         requireNonNull(newFile, "newFile is null");
         checkArgument(oldFile.isPresent() || newFile.isPresent(), "old or new must be present");
+        checkArgument(oldDeletionVector.isEmpty() || oldFile.isPresent(), "oldDeletionVector is present only when oldFile is present");
     }
 }
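
The two checkArgument calls pin down the legal shapes of a merge result: insert-only (new file), whole-file delete (old file, optionally carrying the deletion vector that was attached to it), or rewrite (old and new). A minimal self-contained sketch of the same invariants, with String placeholders standing in for Trino's DataFileInfo and DeletionVectorEntry (names here are illustrative, not from this PR):

    import java.util.Optional;

    // Placeholder record mirroring DeltaLakeMergeResult's constructor invariants.
    record MergeResultShape(
            Optional<String> oldFile,
            Optional<String> oldDeletionVector,
            Optional<String> newFile)
    {
        MergeResultShape
        {
            if (oldFile.isEmpty() && newFile.isEmpty()) {
                throw new IllegalArgumentException("old or new must be present");
            }
            if (oldDeletionVector.isPresent() && oldFile.isEmpty()) {
                throw new IllegalArgumentException("oldDeletionVector is present only when oldFile is present");
            }
        }

        public static void main(String[] args)
        {
            // Insert: only a new file is produced.
            new MergeResultShape(Optional.empty(), Optional.empty(), Optional.of("new.parquet"));
            // Whole-file delete of a file that carried a deletion vector.
            new MergeResultShape(Optional.of("old.parquet"), Optional.of("dv"), Optional.empty());
            // Illegal: a deletion vector without its file -> throws.
            new MergeResultShape(Optional.empty(), Optional.of("dv"), Optional.empty());
        }
    }
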
==== DeltaLakeMergeSink.java ====
@@ -327,7 +327,7 @@ public CompletableFuture<Collection<Slice>> finish()
         insertPageSink.finish().join().stream()
                 .map(Slice::getBytes)
                 .map(dataFileInfoCodec::fromJson)
-                .map(info -> new DeltaLakeMergeResult(info.partitionValues(), Optional.empty(), Optional.of(info)))
+                .map(info -> new DeltaLakeMergeResult(info.partitionValues(), Optional.empty(), Optional.empty(), Optional.of(info)))
                 .map(mergeResultJsonCodec::toJsonBytes)
                 .map(Slices::wrappedBuffer)
                 .forEach(fragments::add);
@@ -345,7 +345,7 @@ public CompletableFuture<Collection<Slice>> finish()
         MoreFutures.getDone(cdfPageSink.finish()).stream()
                 .map(Slice::getBytes)
                 .map(dataFileInfoCodec::fromJson)
-                .map(info -> new DeltaLakeMergeResult(info.partitionValues(), Optional.empty(), Optional.of(info)))
+                .map(info -> new DeltaLakeMergeResult(info.partitionValues(), Optional.empty(), Optional.empty(), Optional.of(info)))
                 .map(mergeResultJsonCodec::toJsonBytes)
                 .map(Slices::wrappedBuffer)
                 .forEach(fragments::add);
@@ -365,7 +365,7 @@ private Slice writeMergeResult(Slice path, FileDeletion deletion)
         ParquetMetadata parquetMetadata = MetadataReader.readFooter(dataSource, Optional.empty());
         long rowCount = parquetMetadata.getBlocks().stream().map(BlockMetadata::rowCount).mapToLong(Long::longValue).sum();
         RoaringBitmapArray rowsRetained = new RoaringBitmapArray();
-        rowsRetained.addRange(0, rowCount);
+        rowsRetained.addRange(0, rowCount - 1);
         rowsRetained.andNot(deletedRows);
         if (rowsRetained.isEmpty()) {
             // No rows are retained in the file, so we don't need to write deletion vectors.
@@ -407,7 +407,7 @@ private Slice writeDeletionVector(
                 deletion.partitionValues,
                 readStatistics(parquetMetadata, dataColumns, rowCount),
                 Optional.of(deletionVectorEntry));
-        DeltaLakeMergeResult result = new DeltaLakeMergeResult(deletion.partitionValues, Optional.of(sourceRelativePath), Optional.of(newFileInfo));
+        DeltaLakeMergeResult result = new DeltaLakeMergeResult(deletion.partitionValues, Optional.of(sourceRelativePath), Optional.empty(), Optional.of(newFileInfo));
         return utf8Slice(mergeResultJsonCodec.toJson(result));
     }
     catch (Throwable e) {
@@ -426,7 +426,8 @@ private Slice writeDeletionVector(
     private Slice onlySourceFile(String sourcePath, FileDeletion deletion)
     {
         String sourceRelativePath = relativePath(rootTableLocation.toString(), sourcePath);
-        DeltaLakeMergeResult result = new DeltaLakeMergeResult(deletion.partitionValues(), Optional.of(sourceRelativePath), Optional.empty());
+        DeletionVectorEntry deletionVector = deletionVectors.get(sourceRelativePath);
+        DeltaLakeMergeResult result = new DeltaLakeMergeResult(deletion.partitionValues(), Optional.of(sourceRelativePath), Optional.ofNullable(deletionVector), Optional.empty());
         return utf8Slice(mergeResultJsonCodec.toJson(result));
     }
 
@@ -453,7 +454,7 @@ private List<Slice> rewriteFile(String sourcePath, FileDeletion deletion)
 
         Optional<DataFileInfo> newFileInfo = rewriteParquetFile(sourceLocation, deletion, writer);
 
-        DeltaLakeMergeResult result = new DeltaLakeMergeResult(deletion.partitionValues(), Optional.of(sourceRelativePath), newFileInfo);
+        DeltaLakeMergeResult result = new DeltaLakeMergeResult(deletion.partitionValues(), Optional.of(sourceRelativePath), Optional.empty(), newFileInfo);
         return ImmutableList.of(utf8Slice(mergeResultJsonCodec.toJson(result)));
     }
     catch (IOException e) {
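
The writeMergeResult hunk above is the behavioral fix in this file: retained rows are computed as all rows minus deleted rows, and only a non-empty remainder gets a deletion vector; an empty remainder means the whole file can be removed. A rough sketch of that decision, with java.util.BitSet standing in for Trino's RoaringBitmapArray and hypothetical names (the inclusive-bound detail behind the rowCount - 1 change is covered after the RoaringBitmapArray hunk below):

    import java.util.BitSet;

    public class RetainedRowsSketch
    {
        // True when every row of a rowCount-row file is deleted, i.e. the sink
        // should emit only a remove entry instead of writing a deletion vector.
        static boolean dropWholeFile(int rowCount, BitSet deletedRows)
        {
            BitSet rowsRetained = new BitSet();
            rowsRetained.set(0, rowCount); // BitSet uses an exclusive end: marks rows 0..rowCount-1
            rowsRetained.andNot(deletedRows);
            return rowsRetained.isEmpty();
        }

        public static void main(String[] args)
        {
            BitSet deleted = new BitSet();
            deleted.set(0, 10);
            System.out.println(dropWholeFile(10, deleted)); // true: no survivors
            deleted.clear(3);
            System.out.println(dropWholeFile(10, deleted)); // false: row 3 survives
        }
    }
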
==== DeltaLakeMetadata.java ====
@@ -1213,7 +1213,7 @@ public void createTable(ConnectorSession session, ConnectorTableMetadata tableMetadata
             while (addFileEntryIterator.hasNext()) {
                 long writeTimestamp = Instant.now().toEpochMilli();
                 AddFileEntry addFileEntry = addFileEntryIterator.next();
-                transactionLogWriter.appendRemoveFileEntry(new RemoveFileEntry(addFileEntry.getPath(), addFileEntry.getPartitionValues(), writeTimestamp, true));
+                transactionLogWriter.appendRemoveFileEntry(new RemoveFileEntry(addFileEntry.getPath(), addFileEntry.getPartitionValues(), writeTimestamp, true, Optional.empty()));
             }
         }
         protocolEntry = protocolEntryForTable(tableHandle.getProtocolEntry(), containsTimestampType, tableMetadata.getProperties());
@@ -1610,7 +1610,7 @@ public Optional<ConnectorOutputMetadata> finishCreateTable(
             Iterator<AddFileEntry> addFileEntryIterator = activeFiles.iterator();
             while (addFileEntryIterator.hasNext()) {
                 AddFileEntry addFileEntry = addFileEntryIterator.next();
-                transactionLogWriter.appendRemoveFileEntry(new RemoveFileEntry(addFileEntry.getPath(), addFileEntry.getPartitionValues(), writeTimestamp, true));
+                transactionLogWriter.appendRemoveFileEntry(new RemoveFileEntry(addFileEntry.getPath(), addFileEntry.getPartitionValues(), writeTimestamp, true, Optional.empty()));
             }
         }
     }
@@ -2564,7 +2564,7 @@ private long commitMergeOperation(
             if (mergeResult.oldFile().isEmpty()) {
                 continue;
             }
-            transactionLogWriter.appendRemoveFileEntry(new RemoveFileEntry(toUriFormat(mergeResult.oldFile().get()), createPartitionValuesMap(partitionColumns, mergeResult.partitionValues()), writeTimestamp, true));
+            transactionLogWriter.appendRemoveFileEntry(new RemoveFileEntry(toUriFormat(mergeResult.oldFile().get()), createPartitionValuesMap(partitionColumns, mergeResult.partitionValues()), writeTimestamp, true, mergeResult.oldDeletionVector()));
         }
 
         appendAddFileEntries(transactionLogWriter, newFiles, partitionColumns, getExactColumnNames(handle.getMetadataEntry()), true);
@@ -2767,7 +2767,8 @@ private void finishOptimize(ConnectorSession session, DeltaLakeTableExecuteHandle
                     toUriFormat(relativePath),
                     createPartitionValuesMap(canonicalPartitionValues),
                     writeTimestamp,
-                    false));
+                    false,
+                    Optional.empty()));
         }
 
         // Note: during writes we want to preserve original case of partition columns
@@ -4160,7 +4161,7 @@ private CommitDeleteOperationResult commitDeleteOperation(
             Iterator<AddFileEntry> addFileEntryIterator = activeFiles.iterator();
             while (addFileEntryIterator.hasNext()) {
                 AddFileEntry addFileEntry = addFileEntryIterator.next();
-                transactionLogWriter.appendRemoveFileEntry(new RemoveFileEntry(addFileEntry.getPath(), addFileEntry.getPartitionValues(), writeTimestamp, true));
+                transactionLogWriter.appendRemoveFileEntry(new RemoveFileEntry(addFileEntry.getPath(), addFileEntry.getPartitionValues(), writeTimestamp, true, Optional.empty()));
 
                 Optional<Long> fileRecords = addFileEntry.getStats().flatMap(DeltaLakeFileStatistics::getNumRecords);
                 allDeletedFilesStatsPresent &= fileRecords.isPresent();
==== RoaringBitmapArray.java ====
@@ -94,6 +94,10 @@ public void add(long value)
         highBitmap.add(low);
     }
 
+    /**
+     * @param rangeStart inclusive beginning of range
+     * @param rangeEnd inclusive ending of range
+     */
     public void addRange(long rangeStart, long rangeEnd)
     {
         checkArgument(rangeStart >= 0 && rangeStart <= rangeEnd, "Unsupported value: %s", rangeStart);
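
Both bounds being inclusive is why the call site above moved from addRange(0, rowCount) to addRange(0, rowCount - 1): the old call marked one phantom position past the last row index, so rowsRetained never became empty and fully deleted files were never dropped. A small stand-alone demonstration, assuming inclusive semantics (a stand-in, not the Trino class):

    import java.util.BitSet;

    public class InclusiveRangeDemo
    {
        // Stand-in for RoaringBitmapArray.addRange with inclusive bounds;
        // BitSet.set itself is end-exclusive, hence the + 1.
        static void addRangeInclusive(BitSet bits, int rangeStart, int rangeEnd)
        {
            bits.set(rangeStart, rangeEnd + 1);
        }

        public static void main(String[] args)
        {
            int rowCount = 10; // valid row indexes: 0..9

            BitSet buggy = new BitSet();
            addRangeInclusive(buggy, 0, rowCount);
            System.out.println(buggy.cardinality()); // 11: includes phantom index 10

            BitSet fixed = new BitSet();
            addRangeInclusive(fixed, 0, rowCount - 1);
            System.out.println(fixed.cardinality()); // 10: exactly the file's rows
        }
    }
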
==== RemoveFileEntry.java ====
@@ -16,17 +16,20 @@
 import jakarta.annotation.Nullable;
 
 import java.util.Map;
+import java.util.Optional;
 
 import static java.util.Objects.requireNonNull;
 
 public record RemoveFileEntry(
         String path,
         @Nullable Map<String, String> partitionValues,
         long deletionTimestamp,
-        boolean dataChange)
+        boolean dataChange,
+        Optional<DeletionVectorEntry> deletionVector)
 {
     public RemoveFileEntry
     {
         requireNonNull(path, "path is null");
+        requireNonNull(deletionVector, "deletionVector is null");
     }
 }
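
For reference, this record mirrors the Delta Lake protocol's remove action, which may carry the deletion-vector descriptor of the file being removed. An illustrative, hand-written transaction-log entry (values are made up; field names follow the protocol spec):

    {
      "remove": {
        "path": "part-00000-abc.snappy.parquet",
        "deletionTimestamp": 1726646400000,
        "dataChange": true,
        "deletionVector": {
          "storageType": "u",
          "pathOrInlineDv": "ab^-aqEH.-t@S}K{vb[*k^",
          "offset": 1,
          "sizeInBytes": 36,
          "cardinality": 2
        }
      }
    }
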
==== CheckpointEntryIterator.java ====
@@ -153,6 +153,7 @@ public String getColumnName()
     private final Optional<RowType> addDeletionVectorType;
     private final Optional<RowType> addParsedStatsFieldType;
     private final Optional<RowType> removeType;
+    private final Optional<RowType> removeDeletionVectorType;
     private final Optional<RowType> metadataType;
     private final Optional<RowType> protocolType;
     private final Optional<RowType> commitType;
@@ -246,6 +247,7 @@ public CheckpointEntryIterator(
         addDeletionVectorType = addType.flatMap(type -> getOptionalFieldType(type, "deletionVector"));
         addParsedStatsFieldType = addType.flatMap(type -> getOptionalFieldType(type, "stats_parsed"));
         removeType = getParquetType(fields, REMOVE);
+        removeDeletionVectorType = removeType.flatMap(type -> getOptionalFieldType(type, "deletionVector"));
         metadataType = getParquetType(fields, METADATA);
         protocolType = getParquetType(fields, PROTOCOL);
         commitType = getParquetType(fields, COMMIT);
@@ -537,11 +539,17 @@ private DeltaLakeTransactionLogEntry buildRemoveEntry(ConnectorSession session,
                     format("Expected block %s to have %d children, but found %s", block, removeFields, removeEntryRow.getFieldCount()));
         }
         CheckpointFieldReader remove = new CheckpointFieldReader(session, removeEntryRow, type);
+        Optional<DeletionVectorEntry> deletionVector = Optional.empty();
+        if (deletionVectorsEnabled) {
+            deletionVector = Optional.ofNullable(remove.getRow("deletionVector"))
+                    .map(row -> parseDeletionVectorFromParquet(session, row, removeDeletionVectorType.orElseThrow()));
+        }
         RemoveFileEntry result = new RemoveFileEntry(
                 remove.getString("path"),
                 remove.getMap(stringMap, "partitionValues"),
                 remove.getLong("deletionTimestamp"),
-                remove.getBoolean("dataChange"));
+                remove.getBoolean("dataChange"),
+                deletionVector);
         log.debug("Result: %s", result);
         return DeltaLakeTransactionLogEntry.removeFileEntry(result);
     }
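
Checkpoints store the same action as a Parquet struct rather than JSON, which is why the iterator resolves an optional deletionVector field inside the remove group and parses it only when deletion vectors are enabled for the table. An illustrative shape of that column (field names from the Delta protocol; physical types simplified, not copied from Trino's schema code):

    optional group remove {
      optional binary path (STRING);
      optional group partitionValues (MAP);
      optional int64 deletionTimestamp;
      optional boolean dataChange;
      optional group deletionVector {
        optional binary storageType (STRING);
        optional binary pathOrInlineDv (STRING);
        optional int32 offset;
        optional int32 sizeInBytes;
        optional int64 cardinality;
      }
    }
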
==== TestDeltaLakeBasic.java ====
@@ -31,6 +31,7 @@
 import io.trino.parquet.metadata.ParquetMetadata;
 import io.trino.parquet.reader.MetadataReader;
 import io.trino.plugin.deltalake.transactionlog.AddFileEntry;
+import io.trino.plugin.deltalake.transactionlog.DeletionVectorEntry;
 import io.trino.plugin.deltalake.transactionlog.DeltaLakeTransactionLogEntry;
 import io.trino.plugin.deltalake.transactionlog.MetadataEntry;
 import io.trino.plugin.deltalake.transactionlog.ProtocolEntry;
@@ -1163,8 +1164,15 @@ public void testDeletionVectorsAllRows()
         copyDirectoryContents(new File(Resources.getResource("databricks122/deletion_vectors").toURI()).toPath(), tableLocation);
         assertUpdate("CALL system.register_table('%s', '%s', '%s')".formatted(getSession().getSchema().orElseThrow(), tableName, tableLocation.toUri()));
 
+        assertUpdate("DELETE FROM " + tableName + " WHERE a != 999", 1);
+
+        // 'remove' entry should have the same deletion vector as the previous operation when deleting all rows
+        DeletionVectorEntry deletionVector = getEntriesFromJson(2, tableLocation + "/_delta_log", FILE_SYSTEM).orElseThrow().get(2).getAdd().getDeletionVector().orElseThrow();
+        assertThat(getEntriesFromJson(3, tableLocation + "/_delta_log", FILE_SYSTEM).orElseThrow().get(1).getRemove().deletionVector().orElseThrow())
+                .isEqualTo(deletionVector);
+
         assertUpdate("INSERT INTO " + tableName + " VALUES (3, 31), (3, 32)", 2);
-        assertUpdate("DELETE FROM " + tableName + " WHERE a != 999", 3);
+        assertUpdate("DELETE FROM " + tableName + " WHERE a != 999", 2);
         assertQueryReturnsEmptyResult("SELECT * FROM " + tableName);
 
         assertUpdate("INSERT INTO " + tableName + " VALUES (1, 10), (2, 20)", 2);
==== TestTransactionLogAccess.java ====
@@ -105,12 +105,12 @@ public class TestTransactionLogAccess
             "age=29/part-00000-3794c463-cb0c-4beb-8d07-7cc1e3b5920f.c000.snappy.parquet");
 
     private static final Set<RemoveFileEntry> EXPECTED_REMOVE_ENTRIES = ImmutableSet.of(
-            new RemoveFileEntry("age=30/part-00000-7e43a3c3-ea26-4ae7-8eac-8f60cbb4df03.c000.snappy.parquet", null, 1579190163932L, false),
-            new RemoveFileEntry("age=30/part-00000-72a56c23-01ba-483a-9062-dd0accc86599.c000.snappy.parquet", null, 1579190163932L, false),
-            new RemoveFileEntry("age=42/part-00000-951068bd-bcf4-4094-bb94-536f3c41d31f.c000.snappy.parquet", null, 1579190155406L, false),
-            new RemoveFileEntry("age=25/part-00000-609e34b1-5466-4dbc-a780-2708166e7adb.c000.snappy.parquet", null, 1579190163932L, false),
-            new RemoveFileEntry("age=42/part-00000-6aed618a-2beb-4edd-8466-653e67a9b380.c000.snappy.parquet", null, 1579190155406L, false),
-            new RemoveFileEntry("age=42/part-00000-b82d8859-84a0-4f05-872c-206b07dd54f0.c000.snappy.parquet", null, 1579190163932L, false));
+            new RemoveFileEntry("age=30/part-00000-7e43a3c3-ea26-4ae7-8eac-8f60cbb4df03.c000.snappy.parquet", null, 1579190163932L, false, Optional.empty()),
+            new RemoveFileEntry("age=30/part-00000-72a56c23-01ba-483a-9062-dd0accc86599.c000.snappy.parquet", null, 1579190163932L, false, Optional.empty()),
+            new RemoveFileEntry("age=42/part-00000-951068bd-bcf4-4094-bb94-536f3c41d31f.c000.snappy.parquet", null, 1579190155406L, false, Optional.empty()),
+            new RemoveFileEntry("age=25/part-00000-609e34b1-5466-4dbc-a780-2708166e7adb.c000.snappy.parquet", null, 1579190163932L, false, Optional.empty()),
+            new RemoveFileEntry("age=42/part-00000-6aed618a-2beb-4edd-8466-653e67a9b380.c000.snappy.parquet", null, 1579190155406L, false, Optional.empty()),
+            new RemoveFileEntry("age=42/part-00000-b82d8859-84a0-4f05-872c-206b07dd54f0.c000.snappy.parquet", null, 1579190163932L, false, Optional.empty()));
 
     private final TestingTelemetry testingTelemetry = TestingTelemetry.create("transaction-log-access");
     private final TracingFileSystemFactory tracingFileSystemFactory = new TracingFileSystemFactory(testingTelemetry.getTracer(), new HdfsFileSystemFactory(HDFS_ENVIRONMENT, HDFS_FILE_SYSTEM_STATS));
==== TestCheckpointBuilder.java ====
@@ -59,11 +59,11 @@ public void testCheckpointBuilder()
         builder.addLogEntry(transactionEntry(app2TransactionV5));
 
         AddFileEntry addA1 = new AddFileEntry("a", Map.of(), 1, 1, true, Optional.empty(), Optional.empty(), Map.of(), Optional.empty());
-        RemoveFileEntry removeA1 = new RemoveFileEntry("a", Map.of(), 1, true);
+        RemoveFileEntry removeA1 = new RemoveFileEntry("a", Map.of(), 1, true, Optional.empty());
         AddFileEntry addA2 = new AddFileEntry("a", Map.of(), 2, 1, true, Optional.empty(), Optional.empty(), Map.of(), Optional.empty());
         AddFileEntry addB = new AddFileEntry("b", Map.of(), 1, 1, true, Optional.empty(), Optional.empty(), Map.of(), Optional.empty());
-        RemoveFileEntry removeB = new RemoveFileEntry("b", Map.of(), 1, true);
-        RemoveFileEntry removeC = new RemoveFileEntry("c", Map.of(), 1, true);
+        RemoveFileEntry removeB = new RemoveFileEntry("b", Map.of(), 1, true, Optional.empty());
+        RemoveFileEntry removeC = new RemoveFileEntry("c", Map.of(), 1, true, Optional.empty());
         builder.addLogEntry(addFileEntry(addA1));
         builder.addLogEntry(removeFileEntry(removeA1));
         builder.addLogEntry(addFileEntry(addA2));
==== TestCheckpointEntryIterator.java ====
@@ -612,7 +612,8 @@ public void testReadAllEntries()
                         // partitionValues information is missing in the checkpoint
                         null,
                         1579190155406L,
-                        false));
+                        false,
+                        Optional.empty()));
 
         // CommitInfoEntry
         // not found in the checkpoint, TODO add a test
@@ -925,7 +926,8 @@ public void testSkipRemoveEntries()
                         UUID.randomUUID().toString(),
                         ImmutableMap.of("part_key", "2023-01-01 00:00:00"),
                         1000,
-                        true))
+                        true,
+                        Optional.empty()))
                 .collect(toImmutableSet());
 
         CheckpointEntries entries = new CheckpointEntries(
==== TestCheckpointWriter.java ====
@@ -184,7 +184,8 @@ public void testCheckpointWriteReadJsonRoundtrip()
                 "removeFilePath",
                 ImmutableMap.of("part_key", "7.0"),
                 1000,
-                true);
+                true,
+                Optional.empty());
 
         CheckpointEntries entries = new CheckpointEntries(
                 metadataEntry,
@@ -325,7 +326,8 @@ public void testCheckpointWriteReadParquetStatisticsRoundtrip()
                 "removeFilePath",
                 ImmutableMap.of("part_key", "7.0"),
                 1000,
-                true);
+                true,
+                Optional.empty());
 
         CheckpointEntries entries = new CheckpointEntries(
                 metadataEntry,