Skip to content

Commit 6d09fa2

Browse files
committed
Apply client options as defined on the cluster
1 parent f159eac commit 6d09fa2

File tree

3 files changed

+131
-2
lines changed

3 files changed

+131
-2
lines changed

kafka-webview-ui/src/main/java/org/sourcelab/kafka/webview/ui/manager/kafka/KafkaClientConfigUtil.java

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@
3131
import org.apache.kafka.common.config.SaslConfigs;
3232
import org.apache.kafka.common.config.SslConfigs;
3333
import org.apache.kafka.common.security.auth.SecurityProtocol;
34+
import org.slf4j.Logger;
35+
import org.slf4j.LoggerFactory;
3436
import org.sourcelab.kafka.webview.ui.manager.kafka.config.ClusterConfig;
3537

3638
import java.util.ArrayList;
@@ -46,6 +48,8 @@
4648
* Utility class to DRY out common Kafka client configuration options that apply to multiple client types.
4749
*/
4850
public class KafkaClientConfigUtil {
51+
private static final Logger logger = LoggerFactory.getLogger(KafkaClientConfigUtil.class);
52+
4953
/**
5054
* Path on filesystem where keystores are persisted.
5155
*/
@@ -116,9 +120,38 @@ private Map<String, Object> applyCommonSettings(
116120
// Optionally configure SASL
117121
applySaslSettings(clusterConfig, config);
118122

123+
// Apply cluster client properties if defined.
124+
// Note: This should always be applied last.
125+
applyClusterClientProperties(clusterConfig, config);
126+
119127
return config;
120128
}
121129

130+
/**
131+
* If client properties are defined on the cluster, they get applied last ontop of everything else.
132+
* @param clusterConfig configuration properties.
133+
* @param config config to be applied to.
134+
*/
135+
private void applyClusterClientProperties(final ClusterConfig clusterConfig, final Map<String, Object> config) {
136+
if (clusterConfig.getClusterClientProperties().isEmpty()) {
137+
return;
138+
}
139+
140+
for (final Map.Entry<String, String> entry : clusterConfig.getClusterClientProperties().entrySet()) {
141+
if (config.containsKey(entry.getKey())) {
142+
// Log a warning as behavior may be altered in a way that causes Kafka WebView to no longer function.
143+
logger.warn(
144+
"Client property defined on the cluster replaced property '{}'. "
145+
+ "The original value of '{}' was replaced with with '{}'. "
146+
+ "Overriding of configuration properties in this way may cause Kafka Webview to not function correctly.",
147+
entry.getKey(), config.get(entry.getKey()), entry.getValue()
148+
);
149+
}
150+
// Set value.
151+
config.put(entry.getKey(), entry.getValue());
152+
}
153+
}
154+
122155
/**
123156
* If SSL is configured for this cluster, apply the settings.
124157
* @param clusterConfig Cluster configuration definition to source values from.

kafka-webview-ui/src/main/java/org/sourcelab/kafka/webview/ui/manager/kafka/config/ClusterConfig.java

Lines changed: 60 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,13 +24,18 @@
2424

2525
package org.sourcelab.kafka.webview.ui.manager.kafka.config;
2626

27+
import com.fasterxml.jackson.databind.ObjectMapper;
2728
import org.sourcelab.kafka.webview.ui.manager.encryption.SecretManager;
2829
import org.sourcelab.kafka.webview.ui.manager.sasl.SaslProperties;
2930
import org.sourcelab.kafka.webview.ui.manager.sasl.SaslUtility;
3031
import org.sourcelab.kafka.webview.ui.model.Cluster;
3132

33+
import java.io.IOException;
3234
import java.util.Arrays;
35+
import java.util.Collections;
36+
import java.util.HashMap;
3337
import java.util.HashSet;
38+
import java.util.Map;
3439
import java.util.Set;
3540
import java.util.stream.Collectors;
3641

@@ -58,6 +63,11 @@ public class ClusterConfig {
5863
private final String saslMechanism;
5964
private final String saslJaas;
6065

66+
/**
67+
* Client Properties defined on the Cluster configuration.
68+
*/
69+
private final Map<String, String> clusterClientProperties;
70+
6171
/**
6272
* Private constructor for connecting to SSL brokers.
6373
*/
@@ -72,7 +82,9 @@ private ClusterConfig(
7282
final String saslPlaintextUsername,
7383
final String saslPlaintextPassword,
7484
final String saslMechanism,
75-
final String saslJaas) {
85+
final String saslJaas,
86+
final Map<String, String> clusterClientProperties
87+
) {
7688

7789
this.brokerHosts = brokerHosts;
7890

@@ -89,6 +101,11 @@ private ClusterConfig(
89101
this.saslPlaintextPassword = saslPlaintextPassword;
90102
this.saslMechanism = saslMechanism;
91103
this.saslJaas = saslJaas;
104+
105+
// Shallow copy the cluster client properties.
106+
this.clusterClientProperties = Collections.unmodifiableMap(
107+
new HashMap<>(clusterClientProperties)
108+
);
92109
}
93110

94111
public Set<String> getBrokerHosts() {
@@ -139,6 +156,10 @@ public String getSaslJaas() {
139156
return saslJaas;
140157
}
141158

159+
public Map<String, String> getClusterClientProperties() {
160+
return clusterClientProperties;
161+
}
162+
142163
@Override
143164
public String toString() {
144165
return "ClusterConfig{"
@@ -196,6 +217,19 @@ public static Builder newBuilder(final Cluster cluster, final SecretManager secr
196217
builder.withUseSasl(false);
197218
}
198219

220+
// If we have defined cluster client options, decode and set them.
221+
if (cluster.getOptionParameters() != null) {
222+
final ObjectMapper objectMapper = new ObjectMapper();
223+
Map<String, String> customOptions;
224+
try {
225+
customOptions = objectMapper.readValue(cluster.getOptionParameters(), Map.class);
226+
} catch (final IOException e) {
227+
// Fail safe?
228+
customOptions = new HashMap<>();
229+
}
230+
builder.withClusterClientConfig(customOptions);
231+
}
232+
199233
return builder;
200234
}
201235

@@ -223,6 +257,12 @@ public static final class Builder {
223257
private String saslMechanism;
224258
private String saslJaas;
225259

260+
/**
261+
* Override properties defined from the cluster.option_parameters field.
262+
* These should be applied LAST ontop of all the other config options.
263+
*/
264+
private Map<String, String> clusterOverrideProperties = new HashMap<>();
265+
226266
private Builder() {
227267
}
228268

@@ -323,6 +363,22 @@ public Builder withSaslJaas(final String saslJaas) {
323363
return this;
324364
}
325365

366+
/**
367+
* Declare Override Properties defined on the cluster.
368+
*/
369+
public Builder withClusterClientConfig(final Map<String, String> clusterClientConfig) {
370+
this.clusterOverrideProperties.putAll(clusterClientConfig);
371+
return this;
372+
}
373+
374+
/**
375+
* Declare Override Properties defined on the cluster.
376+
*/
377+
public Builder withClusterClientConfig(final String key, final String value) {
378+
this.clusterOverrideProperties.put(key, value);
379+
return this;
380+
}
381+
326382
/**
327383
* Create ClusterConfig instance from builder values.
328384
*/
@@ -340,7 +396,9 @@ public ClusterConfig build() {
340396
saslPlaintextUsername,
341397
saslPlaintextPassword,
342398
saslMechanism,
343-
saslJaas
399+
saslJaas,
400+
// Cluster client properties
401+
clusterOverrideProperties
344402
);
345403
}
346404
}

kafka-webview-ui/src/test/java/org/sourcelab/kafka/webview/ui/manager/kafka/KafkaClientConfigUtilTest.java

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.junit.Test;
3232
import org.sourcelab.kafka.webview.ui.manager.kafka.config.ClusterConfig;
3333

34+
import java.util.HashMap;
3435
import java.util.Map;
3536

3637
import static org.junit.Assert.assertEquals;
@@ -81,6 +82,43 @@ public void testApplyCommonSettings_noSsl_noSasl() {
8182
validateNoKey(config, CommonClientConfigs.SECURITY_PROTOCOL_CONFIG);
8283
}
8384

85+
/**
86+
* Basic smoke test, without SSL or SASL options, but has cluster properties.
87+
*/
88+
@Test
89+
public void testApplyCommonSettings_noSsl_noSasl_withClientProperties() {
90+
final Map<String, String> customProperties = new HashMap<>();
91+
customProperties.put("key3", "value3");
92+
customProperties.put("key4", "value4");
93+
94+
final ClusterConfig clusterConfig = ClusterConfig.newBuilder()
95+
.withBrokerHosts(expectedBrokerHosts)
96+
.withUseSsl(false)
97+
.withUseSasl(false)
98+
// Use both setters
99+
.withClusterClientConfig("key1", "value1")
100+
.withClusterClientConfig("key2", "value2")
101+
.withClusterClientConfig(customProperties)
102+
// Build it.
103+
.build();
104+
105+
final Map<String, Object> config = util.applyCommonSettings(clusterConfig, consumerId);
106+
107+
// Validate
108+
validateDefaultKeys(config);
109+
validateNoSsl(config);
110+
validateNoSasl(config);
111+
112+
// Validate this is not set.
113+
validateNoKey(config, CommonClientConfigs.SECURITY_PROTOCOL_CONFIG);
114+
115+
// Validate custom properties applied
116+
validateKey(config, "key1", "value1");
117+
validateKey(config, "key2", "value2");
118+
validateKey(config, "key3", "value3");
119+
validateKey(config, "key4", "value4");
120+
}
121+
84122
/**
85123
* Basic smoke test with SSL and TrustStore, without SASL options.
86124
*/

0 commit comments

Comments
 (0)