Skip to content

Commit 1ceaf88

Browse files
committed
Reduce filesystem calls for reading delta deletion vector
1 parent 94d9c9b commit 1ceaf88

File tree

3 files changed

+23
-27
lines changed

3 files changed

+23
-27
lines changed

plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/delete/DeletionVectors.java

Lines changed: 17 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,12 @@
1717
import io.delta.kernel.internal.deletionvectors.Base85Codec;
1818
import io.trino.filesystem.Location;
1919
import io.trino.filesystem.TrinoFileSystem;
20+
import io.trino.filesystem.TrinoInput;
2021
import io.trino.filesystem.TrinoInputFile;
2122
import io.trino.plugin.deltalake.transactionlog.DeletionVectorEntry;
2223
import io.trino.spi.TrinoException;
2324
import org.roaringbitmap.RoaringBitmap;
2425

25-
import java.io.DataInputStream;
2626
import java.io.DataOutputStream;
2727
import java.io.IOException;
2828
import java.nio.ByteBuffer;
@@ -32,7 +32,7 @@
3232
import java.util.zip.Checksum;
3333

3434
import static com.google.common.base.Preconditions.checkArgument;
35-
import static com.google.common.base.Preconditions.checkState;
35+
import static io.airlift.slice.SizeOf.SIZE_OF_INT;
3636
import static io.delta.kernel.internal.deletionvectors.Base85Codec.decodeUUID;
3737
import static io.delta.kernel.internal.deletionvectors.Base85Codec.encodeUUID;
3838
import static io.trino.plugin.deltalake.DeltaLakeErrorCode.DELTA_LAKE_INVALID_SCHEMA;
@@ -63,7 +63,7 @@ public static RoaringBitmapArray readDeletionVectors(TrinoFileSystem fileSystem,
6363
{
6464
if (deletionVector.storageType().equals(UUID_MARKER)) {
6565
TrinoInputFile inputFile = fileSystem.newInputFile(location.appendPath(toFileName(deletionVector.pathOrInlineDv())));
66-
byte[] buffer = readDeletionVector(inputFile, deletionVector.offset().orElseThrow(), deletionVector.sizeInBytes());
66+
ByteBuffer buffer = readDeletionVector(inputFile, deletionVector.offset().orElseThrow(), deletionVector.sizeInBytes());
6767
return deserializeDeletionVectors(buffer);
6868
}
6969
if (deletionVector.storageType().equals(INLINE_MARKER) || deletionVector.storageType().equals(PATH_MARKER)) {
@@ -125,38 +125,40 @@ public static String toFileName(String pathOrInlineDv)
125125
return "%sdeletion_vector_%s.bin".formatted(prefix, uuid);
126126
}
127127

128-
public static byte[] readDeletionVector(TrinoInputFile inputFile, int offset, int expectedSize)
128+
public static ByteBuffer readDeletionVector(TrinoInputFile inputFile, int offset, int expectedSize)
129129
throws IOException
130130
{
131-
byte[] bytes = new byte[expectedSize];
132-
try (DataInputStream inputStream = new DataInputStream(inputFile.newStream())) {
133-
checkState(inputStream.skip(offset) == offset);
134-
int actualSize = inputStream.readInt();
131+
try (TrinoInput input = inputFile.newInput()) {
132+
ByteBuffer buffer = input.readFully(offset, SIZE_OF_INT + expectedSize + SIZE_OF_INT).toByteBuffer();
133+
int actualSize = buffer.getInt(0);
135134
if (actualSize != expectedSize) {
136135
throw new TrinoException(DELTA_LAKE_INVALID_SCHEMA, "The size of deletion vector %s expects %s but got %s".formatted(inputFile.location(), expectedSize, actualSize));
137136
}
138-
inputStream.readFully(bytes);
139-
int checksum = inputStream.readInt();
140-
if (calculateChecksum(bytes) != checksum) {
137+
int checksum = buffer.getInt(SIZE_OF_INT + expectedSize);
138+
if (calculateChecksum(buffer.array(), buffer.arrayOffset() + SIZE_OF_INT, expectedSize) != checksum) {
141139
throw new TrinoException(DELTA_LAKE_INVALID_SCHEMA, "Checksum mismatch for deletion vector: " + inputFile.location());
142140
}
141+
return buffer.slice(SIZE_OF_INT, expectedSize).order(LITTLE_ENDIAN);
143142
}
144-
return bytes;
145143
}
146144

147145
private static int calculateChecksum(byte[] data)
146+
{
147+
return calculateChecksum(data, 0, data.length);
148+
}
149+
150+
private static int calculateChecksum(byte[] data, int offset, int length)
148151
{
149152
// Delta Lake allows integer overflow intentionally because it's fine from checksum perspective
150153
// https://github.com/delta-io/delta/blob/039a29abb4abc72ac5912651679233dc983398d6/spark/src/main/scala/org/apache/spark/sql/delta/storage/dv/DeletionVectorStore.scala#L115
151154
Checksum crc = new CRC32();
152-
crc.update(data);
155+
crc.update(data, offset, length);
153156
return (int) crc.getValue();
154157
}
155158

156-
private static RoaringBitmapArray deserializeDeletionVectors(byte[] bytes)
159+
private static RoaringBitmapArray deserializeDeletionVectors(ByteBuffer buffer)
157160
throws IOException
158161
{
159-
ByteBuffer buffer = ByteBuffer.wrap(bytes).order(LITTLE_ENDIAN);
160162
checkArgument(buffer.order() == LITTLE_ENDIAN, "Byte order must be little endian: %s", buffer.order());
161163
int magicNumber = buffer.getInt();
162164
if (magicNumber == PORTABLE_ROARING_BITMAP_MAGIC_NUMBER) {

plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeAlluxioCacheFileOperations.java

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -247,14 +247,10 @@ public void testCacheDeletionVectorsFileOperations()
247247
.add(new CacheOperation("Alluxio.readCached", "data", 0, 796))
248248
.add(new CacheOperation("Input.readFully", "data", 0, 796))
249249
.add(new CacheOperation("Alluxio.writeCache", "data", 0, 796))
250-
.add(new CacheOperation("Alluxio.readCached", "deletion_vector", 5, 34))
251-
.add(new CacheOperation("Alluxio.readCached", "deletion_vector", 1, 4))
252-
.add(new CacheOperation("Alluxio.readCached", "deletion_vector", 39, 4))
253-
.add(new CacheOperation("Alluxio.readExternalStream", "deletion_vector", 38, 1))
254-
.add(new CacheOperation("Alluxio.readExternalStream", "deletion_vector", 1, 4))
255-
.add(new CacheOperation("InputFile.newStream", "deletion_vector"))
250+
.add(new CacheOperation("Alluxio.readCached", "deletion_vector", 1, 42))
251+
.add(new CacheOperation("Input.readFully", "deletion_vector", 0, 43))
256252
.add(new CacheOperation("InputFile.length", "deletion_vector"))
257-
.addCopies(new CacheOperation("Alluxio.writeCache", "deletion_vector", 0, 43), 2)
253+
.add(new CacheOperation("Alluxio.writeCache", "deletion_vector", 0, 43))
258254
.add(new CacheOperation("InputFile.newStream", "_last_checkpoint"))
259255
.build());
260256
assertFileSystemAccesses(
@@ -268,9 +264,7 @@ public void testCacheDeletionVectorsFileOperations()
268264
.add(new CacheOperation("InputFile.length", "00000000000000000002.json"))
269265
.add(new CacheOperation("InputFile.length", "00000000000000000003.json"))
270266
.add(new CacheOperation("Alluxio.readCached", "data", 0, 796))
271-
.add(new CacheOperation("Alluxio.readCached", "deletion_vector", 5, 34))
272-
.add(new CacheOperation("Alluxio.readCached", "deletion_vector", 1, 4))
273-
.add(new CacheOperation("Alluxio.readCached", "deletion_vector", 39, 4))
267+
.add(new CacheOperation("Alluxio.readCached", "deletion_vector", 1, 42))
274268
.add(new CacheOperation("InputFile.length", "deletion_vector"))
275269
.add(new CacheOperation("InputFile.newStream", "extended_stats.json"))
276270
.add(new CacheOperation("InputFile.newStream", "extendeded_stats.json"))

plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeFileOperations.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -941,7 +941,7 @@ public void testDeletionVectors()
941941
.add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000002.json", "InputFile.newStream"))
942942
.add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000003.json", "InputFile.newStream"))
943943
.add(new FileOperation(LAST_CHECKPOINT, "_last_checkpoint", "InputFile.newStream"))
944-
.add(new FileOperation(DELETION_VECTOR, "deletion_vector_a52eda8c-0a57-4636-814b-9c165388f7ca.bin", "InputFile.newStream"))
944+
.add(new FileOperation(DELETION_VECTOR, "deletion_vector_a52eda8c-0a57-4636-814b-9c165388f7ca.bin", "InputFile.newInput"))
945945
.add(new FileOperation(DATA, "no partition", "InputFile.newInput"))
946946
.build());
947947
assertFileSystemAccesses(
@@ -954,7 +954,7 @@ public void testDeletionVectors()
954954
.add(new FileOperation(STARBURST_EXTENDED_STATS_JSON, "extendeded_stats.json", "InputFile.newStream"))
955955
.add(new FileOperation(TRINO_EXTENDED_STATS_JSON, "extended_stats.json", "InputFile.newStream"))
956956
.add(new FileOperation(LAST_CHECKPOINT, "_last_checkpoint", "InputFile.newStream"))
957-
.add(new FileOperation(DELETION_VECTOR, "deletion_vector_a52eda8c-0a57-4636-814b-9c165388f7ca.bin", "InputFile.newStream"))
957+
.add(new FileOperation(DELETION_VECTOR, "deletion_vector_a52eda8c-0a57-4636-814b-9c165388f7ca.bin", "InputFile.newInput"))
958958
.add(new FileOperation(DATA, "no partition", "InputFile.newInput"))
959959
.build());
960960

0 commit comments

Comments
 (0)