Skip to content

Commit e01010d

Browse files
Oleh DokukaOlegDokuka
authored andcommitted
drafts initial version of standard router API
Signed-off-by: Oleh Dokuka <[email protected]>
1 parent eab6754 commit e01010d

File tree

9 files changed

+540
-0
lines changed

9 files changed

+540
-0
lines changed

rsocket-router/build.gradle

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
/*
2+
* Copyright 2015-2018 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
plugins {
18+
id 'java-library'
19+
id 'maven-publish'
20+
id 'com.jfrog.artifactory'
21+
id 'com.jfrog.bintray'
22+
}
23+
24+
dependencies {
25+
api project(':rsocket-core')
26+
27+
implementation 'org.slf4j:slf4j-api'
28+
29+
testImplementation project(':rsocket-test')
30+
testImplementation 'org.junit.jupiter:junit-jupiter-api'
31+
testImplementation 'org.junit.jupiter:junit-jupiter-params'
32+
testImplementation 'org.mockito:mockito-core'
33+
testImplementation 'org.assertj:assertj-core'
34+
testImplementation 'io.projectreactor:reactor-test'
35+
36+
// TODO: Remove after JUnit5 migration
37+
testCompileOnly 'junit:junit'
38+
testImplementation 'org.hamcrest:hamcrest-library'
39+
testRuntimeOnly 'org.junit.vintage:junit-vintage-engine'
40+
testRuntimeOnly 'ch.qos.logback:logback-classic'
41+
}
42+
43+
description = 'Transparent Load Balancer for RSocket'
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
/*
2+
* Copyright 2019 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.rsocket.router;
17+
18+
import io.netty.buffer.ByteBuf;
19+
import io.rsocket.frame.FrameType;
20+
import io.rsocket.metadata.CompositeMetadata;
21+
import io.rsocket.metadata.CompositeMetadata.Entry;
22+
import io.rsocket.metadata.CompositeMetadata.WellKnownMimeTypeEntry;
23+
import io.rsocket.metadata.RoutingMetadata;
24+
import io.rsocket.metadata.WellKnownMimeType;
25+
import reactor.util.annotation.Nullable;
26+
27+
public class CompositeMetadataRouteCodec implements RouteCodec {
28+
29+
@Override
30+
@Nullable
31+
public Route decode(ByteBuf metadataByteBuf, FrameType requestType) {
32+
final CompositeMetadata compositeMetadata = new CompositeMetadata(metadataByteBuf, false);
33+
34+
String route = null;
35+
String mimeType = null;
36+
37+
for (Entry compositeMetadatum : compositeMetadata) {
38+
if (compositeMetadatum instanceof WellKnownMimeTypeEntry) {
39+
final WellKnownMimeTypeEntry wellKnownMimeTypeEntry = (WellKnownMimeTypeEntry) compositeMetadatum;
40+
final WellKnownMimeType type = wellKnownMimeTypeEntry.getType();
41+
42+
if (type == WellKnownMimeType.MESSAGE_RSOCKET_ROUTING) {
43+
final RoutingMetadata routingMetadata = new RoutingMetadata(compositeMetadatum.getContent());
44+
for(String routeEntry : routingMetadata) {
45+
route = routeEntry;
46+
break;
47+
}
48+
} else if (type == WellKnownMimeType.MESSAGE_RSOCKET_MIMETYPE) {
49+
// FIXME: once codecs are available
50+
// mimeType = compositeMetadatum
51+
}
52+
}
53+
}
54+
55+
if (route != null) {
56+
return new Route(requestType, route, mimeType);
57+
}
58+
59+
return null;
60+
}
61+
}
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
package io.rsocket.router;
2+
3+
import io.rsocket.Payload;
4+
import org.reactivestreams.Publisher;
5+
import reactor.core.publisher.Flux;
6+
import reactor.util.annotation.Nullable;
7+
8+
public interface HandlerFunction {
9+
10+
Route route();
11+
12+
@SuppressWarnings("rawtypes")
13+
default Publisher handle(Payload payload) {
14+
return handle(payload, null);
15+
}
16+
17+
@SuppressWarnings("rawtypes")
18+
Publisher handle(Payload firstPayload, @Nullable Flux<Payload> payloads);
19+
}
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
package io.rsocket.router;
2+
3+
import java.util.HashMap;
4+
import java.util.Map;
5+
6+
class ImmutableRoutingRSocket extends RoutingRSocket {
7+
8+
private final Map<Route, HandlerFunction> mapping;
9+
10+
ImmutableRoutingRSocket(Map<Route, HandlerFunction> mapping, RouteCodec routeCodec) {
11+
super(routeCodec);
12+
this.mapping = mapping;
13+
}
14+
15+
@Override
16+
protected HandlerFunction handlerFor(Route route) {
17+
return mapping.get(route);
18+
}
19+
20+
static final class ImmutableRouterBuilder implements RoutingRSocket.Builder<ImmutableRouterBuilder> {
21+
22+
final HashMap<Route, HandlerFunction> mapping = new HashMap<>();
23+
final RouteCodec routeCodec;
24+
25+
ImmutableRouterBuilder(RouteCodec routeCodec) {
26+
this.routeCodec = routeCodec;
27+
}
28+
29+
@Override
30+
public ImmutableRouterBuilder addHandler(HandlerFunction handler) {
31+
this.mapping.put(handler.route(), handler);
32+
33+
return this;
34+
}
35+
36+
@Override
37+
public RoutingRSocket build() {
38+
return new ImmutableRoutingRSocket(this.mapping, routeCodec);
39+
}
40+
}
41+
}
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
package io.rsocket.router;
2+
3+
import io.rsocket.frame.FrameType;
4+
import reactor.util.annotation.Nullable;
5+
6+
public final class Route {
7+
8+
final String route;
9+
final String mimeType;
10+
final FrameType requestType;
11+
12+
public Route(FrameType requestType, String route) {
13+
this(requestType, route, null);
14+
}
15+
16+
public Route(FrameType requestType, String route, @Nullable String mimeType) {
17+
this.route = route;
18+
this.mimeType = mimeType;
19+
this.requestType = requestType;
20+
}
21+
22+
public String route() {
23+
return this.route;
24+
}
25+
26+
@Nullable
27+
public String mimeType() {
28+
return this.mimeType;
29+
}
30+
31+
public FrameType requestType() {
32+
return requestType;
33+
}
34+
35+
@Override
36+
public boolean equals(Object o) {
37+
if (this == o) {
38+
return true;
39+
}
40+
if (o == null || getClass() != o.getClass()) {
41+
return false;
42+
}
43+
44+
Route route1 = (Route) o;
45+
46+
if (!route.equals(route1.route)) {
47+
return false;
48+
}
49+
if (mimeType != null ? !mimeType.equals(route1.mimeType) : route1.mimeType != null) {
50+
return false;
51+
}
52+
return requestType == route1.requestType;
53+
}
54+
55+
@Override
56+
public int hashCode() {
57+
int result = route.hashCode();
58+
result = 31 * result + (mimeType != null ? mimeType.hashCode() : 0);
59+
result = 31 * result + requestType.hashCode();
60+
return result;
61+
}
62+
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
/*
2+
* Copyright 2019 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.rsocket.router;
17+
18+
import io.netty.buffer.ByteBuf;
19+
import io.rsocket.frame.FrameType;
20+
import reactor.util.annotation.Nullable;
21+
22+
public interface RouteCodec {
23+
24+
@Nullable
25+
Route decode(ByteBuf metadataByteBuf, FrameType requestType);
26+
}
Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
package io.rsocket.router;
2+
3+
import io.rsocket.Payload;
4+
import io.rsocket.frame.FrameType;
5+
import java.util.function.BiFunction;
6+
import java.util.function.Consumer;
7+
import java.util.function.Function;
8+
import org.reactivestreams.Publisher;
9+
import reactor.core.publisher.Flux;
10+
import reactor.core.publisher.Mono;
11+
import reactor.util.annotation.Nullable;
12+
13+
public final class Router {
14+
15+
public static RequestSpec route(String route) {
16+
return route(route, null);
17+
}
18+
19+
public static RequestSpec route(String route, @Nullable String mimeType) {
20+
return new RequestSpec(route, mimeType);
21+
}
22+
23+
@SuppressWarnings("rawtypes")
24+
static final class RequestSpec {
25+
final String route;
26+
final String mimeType;
27+
28+
RequestSpec(String route, @Nullable String mimeType) {
29+
this.route = route;
30+
this.mimeType = mimeType;
31+
}
32+
33+
public HandlerFunction fireAndForget(Function<Payload, Mono<Void>> handler) {
34+
final Route route = new Route(FrameType.REQUEST_FNF, this.route, mimeType);
35+
return new HandlerFunction() {
36+
@Override
37+
public Route route() {
38+
return route;
39+
}
40+
41+
@Override
42+
public Publisher handle(Payload firstPayload, Flux<Payload> payloads) {
43+
return handler.apply(firstPayload);
44+
}
45+
};
46+
}
47+
48+
public HandlerFunction fireAndForget(Consumer<Payload> handler) {
49+
final Route route = new Route(FrameType.REQUEST_FNF, this.route, this.mimeType);
50+
return new HandlerFunction() {
51+
@Override
52+
public Route route() {
53+
return route;
54+
}
55+
56+
@Override
57+
public Publisher handle(Payload firstPayload, Flux<Payload> payloads) {
58+
handler.accept(firstPayload);
59+
return Mono.empty();
60+
}
61+
};
62+
}
63+
64+
public HandlerFunction requestResponse(Function<Payload, Mono<Payload>> handler) {
65+
final Route route = new Route(FrameType.REQUEST_RESPONSE, this.route, this.mimeType);
66+
return new HandlerFunction() {
67+
@Override
68+
public Route route() {
69+
return route;
70+
}
71+
72+
@Override
73+
public Publisher handle(Payload firstPayload, Flux<Payload> payloads) {
74+
return handler.apply(firstPayload);
75+
}
76+
};
77+
}
78+
79+
public HandlerFunction requestStream(Function<Payload, Flux<Payload>> handler) {
80+
final Route route = new Route(FrameType.REQUEST_STREAM, this.route, this.mimeType);
81+
return new HandlerFunction() {
82+
@Override
83+
public Route route() {
84+
return route;
85+
}
86+
87+
@Override
88+
public Publisher handle(Payload firstPayload, Flux<Payload> payloads) {
89+
return handler.apply(firstPayload);
90+
}
91+
};
92+
}
93+
94+
public HandlerFunction requestChannel(Function<Flux<Payload>, Flux<Payload>> handler) {
95+
final Route route = new Route(FrameType.REQUEST_CHANNEL, this.route, this.mimeType);
96+
return new HandlerFunction() {
97+
@Override
98+
public Route route() {
99+
return route;
100+
}
101+
102+
@Override
103+
public Publisher handle(Payload firstPayload, Flux<Payload> payloads) {
104+
return handler.apply(payloads);
105+
}
106+
};
107+
}
108+
109+
public HandlerFunction requestChannel(BiFunction<Payload, Flux<Payload>, Flux<Payload>> handler) {
110+
final Route route = new Route(FrameType.REQUEST_CHANNEL, this.route, this.mimeType);
111+
return new HandlerFunction() {
112+
@Override
113+
public Route route() {
114+
return route;
115+
}
116+
117+
@Override
118+
public Publisher handle(Payload firstPayload, Flux<Payload> payloads) {
119+
return handler.apply(firstPayload, payloads);
120+
}
121+
};
122+
}
123+
}
124+
}

0 commit comments

Comments
 (0)