Skip to content

Commit

Permalink
- Set SO_LINGER in client socket if SSLSocket (https://issues.redhat.…
Browse files Browse the repository at this point in the history
…com/browse/JGRP-2748)

- Added TUNNEL.linger
  • Loading branch information
belaban committed Jan 22, 2024
1 parent 029ddce commit 002ee91
Show file tree
Hide file tree
Showing 5 changed files with 29 additions and 15 deletions.
15 changes: 6 additions & 9 deletions src/org/jgroups/blocks/cs/TcpConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ public TcpConnection(Address peer_addr, TcpBaseServer server) throws Exception {
this.sock=server.socketFactory().createSocket("jgroups.tcp.sock");
setSocketParameters(sock);
last_access=getTimestamp(); // last time a message was sent or received (ns)
if(sock instanceof SSLSocket) // https://issues.redhat.com/browse/JGRP-2748
sock.setSoLinger(true, 0);
}

/** Called by {@link TcpServer.Acceptor#handleAccept(Socket)} */
Expand All @@ -63,6 +65,8 @@ public TcpConnection(Socket s, TcpServer server) throws Exception {
this.peer_addr=server.usePeerConnections()? readPeerAddress(s)
: new IpAddress((InetSocketAddress)s.getRemoteSocketAddress());
last_access=getTimestamp(); // last time a message was sent or received (ns)
if(sock instanceof SSLSocket) // https://issues.redhat.com/browse/JGRP-2748
sock.setSoLinger(true, 0);
}

