[Kernel] [Refactor] Remove superfluous engine param from ScanBuilder APIs #4106

Merged
15 changes: 5 additions & 10 deletions docs/source/delta-kernel-java.md
@@ -99,14 +99,14 @@ Snapshot mySnapshot = myTable.getLatestSnapshot(myEngine);
Now that we have a consistent snapshot view of the table, we can query more details about the table. For example, you can get the version and schema of this snapshot.

```java
-long version = mySnapshot.getVersion(myEngine);
-StructType tableSchema = mySnapshot.getSchema(myEngine);
+long version = mySnapshot.getVersion();
+StructType tableSchema = mySnapshot.getSchema();
```

Next, to read the table data, we have to *build* a [`Scan`](https://delta-io.github.io/delta/snapshot/kernel-api/java/io/delta/kernel/Scan.html) object. In order to build a `Scan` object, create a [`ScanBuilder`](https://delta-io.github.io/delta/snapshot/kernel-api/java/io/delta/kernel/ScanBuilder.html) object which optionally allows selecting a subset of columns to read or setting a query filter. For now, ignore these optional settings.

```java
-Scan myScan = mySnapshot.getScanBuilder(myEngine).build()
+Scan myScan = mySnapshot.getScanBuilder().build()

// Common information about scanning for all data files to read.
Row scanState = myScan.getScanState(myEngine)
@@ -224,9 +224,7 @@ Predicate filter = new Predicate(
"=",
Arrays.asList(new Column("columnX"), Literal.ofInt(1)));

-Scan myFilteredScan = mySnapshot.buildScan(engine)
-.withFilter(myEngine, filter)
-.build()
+Scan myFilteredScan = mySnapshot.getScanBuilder().withFilter(filter).build()

// Subset of the given filter that is not guaranteed to be satisfied by
// Delta Kernel when it returns data. This filter is used by Delta Kernel
@@ -845,10 +843,7 @@ import io.delta.kernel.types.*;
StructType readSchema = ... ; // convert engine schema
Predicate filterExpr = ... ; // convert engine filter expression

-Scan myScan = mySnapshot.buildScan(engine)
-.withFilter(myEngine, filterExpr)
-.withReadSchema(myEngine, readSchema)
-.build()
+Scan myScan = mySnapshot.getScanBuilder().withFilter(filterExpr).withReadSchema(readSchema).build();

```

4 changes: 2 additions & 2 deletions docs/source/delta-kernel.md
@@ -22,8 +22,8 @@ Here is an example of a simple table scan with a filter:
Engine myEngine = DefaultEngine.create() ; // define a engine (more details below)
Table myTable = Table.forPath("/delta/table/path"); // define what table to scan
Snapshot mySnapshot = myTable.getLatestSnapshot(myEngine); // define which version of table to scan
-Scan myScan = mySnapshot.getScanBuilder(myEngine) // specify the scan details
-.withFilters(myEngine, scanFilter)
+Scan myScan = mySnapshot.getScanBuilder() // specify the scan details
+.withFilters(scanFilter)
.build();
CloseableIterator<ColumnarBatch> physicalData = // read the Parquet data files
.. read from Parquet data files ...
13 changes: 4 additions & 9 deletions kernel/USER_GUIDE.md
@@ -95,8 +95,8 @@ Snapshot mySnapshot = myTable.getLatestSnapshot(myEngine);
Now that we have a consistent snapshot view of the table, we can query more details about the table. For example, you can get the version and schema of this snapshot.

```java
-long version = mySnapshot.getVersion(myEngine);
-StructType tableSchema = mySnapshot.getSchema(myEngine);
+long version = mySnapshot.getVersion();
+StructType tableSchema = mySnapshot.getSchema();
```

Next, to read the table data, we have to *build* a [`Scan`](https://delta-io.github.io/delta/snapshot/kernel-api/java/io/delta/kernel/Scan.html) object. In order to build a `Scan` object, create a [`ScanBuilder`](https://delta-io.github.io/delta/snapshot/kernel-api/java/io/delta/kernel/ScanBuilder.html) object which optionally allows selecting a subset of columns to read or setting a query filter. For now, ignore these optional settings.
@@ -220,9 +220,7 @@ Predicate filter = new Predicate(
"=",
Arrays.asList(new Column("columnX"), Literal.ofInt(1)));

-Scan myFilteredScan = mySnapshot.buildScan(engine)
-.withFilter(myEngine, filter)
-.build()
+Scan myFilteredScan = mySnapshot.getScanBuilder().withFilter(filter).build()

// Subset of the given filter that is not guaranteed to be satisfied by
// Delta Kernel when it returns data. This filter is used by Delta Kernel
@@ -865,10 +863,7 @@ import io.delta.kernel.types.*;
StructType readSchema = ... ; // convert engine schema
Predicate filterExpr = ... ; // convert engine filter expression

-Scan myScan = mySnapshot.buildScan(engine)
-.withFilter(myEngine, filterExpr)
-.withReadSchema(myEngine, readSchema)
-.build()
+Scan myScan = mySnapshot.getScanBuilder().withFilter(filterExpr).withReadSchema(readSchema).build();

```

@@ -85,10 +85,10 @@ public int show(int limit, Optional<List<String>> columnsOpt, Optional<Predicate
Snapshot snapshot = table.getLatestSnapshot(engine);
StructType readSchema = pruneSchema(snapshot.getSchema(), columnsOpt);

-ScanBuilder scanBuilder = snapshot.getScanBuilder().withReadSchema(engine, readSchema);
+ScanBuilder scanBuilder = snapshot.getScanBuilder().withReadSchema(readSchema);

if (predicate.isPresent()) {
-scanBuilder = scanBuilder.withFilter(engine, predicate.get());
+scanBuilder = scanBuilder.withFilter(predicate.get());
}

return new Reader(limit)
@@ -63,10 +63,10 @@ public int show(int limit, Optional<List<String>> columnsOpt, Optional<Predicate
Snapshot snapshot = table.getLatestSnapshot(engine);
StructType readSchema = pruneSchema(snapshot.getSchema(), columnsOpt);

-ScanBuilder scanBuilder = snapshot.getScanBuilder().withReadSchema(engine, readSchema);
+ScanBuilder scanBuilder = snapshot.getScanBuilder().withReadSchema(readSchema);

if (predicate.isPresent()) {
-scanBuilder = scanBuilder.withFilter(engine, predicate.get());
+scanBuilder = scanBuilder.withFilter(predicate.get());
}

return readData(readSchema, scanBuilder.build(), limit);
@@ -49,21 +49,19 @@ public interface ScanBuilder {
* read from the scan files (returned by {@link Scan#getScanFiles(Engine)}) to completely filter
* out the data that doesn't satisfy the filter.```
*
-* @param engine {@link Engine} instance to use in Delta Kernel.
* @param predicate a {@link Predicate} to prune the metadata or data.
* @return A {@link ScanBuilder} with filter applied.
*/
-ScanBuilder withFilter(Engine engine, Predicate predicate);
+ScanBuilder withFilter(Predicate predicate);

/**
* Apply the given <i>readSchema</i>. If the builder already has a projection applied, calling
* this again replaces the existing projection.
*
-* @param engine {@link Engine} instance to use in Delta Kernel.
* @param readSchema Subset of columns to read from the Delta table.
* @return A {@link ScanBuilder} with projection pruning.
*/
-ScanBuilder withReadSchema(Engine engine, StructType readSchema);
+ScanBuilder withReadSchema(StructType readSchema);

/** @return Build the {@link Scan instance} */
Scan build();
@@ -18,7 +18,6 @@

import io.delta.kernel.Scan;
import io.delta.kernel.ScanBuilder;
-import io.delta.kernel.engine.Engine;
import io.delta.kernel.expressions.Predicate;
import io.delta.kernel.internal.actions.Metadata;
import io.delta.kernel.internal.actions.Protocol;
@@ -55,7 +54,7 @@ public ScanBuilderImpl(
}

@Override
-public ScanBuilder withFilter(Engine engine, Predicate predicate) {
+public ScanBuilder withFilter(Predicate predicate) {
if (this.predicate.isPresent()) {
throw new IllegalArgumentException("There already exists a filter in current builder");
}
@@ -64,7 +63,7 @@ public ScanBuilder withFilter(Engine engine, Predicate predicate) {
}

@Override
-public ScanBuilder withReadSchema(Engine engine, StructType readSchema) {
+public ScanBuilder withReadSchema(StructType readSchema) {
// TODO: validate the readSchema is a subset of the table schema
this.readSchema = readSchema;
return this;
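
The two builder methods touched in this file behave differently when called twice: `withFilter` throws if a filter is already set, while `withReadSchema` replaces the earlier projection. Below is a minimal, self-contained sketch of those semantics. It does not use the Delta Kernel classes: `ScanBuilderSketch` and `SketchScanBuilder` are hypothetical stand-ins, plain strings replace `Predicate` and `StructType`, and defaulting the read schema to the full table schema is an assumption, since the constructor is not shown in this diff.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

public class ScanBuilderSketch {

    /** Hypothetical stand-in for a scan builder; this is not the Delta Kernel class. */
    static final class SketchScanBuilder {
        private Optional<String> predicate = Optional.empty();
        private List<String> readSchema;

        SketchScanBuilder(List<String> tableSchema) {
            // Assumption: with no projection set, the scan reads every column.
            this.readSchema = tableSchema;
        }

        // No engine argument: the builder only records the filter.
        SketchScanBuilder withFilter(String predicate) {
            if (this.predicate.isPresent()) {
                throw new IllegalArgumentException(
                    "There already exists a filter in current builder");
            }
            this.predicate = Optional.of(predicate);
            return this;
        }

        // Calling this again replaces the previous projection.
        SketchScanBuilder withReadSchema(List<String> readSchema) {
            this.readSchema = readSchema;
            return this;
        }

        // Returns a description of the scan instead of a real Scan object.
        String build() {
            return "Scan(columns=" + readSchema
                + ", filter=" + predicate.orElse("<none>") + ")";
        }
    }

    public static void main(String[] args) {
        SketchScanBuilder builder =
            new SketchScanBuilder(Arrays.asList("id", "part", "columnX"))
                .withReadSchema(Arrays.asList("id", "part"))
                .withReadSchema(Arrays.asList("id")) // replaces the first projection
                .withFilter("columnX = 1");
        System.out.println(builder.build());

        try {
            builder.withFilter("part = 1"); // a second filter is rejected
        } catch (IllegalArgumentException e) {
            System.out.println("second filter rejected: " + e.getMessage());
        }
    }
}
```

Because neither method needs anything beyond its own argument, dropping the `Engine` parameter leaves this behavior unchanged.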
@@ -55,7 +55,7 @@ public static boolean partitionExists(
io.delta.kernel.internal.util.PartitionUtils.validatePredicateOnlyOnPartitionColumns(
partitionPredicate, snapshotPartColNames);

-final Scan scan = snapshot.getScanBuilder().withFilter(engine, partitionPredicate).build();
+final Scan scan = snapshot.getScanBuilder().withFilter(partitionPredicate).build();

try (CloseableIterator<FilteredColumnarBatch> columnarBatchIter = scan.getScanFiles(engine)) {
while (columnarBatchIter.hasNext()) {
@@ -97,15 +97,13 @@ class ScanSuite extends AnyFunSuite with TestUtils with ExpressionTestUtils with
val snapshot = latestSnapshot(tablePath)
hits.foreach { predicate =>
val scanFiles = collectScanFileRows(
-snapshot.getScanBuilder()
-.withFilter(defaultEngine, predicate)
-.build())
+snapshot.getScanBuilder().withFilter(predicate).build())
assert(scanFiles.nonEmpty, s"Expected hit but got miss for $predicate")
}
misses.foreach { predicate =>
val scanFiles = collectScanFileRows(
snapshot.getScanBuilder()
-.withFilter(defaultEngine, predicate)
+.withFilter(predicate)
.build())
assert(scanFiles.isEmpty, s"Expected miss but got hit for $predicate\n" +
s"Returned scan files have stats: ${getScanFileStats(scanFiles)}"
@@ -121,9 +119,7 @@ class ScanSuite extends AnyFunSuite with TestUtils with ExpressionTestUtils with
val snapshot = latestSnapshot(tablePath)
filterToNumExpFiles.foreach { case (filter, numExpFiles) =>
val scanFiles = collectScanFileRows(
-snapshot.getScanBuilder()
-.withFilter(defaultEngine, filter)
-.build())
+snapshot.getScanBuilder().withFilter(filter).build())
assert(scanFiles.length == numExpFiles,
s"Expected $numExpFiles but found ${scanFiles.length} for $filter")
}
@@ -1010,7 +1006,7 @@ class ScanSuite extends AnyFunSuite with TestUtils with ExpressionTestUtils with
predicate: Predicate, expNumPartitions: Int, expNumFiles: Long): Unit = {
val snapshot = latestSnapshot(tableDir.getCanonicalPath)
val scanFiles = collectScanFileRows(
-snapshot.getScanBuilder().withFilter(defaultEngine, predicate).build())
+snapshot.getScanBuilder().withFilter(predicate).build())
assert(scanFiles.length == expNumFiles,
s"Expected $expNumFiles but found ${scanFiles.length} for $predicate")

@@ -1496,7 +1492,7 @@ class ScanSuite extends AnyFunSuite with TestUtils with ExpressionTestUtils with
val partFilter = equals(new Column("part"), ofInt(1))
verifyNoStatsColumn(
snapshot(engineDisallowedStatsReads)
-.getScanBuilder().withFilter(engine, partFilter).build()
+.getScanBuilder().withFilter(partFilter).build()
.getScanFiles(engine))

// no eligible data skipping filter --> don't read stats
@@ -1505,7 +1501,7 @@ class ScanSuite extends AnyFunSuite with TestUtils with ExpressionTestUtils with
ofInt(1))
verifyNoStatsColumn(
snapshot(engineDisallowedStatsReads)
-.getScanBuilder().withFilter(engine, nonEligibleFilter).build()
+.getScanBuilder().withFilter(nonEligibleFilter).build()
.getScanFiles(engine))
}

@@ -1543,9 +1539,7 @@ class ScanSuite extends AnyFunSuite with TestUtils with ExpressionTestUtils with
val engine = engineVerifyJsonParseSchema(verifySchema(expectedCols))
collectScanFileRows(
Table.forPath(engine, path).getLatestSnapshot(engine)
-.getScanBuilder()
-.withFilter(engine, predicate)
-.build(),
+.getScanBuilder().withFilter(predicate).build(),
engine = engine)
}
}
@@ -1573,7 +1567,6 @@ class ScanSuite extends AnyFunSuite with TestUtils with ExpressionTestUtils with
latestSnapshot(tempDir.getCanonicalPath)
.getScanBuilder()
.withFilter(
-defaultEngine,
greaterThan(
new ScalarExpression("+", Seq(col("id"), ofInt(10)).asJava),
ofInt(100)
@@ -1584,13 +1577,8 @@ class ScanSuite extends AnyFunSuite with TestUtils with ExpressionTestUtils with
checkStatsPresent(
latestSnapshot(tempDir.getCanonicalPath)
.getScanBuilder()
-.withFilter(
-defaultEngine,
-greaterThan(
-col("id"),
-ofInt(0)
-)
-).build()
+.withFilter(greaterThan(col("id"), ofInt(0)))
+.build()
)
}
}
@@ -1634,7 +1622,7 @@ class ScanSuite extends AnyFunSuite with TestUtils with ExpressionTestUtils with

val scanBuilder = snapshot.getScanBuilder()
val scan = predicate match {
-case Some(pred) => scanBuilder.withFilter(defaultEngine, pred).build()
+case Some(pred) => scanBuilder.withFilter(pred).build()
case None => scanBuilder.build()
}
val scanFiles = scan.asInstanceOf[ScanImpl].getScanFiles(defaultEngine, true)
@@ -197,11 +197,11 @@ trait TestUtils extends Assertions with SQLHelper {
var scanBuilder = snapshot.getScanBuilder()

if (readSchema != null) {
-scanBuilder = scanBuilder.withReadSchema(engine, readSchema)
+scanBuilder = scanBuilder.withReadSchema(readSchema)
}

if (filter != null) {
-scanBuilder = scanBuilder.withFilter(engine, filter)
+scanBuilder = scanBuilder.withFilter(filter)
}

val scan = scanBuilder.build()
@@ -265,7 +265,7 @@ trait TestUtils extends Assertions with SQLHelper {
val scan = Table.forPath(engine, tablePath)
.getLatestSnapshot(engine)
.getScanBuilder()
-.withReadSchema(engine, readSchema)
+.withReadSchema(readSchema)
.build()
val scanState = scan.getScanState(engine)
