From 002ee91d4e908a2da88c94ae46e1f080149b3882 Mon Sep 17 00:00:00 2001 From: Bela Ban Date: Mon, 22 Jan 2024 15:44:05 +0100 Subject: [PATCH] - Set SO_LINGER in client socket if SSLSocket (https://issues.redhat.com/browse/JGRP-2748) - Added TUNNEL.linger --- src/org/jgroups/blocks/cs/TcpConnection.java | 15 ++++++--------- src/org/jgroups/protocols/TUNNEL.java | 7 ++++++- src/org/jgroups/stack/RouterStub.java | 14 +++++++++++--- src/org/jgroups/stack/RouterStubManager.java | 6 +++++- tests/junit/org/jgroups/tests/FlushTest.java | 2 +- 5 files changed, 29 insertions(+), 15 deletions(-) diff --git a/src/org/jgroups/blocks/cs/TcpConnection.java b/src/org/jgroups/blocks/cs/TcpConnection.java index a01c2156635..a49e9e757d5 100644 --- a/src/org/jgroups/blocks/cs/TcpConnection.java +++ b/src/org/jgroups/blocks/cs/TcpConnection.java @@ -48,6 +48,8 @@ public TcpConnection(Address peer_addr, TcpBaseServer server) throws Exception { this.sock=server.socketFactory().createSocket("jgroups.tcp.sock"); setSocketParameters(sock); last_access=getTimestamp(); // last time a message was sent or received (ns) + if(sock instanceof SSLSocket) // https://issues.redhat.com/browse/JGRP-2748 + sock.setSoLinger(true, 0); } /** Called by {@link TcpServer.Acceptor#handleAccept(Socket)} */ @@ -63,6 +65,8 @@ public TcpConnection(Socket s, TcpServer server) throws Exception { this.peer_addr=server.usePeerConnections()? readPeerAddress(s) : new IpAddress((InetSocketAddress)s.getRemoteSocketAddress()); last_access=getTimestamp(); // last time a message was sent or received (ns) + if(sock instanceof SSLSocket) // https://issues.redhat.com/browse/JGRP-2748 + sock.setSoLinger(true, 0); } public Address localAddress() { @@ -225,15 +229,8 @@ protected void setSocketParameters(Socket client_sock) throws SocketException { client_sock.setKeepAlive(true); client_sock.setTcpNoDelay(server.tcp_nodelay); - try { // todo: remove try-catch clause once https://github.com/oracle/graal/issues/1087 has been fixed - if(server.linger > 0) - client_sock.setSoLinger(true, server.linger); - else - client_sock.setSoLinger(false, -1); - } - catch(Throwable t) { - server.log().warn("%s: failed setting SO_LINGER option: %s", server.localAddress(), t); - } + if(server.linger > 0) + client_sock.setSoLinger(true, server.linger); } diff --git a/src/org/jgroups/protocols/TUNNEL.java b/src/org/jgroups/protocols/TUNNEL.java index 6b2b1cfd3a5..4ed1ca3eb57 100644 --- a/src/org/jgroups/protocols/TUNNEL.java +++ b/src/org/jgroups/protocols/TUNNEL.java @@ -72,6 +72,9 @@ public interface TUNNELPolicy { "GossipRouter is closed. Ignored when heartbeat_interval is 0.", type=AttributeType.TIME) protected long heartbeat_timeout; + @Property(description="SO_LINGER in seconds. Default of -1 disables it") + protected int linger=-1; // SO_LINGER (number of seconds, -1 disables it) + /* ------------------------------------------ Fields ----------------------------------------------------- */ protected final List gossip_routers=new ArrayList<>(); @@ -95,6 +98,8 @@ public TUNNEL() { public TUNNEL useNio(boolean use_nio) {this.use_nio=use_nio; return this;} public TLS tls() {return tls;} public TUNNEL tls(TLS t) {this.tls=t; return this;} + public int getLinger() {return linger;} + public TUNNEL setLinger(int l) {this.linger=l; return this;} /** We can simply send a message with dest == null and the GossipRouter will take care of routing it to all * members in the cluster */ @@ -215,7 +220,7 @@ public Object down(Event evt) { try { InetSocketAddress target=gr.isUnresolved()? new InetSocketAddress(gr.getHostString(), gr.getPort()) : new InetSocketAddress(gr.getAddress(), gr.getPort()); - stubManager.createAndRegisterStub(new InetSocketAddress(bind_addr, bind_port), target) + stubManager.createAndRegisterStub(new InetSocketAddress(bind_addr, bind_port), target, linger) .receiver(this).tcpNoDelay(tcp_nodelay); } catch(Throwable t) { diff --git a/src/org/jgroups/stack/RouterStub.java b/src/org/jgroups/stack/RouterStub.java index b3ab7a29cb4..cffb9eae7e6 100644 --- a/src/org/jgroups/stack/RouterStub.java +++ b/src/org/jgroups/stack/RouterStub.java @@ -41,7 +41,7 @@ public interface CloseListener {void closed(RouterStub stub);} // max number of ms to wait for socket establishment to GossipRouter protected int sock_conn_timeout=3000; protected boolean tcp_nodelay=true; - + protected int linger=-1; // SO_LINGER (number of seconds, -1 disables it) protected boolean handle_heartbeats; // timestamp of last heartbeat (or message from GossipRouter) protected volatile long last_heartbeat; @@ -50,6 +50,10 @@ public interface CloseListener {void closed(RouterStub stub);} protected final Map> get_members_map=new HashMap<>(); + public RouterStub(InetSocketAddress local_sa, InetSocketAddress remote_sa, boolean use_nio, CloseListener l, SocketFactory sf) { + this(local_sa, remote_sa, use_nio, l, sf, -1); + } + /** * Creates a stub to a remote_sa {@link GossipRouter}. * @param local_sa The local_sa bind address and port @@ -57,14 +61,16 @@ public interface CloseListener {void closed(RouterStub stub);} * @param use_nio Whether to use ({@link org.jgroups.protocols.TCP_NIO2}) or {@link org.jgroups.protocols.TCP} * @param l The {@link CloseListener} * @param sf The {@link SocketFactory} to use to create the client socket + * @param linger SO_LINGER timeout */ - public RouterStub(InetSocketAddress local_sa, InetSocketAddress remote_sa, boolean use_nio, CloseListener l, SocketFactory sf) { + public RouterStub(InetSocketAddress local_sa, InetSocketAddress remote_sa, boolean use_nio, CloseListener l, SocketFactory sf, int linger) { this.local=local_sa != null? new IpAddress(local_sa.getAddress(), local_sa.getPort()) : new IpAddress((InetAddress)null,0); this.remote_sa=Objects.requireNonNull(remote_sa); this.use_nio=use_nio; this.close_listener=l; this.socket_factory=sf; + this.linger=linger; if(resolveRemoteAddress()) // sets remote client=createClient(sf); } @@ -86,6 +92,8 @@ public RouterStub(InetSocketAddress local_sa, InetSocketAddress remote_sa, boole public RouterStub handleHeartbeats(boolean f) {handle_heartbeats=f; return this;} public boolean handleHeartbeats() {return handle_heartbeats;} public long lastHeartbeat() {return last_heartbeat;} + public int getLinger() {return linger;} + public RouterStub setLinger(int l) {this.linger=l; return this;} @@ -255,7 +263,7 @@ protected BaseServer createClient(SocketFactory sf) { if(sf != null) cl.socketFactory(sf); cl.receiver(this); cl.addConnectionListener(this); - cl.socketConnectionTimeout(sock_conn_timeout).tcpNodelay(tcp_nodelay); + cl.socketConnectionTimeout(sock_conn_timeout).tcpNodelay(tcp_nodelay).linger(linger); return cl; } diff --git a/src/org/jgroups/stack/RouterStubManager.java b/src/org/jgroups/stack/RouterStubManager.java index b112ce6b6e0..e0956658e6d 100644 --- a/src/org/jgroups/stack/RouterStubManager.java +++ b/src/org/jgroups/stack/RouterStubManager.java @@ -114,7 +114,11 @@ public void forAny(Consumer action) { public RouterStub createAndRegisterStub(InetSocketAddress local, InetSocketAddress router_addr) { - RouterStub stub=new RouterStub(local, router_addr, use_nio, this, socket_factory) + return createAndRegisterStub(local, router_addr, -1); + } + + public RouterStub createAndRegisterStub(InetSocketAddress local, InetSocketAddress router_addr, int linger) { + RouterStub stub=new RouterStub(local, router_addr, use_nio, this, socket_factory, linger) .handleHeartbeats(heartbeat_interval > 0); synchronized(stubs) { this.stubs.add(stub); diff --git a/tests/junit/org/jgroups/tests/FlushTest.java b/tests/junit/org/jgroups/tests/FlushTest.java index a4acc60e784..eaf85a73ead 100644 --- a/tests/junit/org/jgroups/tests/FlushTest.java +++ b/tests/junit/org/jgroups/tests/FlushTest.java @@ -141,7 +141,7 @@ public void testFlushWithCrashedFlushCoordinator() throws Exception { Util.shutdown(b); Stream.of(a,c).forEach(ch -> ch.getProtocolStack().findProtocol(FLUSH.class).setLevel("debug")); - Util.waitUntilAllChannelsHaveSameView(100000, 500, a,c); + Util.waitUntilAllChannelsHaveSameView(10000, 500, a,c); // cluster should not hang and two remaining members should have a correct view assert a.getView().size() == 2 : String.format("A's view: %s", a.getView());