@@ -70,9 +70,10 @@ public class ReplicationLogTracker {
protected final String haGroupName;
protected MetricsReplicationLogTracker metrics;

public ReplicationLogTracker(final Configuration conf, final String haGroupName, final FileSystem fileSystem,
final ReplicationShardDirectoryManager replicationShardDirectoryManager,
final MetricsReplicationLogTracker metrics) {
public ReplicationLogTracker(final Configuration conf, final String haGroupName,
final FileSystem fileSystem,
final ReplicationShardDirectoryManager replicationShardDirectoryManager,
final MetricsReplicationLogTracker metrics) {
this.conf = conf;
this.haGroupName = haGroupName;
this.fileSystem = fileSystem;
@@ -98,8 +99,9 @@ protected String getInProgressLogSubDirectoryName() {
* exist.
*/
public void init() throws IOException {
this.inProgressDirPath = new Path(getReplicationShardDirectoryManager().getRootDirectoryPath().getParent(),
getInProgressLogSubDirectoryName());
this.inProgressDirPath = new Path(
getReplicationShardDirectoryManager().getRootDirectoryPath().getParent(),
getInProgressLogSubDirectoryName());
createDirectoryIfNotExists(inProgressDirPath);
}

@@ -151,6 +153,35 @@ public List<Path> getNewFilesForRound(ReplicationRound replicationRound) throws
return filesInRound;
}

/**
* Retrieves new replication log files that belong to replication rounds from startRound to
* endRound (inclusive). Iterates through all rounds in the range and collects valid log files
* from each round's shard directory.
*
* @param startRound - The starting replication round (inclusive)
* @param endRound - The ending replication round (inclusive)
* @return List of valid log file paths from startRound to endRound, empty list if
* startRound > endRound
* @throws IOException if there's an error accessing the file system
*/
public List<Path> getNewFiles(ReplicationRound startRound, ReplicationRound endRound)
throws IOException {
List<Path> files = new ArrayList<>();
// Early return if startRound is after endRound (invalid range)
if (startRound.getStartTime() > endRound.getStartTime()) {
return files;
}
// Iterate through all rounds from startRound to endRound (exclusive of endRound)
ReplicationRound firstRound = startRound;
while (!firstRound.equals(endRound)) {
files.addAll(getNewFilesForRound(firstRound));
firstRound = replicationShardDirectoryManager.getNextRound(firstRound);
}
// Add the files for the endRound (inclusive)
files.addAll(getNewFilesForRound(endRound));
return files;
}
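To illustrate the inclusive round-range traversal that getNewFiles performs, here is a minimal standalone sketch. It models a replication round by its start time and assumes a fixed round duration; RoundRangeSketch, ROUND_MS, and roundsInRange are hypothetical names, not Phoenix APIs, and the real code delegates round arithmetic to ReplicationShardDirectoryManager.getNextRound.

import java.util.ArrayList;
import java.util.List;

// Minimal stand-in model (hypothetical, not Phoenix code): a round is identified by
// its start time and rounds advance in fixed ROUND_MS steps.
public class RoundRangeSketch {
    static final long ROUND_MS = 60_000L; // assumed one-minute round duration

    // Collects one entry per round from startTime to endTime, inclusive,
    // mirroring the loop structure of getNewFiles.
    static List<Long> roundsInRange(long startTime, long endTime) {
        List<Long> rounds = new ArrayList<>();
        if (startTime > endTime) {
            return rounds;           // invalid range: empty result
        }
        long round = startTime;
        while (round != endTime) {   // every round strictly before endTime
            rounds.add(round);
            round += ROUND_MS;       // analogous to getNextRound(round); assumes both
        }                            // times lie on the same round grid
        rounds.add(endTime);         // the end round itself is included
        return rounds;
    }

    public static void main(String[] args) {
        // Spans four one-minute rounds starting at 1704153600000L
        System.out.println(roundsInRange(1704153600000L, 1704153780000L));
    }
}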

/**
* Retrieves all valid log files currently in the in-progress directory.
* @return List of valid log file paths in the in-progress directory, empty list if directory
@@ -32,6 +32,7 @@
import org.apache.phoenix.replication.ReplicationLogDiscovery;
import org.apache.phoenix.replication.ReplicationLogTracker;
import org.apache.phoenix.replication.ReplicationRound;
import org.apache.phoenix.replication.ReplicationShardDirectoryManager;
import org.apache.phoenix.replication.metrics.MetricsReplicationLogDiscovery;
import org.apache.phoenix.replication.metrics.MetricsReplicationLogDiscoveryReplayImpl;
import org.slf4j.Logger;
@@ -137,23 +138,28 @@ public void init() throws IOException {

LOG.info("Initializing ReplicationLogDiscoveryReplay for haGroup: {}", haGroupName);

HAGroupStateListener degradedListener = (groupName, fromState, toState, modifiedTime, clusterType, lastSyncStateTimeInMs) -> {
if (clusterType == ClusterType.LOCAL && HAGroupStoreRecord.HAGroupState.DEGRADED_STANDBY.equals(toState)) {
HAGroupStateListener degradedListener = (groupName, fromState, toState,
modifiedTime, clusterType, lastSyncStateTimeInMs) -> {
if (clusterType == ClusterType.LOCAL
&& HAGroupStoreRecord.HAGroupState.DEGRADED_STANDBY.equals(toState)) {
replicationReplayState.set(ReplicationReplayState.DEGRADED);
LOG.info("Cluster degraded detected for {}. replicationReplayState={}",
haGroupName, ReplicationReplayState.DEGRADED);
}
};

HAGroupStateListener recoveryListener = (groupName, fromState, toState, modifiedTime, clusterType, lastSyncStateTimeInMs) -> {
if (clusterType == ClusterType.LOCAL && HAGroupStoreRecord.HAGroupState.STANDBY.equals(toState)) {
HAGroupStateListener recoveryListener = (groupName, fromState, toState,
modifiedTime, clusterType, lastSyncStateTimeInMs) -> {
if (clusterType == ClusterType.LOCAL
&& HAGroupStoreRecord.HAGroupState.STANDBY.equals(toState)) {
replicationReplayState.set(ReplicationReplayState.SYNCED_RECOVERY);
LOG.info("Cluster recovered detected for {}. replicationReplayState={}",
haGroupName, getReplicationReplayState());
}
};

HAGroupStateListener triggerFailoverListner = (groupName, fromState, toState, modifiedTime, clusterType, lastSyncStateTimeInMs) -> {
HAGroupStateListener triggerFailoverListner = (groupName, fromState, toState,
modifiedTime, clusterType, lastSyncStateTimeInMs) -> {
if (clusterType == ClusterType.LOCAL
&& HAGroupStoreRecord.HAGroupState.STANDBY_TO_ACTIVE.equals(toState)) {
failoverPending.set(true);
@@ -163,7 +169,8 @@ }
}
};

HAGroupStateListener abortFailoverListner = (groupName, fromState, toState, modifiedTime, clusterType, lastSyncStateTimeInMs) -> {
HAGroupStateListener abortFailoverListner = (groupName, fromState, toState,
modifiedTime, clusterType, lastSyncStateTimeInMs) -> {
if (clusterType == ClusterType.LOCAL
&& HAGroupStoreRecord.HAGroupState.ABORT_TO_STANDBY.equals(toState)) {
failoverPending.set(false);
@@ -228,7 +235,8 @@ protected void initializeLastRoundProcessed() throws IOException {
HAGroupStoreRecord haGroupStoreRecord = getHAGroupRecord();
LOG.info("Found HA Group state during initialization as {} for haGroup: {}",
haGroupStoreRecord.getHAGroupState(), haGroupName);
if (HAGroupStoreRecord.HAGroupState.DEGRADED_STANDBY.equals(haGroupStoreRecord.getHAGroupState())) {
if (HAGroupStoreRecord.HAGroupState.DEGRADED_STANDBY
.equals(haGroupStoreRecord.getHAGroupState())) {
replicationReplayState.compareAndSet(ReplicationReplayState.NOT_INITIALIZED,
ReplicationReplayState.DEGRADED);
long minimumTimestampFromFiles = EnvironmentEdgeManager.currentTime();
@@ -474,19 +482,51 @@ protected HAGroupStoreRecord getHAGroupRecord() throws IOException {
* Failover is safe to trigger when all of the following conditions are met:
* 1. A failover has been requested (failoverPending is true)
* 2. No files are currently in the in-progress directory
* 3. No new files exist for ongoing round
* 3. No new files exist from the next round to process up to the current timestamp round
*
* These conditions ensure all replication logs have been processed before transitioning
* the cluster from STANDBY to ACTIVE state.
* The third condition checks for new files in the range from nextRoundToProcess (derived from
* getLastRoundProcessed()) to currentTimestampRound (derived from current time). This ensures
* all replication logs up to the current time have been processed before transitioning the
* cluster from STANDBY to ACTIVE state.
*
* @return true if all conditions are met and failover should be triggered, false otherwise
* @throws IOException if there's an error checking file status
*/
protected boolean shouldTriggerFailover() throws IOException {
return failoverPending.get() && replicationLogTracker.getInProgressFiles().isEmpty()
&& replicationLogTracker.getNewFilesForRound(replicationLogTracker
.getReplicationShardDirectoryManager()
.getNextRound(getLastRoundProcessed())).isEmpty();
LOG.debug("Checking if failover should be triggered. failoverPending={}", failoverPending);
// Check if failover has been requested
if (!failoverPending.get()) {
LOG.debug("Failover not triggered. failoverPending is false.");
return false;
}
// Check if in-progress directory is empty
boolean isInProgressDirectoryEmpty = replicationLogTracker.getInProgressFiles().isEmpty();
if (!isInProgressDirectoryEmpty) {
LOG.debug("Failover not triggered. In progress directory is not empty.");
return false;
}
// Check if there are any new files from next round to current timestamp round
ReplicationShardDirectoryManager replicationShardDirectoryManager =
replicationLogTracker.getReplicationShardDirectoryManager();
ReplicationRound nextRoundToProcess =
replicationShardDirectoryManager.getNextRound(getLastRoundProcessed());
ReplicationRound currentTimestampRound =
replicationShardDirectoryManager.getReplicationRoundFromStartTime(
EnvironmentEdgeManager.currentTime());
LOG.debug("Checking the new files from next round {} to current timestamp round {}.",
nextRoundToProcess, currentTimestampRound);
boolean isInDirectoryEmpty = replicationLogTracker.getNewFiles(nextRoundToProcess,
currentTimestampRound).isEmpty();

if (!isInDirectoryEmpty) {
LOG.debug("Failover not triggered. New files exist from next round to current "
+ "timestamp round.");
return false;
}

LOG.info("Failover can be triggered.");
return true;
}
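For the range check above, the current-timestamp round comes from ReplicationShardDirectoryManager.getReplicationRoundFromStartTime; the diff does not show its implementation, but one common way to map a timestamp onto a fixed-width round grid, together with the overall gate structure, looks roughly like the following sketch. FailoverGateSketch, roundStartFor, and ROUND_MS are hypothetical names and an assumed duration, not the Phoenix API.

import java.util.List;

// Illustrative sketch only; names and the round-mapping formula are assumptions,
// not the Phoenix implementation.
final class FailoverGateSketch {
    static final long ROUND_MS = 60_000L; // assumed round duration

    // Start time of the round containing the given timestamp (truncate to the grid).
    static long roundStartFor(long timestampMs) {
        return timestampMs - (timestampMs % ROUND_MS);
    }

    // Failover proceeds only when it was requested and both file checks come back empty.
    static boolean shouldTriggerFailover(boolean failoverPending,
                                         List<String> inProgressFiles,
                                         List<String> newFilesUpToCurrentRound) {
        if (!failoverPending) {
            return false;                          // no failover requested
        }
        if (!inProgressFiles.isEmpty()) {
            return false;                          // in-progress logs still pending replay
        }
        return newFilesUpToCurrentRound.isEmpty(); // no unprocessed logs up to "now"
    }

    public static void main(String[] args) {
        System.out.println(shouldTriggerFailover(true, List.of(), List.of())); // prints true
    }
}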

protected void triggerFailover() {
@@ -1650,7 +1650,7 @@ public Path getInProgressDirPath() {

/**
* Tests the shouldTriggerFailover method with various combinations of failoverPending,
* in-progress files, and new files for next round.
* in-progress files, and new files from next round to current timestamp round.
*/
@Test
public void testShouldTriggerFailover() throws IOException {
@@ -1668,12 +1668,14 @@ public void testShouldTriggerFailover() throws IOException {
try {
// Create test rounds
ReplicationRound testRound = new ReplicationRound(1704153600000L, 1704153660000L);
ReplicationRound nextRound = tracker.getReplicationShardDirectoryManager().getNextRound(testRound);
ReplicationShardDirectoryManager shardManager = tracker.getReplicationShardDirectoryManager();
ReplicationRound nextRoundToProcess = shardManager.getNextRound(testRound);
ReplicationRound currentTimestampRound = shardManager.getReplicationRoundFromStartTime(currentTime);

// Test Case 1: All conditions true - should return true
{
when(tracker.getInProgressFiles()).thenReturn(Collections.emptyList());
when(tracker.getNewFilesForRound(nextRound)).thenReturn(Collections.emptyList());
when(tracker.getNewFiles(nextRoundToProcess, currentTimestampRound)).thenReturn(Collections.emptyList());
TestableReplicationLogDiscoveryReplay discovery = new TestableReplicationLogDiscoveryReplay(tracker, haGroupStoreRecord);
discovery.setLastRoundInSync(testRound);
discovery.setLastRoundProcessed(testRound);
@@ -1686,7 +1688,7 @@
// Test Case 2: failoverPending is false - should return false
{
when(tracker.getInProgressFiles()).thenReturn(Collections.emptyList());
when(tracker.getNewFilesForRound(nextRound)).thenReturn(Collections.emptyList());
when(tracker.getNewFiles(nextRoundToProcess, currentTimestampRound)).thenReturn(Collections.emptyList());
TestableReplicationLogDiscoveryReplay discovery = new TestableReplicationLogDiscoveryReplay(tracker, haGroupStoreRecord);
discovery.setLastRoundInSync(testRound);
discovery.setLastRoundProcessed(testRound);
@@ -1699,7 +1701,7 @@
// Test Case 3: in-progress files not empty - should return false
{
when(tracker.getInProgressFiles()).thenReturn(Collections.singletonList(new Path("test.plog")));
when(tracker.getNewFilesForRound(nextRound)).thenReturn(Collections.emptyList());
when(tracker.getNewFiles(nextRoundToProcess, currentTimestampRound)).thenReturn(Collections.emptyList());
TestableReplicationLogDiscoveryReplay discovery = new TestableReplicationLogDiscoveryReplay(tracker, haGroupStoreRecord);
discovery.setLastRoundInSync(testRound);
discovery.setLastRoundProcessed(testRound);
@@ -1709,23 +1711,23 @@
discovery.shouldTriggerFailover());
}

// Test Case 4: new files exist for next round - should return false
// Test Case 4: new files exist from next round to current timestamp round - should return false
{
when(tracker.getInProgressFiles()).thenReturn(Collections.emptyList());
when(tracker.getNewFilesForRound(nextRound)).thenReturn(Collections.singletonList(new Path("test.plog")));
when(tracker.getNewFiles(nextRoundToProcess, currentTimestampRound)).thenReturn(Collections.singletonList(new Path("test.plog")));
TestableReplicationLogDiscoveryReplay discovery = new TestableReplicationLogDiscoveryReplay(tracker, haGroupStoreRecord);
discovery.setLastRoundInSync(testRound);
discovery.setLastRoundProcessed(testRound);
discovery.setFailoverPending(true);

assertFalse("Should not trigger failover when new files exist for next round",
assertFalse("Should not trigger failover when new files exist from next round to current timestamp round",
discovery.shouldTriggerFailover());
}

// Test Case 5: failoverPending false AND in-progress files not empty - should return false
{
when(tracker.getInProgressFiles()).thenReturn(Collections.singletonList(new Path("test.plog")));
when(tracker.getNewFilesForRound(nextRound)).thenReturn(Collections.emptyList());
when(tracker.getNewFiles(nextRoundToProcess, currentTimestampRound)).thenReturn(Collections.emptyList());
TestableReplicationLogDiscoveryReplay discovery = new TestableReplicationLogDiscoveryReplay(tracker, haGroupStoreRecord);
discovery.setLastRoundInSync(testRound);
discovery.setLastRoundProcessed(testRound);
@@ -1738,7 +1740,7 @@
// Test Case 6: failoverPending false AND new files exist - should return false
{
when(tracker.getInProgressFiles()).thenReturn(Collections.emptyList());
when(tracker.getNewFilesForRound(nextRound)).thenReturn(Collections.singletonList(new Path("test.plog")));
when(tracker.getNewFiles(nextRoundToProcess, currentTimestampRound)).thenReturn(Collections.singletonList(new Path("test.plog")));
TestableReplicationLogDiscoveryReplay discovery = new TestableReplicationLogDiscoveryReplay(tracker, haGroupStoreRecord);
discovery.setLastRoundInSync(testRound);
discovery.setLastRoundProcessed(testRound);
@@ -1751,7 +1753,7 @@
// Test Case 7: in-progress files not empty AND new files exist - should return false
{
when(tracker.getInProgressFiles()).thenReturn(Collections.singletonList(new Path("test1.plog")));
when(tracker.getNewFilesForRound(nextRound)).thenReturn(Collections.singletonList(new Path("test2.plog")));
when(tracker.getNewFiles(nextRoundToProcess, currentTimestampRound)).thenReturn(Collections.singletonList(new Path("test2.plog")));
TestableReplicationLogDiscoveryReplay discovery = new TestableReplicationLogDiscoveryReplay(tracker, haGroupStoreRecord);
discovery.setLastRoundInSync(testRound);
discovery.setLastRoundProcessed(testRound);
@@ -1764,7 +1766,7 @@
// Test Case 8: All conditions false - should return false
{
when(tracker.getInProgressFiles()).thenReturn(Collections.singletonList(new Path("test.plog")));
when(tracker.getNewFilesForRound(nextRound)).thenReturn(Collections.singletonList(new Path("test2.plog")));
when(tracker.getNewFiles(nextRoundToProcess, currentTimestampRound)).thenReturn(Collections.singletonList(new Path("test2.plog")));
TestableReplicationLogDiscoveryReplay discovery = new TestableReplicationLogDiscoveryReplay(tracker, haGroupStoreRecord);
discovery.setLastRoundInSync(testRound);
discovery.setLastRoundProcessed(testRound);