public Address localAddress() {
Expand Down Expand Up @@ -225,15 +229,8 @@ protected void setSocketParameters(Socket client_sock) throws SocketException {

client_sock.setKeepAlive(true);
client_sock.setTcpNoDelay(server.tcp_nodelay);
try { // todo: remove try-catch clause once https://github.com/oracle/graal/issues/1087 has been fixed
if(server.linger > 0)
client_sock.setSoLinger(true, server.linger);
else
client_sock.setSoLinger(false, -1);
}
catch(Throwable t) {
server.log().warn("%s: failed setting SO_LINGER option: %s", server.localAddress(), t);
}
if(server.linger > 0)
client_sock.setSoLinger(true, server.linger);
}


Expand Down
7 changes: 6 additions & 1 deletion src/org/jgroups/protocols/TUNNEL.java
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@ public interface TUNNELPolicy {
"GossipRouter is closed. Ignored when heartbeat_interval is 0.", type=AttributeType.TIME)
protected long heartbeat_timeout;

@Property(description="SO_LINGER in seconds. Default of -1 disables it")
protected int linger=-1; // SO_LINGER (number of seconds, -1 disables it)

/* ------------------------------------------ Fields ----------------------------------------------------- */

protected final List<InetSocketAddress> gossip_routers=new ArrayList<>();
Expand All @@ -95,6 +98,8 @@ public TUNNEL() {
public TUNNEL useNio(boolean use_nio) {this.use_nio=use_nio; return this;}
public TLS tls() {return tls;}
public TUNNEL tls(TLS t) {this.tls=t; return this;}
public int getLinger() {return linger;}
public TUNNEL setLinger(int l) {this.linger=l; return this;}

/** We can simply send a message with dest == null and the GossipRouter will take care of routing it to all
* members in the cluster */
Expand Down Expand Up @@ -215,7 +220,7 @@ public Object down(Event evt) {
try {
InetSocketAddress target=gr.isUnresolved()? new InetSocketAddress(gr.getHostString(), gr.getPort())
: new InetSocketAddress(gr.getAddress(), gr.getPort());
stubManager.createAndRegisterStub(new InetSocketAddress(bind_addr, bind_port), target)
stubManager.createAndRegisterStub(new InetSocketAddress(bind_addr, bind_port), target, linger)
.receiver(this).tcpNoDelay(tcp_nodelay);
}
catch(Throwable t) {
Expand Down
14 changes: 11 additions & 3 deletions src/org/jgroups/stack/RouterStub.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public interface CloseListener {void closed(RouterStub stub);}
// max number of ms to wait for socket establishment to GossipRouter
protected int sock_conn_timeout=3000;
protected boolean tcp_nodelay=true;

protected int linger=-1; // SO_LINGER (number of seconds, -1 disables it)
protected boolean handle_heartbeats;
// timestamp of last heartbeat (or message from GossipRouter)
protected volatile long last_heartbeat;
Expand All @@ -50,21 +50,27 @@ public interface CloseListener {void closed(RouterStub stub);}
protected final Map<String,List<MembersNotification>> get_members_map=new HashMap<>();


public RouterStub(InetSocketAddress local_sa, InetSocketAddress remote_sa, boolean use_nio, CloseListener l, SocketFactory sf) {
this(local_sa, remote_sa, use_nio, l, sf, -1);
}

/**
* Creates a stub to a remote_sa {@link GossipRouter}.
* @param local_sa The local_sa bind address and port
* @param remote_sa The address:port of the GossipRouter
* @param use_nio Whether to use ({@link org.jgroups.protocols.TCP_NIO2}) or {@link org.jgroups.protocols.TCP}
* @param l The {@link CloseListener}
* @param sf The {@link SocketFactory} to use to create the client socket
* @param linger SO_LINGER timeout
*/
public RouterStub(InetSocketAddress local_sa, InetSocketAddress remote_sa, boolean use_nio, CloseListener l, SocketFactory sf) {
public RouterStub(InetSocketAddress local_sa, InetSocketAddress remote_sa, boolean use_nio, CloseListener l, SocketFactory sf, int linger) {
this.local=local_sa != null? new IpAddress(local_sa.getAddress(), local_sa.getPort())
: new IpAddress((InetAddress)null,0);
this.remote_sa=Objects.requireNonNull(remote_sa);
this.use_nio=use_nio;
this.close_listener=l;
this.socket_factory=sf;
this.linger=linger;
if(resolveRemoteAddress()) // sets remote
client=createClient(sf);
}
Expand All @@ -86,6 +92,8 @@ public RouterStub(InetSocketAddress local_sa, InetSocketAddress remote_sa, boole
public RouterStub handleHeartbeats(boolean f) {handle_heartbeats=f; return this;}
public boolean handleHeartbeats() {return handle_heartbeats;}
public long lastHeartbeat() {return last_heartbeat;}
public int getLinger() {return linger;}
public RouterStub setLinger(int l) {this.linger=l; return this;}



Expand Down Expand Up @@ -255,7 +263,7 @@ protected BaseServer createClient(SocketFactory sf) {
if(sf != null) cl.socketFactory(sf);
cl.receiver(this);
cl.addConnectionListener(this);
cl.socketConnectionTimeout(sock_conn_timeout).tcpNodelay(tcp_nodelay);
cl.socketConnectionTimeout(sock_conn_timeout).tcpNodelay(tcp_nodelay).linger(linger);
return cl;
}

Expand Down
6 changes: 5 additions & 1 deletion src/org/jgroups/stack/RouterStubManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,11 @@ public void forAny(Consumer<RouterStub> action) {


public RouterStub createAndRegisterStub(InetSocketAddress local, InetSocketAddress router_addr) {
RouterStub stub=new RouterStub(local, router_addr, use_nio, this, socket_factory)
return createAndRegisterStub(local, router_addr, -1);
}

public RouterStub createAndRegisterStub(InetSocketAddress local, InetSocketAddress router_addr, int linger) {
RouterStub stub=new RouterStub(local, router_addr, use_nio, this, socket_factory, linger)
.handleHeartbeats(heartbeat_interval > 0);
synchronized(stubs) {
this.stubs.add(stub);
Expand Down
2 changes: 1 addition & 1 deletion tests/junit/org/jgroups/tests/FlushTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ public void testFlushWithCrashedFlushCoordinator() throws Exception {
Util.shutdown(b);
Stream.of(a,c).forEach(ch -> ch.getProtocolStack().findProtocol(FLUSH.class).setLevel("debug"));

Util.waitUntilAllChannelsHaveSameView(100000, 500, a,c);
Util.waitUntilAllChannelsHaveSameView(10000, 500, a,c);

// cluster should not hang and two remaining members should have a correct view
assert a.getView().size() == 2 : String.format("A's view: %s", a.getView());
Expand Down

0 comments on commit 002ee91

Please sign in to comment.