diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogTracker.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogTracker.java index 5838a9de4ba..6bc4bc353f4 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogTracker.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogTracker.java @@ -70,9 +70,10 @@ public class ReplicationLogTracker { protected final String haGroupName; protected MetricsReplicationLogTracker metrics; - public ReplicationLogTracker(final Configuration conf, final String haGroupName, final FileSystem fileSystem, - final ReplicationShardDirectoryManager replicationShardDirectoryManager, - final MetricsReplicationLogTracker metrics) { + public ReplicationLogTracker(final Configuration conf, final String haGroupName, + final FileSystem fileSystem, + final ReplicationShardDirectoryManager replicationShardDirectoryManager, + final MetricsReplicationLogTracker metrics) { this.conf = conf; this.haGroupName = haGroupName; this.fileSystem = fileSystem; @@ -98,8 +99,9 @@ protected String getInProgressLogSubDirectoryName() { * exist. */ public void init() throws IOException { - this.inProgressDirPath = new Path(getReplicationShardDirectoryManager().getRootDirectoryPath().getParent(), - getInProgressLogSubDirectoryName()); + this.inProgressDirPath = new Path( + getReplicationShardDirectoryManager().getRootDirectoryPath().getParent(), + getInProgressLogSubDirectoryName()); createDirectoryIfNotExists(inProgressDirPath); } @@ -151,6 +153,35 @@ public List getNewFilesForRound(ReplicationRound replicationRound) throws return filesInRound; } + /** + * Retrieves new replication log files that belong to replication rounds from startRound to + * endRound (inclusive). Iterates through all rounds in the range and collects valid log files + * from each round's shard directory. + * + * @param startRound - The starting replication round (inclusive) + * @param endRound - The ending replication round (inclusive) + * @return List of valid log file paths from startRound to endRound, empty list if + * startRound > endRound + * @throws IOException if there's an error accessing the file system + */ + public List getNewFiles(ReplicationRound startRound, ReplicationRound endRound) + throws IOException { + List files = new ArrayList<>(); + // Early return if startRound is after endRound (invalid range) + if (startRound.getStartTime() > endRound.getStartTime()) { + return files; + } + // Iterate through all rounds from startRound to endRound (exclusive of endRound) + ReplicationRound firstRound = startRound; + while (!firstRound.equals(endRound)) { + files.addAll(getNewFilesForRound(firstRound)); + firstRound = replicationShardDirectoryManager.getNextRound(firstRound); + } + // Add the files for the endRound (inclusive) + files.addAll(getNewFilesForRound(endRound)); + return files; + } + /** * Retrieves all valid log files currently in the in-progress directory. * @return List of valid log file paths in the in-progress directory, empty list if directory diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogDiscoveryReplay.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogDiscoveryReplay.java index faf078fa7ff..3911785a4dd 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogDiscoveryReplay.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogDiscoveryReplay.java @@ -32,6 +32,7 @@ import org.apache.phoenix.replication.ReplicationLogDiscovery; import org.apache.phoenix.replication.ReplicationLogTracker; import org.apache.phoenix.replication.ReplicationRound; +import org.apache.phoenix.replication.ReplicationShardDirectoryManager; import org.apache.phoenix.replication.metrics.MetricsReplicationLogDiscovery; import org.apache.phoenix.replication.metrics.MetricsReplicationLogDiscoveryReplayImpl; import org.slf4j.Logger; @@ -137,23 +138,28 @@ public void init() throws IOException { LOG.info("Initializing ReplicationLogDiscoveryReplay for haGroup: {}", haGroupName); - HAGroupStateListener degradedListener = (groupName, fromState, toState, modifiedTime, clusterType, lastSyncStateTimeInMs) -> { - if (clusterType == ClusterType.LOCAL && HAGroupStoreRecord.HAGroupState.DEGRADED_STANDBY.equals(toState)) { + HAGroupStateListener degradedListener = (groupName, fromState, toState, + modifiedTime, clusterType, lastSyncStateTimeInMs) -> { + if (clusterType == ClusterType.LOCAL + && HAGroupStoreRecord.HAGroupState.DEGRADED_STANDBY.equals(toState)) { replicationReplayState.set(ReplicationReplayState.DEGRADED); LOG.info("Cluster degraded detected for {}. replicationReplayState={}", haGroupName, ReplicationReplayState.DEGRADED); } }; - HAGroupStateListener recoveryListener = (groupName, fromState, toState, modifiedTime, clusterType, lastSyncStateTimeInMs) -> { - if (clusterType == ClusterType.LOCAL && HAGroupStoreRecord.HAGroupState.STANDBY.equals(toState)) { + HAGroupStateListener recoveryListener = (groupName, fromState, toState, + modifiedTime, clusterType, lastSyncStateTimeInMs) -> { + if (clusterType == ClusterType.LOCAL + && HAGroupStoreRecord.HAGroupState.STANDBY.equals(toState)) { replicationReplayState.set(ReplicationReplayState.SYNCED_RECOVERY); LOG.info("Cluster recovered detected for {}. replicationReplayState={}", haGroupName, getReplicationReplayState()); } }; - HAGroupStateListener triggerFailoverListner = (groupName, fromState, toState, modifiedTime, clusterType, lastSyncStateTimeInMs) -> { + HAGroupStateListener triggerFailoverListner = (groupName, fromState, toState, + modifiedTime, clusterType, lastSyncStateTimeInMs) -> { if (clusterType == ClusterType.LOCAL && HAGroupStoreRecord.HAGroupState.STANDBY_TO_ACTIVE.equals(toState)) { failoverPending.set(true); @@ -163,7 +169,8 @@ public void init() throws IOException { } }; - HAGroupStateListener abortFailoverListner = (groupName, fromState, toState, modifiedTime, clusterType, lastSyncStateTimeInMs) -> { + HAGroupStateListener abortFailoverListner = (groupName, fromState, toState, + modifiedTime, clusterType, lastSyncStateTimeInMs) -> { if (clusterType == ClusterType.LOCAL && HAGroupStoreRecord.HAGroupState.ABORT_TO_STANDBY.equals(toState)) { failoverPending.set(false); @@ -228,7 +235,8 @@ protected void initializeLastRoundProcessed() throws IOException { HAGroupStoreRecord haGroupStoreRecord = getHAGroupRecord(); LOG.info("Found HA Group state during initialization as {} for haGroup: {}", haGroupStoreRecord.getHAGroupState(), haGroupName); - if (HAGroupStoreRecord.HAGroupState.DEGRADED_STANDBY.equals(haGroupStoreRecord.getHAGroupState())) { + if (HAGroupStoreRecord.HAGroupState.DEGRADED_STANDBY + .equals(haGroupStoreRecord.getHAGroupState())) { replicationReplayState.compareAndSet(ReplicationReplayState.NOT_INITIALIZED, ReplicationReplayState.DEGRADED); long minimumTimestampFromFiles = EnvironmentEdgeManager.currentTime(); @@ -474,19 +482,51 @@ protected HAGroupStoreRecord getHAGroupRecord() throws IOException { * Failover is safe to trigger when all of the following conditions are met: * 1. A failover has been requested (failoverPending is true) * 2. No files are currently in the in-progress directory - * 3. No new files exist for ongoing round + * 3. No new files exist from the next round to process up to the current timestamp round * - * These conditions ensure all replication logs have been processed before transitioning - * the cluster from STANDBY to ACTIVE state. + * The third condition checks for new files in the range from nextRoundToProcess (derived from + * getLastRoundProcessed()) to currentTimestampRound (derived from current time). This ensures + * all replication logs up to the current time have been processed before transitioning + * the cluster + * from STANDBY to ACTIVE state. * * @return true if all conditions are met and failover should be triggered, false otherwise * @throws IOException if there's an error checking file status */ protected boolean shouldTriggerFailover() throws IOException { - return failoverPending.get() && replicationLogTracker.getInProgressFiles().isEmpty() - && replicationLogTracker.getNewFilesForRound(replicationLogTracker - .getReplicationShardDirectoryManager() - .getNextRound(getLastRoundProcessed())).isEmpty(); + LOG.debug("Checking if failover should be triggered. failoverPending={}", failoverPending); + // Check if failover has been requested + if (!failoverPending.get()) { + LOG.debug("Failover not triggered. failoverPending is false."); + return false; + } + // Check if in-progress directory is empty + boolean isInProgressDirectoryEmpty = replicationLogTracker.getInProgressFiles().isEmpty(); + if (!isInProgressDirectoryEmpty) { + LOG.debug("Failover not triggered. In progress directory is not empty."); + return false; + } + // Check if there are any new files from next round to current timestamp round + ReplicationShardDirectoryManager replicationShardDirectoryManager = + replicationLogTracker.getReplicationShardDirectoryManager(); + ReplicationRound nextRoundToProcess = + replicationShardDirectoryManager.getNextRound(getLastRoundProcessed()); + ReplicationRound currentTimestampRound = + replicationShardDirectoryManager.getReplicationRoundFromStartTime( + EnvironmentEdgeManager.currentTime()); + LOG.debug("Checking the new files from next round {} to current timestamp round {}.", + nextRoundToProcess, currentTimestampRound); + boolean isInDirectoryEmpty = replicationLogTracker.getNewFiles(nextRoundToProcess, + currentTimestampRound).isEmpty(); + + if (!isInDirectoryEmpty) { + LOG.debug("Failover not triggered. New files exist from next round to current " + + "timestamp round."); + return false; + } + + LOG.info("Failover can be triggered."); + return true; } protected void triggerFailover() { diff --git a/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/ReplicationLogDiscoveryReplayTestIT.java b/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/ReplicationLogDiscoveryReplayTestIT.java index ca1171d76f6..7f13dad67e0 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/ReplicationLogDiscoveryReplayTestIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/ReplicationLogDiscoveryReplayTestIT.java @@ -1650,7 +1650,7 @@ public Path getInProgressDirPath() { /** * Tests the shouldTriggerFailover method with various combinations of failoverPending, - * in-progress files, and new files for next round. + * in-progress files, and new files from next round to current timestamp round. */ @Test public void testShouldTriggerFailover() throws IOException { @@ -1668,12 +1668,14 @@ public void testShouldTriggerFailover() throws IOException { try { // Create test rounds ReplicationRound testRound = new ReplicationRound(1704153600000L, 1704153660000L); - ReplicationRound nextRound = tracker.getReplicationShardDirectoryManager().getNextRound(testRound); + ReplicationShardDirectoryManager shardManager = tracker.getReplicationShardDirectoryManager(); + ReplicationRound nextRoundToProcess = shardManager.getNextRound(testRound); + ReplicationRound currentTimestampRound = shardManager.getReplicationRoundFromStartTime(currentTime); // Test Case 1: All conditions true - should return true { when(tracker.getInProgressFiles()).thenReturn(Collections.emptyList()); - when(tracker.getNewFilesForRound(nextRound)).thenReturn(Collections.emptyList()); + when(tracker.getNewFiles(nextRoundToProcess, currentTimestampRound)).thenReturn(Collections.emptyList()); TestableReplicationLogDiscoveryReplay discovery = new TestableReplicationLogDiscoveryReplay(tracker, haGroupStoreRecord); discovery.setLastRoundInSync(testRound); discovery.setLastRoundProcessed(testRound); @@ -1686,7 +1688,7 @@ public void testShouldTriggerFailover() throws IOException { // Test Case 2: failoverPending is false - should return false { when(tracker.getInProgressFiles()).thenReturn(Collections.emptyList()); - when(tracker.getNewFilesForRound(nextRound)).thenReturn(Collections.emptyList()); + when(tracker.getNewFiles(nextRoundToProcess, currentTimestampRound)).thenReturn(Collections.emptyList()); TestableReplicationLogDiscoveryReplay discovery = new TestableReplicationLogDiscoveryReplay(tracker, haGroupStoreRecord); discovery.setLastRoundInSync(testRound); discovery.setLastRoundProcessed(testRound); @@ -1699,7 +1701,7 @@ public void testShouldTriggerFailover() throws IOException { // Test Case 3: in-progress files not empty - should return false { when(tracker.getInProgressFiles()).thenReturn(Collections.singletonList(new Path("test.plog"))); - when(tracker.getNewFilesForRound(nextRound)).thenReturn(Collections.emptyList()); + when(tracker.getNewFiles(nextRoundToProcess, currentTimestampRound)).thenReturn(Collections.emptyList()); TestableReplicationLogDiscoveryReplay discovery = new TestableReplicationLogDiscoveryReplay(tracker, haGroupStoreRecord); discovery.setLastRoundInSync(testRound); discovery.setLastRoundProcessed(testRound); @@ -1709,23 +1711,23 @@ public void testShouldTriggerFailover() throws IOException { discovery.shouldTriggerFailover()); } - // Test Case 4: new files exist for next round - should return false + // Test Case 4: new files exist from next round to current timestamp round - should return false { when(tracker.getInProgressFiles()).thenReturn(Collections.emptyList()); - when(tracker.getNewFilesForRound(nextRound)).thenReturn(Collections.singletonList(new Path("test.plog"))); + when(tracker.getNewFiles(nextRoundToProcess, currentTimestampRound)).thenReturn(Collections.singletonList(new Path("test.plog"))); TestableReplicationLogDiscoveryReplay discovery = new TestableReplicationLogDiscoveryReplay(tracker, haGroupStoreRecord); discovery.setLastRoundInSync(testRound); discovery.setLastRoundProcessed(testRound); discovery.setFailoverPending(true); - assertFalse("Should not trigger failover when new files exist for next round", + assertFalse("Should not trigger failover when new files exist from next round to current timestamp round", discovery.shouldTriggerFailover()); } // Test Case 5: failoverPending false AND in-progress files not empty - should return false { when(tracker.getInProgressFiles()).thenReturn(Collections.singletonList(new Path("test.plog"))); - when(tracker.getNewFilesForRound(nextRound)).thenReturn(Collections.emptyList()); + when(tracker.getNewFiles(nextRoundToProcess, currentTimestampRound)).thenReturn(Collections.emptyList()); TestableReplicationLogDiscoveryReplay discovery = new TestableReplicationLogDiscoveryReplay(tracker, haGroupStoreRecord); discovery.setLastRoundInSync(testRound); discovery.setLastRoundProcessed(testRound); @@ -1738,7 +1740,7 @@ public void testShouldTriggerFailover() throws IOException { // Test Case 6: failoverPending false AND new files exist - should return false { when(tracker.getInProgressFiles()).thenReturn(Collections.emptyList()); - when(tracker.getNewFilesForRound(nextRound)).thenReturn(Collections.singletonList(new Path("test.plog"))); + when(tracker.getNewFiles(nextRoundToProcess, currentTimestampRound)).thenReturn(Collections.singletonList(new Path("test.plog"))); TestableReplicationLogDiscoveryReplay discovery = new TestableReplicationLogDiscoveryReplay(tracker, haGroupStoreRecord); discovery.setLastRoundInSync(testRound); discovery.setLastRoundProcessed(testRound); @@ -1751,7 +1753,7 @@ public void testShouldTriggerFailover() throws IOException { // Test Case 7: in-progress files not empty AND new files exist - should return false { when(tracker.getInProgressFiles()).thenReturn(Collections.singletonList(new Path("test1.plog"))); - when(tracker.getNewFilesForRound(nextRound)).thenReturn(Collections.singletonList(new Path("test2.plog"))); + when(tracker.getNewFiles(nextRoundToProcess, currentTimestampRound)).thenReturn(Collections.singletonList(new Path("test2.plog"))); TestableReplicationLogDiscoveryReplay discovery = new TestableReplicationLogDiscoveryReplay(tracker, haGroupStoreRecord); discovery.setLastRoundInSync(testRound); discovery.setLastRoundProcessed(testRound); @@ -1764,7 +1766,7 @@ public void testShouldTriggerFailover() throws IOException { // Test Case 8: All conditions false - should return false { when(tracker.getInProgressFiles()).thenReturn(Collections.singletonList(new Path("test.plog"))); - when(tracker.getNewFilesForRound(nextRound)).thenReturn(Collections.singletonList(new Path("test2.plog"))); + when(tracker.getNewFiles(nextRoundToProcess, currentTimestampRound)).thenReturn(Collections.singletonList(new Path("test2.plog"))); TestableReplicationLogDiscoveryReplay discovery = new TestableReplicationLogDiscoveryReplay(tracker, haGroupStoreRecord); discovery.setLastRoundInSync(testRound); discovery.setLastRoundProcessed(testRound); diff --git a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogTrackerTest.java b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogTrackerTest.java index f332ad7a8f3..784d1cd7ef2 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogTrackerTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogTrackerTest.java @@ -1283,6 +1283,283 @@ public void testGetOlderInProgressFilesWithMixedFileTypes() throws IOException { assertFalse("Should not contain invalidFile3", resultFilenames.contains(invalidFile3.getName())); } + @Test + public void testGetNewFilesWithStartAndEndRound() throws IOException { + // Initialize tracker + tracker.init(); + + ReplicationShardDirectoryManager shardManager = tracker.getReplicationShardDirectoryManager(); + long roundDurationMs = shardManager.getReplicationRoundDurationSeconds() * 1000L; + + // Create start round (60 seconds duration) + long startRoundStartTime = 1704153600000L; // 2024-01-02 00:00:00 + long startRoundEndTime = startRoundStartTime + roundDurationMs; + ReplicationRound startRound = new ReplicationRound(startRoundStartTime, startRoundEndTime); + + // Create middle round + long middleRoundStartTime = startRoundEndTime; + long middleRoundEndTime = middleRoundStartTime + roundDurationMs; + ReplicationRound middleRound = new ReplicationRound(middleRoundStartTime, middleRoundEndTime); + + // Create end round + long endRoundStartTime = middleRoundEndTime; + long endRoundEndTime = endRoundStartTime + roundDurationMs; + ReplicationRound endRound = new ReplicationRound(endRoundStartTime, endRoundEndTime); + + // Get shard directories for each round + Path startRoundShardDir = shardManager.getShardDirectory(startRound); + Path middleRoundShardDir = shardManager.getShardDirectory(middleRound); + Path endRoundShardDir = shardManager.getShardDirectory(endRound); + + // Create shard directories + localFs.mkdirs(startRoundShardDir); + localFs.mkdirs(middleRoundShardDir); + localFs.mkdirs(endRoundShardDir); + + // Create files in start round + Path startRoundFile1 = new Path(startRoundShardDir, startRoundStartTime + "_rs1.plog"); + Path startRoundFile2 = new Path(startRoundShardDir, startRoundStartTime + 30000 + "_rs2.plog"); + + // Create files in middle round + Path middleRoundFile1 = new Path(middleRoundShardDir, middleRoundStartTime + "_rs3.plog"); + Path middleRoundFile2 = new Path(middleRoundShardDir, middleRoundStartTime + 30000 + "_rs4.plog"); + + // Create files in end round + Path endRoundFile1 = new Path(endRoundShardDir, endRoundStartTime + "_rs5.plog"); + Path endRoundFile2 = new Path(endRoundShardDir, endRoundStartTime + 30000 + "_rs6.plog"); + + // Create files outside the range (before start round) + ReplicationRound beforeStartRound = shardManager.getPreviousRound(startRound); + Path beforeStartRoundShardDir = shardManager.getShardDirectory(beforeStartRound); + localFs.mkdirs(beforeStartRoundShardDir); + Path beforeStartRoundFile = new Path(beforeStartRoundShardDir, beforeStartRound.getStartTime() + "_rs0.plog"); + + // Create files outside the range (after end round) + ReplicationRound afterEndRound = shardManager.getNextRound(endRound); + Path afterEndRoundShardDir = shardManager.getShardDirectory(afterEndRound); + localFs.mkdirs(afterEndRoundShardDir); + Path afterEndRoundFile = new Path(afterEndRoundShardDir, afterEndRound.getStartTime() + "_rs7.plog"); + + // Create all files + localFs.create(startRoundFile1, true).close(); + localFs.create(startRoundFile2, true).close(); + localFs.create(middleRoundFile1, true).close(); + localFs.create(middleRoundFile2, true).close(); + localFs.create(endRoundFile1, true).close(); + localFs.create(endRoundFile2, true).close(); + localFs.create(beforeStartRoundFile, true).close(); + localFs.create(afterEndRoundFile, true).close(); + + // Call getNewFiles with startRound and endRound + List result = tracker.getNewFiles(startRound, endRound); + + // Verify file system operations + // Should call getNewFilesForRound for each round (start, middle, end) + // Each call to getNewFilesForRound calls exists() and listStatus() on shard directories + // Note: init() already called exists() on in-progress directory + Mockito.verify(mockFs, times(1)).exists(Mockito.eq(startRoundShardDir)); + Mockito.verify(mockFs, times(1)).exists(Mockito.eq(middleRoundShardDir)); + Mockito.verify(mockFs, times(1)).exists(Mockito.eq(endRoundShardDir)); + Mockito.verify(mockFs, times(3)).listStatus(Mockito.any(Path.class)); + + // Prepare expected set of file paths (should include files from start, middle, and end rounds) + Set expectedPaths = new HashSet<>(); + expectedPaths.add(startRoundFile1.toString()); + expectedPaths.add(startRoundFile2.toString()); + expectedPaths.add(middleRoundFile1.toString()); + expectedPaths.add(middleRoundFile2.toString()); + expectedPaths.add(endRoundFile1.toString()); + expectedPaths.add(endRoundFile2.toString()); + + // Create actual set of paths + Set actualPaths = result.stream().map(path -> path.toUri().getPath()).collect(Collectors.toSet()); + + // Verify all files from start to end rounds are returned + assertEquals("Should return exactly 6 files from start to end rounds", expectedPaths.size(), actualPaths.size()); + assertEquals("File paths do not match", expectedPaths, actualPaths); + + // Verify files outside the range are not included + assertFalse("Should not contain file from before start round", actualPaths.contains(beforeStartRoundFile.toString())); + assertFalse("Should not contain file from after end round", actualPaths.contains(afterEndRoundFile.toString())); + } + + @Test + public void testGetNewFilesWithSameStartAndEndRound() throws IOException { + // Initialize tracker + tracker.init(); + + ReplicationShardDirectoryManager shardManager = tracker.getReplicationShardDirectoryManager(); + long roundDurationMs = shardManager.getReplicationRoundDurationSeconds() * 1000L; + + // Create a single round + long roundStartTime = 1704153600000L; // 2024-01-02 00:00:00 + long roundEndTime = roundStartTime + roundDurationMs; + ReplicationRound round = new ReplicationRound(roundStartTime, roundEndTime); + + // Get shard directory for this round + Path roundShardDir = shardManager.getShardDirectory(round); + localFs.mkdirs(roundShardDir); + + // Create files in the round + Path file1 = new Path(roundShardDir, roundStartTime + "_rs1.plog"); + Path file2 = new Path(roundShardDir, roundStartTime + 30000 + "_rs2.plog"); + Path file3 = new Path(roundShardDir, roundStartTime + 50000 + "_rs3.plog"); + + // Create files + localFs.create(file1, true).close(); + localFs.create(file2, true).close(); + localFs.create(file3, true).close(); + + // Call getNewFiles with same start and end round + List result = tracker.getNewFiles(round, round); + + // Verify file system operations + // Should call getNewFilesForRound once for the round + // Note: init() already called exists() on in-progress directory + Mockito.verify(mockFs, times(1)).exists(Mockito.eq(roundShardDir)); + Mockito.verify(mockFs, times(1)).listStatus(Mockito.any(Path.class)); + + // Prepare expected set of file paths + Set expectedPaths = new HashSet<>(); + expectedPaths.add(file1.toString()); + expectedPaths.add(file2.toString()); + expectedPaths.add(file3.toString()); + + // Create actual set of paths + Set actualPaths = result.stream().map(path -> path.toUri().getPath()).collect(Collectors.toSet()); + + // Verify all files from the round are returned + assertEquals("Should return exactly 3 files from the round", expectedPaths.size(), actualPaths.size()); + assertEquals("File paths do not match", expectedPaths, actualPaths); + } + + @Test + public void testGetNewFilesWithInvalidRange() throws IOException { + // Initialize tracker + tracker.init(); + + ReplicationShardDirectoryManager shardManager = tracker.getReplicationShardDirectoryManager(); + long roundDurationMs = shardManager.getReplicationRoundDurationSeconds() * 1000L; + + // Create end round (earlier time) + long endRoundStartTime = 1704153600000L; // 2024-01-02 00:00:00 + long endRoundEndTime = endRoundStartTime + roundDurationMs; + ReplicationRound endRound = new ReplicationRound(endRoundStartTime, endRoundEndTime); + + // Create start round (later time) - invalid: start > end + long startRoundStartTime = endRoundEndTime; + long startRoundEndTime = startRoundStartTime + roundDurationMs; + ReplicationRound startRound = new ReplicationRound(startRoundStartTime, startRoundEndTime); + + // Get shard directories + Path startRoundShardDir = shardManager.getShardDirectory(startRound); + Path endRoundShardDir = shardManager.getShardDirectory(endRound); + + // Create shard directories + localFs.mkdirs(startRoundShardDir); + localFs.mkdirs(endRoundShardDir); + + // Create files in both rounds + Path startRoundFile = new Path(startRoundShardDir, startRoundStartTime + "_rs1.plog"); + Path endRoundFile = new Path(endRoundShardDir, endRoundStartTime + "_rs2.plog"); + + localFs.create(startRoundFile, true).close(); + localFs.create(endRoundFile, true).close(); + + // Call getNewFiles with invalid range (startRound.getStartTime() > endRound.getStartTime()) + List result = tracker.getNewFiles(startRound, endRound); + + // Verify empty list is returned when startRound.getStartTime() > endRound.getStartTime() + assertTrue("Should return empty list for invalid range", result.isEmpty()); + + // Verify no file system operations were performed on shard directories (early return) + // Note: init() already called exists() and mkdirs() on in-progress directory + Mockito.verify(mockFs, times(0)).exists(Mockito.eq(startRoundShardDir)); + Mockito.verify(mockFs, times(0)).exists(Mockito.eq(endRoundShardDir)); + Mockito.verify(mockFs, times(0)).listStatus(Mockito.any(Path.class)); + } + + @Test + public void testGetNewFilesWithEmptyRounds() throws IOException { + // Initialize tracker + tracker.init(); + + ReplicationShardDirectoryManager shardManager = tracker.getReplicationShardDirectoryManager(); + long roundDurationMs = shardManager.getReplicationRoundDurationSeconds() * 1000L; + + // Create start round + long startRoundStartTime = 1704153600000L; // 2024-01-02 00:00:00 + long startRoundEndTime = startRoundStartTime + roundDurationMs; + ReplicationRound startRound = new ReplicationRound(startRoundStartTime, startRoundEndTime); + + // Create end round + long endRoundStartTime = startRoundEndTime; + long endRoundEndTime = endRoundStartTime + roundDurationMs; + ReplicationRound endRound = new ReplicationRound(endRoundStartTime, endRoundEndTime); + + // Get shard directories + Path startRoundShardDir = shardManager.getShardDirectory(startRound); + Path endRoundShardDir = shardManager.getShardDirectory(endRound); + + // Create shard directories but leave them empty + localFs.mkdirs(startRoundShardDir); + localFs.mkdirs(endRoundShardDir); + + // Call getNewFiles with empty rounds + List result = tracker.getNewFiles(startRound, endRound); + + // Verify file system operations + // Should call getNewFilesForRound for each round (start and end) + // Note: init() already called exists() on in-progress directory + Mockito.verify(mockFs, times(1)).exists(Mockito.eq(startRoundShardDir)); + Mockito.verify(mockFs, times(1)).exists(Mockito.eq(endRoundShardDir)); + Mockito.verify(mockFs, times(2)).listStatus(Mockito.any(Path.class)); + + // Verify empty list is returned + assertTrue("Should return empty list for empty rounds", result.isEmpty()); + } + + @Test + public void testGetNewFilesWithNonExistentRounds() throws IOException { + // Initialize tracker + tracker.init(); + + ReplicationShardDirectoryManager shardManager = tracker.getReplicationShardDirectoryManager(); + long roundDurationMs = shardManager.getReplicationRoundDurationSeconds() * 1000L; + + // Create start round + long startRoundStartTime = 1704153600000L; // 2024-01-02 00:00:00 + long startRoundEndTime = startRoundStartTime + roundDurationMs; + ReplicationRound startRound = new ReplicationRound(startRoundStartTime, startRoundEndTime); + + // Create end round + long endRoundStartTime = startRoundEndTime; + long endRoundEndTime = endRoundStartTime + roundDurationMs; + ReplicationRound endRound = new ReplicationRound(endRoundStartTime, endRoundEndTime); + + // Get shard directories + Path startRoundShardDir = shardManager.getShardDirectory(startRound); + Path endRoundShardDir = shardManager.getShardDirectory(endRound); + + // Assert that shard directories do not exist + assertFalse("Start round shard directory should not exist", localFs.exists(startRoundShardDir)); + assertFalse("End round shard directory should not exist", localFs.exists(endRoundShardDir)); + + // Call getNewFiles with non-existent rounds + List result = tracker.getNewFiles(startRound, endRound); + + // Verify file system operations + // Should call exists() for each round (start and end) + // Note: init() already called exists() on in-progress directory + Mockito.verify(mockFs, times(1)).exists(Mockito.eq(startRoundShardDir)); + Mockito.verify(mockFs, times(1)).exists(Mockito.eq(endRoundShardDir)); + // listStatus() should not be called when directories don't exist + Mockito.verify(mockFs, times(0)).listStatus(Mockito.any(Path.class)); + + // Verify empty list is returned + assertTrue("Should return empty list for non-existent rounds", result.isEmpty()); + } + private int countDirectories(FileSystem fs, Path path) throws IOException { if (!fs.exists(path)) { return 0;