diff --git a/core/src/main/java/com/datastax/oss/driver/api/core/session/SessionLifecycleManager.java b/core/src/main/java/com/datastax/oss/driver/api/core/session/SessionLifecycleManager.java new file mode 100644 index 00000000000..813555136e8 --- /dev/null +++ b/core/src/main/java/com/datastax/oss/driver/api/core/session/SessionLifecycleManager.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.datastax.oss.driver.api.core.session; + +import com.datastax.oss.driver.internal.core.session.DefaultSession; +import com.datastax.oss.driver.internal.core.session.DefaultSessionLifecycleManager; +import java.util.concurrent.CompletionStage; + +/** + * Provides extra methods for {@link Session} lifecycle. In a suspended state the session should not + * keep any connections to cluster nodes open. + */ +public interface SessionLifecycleManager { + /** + * Creates a new manager for session lifecycle. + * + * @param session Session that should be managed. + * @return The new session lifecycle manager. + * @throws IllegalArgumentException if the session cannot be managed. + */ + static SessionLifecycleManager of(Session session) { + if (session instanceof DefaultSession) { + return new DefaultSessionLifecycleManager((DefaultSession) session); + } else { + throw new IllegalArgumentException(session + " is not an instance of DefaultSession"); + } + } + + /** + * Terminates all connections to cluster nodes. + * + * @return Stage that completes when all connections are terminated. + */ + CompletionStage suspendAsync(); + + /** Helper method invoking {@link #suspendAsync()} in a synchronous way. */ + default void suspend() { + suspendAsync().toCompletableFuture().join(); + } + + /** + * Triggers reconnection to the cluster. This reconnection proceeds asynchronously; the invocation + * does not wait for connections establishment. + */ + void resume(); + + /** + * @return True if the session is {@link #suspendAsync()} was called, until {@link #resumeAsync()} + * is called. + */ + boolean isSuspended(); +} diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/session/DefaultSessionLifecycleManager.java b/core/src/main/java/com/datastax/oss/driver/internal/core/session/DefaultSessionLifecycleManager.java new file mode 100644 index 00000000000..dbbdf2b18e0 --- /dev/null +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/session/DefaultSessionLifecycleManager.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.datastax.oss.driver.internal.core.session; + +import com.datastax.oss.driver.api.core.metadata.Node; +import com.datastax.oss.driver.api.core.metadata.NodeState; +import com.datastax.oss.driver.api.core.session.SessionLifecycleManager; +import com.datastax.oss.driver.internal.core.context.InternalDriverContext; +import com.datastax.oss.driver.internal.core.metadata.DefaultNode; +import com.datastax.oss.driver.internal.core.metadata.NodeStateEvent; +import com.datastax.oss.driver.internal.core.pool.ChannelPool; +import com.datastax.oss.driver.internal.core.util.concurrent.CompletableFutures; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; + +public class DefaultSessionLifecycleManager implements SessionLifecycleManager { + private final DefaultSession session; + private final InternalDriverContext context; + private CompletableFuture suspendFuture; + private Map lastState; + + public DefaultSessionLifecycleManager(DefaultSession session) { + this.session = session; + this.context = (InternalDriverContext) session.getContext(); + } + + @Override + public synchronized CompletionStage suspendAsync() { + if (suspendFuture != null) { + return suspendFuture; + } + suspendFuture = new CompletableFuture<>(); + // ControlConnection would try to reconnect when it receives the event that + // node was brought down; closing the very channel to this node prevents that. + this.context + .getControlConnection() + .channel() + .close() + .addListener( + f -> { + if (f.isSuccess()) { + forceNodesDown() + .whenComplete( + (ignored, throwable) -> { + if (throwable != null) { + suspendFuture.completeExceptionally(throwable); + } else { + suspendFuture.complete(null); + } + }); + } else { + suspendFuture.completeExceptionally(f.cause()); + } + }); + return suspendFuture; + } + + private CompletionStage forceNodesDown() { + lastState = new HashMap<>(); + ArrayList> closeFutures = new ArrayList<>(); + for (Map.Entry e : session.getPools().entrySet()) { + Node node = e.getKey(); + NodeState currentState = node.getState(); + lastState.put(node, currentState); + closeFutures.add(e.getValue().closeFuture()); + context + .getEventBus() + .fire(NodeStateEvent.changed(currentState, NodeState.FORCED_DOWN, (DefaultNode) node)); + } + return CompletableFutures.allDone(closeFutures); + } + + @Override + public void resume() { + if (suspendFuture == null) { + return; + } + suspendFuture.whenComplete( + (ignored, throwable) -> { + if (throwable != null || lastState == null) { + return; + } + synchronized (this) { + for (Map.Entry e : lastState.entrySet()) { + NodeStateEvent changed = + NodeStateEvent.changed( + NodeState.FORCED_DOWN, e.getValue(), (DefaultNode) e.getKey()); + this.context.getEventBus().fire(changed); + } + lastState = null; + suspendFuture = null; + context.getControlConnection().reconnectNow(); + } + }); + } + + @Override + public synchronized boolean isSuspended() { + return suspendFuture != null; + } +} diff --git a/integration-tests/src/test/java/com/datastax/oss/driver/core/session/SuspendIT.java b/integration-tests/src/test/java/com/datastax/oss/driver/core/session/SuspendIT.java new file mode 100644 index 00000000000..71f0a613cec --- /dev/null +++ b/integration-tests/src/test/java/com/datastax/oss/driver/core/session/SuspendIT.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.datastax.oss.driver.core.session; + +import static com.datastax.oss.simulacron.common.stubbing.PrimeDsl.noRows; +import static com.datastax.oss.simulacron.common.stubbing.PrimeDsl.when; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.awaitility.Awaitility.await; + +import com.datastax.oss.driver.api.core.CqlSession; +import com.datastax.oss.driver.api.core.NoNodeAvailableException; +import com.datastax.oss.driver.api.core.session.SessionLifecycleManager; +import com.datastax.oss.driver.api.testinfra.session.SessionUtils; +import com.datastax.oss.driver.api.testinfra.simulacron.SimulacronRule; +import com.datastax.oss.driver.categories.ParallelizableTests; +import com.datastax.oss.driver.internal.core.context.InternalDriverContext; +import com.datastax.oss.driver.internal.core.session.PoolManager; +import com.datastax.oss.simulacron.common.cluster.ClusterSpec; +import java.util.concurrent.TimeUnit; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category(ParallelizableTests.class) +public class SuspendIT { + @ClassRule + public static final SimulacronRule SIMULACRON_RULE = + new SimulacronRule(ClusterSpec.builder().withNodes(2)); + + private static final String QUERY_STRING = "select * from foo"; + + @Test + public void should_resume_after_suspend() throws Exception { + SIMULACRON_RULE.cluster().prime(when(QUERY_STRING).then(noRows())); + + CqlSession session = SessionUtils.newSession(SIMULACRON_RULE); + assertThat(session.execute(QUERY_STRING).all().size()).isEqualTo(0); + + SessionLifecycleManager manager = SessionLifecycleManager.of(session); + manager.suspend(); + + PoolManager poolManager = ((InternalDriverContext) session.getContext()).getPoolManager(); + assertThat(poolManager.getPools().size()).isEqualTo(0); + assertThatThrownBy(() -> session.execute(QUERY_STRING).all()) + .isInstanceOf(NoNodeAvailableException.class); + + manager.resume(); + + // Busy waiting - PoolManager does not expose any listeners on added node. + // After ChannelEvent.Type.OPEN the future is added to PoolManager.SingleThreaded.pending + // but this map is not exposed (and not synchronized) + await() + .atMost(10, TimeUnit.SECONDS) + .pollInterval(10, TimeUnit.MILLISECONDS) + .until(() -> poolManager.getPools().size() > 0); + + assertThat(session.execute(QUERY_STRING).all().size()).isEqualTo(0); + } +}