1717 */
1818package org .apache .phoenix .replication ;
1919
20- import static org .apache .hadoop .hbase .HConstants .DEFAULT_ZK_SESSION_TIMEOUT ;
21- import static org .apache .hadoop .hbase .HConstants .ZK_SESSION_TIMEOUT ;
22- import static org .apache .phoenix .replication .ReplicationLogGroup .LogEvent .EVENT_TYPE_DATA ;
23- import static org .apache .phoenix .replication .ReplicationLogGroup .LogEvent .EVENT_TYPE_SYNC ;
24- import static org .apache .phoenix .replication .ReplicationLogGroup .ReplicationMode .INIT ;
25- import static org .apache .phoenix .replication .ReplicationLogGroup .ReplicationMode .STORE_AND_FORWARD ;
26- import static org .apache .phoenix .replication .ReplicationLogGroup .ReplicationMode .SYNC ;
27- import static org .apache .phoenix .replication .ReplicationLogGroup .ReplicationMode .SYNC_AND_FORWARD ;
28-
2920import java .io .IOException ;
3021import java .io .InterruptedIOException ;
3122import java .net .URI ;
3728import java .util .concurrent .CompletableFuture ;
3829import java .util .concurrent .ConcurrentHashMap ;
3930import java .util .concurrent .ExecutionException ;
31+ import java .util .concurrent .ExecutorService ;
32+ import java .util .concurrent .Executors ;
4033import java .util .concurrent .TimeUnit ;
4134import java .util .concurrent .TimeoutException ;
4235import java .util .concurrent .atomic .AtomicBoolean ;
7063import com .lmax .disruptor .dsl .Disruptor ;
7164import com .lmax .disruptor .dsl .ProducerType ;
7265
66+ import static org .apache .hadoop .hbase .HConstants .DEFAULT_ZK_SESSION_TIMEOUT ;
67+ import static org .apache .hadoop .hbase .HConstants .ZK_SESSION_TIMEOUT ;
68+ import static org .apache .phoenix .replication .ReplicationLogGroup .LogEvent .EVENT_TYPE_DATA ;
69+ import static org .apache .phoenix .replication .ReplicationLogGroup .LogEvent .EVENT_TYPE_SYNC ;
70+ import static org .apache .phoenix .replication .ReplicationLogGroup .ReplicationMode .INIT ;
71+ import static org .apache .phoenix .replication .ReplicationLogGroup .ReplicationMode .STORE_AND_FORWARD ;
72+ import static org .apache .phoenix .replication .ReplicationLogGroup .ReplicationMode .SYNC ;
73+ import static org .apache .phoenix .replication .ReplicationLogGroup .ReplicationMode .SYNC_AND_FORWARD ;
74+
7375/**
7476 * ReplicationLogGroup manages replication logs for a given HA Group.
7577 * <p>
@@ -166,7 +168,7 @@ public class ReplicationLogGroup {
166168 public static final String REPLICATION_LOG_RETRY_DELAY_MS_KEY =
167169 "phoenix.replication.log.retry.delay.ms" ;
168170 public static final long DEFAULT_REPLICATION_LOG_RETRY_DELAY_MS = 100L ;
169- private static final long DEFAULT_HDFS_WRITE_RPC_TIMEOUT_MS = 30 * 1000 ;
171+ private static final long DEFAULT_HDFS_WRITE_RPC_TIMEOUT_MS = 30 * 1000 ;
170172
171173 public static final String STANDBY_DIR = "in" ;
172174 public static final String FALLBACK_DIR = "out" ;
@@ -277,7 +279,7 @@ ReplicationMode getMode() {
277279 }
278280 }
279281
280- private static final ImmutableMap <ReplicationMode , EnumSet <ReplicationMode >> allowedTransition =
282+ private static final ImmutableMap <ReplicationMode , EnumSet <ReplicationMode >> VALID_TRANSITIONS =
281283 Maps .immutableEnumMap (ImmutableMap .of (
282284 INIT , EnumSet .of (SYNC , STORE_AND_FORWARD ),
283285 SYNC , EnumSet .of (STORE_AND_FORWARD , SYNC_AND_FORWARD ),
@@ -317,6 +319,8 @@ public Record(String tableName, long commitId, Mutation mutation) {
317319 this .mutation = mutation ;
318320 }
319321 }
322+ // executor service used to do asynchronous close
323+ protected ExecutorService disruptorExecutor ;
320324 protected Disruptor <LogEvent > disruptor ;
321325 protected RingBuffer <LogEvent > ringBuffer ;
322326 protected LogEventHandler eventHandler ;
@@ -448,8 +452,8 @@ protected long calculateSyncTimeout() {
448452 DEFAULT_HDFS_WRITE_RPC_TIMEOUT_MS );
449453 // account for HAGroupStore update when we switch replication mode
450454 long zkTimeoutMs = conf .getLong (ZK_SESSION_TIMEOUT , DEFAULT_ZK_SESSION_TIMEOUT );
451- long totalRpcTimeout = maxAttempts * wrtiteRpcTimeout + (maxAttempts - 1 )* retryDelayMs ;
452- return 2 * totalRpcTimeout + zkTimeoutMs ;
455+ long totalRpcTimeout = maxAttempts * wrtiteRpcTimeout + (maxAttempts - 1 ) * retryDelayMs ;
456+ return 2 * totalRpcTimeout + zkTimeoutMs ;
453457 }
454458
455459 /**
@@ -468,14 +472,14 @@ protected void initializeReplicationMode() throws IOException {
468472 } else if (haGroupState .equals (HAGroupState .ACTIVE_NOT_IN_SYNC )) {
469473 setMode (STORE_AND_FORWARD );
470474 } else {
471- String message = String .format ("HAGroup %s got an unexpected state %s while " +
472- "initializing mode" , this , haGroupState );
475+ String message = String .format ("HAGroup %s got an unexpected state %s while "
476+ + "initializing mode" , this , haGroupState );
473477 LOG .error (message );
474478 throw new IOException (message );
475479 }
476480 } else {
477- String message = String .format ("HAGroup %s got an empty group store record while " +
478- "initializing mode" , this );
481+ String message = String .format ("HAGroup %s got an empty group store record while "
482+ + "initializing mode" , this );
479483 LOG .error (message );
480484 throw new IOException (message );
481485 }
@@ -486,10 +490,11 @@ protected void initializeReplicationMode() throws IOException {
486490 protected void initializeDisruptor () throws IOException {
487491 int ringBufferSize = conf .getInt (REPLICATION_LOG_RINGBUFFER_SIZE_KEY ,
488492 DEFAULT_REPLICATION_LOG_RINGBUFFER_SIZE );
489- disruptor = new Disruptor <>( LogEvent . EVENT_FACTORY , ringBufferSize ,
493+ this . disruptorExecutor = Executors . newCachedThreadPool (
490494 new ThreadFactoryBuilder ()
491495 .setNameFormat ("ReplicationLogGroup-" + getHAGroupName () + "-%d" )
492- .setDaemon (true ).build (),
496+ .setDaemon (true ).build ());
497+ disruptor = new Disruptor <>(LogEvent .EVENT_FACTORY , ringBufferSize , disruptorExecutor ,
493498 ProducerType .MULTI , new YieldingWaitStrategy ());
494499 eventHandler = new LogEventHandler ();
495500 eventHandler .init ();
@@ -629,6 +634,7 @@ protected void closeOnError() {
629634 if (closed ) {
630635 return ;
631636 }
637+ // setting closed to true prevents future producers to add events to the ring buffer
632638 closed = true ;
633639 }
634640 // Directly halt the disruptor. shutdown() would wait for events to drain. We are expecting
@@ -651,34 +657,49 @@ public void close() {
651657 if (closed ) {
652658 return ;
653659 }
660+ // setting closed to true prevents future producers to add events to the ring buffer
654661 closed = true ;
655662 // Remove from instances cache
656663 INSTANCES .remove (haGroupName );
657664 // Sync before shutting down to flush all pending appends.
658665 try {
659666 syncInternal ();
660667 gracefulShutdownEventHandlerFlag .set (true );
661- disruptor .shutdown (); // Wait for a clean shutdown.
668+ // waits until all the events in the disruptor have been processed
669+ disruptor .shutdown ();
662670 } catch (IOException e ) {
663671 LOG .warn ("Error during final sync on close" , e );
664672 gracefulShutdownEventHandlerFlag .set (false );
665673 disruptor .halt (); // Go directly to halt.
666674 }
667- // TODO revisit close logic and the below comment
668- // We must wait for the disruptor before closing the writers.
675+ // wait for the disruptor threads to finish
676+ shutdownDisruptorExecutor ();
669677 metrics .close ();
670678 LOG .info ("HAGroup {} closed" , this );
671679 }
672680 }
673681
682+ private void shutdownDisruptorExecutor () {
683+ disruptorExecutor .shutdown ();
684+ try {
685+ if (!disruptorExecutor .awaitTermination (5 , TimeUnit .SECONDS )) {
686+ LOG .warn ("HAGroup {} shutdown of disruptor executor service timed out " , this );
687+ disruptorExecutor .shutdownNow ();
688+ }
689+ } catch (InterruptedException e ) {
690+ Thread .currentThread ().interrupt ();
691+ disruptorExecutor .shutdownNow ();
692+ }
693+ LOG .info ("HAGroup {} shutdown disruptor executor service" , this );
694+ }
674695 /**
675696 * Switch the replication mode to the new mode
676697 *
677698 * @param newReplicationMode The new replication mode
678699 * @return previous replication mode
679700 */
680701 protected ReplicationMode setMode (ReplicationMode newReplicationMode ) {
681- ReplicationMode previous = mode .getAndUpdate ( current -> newReplicationMode );
702+ ReplicationMode previous = mode .getAndUpdate (current -> newReplicationMode );
682703 if (previous != newReplicationMode ) {
683704 LOG .info ("HAGroup {} switched from {} to {}" , this , previous , newReplicationMode );
684705 }
@@ -699,8 +720,8 @@ protected boolean checkAndSetMode(ReplicationMode expectedReplicationMode,
699720 LOG .info ("HAGroup {} conditionally switched from {} to {}" , this ,
700721 expectedReplicationMode , newReplicationMode );
701722 } else {
702- LOG .info ("HAGroup {} ignoring attempt to switch replication mode to {} " +
703- "because expected={} != actual={}" , this , newReplicationMode ,
723+ LOG .info ("HAGroup {} ignoring attempt to switch replication mode to {} "
724+ + "because expected={} != actual={}" , this , newReplicationMode ,
704725 expectedReplicationMode , getMode ());
705726 }
706727 return updated ;
@@ -831,8 +852,7 @@ protected ReplicationLog getActiveLog() {
831852 protected void setHAGroupStatusToStoreAndForward () throws Exception {
832853 try {
833854 haGroupStoreManager .setHAGroupStatusToStoreAndForward (haGroupName );
834- }
835- catch (Exception ex ) {
855+ } catch (Exception ex ) {
836856 LOG .info ("HAGroup {} failed to set status to STORE_AND_FORWARD" , this , ex );
837857 throw ex ;
838858 }
@@ -844,8 +864,7 @@ protected void setHAGroupStatusToSync() throws IOException {
844864 } catch (IOException ex ) {
845865 // TODO logging
846866 throw ex ;
847- }
848- catch (Exception ex ) {
867+ } catch (Exception ex ) {
849868 // TODO logging
850869 throw new IOException (ex );
851870 }
@@ -906,7 +925,9 @@ private void updateModeOnFailure(IOException e) throws IOException {
906925 // send the failed event to the current mode
907926 ReplicationMode newMode = currentModeImpl .onFailure (e );
908927 setMode (newMode );
909- currentModeImpl .onExit (true );
928+ // on failure call the exit asynchronously
929+ disruptorExecutor .execute (() ->
930+ currentModeImpl .onExit (true ));
910931 initializeMode (newMode );
911932 }
912933
@@ -918,7 +939,7 @@ private void updateModeOnFailure(IOException e) throws IOException {
918939 * <li>Syncs the current writer to ensure all data is durably written.</li>
919940 * <li>Completes all pending sync futures successfully.</li>
920941 * <li>Clears the list of pending sync futures.</li>
921- * <li>Clears the current batch of records since they have been successfully synced. </li>
942+ * <li>Checks if we need to switch mode </li>
922943 * </ol>
923944 * @param sequence The sequence number of the last processed event
924945 * @throws IOException if the sync operation fails
@@ -935,6 +956,22 @@ private void processPendingSyncs(long sequence) throws IOException {
935956 }
936957 pendingSyncFutures .clear ();
937958 LOG .info ("Sync operation completed successfully up to sequence {}" , sequence );
959+ // after a successful sync check the mode set on the replication group
960+ // Doing the mode check on sync points makes the implementation more robust
961+ // since we can guarantee that all unsynced appends have been flushed to the
962+ // replication log before we switch the replication mode
963+ ReplicationMode newMode = getMode ();
964+ if (newMode != currentModeImpl .getMode ()) {
965+ // some other thread switched the mode on the replication group
966+ LOG .info ("Mode switched at sequence {} from {} to {}" ,
967+ sequence , currentModeImpl , newMode );
968+ // call exit on the last mode here since we can guarantee that the lastMode
969+ // is not processing any event like append/sync because this is the only thread
970+ // that is consuming the events from the ring buffer and handing them off to the
971+ // mode
972+ currentModeImpl .onExit (true );
973+ initializeMode (newMode );
974+ }
938975 }
939976
940977 /**
@@ -1050,22 +1087,6 @@ public void onEvent(LogEvent event, long sequence, boolean endOfBatch) throws Ex
10501087 if (endOfBatch ) {
10511088 processPendingSyncs (sequence );
10521089 }
1053- // after a successful sync check the mode set on the replication group
1054- // Doing the mode check on sync points makes the implementation more robust
1055- // since we can guarantee that all unsynced appends have been flushed to the
1056- // replication log before we switch the replication mode
1057- ReplicationMode newMode = getMode ();
1058- if (newMode != currentModeImpl .getMode ()) {
1059- // some other thread switched the mode on the replication group
1060- LOG .info ("Mode switched at sequence {} from {} to {}" ,
1061- sequence , currentModeImpl , newMode );
1062- // call exit on the last mode here since we can guarantee that the lastMode
1063- // is not processing any event like append/sync because this is the only thread
1064- // that is consuming the events from the ring buffer and handing them off to the
1065- // mode
1066- currentModeImpl .onExit (true );
1067- initializeMode (newMode );
1068- }
10691090 return ;
10701091 default :
10711092 throw new UnsupportedOperationException ("Unknown event type: "
0 commit comments