2727import org .apache .phoenix .jdbc .HAGroupStateListener ;
2828import org .apache .phoenix .jdbc .HAGroupStoreManager ;
2929import org .apache .phoenix .jdbc .HAGroupStoreRecord ;
30+ import org .apache .phoenix .replication .ReplicationLogGroup .ReplicationMode ;
3031import org .apache .phoenix .replication .metrics .MetricsReplicationLogDiscovery ;
3132import org .apache .phoenix .replication .metrics .MetricsReplicationLogForwarderSourceFactory ;
3233import org .apache .phoenix .thirdparty .com .google .common .annotations .VisibleForTesting ;
@@ -49,10 +50,12 @@ public class ReplicationLogDiscoveryForwarder extends ReplicationLogDiscovery {
4950 public static final String REPLICATION_LOG_COPY_THROUGHPUT_BYTES_PER_MS_KEY =
5051 "phoenix.replication.log.copy.throughput.bytes.per.ms" ;
5152 // TODO: come up with a better default after testing
52- public static final double DEFAULT_LOG_COPY_THROUGHPUT_BYTES_PER_MS = 1 ;
53+ public static final double DEFAULT_LOG_COPY_THROUGHPUT_BYTES_PER_MS = 1.0 ;
5354
5455 private final ReplicationLogGroup logGroup ;
55- private double copyThroughputThresholdBytesPerMs ;
56+ private final double copyThroughputThresholdBytesPerMs ;
57+ // the timestamp (in future) at which we will attempt to set the HAGroup state to SYNC
58+ private long syncUpdateTS ;
5659
5760 /**
5861 * Create a tracker for the replication logs in the fallback cluster.
@@ -76,6 +79,8 @@ public ReplicationLogDiscoveryForwarder(ReplicationLogGroup logGroup) {
7679 this .copyThroughputThresholdBytesPerMs =
7780 conf .getDouble (REPLICATION_LOG_COPY_THROUGHPUT_BYTES_PER_MS_KEY ,
7881 DEFAULT_LOG_COPY_THROUGHPUT_BYTES_PER_MS );
82+ // initialize to 0
83+ this .syncUpdateTS = 0 ;
7984 }
8085
8186 @ Override
@@ -102,14 +107,7 @@ public void init() throws IOException {
102107 && HAGroupStoreRecord .HAGroupState .ACTIVE_NOT_IN_SYNC .equals (toState )) {
103108 LOG .info ("Received ACTIVE_NOT_IN_SYNC event for {}" , logGroup );
104109 // If the current mode is SYNC only then switch to SYNC_AND_FORWARD mode
105- if (logGroup .checkAndSetMode (SYNC , SYNC_AND_FORWARD )) {
106- // replication mode switched, notify the event handler
107- try {
108- logGroup .sync ();
109- } catch (IOException e ) {
110- LOG .info ("Failed to send sync event to {}" , logGroup );
111- }
112- }
110+ checkAndSetModeAndNotify (SYNC , SYNC_AND_FORWARD );
113111 }
114112 };
115113
@@ -126,14 +124,7 @@ public void init() throws IOException {
126124 && HAGroupStoreRecord .HAGroupState .ACTIVE_IN_SYNC .equals (toState )) {
127125 LOG .info ("Received ACTIVE_IN_SYNC event for {}" , logGroup );
128126 // Set the current mode to SYNC
129- if (logGroup .checkAndSetMode (SYNC_AND_FORWARD , SYNC )) {
130- // replication mode switched, notify the event handler
131- try {
132- logGroup .sync ();
133- } catch (IOException e ) {
134- LOG .info ("Failed to send sync event to {}" , logGroup );
135- }
136- }
127+ checkAndSetModeAndNotify (SYNC_AND_FORWARD , SYNC );
137128 }
138129 };
139130
@@ -148,7 +139,7 @@ public void init() throws IOException {
148139 protected void processFile (Path src ) throws IOException {
149140 FileSystem srcFS = replicationLogTracker .getFileSystem ();
150141 FileStatus srcStat = srcFS .getFileStatus (src );
151- long ts = EnvironmentEdgeManager . currentTimeMillis ( );
142+ long ts = replicationLogTracker . getFileTimestamp ( srcStat . getPath () );
152143 ReplicationShardDirectoryManager remoteShardManager = logGroup .getStandbyShardManager ();
153144 Path dst = remoteShardManager .getWriterPath (ts , logGroup .getServerName ().getServerName ());
154145 long startTime = EnvironmentEdgeManager .currentTimeMillis ();
@@ -160,14 +151,7 @@ protected void processFile(Path src) throws IOException {
160151 if (logGroup .getMode () == STORE_AND_FORWARD
161152 && isLogCopyThroughputAboveThreshold (srcStat .getLen (), copyTime )) {
162153 // start recovery by switching to SYNC_AND_FORWARD
163- if (logGroup .checkAndSetMode (STORE_AND_FORWARD , SYNC_AND_FORWARD )) {
164- // replication mode switched, notify the event handler
165- try {
166- logGroup .sync ();
167- } catch (IOException e ) {
168- LOG .info ("Failed to send sync event to {}" , logGroup );
169- }
170- }
154+ checkAndSetModeAndNotify (STORE_AND_FORWARD , SYNC_AND_FORWARD );
171155 }
172156 }
173157
@@ -183,16 +167,22 @@ protected void processNoMoreRoundsLeft() throws IOException {
183167 LOG .info ("Processed all the replication log files for {}" , logGroup );
184168 // if this RS is still in STORE_AND_FORWARD mode like when it didn't process any file
185169 // move this RS to SYNC_AND_FORWARD
186- if (logGroup .checkAndSetMode (STORE_AND_FORWARD , SYNC_AND_FORWARD )) {
187- // replication mode switched, notify the event handler
170+ checkAndSetModeAndNotify (STORE_AND_FORWARD , SYNC_AND_FORWARD );
171+
172+ if (syncUpdateTS <= EnvironmentEdgeManager .currentTimeMillis ()) {
188173 try {
189- logGroup .sync ();
190- } catch (IOException e ) {
191- LOG .info ("Failed to send sync event to {}" , logGroup );
174+ long waitTime = logGroup .setHAGroupStatusToSync ();
175+ if (waitTime != 0 ) {
176+ syncUpdateTS = EnvironmentEdgeManager .currentTimeMillis () + waitTime ;
177+ LOG .info ("HAGroup {} will try to update HA state to sync at {}" ,
178+ logGroup , syncUpdateTS );
179+ } else {
180+ LOG .info ("HAGroup {} updated HA state to SYNC" , logGroup );
181+ }
182+ } catch (Exception e ) {
183+ LOG .info ("Could not update status to sync for {}" , logGroup , e );
192184 }
193185 }
194- // TODO ensure the mTime on the group store record is older than the wait sync timeout
195- logGroup .setHAGroupStatusToSync ();
196186 }
197187 }
198188
@@ -205,7 +195,7 @@ protected void processNoMoreRoundsLeft() throws IOException {
205195 * @return True if the throughput is good else false
206196 */
207197 private boolean isLogCopyThroughputAboveThreshold (long fileSize , long copyTime ) {
208- double actualThroughputBytesPerMs = copyTime != 0 ? fileSize /copyTime : 0 ;
198+ double actualThroughputBytesPerMs = copyTime != 0 ? (( double ) fileSize ) /copyTime : 0 ;
209199 return actualThroughputBytesPerMs >= copyThroughputThresholdBytesPerMs ;
210200 }
211201
@@ -219,4 +209,20 @@ protected MetricsReplicationLogDiscovery createMetricsSource() {
219209 protected ReplicationLogTracker getReplicationLogTracker () {
220210 return replicationLogTracker ;
221211 }
212+
213+ /**
214+ * Helper API to check and set the replication mode and then notify the disruptor
215+ */
216+ private boolean checkAndSetModeAndNotify (ReplicationMode expectedMode , ReplicationMode newMode ) {
217+ boolean ret = logGroup .checkAndSetMode (expectedMode , newMode );
218+ if (ret ) {
219+ // replication mode switched, notify the event handler
220+ try {
221+ logGroup .sync ();
222+ } catch (IOException e ) {
223+ LOG .info ("Failed to notify event handler for {}" , logGroup , e );
224+ }
225+ }
226+ return ret ;
227+ }
222228}
0 commit comments