Skip to content

Commit 4683fbb

Browse files
Oleh DokukaOlegDokuka
Oleh Dokuka
authored andcommitted
drafts initial version of standard router API
Signed-off-by: Oleh Dokuka <[email protected]>
1 parent eab6754 commit 4683fbb

9 files changed

+616
-0
lines changed

rsocket-router/build.gradle

+43
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
/*
2+
* Copyright 2015-2018 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
plugins {
18+
id 'java-library'
19+
id 'maven-publish'
20+
id 'com.jfrog.artifactory'
21+
id 'com.jfrog.bintray'
22+
}
23+
24+
dependencies {
25+
api project(':rsocket-core')
26+
27+
implementation 'org.slf4j:slf4j-api'
28+
29+
testImplementation project(':rsocket-test')
30+
testImplementation 'org.junit.jupiter:junit-jupiter-api'
31+
testImplementation 'org.junit.jupiter:junit-jupiter-params'
32+
testImplementation 'org.mockito:mockito-core'
33+
testImplementation 'org.assertj:assertj-core'
34+
testImplementation 'io.projectreactor:reactor-test'
35+
36+
// TODO: Remove after JUnit5 migration
37+
testCompileOnly 'junit:junit'
38+
testImplementation 'org.hamcrest:hamcrest-library'
39+
testRuntimeOnly 'org.junit.vintage:junit-vintage-engine'
40+
testRuntimeOnly 'ch.qos.logback:logback-classic'
41+
}
42+
43+
description = 'Transparent Load Balancer for RSocket'
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/*
2+
* Copyright 2015-Present the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.rsocket.router;
18+
19+
import io.netty.buffer.ByteBuf;
20+
import io.rsocket.frame.FrameType;
21+
import io.rsocket.metadata.CompositeMetadata;
22+
import io.rsocket.metadata.CompositeMetadata.Entry;
23+
import io.rsocket.metadata.CompositeMetadata.WellKnownMimeTypeEntry;
24+
import io.rsocket.metadata.RoutingMetadata;
25+
import io.rsocket.metadata.WellKnownMimeType;
26+
import reactor.util.annotation.Nullable;
27+
28+
public class CompositeMetadataRouteCodec implements RouteCodec {
29+
30+
@Override
31+
@Nullable
32+
public Route decode(ByteBuf metadataByteBuf, FrameType requestType) {
33+
final CompositeMetadata compositeMetadata = new CompositeMetadata(metadataByteBuf, false);
34+
35+
String route = null;
36+
String mimeType = null;
37+
38+
for (Entry compositeMetadatum : compositeMetadata) {
39+
if (compositeMetadatum instanceof WellKnownMimeTypeEntry) {
40+
final WellKnownMimeTypeEntry wellKnownMimeTypeEntry =
41+
(WellKnownMimeTypeEntry) compositeMetadatum;
42+
final WellKnownMimeType type = wellKnownMimeTypeEntry.getType();
43+
44+
if (type == WellKnownMimeType.MESSAGE_RSOCKET_ROUTING) {
45+
final RoutingMetadata routingMetadata =
46+
new RoutingMetadata(compositeMetadatum.getContent());
47+
for (String routeEntry : routingMetadata) {
48+
route = routeEntry;
49+
break;
50+
}
51+
} else if (type == WellKnownMimeType.MESSAGE_RSOCKET_MIMETYPE) {
52+
// FIXME: once codecs are available
53+
// mimeType = compositeMetadatum
54+
}
55+
}
56+
}
57+
58+
if (route != null) {
59+
return new Route(requestType, route, mimeType);
60+
}
61+
62+
return null;
63+
}
64+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
/*
2+
* Copyright 2015-Present the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.rsocket.router;
18+
19+
import io.rsocket.Payload;
20+
import org.reactivestreams.Publisher;
21+
import reactor.core.publisher.Flux;
22+
import reactor.util.annotation.Nullable;
23+
24+
public interface HandlerFunction {
25+
26+
Route route();
27+
28+
@SuppressWarnings("rawtypes")
29+
default Publisher handle(Payload payload) {
30+
return handle(payload, null);
31+
}
32+
33+
@SuppressWarnings("rawtypes")
34+
Publisher handle(Payload firstPayload, @Nullable Flux<Payload> payloads);
35+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
/*
2+
* Copyright 2015-Present the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.rsocket.router;
18+
19+
import java.util.HashMap;
20+
import java.util.Map;
21+
22+
class ImmutableRoutingRSocket extends RoutingRSocket {
23+
24+
private final Map<Route, HandlerFunction> mapping;
25+
26+
ImmutableRoutingRSocket(Map<Route, HandlerFunction> mapping, RouteCodec routeCodec) {
27+
super(routeCodec);
28+
this.mapping = mapping;
29+
}
30+
31+
@Override
32+
protected HandlerFunction handlerFor(Route route) {
33+
return mapping.get(route);
34+
}
35+
36+
static final class ImmutableRouterBuilder
37+
implements RoutingRSocket.Builder<ImmutableRouterBuilder> {
38+
39+
final HashMap<Route, HandlerFunction> mapping = new HashMap<>();
40+
final RouteCodec routeCodec;
41+
42+
ImmutableRouterBuilder(RouteCodec routeCodec) {
43+
this.routeCodec = routeCodec;
44+
}
45+
46+
@Override
47+
public ImmutableRouterBuilder addHandler(HandlerFunction handler) {
48+
this.mapping.put(handler.route(), handler);
49+
50+
return this;
51+
}
52+
53+
@Override
54+
public RoutingRSocket build() {
55+
return new ImmutableRoutingRSocket(this.mapping, routeCodec);
56+
}
57+
}
58+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
/*
2+
* Copyright 2015-Present the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.rsocket.router;
18+
19+
import io.rsocket.frame.FrameType;
20+
import reactor.util.annotation.Nullable;
21+
22+
public final class Route {
23+
24+
final String route;
25+
final String mimeType;
26+
final FrameType requestType;
27+
28+
public Route(FrameType requestType, String route) {
29+
this(requestType, route, null);
30+
}
31+
32+
public Route(FrameType requestType, String route, @Nullable String mimeType) {
33+
this.route = route;
34+
this.mimeType = mimeType;
35+
this.requestType = requestType;
36+
}
37+
38+
public String route() {
39+
return this.route;
40+
}
41+
42+
@Nullable
43+
public String mimeType() {
44+
return this.mimeType;
45+
}
46+
47+
public FrameType requestType() {
48+
return requestType;
49+
}
50+
51+
@Override
52+
public boolean equals(Object o) {
53+
if (this == o) {
54+
return true;
55+
}
56+
if (o == null || getClass() != o.getClass()) {
57+
return false;
58+
}
59+
60+
Route route1 = (Route) o;
61+
62+
if (!route.equals(route1.route)) {
63+
return false;
64+
}
65+
if (mimeType != null ? !mimeType.equals(route1.mimeType) : route1.mimeType != null) {
66+
return false;
67+
}
68+
return requestType == route1.requestType;
69+
}
70+
71+
@Override
72+
public int hashCode() {
73+
int result = route.hashCode();
74+
result = 31 * result + (mimeType != null ? mimeType.hashCode() : 0);
75+
result = 31 * result + requestType.hashCode();
76+
return result;
77+
}
78+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
/*
2+
* Copyright 2015-Present the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.rsocket.router;
18+
19+
import io.netty.buffer.ByteBuf;
20+
import io.rsocket.frame.FrameType;
21+
import reactor.util.annotation.Nullable;
22+
23+
public interface RouteCodec {
24+
25+
@Nullable
26+
Route decode(ByteBuf metadataByteBuf, FrameType requestType);
27+
}

0 commit comments

Comments
 (0)