Skip to content

Commit

Permalink
[client] Add timeout for inflight request to netty client.
Browse files Browse the repository at this point in the history
  • Loading branch information
loserwang1024 committed Feb 27, 2025
1 parent 83aef68 commit 3b0e092
Show file tree
Hide file tree
Showing 9 changed files with 190 additions and 13 deletions.
9 changes: 9 additions & 0 deletions fluss-rpc/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,15 @@
<type>test-jar</type>
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.alibaba.fluss</groupId>
<artifactId>fluss-test-utils</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>


</dependencies>

<build>
Expand Down
3 changes: 2 additions & 1 deletion fluss-rpc/src/main/java/com/alibaba/fluss/rpc/RpcClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.alibaba.fluss.rpc.metrics.ClientMetricGroup;
import com.alibaba.fluss.rpc.netty.client.NettyClient;
import com.alibaba.fluss.rpc.protocol.ApiKeys;
import com.alibaba.fluss.utils.clock.SystemClock;

import java.util.concurrent.CompletableFuture;

Expand All @@ -39,7 +40,7 @@ public interface RpcClient extends AutoCloseable {
* @return The RPC client.
*/
static RpcClient create(Configuration conf, ClientMetricGroup clientMetricGroup) {
return new NettyClient(conf, clientMetricGroup);
return new NettyClient(conf, clientMetricGroup, SystemClock.getInstance());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.alibaba.fluss.cluster.ServerNode;
import com.alibaba.fluss.config.ConfigOptions;
import com.alibaba.fluss.config.Configuration;
import com.alibaba.fluss.exception.TimeoutException;
import com.alibaba.fluss.rpc.RpcClient;
import com.alibaba.fluss.rpc.messages.ApiMessage;
import com.alibaba.fluss.rpc.metrics.ClientMetricGroup;
Expand All @@ -31,6 +32,7 @@
import com.alibaba.fluss.shaded.netty4.io.netty.channel.ChannelOption;
import com.alibaba.fluss.shaded.netty4.io.netty.channel.EventLoopGroup;
import com.alibaba.fluss.utils.MapUtils;
import com.alibaba.fluss.utils.clock.Clock;
import com.alibaba.fluss.utils.concurrent.FutureUtils;

import org.slf4j.Logger;
Expand Down Expand Up @@ -67,12 +69,16 @@ public final class NettyClient implements RpcClient {
*/
private final Map<String, ServerConnection> connections;

private final long requestTimeoutMs;

/** Metric groups for client. */
private final ClientMetricGroup clientMetricGroup;

private final Clock clock;

private volatile boolean isClosed = false;

public NettyClient(Configuration conf, ClientMetricGroup clientMetricGroup) {
public NettyClient(Configuration conf, ClientMetricGroup clientMetricGroup, Clock clock) {
this.connections = MapUtils.newConcurrentHashMap();

// build bootstrap
Expand All @@ -84,6 +90,7 @@ public NettyClient(Configuration conf, ClientMetricGroup clientMetricGroup) {
int connectionMaxIdle =
(int) conf.get(ConfigOptions.NETTY_CONNECTION_MAX_IDLE_TIME).getSeconds();
PooledByteBufAllocator pooledAllocator = PooledByteBufAllocator.DEFAULT;
this.requestTimeoutMs = conf.get(ConfigOptions.CLIENT_REQUEST_TIMEOUT).toMillis();
this.bootstrap =
new Bootstrap()
.group(eventGroup)
Expand All @@ -93,6 +100,8 @@ public NettyClient(Configuration conf, ClientMetricGroup clientMetricGroup) {
.handler(new ClientChannelInitializer(connectionMaxIdle));
this.clientMetricGroup = clientMetricGroup;
NettyMetrics.registerNettyMetrics(clientMetricGroup, pooledAllocator);
this.clock = clock;
eventGroup.submit(this::handleTimeoutRequest);
}

/**
Expand All @@ -117,11 +126,15 @@ public boolean connect(ServerNode node) {
*/
@Override
public CompletableFuture<Void> disconnect(String serverUid) {
return disconnect(serverUid, null);
}

private CompletableFuture<Void> disconnect(String serverUid, Throwable throwable) {
LOG.debug("Disconnecting from server {}.", serverUid);
checkArgument(!isClosed, "Netty client is closed.");
ServerConnection connection = connections.remove(serverUid);
if (connection != null) {
return connection.close();
return throwable == null ? connection.close() : connection.close(throwable);
}
return FutureUtils.completedVoidFuture();
}
Expand Down Expand Up @@ -176,12 +189,40 @@ private ServerConnection getOrCreateConnection(ServerNode node) {
ignored -> {
LOG.debug("Creating connection to server {}.", node);
ServerConnection connection =
new ServerConnection(bootstrap, node, clientMetricGroup);
new ServerConnection(
bootstrap, node, requestTimeoutMs, clock, clientMetricGroup);
connection.whenClose(ignore -> connections.remove(serverId, connection));
return connection;
});
}

private void handleTimeoutRequest() {
if (isClosed || requestTimeoutMs <= 0) {
return;
}
long nextRequestTimeoutMs = requestTimeoutMs;
long now = clock.milliseconds();
for (Map.Entry<String, ServerConnection> connection : connections.entrySet()) {
if (connection.getValue().hasExpiredRequest(now)) {
LOG.info("Disconnecting from node {} due to request timeout.", connection.getKey());
disconnect(
connection.getKey(),
new TimeoutException(
String.format(
"Request from client %s timeout.", connection.getKey())));
} else {
nextRequestTimeoutMs =
Math.min(
nextRequestTimeoutMs,
connection.getValue().nextTimeoutSinceNow(now));
}
}

assert nextRequestTimeoutMs > 0;
eventGroup.schedule(
this::handleTimeoutRequest, nextRequestTimeoutMs, TimeUnit.MILLISECONDS);
}

@VisibleForTesting
Map<String, ServerConnection> connections() {
return connections;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import com.alibaba.fluss.shaded.netty4.io.netty.channel.ChannelFuture;
import com.alibaba.fluss.shaded.netty4.io.netty.channel.ChannelFutureListener;
import com.alibaba.fluss.utils.MapUtils;
import com.alibaba.fluss.utils.clock.Clock;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -62,6 +63,9 @@ final class ServerConnection {
private final Map<Integer, InflightRequest> inflightRequests = MapUtils.newConcurrentHashMap();
private final CompletableFuture<Void> closeFuture = new CompletableFuture<>();
private final ConnectionMetricGroup connectionMetricGroup;
private final long requestTimeoutMs;

private final Clock clock;

private final Object lock = new Object();

Expand All @@ -83,13 +87,20 @@ final class ServerConnection {
@GuardedBy("lock")
private ServerApiVersions serverApiVersions;

ServerConnection(Bootstrap bootstrap, ServerNode node, ClientMetricGroup clientMetricGroup) {
ServerConnection(
Bootstrap bootstrap,
ServerNode node,
long requestTimeoutMs,
Clock clock,
ClientMetricGroup clientMetricGroup) {
this.node = node;
this.state = ConnectionState.CONNECTING;
bootstrap
.connect(node.host(), node.port())
.addListener((ChannelFutureListener) this::establishConnection);
this.connectionMetricGroup = clientMetricGroup.createConnectionMetricGroup(node.uid());
this.clock = clock;
this.requestTimeoutMs = requestTimeoutMs;
}

public ServerNode getServerNode() {
Expand Down Expand Up @@ -117,7 +128,7 @@ public CompletableFuture<Void> close() {
return close(new ClosedChannelException());
}

private CompletableFuture<Void> close(Throwable cause) {
CompletableFuture<Void> close(Throwable cause) {
synchronized (lock) {
if (state.isDisconnected()) {
// the connection has been closed/closing.
Expand Down Expand Up @@ -188,6 +199,30 @@ private CompletableFuture<Void> close(Throwable cause) {
return closeFuture;
}

/** Check if there is any request that has expired. */
Boolean hasExpiredRequest(long now) {
for (InflightRequest request : inflightRequests.values()) {
if (request.timeElapsedSinceSendMs(now) >= requestTimeoutMs) {
return true;
}
}

return false;
}

/** Get the next timeout since now. */
long nextTimeoutSinceNow(long now) {
long minRequestTimeoutMs = Integer.MAX_VALUE;
for (InflightRequest request : inflightRequests.values()) {

minRequestTimeoutMs =
Math.min(
minRequestTimeoutMs,
requestTimeoutMs - request.timeElapsedSinceSendMs(now));
}
return minRequestTimeoutMs;
}

// ------------------------------------------------------------------------------------------

/**
Expand Down Expand Up @@ -299,7 +334,13 @@ private CompletableFuture<ApiMessage> doSend(

InflightRequest inflight =
new InflightRequest(
apiKey.id, version, requestCount++, rawRequest, responseFuture);
apiKey.id,
version,
requestCount++,
rawRequest,
clock.milliseconds(),
requestTimeoutMs,
responseFuture);
inflightRequests.put(inflight.requestId, inflight);

// TODO: maybe we need to add timeout for the inflight requests
Expand Down Expand Up @@ -427,17 +468,23 @@ private static class InflightRequest {
short apiVersion,
int requestId,
ApiMessage request,
long requestStartTime,
long requestTimeoutMs,
CompletableFuture<ApiMessage> responseFuture) {
this.apiKey = apiKey;
this.apiVersion = apiVersion;
this.requestId = requestId;
this.request = request;
this.responseFuture = responseFuture;
this.requestStartTime = System.currentTimeMillis();
this.requestStartTime = requestStartTime;
}

ByteBuf toByteBuf(ByteBufAllocator allocator) {
return MessageCodec.encodeRequest(allocator, apiKey, apiVersion, requestId, request);
}

public long timeElapsedSinceSendMs(long currentTimeMs) {
return Math.max(0, currentTimeMs - requestStartTime);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,22 @@ public class TestingGatewayService extends RpcGatewayService {
private final List<String> processorThreadNames =
Collections.synchronizedList(new ArrayList<>());

private boolean suspending = false;

public List<String> getProcessorThreadNames() {
return new ArrayList<>(processorThreadNames);
}

@Override
public CompletableFuture<ApiVersionsResponse> apiVersions(ApiVersionsRequest request) {
while (suspending) {
// suspend here to mock the network latency or broker is busy.
try {
Thread.sleep(100);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
ApiManager apiManager = new ApiManager(providerType());
Set<ApiKeys> apiKeys = apiManager.enabledApis();
List<PbApiVersion> apiVersions = new ArrayList<>();
Expand Down Expand Up @@ -71,4 +81,12 @@ public ServerType providerType() {
public void shutdown() {
// do nothing.
}

public void resume() {
this.suspending = false;
}

public void suspend() {
this.suspending = true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import com.alibaba.fluss.rpc.netty.server.NettyServer;
import com.alibaba.fluss.rpc.netty.server.RequestsMetrics;
import com.alibaba.fluss.utils.NetUtils;
import com.alibaba.fluss.utils.clock.SystemClock;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
Expand Down Expand Up @@ -70,7 +71,7 @@ public void setup() throws Exception {
}
ClientMetricGroup clientMetricGroup = TestingClientMetricGroup.newInstance();
clientGroup = clientMetricGroup.addGroup(NettyMetrics.NETTY_METRIC_GROUP);
nettyClient = new NettyClient(conf, clientMetricGroup);
nettyClient = new NettyClient(conf, clientMetricGroup, SystemClock.getInstance());
}

@AfterEach
Expand Down
Loading

0 comments on commit 3b0e092

Please sign in to comment.