From d8811c605f6caec4800ab4d2fb5eef8dfe292c53 Mon Sep 17 00:00:00 2001 From: atshow Date: Thu, 27 May 2021 11:06:00 +0800 Subject: [PATCH 1/4] rsocket object support get local address and remote address Signed-off-by: atshow --- .../java/io/rsocket/DuplexConnection.java | 11 +++++++ .../src/main/java/io/rsocket/RSocket.java | 31 +++++++++++++++++++ .../core/ClientServerInputMultiplexer.java | 5 +++ .../rsocket/core/LoggingDuplexConnection.java | 5 +++ .../io/rsocket/core/RSocketRequester.java | 12 +++++++ .../io/rsocket/core/RSocketResponder.java | 12 +++++++ .../core/SetupHandlingDuplexConnection.java | 5 +++ .../resume/ResumableDuplexConnection.java | 12 +++++++ .../java/io/rsocket/util/RSocketProxy.java | 12 +++++++ .../test/util/LocalDuplexConnection.java | 5 +++ .../io/rsocket/test/util/MockRSocket.java | 12 +++++++ .../test/util/TestDuplexConnection.java | 5 +++ .../client/LoadBalancedRSocketMono.java | 12 +++++++ .../client/filter/BackupRequestSocket.java | 12 +++++++ .../client/filter/RSocketSupplier.java | 12 +++++++ .../MicrometerDuplexConnection.java | 5 +++ .../rsocket/micrometer/MicrometerRSocket.java | 12 +++++++ .../java/io/rsocket/test/TransportTest.java | 10 ++++++ .../local/LocalDuplexConnection.java | 5 +++ .../transport/netty/TcpDuplexConnection.java | 5 +++ .../netty/WebsocketDuplexConnection.java | 5 +++ 21 files changed, 205 insertions(+) diff --git a/rsocket-core/src/main/java/io/rsocket/DuplexConnection.java b/rsocket-core/src/main/java/io/rsocket/DuplexConnection.java index fe91f4bf0..7f8e2b43a 100644 --- a/rsocket-core/src/main/java/io/rsocket/DuplexConnection.java +++ b/rsocket-core/src/main/java/io/rsocket/DuplexConnection.java @@ -75,6 +75,17 @@ public interface DuplexConnection extends Availability, Closeable { */ ByteBufAllocator alloc(); + /** + * Return the local address that this connection is connected to. The returned {@link + * SocketAddress} varies by transport type and should be downcast to obtain more detailed + * information. For TCP and WebSocket, the address type is {@link java.net.InetSocketAddress}. For + * local transport, it is {@link io.rsocket.transport.local.LocalSocketAddress}. + * + * @return the address + * @since 1.1 + */ + SocketAddress localAddress(); + /** * Return the remote address that this connection is connected to. The returned {@link * SocketAddress} varies by transport type and should be downcast to obtain more detailed diff --git a/rsocket-core/src/main/java/io/rsocket/RSocket.java b/rsocket-core/src/main/java/io/rsocket/RSocket.java index b05241365..2bf9b04e1 100644 --- a/rsocket-core/src/main/java/io/rsocket/RSocket.java +++ b/rsocket-core/src/main/java/io/rsocket/RSocket.java @@ -19,6 +19,7 @@ import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import java.net.SocketAddress; /** * A contract providing different interaction models for metadataPush(Payload payload) { return RSocketAdapter.metadataPush(payload); } + /** + * Returns the local address where this channel is bound to. The returned + * {@link SocketAddress} is supposed to be down-cast into more concrete + * type such as {@link java.net.InetSocketAddress} to retrieve the detailed + * information. + * + * @return the local address of this channel. + * {@code null} if this channel is not bound. + */ + default SocketAddress localAddress() { + return null; + } + + /** + * Returns the remote address where this channel is connected to. The + * returned {@link SocketAddress} is supposed to be down-cast into more + * concrete type such as {@link java.net.InetSocketAddress} to retrieve the detailed + * information. + * + * @return the remote address of this channel. + * {@code null} if this channel is not connected. + * If this channel is not connected but it can receive messages + * from arbitrary remote addresses to determine + * the origination of the received message as this method will + * return {@code null}. + */ + default SocketAddress remoteAddress() { + return null; + } + @Override default double availability() { return isDisposed() ? 0.0 : 1.0; diff --git a/rsocket-core/src/main/java/io/rsocket/core/ClientServerInputMultiplexer.java b/rsocket-core/src/main/java/io/rsocket/core/ClientServerInputMultiplexer.java index d6cb46d98..3e4fcc01d 100644 --- a/rsocket-core/src/main/java/io/rsocket/core/ClientServerInputMultiplexer.java +++ b/rsocket-core/src/main/java/io/rsocket/core/ClientServerInputMultiplexer.java @@ -276,6 +276,11 @@ public ByteBufAllocator alloc() { return source.alloc(); } + @Override + public SocketAddress localAddress() { + return source.localAddress(); + } + @Override public SocketAddress remoteAddress() { return source.remoteAddress(); diff --git a/rsocket-core/src/main/java/io/rsocket/core/LoggingDuplexConnection.java b/rsocket-core/src/main/java/io/rsocket/core/LoggingDuplexConnection.java index 7b5d8f6c2..b2cb23dfe 100644 --- a/rsocket-core/src/main/java/io/rsocket/core/LoggingDuplexConnection.java +++ b/rsocket-core/src/main/java/io/rsocket/core/LoggingDuplexConnection.java @@ -57,6 +57,11 @@ public ByteBufAllocator alloc() { return source.alloc(); } + @Override + public SocketAddress localAddress() { + return source.localAddress(); + } + @Override public SocketAddress remoteAddress() { return source.remoteAddress(); diff --git a/rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java b/rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java index c5853531b..7e084758c 100644 --- a/rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java +++ b/rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java @@ -34,6 +34,8 @@ import io.rsocket.keepalive.KeepAliveHandler; import io.rsocket.keepalive.KeepAliveSupport; import io.rsocket.plugins.RequestInterceptor; + +import java.net.SocketAddress; import java.nio.channels.ClosedChannelException; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import java.util.function.Function; @@ -174,6 +176,16 @@ public int addAndGetNextStreamId(FrameHandler frameHandler) { return nextStreamId; } + @Override + public SocketAddress localAddress() { + return getDuplexConnection().localAddress(); + } + + @Override + public SocketAddress remoteAddress() { + return getDuplexConnection().remoteAddress(); + } + @Override public double availability() { final RequesterLeaseTracker requesterLeaseTracker = this.requesterLeaseTracker; diff --git a/rsocket-core/src/main/java/io/rsocket/core/RSocketResponder.java b/rsocket-core/src/main/java/io/rsocket/core/RSocketResponder.java index 969353bd6..080aff160 100644 --- a/rsocket-core/src/main/java/io/rsocket/core/RSocketResponder.java +++ b/rsocket-core/src/main/java/io/rsocket/core/RSocketResponder.java @@ -32,6 +32,8 @@ import io.rsocket.frame.RequestStreamFrameCodec; import io.rsocket.frame.decoder.PayloadDecoder; import io.rsocket.plugins.RequestInterceptor; + +import java.net.SocketAddress; import java.nio.channels.ClosedChannelException; import java.util.concurrent.CancellationException; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; @@ -151,6 +153,16 @@ public Mono metadataPush(Payload payload) { } } + @Override + public SocketAddress localAddress() { + return getDuplexConnection().localAddress(); + } + + @Override + public SocketAddress remoteAddress() { + return getDuplexConnection().remoteAddress(); + } + @Override public void dispose() { tryTerminate(() -> new CancellationException("Disposed")); diff --git a/rsocket-core/src/main/java/io/rsocket/core/SetupHandlingDuplexConnection.java b/rsocket-core/src/main/java/io/rsocket/core/SetupHandlingDuplexConnection.java index b6bc87513..36c6dd914 100644 --- a/rsocket-core/src/main/java/io/rsocket/core/SetupHandlingDuplexConnection.java +++ b/rsocket-core/src/main/java/io/rsocket/core/SetupHandlingDuplexConnection.java @@ -63,6 +63,11 @@ public Flux receive() { return this; } + @Override + public SocketAddress localAddress() { + return source.localAddress(); + } + @Override public SocketAddress remoteAddress() { return source.remoteAddress(); diff --git a/rsocket-core/src/main/java/io/rsocket/resume/ResumableDuplexConnection.java b/rsocket-core/src/main/java/io/rsocket/resume/ResumableDuplexConnection.java index 6e90e6d63..de1310f6c 100644 --- a/rsocket-core/src/main/java/io/rsocket/resume/ResumableDuplexConnection.java +++ b/rsocket-core/src/main/java/io/rsocket/resume/ResumableDuplexConnection.java @@ -47,6 +47,7 @@ public class ResumableDuplexConnection extends Flux final UnboundedProcessor savableFramesSender; final Disposable framesSaverDisposable; final Sinks.Empty onClose; + final SocketAddress localAddress; final SocketAddress remoteAddress; final Sinks.Many onConnectionClosedSink; @@ -73,6 +74,7 @@ public ResumableDuplexConnection( this.savableFramesSender = new UnboundedProcessor(); this.framesSaverDisposable = resumableFramesStore.saveFrames(savableFramesSender).subscribe(); this.onClose = Sinks.empty(); + this.localAddress = initialConnection.localAddress(); this.remoteAddress = initialConnection.remoteAddress(); ACTIVE_CONNECTION.lazySet(this, initialConnection); @@ -219,6 +221,11 @@ public boolean isDisposed() { return onClose.scan(Scannable.Attr.TERMINATED) || onClose.scan(Scannable.Attr.CANCELLED); } + @Override + public SocketAddress localAddress() { + return localAddress; + } + @Override public SocketAddress remoteAddress() { return remoteAddress; @@ -278,6 +285,11 @@ public ByteBufAllocator alloc() { return ByteBufAllocator.DEFAULT; } + @Override + public SocketAddress localAddress() { + return null; + } + @Override @SuppressWarnings("ConstantConditions") public SocketAddress remoteAddress() { diff --git a/rsocket-core/src/main/java/io/rsocket/util/RSocketProxy.java b/rsocket-core/src/main/java/io/rsocket/util/RSocketProxy.java index 518b727c1..22439ab16 100644 --- a/rsocket-core/src/main/java/io/rsocket/util/RSocketProxy.java +++ b/rsocket-core/src/main/java/io/rsocket/util/RSocketProxy.java @@ -22,6 +22,8 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import java.net.SocketAddress; + /** Wrapper/Proxy for a RSocket. This is useful when we want to override a specific method. */ public class RSocketProxy implements RSocket { protected final RSocket source; @@ -55,6 +57,16 @@ public Mono metadataPush(Payload payload) { return source.metadataPush(payload); } + @Override + public SocketAddress localAddress() { + return source.localAddress(); + } + + @Override + public SocketAddress remoteAddress() { + return source.remoteAddress(); + } + @Override public double availability() { return source.availability(); diff --git a/rsocket-core/src/test/java/io/rsocket/test/util/LocalDuplexConnection.java b/rsocket-core/src/test/java/io/rsocket/test/util/LocalDuplexConnection.java index cdfcefdc8..666eca165 100644 --- a/rsocket-core/src/test/java/io/rsocket/test/util/LocalDuplexConnection.java +++ b/rsocket-core/src/test/java/io/rsocket/test/util/LocalDuplexConnection.java @@ -101,6 +101,11 @@ public ByteBufAllocator alloc() { return allocator; } + @Override + public SocketAddress localAddress() { + return new TestLocalSocketAddress(name); + } + @Override public SocketAddress remoteAddress() { return new TestLocalSocketAddress(name); diff --git a/rsocket-core/src/test/java/io/rsocket/test/util/MockRSocket.java b/rsocket-core/src/test/java/io/rsocket/test/util/MockRSocket.java index 179afff58..9bba17ec8 100644 --- a/rsocket-core/src/test/java/io/rsocket/test/util/MockRSocket.java +++ b/rsocket-core/src/test/java/io/rsocket/test/util/MockRSocket.java @@ -21,6 +21,8 @@ import io.rsocket.Payload; import io.rsocket.RSocket; + +import java.net.SocketAddress; import java.util.concurrent.atomic.AtomicInteger; import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; @@ -71,6 +73,16 @@ public final Mono metadataPush(Payload payload) { return delegate.metadataPush(payload).doOnSubscribe(s -> pushCount.incrementAndGet()); } + @Override + public SocketAddress localAddress() { + return delegate.localAddress(); + } + + @Override + public SocketAddress remoteAddress() { + return delegate.remoteAddress(); + } + @Override public double availability() { return delegate.availability(); diff --git a/rsocket-core/src/test/java/io/rsocket/test/util/TestDuplexConnection.java b/rsocket-core/src/test/java/io/rsocket/test/util/TestDuplexConnection.java index 8793d6ca4..59283e244 100644 --- a/rsocket-core/src/test/java/io/rsocket/test/util/TestDuplexConnection.java +++ b/rsocket-core/src/test/java/io/rsocket/test/util/TestDuplexConnection.java @@ -124,6 +124,11 @@ public LeaksTrackingByteBufAllocator alloc() { return allocator; } + @Override + public SocketAddress localAddress() { + return new TestLocalSocketAddress("TestDuplexConnection"); + } + @Override public SocketAddress remoteAddress() { return new TestLocalSocketAddress("TestDuplexConnection"); diff --git a/rsocket-load-balancer/src/main/java/io/rsocket/client/LoadBalancedRSocketMono.java b/rsocket-load-balancer/src/main/java/io/rsocket/client/LoadBalancedRSocketMono.java index 6329da826..84953cebf 100644 --- a/rsocket-load-balancer/src/main/java/io/rsocket/client/LoadBalancedRSocketMono.java +++ b/rsocket-load-balancer/src/main/java/io/rsocket/client/LoadBalancedRSocketMono.java @@ -23,6 +23,8 @@ import io.rsocket.stat.Median; import io.rsocket.stat.Quantile; import io.rsocket.util.Clock; + +import java.net.SocketAddress; import java.nio.channels.ClosedChannelException; import java.time.Duration; import java.util.ArrayList; @@ -518,6 +520,16 @@ public Mono metadataPush(Payload payload) { return errorVoid; } + @Override + public SocketAddress localAddress() { + throw new RuntimeException(NoAvailableRSocketException.INSTANCE); + } + + @Override + public SocketAddress remoteAddress() { + throw new RuntimeException(NoAvailableRSocketException.INSTANCE); + } + @Override public double availability() { return 0; diff --git a/rsocket-load-balancer/src/main/java/io/rsocket/client/filter/BackupRequestSocket.java b/rsocket-load-balancer/src/main/java/io/rsocket/client/filter/BackupRequestSocket.java index beb424797..d6e31b4e6 100644 --- a/rsocket-load-balancer/src/main/java/io/rsocket/client/filter/BackupRequestSocket.java +++ b/rsocket-load-balancer/src/main/java/io/rsocket/client/filter/BackupRequestSocket.java @@ -21,6 +21,8 @@ import io.rsocket.stat.FrugalQuantile; import io.rsocket.stat.Quantile; import io.rsocket.util.Clock; + +import java.net.SocketAddress; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; @@ -84,6 +86,16 @@ public Mono metadataPush(Payload payload) { return child.metadataPush(payload); } + @Override + public SocketAddress localAddress() { + return child.localAddress(); + } + + @Override + public SocketAddress remoteAddress() { + return child.remoteAddress(); + } + @Override public double availability() { return child.availability(); diff --git a/rsocket-load-balancer/src/main/java/io/rsocket/client/filter/RSocketSupplier.java b/rsocket-load-balancer/src/main/java/io/rsocket/client/filter/RSocketSupplier.java index aaf9f71e6..63f5f307f 100644 --- a/rsocket-load-balancer/src/main/java/io/rsocket/client/filter/RSocketSupplier.java +++ b/rsocket-load-balancer/src/main/java/io/rsocket/client/filter/RSocketSupplier.java @@ -23,6 +23,8 @@ import io.rsocket.stat.Ewma; import io.rsocket.util.Clock; import io.rsocket.util.RSocketProxy; + +import java.net.SocketAddress; import java.util.concurrent.TimeUnit; import java.util.function.Supplier; import org.reactivestreams.Publisher; @@ -149,6 +151,16 @@ public Mono metadataPush(Payload payload) { .doOnSuccess(v -> updateErrorPercentage(1.0)); } + @Override + public SocketAddress localAddress() { + return source.localAddress(); + } + + @Override + public SocketAddress remoteAddress() { + return source.remoteAddress(); + } + @Override public double availability() { // If the window is expired set success and failure to zero and return diff --git a/rsocket-micrometer/src/main/java/io/rsocket/micrometer/MicrometerDuplexConnection.java b/rsocket-micrometer/src/main/java/io/rsocket/micrometer/MicrometerDuplexConnection.java index 7c7ac37b9..c9e100ea2 100644 --- a/rsocket-micrometer/src/main/java/io/rsocket/micrometer/MicrometerDuplexConnection.java +++ b/rsocket-micrometer/src/main/java/io/rsocket/micrometer/MicrometerDuplexConnection.java @@ -89,6 +89,11 @@ public ByteBufAllocator alloc() { return delegate.alloc(); } + @Override + public SocketAddress localAddress() { + return delegate.localAddress(); + } + @Override public SocketAddress remoteAddress() { return delegate.remoteAddress(); diff --git a/rsocket-micrometer/src/main/java/io/rsocket/micrometer/MicrometerRSocket.java b/rsocket-micrometer/src/main/java/io/rsocket/micrometer/MicrometerRSocket.java index 9e1abbc03..6fa0deb30 100644 --- a/rsocket-micrometer/src/main/java/io/rsocket/micrometer/MicrometerRSocket.java +++ b/rsocket-micrometer/src/main/java/io/rsocket/micrometer/MicrometerRSocket.java @@ -29,6 +29,8 @@ import io.micrometer.core.instrument.Timer.Sample; import io.rsocket.Payload; import io.rsocket.RSocket; + +import java.net.SocketAddress; import java.util.Objects; import java.util.function.BiConsumer; import java.util.function.Consumer; @@ -95,6 +97,16 @@ public Mono metadataPush(Payload payload) { return delegate.metadataPush(payload).doFinally(metadataPush); } + @Override + public SocketAddress localAddress() { + return delegate.localAddress(); + } + + @Override + public SocketAddress remoteAddress() { + return delegate.remoteAddress(); + } + @Override public Mono onClose() { return delegate.onClose(); diff --git a/rsocket-test/src/main/java/io/rsocket/test/TransportTest.java b/rsocket-test/src/main/java/io/rsocket/test/TransportTest.java index 0bae8cd69..3fcad1d2c 100644 --- a/rsocket-test/src/main/java/io/rsocket/test/TransportTest.java +++ b/rsocket-test/src/main/java/io/rsocket/test/TransportTest.java @@ -679,6 +679,11 @@ public ByteBufAllocator alloc() { return duplexConnection.alloc(); } + @Override + public SocketAddress localAddress() { + return duplexConnection.localAddress(); + } + @Override public SocketAddress remoteAddress() { return duplexConnection.remoteAddress(); @@ -754,6 +759,11 @@ public ByteBufAllocator alloc() { return source.alloc(); } + @Override + public SocketAddress localAddress() { + return source.localAddress(); + } + @Override public SocketAddress remoteAddress() { return source.remoteAddress(); diff --git a/rsocket-transport-local/src/main/java/io/rsocket/transport/local/LocalDuplexConnection.java b/rsocket-transport-local/src/main/java/io/rsocket/transport/local/LocalDuplexConnection.java index 5e18aa4cc..1a2176dfa 100644 --- a/rsocket-transport-local/src/main/java/io/rsocket/transport/local/LocalDuplexConnection.java +++ b/rsocket-transport-local/src/main/java/io/rsocket/transport/local/LocalDuplexConnection.java @@ -110,6 +110,11 @@ public ByteBufAllocator alloc() { return allocator; } + @Override + public SocketAddress localAddress() { + return address; + } + @Override public SocketAddress remoteAddress() { return address; diff --git a/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/TcpDuplexConnection.java b/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/TcpDuplexConnection.java index f9ac705b1..a1783b428 100644 --- a/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/TcpDuplexConnection.java +++ b/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/TcpDuplexConnection.java @@ -57,6 +57,11 @@ public ByteBufAllocator alloc() { return connection.channel().alloc(); } + @Override + public SocketAddress localAddress() { + return connection.channel().localAddress(); + } + @Override public SocketAddress remoteAddress() { return connection.channel().remoteAddress(); diff --git a/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/WebsocketDuplexConnection.java b/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/WebsocketDuplexConnection.java index c81f040da..fc0d55ed3 100644 --- a/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/WebsocketDuplexConnection.java +++ b/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/WebsocketDuplexConnection.java @@ -62,6 +62,11 @@ public ByteBufAllocator alloc() { return connection.channel().alloc(); } + @Override + public SocketAddress localAddress() { + return connection.channel().localAddress(); + } + @Override public SocketAddress remoteAddress() { return connection.channel().remoteAddress(); From 0876b08d9b207b64a847f22947f70b615bffbe4e Mon Sep 17 00:00:00 2001 From: atshow Date: Thu, 27 May 2021 13:56:01 +0800 Subject: [PATCH 2/4] format code style Signed-off-by: atshow --- rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java | 1 - rsocket-core/src/main/java/io/rsocket/core/RSocketResponder.java | 1 - rsocket-core/src/main/java/io/rsocket/util/RSocketProxy.java | 1 - rsocket-core/src/test/java/io/rsocket/test/util/MockRSocket.java | 1 - .../src/main/java/io/rsocket/client/LoadBalancedRSocketMono.java | 1 - .../main/java/io/rsocket/client/filter/BackupRequestSocket.java | 1 - .../src/main/java/io/rsocket/client/filter/RSocketSupplier.java | 1 - .../src/main/java/io/rsocket/micrometer/MicrometerRSocket.java | 1 - 8 files changed, 8 deletions(-) diff --git a/rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java b/rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java index 7e084758c..f3b0c5905 100644 --- a/rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java +++ b/rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java @@ -34,7 +34,6 @@ import io.rsocket.keepalive.KeepAliveHandler; import io.rsocket.keepalive.KeepAliveSupport; import io.rsocket.plugins.RequestInterceptor; - import java.net.SocketAddress; import java.nio.channels.ClosedChannelException; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; diff --git a/rsocket-core/src/main/java/io/rsocket/core/RSocketResponder.java b/rsocket-core/src/main/java/io/rsocket/core/RSocketResponder.java index 080aff160..c9232fcd0 100644 --- a/rsocket-core/src/main/java/io/rsocket/core/RSocketResponder.java +++ b/rsocket-core/src/main/java/io/rsocket/core/RSocketResponder.java @@ -32,7 +32,6 @@ import io.rsocket.frame.RequestStreamFrameCodec; import io.rsocket.frame.decoder.PayloadDecoder; import io.rsocket.plugins.RequestInterceptor; - import java.net.SocketAddress; import java.nio.channels.ClosedChannelException; import java.util.concurrent.CancellationException; diff --git a/rsocket-core/src/main/java/io/rsocket/util/RSocketProxy.java b/rsocket-core/src/main/java/io/rsocket/util/RSocketProxy.java index 22439ab16..81d0b242a 100644 --- a/rsocket-core/src/main/java/io/rsocket/util/RSocketProxy.java +++ b/rsocket-core/src/main/java/io/rsocket/util/RSocketProxy.java @@ -21,7 +21,6 @@ import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; - import java.net.SocketAddress; /** Wrapper/Proxy for a RSocket. This is useful when we want to override a specific method. */ diff --git a/rsocket-core/src/test/java/io/rsocket/test/util/MockRSocket.java b/rsocket-core/src/test/java/io/rsocket/test/util/MockRSocket.java index 9bba17ec8..db0daa1f7 100644 --- a/rsocket-core/src/test/java/io/rsocket/test/util/MockRSocket.java +++ b/rsocket-core/src/test/java/io/rsocket/test/util/MockRSocket.java @@ -21,7 +21,6 @@ import io.rsocket.Payload; import io.rsocket.RSocket; - import java.net.SocketAddress; import java.util.concurrent.atomic.AtomicInteger; import org.reactivestreams.Publisher; diff --git a/rsocket-load-balancer/src/main/java/io/rsocket/client/LoadBalancedRSocketMono.java b/rsocket-load-balancer/src/main/java/io/rsocket/client/LoadBalancedRSocketMono.java index 84953cebf..2509e328a 100644 --- a/rsocket-load-balancer/src/main/java/io/rsocket/client/LoadBalancedRSocketMono.java +++ b/rsocket-load-balancer/src/main/java/io/rsocket/client/LoadBalancedRSocketMono.java @@ -23,7 +23,6 @@ import io.rsocket.stat.Median; import io.rsocket.stat.Quantile; import io.rsocket.util.Clock; - import java.net.SocketAddress; import java.nio.channels.ClosedChannelException; import java.time.Duration; diff --git a/rsocket-load-balancer/src/main/java/io/rsocket/client/filter/BackupRequestSocket.java b/rsocket-load-balancer/src/main/java/io/rsocket/client/filter/BackupRequestSocket.java index d6e31b4e6..26160955d 100644 --- a/rsocket-load-balancer/src/main/java/io/rsocket/client/filter/BackupRequestSocket.java +++ b/rsocket-load-balancer/src/main/java/io/rsocket/client/filter/BackupRequestSocket.java @@ -21,7 +21,6 @@ import io.rsocket.stat.FrugalQuantile; import io.rsocket.stat.Quantile; import io.rsocket.util.Clock; - import java.net.SocketAddress; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; diff --git a/rsocket-load-balancer/src/main/java/io/rsocket/client/filter/RSocketSupplier.java b/rsocket-load-balancer/src/main/java/io/rsocket/client/filter/RSocketSupplier.java index 63f5f307f..df4fe2507 100644 --- a/rsocket-load-balancer/src/main/java/io/rsocket/client/filter/RSocketSupplier.java +++ b/rsocket-load-balancer/src/main/java/io/rsocket/client/filter/RSocketSupplier.java @@ -23,7 +23,6 @@ import io.rsocket.stat.Ewma; import io.rsocket.util.Clock; import io.rsocket.util.RSocketProxy; - import java.net.SocketAddress; import java.util.concurrent.TimeUnit; import java.util.function.Supplier; diff --git a/rsocket-micrometer/src/main/java/io/rsocket/micrometer/MicrometerRSocket.java b/rsocket-micrometer/src/main/java/io/rsocket/micrometer/MicrometerRSocket.java index 6fa0deb30..faed8b785 100644 --- a/rsocket-micrometer/src/main/java/io/rsocket/micrometer/MicrometerRSocket.java +++ b/rsocket-micrometer/src/main/java/io/rsocket/micrometer/MicrometerRSocket.java @@ -29,7 +29,6 @@ import io.micrometer.core.instrument.Timer.Sample; import io.rsocket.Payload; import io.rsocket.RSocket; - import java.net.SocketAddress; import java.util.Objects; import java.util.function.BiConsumer; From a7ba79b2bd84c769653b4bcfd13faa9320675c87 Mon Sep 17 00:00:00 2001 From: atshow Date: Fri, 28 May 2021 11:18:03 +0800 Subject: [PATCH 3/4] format code style Signed-off-by: atshow --- rsocket-core/src/main/java/io/rsocket/DuplexConnection.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rsocket-core/src/main/java/io/rsocket/DuplexConnection.java b/rsocket-core/src/main/java/io/rsocket/DuplexConnection.java index 7f8e2b43a..2dc235355 100644 --- a/rsocket-core/src/main/java/io/rsocket/DuplexConnection.java +++ b/rsocket-core/src/main/java/io/rsocket/DuplexConnection.java @@ -82,7 +82,7 @@ public interface DuplexConnection extends Availability, Closeable { * local transport, it is {@link io.rsocket.transport.local.LocalSocketAddress}. * * @return the address - * @since 1.1 + * */ SocketAddress localAddress(); From f7038e6e1629e5516cd80c54cfae334f8d43219b Mon Sep 17 00:00:00 2001 From: atshow Date: Fri, 28 May 2021 17:01:45 +0800 Subject: [PATCH 4/4] code comment modification Signed-off-by: atshow --- rsocket-core/src/main/java/io/rsocket/DuplexConnection.java | 2 +- rsocket-core/src/main/java/io/rsocket/RSocket.java | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/rsocket-core/src/main/java/io/rsocket/DuplexConnection.java b/rsocket-core/src/main/java/io/rsocket/DuplexConnection.java index 2dc235355..db4eb3ec2 100644 --- a/rsocket-core/src/main/java/io/rsocket/DuplexConnection.java +++ b/rsocket-core/src/main/java/io/rsocket/DuplexConnection.java @@ -82,7 +82,7 @@ public interface DuplexConnection extends Availability, Closeable { * local transport, it is {@link io.rsocket.transport.local.LocalSocketAddress}. * * @return the address - * + * @since 1.1.1 */ SocketAddress localAddress(); diff --git a/rsocket-core/src/main/java/io/rsocket/RSocket.java b/rsocket-core/src/main/java/io/rsocket/RSocket.java index 2bf9b04e1..d4da18d9f 100644 --- a/rsocket-core/src/main/java/io/rsocket/RSocket.java +++ b/rsocket-core/src/main/java/io/rsocket/RSocket.java @@ -88,6 +88,7 @@ default Mono metadataPush(Payload payload) { * * @return the local address of this channel. * {@code null} if this channel is not bound. + * @since 1.1.1 */ default SocketAddress localAddress() { return null; @@ -105,6 +106,7 @@ default SocketAddress localAddress() { * from arbitrary remote addresses to determine * the origination of the received message as this method will * return {@code null}. + * @since 1.1.1 */ default SocketAddress remoteAddress() { return null;