diff --git a/build.gradle b/build.gradle index cdb1878..664e33b 100644 --- a/build.gradle +++ b/build.gradle @@ -5,7 +5,9 @@ buildscript { } allprojects { - repositories { mavenCentral() } + repositories { + mavenCentral() + } } apply from: file('gradle/convention.gradle') @@ -17,9 +19,21 @@ subprojects { sourceSets.test.java.srcDir 'src/main/java' + dependencies { + compile "io.netty:netty-common:${netty_version}" + compile "io.netty:netty-transport:${netty_version}" + compile "com.netflix.rxnetty:rx-netty:${rxnetty_version}" + compile "com.netflix.rxnetty:rx-netty-servo:${rxnetty_version}" + compile "org.slf4j:slf4j-api:${slf4j_version}" + runtime "org.slf4j:slf4j-simple:${slf4j_version}" + compile "com.netflix.numerus:numerus:1.1" + testCompile "junit:junit-dep:4.10" + testCompile "org.mockito:mockito-core:1.8.5" + } + tasks.withType(Javadoc).each { it.classpath = sourceSets.main.compileClasspath - } + } } if (JavaVersion.current().isJava8Compatible()) { diff --git a/gradle.properties b/gradle.properties index 85d8530..7cec7c9 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1 +1,6 @@ version=0.1-SNAPSHOT + +slf4j_version=1.7.6 +netty_version=4.0.21.Final +rxnetty_version=0.3.8 +ribbon_version=2.0-RC1 diff --git a/settings.gradle b/settings.gradle index 222d4e7..2a05a98 100644 --- a/settings.gradle +++ b/settings.gradle @@ -4,4 +4,5 @@ include 'ws-backend-mock', \ 'ws-impls:ws-impl-utils', \ 'ws-impls:ws-java-servlet-blocking', \ 'ws-impls:ws-java-jetty', \ -'ws-impls:ws-java-rxnetty' +'ws-impls:ws-java-rxnetty', \ +'ws-impls:ws-java-ribbon' \ No newline at end of file diff --git a/ws-backend-mock/build.gradle b/ws-backend-mock/build.gradle index f120980..2244e3e 100644 --- a/ws-backend-mock/build.gradle +++ b/ws-backend-mock/build.gradle @@ -6,13 +6,8 @@ sourceCompatibility = JavaVersion.VERSION_1_8 targetCompatibility = JavaVersion.VERSION_1_8 dependencies { - compile 'com.netflix.rxnetty:rx-netty:0.3.8' compile 'com.netflix.numerus:numerus:1.1' - compile 'com.netflix.rxjava:rxjava-core:0.18.2' compile 'org.codehaus.jackson:jackson-core-asl:1.9.2' - compile 'org.slf4j:slf4j-api:1.7.0' - testCompile 'junit:junit-dep:4.10' - testCompile 'org.mockito:mockito-core:1.8.5' } /** diff --git a/ws-backend-mock/src/main/java/perf/backend/MockResponse.java b/ws-backend-mock/src/main/java/perf/backend/MockResponse.java index 2a827d9..dacb048 100644 --- a/ws-backend-mock/src/main/java/perf/backend/MockResponse.java +++ b/ws-backend-mock/src/main/java/perf/backend/MockResponse.java @@ -4,7 +4,6 @@ import io.netty.buffer.ByteBufOutputStream; import io.netty.buffer.Unpooled; import org.codehaus.jackson.JsonFactory; -import org.codehaus.jackson.JsonGenerationException; import org.codehaus.jackson.JsonGenerator; import rx.Observable; import rx.Scheduler.Worker; @@ -44,16 +43,10 @@ private MockResponse() { } /** - * - * @param id - * ID from client used to assert correct client/server interaction - * @param delay - * How long to delay delivery to simulate server-side latency - * @param itemSize - * Length of each item String. - * @param numItems - * Number of items in response. - * + * @param id ID from client used to assert correct client/server interaction + * @param delay How long to delay delivery to simulate server-side latency + * @param itemSize Length of each item String. + * @param numItems Number of items in response. * @return String json */ public static Observable generateJson(long id, int delay, int itemSize, int numItems) { @@ -62,33 +55,8 @@ public static Observable generateJson(long id, int delay, int itemSize, subscriber.add(worker); worker.schedule(() -> { try { - ByteBuf buffer = Unpooled.buffer(); - ByteBufOutputStream jsonAsBytes = new ByteBufOutputStream(buffer); - JsonGenerator json = jsonFactory.createJsonGenerator(jsonAsBytes); - - json.writeStartObject(); - - // manipulate the ID such that we can know the response is from the server (client will know the logic) - long responseKey = getResponseKey(id); - - json.writeNumberField("responseKey", responseKey); - - json.writeNumberField("delay", delay); - if (itemSize > MAX_ITEM_LENGTH) { - throw new IllegalArgumentException("itemSize can not be larger than: " + MAX_ITEM_LENGTH); - } - json.writeNumberField("itemSize", itemSize); - json.writeNumberField("numItems", numItems); - - json.writeArrayFieldStart("items"); - for (int i = 0; i < numItems; i++) { - json.writeString(RAW_ITEM_LONG.substring(0, itemSize)); - } - json.writeEndArray(); - json.writeEndObject(); - json.close(); - - subscriber.onNext(buffer); + ByteBuf byteBuf = createJsonResponse(id, delay, itemSize, numItems, false); + subscriber.onNext(byteBuf); subscriber.onCompleted(); } catch (Exception e) { subscriber.onError(e); @@ -97,7 +65,49 @@ public static Observable generateJson(long id, int delay, int itemSize, }); } - /* package */static long getResponseKey(long id) { + public static Observable generateFallbackJson(long id, int delay, int itemSize, int numItems) { + try { + ByteBuf byteBuf = createJsonResponse(id, delay, itemSize, numItems, true); + return Observable.just(byteBuf); + } catch (IOException e) { + // We do not expect to get here + e.printStackTrace(); + throw new RuntimeException(e); + } + } + + public static ByteBuf createJsonResponse(long id, int delay, int itemSize, int numItems, boolean fallback) throws IOException { + ByteBuf buffer = Unpooled.buffer(); + ByteBufOutputStream jsonAsBytes = new ByteBufOutputStream(buffer); + JsonGenerator json = jsonFactory.createJsonGenerator(jsonAsBytes); + + json.writeStartObject(); + + // manipulate the ID such that we can know the response is from the server (client will know the logic) + long responseKey = getResponseKey(id); + + json.writeNumberField("responseKey", responseKey); + + json.writeNumberField("delay", delay); + if (itemSize > MAX_ITEM_LENGTH) { + throw new IllegalArgumentException("itemSize can not be larger than: " + MAX_ITEM_LENGTH); + } + json.writeNumberField("itemSize", itemSize); + json.writeNumberField("numItems", numItems); + json.writeBooleanField("fallback", fallback); + + json.writeArrayFieldStart("items"); + for (int i = 0; i < numItems; i++) { + json.writeString(RAW_ITEM_LONG.substring(0, itemSize)); + } + json.writeEndArray(); + json.writeEndObject(); + json.close(); + return jsonAsBytes.buffer(); + } + + /* package */ + static long getResponseKey(long id) { return (id / 37 + 5739375) * 7; } } diff --git a/ws-client/build.gradle b/ws-client/build.gradle index 6b97b66..13ba03c 100644 --- a/ws-client/build.gradle +++ b/ws-client/build.gradle @@ -6,14 +6,9 @@ sourceCompatibility = JavaVersion.VERSION_1_8 targetCompatibility = JavaVersion.VERSION_1_8 dependencies { - compile 'com.netflix.rxnetty:rx-netty:0.3.8' - compile 'com.netflix.rxjava:rxjava-core:0.18.2' - compile 'com.netflix.numerus:numerus:1.1' compile 'org.codehaus.jackson:jackson-core-asl:1.9.2' compile 'org.codehaus.jackson:jackson-mapper-asl:1.9.2' compile 'commons-cli:commons-cli:1.2' - testCompile 'junit:junit-dep:4.10' - testCompile 'org.mockito:mockito-core:1.8.5' } diff --git a/ws-impls/ws-impl-utils/build.gradle b/ws-impls/ws-impl-utils/build.gradle index 7191df8..fb94abc 100644 --- a/ws-impls/ws-impl-utils/build.gradle +++ b/ws-impls/ws-impl-utils/build.gradle @@ -2,15 +2,11 @@ apply plugin: 'java' apply plugin: 'eclipse' dependencies { - compile 'org.slf4j:slf4j-api:1.7.0' compile 'org.codehaus.jackson:jackson-core-asl:1.9.2' - compile 'junit:junit-dep:4.10' compile 'javax.servlet:javax.servlet-api:3.1.0' - compile 'com.netflix.rxjava:rxjava-core:0.18+' compile 'com.google.guava:guava:15.0' compile 'org.apache.commons:commons-lang3:3.1' - compile 'io.netty:netty-common:4.0.14.Final' - compile 'io.netty:netty-transport:4.0.14.Final' + compile 'junit:junit-dep:4.10' } eclipse { diff --git a/ws-impls/ws-impl-utils/src/main/java/perf/test/utils/BackendResponse.java b/ws-impls/ws-impl-utils/src/main/java/perf/test/utils/BackendResponse.java index 78d27f4..f2aa6c4 100644 --- a/ws-impls/ws-impl-utils/src/main/java/perf/test/utils/BackendResponse.java +++ b/ws-impls/ws-impl-utils/src/main/java/perf/test/utils/BackendResponse.java @@ -1,21 +1,21 @@ package perf.test.utils; -import static junit.framework.Assert.assertEquals; - -import java.io.IOException; -import java.io.InputStream; - import org.codehaus.jackson.JsonFactory; import org.codehaus.jackson.JsonParser; import org.codehaus.jackson.JsonToken; import org.junit.Test; - import rx.Observable; import rx.Observable.OnSubscribe; import rx.Scheduler; import rx.Subscriber; import rx.schedulers.Schedulers; +import java.io.IOException; +import java.io.InputStream; +import java.util.Arrays; + +import static junit.framework.Assert.*; + /** * @author Nitesh Kant (nkant@netflix.com) */ @@ -26,13 +26,19 @@ public class BackendResponse { private final int numItems; private final int itemSize; private final String[] items; + private final boolean fallback; public BackendResponse(long responseKey, int delay, int numItems, int itemSize, String[] items) { + this(responseKey, delay, numItems, itemSize, items, false); + } + + public BackendResponse(long responseKey, int delay, int numItems, int itemSize, String[] items, boolean fallback) { this.responseKey = responseKey; this.delay = delay; this.numItems = numItems; this.itemSize = itemSize; this.items = items; + this.fallback = fallback; } public static BackendResponse fromJson(JsonFactory jsonFactory, byte[] content) throws Exception { @@ -83,6 +89,7 @@ public static BackendResponse parseBackendResponse(JsonParser parser) throws IOE int delay = 0; int numItems = 0; int itemSize = 0; + boolean fallback = false; String[] items = null; JsonToken current; @@ -98,6 +105,8 @@ public static BackendResponse parseBackendResponse(JsonParser parser) throws IOE itemSize = parser.getIntValue(); } else if (fieldName.equals("numItems")) { numItems = parser.getIntValue(); + } else if(fieldName.equals("fallback")) { + fallback = parser.getBooleanValue(); } else if (fieldName.equals("items")) { // expect numItems to be populated before hitting this if (numItems == 0) { @@ -117,8 +126,8 @@ public static BackendResponse parseBackendResponse(JsonParser parser) throws IOE } } - - return new BackendResponse(responseKey, delay, numItems, itemSize, items); + + return new BackendResponse(responseKey, delay, numItems, itemSize, items, fallback); } finally { parser.close(); } @@ -144,16 +153,33 @@ public String[] getItems() { return items; } + public boolean isFallback() { + return fallback; + } + + @Override + public String toString() { + return "BackendResponse{" + + "responseKey=" + responseKey + + ", delay=" + delay + + ", numItems=" + numItems + + ", itemSize=" + itemSize + + ", fallback=" + fallback + + ", items=" + Arrays.toString(items) + + '}'; + } + public static class UnitTest { @Test public void testJsonParse() throws Exception { JsonFactory jsonFactory = new JsonFactory(); - BackendResponse r = BackendResponse.fromJson(jsonFactory, "{ \"responseKey\": 9999, \"delay\": 50, \"itemSize\": 128, \"numItems\": 2, \"items\": [ \"Lorem\", \"Ipsum\" ]}"); + BackendResponse r = fromJson(jsonFactory, "{ \"responseKey\": 9999, \"delay\": 50, \"fallback\": false, \"itemSize\": 128, \"numItems\": 2, \"items\": [ \"Lorem\", \"Ipsum\" ]}"); assertEquals(9999, r.getResponseKey()); assertEquals(50, r.getDelay()); assertEquals(128, r.getItemSize()); assertEquals(2, r.getNumItems()); + assertFalse(r.fallback); String[] items = r.getItems(); assertEquals(2, items.length); assertEquals("Lorem", items[0]); diff --git a/ws-impls/ws-java-jetty/build.gradle b/ws-impls/ws-java-jetty/build.gradle index 72d633a..992b9b9 100644 --- a/ws-impls/ws-java-jetty/build.gradle +++ b/ws-impls/ws-java-jetty/build.gradle @@ -3,14 +3,11 @@ apply plugin: 'eclipse' apply plugin: 'application' dependencies { - compile 'org.slf4j:slf4j-api:1.7.0' - runtime 'org.slf4j:slf4j-simple:1.7.0' compile 'org.eclipse.jetty:jetty-server:9.0.3.v20130506' compile 'org.eclipse.jetty:jetty-client:9.0.3.v20130506' compile 'org.eclipse.jetty:jetty-continuation:9.0.3.v20130506' compile 'org.codehaus.jackson:jackson-core-asl:1.9.2' compile 'com.google.guava:guava:r05' - provided 'junit:junit-dep:4.10' compile project(':ws-impls:ws-impl-utils') } diff --git a/ws-impls/ws-java-ribbon/build.gradle b/ws-impls/ws-java-ribbon/build.gradle new file mode 100644 index 0000000..38bbe09 --- /dev/null +++ b/ws-impls/ws-java-ribbon/build.gradle @@ -0,0 +1,29 @@ +apply plugin: "eclipse" +apply plugin: "idea" +apply plugin: "java" +apply plugin: "application" + +sourceCompatibility = JavaVersion.VERSION_1_8 +targetCompatibility = JavaVersion.VERSION_1_8 + +dependencies { + compile "com.netflix.ribbon:ribbon:${ribbon_version}" + compile project(":ws-impls:ws-impl-utils") + compile project(":ws-backend-mock") +} + +/** + * ../../gradlew start + * ../../gradlew start -P'a=EVENTLOOPS PORT BACKEND_SERVER_LIST' + * ../../gradlew start -P'a=1 8888 host1:8989,host2:8989' + */ +task start(type:JavaExec) { + main = "perf.test.ribbon.StartServer" + classpath = sourceSets.main.runtimeClasspath +// jvmArgs(['-Dorg.slf4j.simpleLogger.defaultLogLevel=debug']) + if (project.hasProperty('a')) { + args(a.split(' ')) + } +} + +mainClassName = "perf.test.ribbon.StartServer" diff --git a/ws-impls/ws-java-ribbon/src/main/java/perf/test/ribbon/ConnectionPoolMetricListener.java b/ws-impls/ws-java-ribbon/src/main/java/perf/test/ribbon/ConnectionPoolMetricListener.java new file mode 100644 index 0000000..95d2c48 --- /dev/null +++ b/ws-impls/ws-java-ribbon/src/main/java/perf/test/ribbon/ConnectionPoolMetricListener.java @@ -0,0 +1,86 @@ +package perf.test.ribbon; + +import io.reactivex.netty.metrics.HttpClientMetricEventsListener; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.LongAdder; + +/** + * @author Nitesh Kant + */ +public class ConnectionPoolMetricListener extends HttpClientMetricEventsListener { + + private LongAdder inUseCount = new LongAdder(); + private LongAdder idleCount = new LongAdder(); + private LongAdder totalConnections = new LongAdder(); + private LongAdder pendingAcquire = new LongAdder(); + private LongAdder pendingRelease = new LongAdder(); + + @Override + protected void onPoolReleaseFailed(long duration, TimeUnit timeUnit, Throwable throwable) { + pendingRelease.decrement(); + } + + @Override + protected void onPoolReleaseSuccess(long duration, TimeUnit timeUnit) { + pendingRelease.decrement(); + idleCount.increment(); + inUseCount.decrement(); + } + + @Override + protected void onPooledConnectionReuse(long duration, TimeUnit timeUnit) { + idleCount.decrement(); + } + + @Override + protected void onPoolReleaseStart() { + pendingRelease.increment(); + } + + @Override + protected void onPoolAcquireStart() { + pendingAcquire.increment(); + } + + @Override + protected void onPoolAcquireFailed(long duration, TimeUnit timeUnit, Throwable throwable) { + pendingAcquire.decrement(); + } + + @Override + protected void onPoolAcquireSuccess(long duration, TimeUnit timeUnit) { + pendingAcquire.decrement(); + inUseCount.increment(); + } + + @Override + protected void onConnectionCloseSuccess(long duration, TimeUnit timeUnit) { + totalConnections.decrement(); + } + + @Override + protected void onConnectSuccess(long duration, TimeUnit timeUnit) { + totalConnections.increment(); + } + + public long getInUseCount() { + return inUseCount.longValue(); + } + + public long getIdleCount() { + return idleCount.longValue(); + } + + public long getTotalConnections() { + return totalConnections.longValue(); + } + + public long getPendingAcquire() { + return pendingAcquire.longValue(); + } + + public long getPendingRelease() { + return pendingRelease.longValue(); + } +} diff --git a/ws-impls/ws-java-ribbon/src/main/java/perf/test/ribbon/CounterEvent.java b/ws-impls/ws-java-ribbon/src/main/java/perf/test/ribbon/CounterEvent.java new file mode 100644 index 0000000..485fb51 --- /dev/null +++ b/ws-impls/ws-java-ribbon/src/main/java/perf/test/ribbon/CounterEvent.java @@ -0,0 +1,33 @@ +package perf.test.ribbon; + +import com.netflix.numerus.NumerusRollingNumberEvent; + +public enum CounterEvent implements NumerusRollingNumberEvent { + + REQUESTS(true), SUCCESS(true), HYSTRIX_FALLBACK(true), HTTP_ERROR(true), NETTY_ERROR(true), CLIENT_POOL_EXHAUSTION(true), + SOCKET_EXCEPTION(true), IO_EXCEPTION(true), CANCELLATION_EXCEPTION(true), PARSING_EXCEPTION(true), BYTES(true); + + private final boolean isCounter; + private final boolean isMaxUpdater; + + CounterEvent(boolean isCounter) { + this.isCounter = isCounter; + this.isMaxUpdater = !isCounter; + } + + @Override + public boolean isCounter() { + return isCounter; + } + + @Override + public boolean isMaxUpdater() { + return isMaxUpdater; + } + + @Override + public NumerusRollingNumberEvent[] getValues() { + return values(); + } + +} diff --git a/ws-impls/ws-java-ribbon/src/main/java/perf/test/ribbon/DefaultFallbackHandler.java b/ws-impls/ws-java-ribbon/src/main/java/perf/test/ribbon/DefaultFallbackHandler.java new file mode 100644 index 0000000..5fa404a --- /dev/null +++ b/ws-impls/ws-java-ribbon/src/main/java/perf/test/ribbon/DefaultFallbackHandler.java @@ -0,0 +1,25 @@ +package perf.test.ribbon; + +import com.netflix.hystrix.HystrixExecutableInfo; +import com.netflix.ribbon.hystrix.FallbackHandler; +import io.netty.buffer.ByteBuf; +import perf.backend.MockResponse; +import rx.Observable; + +import java.util.Map; + +/** + * @author Tomasz Bak + */ +public class DefaultFallbackHandler implements FallbackHandler { + + @Override + public Observable getFallback(HystrixExecutableInfo hystrixExecutableInfo, Map stringObjectMap) { + return MockResponse.generateFallbackJson( + (Long) stringObjectMap.get("id"), + (Integer) stringObjectMap.get("delay"), + (Integer) stringObjectMap.get("itemSize"), + (Integer) stringObjectMap.get("numItems") + ); + } +} diff --git a/ws-impls/ws-java-ribbon/src/main/java/perf/test/ribbon/MockBackendService.java b/ws-impls/ws-java-ribbon/src/main/java/perf/test/ribbon/MockBackendService.java new file mode 100644 index 0000000..2f297de --- /dev/null +++ b/ws-impls/ws-java-ribbon/src/main/java/perf/test/ribbon/MockBackendService.java @@ -0,0 +1,22 @@ +package perf.test.ribbon; + +import com.netflix.ribbon.RibbonRequest; +import com.netflix.ribbon.proxy.annotation.Http; +import com.netflix.ribbon.proxy.annotation.Http.HttpMethod; +import com.netflix.ribbon.proxy.annotation.Hystrix; +import com.netflix.ribbon.proxy.annotation.TemplateName; +import com.netflix.ribbon.proxy.annotation.Var; +import io.netty.buffer.ByteBuf; + +/** + * Maps mock backend request to Java interface. + * + * @author Tomasz Bak + */ +public interface MockBackendService { + + @TemplateName("test") + @Http(method = HttpMethod.GET, uriTemplate = "/mock.json?numItems={numItems}&itemSize={itemSize}&delay={delay}&id={id}") + @Hystrix(fallbackHandler = DefaultFallbackHandler.class, validator = SampleHystrixContentValidator.class) + RibbonRequest request(@Var("numItems") Integer numItems, @Var("itemSize") Integer itemSize, @Var("delay") Integer delay, @Var("id") Long id); +} diff --git a/ws-impls/ws-java-ribbon/src/main/java/perf/test/ribbon/SampleHystrixContentValidator.java b/ws-impls/ws-java-ribbon/src/main/java/perf/test/ribbon/SampleHystrixContentValidator.java new file mode 100644 index 0000000..5760eda --- /dev/null +++ b/ws-impls/ws-java-ribbon/src/main/java/perf/test/ribbon/SampleHystrixContentValidator.java @@ -0,0 +1,19 @@ +package perf.test.ribbon; + +import com.netflix.ribbon.ServerError; +import com.netflix.ribbon.UnsuccessfulResponseException; +import com.netflix.ribbon.http.HttpResponseValidator; +import io.netty.buffer.ByteBuf; +import io.reactivex.netty.protocol.http.client.HttpClientResponse; + +/** + * @author Tomasz Bak + */ +public class SampleHystrixContentValidator implements HttpResponseValidator { + @Override + public void validate(HttpClientResponse httpClientResponse) throws UnsuccessfulResponseException, ServerError { + if (httpClientResponse.getStatus().code() / 100 != 2) { + throw new UnsuccessfulResponseException("Unexpected HTTP status code received: " + httpClientResponse.getStatus()); + } + } +} diff --git a/ws-impls/ws-java-ribbon/src/main/java/perf/test/ribbon/StartServer.java b/ws-impls/ws-java-ribbon/src/main/java/perf/test/ribbon/StartServer.java new file mode 100644 index 0000000..437a499 --- /dev/null +++ b/ws-impls/ws-java-ribbon/src/main/java/perf/test/ribbon/StartServer.java @@ -0,0 +1,173 @@ +package perf.test.ribbon; + +import com.netflix.numerus.NumerusRollingNumber; +import com.netflix.numerus.NumerusRollingPercentile; +import io.netty.buffer.ByteBuf; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.reactivex.netty.RxNetty; +import io.reactivex.netty.channel.SingleNioLoopProvider; +import io.reactivex.netty.client.PoolExhaustedException; +import io.reactivex.netty.protocol.http.client.HttpClient; +import perf.test.utils.JsonParseException; +import rx.Observable; + +import java.io.IOException; +import java.net.SocketException; +import java.util.concurrent.CancellationException; +import java.util.concurrent.TimeUnit; + +import static com.netflix.numerus.NumerusProperty.Factory.*; + +public final class StartServer { + + static final int rollingSeconds = 5; + + static final NumerusRollingNumber counter = new NumerusRollingNumber(CounterEvent.SUCCESS, + asProperty(rollingSeconds * 1000), asProperty(10)); + static final NumerusRollingPercentile latency = new NumerusRollingPercentile(asProperty(rollingSeconds * 1000), + asProperty(10), + asProperty(1000), asProperty(Boolean.TRUE)); + private static TestRouteBasic route; + private static TestRouteHello routeHello; + + public static void main(String[] args) { + + int eventLoops = Runtime.getRuntime().availableProcessors(); + int port = 8888; + String backendServerList = "127.0.0.1:8989"; + if (args.length == 0) { + // use defaults + } else if (args.length == 3) { + eventLoops = Integer.parseInt(args[0]); + port = Integer.parseInt(args[1]); + backendServerList = args[2]; + + } else { + System.err.println( + "Execute with either no argument (for defaults) or 4 arguments: EVENTLOOPS, PORT, BACKEND_HOST_LIST"); + System.exit(-1); + } + + System.out.println(String.format("Using eventloops: %d port: %d backend server list: %s", eventLoops, + port, backendServerList)); + + route = new TestRouteBasic(backendServerList); + routeHello = new TestRouteHello(); + + SingleNioLoopProvider provider = new SingleNioLoopProvider(eventLoops); + RxNetty.useEventLoopProvider(provider); + + System.out.println("Starting service on port " + port + " with backend servers " + backendServerList + " ..."); + startMonitoring(); + RxNetty.newHttpServerBuilder(port, (request, response) -> { + try { + if (request.getUri().startsWith("/testHello")) { + return routeHello.handle(request, response); + } + + long startTime = System.currentTimeMillis(); + counter.increment(CounterEvent.REQUESTS); + return route.handle(request, response) + .doOnCompleted(() -> { + counter.increment(CounterEvent.SUCCESS); + latency.addValue((int) (System.currentTimeMillis() - startTime)); + }) + .onErrorResumeNext(t -> { + if (t instanceof PoolExhaustedException) { + counter.increment(CounterEvent.CLIENT_POOL_EXHAUSTION); + } else if (t instanceof SocketException) { + counter.increment(CounterEvent.SOCKET_EXCEPTION); + } else if (t instanceof IOException) { + counter.increment(CounterEvent.IO_EXCEPTION); + } else if (t instanceof CancellationException) { + counter.increment(CounterEvent.CANCELLATION_EXCEPTION); + } else if (t instanceof JsonParseException) { + counter.increment(CounterEvent.PARSING_EXCEPTION); + } else { + counter.increment(CounterEvent.NETTY_ERROR); + } + response.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR); + response.writeStringAndFlush(""); + return Observable.empty(); + }); + } catch (Throwable e) { + System.err.println("Server => Error [" + request.getPath() + "] ==> " + e); + e.printStackTrace(); + counter.increment(CounterEvent.NETTY_ERROR); + response.setStatus(HttpResponseStatus.BAD_REQUEST); + return response.writeStringAndFlush("Error 400: Bad Request\n" + e.getMessage() + '\n'); + } + }).eventLoops(new NioEventLoopGroup(1), provider.globalServerEventLoop()) + .build() + .withErrorHandler(throwable -> Observable.empty()) + .withErrorResponseGenerator((response, error) -> System.err.println("Error: " + error.getMessage())) + .startAndWait(); + } + + private static void startMonitoring() { + int interval = 5; + Observable.interval(interval, TimeUnit.SECONDS).doOnNext(l -> { + + long totalRequestsInLastWindow = getRollingSum(CounterEvent.REQUESTS); + + if (totalRequestsInLastWindow <= 0) { + return; // Don't print anything if there weren't any requests coming. + } + + StringBuilder msg = new StringBuilder(); + msg.append("########################################################################################").append( + '\n'); + msg.append("Time since start (seconds): " + l * interval).append('\n'); + msg.append("########################################################################################").append( + '\n'); + + long successSum = counter.getCumulativeSum(CounterEvent.SUCCESS); + long hystrixFallbackSum = counter.getCumulativeSum(CounterEvent.HYSTRIX_FALLBACK); + long successRolling = counter.getRollingSum(CounterEvent.SUCCESS); + long hystrixFallbackRolling = counter.getRollingSum(CounterEvent.HYSTRIX_FALLBACK); + + msg.append("Total => "); + msg.append(" Requests: ").append(counter.getCumulativeSum(CounterEvent.REQUESTS)); + msg.append(" Success: ").append(successSum).append('(').append(successSum - hystrixFallbackSum) + .append(" good/").append(hystrixFallbackSum).append(" fallbacks)"); + msg.append(" Error: ").append(counter.getCumulativeSum(CounterEvent.HTTP_ERROR)); + msg.append(" Netty Error: ").append(counter.getCumulativeSum(CounterEvent.NETTY_ERROR)); + msg.append(" Client Pool Exhausted: ").append(counter.getCumulativeSum(CounterEvent.CLIENT_POOL_EXHAUSTION)); + msg.append(" Socket Exception: ").append(counter.getCumulativeSum(CounterEvent.SOCKET_EXCEPTION)); + msg.append(" I/O Exception: ").append(counter.getCumulativeSum(CounterEvent.IO_EXCEPTION)); + msg.append(" Cancellation Exception: ").append(counter.getCumulativeSum(CounterEvent.CANCELLATION_EXCEPTION)); + msg.append(" Parsing Exception: ").append(counter.getCumulativeSum(CounterEvent.PARSING_EXCEPTION)); + msg.append(" Bytes: ").append(counter.getCumulativeSum(CounterEvent.BYTES) / 1024).append("kb"); + msg.append(" \n Rolling =>"); + msg.append(" Requests: ").append(getRollingSum(CounterEvent.REQUESTS)).append("/s"); + msg.append(" Success: ").append(successRolling).append('(').append(successRolling - hystrixFallbackRolling) + .append(" good/").append(hystrixFallbackRolling).append(" fallbacks)"); + msg.append(" Error: ").append(getRollingSum(CounterEvent.HTTP_ERROR)).append("/s"); + msg.append(" Netty Error: ").append(getRollingSum(CounterEvent.NETTY_ERROR)).append("/s"); + msg.append(" Bytes: ").append(getRollingSum(CounterEvent.BYTES) / 1024).append("kb/s"); + msg.append(" \n Latency (ms) => 50th: ").append(latency.getPercentile(50.0)).append( + " 90th: ").append(latency.getPercentile(90.0)); + msg.append(" 99th: ").append(latency.getPercentile(99.0)).append(" 100th: ").append(latency.getPercentile( + 100.0)); + System.out.println(msg.toString()); + + StringBuilder n = new StringBuilder(); + ConnectionPoolMetricListener stats = route.getStats(); + n.append(" Netty => Used: ").append(stats.getInUseCount()); + n.append(" Idle: ").append(stats.getIdleCount()); + n.append(" Total Conns: ").append(stats.getTotalConnections()); + n.append(" AcqReq: ").append(stats.getPendingAcquire()); + n.append(" RelReq: ").append(stats.getPendingRelease()); + System.out.println(n.toString()); + }).subscribe(); + } + + private static long getRollingSum(CounterEvent e) { + long s = counter.getRollingSum(e); + if (s > 0) { + s /= rollingSeconds; + } + return s; + } +} diff --git a/ws-impls/ws-java-ribbon/src/main/java/perf/test/ribbon/TestRouteBasic.java b/ws-impls/ws-java-ribbon/src/main/java/perf/test/ribbon/TestRouteBasic.java new file mode 100644 index 0000000..577b4fa --- /dev/null +++ b/ws-impls/ws-java-ribbon/src/main/java/perf/test/ribbon/TestRouteBasic.java @@ -0,0 +1,142 @@ +package perf.test.ribbon; + +import com.netflix.ribbon.ClientOptions; +import com.netflix.ribbon.http.HttpResourceGroup; +import com.netflix.ribbon.proxy.RibbonDynamicProxy; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufInputStream; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.reactivex.netty.protocol.http.server.HttpServerRequest; +import io.reactivex.netty.protocol.http.server.HttpServerResponse; +import org.codehaus.jackson.JsonFactory; +import perf.test.utils.BackendResponse; +import perf.test.utils.JsonParseException; +import perf.test.utils.ServiceResponseBuilder; +import rx.Observable; + +import java.io.ByteArrayOutputStream; +import java.util.Arrays; +import java.util.List; +import java.util.Map; + +/** + * @author Tomasz Bak + */ +public class TestRouteBasic { + + private final ConnectionPoolMetricListener metricListener; + + private static JsonFactory jsonFactory = new JsonFactory(); + + private final HttpResourceGroup httpResourceGroup; + private final MockBackendService service; + + public TestRouteBasic(String backendServerList) { + httpResourceGroup = new HttpResourceGroup("performanceTest", ClientOptions.create() + .withMaxAutoRetries(0) + .withMaxAutoRetriesNextServer(0) + .withConnectTimeout(30000) + .withReadTimeout(30000) + .withMaxConnectionsPerHost(10000) + .withMaxTotalConnections(10000) + .withConfigurationBasedServerList(backendServerList)); + + metricListener = new ConnectionPoolMetricListener(); + httpResourceGroup.getClient().subscribe(metricListener); + + service = RibbonDynamicProxy.newInstance(MockBackendService.class, httpResourceGroup); + } + + public Observable handle(HttpServerRequest request, HttpServerResponse response) { + long startTime = System.currentTimeMillis(); + List _id = request.getQueryParameters().get("id"); + + if (_id == null || _id.size() != 1) { + return writeError(request, response, "Please provide a numerical 'id' value. It can be a random number (uuid)."); + } + long id = Long.parseLong(String.valueOf(_id.get(0))); + + + Observable> acd = getDataFromBackend(2, 50, 50, id) + .doOnError(Throwable::printStackTrace) + // Eclipse 20140224-0627 can't infer without this type hint even though the Java 8 compiler can + .>flatMap(responseA -> { + Observable responseC = getDataFromBackend(1, 5000, 80, responseA.getResponseKey()); + Observable responseD = getDataFromBackend(1, 1000, 1, responseA.getResponseKey()); + return Observable.zip(Observable.just(responseA), responseC, responseD, + Arrays::asList); + }).doOnError(Throwable::printStackTrace); + + Observable> be = getDataFromBackend(25, 30, 150, id) + // Eclipse 20140224-0627 can't infer without this type hint even though the Java 8 compiler can + .>flatMap(responseB -> { + Observable responseE = getDataFromBackend(100, 30, 4, responseB.getResponseKey()); + return Observable.zip(Observable.just(responseB), responseE, Arrays::asList); + }).doOnError(Throwable::printStackTrace); + + return Observable.zip(acd, be, (_acd, _be) -> { + BackendResponse responseA = _acd.get(0); + BackendResponse responseB = _be.get(0); + BackendResponse responseC = _acd.get(1); + BackendResponse responseD = _acd.get(2); + BackendResponse responseE = _be.get(1); + + BackendResponse[] backendResponses = {responseA, responseB, responseC, responseD, responseE}; + checkForHystrixCallbacks(backendResponses); + return backendResponses; + }).flatMap(backendResponses -> { + try { + ByteArrayOutputStream responseStream = ServiceResponseBuilder.buildTestAResponse(jsonFactory, backendResponses); + // set response header + response.getHeaders().addHeader("Content-Type", "application/json"); + // performance headers + addResponseHeaders(response, startTime); + int contentLength = responseStream.size(); + response.getHeaders().addHeader("Content-Length", contentLength); + return response.writeBytesAndFlush(responseStream.toByteArray()); + } catch (Exception e) { + e.printStackTrace(); + return writeError(request, response, "Failed: " + e.getMessage()); + } + }).doOnError(Throwable::printStackTrace); + } + + public ConnectionPoolMetricListener getStats() { + return metricListener; + } + + private void checkForHystrixCallbacks(BackendResponse[] backendResponses) { + for (BackendResponse r : backendResponses) { + if (r.isFallback()) { + StartServer.counter.increment(CounterEvent.HYSTRIX_FALLBACK); + return; + } + } + } + + /** + * Add various headers used for logging and statistics. + */ + private static void addResponseHeaders(HttpServerResponse response, long startTime) { + Map perfResponseHeaders = ServiceResponseBuilder.getPerfResponseHeaders(startTime); + for (Map.Entry entry : perfResponseHeaders.entrySet()) { + response.getHeaders().add(entry.getKey(), entry.getValue()); + } + } + + private Observable getDataFromBackend(Integer numItems, Integer itemSize, Integer delay, Long id) { + return service.request(numItems, itemSize, delay, id).toObservable().flatMap(b -> { + try { + return Observable.just(BackendResponse.fromJson(jsonFactory, new ByteBufInputStream(b))); + } catch (JsonParseException e) { + return Observable.error(e); + } + }); + } + + private static Observable writeError(HttpServerRequest request, HttpServerResponse response, String message) { + System.err.println("Server => Error [" + request.getPath() + "] => " + message); + response.setStatus(HttpResponseStatus.BAD_REQUEST); + return response.writeStringAndFlush("Error 500: " + message + "\n"); + } +} diff --git a/ws-impls/ws-java-ribbon/src/main/java/perf/test/ribbon/TestRouteHello.java b/ws-impls/ws-java-ribbon/src/main/java/perf/test/ribbon/TestRouteHello.java new file mode 100644 index 0000000..0114cab --- /dev/null +++ b/ws-impls/ws-java-ribbon/src/main/java/perf/test/ribbon/TestRouteHello.java @@ -0,0 +1,23 @@ +package perf.test.ribbon; + +import io.netty.buffer.ByteBuf; +import io.netty.handler.codec.http.HttpHeaders; +import io.reactivex.netty.protocol.http.server.HttpServerRequest; +import io.reactivex.netty.protocol.http.server.HttpServerResponse; +import rx.Observable; + +/** + * @author Nitesh Kant + */ +public class TestRouteHello { + + private static final byte[] MSG = "Hello World".getBytes(); + public static final int HELLO_WORLD_LENGTH = MSG.length; + public static final String HELLO_WORLD_LENGTH_STR = String.valueOf(MSG.length); + + public Observable handle(HttpServerRequest request, HttpServerResponse response) { + response.getHeaders().set(HttpHeaders.Names.CONTENT_LENGTH, HELLO_WORLD_LENGTH_STR); + response.write(response.getAllocator().buffer(HELLO_WORLD_LENGTH).writeBytes(MSG)); + return response.close(); + } +} diff --git a/ws-impls/ws-java-rxnetty/build.gradle b/ws-impls/ws-java-rxnetty/build.gradle index 5222673..7606d91 100644 --- a/ws-impls/ws-java-rxnetty/build.gradle +++ b/ws-impls/ws-java-rxnetty/build.gradle @@ -7,9 +7,6 @@ sourceCompatibility = JavaVersion.VERSION_1_8 targetCompatibility = JavaVersion.VERSION_1_8 dependencies { - compile 'com.netflix.rxnetty:rx-netty:0.3.8' - compile 'com.netflix.rxjava:rxjava-core:0.18.2' - compile 'com.netflix.numerus:numerus:1.1' compile 'org.codehaus.jackson:jackson-core-asl:1.9.13' compile 'org.codehaus.jackson:jackson-mapper-asl:1.9.13' compile project(':ws-impls:ws-impl-utils') diff --git a/ws-impls/ws-java-servlet-blocking/build.gradle b/ws-impls/ws-java-servlet-blocking/build.gradle index 4eb7c71..73f9f15 100644 --- a/ws-impls/ws-java-servlet-blocking/build.gradle +++ b/ws-impls/ws-java-servlet-blocking/build.gradle @@ -5,10 +5,7 @@ apply plugin: 'jetty' dependencies { compile 'org.codehaus.jackson:jackson-core-asl:1.9.2' compile 'org.apache.httpcomponents:httpclient:4.3.1' - compile 'org.slf4j:slf4j-api:1.7.0' - runtime 'org.slf4j:slf4j-simple:1.7.0' compile project(':ws-impls:ws-impl-utils') - provided 'junit:junit-dep:4.10' compile 'javax.servlet:javax.servlet-api:3.0.1' }