Skip to content

Commit f32539d

Browse files
authored
[Kernel] Add better param validations to LogSegment constructor (#4105)
#### Which Delta project/connector is this regarding? - [ ] Spark - [ ] Standalone - [ ] Flink - [X] Kernel - [ ] Other (fill in here) ## Description Add better validations to LogSegment ## How was this patch tested? New UTs. ## Does this PR introduce _any_ user-facing changes? No.
1 parent 6ba5c7f commit f32539d

File tree

3 files changed

+178
-6
lines changed

3 files changed

+178
-6
lines changed

kernel/kernel-api/src/main/java/io/delta/kernel/internal/snapshot/LogSegment.java

+61-5
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,13 @@
1616

1717
package io.delta.kernel.internal.snapshot;
1818

19+
import static io.delta.kernel.internal.util.Preconditions.checkArgument;
20+
import static java.util.Objects.requireNonNull;
21+
1922
import io.delta.kernel.internal.fs.Path;
2023
import io.delta.kernel.internal.lang.Lazy;
24+
import io.delta.kernel.internal.lang.ListUtils;
25+
import io.delta.kernel.internal.util.FileNames;
2126
import io.delta.kernel.utils.FileStatus;
2227
import java.util.Collections;
2328
import java.util.Comparator;
@@ -38,8 +43,7 @@ public class LogSegment {
3843
private final Lazy<List<FileStatus>> allFilesReversed;
3944

4045
public static LogSegment empty(Path logPath) {
41-
return new LogSegment(
42-
logPath, -1, Collections.emptyList(), Collections.emptyList(), Optional.empty(), -1);
46+
return new LogSegment(logPath, -1, Collections.emptyList(), Collections.emptyList(), -1);
4347
}
4448

4549
/**
@@ -50,7 +54,6 @@ public static LogSegment empty(Path logPath) {
5054
* @param version The Snapshot version to generate
5155
* @param deltas The delta commit files (.json) to read
5256
* @param checkpoints The checkpoint file(s) to read
53-
* @param checkpointVersionOpt The checkpoint version used to start replay
5457
* @param lastCommitTimestamp The "unadjusted" timestamp of the last commit within this segment.
5558
* By unadjusted, we mean that the commit timestamps may not necessarily be monotonically
5659
* increasing for the commits within this segment.
@@ -60,13 +63,66 @@ public LogSegment(
6063
long version,
6164
List<FileStatus> deltas,
6265
List<FileStatus> checkpoints,
63-
Optional<Long> checkpointVersionOpt,
6466
long lastCommitTimestamp) {
67+
68+
///////////////////////
69+
// Input validations //
70+
///////////////////////
71+
72+
requireNonNull(logPath, "logPath is null");
73+
requireNonNull(deltas, "deltas is null");
74+
requireNonNull(checkpoints, "checkpoints is null");
75+
checkArgument(
76+
deltas.stream().allMatch(fs -> FileNames.isCommitFile(fs.getPath())),
77+
"deltas must all be actual delta (commit) files");
78+
checkArgument(
79+
checkpoints.stream().allMatch(fs -> FileNames.isCheckpointFile(fs.getPath())),
80+
"checkpoints must all be actual checkpoint files");
81+
82+
this.checkpointVersionOpt =
83+
checkpoints.isEmpty()
84+
? Optional.empty()
85+
: Optional.of(FileNames.checkpointVersion(new Path(checkpoints.get(0).getPath())));
86+
87+
checkArgument(
88+
checkpoints.stream()
89+
.map(fs -> FileNames.checkpointVersion(new Path(fs.getPath())))
90+
.allMatch(v -> checkpointVersionOpt.get().equals(v)),
91+
"All checkpoint files must have the same version");
92+
93+
if (version != -1) {
94+
checkArgument(!deltas.isEmpty() || !checkpoints.isEmpty(), "No files to read");
95+
96+
if (!deltas.isEmpty()) {
97+
this.checkpointVersionOpt.ifPresent(
98+
checkpointVersion -> {
99+
checkArgument(
100+
FileNames.deltaVersion(deltas.get(0).getPath()) == checkpointVersion + 1,
101+
"First delta file version must equal checkpointVersion + 1");
102+
});
103+
104+
checkArgument(
105+
FileNames.deltaVersion(ListUtils.getLast(deltas).getPath()) == version,
106+
"Last delta file version must equal the version of this LogSegment");
107+
} else {
108+
this.checkpointVersionOpt.ifPresent(
109+
checkpointVersion -> {
110+
checkArgument(
111+
checkpointVersion == version,
112+
"If there are no deltas, then checkpointVersion must equal the version "
113+
+ "of this LogSegment");
114+
});
115+
}
116+
}
117+
118+
////////////////////////////////
119+
// Member variable assignment //
120+
////////////////////////////////
121+
65122
this.logPath = logPath;
66123
this.version = version;
67124
this.deltas = deltas;
68125
this.checkpoints = checkpoints;
69-
this.checkpointVersionOpt = checkpointVersionOpt;
70126
this.lastCommitTimestamp = lastCommitTimestamp;
71127

72128
this.allFiles =

kernel/kernel-api/src/main/java/io/delta/kernel/internal/snapshot/SnapshotManager.java

-1
Original file line numberDiff line numberDiff line change
@@ -562,7 +562,6 @@ public LogSegment getLogSegmentForVersion(Engine engine, Optional<Long> versionT
562562
newVersion,
563563
deltasAfterCheckpoint,
564564
latestCompleteCheckpointFileStatuses,
565-
latestCompleteCheckpointOpt.map(x -> x.version),
566565
lastCommitTimestamp);
567566
}
568567

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
/*
2+
* Copyright (2025) The Delta Lake Project Authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.delta.kernel.internal.snapshot
18+
19+
import java.util.Arrays
20+
import java.util.{Collections, Optional}
21+
22+
import io.delta.kernel.internal.fs.Path
23+
import io.delta.kernel.internal.util.FileNames
24+
import io.delta.kernel.utils.FileStatus
25+
import org.scalatest.funsuite.AnyFunSuite
26+
27+
class LogSegmentSuite extends AnyFunSuite {
28+
private val logPath = new Path("/a/_delta_log")
29+
private val checkpointFs10 =
30+
FileStatus.of(FileNames.checkpointFileSingular(logPath, 10).toString, 1, 1)
31+
private val checkpointFs10List = Collections.singletonList(checkpointFs10)
32+
private val deltaFs11 = FileStatus.of(FileNames.deltaFile(logPath, 11), 1, 1)
33+
private val deltaFs11List = Collections.singletonList(deltaFs11)
34+
private val deltaFs12 = FileStatus.of(FileNames.deltaFile(logPath, 12), 1, 1)
35+
private val deltaFs12List = Collections.singletonList(deltaFs12)
36+
private val deltasFs11To12List = Arrays.asList(deltaFs11, deltaFs12)
37+
private val badJsonsList = Collections.singletonList(
38+
FileStatus.of(s"${logPath.toString}/gibberish.json", 1, 1))
39+
private val badCheckpointsList = Collections.singletonList(
40+
FileStatus.of(s"${logPath.toString}/gibberish.checkpoint.parquet", 1, 1))
41+
42+
test("constructor -- valid case (empty)") {
43+
LogSegment.empty(new Path("/a/_delta_log"))
44+
}
45+
46+
test("constructor -- valid case (non-empty)") {
47+
val logPath = new Path("/a/_delta_log")
48+
new LogSegment(logPath, 12, deltasFs11To12List, checkpointFs10List, 1)
49+
}
50+
51+
test("constructor -- null arguments => throw") {
52+
// logPath is null
53+
intercept[NullPointerException] {
54+
new LogSegment(
55+
null, 1, Collections.emptyList(), Collections.emptyList(), -1)
56+
}
57+
// deltas is null
58+
intercept[NullPointerException] {
59+
new LogSegment(
60+
new Path("/a/_delta_log"), 1, null, Collections.emptyList(), -1)
61+
}
62+
// checkpoints is null
63+
intercept[NullPointerException] {
64+
new LogSegment(
65+
new Path("/a/_delta_log"), 1, Collections.emptyList(), null, -1)
66+
}
67+
}
68+
69+
test("constructor -- all deltas must be actual delta files") {
70+
val exMsg = intercept[IllegalArgumentException] {
71+
new LogSegment(
72+
logPath, 12, badJsonsList, checkpointFs10List, 1)
73+
}.getMessage
74+
assert(exMsg === "deltas must all be actual delta (commit) files")
75+
}
76+
77+
test("constructor -- all checkpoints must be actual checkpoint files") {
78+
val exMsg = intercept[IllegalArgumentException] {
79+
new LogSegment(
80+
logPath, 12, deltasFs11To12List, badCheckpointsList, 1)
81+
}.getMessage
82+
assert(exMsg === "checkpoints must all be actual checkpoint files")
83+
}
84+
85+
test("constructor -- if version >= 0 then both deltas and checkpoints cannot be empty") {
86+
val exMsg = intercept[IllegalArgumentException] {
87+
new LogSegment(
88+
logPath, 12, Collections.emptyList(), Collections.emptyList(), 1)
89+
}.getMessage
90+
assert(exMsg === "No files to read")
91+
}
92+
93+
test("constructor -- if deltas non-empty then first delta must equal checkpointVersion + 1") {
94+
val exMsg = intercept[IllegalArgumentException] {
95+
new LogSegment(
96+
logPath, 12, deltaFs12List, checkpointFs10List, 1)
97+
}.getMessage
98+
assert(exMsg === "First delta file version must equal checkpointVersion + 1")
99+
}
100+
101+
test("constructor -- if deltas non-empty then last delta must equal version") {
102+
val exMsg = intercept[IllegalArgumentException] {
103+
new LogSegment(
104+
logPath, 12, deltaFs11List, checkpointFs10List, 1)
105+
}.getMessage
106+
assert(exMsg === "Last delta file version must equal the version of this LogSegment")
107+
}
108+
109+
test("constructor -- if no deltas then checkpointVersion must equal version") {
110+
val exMsg = intercept[IllegalArgumentException] {
111+
new LogSegment(
112+
logPath, 11, Collections.emptyList(), checkpointFs10List, 1)
113+
}.getMessage
114+
assert(exMsg ===
115+
"If there are no deltas, then checkpointVersion must equal the version of this LogSegment")
116+
}
117+
}

0 commit comments

Comments
 (0)