From 3b0e092b2ed81123d5acc35ef51faba5c0d849c5 Mon Sep 17 00:00:00 2001 From: Hongshun Wang Date: Thu, 27 Feb 2025 11:05:06 +0800 Subject: [PATCH] [client] Add timeout for inflight request to netty client. --- fluss-rpc/pom.xml | 9 +++ .../java/com/alibaba/fluss/rpc/RpcClient.java | 3 +- .../fluss/rpc/netty/client/NettyClient.java | 47 ++++++++++++++- .../rpc/netty/client/ServerConnection.java | 55 ++++++++++++++++-- .../fluss/rpc/TestingGatewayService.java | 18 ++++++ .../fluss/rpc/netty/NettyMetricsTest.java | 3 +- .../rpc/netty/client/NettyClientTest.java | 57 ++++++++++++++++++- .../fluss/server/ServerITCaseBase.java | 6 +- .../server/utils/RpcGatewayManagerTest.java | 5 +- 9 files changed, 190 insertions(+), 13 deletions(-) diff --git a/fluss-rpc/pom.xml b/fluss-rpc/pom.xml index 142f30e21..6737d3531 100644 --- a/fluss-rpc/pom.xml +++ b/fluss-rpc/pom.xml @@ -58,6 +58,15 @@ test-jar test + + + com.alibaba.fluss + fluss-test-utils + ${project.version} + test + + + diff --git a/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/RpcClient.java b/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/RpcClient.java index ea40a0ac2..77a91e28a 100644 --- a/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/RpcClient.java +++ b/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/RpcClient.java @@ -22,6 +22,7 @@ import com.alibaba.fluss.rpc.metrics.ClientMetricGroup; import com.alibaba.fluss.rpc.netty.client.NettyClient; import com.alibaba.fluss.rpc.protocol.ApiKeys; +import com.alibaba.fluss.utils.clock.SystemClock; import java.util.concurrent.CompletableFuture; @@ -39,7 +40,7 @@ public interface RpcClient extends AutoCloseable { * @return The RPC client. 
*/ static RpcClient create(Configuration conf, ClientMetricGroup clientMetricGroup) { - return new NettyClient(conf, clientMetricGroup); + return new NettyClient(conf, clientMetricGroup, SystemClock.getInstance()); } /** diff --git a/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/netty/client/NettyClient.java b/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/netty/client/NettyClient.java index 0484839c7..a13231683 100644 --- a/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/netty/client/NettyClient.java +++ b/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/netty/client/NettyClient.java @@ -20,6 +20,7 @@ import com.alibaba.fluss.cluster.ServerNode; import com.alibaba.fluss.config.ConfigOptions; import com.alibaba.fluss.config.Configuration; +import com.alibaba.fluss.exception.TimeoutException; import com.alibaba.fluss.rpc.RpcClient; import com.alibaba.fluss.rpc.messages.ApiMessage; import com.alibaba.fluss.rpc.metrics.ClientMetricGroup; @@ -31,6 +32,7 @@ import com.alibaba.fluss.shaded.netty4.io.netty.channel.ChannelOption; import com.alibaba.fluss.shaded.netty4.io.netty.channel.EventLoopGroup; import com.alibaba.fluss.utils.MapUtils; +import com.alibaba.fluss.utils.clock.Clock; import com.alibaba.fluss.utils.concurrent.FutureUtils; import org.slf4j.Logger; @@ -67,12 +69,16 @@ public final class NettyClient implements RpcClient { */ private final Map connections; + private final long requestTimeoutMs; + /** Metric groups for client. 
*/ private final ClientMetricGroup clientMetricGroup; + private final Clock clock; + private volatile boolean isClosed = false; - public NettyClient(Configuration conf, ClientMetricGroup clientMetricGroup) { + public NettyClient(Configuration conf, ClientMetricGroup clientMetricGroup, Clock clock) { this.connections = MapUtils.newConcurrentHashMap(); // build bootstrap @@ -84,6 +90,7 @@ public NettyClient(Configuration conf, ClientMetricGroup clientMetricGroup) { int connectionMaxIdle = (int) conf.get(ConfigOptions.NETTY_CONNECTION_MAX_IDLE_TIME).getSeconds(); PooledByteBufAllocator pooledAllocator = PooledByteBufAllocator.DEFAULT; + this.requestTimeoutMs = conf.get(ConfigOptions.CLIENT_REQUEST_TIMEOUT).toMillis(); this.bootstrap = new Bootstrap() .group(eventGroup) @@ -93,6 +100,8 @@ public NettyClient(Configuration conf, ClientMetricGroup clientMetricGroup) { .handler(new ClientChannelInitializer(connectionMaxIdle)); this.clientMetricGroup = clientMetricGroup; NettyMetrics.registerNettyMetrics(clientMetricGroup, pooledAllocator); + this.clock = clock; + eventGroup.submit(this::handleTimeoutRequest); } /** @@ -117,11 +126,15 @@ public boolean connect(ServerNode node) { */ @Override public CompletableFuture disconnect(String serverUid) { + return disconnect(serverUid, null); + } + + private CompletableFuture disconnect(String serverUid, Throwable throwable) { LOG.debug("Disconnecting from server {}.", serverUid); checkArgument(!isClosed, "Netty client is closed."); ServerConnection connection = connections.remove(serverUid); if (connection != null) { - return connection.close(); + return throwable == null ? 
connection.close() : connection.close(throwable); } return FutureUtils.completedVoidFuture(); } @@ -176,12 +189,40 @@ private ServerConnection getOrCreateConnection(ServerNode node) { ignored -> { LOG.debug("Creating connection to server {}.", node); ServerConnection connection = - new ServerConnection(bootstrap, node, clientMetricGroup); + new ServerConnection( + bootstrap, node, requestTimeoutMs, clock, clientMetricGroup); connection.whenClose(ignore -> connections.remove(serverId, connection)); return connection; }); } + private void handleTimeoutRequest() { + if (isClosed || requestTimeoutMs <= 0) { + return; + } + long nextRequestTimeoutMs = requestTimeoutMs; + long now = clock.milliseconds(); + for (Map.Entry connection : connections.entrySet()) { + if (connection.getValue().hasExpiredRequest(now)) { + LOG.info("Disconnecting from node {} due to request timeout.", connection.getKey()); + disconnect( + connection.getKey(), + new TimeoutException( + String.format( + "Request from client %s timeout.", connection.getKey()))); + } else { + nextRequestTimeoutMs = + Math.min( + nextRequestTimeoutMs, + connection.getValue().nextTimeoutSinceNow(now)); + } + } + + assert nextRequestTimeoutMs > 0; + eventGroup.schedule( + this::handleTimeoutRequest, nextRequestTimeoutMs, TimeUnit.MILLISECONDS); + } + @VisibleForTesting Map connections() { return connections; diff --git a/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/netty/client/ServerConnection.java b/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/netty/client/ServerConnection.java index 0bc998bd3..767745447 100644 --- a/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/netty/client/ServerConnection.java +++ b/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/netty/client/ServerConnection.java @@ -36,6 +36,7 @@ import com.alibaba.fluss.shaded.netty4.io.netty.channel.ChannelFuture; import com.alibaba.fluss.shaded.netty4.io.netty.channel.ChannelFutureListener; import com.alibaba.fluss.utils.MapUtils; +import 
com.alibaba.fluss.utils.clock.Clock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -62,6 +63,9 @@ final class ServerConnection { private final Map inflightRequests = MapUtils.newConcurrentHashMap(); private final CompletableFuture closeFuture = new CompletableFuture<>(); private final ConnectionMetricGroup connectionMetricGroup; + private final long requestTimeoutMs; + + private final Clock clock; private final Object lock = new Object(); @@ -83,13 +87,20 @@ final class ServerConnection { @GuardedBy("lock") private ServerApiVersions serverApiVersions; - ServerConnection(Bootstrap bootstrap, ServerNode node, ClientMetricGroup clientMetricGroup) { + ServerConnection( + Bootstrap bootstrap, + ServerNode node, + long requestTimeoutMs, + Clock clock, + ClientMetricGroup clientMetricGroup) { this.node = node; this.state = ConnectionState.CONNECTING; bootstrap .connect(node.host(), node.port()) .addListener((ChannelFutureListener) this::establishConnection); this.connectionMetricGroup = clientMetricGroup.createConnectionMetricGroup(node.uid()); + this.clock = clock; + this.requestTimeoutMs = requestTimeoutMs; } public ServerNode getServerNode() { @@ -117,7 +128,7 @@ public CompletableFuture close() { return close(new ClosedChannelException()); } - private CompletableFuture close(Throwable cause) { + CompletableFuture close(Throwable cause) { synchronized (lock) { if (state.isDisconnected()) { // the connection has been closed/closing. @@ -188,6 +199,30 @@ private CompletableFuture close(Throwable cause) { return closeFuture; } + /** Returns true if any inflight request has been pending for at least requestTimeoutMs as of {@code now}. */ + boolean hasExpiredRequest(long now) { + for (InflightRequest request : inflightRequests.values()) { + if (request.timeElapsedSinceSendMs(now) >= requestTimeoutMs) { + return true; + } + } + + return false; + } + + /** Get the next timeout since now. 
*/ + long nextTimeoutSinceNow(long now) { + long minRequestTimeoutMs = Integer.MAX_VALUE; + for (InflightRequest request : inflightRequests.values()) { + + minRequestTimeoutMs = + Math.min( + minRequestTimeoutMs, + requestTimeoutMs - request.timeElapsedSinceSendMs(now)); + } + return minRequestTimeoutMs; + } + // ------------------------------------------------------------------------------------------ /** @@ -299,7 +334,13 @@ private CompletableFuture doSend( InflightRequest inflight = new InflightRequest( - apiKey.id, version, requestCount++, rawRequest, responseFuture); + apiKey.id, + version, + requestCount++, + rawRequest, + clock.milliseconds(), + requestTimeoutMs, + responseFuture); inflightRequests.put(inflight.requestId, inflight); // TODO: maybe we need to add timeout for the inflight requests @@ -427,17 +468,23 @@ private static class InflightRequest { short apiVersion, int requestId, ApiMessage request, + long requestStartTime, + long requestTimeoutMs, CompletableFuture responseFuture) { this.apiKey = apiKey; this.apiVersion = apiVersion; this.requestId = requestId; this.request = request; this.responseFuture = responseFuture; - this.requestStartTime = System.currentTimeMillis(); + this.requestStartTime = requestStartTime; } ByteBuf toByteBuf(ByteBufAllocator allocator) { return MessageCodec.encodeRequest(allocator, apiKey, apiVersion, requestId, request); } + + public long timeElapsedSinceSendMs(long currentTimeMs) { + return Math.max(0, currentTimeMs - requestStartTime); + } } } diff --git a/fluss-rpc/src/test/java/com/alibaba/fluss/rpc/TestingGatewayService.java b/fluss-rpc/src/test/java/com/alibaba/fluss/rpc/TestingGatewayService.java index b7ec879cd..ea46bdb4b 100644 --- a/fluss-rpc/src/test/java/com/alibaba/fluss/rpc/TestingGatewayService.java +++ b/fluss-rpc/src/test/java/com/alibaba/fluss/rpc/TestingGatewayService.java @@ -35,12 +35,22 @@ public class TestingGatewayService extends RpcGatewayService { private final List processorThreadNames = 
Collections.synchronizedList(new ArrayList<>()); + private volatile boolean suspending = false; + public List getProcessorThreadNames() { return new ArrayList<>(processorThreadNames); } @Override public CompletableFuture apiVersions(ApiVersionsRequest request) { + while (suspending) { + // Block here to mock network latency or a busy broker until resume() clears the flag. + try { + Thread.sleep(100); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } ApiManager apiManager = new ApiManager(providerType()); Set apiKeys = apiManager.enabledApis(); List apiVersions = new ArrayList<>(); @@ -71,4 +81,12 @@ public ServerType providerType() { public void shutdown() { // do nothing. } + + public void resume() { + this.suspending = false; + } + + public void suspend() { + this.suspending = true; + } } diff --git a/fluss-rpc/src/test/java/com/alibaba/fluss/rpc/netty/NettyMetricsTest.java b/fluss-rpc/src/test/java/com/alibaba/fluss/rpc/netty/NettyMetricsTest.java index 51726c5a5..c86cb2f0b 100644 --- a/fluss-rpc/src/test/java/com/alibaba/fluss/rpc/netty/NettyMetricsTest.java +++ b/fluss-rpc/src/test/java/com/alibaba/fluss/rpc/netty/NettyMetricsTest.java @@ -31,6 +31,7 @@ import com.alibaba.fluss.rpc.netty.server.NettyServer; import com.alibaba.fluss.rpc.netty.server.RequestsMetrics; import com.alibaba.fluss.utils.NetUtils; +import com.alibaba.fluss.utils.clock.SystemClock; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -70,7 +71,7 @@ public void setup() throws Exception { } ClientMetricGroup clientMetricGroup = TestingClientMetricGroup.newInstance(); clientGroup = clientMetricGroup.addGroup(NettyMetrics.NETTY_METRIC_GROUP); - nettyClient = new NettyClient(conf, clientMetricGroup); + nettyClient = new NettyClient(conf, clientMetricGroup, SystemClock.getInstance()); } @AfterEach diff --git a/fluss-rpc/src/test/java/com/alibaba/fluss/rpc/netty/client/NettyClientTest.java 
b/fluss-rpc/src/test/java/com/alibaba/fluss/rpc/netty/client/NettyClientTest.java index 193589024..3f198ad89 100644 --- a/fluss-rpc/src/test/java/com/alibaba/fluss/rpc/netty/client/NettyClientTest.java +++ b/fluss-rpc/src/test/java/com/alibaba/fluss/rpc/netty/client/NettyClientTest.java @@ -20,6 +20,7 @@ import com.alibaba.fluss.cluster.ServerType; import com.alibaba.fluss.config.ConfigOptions; import com.alibaba.fluss.config.Configuration; +import com.alibaba.fluss.exception.TimeoutException; import com.alibaba.fluss.metrics.groups.MetricGroup; import com.alibaba.fluss.metrics.util.NOPMetricsGroup; import com.alibaba.fluss.rpc.TestingGatewayService; @@ -33,13 +34,16 @@ import com.alibaba.fluss.rpc.netty.server.NettyServer; import com.alibaba.fluss.rpc.netty.server.RequestsMetrics; import com.alibaba.fluss.rpc.protocol.ApiKeys; +import com.alibaba.fluss.testutils.common.CommonTestUtils; import com.alibaba.fluss.utils.NetUtils; +import com.alibaba.fluss.utils.clock.ManualClock; import com.alibaba.fluss.utils.concurrent.FutureUtils; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import java.time.Duration; import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; @@ -47,6 +51,7 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; import static com.alibaba.fluss.utils.NetUtils.getAvailablePort; import static org.assertj.core.api.Assertions.assertThat; @@ -54,7 +59,7 @@ /** Test for {@link NettyClient}. 
*/ final class NettyClientTest { - + private ManualClock clock; private Configuration conf; private NettyClient nettyClient; private ServerNode serverNode; @@ -63,10 +68,12 @@ final class NettyClientTest { @BeforeEach public void setup() throws Exception { + clock = new ManualClock(System.currentTimeMillis()); conf = new Configuration(); // 3 worker threads is enough for this test conf.setInt(ConfigOptions.NETTY_SERVER_NUM_WORKER_THREADS, 3); - nettyClient = new NettyClient(conf, TestingClientMetricGroup.newInstance()); + conf.setString(ConfigOptions.CLIENT_REQUEST_TIMEOUT.key(), "300ms"); + nettyClient = new NettyClient(conf, TestingClientMetricGroup.newInstance(), clock); buildNettyServer(1); } @@ -181,6 +188,52 @@ void testBindFailureDetection() { assertThat(NettyUtils.isBindFailure(ex)).isFalse(); } + @Test + void testRequestTimeout() throws Exception { + nettyClient.connect(serverNode); + CommonTestUtils.waitUtil( + () -> nettyClient.isReady(serverNode.uid()), + Duration.ofMinutes(1), + "connection timeout"); + + // suspend the service to mock a slow network or a busy broker. + service.suspend(); + + int numRequests = 10; + List> futures = new ArrayList<>(); + for (int i = 0; i < numRequests; i++) { + ApiVersionsRequest request = + new ApiVersionsRequest() + .setClientSoftwareName("testing_client" + i) + .setClientSoftwareVersion("1.0"); + futures.add(nettyClient.sendRequest(serverNode, ApiKeys.API_VERSIONS, request)); + } + + clock.advanceTime(350, TimeUnit.MILLISECONDS); + futures.forEach( + future -> + assertThatThrownBy(future::get) + .cause() + .isExactlyInstanceOf(TimeoutException.class) + .hasMessageContaining("Request from client cs-1 timeout.")); + + assertThat(nettyClient.connections().size()).isEqualTo(0); + + // resume the service to mock the network or broker returning to normal. 
+ service.resume(); + futures = new ArrayList<>(); + for (int i = 0; i < numRequests; i++) { + ApiVersionsRequest request = + new ApiVersionsRequest() + .setClientSoftwareName("testing_client" + i) + .setClientSoftwareVersion("1.0"); + futures.add(nettyClient.sendRequest(serverNode, ApiKeys.API_VERSIONS, request)); + } + + FutureUtils.waitForAll(futures).get(); + assertThat(nettyClient.connections().size()).isEqualTo(1); + } + private void buildNettyServer(int serverId) throws Exception { try (NetUtils.Port availablePort = getAvailablePort()) { serverNode = diff --git a/fluss-server/src/test/java/com/alibaba/fluss/server/ServerITCaseBase.java b/fluss-server/src/test/java/com/alibaba/fluss/server/ServerITCaseBase.java index 18957e4de..394ff54d4 100644 --- a/fluss-server/src/test/java/com/alibaba/fluss/server/ServerITCaseBase.java +++ b/fluss-server/src/test/java/com/alibaba/fluss/server/ServerITCaseBase.java @@ -33,6 +33,7 @@ import com.alibaba.fluss.server.zk.ZooKeeperExtension; import com.alibaba.fluss.testutils.common.AllCallbackWrapper; import com.alibaba.fluss.testutils.common.CommonTestUtils; +import com.alibaba.fluss.utils.clock.SystemClock; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; @@ -125,7 +126,10 @@ private void waitUntilServerStartup(TestProcessBuilder.TestProcess process) { private void testConnectionToServer() throws Exception { try (NettyClient client = - new NettyClient(new Configuration(), TestingClientMetricGroup.newInstance())) { + new NettyClient( + new Configuration(), + TestingClientMetricGroup.newInstance(), + SystemClock.getInstance())) { RpcGateway gateway = GatewayClientProxy.createGatewayProxy( this::getServerNode, client, getRpcGatewayClass()); diff --git a/fluss-server/src/test/java/com/alibaba/fluss/server/utils/RpcGatewayManagerTest.java b/fluss-server/src/test/java/com/alibaba/fluss/server/utils/RpcGatewayManagerTest.java index 4b65e1943..64aef74d7 100644 --- 
a/fluss-server/src/test/java/com/alibaba/fluss/server/utils/RpcGatewayManagerTest.java +++ b/fluss-server/src/test/java/com/alibaba/fluss/server/utils/RpcGatewayManagerTest.java @@ -22,6 +22,7 @@ import com.alibaba.fluss.rpc.gateway.TabletServerGateway; import com.alibaba.fluss.rpc.metrics.TestingClientMetricGroup; import com.alibaba.fluss.rpc.netty.client.NettyClient; +import com.alibaba.fluss.utils.clock.SystemClock; import org.junit.jupiter.api.Test; @@ -35,7 +36,9 @@ void testRpcGatewayManage() throws Exception { RpcGatewayManager gatewayRpcGatewayManager = new RpcGatewayManager<>( new NettyClient( - new Configuration(), TestingClientMetricGroup.newInstance()), + new Configuration(), + TestingClientMetricGroup.newInstance(), + SystemClock.getInstance()), TabletServerGateway.class); ServerNode serverNode1 = new ServerNode(1, "localhost", 1234, ServerType.TABLET_SERVER);