Skip to content

Commit b180839

Browse files
authored
The 'ensure_crd_created' method retries on failure. (#79)
* The 'ensure_crd_created' method retries on failure. * Docs + properly recognized restarted state in wait_ready * Add test for wait_ready method. Make default namespace from kubeconfig accessible. * Do not panic on Applied event, as pod status might change. * Test wait_created timeout on nonexisting resource * Cargo fmt after merge with main * Reword wait_created documentation and add an example * Used stream approach for resource waiting * Kubernetes-dependent tests ignored by default. CONTRIBUTING guide. * Run all tests, not only ignored * Renamed wait_ready to wait_created in crd.rs * Removed explicit timeout on 'ensure_crd_created' function in crd.rs
1 parent 53a1465 commit b180839

File tree

7 files changed

+320
-14
lines changed

7 files changed

+320
-14
lines changed

.github/workflows/rust.yml

+12
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,22 @@ jobs:
2020
profile: minimal
2121
toolchain: nightly
2222
override: true
23+
- name: Start K3S
24+
uses: debianmaster/[email protected]
25+
id: k3s
26+
with:
27+
version: 'latest'
28+
- uses: actions-rs/cargo@v1
29+
name: Run tests
30+
with:
31+
command: test
2332
- uses: actions-rs/cargo@v1
33+
name: Run ignored-by-default tests
2434
with:
2535
command: test
36+
args: -- --ignored
2637
- uses: actions-rs/cargo@v1
38+
name: Release build
2739
with:
2840
command: build
2941
args: --release

CONTRIBUTING.adoc

+23
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
= OPERATOR-RS CONTRIBUTING GUIDE
2+
3+
Contributions in any form are welcome and appreciated. The recommended approach for code contributions is to fork this repository and create a pull request.
4+
5+
== CODE
6+
7+
* Use the *stable* branch of Rust, not nightly.
8+
* Code must be formatted with https://github.com/rust-lang/rustfmt[rustfmt]. Easiest way to install `rustfmt` is to to invoke `cargo install rustfmt` and then format the project with `cargo fmt`.
9+
* Rustdoc: Structs, functions and all other elements are documented. In general, documentation style follows the https://doc.rust-lang.org/rustdoc/what-is-rustdoc.html[Rustdoc] guide.
10+
* Tests dependent on Kubernetes runtime are prefixed with `k8s_test_` and ignored.
11+
12+
13+
== TESTS
14+
15+
Tests requiring Kubernetes are *disabled* by default, yet enabled in link:.github/workflows/rust.yml[rust.yml] workflow. All others tests are *enabled* by default. This implies `cargo test` won't run Kubernetes-dependent tests by default. To run ignored test(s), invoke `cargo test -- --ignored`.
16+
17+
Kubernetes tests require a `KUBECONFIG` environment variable pointing to a valid kubeconfig file. The kubeconfig must:
18+
19+
* Point to a reachable Kubernetes cluster,
20+
* Use account with enough permissions to run the tests,
21+
* Be readable by the user running the tests.
22+
23+
On Linux, a convenient way to install local Kubernetes cluster is https://k3s.io/:[K3S.io]. Invoking `curl -sfL https://get.k3s.io | sh -` installs latest version of K3S and exports a `KUBECONFIG` at `/etc/rancher/k3s/k3s.yaml`. To run Kubernetes-dependent tests with K3S, set the `KUBECONFIG` environment variable using `export KUBECONFIG=/etc/rancher/k3s/k3s.yaml`. Make sure it is readable by user invoking the tests by using `chown`/`chmod`. Running the `cargo test -- --ignored` command afterwards runs all tests, including Kubernetes tests. K3S is also leveraged in link:.github/workflows/rust.yml[rust.yml] workflow.

README.adoc

+7-2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,10 @@
1-
= operator-rs
1+
= OPERATOR-RS
22

33
This is a simple library that includes all kinds of helper methods, structs and enums that can be used to write a Kubernetes Controller/Operator with the https://github.com/clux/kube-rs[kube-rs] crate.
44

5-
WARNING: This is very new, undocumented and untested!
5+
WARNING: This is very new, undocumented and partially tested (see the link:CONTRIBUTING.adoc[contributing guide])!
6+
7+
8+
== CONTRIBUTING
9+
10+
Contributions are much welcome. A detailed guide on contributing, development and testing is to be found at link:CONTRIBUTING.adoc[CONTRIBUTING].

src/client.rs

+169-4
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,15 @@ use crate::label_selector;
44
use crate::podutils;
55

66
use either::Either;
7+
use futures::StreamExt;
78
use k8s_openapi::apimachinery::pkg::apis::meta::v1::{Condition, LabelSelector};
89
use k8s_openapi::Resource;
910
use kube::api::{DeleteParams, ListParams, Meta, Patch, PatchParams, PostParams};
1011
use kube::client::{Client as KubeClient, Status};
11-
use kube::Api;
12+
use kube::{Api, Config};
1213
use serde::de::DeserializeOwned;
1314
use serde::Serialize;
15+
use std::convert::TryFrom;
1416
use tracing::trace;
1517

1618
/// This `Client` can be used to access Kubernetes.
@@ -21,10 +23,16 @@ pub struct Client {
2123
patch_params: PatchParams,
2224
post_params: PostParams,
2325
delete_params: DeleteParams,
26+
/// Default namespace as defined in the kubeconfig this client has been created from.
27+
pub default_namespace: String,
2428
}
2529

2630
impl Client {
27-
pub fn new(client: KubeClient, field_manager: Option<String>) -> Self {
31+
pub fn new(
32+
client: KubeClient,
33+
field_manager: Option<String>,
34+
default_namespace: String,
35+
) -> Self {
2836
Client {
2937
client,
3038
post_params: PostParams {
@@ -39,6 +47,7 @@ impl Client {
3947
..PatchParams::default()
4048
},
4149
delete_params: DeleteParams::default(),
50+
default_namespace,
4251
}
4352
}
4453

@@ -236,7 +245,7 @@ impl Client {
236245
/// It checks whether the resource is already deleted by looking at the `deletion_timestamp`
237246
/// of the resource using the [`finalizer::has_deletion_stamp`] method.
238247
/// If that is the case it'll return a `Ok(None)`.
239-
///
248+
///
240249
/// In case the object is actually deleted or marked for deletion there are two possible
241250
/// return types.
242251
/// Which of the two are returned depends on the API being called.
@@ -309,11 +318,167 @@ impl Client {
309318
{
310319
Api::namespaced(self.client.clone(), namespace)
311320
}
321+
322+
/// Waits indefinitely until resources matching given `ListParams` are created in Kubernetes.
323+
/// If the resource is already present, this method just returns. Makes no assumptions about resource's state,
324+
/// e.g. a pod created could be created, but not in a ready state.
325+
///
326+
/// # Arguments
327+
///
328+
/// - `namespace` - Optional namespace to look for the resources in.
329+
/// - `lp` - Parameters to filter resources to wait for in given namespace.
330+
///
331+
/// # Example
332+
///
333+
/// ```no_run
334+
/// use kube::api::ListParams;
335+
/// use std::time::Duration;
336+
/// use tokio::time::error::Elapsed;
337+
/// use k8s_openapi::api::core::v1::Pod;
338+
/// use stackable_operator::client::{Client, create_client};
339+
///
340+
/// #[tokio::main]
341+
/// async fn main(){
342+
/// let client: Client = create_client(None).await.expect("Unable to construct client.");
343+
/// let lp: ListParams =
344+
/// ListParams::default().fields(&format!("metadata.name=nonexistent-pod"));
345+
///
346+
/// // Will time out in 1 second unless the nonexistent-pod actually exists
347+
/// let wait_created_result: Result<(), Elapsed> = tokio::time::timeout(
348+
/// Duration::from_secs(1),
349+
/// client.wait_created::<Pod>(Some(client.default_namespace.clone()), lp.clone()),
350+
/// )
351+
/// .await;
352+
/// }
353+
/// ```
354+
///
355+
pub async fn wait_created<T>(&self, namespace: Option<String>, lp: ListParams)
356+
where
357+
T: Meta + Clone + DeserializeOwned + Send + 'static,
358+
{
359+
let api: Api<T> = self.get_api(namespace);
360+
let watcher = kube_runtime::watcher(api, lp).boxed();
361+
kube_runtime::utils::try_flatten_applied(watcher)
362+
.skip_while(|res| std::future::ready(res.is_err()))
363+
.next()
364+
.await;
365+
}
312366
}
313367

314368
pub async fn create_client(field_manager: Option<String>) -> OperatorResult<Client> {
369+
let kubeconfig: Config = kube::Config::infer().await?;
370+
let default_namespace = kubeconfig.default_ns.clone();
315371
Ok(Client::new(
316-
kube::Client::try_default().await?,
372+
kube::Client::try_from(kubeconfig)?,
317373
field_manager,
374+
default_namespace,
318375
))
319376
}
377+
378+
#[cfg(test)]
379+
mod tests {
380+
use futures::StreamExt;
381+
use k8s_openapi::api::core::v1::{Container, Pod, PodSpec};
382+
use kube::api::{ListParams, Meta, ObjectMeta, PostParams};
383+
use kube_runtime::watcher::Event;
384+
use std::time::Duration;
385+
use tokio::time::error::Elapsed;
386+
387+
#[tokio::test]
388+
#[ignore = "Tests depending on Kubernetes are not ran by default"]
389+
async fn k8s_test_wait_created() {
390+
let client = super::create_client(None)
391+
.await
392+
.expect("KUBECONFIG variable must be configured.");
393+
394+
// Definition of the pod the `wait_created` function will be waiting for.
395+
let pod_to_wait_for: Pod = Pod {
396+
metadata: ObjectMeta {
397+
name: Some("test-wait-created-busybox".to_owned()),
398+
..ObjectMeta::default()
399+
},
400+
spec: Some(PodSpec {
401+
containers: vec![Container {
402+
name: "test-wait-created-busybox".to_owned(),
403+
image: Some("busybox:latest".to_owned()),
404+
image_pull_policy: Some("IfNotPresent".to_owned()),
405+
command: Some(vec!["sleep".into(), "infinity".into()]),
406+
..Container::default()
407+
}],
408+
termination_grace_period_seconds: Some(1),
409+
..PodSpec::default()
410+
}),
411+
..Pod::default()
412+
};
413+
let api = client.get_api(Some(client.default_namespace.clone()));
414+
let created_pod = api
415+
.create(&PostParams::default(), &pod_to_wait_for)
416+
.await
417+
.expect("Test pod not created.");
418+
let lp: ListParams = ListParams::default().fields(&format!(
419+
"metadata.name={}",
420+
created_pod
421+
.metadata
422+
.name
423+
.as_ref()
424+
.expect("Expected busybox pod to have metadata")
425+
));
426+
// First, let the tested `wait_creation` function wait until the resource is present.
427+
// Timeout is not acceptable
428+
tokio::time::timeout(
429+
Duration::from_secs(30), // Busybox is ~5MB and sub 1 sec to start.
430+
client.wait_created::<Pod>(Some(client.default_namespace.clone()), lp.clone()),
431+
)
432+
.await
433+
.expect("The tested wait_created function timed out.");
434+
435+
// A second, manually constructed watcher is used to verify the ListParams filter out the correct resource
436+
// and the `wait_created` function returned when the correct resources had been detected.
437+
let mut ready_watcher = kube_runtime::watcher::<Pod>(api, lp).boxed();
438+
while let Some(result) = ready_watcher.next().await {
439+
match result {
440+
Ok(event) => match event {
441+
Event::Applied(pod) => {
442+
assert_eq!("test-wait-created-busybox", pod.name());
443+
}
444+
Event::Restarted(pods) => {
445+
assert_eq!(1, pods.len());
446+
assert_eq!("test-wait-created-busybox", &pods[0].name());
447+
break;
448+
}
449+
Event::Deleted(_) => {
450+
panic!("Not expected the test_wait_created busybox pod to be deleted");
451+
}
452+
},
453+
Err(_) => {
454+
panic!("Error while waiting for readiness.");
455+
}
456+
}
457+
}
458+
459+
client
460+
.delete(&created_pod)
461+
.await
462+
.expect("Expected test_wait_created pod to be deleted.");
463+
}
464+
465+
#[tokio::test]
466+
#[ignore = "Tests depending on Kubernetes are not ran by default"]
467+
async fn k8s_test_wait_created_timeout() {
468+
let client = super::create_client(None)
469+
.await
470+
.expect("KUBECONFIG variable must be configured.");
471+
472+
let lp: ListParams =
473+
ListParams::default().fields(&format!("metadata.name=nonexistent-pod"));
474+
475+
// There is no such pod, therefore the `wait_created` function call times out.
476+
let wait_created_result: Result<(), Elapsed> = tokio::time::timeout(
477+
Duration::from_secs(1),
478+
client.wait_created::<Pod>(Some(client.default_namespace.clone()), lp.clone()),
479+
)
480+
.await;
481+
482+
assert!(wait_created_result.is_err());
483+
}
484+
}

src/crd.rs

+39-8
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,13 @@
1-
use crate::client::Client;
2-
use crate::error::{Error, OperatorResult};
1+
use std::time::Duration;
32

43
use k8s_openapi::apiextensions_apiserver::pkg::apis::apiextensions::v1::CustomResourceDefinition;
54
use kube::error::ErrorResponse;
65
use tracing::info;
76

7+
use crate::client::Client;
8+
use crate::error::{Error, OperatorResult};
9+
use kube::api::ListParams;
10+
811
/// This trait can be implemented to allow automatic handling
912
/// (e.g. creation) of `CustomResourceDefinition`s in Kubernetes.
1013
pub trait Crd {
@@ -61,10 +64,16 @@ where
6164
}
6265
}
6366

64-
/// This makes sure the CRD is registered in the apiserver.
65-
/// Currently this does not retry internally.
66-
/// This means that running it again _might_ work in case of transient errors.
67-
// TODO: Make sure to wait until it's enabled in the apiserver
67+
/// Makes sure CRD of given type `T` is running and accepted by the Kubernetes apiserver.
68+
/// If the CRD already exists at the time this method is invoked, this method exits.
69+
/// If there is no CRD of type `T` yet, it will attempt to create it and verify k8s apiserver
70+
/// applied the CRD. This method retries indefinitely. Use timeout on the `future` returned
71+
/// to apply time limit constraint.
72+
///
73+
/// # Parameters
74+
/// - `client`: Client to connect to Kubernetes API and create the CRD with
75+
/// - `timeout`: If specified, retries creating the CRD for given `Duration`. If not specified,
76+
/// retries indefinitely.
6877
pub async fn ensure_crd_created<T>(client: Client) -> OperatorResult<()>
6978
where
7079
T: Crd,
@@ -74,8 +83,15 @@ where
7483
Ok(())
7584
} else {
7685
info!("CRD not detected in Kubernetes. Attempting to create it.");
77-
create::<T>(client).await
78-
// TODO: Maybe retry?
86+
87+
loop {
88+
if let Ok(res) = create::<T>(client.clone()).await {
89+
break res;
90+
}
91+
tokio::time::sleep(Duration::from_millis(100)).await;
92+
}
93+
wait_created::<T>(client.clone()).await?;
94+
Ok(())
7995
}
8096
}
8197

@@ -90,3 +106,18 @@ where
90106
let crd: CustomResourceDefinition = serde_yaml::from_str(T::CRD_DEFINITION)?;
91107
client.create(&crd).await.and(Ok(()))
92108
}
109+
110+
/// Waits until CRD of given type `T` is applied to Kubernetes.
111+
pub async fn wait_created<T>(client: Client) -> OperatorResult<()>
112+
where
113+
T: Crd,
114+
{
115+
let lp: ListParams = ListParams {
116+
field_selector: Some(format!("metadata.name={}", T::RESOURCE_NAME)),
117+
..ListParams::default()
118+
};
119+
client
120+
.wait_created::<CustomResourceDefinition>(None, lp)
121+
.await;
122+
Ok(())
123+
}

src/error.rs

+5
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,11 @@ pub enum Error {
2323

2424
#[error("LabelSelector is invalid: {message}")]
2525
InvalidLabelSelector { message: String },
26+
#[error("Operation timed out: {source}")]
27+
TimeoutError {
28+
#[from]
29+
source: tokio::time::error::Elapsed,
30+
},
2631
}
2732

2833
pub type OperatorResult<T> = std::result::Result<T, Error>;

0 commit comments

Comments
 (0)