|
31 | 31 |
|
32 | 32 | import java.io.File; |
33 | 33 | import java.net.URI; |
| 34 | +import java.net.URL; |
34 | 35 | import java.nio.file.Files; |
35 | 36 | import java.nio.file.Path; |
36 | 37 | import java.util.Arrays; |
|
45 | 46 | import static io.trino.plugin.deltalake.TestDeltaLakeFileOperations.FileType.CDF_DATA; |
46 | 47 | import static io.trino.plugin.deltalake.TestDeltaLakeFileOperations.FileType.CHECKPOINT; |
47 | 48 | import static io.trino.plugin.deltalake.TestDeltaLakeFileOperations.FileType.DATA; |
| 49 | +import static io.trino.plugin.deltalake.TestDeltaLakeFileOperations.FileType.DELETION_VECTOR; |
48 | 50 | import static io.trino.plugin.deltalake.TestDeltaLakeFileOperations.FileType.LAST_CHECKPOINT; |
49 | 51 | import static io.trino.plugin.deltalake.TestDeltaLakeFileOperations.FileType.STARBURST_EXTENDED_STATS_JSON; |
50 | 52 | import static io.trino.plugin.deltalake.TestDeltaLakeFileOperations.FileType.TRANSACTION_LOG_JSON; |
|
53 | 55 | import static io.trino.testing.MultisetAssertions.assertMultisetsEqual; |
54 | 56 | import static io.trino.testing.TestingNames.randomNameSuffix; |
55 | 57 | import static java.lang.Math.toIntExact; |
| 58 | +import static java.lang.String.format; |
56 | 59 | import static java.util.Objects.requireNonNull; |
57 | 60 | import static java.util.stream.Collectors.toCollection; |
58 | 61 |
|
@@ -924,6 +927,40 @@ public void testV2CheckpointParquet() |
924 | 927 | assertUpdate("DROP TABLE " + tableName); |
925 | 928 | } |
926 | 929 |
|
| 930 | + @Test |
| 931 | + public void testDeletionVectors() |
| 932 | + { |
| 933 | + String tableName = "test_deletion_vectors_" + randomNameSuffix(); |
| 934 | + registerTable(tableName, "databricks122/deletion_vectors"); |
| 935 | + assertUpdate("CALL system.flush_metadata_cache(schema_name => CURRENT_SCHEMA, table_name => '%s')".formatted(tableName)); |
| 936 | + assertFileSystemAccesses( |
| 937 | + "SELECT * FROM " + tableName, |
| 938 | + ImmutableMultiset.<FileOperation>builder() |
| 939 | + .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000000.json", "InputFile.newStream")) |
| 940 | + .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000001.json", "InputFile.newStream")) |
| 941 | + .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000002.json", "InputFile.newStream")) |
| 942 | + .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000003.json", "InputFile.newStream")) |
| 943 | + .add(new FileOperation(LAST_CHECKPOINT, "_last_checkpoint", "InputFile.newStream")) |
| 944 | + .add(new FileOperation(DELETION_VECTOR, "deletion_vector_a52eda8c-0a57-4636-814b-9c165388f7ca.bin", "InputFile.newStream")) |
| 945 | + .add(new FileOperation(DATA, "no partition", "InputFile.newInput")) |
| 946 | + .build()); |
| 947 | + assertFileSystemAccesses( |
| 948 | + "EXPLAIN ANALYZE SELECT * FROM " + tableName, |
| 949 | + ImmutableMultiset.<FileOperation>builder() |
| 950 | + .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000000.json", "InputFile.newStream")) |
| 951 | + .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000001.json", "InputFile.newStream")) |
| 952 | + .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000002.json", "InputFile.newStream")) |
| 953 | + .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000003.json", "InputFile.newStream")) |
| 954 | + .add(new FileOperation(STARBURST_EXTENDED_STATS_JSON, "extendeded_stats.json", "InputFile.newStream")) |
| 955 | + .add(new FileOperation(TRINO_EXTENDED_STATS_JSON, "extended_stats.json", "InputFile.newStream")) |
| 956 | + .add(new FileOperation(LAST_CHECKPOINT, "_last_checkpoint", "InputFile.newStream")) |
| 957 | + .add(new FileOperation(DELETION_VECTOR, "deletion_vector_a52eda8c-0a57-4636-814b-9c165388f7ca.bin", "InputFile.newStream")) |
| 958 | + .add(new FileOperation(DATA, "no partition", "InputFile.newInput")) |
| 959 | + .build()); |
| 960 | + |
| 961 | + assertUpdate("DROP TABLE " + tableName); |
| 962 | + } |
| 963 | + |
927 | 964 | private int countCdfFilesForKey(String partitionValue) |
928 | 965 | { |
929 | 966 | String path = (String) computeScalar("SELECT \"$path\" FROM table_changes_file_system_access WHERE key = '" + partitionValue + "'"); |
@@ -979,6 +1016,9 @@ public static FileOperation create(String path, String operationType) |
979 | 1016 | if (path.matches(".*/_delta_log/_starburst_meta/extendeded_stats.json")) { |
980 | 1017 | return new FileOperation(STARBURST_EXTENDED_STATS_JSON, fileName, operationType); |
981 | 1018 | } |
| 1019 | + if (path.contains("/deletion_vector_")) { |
| 1020 | + return new FileOperation(DELETION_VECTOR, fileName, operationType); |
| 1021 | + } |
982 | 1022 | Pattern dataFilePattern = Pattern.compile(".*?/(?<partition>key=[^/]*/)?[^/]+"); |
983 | 1023 | if (path.matches(".*/_change_data/.*")) { |
984 | 1024 | Matcher matcher = dataFilePattern.matcher(path); |
@@ -1012,6 +1052,18 @@ enum FileType |
1012 | 1052 | STARBURST_EXTENDED_STATS_JSON, |
1013 | 1053 | DATA, |
1014 | 1054 | CDF_DATA, |
| 1055 | + DELETION_VECTOR, |
1015 | 1056 | /**/; |
1016 | 1057 | } |
| 1058 | + |
| 1059 | + private void registerTable(String name, String resourcePath) |
| 1060 | + { |
| 1061 | + String dataPath = getResourceLocation(resourcePath).toExternalForm(); |
| 1062 | + getQueryRunner().execute(format("CALL system.register_table(CURRENT_SCHEMA, '%s', '%s')", name, dataPath)); |
| 1063 | + } |
| 1064 | + |
| 1065 | + private URL getResourceLocation(String resourcePath) |
| 1066 | + { |
| 1067 | + return getClass().getClassLoader().getResource(resourcePath); |
| 1068 | + } |
1017 | 1069 | } |
0 commit comments