Skip to content

Commit 3cc252f

Browse files
fix: Readiness probe to use broker listener (#833)
* wip * imports, formatting * wip: provisional fix with pod_kcat instead of pod_fqdn * wip: use correct suffix * changelog, restored test defs, added comment * restored test defs II * removed pod_scope * corrected comment * port to be be dependent on broker listener class * extend test to check all 3 bootstrap listener possibilities * Update tests/templates/kuttl/kerberos/20-install-kafka.yaml.j2 Co-authored-by: Siegfried Weber <[email protected]> * review feedback * removed unecessary log statement --------- Co-authored-by: Siegfried Weber <[email protected]>
1 parent e757736 commit 3cc252f

File tree

7 files changed

+57
-22
lines changed

7 files changed

+57
-22
lines changed

CHANGELOG.md

+5
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,10 @@ All notable changes to this project will be documented in this file.
2525
- Bump Kafka 3.7.1 to 3.7.2 in tests and getting_started, and bump upgrade testing from 3.7.1->3.8.0 to 3.8.0->3.9.0 ([#822]).
2626
- docs: Update supported versions list ([#835]).
2727

28+
### Fixed
29+
30+
- Readiness probe fixed if Kerberos is enabled ([#833]).
31+
2832
[#796]: https://github.com/stackabletech/kafka-operator/pull/796
2933
[#803]: https://github.com/stackabletech/kafka-operator/pull/803
3034
[#809]: https://github.com/stackabletech/kafka-operator/pull/809
@@ -33,6 +37,7 @@ All notable changes to this project will be documented in this file.
3337
[#819]: https://github.com/stackabletech/kafka-operator/pull/819
3438
[#822]: https://github.com/stackabletech/kafka-operator/pull/822
3539
[#830]: https://github.com/stackabletech/kafka-operator/pull/830
40+
[#833]: https://github.com/stackabletech/kafka-operator/pull/833
3641
[#835]: https://github.com/stackabletech/kafka-operator/pull/835
3742

3843
## [24.11.1] - 2025-01-10

Cargo.nix

+3-3
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

rust/operator-binary/src/crd/listener.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -241,7 +241,7 @@ pub fn node_address_cmd(directory: &str) -> String {
241241
format!("$(cat {directory}/default-address/address)")
242242
}
243243

244-
fn node_port_cmd(directory: &str, port_name: &str) -> String {
244+
pub fn node_port_cmd(directory: &str, port_name: &str) -> String {
245245
format!("$(cat {directory}/default-address/ports/{port_name})")
246246
}
247247

rust/operator-binary/src/crd/security.rs

+27-14
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ use stackable_operator::{
2727
utils::COMMON_BASH_TRAP_FUNCTIONS,
2828
};
2929

30+
use super::listener::node_port_cmd;
3031
use crate::crd::{
3132
authentication::{self, ResolvedAuthenticationClasses},
3233
listener::{self, node_address_cmd, KafkaListenerConfig},
@@ -284,7 +285,7 @@ impl KafkaTlsSecurity {
284285
}
285286

286287
/// Returns the commands for the kcat readiness probe.
287-
pub fn kcat_prober_container_commands(&self, pod_fqdn: &String) -> Vec<String> {
288+
pub fn kcat_prober_container_commands(&self) -> Vec<String> {
288289
let mut args = vec![];
289290
let port = self.client_port();
290291

@@ -293,42 +294,58 @@ impl KafkaTlsSecurity {
293294
args.push("-b".to_string());
294295
args.push(format!("localhost:{}", port));
295296
args.extend(Self::kcat_client_auth_ssl(Self::STACKABLE_TLS_KCAT_DIR));
297+
args.push("-L".to_string());
296298
} else if self.has_kerberos_enabled() {
297299
let service_name = KafkaRole::Broker.kerberos_service_name();
300+
let broker_port = node_port_cmd(STACKABLE_LISTENER_BROKER_DIR, self.client_port_name());
298301
// here we need to specify a shell so that variable substitution will work
299302
// see e.g. https://github.com/kubernetes-client/python/blob/master/kubernetes/docs/V1ExecAction.md
300303
args.push("/bin/bash".to_string());
301304
args.push("-x".to_string());
302305
args.push("-euo".to_string());
303306
args.push("pipefail".to_string());
304307
args.push("-c".to_string());
305-
args.push(
308+
309+
// the entire command needs to be subject to the -c directive
310+
// to prevent short-circuiting
311+
let mut bash_args = vec![];
312+
bash_args.push(
306313
format!(
307314
"export KERBEROS_REALM=$(grep -oP 'default_realm = \\K.*' {});",
308315
STACKABLE_KERBEROS_KRB5_PATH
309316
)
310317
.to_string(),
311318
);
312-
args.push("/stackable/kcat".to_string());
313-
args.push("-b".to_string());
314-
args.push(format!("{pod_fqdn}:{port}"));
315-
args.extend(Self::kcat_client_sasl_ssl(
319+
bash_args.push(
320+
format!(
321+
"export POD_BROKER_LISTENER_ADDRESS={};",
322+
node_address_cmd(STACKABLE_LISTENER_BROKER_DIR)
323+
)
324+
.to_string(),
325+
);
326+
bash_args.push("/stackable/kcat".to_string());
327+
bash_args.push("-b".to_string());
328+
bash_args.push(format!("$POD_BROKER_LISTENER_ADDRESS:{broker_port}"));
329+
bash_args.extend(Self::kcat_client_sasl_ssl(
316330
Self::STACKABLE_TLS_KCAT_DIR,
317331
service_name,
318-
pod_fqdn,
319332
));
333+
bash_args.push("-L".to_string());
334+
335+
args.push(bash_args.join(" "));
320336
} else if self.tls_server_secret_class().is_some() {
321337
args.push("/stackable/kcat".to_string());
322338
args.push("-b".to_string());
323339
args.push(format!("localhost:{}", port));
324340
args.extend(Self::kcat_client_ssl(Self::STACKABLE_TLS_KCAT_DIR));
341+
args.push("-L".to_string());
325342
} else {
326343
args.push("/stackable/kcat".to_string());
327344
args.push("-b".to_string());
328345
args.push(format!("localhost:{}", port));
346+
args.push("-L".to_string());
329347
}
330348

331-
args.push("-L".to_string());
332349
args
333350
}
334351

@@ -655,11 +672,7 @@ impl KafkaTlsSecurity {
655672
]
656673
}
657674

658-
fn kcat_client_sasl_ssl(
659-
cert_directory: &str,
660-
service_name: &str,
661-
pod_fqdn: &String,
662-
) -> Vec<String> {
675+
fn kcat_client_sasl_ssl(cert_directory: &str, service_name: &str) -> Vec<String> {
663676
vec![
664677
"-X".to_string(),
665678
"security.protocol=SASL_SSL".to_string(),
@@ -672,7 +685,7 @@ impl KafkaTlsSecurity {
672685
"-X".to_string(),
673686
format!("sasl.kerberos.service.name={service_name}"),
674687
"-X".to_string(),
675-
format!("sasl.kerberos.principal={service_name}/{pod_fqdn}@$KERBEROS_REALM"),
688+
format!("sasl.kerberos.principal={service_name}/$POD_BROKER_LISTENER_ADDRESS@$KERBEROS_REALM"),
676689
]
677690
}
678691
}

rust/operator-binary/src/kafka_controller.rs

+9-4
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ use strum::{EnumDiscriminants, IntoStaticStr};
7676
use crate::{
7777
config::jvm::{construct_heap_jvm_args, construct_non_heap_jvm_args},
7878
crd::{
79-
listener::{get_kafka_listener_config, pod_fqdn, KafkaListenerError},
79+
listener::{get_kafka_listener_config, KafkaListenerError},
8080
security::KafkaTlsSecurity,
8181
v1alpha1, Container, KafkaClusterStatus, KafkaConfig, KafkaRole, APP_NAME,
8282
DOCKER_IMAGE_BASE_NAME, JVM_SECURITY_PROPERTIES_FILE, KAFKA_HEAP_OPTS,
@@ -999,8 +999,6 @@ fn build_broker_rolegroup_statefulset(
999999
.context(AddVolumeMountSnafu)?
10001000
.resources(merged_config.resources.clone().into());
10011001

1002-
let pod_fqdn = pod_fqdn(kafka, &rolegroup_ref.object_name(), cluster_info)
1003-
.context(ResolveNamespaceSnafu)?;
10041002
// Use kcat sidecar for probing container status rather than the official Kafka tools, since they incur a lot of
10051003
// unacceptable perf overhead
10061004
cb_kcat_prober
@@ -1025,12 +1023,19 @@ fn build_broker_rolegroup_statefulset(
10251023
.with_memory_limit("128Mi")
10261024
.build(),
10271025
)
1026+
.add_volume_mount(
1027+
LISTENER_BOOTSTRAP_VOLUME_NAME,
1028+
STACKABLE_LISTENER_BOOTSTRAP_DIR,
1029+
)
1030+
.context(AddVolumeMountSnafu)?
1031+
.add_volume_mount(LISTENER_BROKER_VOLUME_NAME, STACKABLE_LISTENER_BROKER_DIR)
1032+
.context(AddVolumeMountSnafu)?
10281033
// Only allow the global load balancing service to send traffic to pods that are members of the quorum
10291034
// This also acts as a hint to the StatefulSet controller to wait for each pod to enter quorum before taking down the next
10301035
.readiness_probe(Probe {
10311036
exec: Some(ExecAction {
10321037
// If the broker is able to get its fellow cluster members then it has at least completed basic registration at some point
1033-
command: Some(kafka_security.kcat_prober_container_commands(&pod_fqdn)),
1038+
command: Some(kafka_security.kcat_prober_container_commands()),
10341039
}),
10351040
timeout_seconds: Some(5),
10361041
period_seconds: Some(2),

tests/templates/kuttl/kerberos/20-install-kafka.yaml.j2

+2
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,8 @@ commands:
4949
config:
5050
logging:
5151
enableVectorAgent: {{ lookup('env', 'VECTOR_AGGREGATOR') | length > 0 }}
52+
brokerListenerClass: {{ test_scenario['values']['broker-listener-class'] }}
53+
bootstrapListenerClass: {{ test_scenario['values']['bootstrap-listener-class'] }}
5254
roleGroups:
5355
default:
5456
replicas: 3

tests/test-definition.yaml

+10
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,14 @@ dimensions:
5656
# Requires manual setup, see create-kerberos-secretclass.yaml
5757
# This will *not* respect the kerberos-realm test attribute, but instead use a hard-coded realm
5858
# - activeDirectory
59+
- name: broker-listener-class
60+
values:
61+
- "cluster-internal"
62+
- name: bootstrap-listener-class
63+
values:
64+
- "cluster-internal"
65+
- "external-stable"
66+
- "external-unstable"
5967
tests:
6068
- name: smoke
6169
dimensions:
@@ -106,6 +114,8 @@ tests:
106114
- kerberos-realm
107115
- kerberos-backend
108116
- openshift
117+
- broker-listener-class
118+
- bootstrap-listener-class
109119

110120
suites:
111121
- name: nightly

0 commit comments

Comments
 (0)