Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use ClusterResources for Listener controller #232

Merged
merged 5 commits into from
Oct 21, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Use ClusterResources for Listener controller
Fixes #221
nightkr committed Oct 7, 2024
commit b1237d2863909b519574303a320ea4c875b52102
101 changes: 78 additions & 23 deletions rust/operator-binary/src/listener_controller.rs
Original file line number Diff line number Diff line change
@@ -9,7 +9,8 @@ use futures::{
};
use snafu::{OptionExt, ResultExt, Snafu};
use stackable_operator::{
builder::meta::OwnerReferenceBuilder,
builder::meta::ObjectMetaBuilder,
cluster_resources::{ClusterResourceApplyStrategy, ClusterResources},
commons::listener::{
AddressType, Listener, ListenerClass, ListenerIngress, ListenerPort, ListenerSpec,
ListenerStatus, ServiceType,
@@ -21,19 +22,23 @@ use stackable_operator::{
kube::{
api::{DynamicObject, ObjectMeta},
runtime::{controller, reflector::ObjectRef, watcher},
ResourceExt,
Resource, ResourceExt,
},
kvp::Labels,
logging::controller::{report_controller_reconciled, ReconcilerError},
time::Duration,
};
use strum::IntoStaticStr;

use crate::{csi_server::node::NODE_TOPOLOGY_LABEL_HOSTNAME, utils::node_primary_address};
use crate::{
csi_server::node::NODE_TOPOLOGY_LABEL_HOSTNAME, utils::node_primary_address, APP_NAME,
OPERATOR_KEY,
};

#[cfg(doc)]
use stackable_operator::k8s_openapi::api::core::v1::Pod;

const FIELD_MANAGER_SCOPE: &str = "listener";
const CONTROLLER_NAME: &str = "listener";

pub async fn run(client: stackable_operator::client::Client) {
let controller =
@@ -112,6 +117,11 @@ pub enum Error {
#[snafu(display("object has no name"))]
NoName,

#[snafu(display("failed to create cluster resources"))]
CreateClusterResources {
source: stackable_operator::cluster_resources::Error,
},

#[snafu(display("object has no ListenerClass (.spec.class_name)"))]
NoListenerClass,

@@ -130,6 +140,16 @@ pub enum Error {
source: stackable_operator::client::Error,
},

#[snafu(display("failed to validate labels passed through from Listener"))]
ValidateListenerLabels {
source: stackable_operator::kvp::LabelError,
},

#[snafu(display("failed to build cluster resource labels"))]
BuildClusterResourcesLabels {
source: stackable_operator::kvp::LabelError,
},

#[snafu(display("failed to get {obj}"))]
GetObject {
source: stackable_operator::client::Error,
@@ -143,10 +163,15 @@ pub enum Error {

#[snafu(display("failed to apply {svc}"))]
ApplyService {
source: stackable_operator::client::Error,
source: stackable_operator::cluster_resources::Error,
svc: ObjectRef<Service>,
},

#[snafu(display("failed to delete orphaned resources"))]
DeleteOrphans {
source: stackable_operator::cluster_resources::Error,
},

#[snafu(display("failed to apply status for Listener"))]
ApplyStatus {
source: stackable_operator::client::Error,
@@ -162,19 +187,33 @@ impl ReconcilerError for Error {
match self {
Self::NoNs => None,
Self::NoName => None,
Self::CreateClusterResources { source: _ } => None,
Self::NoListenerClass => None,
Self::ListenerPvSelector { source: _ } => None,
Self::ListenerPodSelector { source: _ } => None,
Self::GetListenerPvs { source: _ } => None,
Self::ValidateListenerLabels { source: _ } => None,
Self::BuildClusterResourcesLabels { source: _ } => None,
Self::GetObject { source: _, obj } => Some(obj.clone()),
Self::BuildListenerOwnerRef { .. } => None,
Self::ApplyService { source: _, svc } => Some(svc.clone().erase()),
Self::DeleteOrphans { source: _ } => None,
Self::ApplyStatus { source: _ } => None,
}
}
}

pub async fn reconcile(listener: Arc<Listener>, ctx: Arc<Ctx>) -> Result<controller::Action> {
let mut cluster_resources = ClusterResources::new(
APP_NAME,
OPERATOR_KEY,
CONTROLLER_NAME,
&listener.object_ref(&()),
// Listeners don't currently support pausing
ClusterResourceApplyStrategy::Default,
)
.context(CreateClusterResourcesSnafu)?;

let ns = listener.metadata.namespace.as_deref().context(NoNsSnafu)?;
let listener_class_name = listener
.spec
@@ -228,17 +267,29 @@ pub async fn reconcile(listener: Arc<Listener>, ctx: Arc<Ctx>) -> Result<control
};

let svc = Service {
metadata: ObjectMeta {
namespace: Some(ns.to_string()),
name: Some(svc_name.clone()),
owner_references: Some(vec![OwnerReferenceBuilder::new()
.initialize_from_resource(&*listener)
.build()
.context(BuildListenerOwnerRefSnafu)?]),
// Propagate the labels from the Listener object to the Service object, so it can be found easier
labels: listener.metadata.labels.clone(),
..Default::default()
},
metadata: ObjectMetaBuilder::new()
.namespace(ns)
.name(&svc_name)
.ownerreference_from_resource(&*listener, Some(true), Some(true))
.context(BuildListenerOwnerRefSnafu)?
.with_labels(
Labels::try_from(
listener
.metadata
.labels
.as_ref()
.unwrap_or(&BTreeMap::new()),
)
.context(ValidateListenerLabelsSnafu)?,
)
.with_labels(
cluster_resources
// Not using Labels::recommended, since it carries a bunch of extra information that is
// only relevant for stacklets (such as rolegroups and product versions).
.get_required_labels()
.context(BuildClusterResourcesLabelsSnafu)?,
)
.build(),
spec: Some(ServiceSpec {
// We explicitly match here and do not implement `ToString` as there might be more (non vanilla k8s Service
// types) in the future.
@@ -260,13 +311,11 @@ pub async fn reconcile(listener: Arc<Listener>, ctx: Arc<Ctx>) -> Result<control
}),
..Default::default()
};
let svc = ctx
.client
.apply_patch(FIELD_MANAGER_SCOPE, &svc, &svc)
let svc_ref = ObjectRef::from_obj(&svc);
let svc = cluster_resources
.add(&ctx.client, svc)
.await
.with_context(|_| ApplyServiceSnafu {
svc: ObjectRef::from_obj(&svc),
})?;
.context(ApplyServiceSnafu { svc: svc_ref })?;

let nodes: Vec<Node>;
let addresses: Vec<(&str, AddressType)>;
@@ -363,8 +412,14 @@ pub async fn reconcile(listener: Arc<Listener>, ctx: Arc<Ctx>) -> Result<control
),
node_ports: (listener_class.spec.service_type == ServiceType::NodePort).then_some(ports),
};

cluster_resources
.delete_orphaned_resources(&ctx.client)
.await
.context(DeleteOrphansSnafu)?;

ctx.client
.apply_patch_status(FIELD_MANAGER_SCOPE, &listener_status_meta, &listener_status)
.apply_patch_status(CONTROLLER_NAME, &listener_status_meta, &listener_status)
.await
.context(ApplyStatusSnafu)?;

1 change: 1 addition & 0 deletions rust/operator-binary/src/main.rs
Original file line number Diff line number Diff line change
@@ -23,6 +23,7 @@ mod csi_server;
mod listener_controller;
mod utils;

const APP_NAME: &str = "listener";
const OPERATOR_KEY: &str = "listeners.stackable.tech";

#[derive(clap::Parser)]