Skip to content

Commit

Permalink
[MINOR] Reorder HoodieTimeline#compareTimestamp arguments for better …
Browse files Browse the repository at this point in the history
…readability

 - reads nicely as (instantTime1, GREATER_OR_EQUAL, instantTime2) etc
  • Loading branch information
vinothchandar committed Apr 30, 2020
1 parent 9059bce commit 0af43bb
Show file tree
Hide file tree
Showing 26 changed files with 64 additions and 62 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ public String compareCommits(@CliOption(key = {"path"}, help = "Path of the tabl
sourceTimeline.getInstants().iterator().hasNext() ? "0" : sourceTimeline.lastInstant().get().getTimestamp();

if (sourceLatestCommit != null
&& HoodieTimeline.compareTimestamps(targetLatestCommit, sourceLatestCommit, HoodieTimeline.GREATER)) {
&& HoodieTimeline.compareTimestamps(targetLatestCommit, HoodieTimeline.GREATER_THAN, sourceLatestCommit)) {
// source is behind the target
List<String> commitsToCatchup = targetTimeline.findInstantsAfter(sourceLatestCommit, Integer.MAX_VALUE)
.getInstants().map(HoodieInstant::getTimestamp).collect(Collectors.toList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -260,9 +260,9 @@ private HoodieTableFileSystemView buildFileSystemView(String globRegex, String m
if (!maxInstant.isEmpty()) {
final BiPredicate<String, String> predicate;
if (includeMaxInstant) {
predicate = HoodieTimeline.GREATER_OR_EQUAL;
predicate = HoodieTimeline.GREATER_THAN_OR_EQUALS;
} else {
predicate = HoodieTimeline.GREATER;
predicate = HoodieTimeline.GREATER_THAN;
}
instantsStream = instantsStream.filter(is -> predicate.test(maxInstant, is.getTimestamp()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public String validateSync(
sourceTimeline.getInstants().iterator().hasNext() ? "0" : sourceTimeline.lastInstant().get().getTimestamp();

if (sourceLatestCommit != null
&& HoodieTimeline.compareTimestamps(targetLatestCommit, sourceLatestCommit, HoodieTimeline.GREATER)) {
&& HoodieTimeline.compareTimestamps(targetLatestCommit, HoodieTimeline.GREATER_THAN, sourceLatestCommit)) {
// source is behind the target
return getString(target, targetTimeline, source, sourceCount, targetCount, sourceLatestCommit);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -426,7 +426,7 @@ public boolean rollback(final String commitInstantTime) throws HoodieRollbackExc
try {
HoodieTable<T> table = HoodieTable.create(config, jsc);
Option<HoodieInstant> commitInstantOpt = Option.fromJavaOptional(table.getActiveTimeline().getCommitsTimeline().getInstants()
.filter(instant -> HoodieActiveTimeline.EQUAL.test(instant.getTimestamp(), commitInstantTime))
.filter(instant -> HoodieActiveTimeline.EQUALS.test(instant.getTimestamp(), commitInstantTime))
.findFirst());
if (commitInstantOpt.isPresent()) {
HoodieRollbackMetadata rollbackMetadata = table.rollback(jsc, rollbackInstantTime, commitInstantOpt.get(), true);
Expand Down Expand Up @@ -537,7 +537,7 @@ private void startCommit(String instantTime) {
// if there are pending compactions, their instantTime must not be greater than that of this instant time
metaClient.getActiveTimeline().filterPendingCompactionTimeline().lastInstant().ifPresent(latestPending ->
ValidationUtils.checkArgument(
HoodieTimeline.compareTimestamps(latestPending.getTimestamp(), instantTime, HoodieTimeline.LESSER),
HoodieTimeline.compareTimestamps(latestPending.getTimestamp(), HoodieTimeline.LESSER_THAN, instantTime),
"Latest pending compaction instant time must be earlier than this instant time. Latest Compaction :"
+ latestPending + ", Ingesting at " + instantTime));
HoodieTable<T> table = HoodieTable.create(metaClient, config, jsc);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,8 +182,8 @@ private boolean checkIfValidCommit(HoodieTableMetaClient metaClient, String comm
// 2) is less than the first commit ts in the timeline
return !commitTimeline.empty()
&& (commitTimeline.containsInstant(new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, commitTs))
|| HoodieTimeline.compareTimestamps(commitTimeline.firstInstant().get().getTimestamp(), commitTs,
HoodieTimeline.GREATER));
|| HoodieTimeline.compareTimestamps(commitTimeline.firstInstant().get().getTimestamp(), HoodieTimeline.GREATER_THAN, commitTs
));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,10 +169,10 @@ private Stream<HoodieInstant> getInstantsToArchive(JavaSparkContext jsc) {
instants = Stream.concat(instants, commitTimeline.getInstants().filter(s -> {
// if no savepoint present, then dont filter
return !(firstSavepoint.isPresent() && HoodieTimeline.compareTimestamps(firstSavepoint.get().getTimestamp(),
s.getTimestamp(), HoodieTimeline.LESSER_OR_EQUAL));
HoodieTimeline.LESSER_THAN_OR_EQUALS, s.getTimestamp()));
}).filter(s -> {
// Ensure commits >= oldest pending compaction commit is retained
return oldestPendingCompactionInstant.map(instant -> HoodieTimeline.compareTimestamps(instant.getTimestamp(), s.getTimestamp(), HoodieTimeline.GREATER)).orElse(true);
return oldestPendingCompactionInstant.map(instant -> HoodieTimeline.compareTimestamps(instant.getTimestamp(), HoodieTimeline.GREATER_THAN, s.getTimestamp())).orElse(true);
}).limit(commitTimeline.countInstants() - minCommitsToKeep));
}

Expand Down Expand Up @@ -243,7 +243,7 @@ private boolean deleteAllInstantsOlderorEqualsInAuxMetaFolder(HoodieInstant thre

List<HoodieInstant> instantsToBeDeleted =
instants.stream().filter(instant1 -> HoodieTimeline.compareTimestamps(instant1.getTimestamp(),
thresholdInstant.getTimestamp(), HoodieTimeline.LESSER_OR_EQUAL)).collect(Collectors.toList());
HoodieTimeline.LESSER_THAN_OR_EQUALS, thresholdInstant.getTimestamp())).collect(Collectors.toList());

for (HoodieInstant deleteInstant : instantsToBeDeleted) {
LOG.info("Deleting instant " + deleteInstant + " in auxiliary meta path " + metaClient.getMetaAuxiliaryPath());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,9 +123,11 @@ public List<String> getPartitionPathsToClean(Option<HoodieInstant> newInstantToR
LOG.warn("Incremental Cleaning mode is enabled. Looking up partition-paths that have since changed "
+ "since last cleaned at " + cleanMetadata.getEarliestCommitToRetain()
+ ". New Instant to retain : " + newInstantToRetain);
return hoodieTable.getCompletedCommitsTimeline().getInstants().filter(instant -> HoodieTimeline.compareTimestamps(instant.getTimestamp(), cleanMetadata.getEarliestCommitToRetain(),
HoodieTimeline.GREATER_OR_EQUAL) && HoodieTimeline.compareTimestamps(instant.getTimestamp(),
newInstantToRetain.get().getTimestamp(), HoodieTimeline.LESSER)).flatMap(instant -> {
return hoodieTable.getCompletedCommitsTimeline().getInstants()
.filter(instant ->
HoodieTimeline.compareTimestamps(instant.getTimestamp(), HoodieTimeline.GREATER_THAN_OR_EQUALS, cleanMetadata.getEarliestCommitToRetain())
&& HoodieTimeline.compareTimestamps(instant.getTimestamp(), HoodieTimeline.LESSER_THAN, newInstantToRetain.get().getTimestamp())
).flatMap(instant -> {
try {
HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(hoodieTable.getActiveTimeline().getInstantDetails(instant).get(), HoodieCommitMetadata.class);
return commitMetadata.getPartitionToWriteStats().keySet().stream();
Expand Down Expand Up @@ -252,7 +254,7 @@ private List<String> getFilesToCleanKeepingLatestCommits(String partitionPath) {

// Always keep the last commit
if (!isFileSliceNeededForPendingCompaction(aSlice) && HoodieTimeline
.compareTimestamps(earliestCommitToRetain.getTimestamp(), fileCommitTime, HoodieTimeline.GREATER)) {
.compareTimestamps(earliestCommitToRetain.getTimestamp(), HoodieTimeline.GREATER_THAN, fileCommitTime)) {
// this is a commit, that should be cleaned.
aFile.ifPresent(hoodieDataFile -> deletePaths.add(hoodieDataFile.getFileName()));
if (hoodieTable.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ) {
Expand All @@ -273,7 +275,7 @@ private List<String> getFilesToCleanKeepingLatestCommits(String partitionPath) {
private String getLatestVersionBeforeCommit(List<FileSlice> fileSliceList, HoodieInstant instantTime) {
for (FileSlice file : fileSliceList) {
String fileCommitTime = file.getBaseInstantTime();
if (HoodieTimeline.compareTimestamps(instantTime.getTimestamp(), fileCommitTime, HoodieTimeline.GREATER)) {
if (HoodieTimeline.compareTimestamps(instantTime.getTimestamp(), HoodieTimeline.GREATER_THAN, fileCommitTime)) {
// fileList is sorted on the reverse, so the first commit we find <= instantTime is the
// one we want
return fileCommitTime;
Expand Down Expand Up @@ -324,8 +326,8 @@ private boolean isFileSliceNeededForPendingCompaction(FileSlice fileSlice) {
CompactionOperation op = fgIdToPendingCompactionOperations.get(fileSlice.getFileGroupId());
if (null != op) {
// If file slice's instant time is newer or same as that of operation, do not clean
return HoodieTimeline.compareTimestamps(fileSlice.getBaseInstantTime(), op.getBaseInstantTime(),
HoodieTimeline.GREATER_OR_EQUAL);
return HoodieTimeline.compareTimestamps(fileSlice.getBaseInstantTime(), HoodieTimeline.GREATER_THAN_OR_EQUALS, op.getBaseInstantTime()
);
}
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,15 +89,15 @@ public Option<HoodieCompactionPlan> execute() {
// if there are inflight writes, their instantTime must not be less than that of compaction instant time
table.getActiveTimeline().getCommitsTimeline().filterPendingExcludingCompaction().firstInstant()
.ifPresent(earliestInflight -> ValidationUtils.checkArgument(
HoodieTimeline.compareTimestamps(earliestInflight.getTimestamp(), instantTime, HoodieTimeline.GREATER),
HoodieTimeline.compareTimestamps(earliestInflight.getTimestamp(), HoodieTimeline.GREATER_THAN, instantTime),
"Earliest write inflight instant time must be later than compaction time. Earliest :" + earliestInflight
+ ", Compaction scheduled at " + instantTime));

// Committed and pending compaction instants should have strictly lower timestamps
List<HoodieInstant> conflictingInstants = table.getActiveTimeline()
.getCommitsAndCompactionTimeline().getInstants()
.filter(instant -> HoodieTimeline.compareTimestamps(
instant.getTimestamp(), instantTime, HoodieTimeline.GREATER_OR_EQUAL))
instant.getTimestamp(), HoodieTimeline.GREATER_THAN_OR_EQUALS, instantTime))
.collect(Collectors.toList());
ValidationUtils.checkArgument(conflictingInstants.isEmpty(),
"Following instants have timestamps >= compactionInstant (" + instantTime + ") Instants :"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public HoodieRestoreMetadata execute() {
// Get all the commits on the timeline after the provided commit time
List<HoodieInstant> instantsToRollback = table.getActiveTimeline().getCommitsAndCompactionTimeline()
.getReverseOrderedInstants()
.filter(instant -> HoodieActiveTimeline.GREATER.test(instant.getTimestamp(), restoreInstantTime))
.filter(instant -> HoodieActiveTimeline.GREATER_THAN.test(instant.getTimestamp(), restoreInstantTime))
.collect(Collectors.toList());

Map<String, List<HoodieRollbackMetadata>> instantToMetadata = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,13 +225,13 @@ private List<RollbackRequest> generateAppendRollbackBlocksAction(String partitio
// For sanity, log instant time can never be less than base-commit on which we are rolling back
ValidationUtils
.checkArgument(HoodieTimeline.compareTimestamps(fileIdToBaseCommitTimeForLogMap.get(wStat.getFileId()),
rollbackInstant.getTimestamp(), HoodieTimeline.LESSER_OR_EQUAL));
HoodieTimeline.LESSER_THAN_OR_EQUALS, rollbackInstant.getTimestamp()));
}

return validForRollback && HoodieTimeline.compareTimestamps(fileIdToBaseCommitTimeForLogMap.get(
// Base Ts should be strictly less. If equal (for inserts-to-logs), the caller employs another option
// to delete and we should not step on it
wStat.getFileId()), rollbackInstant.getTimestamp(), HoodieTimeline.LESSER);
wStat.getFileId()), HoodieTimeline.LESSER_THAN, rollbackInstant.getTimestamp());
}).map(wStat -> {
String baseCommitTime = fileIdToBaseCommitTimeForLogMap.get(wStat.getFileId());
return RollbackRequest.createRollbackRequestWithAppendRollbackBlockAction(partitionPath, wStat.getFileId(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public HoodieSavepointMetadata execute() {
}

// Cannot allow savepoint time on a commit that could have been cleaned
ValidationUtils.checkArgument(HoodieTimeline.compareTimestamps(instantTime, lastCommitRetained, HoodieTimeline.GREATER_OR_EQUAL),
ValidationUtils.checkArgument(HoodieTimeline.compareTimestamps(instantTime, HoodieTimeline.GREATER_THAN_OR_EQUALS, lastCommitRetained),
"Could not savepoint commit " + instantTime + " as this is beyond the lookup window " + lastCommitRetained);

Map<String, List<String>> latestFilesMap = jsc.parallelize(FSUtils.getAllPartitionPaths(table.getMetaClient().getFs(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1103,7 +1103,7 @@ private void testPendingCompactions(HoodieWriteConfig config, int expNumFilesDel
if (expFileIdToPendingCompaction.containsKey(fileIdWithCommitTime.getKey())) {
assertTrue(HoodieTimeline.compareTimestamps(
fileIdToLatestInstantBeforeCompaction.get(fileIdWithCommitTime.getKey()),
fileIdWithCommitTime.getValue(), HoodieTimeline.GREATER),
HoodieTimeline.GREATER_THAN, fileIdWithCommitTime.getValue()),
"Deleted instant time must be less than pending compaction");
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ public void testSimpleInsertAndUpdate() throws Exception {
assertEquals(1, timeline.findInstantsAfter("000", Integer.MAX_VALUE).countInstants(),
"Expecting a single commit.");
String latestCompactionCommitTime = timeline.lastInstant().get().getTimestamp();
assertTrue(HoodieTimeline.compareTimestamps("000", latestCompactionCommitTime, HoodieTimeline.LESSER));
assertTrue(HoodieTimeline.compareTimestamps("000", HoodieTimeline.LESSER_THAN, latestCompactionCommitTime));

assertEquals(200, HoodieClientTestUtils.readSince(basePath, sqlContext, timeline, "000").count(),
"Must contain 200 records");
Expand Down Expand Up @@ -877,7 +877,7 @@ public void testLogFileCountsAfterCompaction() throws Exception {
HoodieActiveTimeline timeline = metaClient.getActiveTimeline();

assertTrue(HoodieTimeline
.compareTimestamps(timeline.lastInstant().get().getTimestamp(), newCommitTime, HoodieTimeline.GREATER),
.compareTimestamps(timeline.lastInstant().get().getTimestamp(), HoodieTimeline.GREATER_THAN, newCommitTime),
"Compaction commit should be > than last insert");

for (String partitionPath : dataGen.getPartitionPaths()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ public HoodieFileGroupId getFileGroupId() {
private boolean isFileSliceCommitted(FileSlice slice) {
String maxCommitTime = lastInstant.get().getTimestamp();
return timeline.containsOrBeforeTimelineStarts(slice.getBaseInstantTime())
&& HoodieTimeline.compareTimestamps(slice.getBaseInstantTime(), maxCommitTime, HoodieTimeline.LESSER_OR_EQUAL);
&& HoodieTimeline.compareTimestamps(slice.getBaseInstantTime(), HoodieTimeline.LESSER_THAN_OR_EQUALS, maxCommitTime);

}

Expand Down Expand Up @@ -164,7 +164,7 @@ public Option<HoodieBaseFile> getLatestDataFile() {
*/
public Option<FileSlice> getLatestFileSliceBeforeOrOn(String maxInstantTime) {
return Option.fromJavaOptional(getAllFileSlices().filter(slice -> HoodieTimeline
.compareTimestamps(slice.getBaseInstantTime(), maxInstantTime, HoodieTimeline.LESSER_OR_EQUAL)).findFirst());
.compareTimestamps(slice.getBaseInstantTime(), HoodieTimeline.LESSER_THAN_OR_EQUALS, maxInstantTime)).findFirst());
}

/**
Expand All @@ -175,7 +175,7 @@ public Option<FileSlice> getLatestFileSliceBeforeOrOn(String maxInstantTime) {
*/
public Option<FileSlice> getLatestFileSliceBefore(String maxInstantTime) {
return Option.fromJavaOptional(getAllFileSlices().filter(
slice -> HoodieTimeline.compareTimestamps(slice.getBaseInstantTime(), maxInstantTime, HoodieTimeline.LESSER))
slice -> HoodieTimeline.compareTimestamps(slice.getBaseInstantTime(), HoodieTimeline.LESSER_THAN, maxInstantTime))
.findFirst());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,8 @@ public void scan() {
HoodieLogBlock r = logFormatReaderWrapper.next();
totalLogBlocks.incrementAndGet();
if (r.getBlockType() != CORRUPT_BLOCK
&& !HoodieTimeline.compareTimestamps(r.getLogBlockHeader().get(INSTANT_TIME), this.latestInstantTime,
HoodieTimeline.LESSER_OR_EQUAL)) {
&& !HoodieTimeline.compareTimestamps(r.getLogBlockHeader().get(INSTANT_TIME), HoodieTimeline.LESSER_THAN_OR_EQUALS, this.latestInstantTime
)) {
// hit a block with instant time greater than should be processed, stop processing further
break;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public static String createNewInstantTime() {
String newCommitTime;
do {
newCommitTime = HoodieActiveTimeline.COMMIT_FORMATTER.format(new Date());
} while (HoodieTimeline.compareTimestamps(newCommitTime, oldVal, LESSER_OR_EQUAL));
} while (HoodieTimeline.compareTimestamps(newCommitTime, LESSER_THAN_OR_EQUALS, oldVal));
return newCommitTime;
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,14 +128,14 @@ public HoodieDefaultTimeline findInstantsInRange(String startTs, String endTs) {
@Override
public HoodieDefaultTimeline findInstantsAfter(String instantTime, int numCommits) {
return new HoodieDefaultTimeline(instants.stream()
.filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(), instantTime, GREATER)).limit(numCommits),
.filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(), GREATER_THAN, instantTime)).limit(numCommits),
details);
}

@Override
public HoodieDefaultTimeline findInstantsBefore(String instantTime) {
return new HoodieDefaultTimeline(instants.stream()
.filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(), instantTime, LESSER)),
.filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(), LESSER_THAN, instantTime)),
details);
}

Expand Down Expand Up @@ -288,7 +288,7 @@ public Stream<HoodieInstant> getReverseOrderedInstants() {
public boolean isBeforeTimelineStarts(String instant) {
Option<HoodieInstant> firstCommit = firstInstant();
return firstCommit.isPresent()
&& HoodieTimeline.compareTimestamps(instant, firstCommit.get().getTimestamp(), LESSER);
&& HoodieTimeline.compareTimestamps(instant, LESSER_THAN, firstCommit.get().getTimestamp());
}

@Override
Expand Down
Loading

0 comments on commit 0af43bb

Please sign in to comment.