From 84393df7c225d835989969d40a1a69aa8c9c94a9 Mon Sep 17 00:00:00 2001 From: Tomasz Bak Date: Sun, 6 Jul 2014 14:18:13 -0700 Subject: [PATCH 1/4] Ribbon proxy based server implementation. It is largely based on ws-java-rxnetty project, adopted to use Ribbon proxy client for communication with the backend mock. It collects Hystrix fallback events, so it is possibly to differentiate end-to-end success calls from short circuited. --- build.gradle | 13 +- settings.gradle | 3 +- ws-backend-mock/build.gradle | 5 - .../main/java/perf/backend/MockResponse.java | 88 +++++---- ws-client/build.gradle | 5 - ws-impls/ws-impl-utils/build.gradle | 6 +- .../java/perf/test/utils/BackendResponse.java | 44 ++++- ws-impls/ws-java-jetty/build.gradle | 3 - ws-impls/ws-java-ribbon/build.gradle | 31 ++++ .../java/perf/test/ribbon/CounterEvent.java | 33 ++++ .../test/ribbon/DefaultFallbackHandler.java | 25 +++ .../perf/test/ribbon/MockBackendService.java | 28 +++ .../ribbon/SampleHystrixContentValidator.java | 19 ++ .../java/perf/test/ribbon/StartServer.java | 172 ++++++++++++++++++ .../java/perf/test/ribbon/TestRouteBasic.java | 136 ++++++++++++++ .../java/perf/test/ribbon/TestRouteHello.java | 23 +++ ws-impls/ws-java-rxnetty/build.gradle | 3 - .../ws-java-servlet-blocking/build.gradle | 3 - 18 files changed, 566 insertions(+), 74 deletions(-) create mode 100644 ws-impls/ws-java-ribbon/build.gradle create mode 100644 ws-impls/ws-java-ribbon/src/main/java/perf/test/ribbon/CounterEvent.java create mode 100644 ws-impls/ws-java-ribbon/src/main/java/perf/test/ribbon/DefaultFallbackHandler.java create mode 100644 ws-impls/ws-java-ribbon/src/main/java/perf/test/ribbon/MockBackendService.java create mode 100644 ws-impls/ws-java-ribbon/src/main/java/perf/test/ribbon/SampleHystrixContentValidator.java create mode 100644 ws-impls/ws-java-ribbon/src/main/java/perf/test/ribbon/StartServer.java create mode 100644 ws-impls/ws-java-ribbon/src/main/java/perf/test/ribbon/TestRouteBasic.java create mode 100644 ws-impls/ws-java-ribbon/src/main/java/perf/test/ribbon/TestRouteHello.java diff --git a/build.gradle b/build.gradle index cdb1878..b4cf392 100644 --- a/build.gradle +++ b/build.gradle @@ -17,9 +17,20 @@ subprojects { sourceSets.test.java.srcDir 'src/main/java' + dependencies { + compile 'io.netty:netty-common:4.0.21.Final' + compile 'io.netty:netty-transport:4.0.21.Final' + compile 'com.netflix.rxnetty:rx-netty:0.3.7' + compile 'org.slf4j:slf4j-api:1.7.0' + runtime 'org.slf4j:slf4j-simple:1.7.0' + compile 'com.netflix.numerus:numerus:1.1' + testCompile 'junit:junit-dep:4.10' + testCompile 'org.mockito:mockito-core:1.8.5' + } + tasks.withType(Javadoc).each { it.classpath = sourceSets.main.compileClasspath - } + } } if (JavaVersion.current().isJava8Compatible()) { diff --git a/settings.gradle b/settings.gradle index 222d4e7..be0b7dc 100644 --- a/settings.gradle +++ b/settings.gradle @@ -4,4 +4,5 @@ include 'ws-backend-mock', \ 'ws-impls:ws-impl-utils', \ 'ws-impls:ws-java-servlet-blocking', \ 'ws-impls:ws-java-jetty', \ -'ws-impls:ws-java-rxnetty' +'ws-impls:ws-java-rxnetty', \ +'ws-impls/ws-java-ribbon' \ No newline at end of file diff --git a/ws-backend-mock/build.gradle b/ws-backend-mock/build.gradle index 3a20fd8..2244e3e 100644 --- a/ws-backend-mock/build.gradle +++ b/ws-backend-mock/build.gradle @@ -6,13 +6,8 @@ sourceCompatibility = JavaVersion.VERSION_1_8 targetCompatibility = JavaVersion.VERSION_1_8 dependencies { - compile 'com.netflix.rxnetty:rx-netty:0.3.5' compile 'com.netflix.numerus:numerus:1.1' - compile 'com.netflix.rxjava:rxjava-core:0.18.2' compile 'org.codehaus.jackson:jackson-core-asl:1.9.2' - compile 'org.slf4j:slf4j-api:1.7.0' - testCompile 'junit:junit-dep:4.10' - testCompile 'org.mockito:mockito-core:1.8.5' } /** diff --git a/ws-backend-mock/src/main/java/perf/backend/MockResponse.java b/ws-backend-mock/src/main/java/perf/backend/MockResponse.java index 2a827d9..dacb048 100644 --- a/ws-backend-mock/src/main/java/perf/backend/MockResponse.java +++ b/ws-backend-mock/src/main/java/perf/backend/MockResponse.java @@ -4,7 +4,6 @@ import io.netty.buffer.ByteBufOutputStream; import io.netty.buffer.Unpooled; import org.codehaus.jackson.JsonFactory; -import org.codehaus.jackson.JsonGenerationException; import org.codehaus.jackson.JsonGenerator; import rx.Observable; import rx.Scheduler.Worker; @@ -44,16 +43,10 @@ private MockResponse() { } /** - * - * @param id - * ID from client used to assert correct client/server interaction - * @param delay - * How long to delay delivery to simulate server-side latency - * @param itemSize - * Length of each item String. - * @param numItems - * Number of items in response. - * + * @param id ID from client used to assert correct client/server interaction + * @param delay How long to delay delivery to simulate server-side latency + * @param itemSize Length of each item String. + * @param numItems Number of items in response. * @return String json */ public static Observable generateJson(long id, int delay, int itemSize, int numItems) { @@ -62,33 +55,8 @@ public static Observable generateJson(long id, int delay, int itemSize, subscriber.add(worker); worker.schedule(() -> { try { - ByteBuf buffer = Unpooled.buffer(); - ByteBufOutputStream jsonAsBytes = new ByteBufOutputStream(buffer); - JsonGenerator json = jsonFactory.createJsonGenerator(jsonAsBytes); - - json.writeStartObject(); - - // manipulate the ID such that we can know the response is from the server (client will know the logic) - long responseKey = getResponseKey(id); - - json.writeNumberField("responseKey", responseKey); - - json.writeNumberField("delay", delay); - if (itemSize > MAX_ITEM_LENGTH) { - throw new IllegalArgumentException("itemSize can not be larger than: " + MAX_ITEM_LENGTH); - } - json.writeNumberField("itemSize", itemSize); - json.writeNumberField("numItems", numItems); - - json.writeArrayFieldStart("items"); - for (int i = 0; i < numItems; i++) { - json.writeString(RAW_ITEM_LONG.substring(0, itemSize)); - } - json.writeEndArray(); - json.writeEndObject(); - json.close(); - - subscriber.onNext(buffer); + ByteBuf byteBuf = createJsonResponse(id, delay, itemSize, numItems, false); + subscriber.onNext(byteBuf); subscriber.onCompleted(); } catch (Exception e) { subscriber.onError(e); @@ -97,7 +65,49 @@ public static Observable generateJson(long id, int delay, int itemSize, }); } - /* package */static long getResponseKey(long id) { + public static Observable generateFallbackJson(long id, int delay, int itemSize, int numItems) { + try { + ByteBuf byteBuf = createJsonResponse(id, delay, itemSize, numItems, true); + return Observable.just(byteBuf); + } catch (IOException e) { + // We do not expect to get here + e.printStackTrace(); + throw new RuntimeException(e); + } + } + + public static ByteBuf createJsonResponse(long id, int delay, int itemSize, int numItems, boolean fallback) throws IOException { + ByteBuf buffer = Unpooled.buffer(); + ByteBufOutputStream jsonAsBytes = new ByteBufOutputStream(buffer); + JsonGenerator json = jsonFactory.createJsonGenerator(jsonAsBytes); + + json.writeStartObject(); + + // manipulate the ID such that we can know the response is from the server (client will know the logic) + long responseKey = getResponseKey(id); + + json.writeNumberField("responseKey", responseKey); + + json.writeNumberField("delay", delay); + if (itemSize > MAX_ITEM_LENGTH) { + throw new IllegalArgumentException("itemSize can not be larger than: " + MAX_ITEM_LENGTH); + } + json.writeNumberField("itemSize", itemSize); + json.writeNumberField("numItems", numItems); + json.writeBooleanField("fallback", fallback); + + json.writeArrayFieldStart("items"); + for (int i = 0; i < numItems; i++) { + json.writeString(RAW_ITEM_LONG.substring(0, itemSize)); + } + json.writeEndArray(); + json.writeEndObject(); + json.close(); + return jsonAsBytes.buffer(); + } + + /* package */ + static long getResponseKey(long id) { return (id / 37 + 5739375) * 7; } } diff --git a/ws-client/build.gradle b/ws-client/build.gradle index 435cc9c..13ba03c 100644 --- a/ws-client/build.gradle +++ b/ws-client/build.gradle @@ -6,14 +6,9 @@ sourceCompatibility = JavaVersion.VERSION_1_8 targetCompatibility = JavaVersion.VERSION_1_8 dependencies { - compile 'com.netflix.rxnetty:rx-netty:0.3.5' - compile 'com.netflix.rxjava:rxjava-core:0.18.2' - compile 'com.netflix.numerus:numerus:1.1' compile 'org.codehaus.jackson:jackson-core-asl:1.9.2' compile 'org.codehaus.jackson:jackson-mapper-asl:1.9.2' compile 'commons-cli:commons-cli:1.2' - testCompile 'junit:junit-dep:4.10' - testCompile 'org.mockito:mockito-core:1.8.5' } diff --git a/ws-impls/ws-impl-utils/build.gradle b/ws-impls/ws-impl-utils/build.gradle index 7191df8..fb94abc 100644 --- a/ws-impls/ws-impl-utils/build.gradle +++ b/ws-impls/ws-impl-utils/build.gradle @@ -2,15 +2,11 @@ apply plugin: 'java' apply plugin: 'eclipse' dependencies { - compile 'org.slf4j:slf4j-api:1.7.0' compile 'org.codehaus.jackson:jackson-core-asl:1.9.2' - compile 'junit:junit-dep:4.10' compile 'javax.servlet:javax.servlet-api:3.1.0' - compile 'com.netflix.rxjava:rxjava-core:0.18+' compile 'com.google.guava:guava:15.0' compile 'org.apache.commons:commons-lang3:3.1' - compile 'io.netty:netty-common:4.0.14.Final' - compile 'io.netty:netty-transport:4.0.14.Final' + compile 'junit:junit-dep:4.10' } eclipse { diff --git a/ws-impls/ws-impl-utils/src/main/java/perf/test/utils/BackendResponse.java b/ws-impls/ws-impl-utils/src/main/java/perf/test/utils/BackendResponse.java index 78d27f4..f2aa6c4 100644 --- a/ws-impls/ws-impl-utils/src/main/java/perf/test/utils/BackendResponse.java +++ b/ws-impls/ws-impl-utils/src/main/java/perf/test/utils/BackendResponse.java @@ -1,21 +1,21 @@ package perf.test.utils; -import static junit.framework.Assert.assertEquals; - -import java.io.IOException; -import java.io.InputStream; - import org.codehaus.jackson.JsonFactory; import org.codehaus.jackson.JsonParser; import org.codehaus.jackson.JsonToken; import org.junit.Test; - import rx.Observable; import rx.Observable.OnSubscribe; import rx.Scheduler; import rx.Subscriber; import rx.schedulers.Schedulers; +import java.io.IOException; +import java.io.InputStream; +import java.util.Arrays; + +import static junit.framework.Assert.*; + /** * @author Nitesh Kant (nkant@netflix.com) */ @@ -26,13 +26,19 @@ public class BackendResponse { private final int numItems; private final int itemSize; private final String[] items; + private final boolean fallback; public BackendResponse(long responseKey, int delay, int numItems, int itemSize, String[] items) { + this(responseKey, delay, numItems, itemSize, items, false); + } + + public BackendResponse(long responseKey, int delay, int numItems, int itemSize, String[] items, boolean fallback) { this.responseKey = responseKey; this.delay = delay; this.numItems = numItems; this.itemSize = itemSize; this.items = items; + this.fallback = fallback; } public static BackendResponse fromJson(JsonFactory jsonFactory, byte[] content) throws Exception { @@ -83,6 +89,7 @@ public static BackendResponse parseBackendResponse(JsonParser parser) throws IOE int delay = 0; int numItems = 0; int itemSize = 0; + boolean fallback = false; String[] items = null; JsonToken current; @@ -98,6 +105,8 @@ public static BackendResponse parseBackendResponse(JsonParser parser) throws IOE itemSize = parser.getIntValue(); } else if (fieldName.equals("numItems")) { numItems = parser.getIntValue(); + } else if(fieldName.equals("fallback")) { + fallback = parser.getBooleanValue(); } else if (fieldName.equals("items")) { // expect numItems to be populated before hitting this if (numItems == 0) { @@ -117,8 +126,8 @@ public static BackendResponse parseBackendResponse(JsonParser parser) throws IOE } } - - return new BackendResponse(responseKey, delay, numItems, itemSize, items); + + return new BackendResponse(responseKey, delay, numItems, itemSize, items, fallback); } finally { parser.close(); } @@ -144,16 +153,33 @@ public String[] getItems() { return items; } + public boolean isFallback() { + return fallback; + } + + @Override + public String toString() { + return "BackendResponse{" + + "responseKey=" + responseKey + + ", delay=" + delay + + ", numItems=" + numItems + + ", itemSize=" + itemSize + + ", fallback=" + fallback + + ", items=" + Arrays.toString(items) + + '}'; + } + public static class UnitTest { @Test public void testJsonParse() throws Exception { JsonFactory jsonFactory = new JsonFactory(); - BackendResponse r = BackendResponse.fromJson(jsonFactory, "{ \"responseKey\": 9999, \"delay\": 50, \"itemSize\": 128, \"numItems\": 2, \"items\": [ \"Lorem\", \"Ipsum\" ]}"); + BackendResponse r = fromJson(jsonFactory, "{ \"responseKey\": 9999, \"delay\": 50, \"fallback\": false, \"itemSize\": 128, \"numItems\": 2, \"items\": [ \"Lorem\", \"Ipsum\" ]}"); assertEquals(9999, r.getResponseKey()); assertEquals(50, r.getDelay()); assertEquals(128, r.getItemSize()); assertEquals(2, r.getNumItems()); + assertFalse(r.fallback); String[] items = r.getItems(); assertEquals(2, items.length); assertEquals("Lorem", items[0]); diff --git a/ws-impls/ws-java-jetty/build.gradle b/ws-impls/ws-java-jetty/build.gradle index 72d633a..992b9b9 100644 --- a/ws-impls/ws-java-jetty/build.gradle +++ b/ws-impls/ws-java-jetty/build.gradle @@ -3,14 +3,11 @@ apply plugin: 'eclipse' apply plugin: 'application' dependencies { - compile 'org.slf4j:slf4j-api:1.7.0' - runtime 'org.slf4j:slf4j-simple:1.7.0' compile 'org.eclipse.jetty:jetty-server:9.0.3.v20130506' compile 'org.eclipse.jetty:jetty-client:9.0.3.v20130506' compile 'org.eclipse.jetty:jetty-continuation:9.0.3.v20130506' compile 'org.codehaus.jackson:jackson-core-asl:1.9.2' compile 'com.google.guava:guava:r05' - provided 'junit:junit-dep:4.10' compile project(':ws-impls:ws-impl-utils') } diff --git a/ws-impls/ws-java-ribbon/build.gradle b/ws-impls/ws-java-ribbon/build.gradle new file mode 100644 index 0000000..61ad0b6 --- /dev/null +++ b/ws-impls/ws-java-ribbon/build.gradle @@ -0,0 +1,31 @@ +apply plugin: "eclipse" +apply plugin: "idea" +apply plugin: 'java' +apply plugin: 'application' + +sourceCompatibility = JavaVersion.VERSION_1_8 +targetCompatibility = JavaVersion.VERSION_1_8 + +dependencies { + compile 'com.netflix.ribbon:ribbon:2.0.0' + compile 'org.codehaus.jackson:jackson-core-asl:1.9.13' + compile 'org.codehaus.jackson:jackson-mapper-asl:1.9.13' + compile project(':ws-impls:ws-impl-utils') + compile project(':ws-backend-mock') +} + + +/** + * ../../gradlew start + * ../../gradlew start -P'a=EVENTLOOPS PORT BACKEND_HOST BACKEND_PORT' + * ../../gradlew start -P'a=1 8888 ec2-54-87-12-221.compute-1.amazonaws.com 8989' + */ +task start(type:JavaExec) { + main = "perf.test.ribbon.StartServer" + classpath = sourceSets.main.runtimeClasspath + if (project.hasProperty('a')) { + args(a.split(' ')) + } +} + +mainClassName = "perf.test.ribbon.StartServer" diff --git a/ws-impls/ws-java-ribbon/src/main/java/perf/test/ribbon/CounterEvent.java b/ws-impls/ws-java-ribbon/src/main/java/perf/test/ribbon/CounterEvent.java new file mode 100644 index 0000000..485fb51 --- /dev/null +++ b/ws-impls/ws-java-ribbon/src/main/java/perf/test/ribbon/CounterEvent.java @@ -0,0 +1,33 @@ +package perf.test.ribbon; + +import com.netflix.numerus.NumerusRollingNumberEvent; + +public enum CounterEvent implements NumerusRollingNumberEvent { + + REQUESTS(true), SUCCESS(true), HYSTRIX_FALLBACK(true), HTTP_ERROR(true), NETTY_ERROR(true), CLIENT_POOL_EXHAUSTION(true), + SOCKET_EXCEPTION(true), IO_EXCEPTION(true), CANCELLATION_EXCEPTION(true), PARSING_EXCEPTION(true), BYTES(true); + + private final boolean isCounter; + private final boolean isMaxUpdater; + + CounterEvent(boolean isCounter) { + this.isCounter = isCounter; + this.isMaxUpdater = !isCounter; + } + + @Override + public boolean isCounter() { + return isCounter; + } + + @Override + public boolean isMaxUpdater() { + return isMaxUpdater; + } + + @Override + public NumerusRollingNumberEvent[] getValues() { + return values(); + } + +} diff --git a/ws-impls/ws-java-ribbon/src/main/java/perf/test/ribbon/DefaultFallbackHandler.java b/ws-impls/ws-java-ribbon/src/main/java/perf/test/ribbon/DefaultFallbackHandler.java new file mode 100644 index 0000000..5fa404a --- /dev/null +++ b/ws-impls/ws-java-ribbon/src/main/java/perf/test/ribbon/DefaultFallbackHandler.java @@ -0,0 +1,25 @@ +package perf.test.ribbon; + +import com.netflix.hystrix.HystrixExecutableInfo; +import com.netflix.ribbon.hystrix.FallbackHandler; +import io.netty.buffer.ByteBuf; +import perf.backend.MockResponse; +import rx.Observable; + +import java.util.Map; + +/** + * @author Tomasz Bak + */ +public class DefaultFallbackHandler implements FallbackHandler { + + @Override + public Observable getFallback(HystrixExecutableInfo hystrixExecutableInfo, Map stringObjectMap) { + return MockResponse.generateFallbackJson( + (Long) stringObjectMap.get("id"), + (Integer) stringObjectMap.get("delay"), + (Integer) stringObjectMap.get("itemSize"), + (Integer) stringObjectMap.get("numItems") + ); + } +} diff --git a/ws-impls/ws-java-ribbon/src/main/java/perf/test/ribbon/MockBackendService.java b/ws-impls/ws-java-ribbon/src/main/java/perf/test/ribbon/MockBackendService.java new file mode 100644 index 0000000..7b82289 --- /dev/null +++ b/ws-impls/ws-java-ribbon/src/main/java/perf/test/ribbon/MockBackendService.java @@ -0,0 +1,28 @@ +package perf.test.ribbon; + +import com.netflix.ribbon.RibbonRequest; +import com.netflix.ribbon.proxy.annotation.Http; +import com.netflix.ribbon.proxy.annotation.Http.HttpMethod; +import com.netflix.ribbon.proxy.annotation.Hystrix; +import com.netflix.ribbon.proxy.annotation.TemplateName; +import com.netflix.ribbon.proxy.annotation.Var; +import io.netty.buffer.ByteBuf; + +/** + * Maps mock backend request to Java interface. + *

+ * A) GET http://hostname:9100/mock.json?numItems=2&itemSize=50&delay=50&id={uuid} + * B) GET http://hostname:9100/mock.json?numItems=25&itemSize=30&delay=150&id={uuid} + * C) GET http://hostname:9100/mock.json?numItems=1&itemSize=5000&delay=80&id={a.responseKey} + * D) GET http://hostname:9100/mock.json?numItems=1&itemSize=1000&delay=1&id={a.responseKey} + * E) GET http://hostname:9100/mock.json?numItems=100&itemSize=30&delay=4&id={b.responseKey} + * + * @author Tomasz Bak + */ +public interface MockBackendService { + + @TemplateName("test") + @Http(method = HttpMethod.GET, uriTemplate = "/mock.json?numItems={numItems}&itemSize={itemSize}&delay={delay}&id={id}") + @Hystrix(fallbackHandler = DefaultFallbackHandler.class, validator = SampleHystrixContentValidator.class) + RibbonRequest request(@Var("numItems") Integer numItems, @Var("itemSize") Integer itemSize, @Var("delay") Integer delay, @Var("id") Long id); +} diff --git a/ws-impls/ws-java-ribbon/src/main/java/perf/test/ribbon/SampleHystrixContentValidator.java b/ws-impls/ws-java-ribbon/src/main/java/perf/test/ribbon/SampleHystrixContentValidator.java new file mode 100644 index 0000000..5760eda --- /dev/null +++ b/ws-impls/ws-java-ribbon/src/main/java/perf/test/ribbon/SampleHystrixContentValidator.java @@ -0,0 +1,19 @@ +package perf.test.ribbon; + +import com.netflix.ribbon.ServerError; +import com.netflix.ribbon.UnsuccessfulResponseException; +import com.netflix.ribbon.http.HttpResponseValidator; +import io.netty.buffer.ByteBuf; +import io.reactivex.netty.protocol.http.client.HttpClientResponse; + +/** + * @author Tomasz Bak + */ +public class SampleHystrixContentValidator implements HttpResponseValidator { + @Override + public void validate(HttpClientResponse httpClientResponse) throws UnsuccessfulResponseException, ServerError { + if (httpClientResponse.getStatus().code() / 100 != 2) { + throw new UnsuccessfulResponseException("Unexpected HTTP status code received: " + httpClientResponse.getStatus()); + } + } +} diff --git a/ws-impls/ws-java-ribbon/src/main/java/perf/test/ribbon/StartServer.java b/ws-impls/ws-java-ribbon/src/main/java/perf/test/ribbon/StartServer.java new file mode 100644 index 0000000..f7c7943 --- /dev/null +++ b/ws-impls/ws-java-ribbon/src/main/java/perf/test/ribbon/StartServer.java @@ -0,0 +1,172 @@ +package perf.test.ribbon; + +import com.netflix.numerus.NumerusRollingNumber; +import com.netflix.numerus.NumerusRollingPercentile; +import io.netty.buffer.ByteBuf; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.reactivex.netty.RxNetty; +import io.reactivex.netty.channel.SingleNioLoopProvider; +import io.reactivex.netty.client.PoolExhaustedException; +import io.reactivex.netty.protocol.http.client.HttpClient; +import perf.test.utils.JsonParseException; +import rx.Observable; + +import java.io.IOException; +import java.net.SocketException; +import java.util.concurrent.CancellationException; +import java.util.concurrent.TimeUnit; + +import static com.netflix.numerus.NumerusProperty.Factory.*; + +public final class StartServer { + + static final int rollingSeconds = 5; + + static final NumerusRollingNumber counter = new NumerusRollingNumber(CounterEvent.SUCCESS, + asProperty(rollingSeconds * 1000), asProperty(10)); + static final NumerusRollingPercentile latency = new NumerusRollingPercentile(asProperty(rollingSeconds * 1000), + asProperty(10), + asProperty(1000), asProperty(Boolean.TRUE)); + private static TestRouteBasic route; + private static TestRouteHello routeHello; + + public static void main(String[] args) { + + int eventLoops = Runtime.getRuntime().availableProcessors(); + int port = 8888; + String backendHost = "127.0.0.1"; + int backendPort = 8989; + if (args.length == 0) { + // use defaults + } else if (args.length == 4) { + eventLoops = Integer.parseInt(args[0]); + port = Integer.parseInt(args[1]); + backendHost = args[2]; + backendPort = Integer.parseInt(args[3]); + + } else { + System.err.println( + "Execute with either no argument (for defaults) or 4 arguments: EVENTLOOPS, PORT, BACKEND_HOST, BACKEND_PORT"); + System.exit(-1); + } + + System.out.println(String.format("Using eventloops: %d port: %d backend host: %s backend port: %d", eventLoops, + port, backendHost, backendPort)); + + route = new TestRouteBasic(backendHost, backendPort); + routeHello = new TestRouteHello(); + + SingleNioLoopProvider provider = new SingleNioLoopProvider(eventLoops); + RxNetty.useEventLoopProvider(provider); + + System.out.println("Starting service on port " + port + " with backend at " + backendHost + ':' + backendPort + " ..."); + startMonitoring(); + RxNetty.newHttpServerBuilder(port, (request, response) -> { + try { + if (request.getUri().startsWith("/testHello")) { + return routeHello.handle(request, response); + } + + long startTime = System.currentTimeMillis(); + counter.increment(CounterEvent.REQUESTS); + return route.handle(request, response) + .doOnCompleted(() -> { + counter.increment(CounterEvent.SUCCESS); + latency.addValue((int) (System.currentTimeMillis() - startTime)); + }) + .onErrorResumeNext(t -> { + if (t instanceof PoolExhaustedException) { + counter.increment(CounterEvent.CLIENT_POOL_EXHAUSTION); + } else if (t instanceof SocketException) { + counter.increment(CounterEvent.SOCKET_EXCEPTION); + } else if (t instanceof IOException) { + counter.increment(CounterEvent.IO_EXCEPTION); + } else if (t instanceof CancellationException) { + counter.increment(CounterEvent.CANCELLATION_EXCEPTION); + } else if (t instanceof JsonParseException) { + counter.increment(CounterEvent.PARSING_EXCEPTION); + } else { + counter.increment(CounterEvent.NETTY_ERROR); + } + response.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR); + response.writeStringAndFlush(""); + return Observable.empty(); + }); + } catch (Throwable e) { + System.err.println("Server => Error [" + request.getPath() + "] ==> " + e); + e.printStackTrace(); + counter.increment(CounterEvent.NETTY_ERROR); + response.setStatus(HttpResponseStatus.BAD_REQUEST); + return response.writeStringAndFlush("Error 400: Bad Request\n" + e.getMessage() + '\n'); + } + }).eventLoops(new NioEventLoopGroup(1), provider.globalServerEventLoop()) + .build() + .withErrorHandler(throwable -> Observable.empty()) + .withErrorResponseGenerator((response, error) -> System.err.println("Error: " + error.getMessage())) + .startAndWait(); + } + + private static void startMonitoring() { + int interval = 5; + Observable.interval(interval, TimeUnit.SECONDS).doOnNext(l -> { + + long totalRequestsInLastWindow = getRollingSum(CounterEvent.REQUESTS); + + if (totalRequestsInLastWindow <= 0) { + return; // Don't print anything if there weren't any requests coming. + } + + StringBuilder msg = new StringBuilder(); + msg.append("########################################################################################").append( + '\n'); + msg.append("Time since start (seconds): " + l * interval).append('\n'); + msg.append("########################################################################################").append( + '\n'); + + long successSum = counter.getCumulativeSum(CounterEvent.SUCCESS); + long hystrixFallbackSum = counter.getCumulativeSum(CounterEvent.HYSTRIX_FALLBACK); + long realSuccessSum = successSum - hystrixFallbackSum; + + msg.append("Total => "); + msg.append(" Requests: ").append(counter.getCumulativeSum(CounterEvent.REQUESTS)); + msg.append(" Success: ").append(successSum).append('(').append(realSuccessSum).append(" good/").append(hystrixFallbackSum).append(" fallbacks)"); + msg.append(" Error: ").append(counter.getCumulativeSum(CounterEvent.HTTP_ERROR)); + msg.append(" Netty Error: ").append(counter.getCumulativeSum(CounterEvent.NETTY_ERROR)); + msg.append(" Client Pool Exhausted: ").append(counter.getCumulativeSum(CounterEvent.CLIENT_POOL_EXHAUSTION)); + msg.append(" Socket Exception: ").append(counter.getCumulativeSum(CounterEvent.SOCKET_EXCEPTION)); + msg.append(" I/O Exception: ").append(counter.getCumulativeSum(CounterEvent.IO_EXCEPTION)); + msg.append(" Cancellation Exception: ").append(counter.getCumulativeSum(CounterEvent.CANCELLATION_EXCEPTION)); + msg.append(" Parsing Exception: ").append(counter.getCumulativeSum(CounterEvent.PARSING_EXCEPTION)); + msg.append(" Bytes: ").append(counter.getCumulativeSum(CounterEvent.BYTES) / 1024).append("kb"); + msg.append(" \n Rolling =>"); + msg.append(" Requests: ").append(getRollingSum(CounterEvent.REQUESTS)).append("/s"); + msg.append(" Success: ").append(getRollingSum(CounterEvent.SUCCESS)).append("/s"); + msg.append(" Error: ").append(getRollingSum(CounterEvent.HTTP_ERROR)).append("/s"); + msg.append(" Netty Error: ").append(getRollingSum(CounterEvent.NETTY_ERROR)).append("/s"); + msg.append(" Bytes: ").append(getRollingSum(CounterEvent.BYTES) / 1024).append("kb/s"); + msg.append(" \n Latency (ms) => 50th: ").append(latency.getPercentile(50.0)).append( + " 90th: ").append(latency.getPercentile(90.0)); + msg.append(" 99th: ").append(latency.getPercentile(99.0)).append(" 100th: ").append(latency.getPercentile( + 100.0)); + System.out.println(msg.toString()); + + StringBuilder n = new StringBuilder(); + HttpClient httpClient = route.getClient(); + n.append(" Netty => Used: ").append(httpClient.getStats().getInUseCount()); + n.append(" Idle: ").append(httpClient.getStats().getIdleCount()); + n.append(" Total Conns: ").append(httpClient.getStats().getTotalConnectionCount()); + n.append(" AcqReq: ").append(httpClient.getStats().getPendingAcquireRequestCount()); + n.append(" RelReq: ").append(httpClient.getStats().getPendingReleaseRequestCount()); + System.out.println(n.toString()); + }).subscribe(); + } + + private static long getRollingSum(CounterEvent e) { + long s = counter.getRollingSum(e); + if (s > 0) { + s /= rollingSeconds; + } + return s; + } +} diff --git a/ws-impls/ws-java-ribbon/src/main/java/perf/test/ribbon/TestRouteBasic.java b/ws-impls/ws-java-ribbon/src/main/java/perf/test/ribbon/TestRouteBasic.java new file mode 100644 index 0000000..b42c6ad --- /dev/null +++ b/ws-impls/ws-java-ribbon/src/main/java/perf/test/ribbon/TestRouteBasic.java @@ -0,0 +1,136 @@ +package perf.test.ribbon; + +import com.netflix.ribbon.ClientOptions; +import com.netflix.ribbon.http.HttpResourceGroup; +import com.netflix.ribbon.proxy.RibbonDynamicProxy; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufInputStream; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.reactivex.netty.protocol.http.client.HttpClient; +import io.reactivex.netty.protocol.http.server.HttpServerRequest; +import io.reactivex.netty.protocol.http.server.HttpServerResponse; +import org.codehaus.jackson.JsonFactory; +import perf.test.utils.BackendResponse; +import perf.test.utils.JsonParseException; +import perf.test.utils.ServiceResponseBuilder; +import rx.Observable; + +import java.io.ByteArrayOutputStream; +import java.util.Arrays; +import java.util.List; +import java.util.Map; + +/** + * @author Tomasz Bak + */ +public class TestRouteBasic { + private static JsonFactory jsonFactory = new JsonFactory(); + + private final HttpResourceGroup httpResourceGroup; + private final MockBackendService service; + + public TestRouteBasic(String backendHost, int backendPort) { + httpResourceGroup = new HttpResourceGroup("performanceTest", ClientOptions.create() + .withMaxAutoRetries(0) + .withMaxAutoRetriesNextServer(0) + .withConnectTimeout(30000) + .withReadTimeout(30000) + .withMaxConnectionsPerHost(10000) + .withMaxTotalConnections(10000) + .withConfigurationBasedServerList(backendHost + ":" + backendPort)); + service = RibbonDynamicProxy.newInstance(MockBackendService.class, httpResourceGroup); + } + + public Observable handle(HttpServerRequest request, HttpServerResponse response) { + long startTime = System.currentTimeMillis(); + List _id = request.getQueryParameters().get("id"); + + if (_id == null || _id.size() != 1) { + return writeError(request, response, "Please provide a numerical 'id' value. It can be a random number (uuid)."); + } + long id = Long.parseLong(String.valueOf(_id.get(0))); + + + Observable> acd = getDataFromBackend(2, 50, 50, id) + .doOnError(Throwable::printStackTrace) + // Eclipse 20140224-0627 can't infer without this type hint even though the Java 8 compiler can + .>flatMap(responseA -> { + Observable responseC = getDataFromBackend(1, 5000, 80, responseA.getResponseKey()); + Observable responseD = getDataFromBackend(1, 1000, 1, responseA.getResponseKey()); + return Observable.zip(Observable.just(responseA), responseC, responseD, + Arrays::asList); + }).doOnError(Throwable::printStackTrace); + + Observable> be = getDataFromBackend(25, 30, 150, id) + // Eclipse 20140224-0627 can't infer without this type hint even though the Java 8 compiler can + .>flatMap(responseB -> { + Observable responseE = getDataFromBackend(100, 30, 4, responseB.getResponseKey()); + return Observable.zip(Observable.just(responseB), responseE, Arrays::asList); + }).doOnError(Throwable::printStackTrace); + + return Observable.zip(acd, be, (_acd, _be) -> { + BackendResponse responseA = _acd.get(0); + BackendResponse responseB = _be.get(0); + BackendResponse responseC = _acd.get(1); + BackendResponse responseD = _acd.get(2); + BackendResponse responseE = _be.get(1); + + BackendResponse[] backendResponses = {responseA, responseB, responseC, responseD, responseE}; + checkForHystrixCallbacks(backendResponses); + return backendResponses; + }).flatMap(backendResponses -> { + try { + ByteArrayOutputStream responseStream = ServiceResponseBuilder.buildTestAResponse(jsonFactory, backendResponses); + // set response header + response.getHeaders().addHeader("Content-Type", "application/json"); + // performance headers + addResponseHeaders(response, startTime); + int contentLength = responseStream.size(); + response.getHeaders().addHeader("Content-Length", contentLength); + return response.writeBytesAndFlush(responseStream.toByteArray()); + } catch (Exception e) { + e.printStackTrace(); + return writeError(request, response, "Failed: " + e.getMessage()); + } + }).doOnError(Throwable::printStackTrace); + } + + private void checkForHystrixCallbacks(BackendResponse[] backendResponses) { + for (BackendResponse r : backendResponses) { + if (r.isFallback()) { + StartServer.counter.increment(CounterEvent.HYSTRIX_FALLBACK); + return; + } + } + } + + public HttpClient getClient() { + return httpResourceGroup.getClient(); + } + + /** + * Add various headers used for logging and statistics. + */ + private static void addResponseHeaders(HttpServerResponse response, long startTime) { + Map perfResponseHeaders = ServiceResponseBuilder.getPerfResponseHeaders(startTime); + for (Map.Entry entry : perfResponseHeaders.entrySet()) { + response.getHeaders().add(entry.getKey(), entry.getValue()); + } + } + + private Observable getDataFromBackend(Integer numItems, Integer itemSize, Integer delay, Long id) { + return service.request(numItems, itemSize, delay, id).observe().flatMap(b -> { + try { + return Observable.just(BackendResponse.fromJson(jsonFactory, new ByteBufInputStream(b))); + } catch (JsonParseException e) { + return Observable.error(e); + } + }); + } + + private static Observable writeError(HttpServerRequest request, HttpServerResponse response, String message) { + System.err.println("Server => Error [" + request.getPath() + "] => " + message); + response.setStatus(HttpResponseStatus.BAD_REQUEST); + return response.writeStringAndFlush("Error 500: " + message + "\n"); + } +} diff --git a/ws-impls/ws-java-ribbon/src/main/java/perf/test/ribbon/TestRouteHello.java b/ws-impls/ws-java-ribbon/src/main/java/perf/test/ribbon/TestRouteHello.java new file mode 100644 index 0000000..0114cab --- /dev/null +++ b/ws-impls/ws-java-ribbon/src/main/java/perf/test/ribbon/TestRouteHello.java @@ -0,0 +1,23 @@ +package perf.test.ribbon; + +import io.netty.buffer.ByteBuf; +import io.netty.handler.codec.http.HttpHeaders; +import io.reactivex.netty.protocol.http.server.HttpServerRequest; +import io.reactivex.netty.protocol.http.server.HttpServerResponse; +import rx.Observable; + +/** + * @author Nitesh Kant + */ +public class TestRouteHello { + + private static final byte[] MSG = "Hello World".getBytes(); + public static final int HELLO_WORLD_LENGTH = MSG.length; + public static final String HELLO_WORLD_LENGTH_STR = String.valueOf(MSG.length); + + public Observable handle(HttpServerRequest request, HttpServerResponse response) { + response.getHeaders().set(HttpHeaders.Names.CONTENT_LENGTH, HELLO_WORLD_LENGTH_STR); + response.write(response.getAllocator().buffer(HELLO_WORLD_LENGTH).writeBytes(MSG)); + return response.close(); + } +} diff --git a/ws-impls/ws-java-rxnetty/build.gradle b/ws-impls/ws-java-rxnetty/build.gradle index 1d741bc..7606d91 100644 --- a/ws-impls/ws-java-rxnetty/build.gradle +++ b/ws-impls/ws-java-rxnetty/build.gradle @@ -7,9 +7,6 @@ sourceCompatibility = JavaVersion.VERSION_1_8 targetCompatibility = JavaVersion.VERSION_1_8 dependencies { - compile 'com.netflix.rxnetty:rx-netty:0.3.6' - compile 'com.netflix.rxjava:rxjava-core:0.18.2' - compile 'com.netflix.numerus:numerus:1.1' compile 'org.codehaus.jackson:jackson-core-asl:1.9.13' compile 'org.codehaus.jackson:jackson-mapper-asl:1.9.13' compile project(':ws-impls:ws-impl-utils') diff --git a/ws-impls/ws-java-servlet-blocking/build.gradle b/ws-impls/ws-java-servlet-blocking/build.gradle index 4eb7c71..73f9f15 100644 --- a/ws-impls/ws-java-servlet-blocking/build.gradle +++ b/ws-impls/ws-java-servlet-blocking/build.gradle @@ -5,10 +5,7 @@ apply plugin: 'jetty' dependencies { compile 'org.codehaus.jackson:jackson-core-asl:1.9.2' compile 'org.apache.httpcomponents:httpclient:4.3.1' - compile 'org.slf4j:slf4j-api:1.7.0' - runtime 'org.slf4j:slf4j-simple:1.7.0' compile project(':ws-impls:ws-impl-utils') - provided 'junit:junit-dep:4.10' compile 'javax.servlet:javax.servlet-api:3.0.1' } From e84ba4887a7c9864f4721281ce0d9afb356f460c Mon Sep 17 00:00:00 2001 From: Tomasz Bak Date: Mon, 7 Jul 2014 14:58:14 -0700 Subject: [PATCH 2/4] Ribbon module further updates. --- build.gradle | 22 ++++++----- gradle.properties | 5 +++ settings.gradle | 2 +- .../src/main/java/perf/client/WSClient.java | 31 ++++++++-------- ws-impls/ws-java-ribbon/build.gradle | 5 ++- .../perf/test/ribbon/MockBackendService.java | 6 --- .../java/perf/test/ribbon/StartServer.java | 37 ++++++++++--------- .../java/perf/test/ribbon/TestRouteBasic.java | 6 +-- .../java/perf/test/rxnetty/StartServer.java | 13 ++++--- 9 files changed, 66 insertions(+), 61 deletions(-) diff --git a/build.gradle b/build.gradle index b4cf392..1df933c 100644 --- a/build.gradle +++ b/build.gradle @@ -5,7 +5,10 @@ buildscript { } allprojects { - repositories { mavenCentral() } + repositories { + mavenLocal() + mavenCentral() + } } apply from: file('gradle/convention.gradle') @@ -18,14 +21,15 @@ subprojects { sourceSets.test.java.srcDir 'src/main/java' dependencies { - compile 'io.netty:netty-common:4.0.21.Final' - compile 'io.netty:netty-transport:4.0.21.Final' - compile 'com.netflix.rxnetty:rx-netty:0.3.7' - compile 'org.slf4j:slf4j-api:1.7.0' - runtime 'org.slf4j:slf4j-simple:1.7.0' - compile 'com.netflix.numerus:numerus:1.1' - testCompile 'junit:junit-dep:4.10' - testCompile 'org.mockito:mockito-core:1.8.5' + compile "io.netty:netty-common:${netty_version}" + compile "io.netty:netty-transport:${netty_version}" + compile "com.netflix.rxnetty:rx-netty:${rxnetty_version}" + compile "com.netflix.rxnetty:rx-netty-servo:${rxnetty_version}" + compile "org.slf4j:slf4j-api:${slf4j_version}" + runtime "org.slf4j:slf4j-simple:${slf4j_version}" + compile "com.netflix.numerus:numerus:1.1" + testCompile "junit:junit-dep:4.10" + testCompile "org.mockito:mockito-core:1.8.5" } tasks.withType(Javadoc).each { diff --git a/gradle.properties b/gradle.properties index 85d8530..24bd20c 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1 +1,6 @@ version=0.1-SNAPSHOT + +slf4j_version=1.7.6 +netty_version=4.0.21.Final +rxnetty_version=0.3.8 +ribbon_version=2.0.0 diff --git a/settings.gradle b/settings.gradle index be0b7dc..2a05a98 100644 --- a/settings.gradle +++ b/settings.gradle @@ -5,4 +5,4 @@ include 'ws-backend-mock', \ 'ws-impls:ws-java-servlet-blocking', \ 'ws-impls:ws-java-jetty', \ 'ws-impls:ws-java-rxnetty', \ -'ws-impls/ws-java-ribbon' \ No newline at end of file +'ws-impls:ws-java-ribbon' \ No newline at end of file diff --git a/ws-client/src/main/java/perf/client/WSClient.java b/ws-client/src/main/java/perf/client/WSClient.java index 931b6a3..13d515d 100644 --- a/ws-client/src/main/java/perf/client/WSClient.java +++ b/ws-client/src/main/java/perf/client/WSClient.java @@ -5,10 +5,10 @@ import com.netflix.numerus.NumerusRollingPercentile; import io.netty.buffer.ByteBuf; import io.reactivex.netty.client.PoolExhaustedException; -import io.reactivex.netty.client.PoolStats; import io.reactivex.netty.protocol.http.client.HttpClient; import io.reactivex.netty.protocol.http.client.HttpClientBuilder; import io.reactivex.netty.protocol.http.client.HttpClientRequest; +import io.reactivex.netty.servo.http.HttpClientListener; import org.apache.commons.cli.BasicParser; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLineParser; @@ -17,13 +17,10 @@ import org.codehaus.jackson.map.ObjectMapper; import rx.Observable; -import java.io.File; import java.io.FileNotFoundException; import java.io.FileOutputStream; -import java.io.FileWriter; import java.io.IOException; import java.io.OutputStream; -import java.io.Writer; import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeUnit; @@ -102,6 +99,7 @@ public static void main(String[] rawArgs) { private final Observable client; private final HttpClient httpClient; + private final HttpClientListener httpClientListener; private final ObjectMapper jsonMapper = new ObjectMapper(); @@ -122,6 +120,8 @@ public WSClient(String host, int port, int firstStep, int stepDuration, String q .withMaxConnections(15000) .config(new HttpClient.HttpClientConfig.Builder().readTimeout(1, TimeUnit.MINUTES).build()) .build(); + httpClientListener = HttpClientListener.newHttpListener("ws-client"); + httpClient.subscribe(httpClientListener); client = httpClient.submit(HttpClientRequest.createGet(this.query)) .flatMap(response -> { if (response.getStatus().code() == 200) { @@ -162,7 +162,7 @@ public Observable startLoad() { Observable> stepIntervals = Observable.timer(0, stepDuration, TimeUnit.SECONDS).map(l -> l + firstStep) .map(step -> { - long rps = step * 1000; + long rps = step * 10; long interval = TimeUnit.SECONDS.toMicros(1) / rps; StringBuilder str = new StringBuilder(); str.append('\n'); @@ -223,11 +223,11 @@ private void startMonitoring() { System.out.println(msg.toString()); StringBuilder n = new StringBuilder(); - n.append(" Netty => Used: ").append(httpClient.getStats().getInUseCount()); - n.append(" Idle: ").append(httpClient.getStats().getIdleCount()); - n.append(" Total Conns: ").append(httpClient.getStats().getTotalConnectionCount()); - n.append(" AcqReq: ").append(httpClient.getStats().getPendingAcquireRequestCount()); - n.append(" RelReq: ").append(httpClient.getStats().getPendingReleaseRequestCount()); + n.append(" Netty => Used: ").append(httpClientListener.getLiveConnections()); + n.append(" Idle: ").append(httpClientListener.getConnectionCount() - httpClientListener.getLiveConnections()); + n.append(" Total Conns: ").append(httpClientListener.getConnectionCount()); + n.append(" AcqReq: ").append(httpClientListener.getPendingPoolAcquires()); + n.append(" RelReq: ").append(httpClientListener.getPendingPoolReleases()); System.out.println(n.toString()); if (enableJsonLogging) { @@ -249,12 +249,11 @@ private void startMonitoring() { m.put("rollingLatency99", latency.getPercentile(99.0)); m.put("rollingLatencyMax", latency.getPercentile(100.0)); - PoolStats poolStats = httpClient.getStats(); - m.put("connsInUse", poolStats.getInUseCount()); - m.put("connsIdeal", poolStats.getIdleCount()); - m.put("connsTotal", poolStats.getTotalConnectionCount()); - m.put("connsPendingAcquire", poolStats.getPendingAcquireRequestCount()); - m.put("connsPendingRelease", poolStats.getPendingReleaseRequestCount()); + m.put("connsInUse", httpClientListener.getLiveConnections()); + m.put("connsIdeal", httpClientListener.getConnectionCount() - httpClientListener.getLiveConnections()); + m.put("connsTotal", httpClientListener.getConnectionCount()); + m.put("connsPendingAcquire", httpClientListener.getPendingPoolAcquires()); + m.put("connsPendingRelease", httpClientListener.getPendingPoolReleases()); String statMsg = jsonMapper.writeValueAsString(m); if (this.statsOutputStream != null) { diff --git a/ws-impls/ws-java-ribbon/build.gradle b/ws-impls/ws-java-ribbon/build.gradle index 61ad0b6..af2982e 100644 --- a/ws-impls/ws-java-ribbon/build.gradle +++ b/ws-impls/ws-java-ribbon/build.gradle @@ -17,12 +17,13 @@ dependencies { /** * ../../gradlew start - * ../../gradlew start -P'a=EVENTLOOPS PORT BACKEND_HOST BACKEND_PORT' - * ../../gradlew start -P'a=1 8888 ec2-54-87-12-221.compute-1.amazonaws.com 8989' + * ../../gradlew start -P'a=EVENTLOOPS PORT BACKEND_SERVER_LIST' + * ../../gradlew start -P'a=1 8888 host1:8989,host2:8989' */ task start(type:JavaExec) { main = "perf.test.ribbon.StartServer" classpath = sourceSets.main.runtimeClasspath + jvmArgs(['-Dorg.slf4j.simpleLogger.defaultLogLevel=debug']) if (project.hasProperty('a')) { args(a.split(' ')) } diff --git a/ws-impls/ws-java-ribbon/src/main/java/perf/test/ribbon/MockBackendService.java b/ws-impls/ws-java-ribbon/src/main/java/perf/test/ribbon/MockBackendService.java index 7b82289..2f297de 100644 --- a/ws-impls/ws-java-ribbon/src/main/java/perf/test/ribbon/MockBackendService.java +++ b/ws-impls/ws-java-ribbon/src/main/java/perf/test/ribbon/MockBackendService.java @@ -10,12 +10,6 @@ /** * Maps mock backend request to Java interface. - *

- * A) GET http://hostname:9100/mock.json?numItems=2&itemSize=50&delay=50&id={uuid} - * B) GET http://hostname:9100/mock.json?numItems=25&itemSize=30&delay=150&id={uuid} - * C) GET http://hostname:9100/mock.json?numItems=1&itemSize=5000&delay=80&id={a.responseKey} - * D) GET http://hostname:9100/mock.json?numItems=1&itemSize=1000&delay=1&id={a.responseKey} - * E) GET http://hostname:9100/mock.json?numItems=100&itemSize=30&delay=4&id={b.responseKey} * * @author Tomasz Bak */ diff --git a/ws-impls/ws-java-ribbon/src/main/java/perf/test/ribbon/StartServer.java b/ws-impls/ws-java-ribbon/src/main/java/perf/test/ribbon/StartServer.java index f7c7943..a851445 100644 --- a/ws-impls/ws-java-ribbon/src/main/java/perf/test/ribbon/StartServer.java +++ b/ws-impls/ws-java-ribbon/src/main/java/perf/test/ribbon/StartServer.java @@ -35,32 +35,30 @@ public static void main(String[] args) { int eventLoops = Runtime.getRuntime().availableProcessors(); int port = 8888; - String backendHost = "127.0.0.1"; - int backendPort = 8989; + String backendServerList = "127.0.0.1:8989"; if (args.length == 0) { // use defaults - } else if (args.length == 4) { + } else if (args.length == 3) { eventLoops = Integer.parseInt(args[0]); port = Integer.parseInt(args[1]); - backendHost = args[2]; - backendPort = Integer.parseInt(args[3]); + backendServerList = args[2]; } else { System.err.println( - "Execute with either no argument (for defaults) or 4 arguments: EVENTLOOPS, PORT, BACKEND_HOST, BACKEND_PORT"); + "Execute with either no argument (for defaults) or 4 arguments: EVENTLOOPS, PORT, BACKEND_HOST_LIST"); System.exit(-1); } - System.out.println(String.format("Using eventloops: %d port: %d backend host: %s backend port: %d", eventLoops, - port, backendHost, backendPort)); + System.out.println(String.format("Using eventloops: %d port: %d backend server list: %s", eventLoops, + port, backendServerList)); - route = new TestRouteBasic(backendHost, backendPort); + route = new TestRouteBasic(backendServerList); routeHello = new TestRouteHello(); SingleNioLoopProvider provider = new SingleNioLoopProvider(eventLoops); RxNetty.useEventLoopProvider(provider); - System.out.println("Starting service on port " + port + " with backend at " + backendHost + ':' + backendPort + " ..."); + System.out.println("Starting service on port " + port + " with backend servers " + backendServerList + " ..."); startMonitoring(); RxNetty.newHttpServerBuilder(port, (request, response) -> { try { @@ -126,11 +124,13 @@ private static void startMonitoring() { long successSum = counter.getCumulativeSum(CounterEvent.SUCCESS); long hystrixFallbackSum = counter.getCumulativeSum(CounterEvent.HYSTRIX_FALLBACK); - long realSuccessSum = successSum - hystrixFallbackSum; + long successRolling = counter.getRollingSum(CounterEvent.SUCCESS); + long hystrixFallbackRolling = counter.getRollingSum(CounterEvent.HYSTRIX_FALLBACK); msg.append("Total => "); msg.append(" Requests: ").append(counter.getCumulativeSum(CounterEvent.REQUESTS)); - msg.append(" Success: ").append(successSum).append('(').append(realSuccessSum).append(" good/").append(hystrixFallbackSum).append(" fallbacks)"); + msg.append(" Success: ").append(successSum).append('(').append(successSum - hystrixFallbackSum) + .append(" good/").append(hystrixFallbackSum).append(" fallbacks)"); msg.append(" Error: ").append(counter.getCumulativeSum(CounterEvent.HTTP_ERROR)); msg.append(" Netty Error: ").append(counter.getCumulativeSum(CounterEvent.NETTY_ERROR)); msg.append(" Client Pool Exhausted: ").append(counter.getCumulativeSum(CounterEvent.CLIENT_POOL_EXHAUSTION)); @@ -141,7 +141,8 @@ private static void startMonitoring() { msg.append(" Bytes: ").append(counter.getCumulativeSum(CounterEvent.BYTES) / 1024).append("kb"); msg.append(" \n Rolling =>"); msg.append(" Requests: ").append(getRollingSum(CounterEvent.REQUESTS)).append("/s"); - msg.append(" Success: ").append(getRollingSum(CounterEvent.SUCCESS)).append("/s"); + msg.append(" Success: ").append(successRolling).append('(').append(successRolling - hystrixFallbackRolling) + .append(" good/").append(hystrixFallbackRolling).append(" fallbacks)"); msg.append(" Error: ").append(getRollingSum(CounterEvent.HTTP_ERROR)).append("/s"); msg.append(" Netty Error: ").append(getRollingSum(CounterEvent.NETTY_ERROR)).append("/s"); msg.append(" Bytes: ").append(getRollingSum(CounterEvent.BYTES) / 1024).append("kb/s"); @@ -153,11 +154,11 @@ private static void startMonitoring() { StringBuilder n = new StringBuilder(); HttpClient httpClient = route.getClient(); - n.append(" Netty => Used: ").append(httpClient.getStats().getInUseCount()); - n.append(" Idle: ").append(httpClient.getStats().getIdleCount()); - n.append(" Total Conns: ").append(httpClient.getStats().getTotalConnectionCount()); - n.append(" AcqReq: ").append(httpClient.getStats().getPendingAcquireRequestCount()); - n.append(" RelReq: ").append(httpClient.getStats().getPendingReleaseRequestCount()); +// n.append(" Netty => Used: ").append(httpClient.getStats().getInUseCount()); +// n.append(" Idle: ").append(httpClient.getStats().getIdleCount()); +// n.append(" Total Conns: ").append(httpClient.getStats().getTotalConnectionCount()); +// n.append(" AcqReq: ").append(httpClient.getStats().getPendingAcquireRequestCount()); +// n.append(" RelReq: ").append(httpClient.getStats().getPendingReleaseRequestCount()); System.out.println(n.toString()); }).subscribe(); } diff --git a/ws-impls/ws-java-ribbon/src/main/java/perf/test/ribbon/TestRouteBasic.java b/ws-impls/ws-java-ribbon/src/main/java/perf/test/ribbon/TestRouteBasic.java index b42c6ad..6f3207a 100644 --- a/ws-impls/ws-java-ribbon/src/main/java/perf/test/ribbon/TestRouteBasic.java +++ b/ws-impls/ws-java-ribbon/src/main/java/perf/test/ribbon/TestRouteBasic.java @@ -29,7 +29,7 @@ public class TestRouteBasic { private final HttpResourceGroup httpResourceGroup; private final MockBackendService service; - public TestRouteBasic(String backendHost, int backendPort) { + public TestRouteBasic(String backendServerList) { httpResourceGroup = new HttpResourceGroup("performanceTest", ClientOptions.create() .withMaxAutoRetries(0) .withMaxAutoRetriesNextServer(0) @@ -37,7 +37,7 @@ public TestRouteBasic(String backendHost, int backendPort) { .withReadTimeout(30000) .withMaxConnectionsPerHost(10000) .withMaxTotalConnections(10000) - .withConfigurationBasedServerList(backendHost + ":" + backendPort)); + .withConfigurationBasedServerList(backendServerList)); service = RibbonDynamicProxy.newInstance(MockBackendService.class, httpResourceGroup); } @@ -119,7 +119,7 @@ private static void addResponseHeaders(HttpServerResponse response, long star } private Observable getDataFromBackend(Integer numItems, Integer itemSize, Integer delay, Long id) { - return service.request(numItems, itemSize, delay, id).observe().flatMap(b -> { + return service.request(numItems, itemSize, delay, id).toObservable().flatMap(b -> { try { return Observable.just(BackendResponse.fromJson(jsonFactory, new ByteBufInputStream(b))); } catch (JsonParseException e) { diff --git a/ws-impls/ws-java-rxnetty/src/main/java/perf/test/rxnetty/StartServer.java b/ws-impls/ws-java-rxnetty/src/main/java/perf/test/rxnetty/StartServer.java index aaae2df..7605ffe 100644 --- a/ws-impls/ws-java-rxnetty/src/main/java/perf/test/rxnetty/StartServer.java +++ b/ws-impls/ws-java-rxnetty/src/main/java/perf/test/rxnetty/StartServer.java @@ -6,6 +6,7 @@ import io.netty.channel.nio.NioEventLoop; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.handler.codec.http.HttpResponseStatus; +import io.netty.handler.logging.LogLevel; import io.reactivex.netty.RxNetty; import io.reactivex.netty.channel.SingleNioLoopProvider; import io.reactivex.netty.client.PoolExhaustedException; @@ -99,7 +100,7 @@ public static void main(String[] args) { response.setStatus(HttpResponseStatus.BAD_REQUEST); return response.writeStringAndFlush("Error 400: Bad Request\n" + e.getMessage() + '\n'); } - }).eventLoops(new NioEventLoopGroup(1), provider.globalServerEventLoop()).build() + }).enableWireLogging(LogLevel.ERROR).eventLoops(new NioEventLoopGroup(1), provider.globalServerEventLoop()).build() .withErrorHandler(throwable -> Observable.empty()) .withErrorResponseGenerator((response, error) -> System.err.println("Error: " + error.getMessage())) .startAndWait(); @@ -147,11 +148,11 @@ private static void startMonitoring() { StringBuilder n = new StringBuilder(); HttpClient httpClient = route.getClient(); - n.append(" Netty => Used: ").append(httpClient.getStats().getInUseCount()); - n.append(" Idle: ").append(httpClient.getStats().getIdleCount()); - n.append(" Total Conns: ").append(httpClient.getStats().getTotalConnectionCount()); - n.append(" AcqReq: ").append(httpClient.getStats().getPendingAcquireRequestCount()); - n.append(" RelReq: ").append(httpClient.getStats().getPendingReleaseRequestCount()); +// n.append(" Netty => Used: ").append(httpClient.getStats().getInUseCount()); +// n.append(" Idle: ").append(httpClient.getStats().getIdleCount()); +// n.append(" Total Conns: ").append(httpClient.getStats().getTotalConnectionCount()); +// n.append(" AcqReq: ").append(httpClient.getStats().getPendingAcquireRequestCount()); +// n.append(" RelReq: ").append(httpClient.getStats().getPendingReleaseRequestCount()); System.out.println(n.toString()); }).subscribe(); } From 4847bc3a715add35e5633ead7b0cf11b72a29dde Mon Sep 17 00:00:00 2001 From: Tomasz Bak Date: Mon, 7 Jul 2014 16:19:06 -0700 Subject: [PATCH 3/4] Minor update. --- build.gradle | 1 - ws-client/src/main/java/perf/client/WSClient.java | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/build.gradle b/build.gradle index 1df933c..664e33b 100644 --- a/build.gradle +++ b/build.gradle @@ -6,7 +6,6 @@ buildscript { allprojects { repositories { - mavenLocal() mavenCentral() } } diff --git a/ws-client/src/main/java/perf/client/WSClient.java b/ws-client/src/main/java/perf/client/WSClient.java index e13d1c6..469de77 100644 --- a/ws-client/src/main/java/perf/client/WSClient.java +++ b/ws-client/src/main/java/perf/client/WSClient.java @@ -163,7 +163,7 @@ public Observable startLoad() { Observable> stepIntervals = Observable.timer(0, stepDuration, TimeUnit.SECONDS).map(l -> l + firstStep) .map(step -> { - long rps = step * 100; + long rps = step * 1000; long interval = TimeUnit.SECONDS.toMicros(1) / rps; StringBuilder str = new StringBuilder(); str.append('\n'); From e3bd25d6f3d6974be5c51c345a460c97acf9ec12 Mon Sep 17 00:00:00 2001 From: Tomasz Bak Date: Tue, 8 Jul 2014 08:36:08 -0700 Subject: [PATCH 4/4] Add pool statistics to ribbon module. A few other minor updates. --- gradle.properties | 2 +- ws-impls/ws-java-ribbon/build.gradle | 13 ++- .../ribbon/ConnectionPoolMetricListener.java | 86 +++++++++++++++++++ .../java/perf/test/ribbon/StartServer.java | 12 +-- .../java/perf/test/ribbon/TestRouteBasic.java | 16 ++-- 5 files changed, 109 insertions(+), 20 deletions(-) create mode 100644 ws-impls/ws-java-ribbon/src/main/java/perf/test/ribbon/ConnectionPoolMetricListener.java diff --git a/gradle.properties b/gradle.properties index 24bd20c..7cec7c9 100644 --- a/gradle.properties +++ b/gradle.properties @@ -3,4 +3,4 @@ version=0.1-SNAPSHOT slf4j_version=1.7.6 netty_version=4.0.21.Final rxnetty_version=0.3.8 -ribbon_version=2.0.0 +ribbon_version=2.0-RC1 diff --git a/ws-impls/ws-java-ribbon/build.gradle b/ws-impls/ws-java-ribbon/build.gradle index faabe14..38bbe09 100644 --- a/ws-impls/ws-java-ribbon/build.gradle +++ b/ws-impls/ws-java-ribbon/build.gradle @@ -1,20 +1,17 @@ apply plugin: "eclipse" apply plugin: "idea" -apply plugin: 'java' -apply plugin: 'application' +apply plugin: "java" +apply plugin: "application" sourceCompatibility = JavaVersion.VERSION_1_8 targetCompatibility = JavaVersion.VERSION_1_8 dependencies { - compile 'com.netflix.ribbon:ribbon:2.0.0' - compile 'org.codehaus.jackson:jackson-core-asl:1.9.13' - compile 'org.codehaus.jackson:jackson-mapper-asl:1.9.13' - compile project(':ws-impls:ws-impl-utils') - compile project(':ws-backend-mock') + compile "com.netflix.ribbon:ribbon:${ribbon_version}" + compile project(":ws-impls:ws-impl-utils") + compile project(":ws-backend-mock") } - /** * ../../gradlew start * ../../gradlew start -P'a=EVENTLOOPS PORT BACKEND_SERVER_LIST' diff --git a/ws-impls/ws-java-ribbon/src/main/java/perf/test/ribbon/ConnectionPoolMetricListener.java b/ws-impls/ws-java-ribbon/src/main/java/perf/test/ribbon/ConnectionPoolMetricListener.java new file mode 100644 index 0000000..95d2c48 --- /dev/null +++ b/ws-impls/ws-java-ribbon/src/main/java/perf/test/ribbon/ConnectionPoolMetricListener.java @@ -0,0 +1,86 @@ +package perf.test.ribbon; + +import io.reactivex.netty.metrics.HttpClientMetricEventsListener; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.LongAdder; + +/** + * @author Nitesh Kant + */ +public class ConnectionPoolMetricListener extends HttpClientMetricEventsListener { + + private LongAdder inUseCount = new LongAdder(); + private LongAdder idleCount = new LongAdder(); + private LongAdder totalConnections = new LongAdder(); + private LongAdder pendingAcquire = new LongAdder(); + private LongAdder pendingRelease = new LongAdder(); + + @Override + protected void onPoolReleaseFailed(long duration, TimeUnit timeUnit, Throwable throwable) { + pendingRelease.decrement(); + } + + @Override + protected void onPoolReleaseSuccess(long duration, TimeUnit timeUnit) { + pendingRelease.decrement(); + idleCount.increment(); + inUseCount.decrement(); + } + + @Override + protected void onPooledConnectionReuse(long duration, TimeUnit timeUnit) { + idleCount.decrement(); + } + + @Override + protected void onPoolReleaseStart() { + pendingRelease.increment(); + } + + @Override + protected void onPoolAcquireStart() { + pendingAcquire.increment(); + } + + @Override + protected void onPoolAcquireFailed(long duration, TimeUnit timeUnit, Throwable throwable) { + pendingAcquire.decrement(); + } + + @Override + protected void onPoolAcquireSuccess(long duration, TimeUnit timeUnit) { + pendingAcquire.decrement(); + inUseCount.increment(); + } + + @Override + protected void onConnectionCloseSuccess(long duration, TimeUnit timeUnit) { + totalConnections.decrement(); + } + + @Override + protected void onConnectSuccess(long duration, TimeUnit timeUnit) { + totalConnections.increment(); + } + + public long getInUseCount() { + return inUseCount.longValue(); + } + + public long getIdleCount() { + return idleCount.longValue(); + } + + public long getTotalConnections() { + return totalConnections.longValue(); + } + + public long getPendingAcquire() { + return pendingAcquire.longValue(); + } + + public long getPendingRelease() { + return pendingRelease.longValue(); + } +} diff --git a/ws-impls/ws-java-ribbon/src/main/java/perf/test/ribbon/StartServer.java b/ws-impls/ws-java-ribbon/src/main/java/perf/test/ribbon/StartServer.java index a851445..437a499 100644 --- a/ws-impls/ws-java-ribbon/src/main/java/perf/test/ribbon/StartServer.java +++ b/ws-impls/ws-java-ribbon/src/main/java/perf/test/ribbon/StartServer.java @@ -153,12 +153,12 @@ private static void startMonitoring() { System.out.println(msg.toString()); StringBuilder n = new StringBuilder(); - HttpClient httpClient = route.getClient(); -// n.append(" Netty => Used: ").append(httpClient.getStats().getInUseCount()); -// n.append(" Idle: ").append(httpClient.getStats().getIdleCount()); -// n.append(" Total Conns: ").append(httpClient.getStats().getTotalConnectionCount()); -// n.append(" AcqReq: ").append(httpClient.getStats().getPendingAcquireRequestCount()); -// n.append(" RelReq: ").append(httpClient.getStats().getPendingReleaseRequestCount()); + ConnectionPoolMetricListener stats = route.getStats(); + n.append(" Netty => Used: ").append(stats.getInUseCount()); + n.append(" Idle: ").append(stats.getIdleCount()); + n.append(" Total Conns: ").append(stats.getTotalConnections()); + n.append(" AcqReq: ").append(stats.getPendingAcquire()); + n.append(" RelReq: ").append(stats.getPendingRelease()); System.out.println(n.toString()); }).subscribe(); } diff --git a/ws-impls/ws-java-ribbon/src/main/java/perf/test/ribbon/TestRouteBasic.java b/ws-impls/ws-java-ribbon/src/main/java/perf/test/ribbon/TestRouteBasic.java index 6f3207a..577b4fa 100644 --- a/ws-impls/ws-java-ribbon/src/main/java/perf/test/ribbon/TestRouteBasic.java +++ b/ws-impls/ws-java-ribbon/src/main/java/perf/test/ribbon/TestRouteBasic.java @@ -6,7 +6,6 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufInputStream; import io.netty.handler.codec.http.HttpResponseStatus; -import io.reactivex.netty.protocol.http.client.HttpClient; import io.reactivex.netty.protocol.http.server.HttpServerRequest; import io.reactivex.netty.protocol.http.server.HttpServerResponse; import org.codehaus.jackson.JsonFactory; @@ -24,6 +23,9 @@ * @author Tomasz Bak */ public class TestRouteBasic { + + private final ConnectionPoolMetricListener metricListener; + private static JsonFactory jsonFactory = new JsonFactory(); private final HttpResourceGroup httpResourceGroup; @@ -38,6 +40,10 @@ public TestRouteBasic(String backendServerList) { .withMaxConnectionsPerHost(10000) .withMaxTotalConnections(10000) .withConfigurationBasedServerList(backendServerList)); + + metricListener = new ConnectionPoolMetricListener(); + httpResourceGroup.getClient().subscribe(metricListener); + service = RibbonDynamicProxy.newInstance(MockBackendService.class, httpResourceGroup); } @@ -95,6 +101,10 @@ public Observable handle(HttpServerRequest request, HttpServerRes }).doOnError(Throwable::printStackTrace); } + public ConnectionPoolMetricListener getStats() { + return metricListener; + } + private void checkForHystrixCallbacks(BackendResponse[] backendResponses) { for (BackendResponse r : backendResponses) { if (r.isFallback()) { @@ -104,10 +114,6 @@ private void checkForHystrixCallbacks(BackendResponse[] backendResponses) { } } - public HttpClient getClient() { - return httpResourceGroup.getClient(); - } - /** * Add various headers used for logging and statistics. */