Skip to content

Commit 3647ad4

Browse files
committed
Done refactor
1 parent 4965ed4 commit 3647ad4

File tree

8 files changed

+212
-113
lines changed

8 files changed

+212
-113
lines changed

connectors/flink/src/main/scala/io/delta/flink/internal/KernelSnapshotWrapper.java

+3-9
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,9 @@
1919
package io.delta.standalone.internal;
2020

2121
import java.lang.reflect.Constructor;
22-
import java.util.ArrayList;
2322
import java.util.List;
2423
import java.util.Optional;
2524

26-
import io.delta.kernel.data.ColumnVector;
2725
import io.delta.kernel.internal.types.DataTypeJsonSerDe;
2826
import org.slf4j.Logger;
2927
import org.slf4j.LoggerFactory;
@@ -140,11 +138,7 @@ private Metadata convertMetadata() {
140138
);
141139

142140
// Read the partition columns, which the kernel metadata already exposes as a List<String>
143-
ColumnVector partitionsVec = kernelMetadata.getPartitionColumns().getElements();
144-
ArrayList<String> partitionColumns = new ArrayList<String>(partitionsVec.getSize());
145-
for(int i = 0; i < partitionsVec.getSize(); i++) {
146-
partitionColumns.add(partitionsVec.getString(i));
147-
}
141+
final List<String> partitionColumns = kernelMetadata.getPartitionColumns();
148142

149143
// Convert over the schema StructType
150144
List<io.delta.kernel.types.StructField> kernelFields = kernelMetadata.getSchema().fields();
@@ -186,8 +180,8 @@ private Metadata convertMetadata() {
186180

187181
return new Metadata(
188182
kernelMetadata.getId(),
189-
kernelMetadata.getName().orElse(null),
190-
kernelMetadata.getDescription().orElse(null),
183+
kernelMetadata.getName(),
184+
kernelMetadata.getDescription(),
191185
format,
192186
partitionColumns,
193187
kernelMetadata.getConfiguration(),
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
/*
2+
* Copyright (2024) The Delta Lake Project Authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.delta.kernel.actions;
18+
19+
import io.delta.kernel.annotation.Evolving;
20+
import java.util.List;
21+
import java.util.Map;
22+
import java.util.Optional;
23+
24+
/**
25+
* Interface for the metadata action in Delta. It describes the table: its identifier, schema, partition columns, and configuration.
26+
*
27+
* @since 3.4.0
28+
*/
29+
@Evolving
30+
public interface AbstractMetadata {
31+
32+
/** A unique table identifier. */
33+
String getId();
34+
35+
/** User-specified table identifier. */
36+
// TODO: this should be Optional<String>. Delta protocol defines 'name' as optional.
37+
String getName();
38+
39+
/** User-specified table description. */
40+
// TODO: this should be Optional<String>. Delta protocol defines 'description' as optional.
41+
String getDescription();
42+
43+
/** The table provider format. */
44+
String getProvider();
45+
46+
/** The format options. */
47+
Map<String, String> getFormatOptions();
48+
49+
/** The table schema in string representation. */
50+
String getSchemaString();
51+
52+
/** List of partition columns. */
53+
List<String> getPartitionColumns();
54+
55+
/** The table properties defined on the table. */
56+
Map<String, String> getConfiguration();
57+
58+
/** Timestamp for the creation of this metadata. */
59+
Optional<Long> getCreatedTime();
60+
}

kernel/kernel-api/src/main/java/io/delta/kernel/internal/ScanImpl.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ public ScanImpl(
7777
this.dataPath = dataPath;
7878
this.partitionColToStructFieldMap =
7979
() -> {
80-
Set<String> partitionColNames = metadata.getPartitionColNames();
80+
Set<String> partitionColNames = metadata.getPartitionColumnsLowercase();
8181
return metadata.getSchema().fields().stream()
8282
.filter(field -> partitionColNames.contains(field.getName().toLowerCase(Locale.ROOT)))
8383
.collect(toMap(field -> field.getName().toLowerCase(Locale.ROOT), identity()));
@@ -156,7 +156,7 @@ public Row getScanState(Engine engine) {
156156
// Compute the physical data read schema, basically the list of columns to read
157157
// from a Parquet data file. It should exclude partition columns and include
158158
// row_index metadata columns (in case DVs are present)
159-
List<String> partitionColumns = VectorUtils.toJavaList(metadata.getPartitionColumns());
159+
List<String> partitionColumns = metadata.getPartitionColumns();
160160
StructType physicalDataReadSchema =
161161
PartitionUtils.physicalSchemaWithoutPartitionColumns(
162162
readSchema, /* logical read schema */
@@ -185,7 +185,7 @@ private Optional<Tuple2<Predicate, Predicate>> splitFilters(Optional<Predicate>
185185
return filter.map(
186186
predicate ->
187187
PartitionUtils.splitMetadataAndDataPredicates(
188-
predicate, metadata.getPartitionColNames()));
188+
predicate, metadata.getPartitionColumnsLowercase()));
189189
}
190190

191191
private Optional<Predicate> getDataFilters() {

kernel/kernel-api/src/main/java/io/delta/kernel/internal/SnapshotImpl.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@
3131
import io.delta.kernel.internal.replay.CreateCheckpointIterator;
3232
import io.delta.kernel.internal.replay.LogReplay;
3333
import io.delta.kernel.internal.snapshot.LogSegment;
34-
import io.delta.kernel.internal.util.VectorUtils;
3534
import io.delta.kernel.metrics.SnapshotReport;
3635
import io.delta.kernel.types.StructType;
3736
import java.util.List;
@@ -134,7 +133,7 @@ public Protocol getProtocol() {
134133
}
135134

136135
public List<String> getPartitionColumnNames(Engine engine) {
137-
return VectorUtils.toJavaList(getMetadata().getPartitionColumns());
136+
return getMetadata().getPartitionColumns();
138137
}
139138

140139
public SnapshotReport getSnapshotReport() {

kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionImpl.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ public Row getTransactionState(Engine engine) {
110110

111111
@Override
112112
public List<String> getPartitionColumns(Engine engine) {
113-
return VectorUtils.toJavaList(metadata.getPartitionColumns());
113+
return metadata.getPartitionColumns();
114114
}
115115

116116
@Override
@@ -332,7 +332,7 @@ private boolean isReadyForCheckpoint(long newVersion) {
332332

333333
private Map<String, String> getOperationParameters() {
334334
if (isNewTable) {
335-
List<String> partitionCols = VectorUtils.toJavaList(metadata.getPartitionColumns());
335+
List<String> partitionCols = metadata.getPartitionColumns();
336336
String partitionBy =
337337
partitionCols.stream()
338338
.map(col -> "\"" + col + "\"")

0 commit comments

Comments
 (0)