Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Readiness probe to use broker listener #833

Merged
merged 18 commits into from
Mar 20, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ All notable changes to this project will be documented in this file.
- Bump Kafka 3.7.1 to 3.7.2 in tests and getting_started, and bump upgrade testing from 3.7.1->3.8.0 to 3.8.0->3.9.0 ([#822]).
- docs: Update supported versions list ([#835]).

### Fixed

- Readiness probe fixed if Kerberos is enabled ([#833]).

[#796]: https://github.com/stackabletech/kafka-operator/pull/796
[#803]: https://github.com/stackabletech/kafka-operator/pull/803
[#809]: https://github.com/stackabletech/kafka-operator/pull/809
Expand All @@ -33,6 +37,7 @@ All notable changes to this project will be documented in this file.
[#819]: https://github.com/stackabletech/kafka-operator/pull/819
[#822]: https://github.com/stackabletech/kafka-operator/pull/822
[#830]: https://github.com/stackabletech/kafka-operator/pull/830
[#833]: https://github.com/stackabletech/kafka-operator/pull/833
[#835]: https://github.com/stackabletech/kafka-operator/pull/835

## [24.11.1] - 2025-01-10
Expand Down
6 changes: 3 additions & 3 deletions Cargo.nix

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion rust/operator-binary/src/crd/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ pub fn node_address_cmd(directory: &str) -> String {
format!("$(cat {directory}/default-address/address)")
}

fn node_port_cmd(directory: &str, port_name: &str) -> String {
pub fn node_port_cmd(directory: &str, port_name: &str) -> String {
format!("$(cat {directory}/default-address/ports/{port_name})")
}

Expand Down
41 changes: 27 additions & 14 deletions rust/operator-binary/src/crd/security.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use stackable_operator::{
utils::COMMON_BASH_TRAP_FUNCTIONS,
};

use super::listener::node_port_cmd;
use crate::crd::{
authentication::{self, ResolvedAuthenticationClasses},
listener::{self, node_address_cmd, KafkaListenerConfig},
Expand Down Expand Up @@ -284,7 +285,7 @@ impl KafkaTlsSecurity {
}

/// Returns the commands for the kcat readiness probe.
pub fn kcat_prober_container_commands(&self, pod_fqdn: &String) -> Vec<String> {
pub fn kcat_prober_container_commands(&self) -> Vec<String> {
let mut args = vec![];
let port = self.client_port();

Expand All @@ -293,42 +294,58 @@ impl KafkaTlsSecurity {
args.push("-b".to_string());
args.push(format!("localhost:{}", port));
args.extend(Self::kcat_client_auth_ssl(Self::STACKABLE_TLS_KCAT_DIR));
args.push("-L".to_string());
} else if self.has_kerberos_enabled() {
let service_name = KafkaRole::Broker.kerberos_service_name();
let broker_port = node_port_cmd(STACKABLE_LISTENER_BROKER_DIR, self.client_port_name());
// here we need to specify a shell so that variable substitution will work
// see e.g. https://github.com/kubernetes-client/python/blob/master/kubernetes/docs/V1ExecAction.md
args.push("/bin/bash".to_string());
args.push("-x".to_string());
args.push("-euo".to_string());
args.push("pipefail".to_string());
args.push("-c".to_string());
args.push(

// the entire command needs to be subject to the -c directive
// to prevent short-circuiting
let mut bash_args = vec![];
bash_args.push(
format!(
"export KERBEROS_REALM=$(grep -oP 'default_realm = \\K.*' {});",
STACKABLE_KERBEROS_KRB5_PATH
)
.to_string(),
);
args.push("/stackable/kcat".to_string());
args.push("-b".to_string());
args.push(format!("{pod_fqdn}:{port}"));
args.extend(Self::kcat_client_sasl_ssl(
bash_args.push(
format!(
"export POD_BROKER_LISTENER_ADDRESS={};",
node_address_cmd(STACKABLE_LISTENER_BROKER_DIR)
)
.to_string(),
);
bash_args.push("/stackable/kcat".to_string());
bash_args.push("-b".to_string());
bash_args.push(format!("$POD_BROKER_LISTENER_ADDRESS:{broker_port}"));
bash_args.extend(Self::kcat_client_sasl_ssl(
Self::STACKABLE_TLS_KCAT_DIR,
service_name,
pod_fqdn,
));
bash_args.push("-L".to_string());

args.push(bash_args.join(" "));
} else if self.tls_server_secret_class().is_some() {
args.push("/stackable/kcat".to_string());
args.push("-b".to_string());
args.push(format!("localhost:{}", port));
args.extend(Self::kcat_client_ssl(Self::STACKABLE_TLS_KCAT_DIR));
args.push("-L".to_string());
} else {
args.push("/stackable/kcat".to_string());
args.push("-b".to_string());
args.push(format!("localhost:{}", port));
args.push("-L".to_string());
}

args.push("-L".to_string());
args
}

Expand Down Expand Up @@ -655,11 +672,7 @@ impl KafkaTlsSecurity {
]
}

fn kcat_client_sasl_ssl(
cert_directory: &str,
service_name: &str,
pod_fqdn: &String,
) -> Vec<String> {
fn kcat_client_sasl_ssl(cert_directory: &str, service_name: &str) -> Vec<String> {
vec![
"-X".to_string(),
"security.protocol=SASL_SSL".to_string(),
Expand All @@ -672,7 +685,7 @@ impl KafkaTlsSecurity {
"-X".to_string(),
format!("sasl.kerberos.service.name={service_name}"),
"-X".to_string(),
format!("sasl.kerberos.principal={service_name}/{pod_fqdn}@$KERBEROS_REALM"),
format!("sasl.kerberos.principal={service_name}/$POD_BROKER_LISTENER_ADDRESS@$KERBEROS_REALM"),
]
}
}
13 changes: 9 additions & 4 deletions rust/operator-binary/src/kafka_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ use strum::{EnumDiscriminants, IntoStaticStr};
use crate::{
config::jvm::{construct_heap_jvm_args, construct_non_heap_jvm_args},
crd::{
listener::{get_kafka_listener_config, pod_fqdn, KafkaListenerError},
listener::{get_kafka_listener_config, KafkaListenerError},
security::KafkaTlsSecurity,
v1alpha1, Container, KafkaClusterStatus, KafkaConfig, KafkaRole, APP_NAME,
DOCKER_IMAGE_BASE_NAME, JVM_SECURITY_PROPERTIES_FILE, KAFKA_HEAP_OPTS,
Expand Down Expand Up @@ -999,8 +999,6 @@ fn build_broker_rolegroup_statefulset(
.context(AddVolumeMountSnafu)?
.resources(merged_config.resources.clone().into());

let pod_fqdn = pod_fqdn(kafka, &rolegroup_ref.object_name(), cluster_info)
.context(ResolveNamespaceSnafu)?;
// Use kcat sidecar for probing container status rather than the official Kafka tools, since they incur a lot of
// unacceptable perf overhead
cb_kcat_prober
Expand All @@ -1025,12 +1023,19 @@ fn build_broker_rolegroup_statefulset(
.with_memory_limit("128Mi")
.build(),
)
.add_volume_mount(
LISTENER_BOOTSTRAP_VOLUME_NAME,
STACKABLE_LISTENER_BOOTSTRAP_DIR,
)
.context(AddVolumeMountSnafu)?
.add_volume_mount(LISTENER_BROKER_VOLUME_NAME, STACKABLE_LISTENER_BROKER_DIR)
.context(AddVolumeMountSnafu)?
// Only allow the global load balancing service to send traffic to pods that are members of the quorum
// This also acts as a hint to the StatefulSet controller to wait for each pod to enter quorum before taking down the next
.readiness_probe(Probe {
exec: Some(ExecAction {
// If the broker is able to get its fellow cluster members then it has at least completed basic registration at some point
command: Some(kafka_security.kcat_prober_container_commands(&pod_fqdn)),
command: Some(kafka_security.kcat_prober_container_commands()),
}),
timeout_seconds: Some(5),
period_seconds: Some(2),
Expand Down
2 changes: 2 additions & 0 deletions tests/templates/kuttl/kerberos/20-install-kafka.yaml.j2
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ commands:
config:
logging:
enableVectorAgent: {{ lookup('env', 'VECTOR_AGGREGATOR') | length > 0 }}
brokerListenerClass: {{ test_scenario['values']['broker-listener-class'] }}
bootstrapListenerClass: {{ test_scenario['values']['bootstrap-listener-class'] }}
roleGroups:
default:
replicas: 3
Expand Down
10 changes: 10 additions & 0 deletions tests/test-definition.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,14 @@ dimensions:
# Requires manual setup, see create-kerberos-secretclass.yaml
# This will *not* respect the kerberos-realm test attribute, but instead use a hard-coded realm
# - activeDirectory
- name: broker-listener-class
values:
- "cluster-internal"
- name: bootstrap-listener-class
values:
- "cluster-internal"
- "external-stable"
- "external-unstable"
tests:
- name: smoke
dimensions:
Expand Down Expand Up @@ -106,6 +114,8 @@ tests:
- kerberos-realm
- kerberos-backend
- openshift
- broker-listener-class
- bootstrap-listener-class

suites:
- name: nightly
Expand Down