Skip to content

Commit

Permalink
Merge pull request #742 from allegro/release-0.11.2
Browse files Browse the repository at this point in the history
Release 0.11.2
  • Loading branch information
druminski authored Mar 15, 2017
2 parents a37f0bd + 4557cd3 commit 17947ee
Show file tree
Hide file tree
Showing 29 changed files with 338 additions and 53 deletions.
20 changes: 20 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,23 @@
## 0.11.2 (15.03.2017)

### Enhancements

#### ([738](https://github.com/allegro/hermes/pull/738)) Payload content-type check and error handling

From now on, clients who send HTTP request without specified `Content-Type` on an Avro topic will receive proper error message.

#### ([739](https://github.com/allegro/hermes/pull/739)) Added latency metrics for schema registry

Metrics are available in the following path:

```
schema.<schema-repo-type>.latency.read-schema
```

### Bugfixes

#### ([740](https://github.com/allegro/hermes/issues/740)) Invalid metrics names in zookeeper for topics with underscore in name

## 0.11.1 (7.03.2017)

### Features
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package pl.allegro.tech.hermes.consumers.consumer.sender.http;
package pl.allegro.tech.hermes.api;

public class AvroMediaType {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import java.util.List;

import static javax.ws.rs.core.MediaType.APPLICATION_JSON;
import static pl.allegro.tech.hermes.api.AvroMediaType.AVRO_BINARY;

@Path("topics")
public interface TopicEndpoint {
Expand Down Expand Up @@ -69,7 +69,7 @@ List<String> queryList(
Response publishMessage(@PathParam("topicName") String qualifiedTopicName, String message);

@POST
@Consumes(MediaType.TEXT_PLAIN)
@Consumes(AVRO_BINARY)
@Produces(APPLICATION_JSON)
@Path("/{topicName}")
Response publishMessage(@PathParam("topicName") String qualifiedTopicName, byte[] message);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,12 @@ public UnsupportedContentTypeException(Subscription subscription) {
));
}

public UnsupportedContentTypeException(String payloadContentType, Topic topic) {
super(String.format(
"Unsupported payload content type header %s for topic %s",
payloadContentType,
topic.getQualifiedName()
));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import pl.allegro.tech.hermes.api.Subscription;
import pl.allegro.tech.hermes.api.TopicName;
import pl.allegro.tech.hermes.common.metric.timer.ConsumerLatencyTimer;
import pl.allegro.tech.hermes.common.schema.SchemaRepositoryType;
import pl.allegro.tech.hermes.metrics.PathContext;
import pl.allegro.tech.hermes.metrics.PathsCompiler;

Expand Down Expand Up @@ -177,6 +178,10 @@ private String metricRegistryName(String metricDisplayName) {
return pathCompiler.compile(metricDisplayName);
}

public Timer schemaTimer(String schemaMetric, SchemaRepositoryType schemaRepoType) {
return metricRegistry.timer(pathCompiler.compile(schemaMetric, pathContext().withSchemaRepoType(schemaRepoType.toString()).build()));
}

public Timer executorDurationTimer(String executorName) {
return metricRegistry.timer(pathCompiler.compile(Timers.EXECUTOR_DURATION, pathContext().withExecutorName(executorName).build()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import static pl.allegro.tech.hermes.metrics.PathsCompiler.OAUTH_PROVIDER_NAME;
import static pl.allegro.tech.hermes.metrics.PathsCompiler.SUBSCRIPTION;
import static pl.allegro.tech.hermes.metrics.PathsCompiler.TOPIC;
import static pl.allegro.tech.hermes.metrics.PathsCompiler.SCHEMA_REPO_TYPE;

public class Timers {

Expand Down Expand Up @@ -33,6 +34,10 @@ public class Timers {

READ_LATENCY = "read-latency",

SCHEMA = "schema." + SCHEMA_REPO_TYPE,
GET_SCHEMA_LATENCY = SCHEMA + ".get-schema",
GET_SCHEMA_VERSIONS_LATENCY = SCHEMA + ".get-schema-versions",

CONSUMER_WORKLOAD_REBALANCE_DURATION = "consumers-workload." + KAFKA_CLUSTER + ".selective.rebalance-duration",

OAUTH_PROVIDER_TOKEN_REQUEST_LATENCY = "oauth.provider." + OAUTH_PROVIDER_NAME + ".token-request-latency",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ private static TopicName escapedTopicName(String qualifiedTopicName) {
TopicName topicName = fromQualifiedName(qualifiedTopicName);
return new TopicName(
escapeMetricsReplacementChar(topicName.getGroupName()),
escapeMetricsReplacementChar(topicName.getName())
topicName.getName()
);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package pl.allegro.tech.hermes.frontend.metric;
package pl.allegro.tech.hermes.common.metric.timer;

import com.codahale.metrics.Timer;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import org.glassfish.jersey.client.ClientProperties;
import pl.allegro.tech.hermes.common.config.ConfigFactory;
import pl.allegro.tech.hermes.common.config.Configs;
import pl.allegro.tech.hermes.common.metric.HermesMetrics;
import pl.allegro.tech.hermes.schema.RawSchemaClient;
import pl.allegro.tech.hermes.schema.confluent.SchemaRegistryRawSchemaClient;
import pl.allegro.tech.hermes.schema.schemarepo.SchemaRepoRawSchemaClient;
Expand All @@ -17,10 +18,12 @@
public class RawSchemaClientFactory implements Factory<RawSchemaClient> {

private final ConfigFactory configFactory;
private final HermesMetrics hermesMetrics;

@Inject
public RawSchemaClientFactory(ConfigFactory configFactory) {
public RawSchemaClientFactory(ConfigFactory configFactory, HermesMetrics hermesMetrics) {
this.configFactory = configFactory;
this.hermesMetrics = hermesMetrics;
}

@Override
Expand All @@ -36,16 +39,21 @@ public RawSchemaClient provide() {
String schemaRepositoryType = configFactory.getStringProperty(Configs.SCHEMA_REPOSITORY_TYPE).toUpperCase();
Client client = ClientBuilder.newClient(config);
URI schemaRepositoryServerUri = URI.create(configFactory.getStringProperty(Configs.SCHEMA_REPOSITORY_SERVER_URL));
switch (SchemaRepositoryType.valueOf(schemaRepositoryType)) {
SchemaRepositoryType repoType = SchemaRepositoryType.valueOf(schemaRepositoryType);
switch (repoType) {
case SCHEMA_REPO:
return new SchemaRepoRawSchemaClient(client, schemaRepositoryServerUri);
return createMetricsTrackingClient(new SchemaRepoRawSchemaClient(client, schemaRepositoryServerUri), repoType);
case SCHEMA_REGISTRY:
return new SchemaRegistryRawSchemaClient(client, schemaRepositoryServerUri);
return createMetricsTrackingClient(new SchemaRegistryRawSchemaClient(client, schemaRepositoryServerUri), repoType);
default:
throw new IllegalStateException("Unknown schema repository type " + schemaRepositoryType);
}
}

private RawSchemaClient createMetricsTrackingClient(RawSchemaClient rawSchemaClient, SchemaRepositoryType schemaRepositoryType) {
return new ReadMetricsTrackingRawSchemaClient(rawSchemaClient, hermesMetrics, schemaRepositoryType);
}

@Override
public void dispose(RawSchemaClient instance) {

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package pl.allegro.tech.hermes.common.schema;

import com.codahale.metrics.Timer;
import pl.allegro.tech.hermes.api.RawSchema;
import pl.allegro.tech.hermes.api.TopicName;
import pl.allegro.tech.hermes.common.metric.HermesMetrics;
import pl.allegro.tech.hermes.common.metric.Timers;
import pl.allegro.tech.hermes.schema.RawSchemaClient;
import pl.allegro.tech.hermes.schema.SchemaVersion;

import java.util.List;
import java.util.Optional;
import java.util.function.Supplier;

public class ReadMetricsTrackingRawSchemaClient implements RawSchemaClient {
private final RawSchemaClient rawSchemaClient;
private final HermesMetrics hermesMetrics;
private final SchemaRepositoryType schemaRepoType;

public ReadMetricsTrackingRawSchemaClient(
RawSchemaClient rawSchemaClient,
HermesMetrics hermesMetrics,
SchemaRepositoryType schemaRepoType) {
this.rawSchemaClient = rawSchemaClient;
this.hermesMetrics = hermesMetrics;
this.schemaRepoType = schemaRepoType;
}

@Override
public Optional<RawSchema> getSchema(TopicName topic, SchemaVersion version) {
return timedSchema(() -> rawSchemaClient.getSchema(topic, version));
}

@Override
public Optional<RawSchema> getLatestSchema(TopicName topic) {
return timedSchema(() -> rawSchemaClient.getLatestSchema(topic));
}

@Override
public List<SchemaVersion> getVersions(TopicName topic) {
return timedVersions(() -> rawSchemaClient.getVersions(topic));
}

@Override
public void registerSchema(TopicName topic, RawSchema rawSchema) {
rawSchemaClient.registerSchema(topic, rawSchema);
}

@Override
public void deleteAllSchemaVersions(TopicName topic) {
rawSchemaClient.deleteAllSchemaVersions(topic);
}

private <T> T timedSchema(Supplier<? extends T> callable) {
return timed(callable, Timers.GET_SCHEMA_LATENCY);
}

private <T> T timedVersions(Supplier<? extends T> callable) {
return timed(callable, Timers.GET_SCHEMA_VERSIONS_LATENCY);
}

private <T> T timed(Supplier<? extends T> callable, String schemaTimer) {
try (Timer.Context time = startLatencyTimer(schemaTimer)) {
return callable.get();
}
}

private Timer.Context startLatencyTimer(String schemaReadLatency) {
return hermesMetrics.schemaTimer(schemaReadLatency, schemaRepoType).time();
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,16 @@
package pl.allegro.tech.hermes.common.schema;

public enum SchemaRepositoryType {
SCHEMA_REPO, SCHEMA_REGISTRY
SCHEMA_REPO("schema-repo"), SCHEMA_REGISTRY("schema-registry");

private final String metricName;

SchemaRepositoryType(String metricName) {
this.metricName = metricName;
}

@Override
public String toString() {
return metricName;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package pl.allegro.tech.hermes.common.schema

import com.codahale.metrics.Timer
import pl.allegro.tech.hermes.api.RawSchema
import pl.allegro.tech.hermes.api.TopicName
import pl.allegro.tech.hermes.common.metric.HermesMetrics
import pl.allegro.tech.hermes.common.metric.Timers
import pl.allegro.tech.hermes.schema.RawSchemaClient
import pl.allegro.tech.hermes.schema.SchemaVersion
import spock.lang.Shared
import spock.lang.Specification
import spock.lang.Subject

import static pl.allegro.tech.hermes.common.schema.SchemaRepositoryType.SCHEMA_REGISTRY

class ReadMetricsTrackingRawSchemaClientTest extends Specification {
@Shared
TopicName topicName = TopicName.fromQualifiedName("someGroup.someTopic")

@Shared
SchemaVersion schemaVersion = SchemaVersion.valueOf(1)

@Shared
RawSchema schema = RawSchema.valueOf("some_schema")

HermesMetrics hermesMetrics = Mock()

Timer schemaLatencyTimer = new Timer()

Timer schemaVersionsLatencyTimer = new Timer()

RawSchemaClient rawSchemaClient = Mock()

@Subject
RawSchemaClient readMetricsTrackingClient = new ReadMetricsTrackingRawSchemaClient(rawSchemaClient, hermesMetrics, SCHEMA_REGISTRY)

def "should track latency metrics for schema retrieval"(){
expect:
schemaLatencyTimer.count == 0

when:
readMetricsTrackingClient.getSchema(topicName, schemaVersion)

then:
1 * hermesMetrics.schemaTimer(Timers.GET_SCHEMA_LATENCY, SCHEMA_REGISTRY) >> schemaLatencyTimer
1 * rawSchemaClient.getSchema(topicName, schemaVersion)
schemaLatencyTimer.count == 1
}

def "should track latency metrics for latest schema retrieval"(){
expect:
schemaLatencyTimer.count == 0

when:
readMetricsTrackingClient.getLatestSchema(topicName)

then:
1 * hermesMetrics.schemaTimer(Timers.GET_SCHEMA_LATENCY, SCHEMA_REGISTRY) >> schemaLatencyTimer
1 * rawSchemaClient.getLatestSchema(topicName)
schemaLatencyTimer.count == 1
}

def "should track latency metrics for versions retrieval"(){
expect:
schemaVersionsLatencyTimer.count == 0

when:
readMetricsTrackingClient.getVersions(topicName)

then:
1 * hermesMetrics.schemaTimer(Timers.GET_SCHEMA_VERSIONS_LATENCY, SCHEMA_REGISTRY) >> schemaVersionsLatencyTimer
1 * rawSchemaClient.getVersions(topicName)
schemaVersionsLatencyTimer.count == 1
}

def "should call inner client for non-read operations"() {
when:
readMetricsTrackingClient.deleteAllSchemaVersions(topicName)

then:
1 * rawSchemaClient.deleteAllSchemaVersions(topicName)
schemaVersionsLatencyTimer.count == 0

when:
readMetricsTrackingClient.registerSchema(topicName, schema)

then:
1 * rawSchemaClient.registerSchema(topicName, schema)
schemaLatencyTimer.count == 0
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,23 +37,23 @@ public class ZookeeperCounterReporterTest {
public static final SortedMap<String, Gauge> EMPTY_GAUGES = null;
public static final String GROUP_NAME_UNDERSCORE = "pl_allegro_tech_skylab";
public static final String GROUP_NAME = "pl.allegro.tech.skylab";
public static final String TOPIC_NAME = "topic1";
public static final String TOPIC_NAME_UNDERSCORE = "topic_1";
public static final String SUBSCRIPTION_NAME_UNDERSCORE = "subscription_name";
public static final String SUBSCRIPTION_NAME = "subscription.name";
public static final TopicName QUALIFIED_TOPIC_NAME = new TopicName(GROUP_NAME, TOPIC_NAME);
public static final TopicName QUALIFIED_TOPIC_NAME = new TopicName(GROUP_NAME, TOPIC_NAME_UNDERSCORE);
public static final long COUNT = 100L;
public static final String GRAPHITE_PREFIX = "tech.hermes";

private static PathsCompiler pathsCompiler = new PathsCompiler("localhost.domain");

public static final String METRIC_NAME_FOR_PUBLISHED = pathsCompiler.compile(PUBLISHED,
pathContext().withGroup(GROUP_NAME_UNDERSCORE).withTopic(TOPIC_NAME).build());
public static final String METRIC_NAME_FOR_PUBLISHED = pathsCompiler.compile(PUBLISHED, pathContext()
.withGroup(GROUP_NAME_UNDERSCORE).withTopic(TOPIC_NAME_UNDERSCORE).build());

public static final String METRIC_NAME_FOR_DELIVERED = pathsCompiler.compile(DELIVERED,
pathContext().withGroup(GROUP_NAME_UNDERSCORE).withTopic(TOPIC_NAME).withSubscription(SUBSCRIPTION_NAME_UNDERSCORE).build());
public static final String METRIC_NAME_FOR_DELIVERED = pathsCompiler.compile(DELIVERED, pathContext()
.withGroup(GROUP_NAME_UNDERSCORE).withTopic(TOPIC_NAME_UNDERSCORE).withSubscription(SUBSCRIPTION_NAME_UNDERSCORE).build());

public static final String METRIC_NAME_FOR_DISCARDED = pathsCompiler.compile(DISCARDED,
pathContext().withGroup(GROUP_NAME_UNDERSCORE).withTopic(TOPIC_NAME).withSubscription(SUBSCRIPTION_NAME_UNDERSCORE).build());
public static final String METRIC_NAME_FOR_DISCARDED = pathsCompiler.compile(DISCARDED, pathContext()
.withGroup(GROUP_NAME_UNDERSCORE).withTopic(TOPIC_NAME_UNDERSCORE).withSubscription(SUBSCRIPTION_NAME_UNDERSCORE).build());

@Mock
private CounterStorage counterStorage;
Expand Down
3 changes: 2 additions & 1 deletion hermes-console/static/partials/modal/editGroup.html
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ <h3 class="modal-title" ng-show="operation === 'EDIT'"><small>Edit group:</small
<div class="form-group {{groupForm.groupName.$valid ? '' : 'has-error'}}">
<label for="groupName" class="col-md-3 control-label">Name</label>
<div class="col-md-9">
<input class="form-control" id="groupName" required name="groupName" placeholder="group name" ng-model="group.groupName" ng-disabled="operation === 'EDIT'"/>
<input class="form-control" id="groupName" required name="groupName" placeholder="group name"
ng-model="group.groupName" ng-disabled="operation === 'EDIT'" ng-pattern="/^[a-zA-Z0-9.-]+$/"/>
</div>
</div>
</form>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import static pl.allegro.tech.hermes.api.ContentType.AVRO;
import static pl.allegro.tech.hermes.common.http.MessageMetadataHeaders.BATCH_ID;
import static pl.allegro.tech.hermes.common.http.MessageMetadataHeaders.RETRY_COUNT;
import static pl.allegro.tech.hermes.consumers.consumer.sender.http.AvroMediaType.AVRO_BINARY;
import static pl.allegro.tech.hermes.api.AvroMediaType.AVRO_BINARY;

public class ApacheHttpClientMessageBatchSender implements MessageBatchSender {

Expand Down
Loading

0 comments on commit 17947ee

Please sign in to comment.