diff --git a/CHANGELOG.md b/CHANGELOG.md index ffc6575e..fa982ac8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -25,6 +25,10 @@ All notable changes to this project will be documented in this file. - Bump Kafka 3.7.1 to 3.7.2 in tests and getting_started, and bump upgrade testing from 3.7.1->3.8.0 to 3.8.0->3.9.0 ([#822]). - docs: Update supported versions list ([#835]). +### Fixed + +- Readiness probe fixed if Kerberos is enabled ([#833]). + [#796]: https://github.com/stackabletech/kafka-operator/pull/796 [#803]: https://github.com/stackabletech/kafka-operator/pull/803 [#809]: https://github.com/stackabletech/kafka-operator/pull/809 @@ -33,6 +37,7 @@ All notable changes to this project will be documented in this file. [#819]: https://github.com/stackabletech/kafka-operator/pull/819 [#822]: https://github.com/stackabletech/kafka-operator/pull/822 [#830]: https://github.com/stackabletech/kafka-operator/pull/830 +[#833]: https://github.com/stackabletech/kafka-operator/pull/833 [#835]: https://github.com/stackabletech/kafka-operator/pull/835 ## [24.11.1] - 2025-01-10 diff --git a/Cargo.nix b/Cargo.nix index cc300ed4..343be2e7 100644 --- a/Cargo.nix +++ b/Cargo.nix @@ -6030,10 +6030,10 @@ rec { }; "ring" = rec { crateName = "ring"; - version = "0.17.11"; + version = "0.17.13"; edition = "2021"; - links = "ring_core_0_17_11_"; - sha256 = "0wzyhdbf71ndd14kkpyj2a6nvczvli2mndzv2al7r26k4yp4jlys"; + links = "ring_core_0_17_13_"; + sha256 = "1vjhhlmpqqd9lc53ffjj1yk203188n2km27g3myvssm15a1mvb3h"; dependencies = [ { name = "cfg-if"; diff --git a/rust/operator-binary/src/crd/listener.rs b/rust/operator-binary/src/crd/listener.rs index 38908da9..c27c5d57 100644 --- a/rust/operator-binary/src/crd/listener.rs +++ b/rust/operator-binary/src/crd/listener.rs @@ -241,7 +241,7 @@ pub fn node_address_cmd(directory: &str) -> String { format!("$(cat {directory}/default-address/address)") } -fn node_port_cmd(directory: &str, port_name: &str) -> String { +pub fn node_port_cmd(directory: &str, port_name: &str) -> String { format!("$(cat {directory}/default-address/ports/{port_name})") } diff --git a/rust/operator-binary/src/crd/security.rs b/rust/operator-binary/src/crd/security.rs index bf98b082..bff15f18 100644 --- a/rust/operator-binary/src/crd/security.rs +++ b/rust/operator-binary/src/crd/security.rs @@ -27,6 +27,7 @@ use stackable_operator::{ utils::COMMON_BASH_TRAP_FUNCTIONS, }; +use super::listener::node_port_cmd; use crate::crd::{ authentication::{self, ResolvedAuthenticationClasses}, listener::{self, node_address_cmd, KafkaListenerConfig}, @@ -284,7 +285,7 @@ impl KafkaTlsSecurity { } /// Returns the commands for the kcat readiness probe. - pub fn kcat_prober_container_commands(&self, pod_fqdn: &String) -> Vec { + pub fn kcat_prober_container_commands(&self) -> Vec { let mut args = vec![]; let port = self.client_port(); @@ -293,8 +294,10 @@ impl KafkaTlsSecurity { args.push("-b".to_string()); args.push(format!("localhost:{}", port)); args.extend(Self::kcat_client_auth_ssl(Self::STACKABLE_TLS_KCAT_DIR)); + args.push("-L".to_string()); } else if self.has_kerberos_enabled() { let service_name = KafkaRole::Broker.kerberos_service_name(); + let broker_port = node_port_cmd(STACKABLE_LISTENER_BROKER_DIR, self.client_port_name()); // here we need to specify a shell so that variable substitution will work // see e.g. https://github.com/kubernetes-client/python/blob/master/kubernetes/docs/V1ExecAction.md args.push("/bin/bash".to_string()); @@ -302,33 +305,47 @@ impl KafkaTlsSecurity { args.push("-euo".to_string()); args.push("pipefail".to_string()); args.push("-c".to_string()); - args.push( + + // the entire command needs to be subject to the -c directive + // to prevent short-circuiting + let mut bash_args = vec![]; + bash_args.push( format!( "export KERBEROS_REALM=$(grep -oP 'default_realm = \\K.*' {});", STACKABLE_KERBEROS_KRB5_PATH ) .to_string(), ); - args.push("/stackable/kcat".to_string()); - args.push("-b".to_string()); - args.push(format!("{pod_fqdn}:{port}")); - args.extend(Self::kcat_client_sasl_ssl( + bash_args.push( + format!( + "export POD_BROKER_LISTENER_ADDRESS={};", + node_address_cmd(STACKABLE_LISTENER_BROKER_DIR) + ) + .to_string(), + ); + bash_args.push("/stackable/kcat".to_string()); + bash_args.push("-b".to_string()); + bash_args.push(format!("$POD_BROKER_LISTENER_ADDRESS:{broker_port}")); + bash_args.extend(Self::kcat_client_sasl_ssl( Self::STACKABLE_TLS_KCAT_DIR, service_name, - pod_fqdn, )); + bash_args.push("-L".to_string()); + + args.push(bash_args.join(" ")); } else if self.tls_server_secret_class().is_some() { args.push("/stackable/kcat".to_string()); args.push("-b".to_string()); args.push(format!("localhost:{}", port)); args.extend(Self::kcat_client_ssl(Self::STACKABLE_TLS_KCAT_DIR)); + args.push("-L".to_string()); } else { args.push("/stackable/kcat".to_string()); args.push("-b".to_string()); args.push(format!("localhost:{}", port)); + args.push("-L".to_string()); } - args.push("-L".to_string()); args } @@ -655,11 +672,7 @@ impl KafkaTlsSecurity { ] } - fn kcat_client_sasl_ssl( - cert_directory: &str, - service_name: &str, - pod_fqdn: &String, - ) -> Vec { + fn kcat_client_sasl_ssl(cert_directory: &str, service_name: &str) -> Vec { vec![ "-X".to_string(), "security.protocol=SASL_SSL".to_string(), @@ -672,7 +685,7 @@ impl KafkaTlsSecurity { "-X".to_string(), format!("sasl.kerberos.service.name={service_name}"), "-X".to_string(), - format!("sasl.kerberos.principal={service_name}/{pod_fqdn}@$KERBEROS_REALM"), + format!("sasl.kerberos.principal={service_name}/$POD_BROKER_LISTENER_ADDRESS@$KERBEROS_REALM"), ] } } diff --git a/rust/operator-binary/src/kafka_controller.rs b/rust/operator-binary/src/kafka_controller.rs index 0374e572..bdd94137 100644 --- a/rust/operator-binary/src/kafka_controller.rs +++ b/rust/operator-binary/src/kafka_controller.rs @@ -76,7 +76,7 @@ use strum::{EnumDiscriminants, IntoStaticStr}; use crate::{ config::jvm::{construct_heap_jvm_args, construct_non_heap_jvm_args}, crd::{ - listener::{get_kafka_listener_config, pod_fqdn, KafkaListenerError}, + listener::{get_kafka_listener_config, KafkaListenerError}, security::KafkaTlsSecurity, v1alpha1, Container, KafkaClusterStatus, KafkaConfig, KafkaRole, APP_NAME, DOCKER_IMAGE_BASE_NAME, JVM_SECURITY_PROPERTIES_FILE, KAFKA_HEAP_OPTS, @@ -999,8 +999,6 @@ fn build_broker_rolegroup_statefulset( .context(AddVolumeMountSnafu)? .resources(merged_config.resources.clone().into()); - let pod_fqdn = pod_fqdn(kafka, &rolegroup_ref.object_name(), cluster_info) - .context(ResolveNamespaceSnafu)?; // Use kcat sidecar for probing container status rather than the official Kafka tools, since they incur a lot of // unacceptable perf overhead cb_kcat_prober @@ -1025,12 +1023,19 @@ fn build_broker_rolegroup_statefulset( .with_memory_limit("128Mi") .build(), ) + .add_volume_mount( + LISTENER_BOOTSTRAP_VOLUME_NAME, + STACKABLE_LISTENER_BOOTSTRAP_DIR, + ) + .context(AddVolumeMountSnafu)? + .add_volume_mount(LISTENER_BROKER_VOLUME_NAME, STACKABLE_LISTENER_BROKER_DIR) + .context(AddVolumeMountSnafu)? // Only allow the global load balancing service to send traffic to pods that are members of the quorum // This also acts as a hint to the StatefulSet controller to wait for each pod to enter quorum before taking down the next .readiness_probe(Probe { exec: Some(ExecAction { // If the broker is able to get its fellow cluster members then it has at least completed basic registration at some point - command: Some(kafka_security.kcat_prober_container_commands(&pod_fqdn)), + command: Some(kafka_security.kcat_prober_container_commands()), }), timeout_seconds: Some(5), period_seconds: Some(2), diff --git a/tests/templates/kuttl/kerberos/20-install-kafka.yaml.j2 b/tests/templates/kuttl/kerberos/20-install-kafka.yaml.j2 index fc3a958a..a03d03ed 100644 --- a/tests/templates/kuttl/kerberos/20-install-kafka.yaml.j2 +++ b/tests/templates/kuttl/kerberos/20-install-kafka.yaml.j2 @@ -49,6 +49,8 @@ commands: config: logging: enableVectorAgent: {{ lookup('env', 'VECTOR_AGGREGATOR') | length > 0 }} + brokerListenerClass: {{ test_scenario['values']['broker-listener-class'] }} + bootstrapListenerClass: {{ test_scenario['values']['bootstrap-listener-class'] }} roleGroups: default: replicas: 3 diff --git a/tests/test-definition.yaml b/tests/test-definition.yaml index 3da10894..c80e09db 100644 --- a/tests/test-definition.yaml +++ b/tests/test-definition.yaml @@ -56,6 +56,14 @@ dimensions: # Requires manual setup, see create-kerberos-secretclass.yaml # This will *not* respect the kerberos-realm test attribute, but instead use a hard-coded realm # - activeDirectory + - name: broker-listener-class + values: + - "cluster-internal" + - name: bootstrap-listener-class + values: + - "cluster-internal" + - "external-stable" + - "external-unstable" tests: - name: smoke dimensions: @@ -106,6 +114,8 @@ tests: - kerberos-realm - kerberos-backend - openshift + - broker-listener-class + - bootstrap-listener-class suites: - name: nightly