Skip to content

Commit

Permalink
Refactored to use new Java version constructs (#961)
Browse files Browse the repository at this point in the history
Signed-off-by: Paolo Patierno <[email protected]>
  • Loading branch information
ppatierno authored Jan 12, 2025
1 parent 9969bed commit 3e983a2
Show file tree
Hide file tree
Showing 13 changed files with 130 additions and 189 deletions.
15 changes: 6 additions & 9 deletions src/main/java/io/strimzi/kafka/bridge/EmbeddedFormat.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,11 @@ public enum EmbeddedFormat {
* @return corresponding enum
*/
public static EmbeddedFormat from(String value) {
switch (value) {
case "json":
return JSON;
case "binary":
return BINARY;
case "text":
return TEXT;
}
throw new IllegalEmbeddedFormatException("Invalid format type.");
return switch (value) {
case "json" -> JSON;
case "binary" -> BINARY;
case "text" -> TEXT;
default -> throw new IllegalEmbeddedFormatException("Invalid format type.");
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
public class HttpAdminBridgeEndpoint extends HttpBridgeEndpoint {
private static final Logger LOGGER = LogManager.getLogger(HttpAdminBridgeEndpoint.class);

private final HttpBridgeContext httpBridgeContext;
private final HttpBridgeContext<?, ?> httpBridgeContext;
private final KafkaBridgeAdmin kafkaBridgeAdmin;

/**
Expand All @@ -53,7 +53,7 @@ public class HttpAdminBridgeEndpoint extends HttpBridgeEndpoint {
* @param bridgeConfig the bridge configuration
* @param context the HTTP bridge context
*/
public HttpAdminBridgeEndpoint(BridgeConfig bridgeConfig, HttpBridgeContext context) {
public HttpAdminBridgeEndpoint(BridgeConfig bridgeConfig, HttpBridgeContext<?, ?> context) {
super(bridgeConfig);
this.name = "kafka-bridge-admin";
this.httpBridgeContext = context;
Expand Down
11 changes: 4 additions & 7 deletions src/main/java/io/strimzi/kafka/bridge/http/HttpBridge.java
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ private void createConsumer(RoutingContext routingContext) {
responseStatus.code(),
ex.getMessage()
);
HttpUtils.sendResponse(routingContext, error.getCode(),
HttpUtils.sendResponse(routingContext, error.code(),
BridgeContentType.KAFKA_JSON, JsonUtils.jsonToBytes(error.toJson()));
}
}
Expand Down Expand Up @@ -586,20 +586,17 @@ private void errorHandler(RoutingContext routingContext) {
// in case of validation exception, building a meaningful error message
if (routingContext.failure() != null) {
StringBuilder sb = new StringBuilder();
if (routingContext.failure().getCause() instanceof ValidationException) {
ValidationException validationException = (ValidationException) routingContext.failure().getCause();
if (routingContext.failure().getCause() instanceof ValidationException validationException) {
if (validationException.inputScope() != null) {
sb.append("Validation error on: ").append(validationException.inputScope()).append(" - ");
}
sb.append(validationException.getMessage());
} else if (routingContext.failure() instanceof ParameterProcessorException) {
ParameterProcessorException parameterException = (ParameterProcessorException) routingContext.failure();
} else if (routingContext.failure() instanceof ParameterProcessorException parameterException) {
if (parameterException.getParameterName() != null) {
sb.append("Parameter error on: ").append(parameterException.getParameterName()).append(" - ");
}
sb.append(parameterException.getMessage());
} else if (routingContext.failure() instanceof BodyProcessorException) {
BodyProcessorException bodyProcessorException = (BodyProcessorException) routingContext.failure();
} else if (routingContext.failure() instanceof BodyProcessorException bodyProcessorException) {
sb.append(bodyProcessorException.getMessage());
}
message = sb.toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ private void doSeek(RoutingContext routingContext, JsonNode bodyAsJson) {
HttpUtils.sendResponse(routingContext, HttpResponseStatus.NO_CONTENT.code(), null, null);
} else {
HttpBridgeError error = handleError(ex);
HttpUtils.sendResponse(routingContext, error.getCode(),
HttpUtils.sendResponse(routingContext, error.code(),
BridgeContentType.KAFKA_JSON, JsonUtils.jsonToBytes(error.toJson()));
}
});
Expand All @@ -223,7 +223,7 @@ private void doSeekTo(RoutingContext routingContext, JsonNode bodyAsJson, HttpOp
HttpUtils.sendResponse(routingContext, HttpResponseStatus.NO_CONTENT.code(), null, null);
} else {
HttpBridgeError error = handleError(ex);
HttpUtils.sendResponse(routingContext, error.getCode(),
HttpUtils.sendResponse(routingContext, error.code(),
BridgeContentType.KAFKA_JSON, JsonUtils.jsonToBytes(error.toJson()));
}
});
Expand Down Expand Up @@ -388,7 +388,7 @@ private void doAssign(RoutingContext routingContext, JsonNode bodyAsJson) {
StreamSupport.stream(partitionsList.spliterator(), false)
.map(JsonNode.class::cast)
.map(json -> new SinkTopicSubscription(JsonUtils.getString(json, "topic"), JsonUtils.getInt(json, "partition")))
.collect(Collectors.toList())
.toList()
);

// fulfilling the request in a separate thread to free the Vert.x event loop still in place
Expand Down Expand Up @@ -441,7 +441,7 @@ private void doSubscribe(RoutingContext routingContext, JsonNode bodyAsJson) {
StreamSupport.stream(topicsList.spliterator(), false)
.map(TextNode.class::cast)
.map(topic -> new SinkTopicSubscription(topic.asText()))
.collect(Collectors.toList())
.toList()
);
this.kafkaBridgeConsumer.subscribe(topicSubscriptions);
} else if (bodyAsJson.has("topic_pattern")) {
Expand Down Expand Up @@ -554,7 +554,7 @@ public void handle(RoutingContext routingContext, Handler<HttpBridgeEndpoint> ha
LOGGER.debug("[{}] Request: body = {}", routingContext.get("request-id"), bodyAsJson);
} catch (JsonDecodeException ex) {
HttpBridgeError error = handleError(ex);
HttpUtils.sendResponse(routingContext, error.getCode(),
HttpUtils.sendResponse(routingContext, error.code(),
BridgeContentType.KAFKA_JSON, JsonUtils.jsonToBytes(error.toJson()));
return;
}
Expand Down Expand Up @@ -609,39 +609,30 @@ public void handle(RoutingContext routingContext, Handler<HttpBridgeEndpoint> ha

@SuppressWarnings("unchecked")
private MessageConverter<K, V, byte[], byte[]> buildMessageConverter() {
switch (this.format) {
case JSON:
return (MessageConverter<K, V, byte[], byte[]>) new HttpJsonMessageConverter();
case BINARY:
return (MessageConverter<K, V, byte[], byte[]>) new HttpBinaryMessageConverter();
case TEXT:
return (MessageConverter<K, V, byte[], byte[]>) new HttpTextMessageConverter();
}
return null;
return switch (this.format) {
case JSON -> (MessageConverter<K, V, byte[], byte[]>) new HttpJsonMessageConverter();
case BINARY -> (MessageConverter<K, V, byte[], byte[]>) new HttpBinaryMessageConverter();
case TEXT -> (MessageConverter<K, V, byte[], byte[]>) new HttpTextMessageConverter();
default -> null;
};
}

private String getContentType() {
switch (this.format) {
case JSON:
return BridgeContentType.KAFKA_JSON_JSON;
case BINARY:
return BridgeContentType.KAFKA_JSON_BINARY;
case TEXT:
return BridgeContentType.KAFKA_JSON_TEXT;
}
throw new IllegalArgumentException();
return switch (this.format) {
case JSON -> BridgeContentType.KAFKA_JSON_JSON;
case BINARY -> BridgeContentType.KAFKA_JSON_BINARY;
case TEXT -> BridgeContentType.KAFKA_JSON_TEXT;
default -> throw new IllegalArgumentException();
};
}

private boolean checkAcceptedBody(String accept) {
switch (accept) {
case BridgeContentType.KAFKA_JSON_JSON:
return format == EmbeddedFormat.JSON;
case BridgeContentType.KAFKA_JSON_BINARY:
return format == EmbeddedFormat.BINARY;
case BridgeContentType.KAFKA_JSON_TEXT:
return format == EmbeddedFormat.TEXT;
}
return false;
return switch (accept) {
case BridgeContentType.KAFKA_JSON_JSON -> format == EmbeddedFormat.JSON;
case BridgeContentType.KAFKA_JSON_BINARY -> format == EmbeddedFormat.BINARY;
case BridgeContentType.KAFKA_JSON_TEXT -> format == EmbeddedFormat.TEXT;
default -> false;
};
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,13 +191,11 @@ private ObjectNode buildOffsets(List<HttpBridgeResult<?>> results) {

for (HttpBridgeResult<?> result : results) {
ObjectNode offset = null;
if (result.getResult() instanceof RecordMetadata) {
RecordMetadata metadata = (RecordMetadata) result.getResult();
if (result.result() instanceof RecordMetadata metadata) {
offset = JsonUtils.createObjectNode()
.put("partition", metadata.partition())
.put("offset", metadata.offset());
} else if (result.getResult() instanceof HttpBridgeError) {
HttpBridgeError error = (HttpBridgeError) result.getResult();
} else if (result.result() instanceof HttpBridgeError error) {
offset = error.toJson();
}
offsets.add(offset);
Expand All @@ -218,14 +216,14 @@ private int handleError(Throwable ex) {

@SuppressWarnings("unchecked")
private MessageConverter<K, V, byte[], byte[]> buildMessageConverter(String contentType) {
switch (contentType) {
case BridgeContentType.KAFKA_JSON_JSON:
return (MessageConverter<K, V, byte[], byte[]>) new HttpJsonMessageConverter();
case BridgeContentType.KAFKA_JSON_BINARY:
return (MessageConverter<K, V, byte[], byte[]>) new HttpBinaryMessageConverter();
case BridgeContentType.KAFKA_JSON_TEXT:
return (MessageConverter<K, V, byte[], byte[]>) new HttpTextMessageConverter();
}
return null;
return switch (contentType) {
case BridgeContentType.KAFKA_JSON_JSON ->
(MessageConverter<K, V, byte[], byte[]>) new HttpJsonMessageConverter();
case BridgeContentType.KAFKA_JSON_BINARY ->
(MessageConverter<K, V, byte[], byte[]>) new HttpBinaryMessageConverter();
case BridgeContentType.KAFKA_JSON_TEXT ->
(MessageConverter<K, V, byte[], byte[]>) new HttpTextMessageConverter();
default -> null;
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,36 +11,11 @@

/**
* Represents an error related to HTTP bridging
*
* @param code code classifying the error itself
* @param message message providing more information about the error
*/
public class HttpBridgeError {

private final int code;
private final String message;

/**
* Constructor
*
* @param code code classifying the error itself
* @param message message providing more information about the error
*/
public HttpBridgeError(int code, String message) {
this.code = code;
this.message = message;
}

/**
* @return code classifying the error itself
*/
public int getCode() {
return code;
}

/**
* @return message providing more information about the error
*/
public String getMessage() {
return message;
}
public record HttpBridgeError(int code, String message) {

/**
* @return a JSON representation of the error with code and message
Expand All @@ -54,6 +29,7 @@ public ObjectNode toJson() {

/**
* Create an error instance from a JSON representation
*
* @param json JSON representation of the error
* @return error instance
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,24 +9,6 @@
* This class represents a result of an HTTP bridging operation
*
* @param <T> the class bringing the actual result as {@link HttpBridgeError} or {@link org.apache.kafka.clients.producer.RecordMetadata}
* @param result actual result
*/
public class HttpBridgeResult<T> {

final T result;

/**
* Constructor
*
* @param result actual result
*/
public HttpBridgeResult(T result) {
this.result = result;
}

/**
* @return the actual result
*/
public T getResult() {
return result;
}
}
public record HttpBridgeResult<T>(T result) { }
Loading

0 comments on commit 3e983a2

Please sign in to comment.