-
Notifications
You must be signed in to change notification settings - Fork 551
Expand file tree
/
Copy pathruntime.rs
More file actions
690 lines (628 loc) · 25.5 KB
/
runtime.rs
File metadata and controls
690 lines (628 loc) · 25.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
// SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
use crate::constants::{KUBECONFIG_PATH, container_name, node_name};
use bollard::Docker;
use bollard::container::LogOutput;
use bollard::exec::CreateExecOptions;
use bollard::models::HealthStatusEnum;
use bollard::query_parameters::{InspectContainerOptions, LogsOptionsBuilder};
use futures::StreamExt;
use miette::{IntoDiagnostic, Result};
use std::collections::VecDeque;
use std::time::Duration;
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
/// Log markers emitted by the entrypoint and health-check scripts when DNS
/// resolution fails inside the container. Detecting these early lets us
/// short-circuit the 6-minute polling loop and surface a clear diagnosis.
const DNS_FAILURE_MARKERS: &[&str] = &["DNS_PROBE_FAILED", "HEALTHCHECK_DNS_FAILURE"];
/// Log marker emitted by the health-check script when a Kubernetes node is
/// under resource pressure (`DiskPressure`, `MemoryPressure`, `PIDPressure`).
/// When a node has pressure conditions the kubelet evicts pods and rejects
/// new scheduling, so the cluster will never become healthy on its own.
const NODE_PRESSURE_MARKER: &str = "HEALTHCHECK_NODE_PRESSURE";
/// Log marker emitted by the health-check script when the sandbox supervisor
/// binary (`/opt/openshell/bin/openshell-sandbox`) is missing from the node
/// filesystem. Without this binary, every sandbox pod will crash immediately
/// with "no such file or directory". This is a permanent error that requires
/// rebuilding or updating the cluster image.
const MISSING_SUPERVISOR_MARKER: &str = "HEALTHCHECK_MISSING_SUPERVISOR";
/// Number of consecutive polling iterations that must observe DNS failure
/// markers before we treat the failure as persistent and abort. A small
/// grace period avoids false positives on transient hiccups during startup.
const DNS_FAILURE_GRACE_ITERATIONS: u32 = 5;
/// Number of consecutive polling iterations that must observe node pressure
/// markers before aborting. Slightly longer grace period than DNS since
/// transient pressure can occur during image extraction on startup.
const NODE_PRESSURE_GRACE_ITERATIONS: u32 = 8;
pub async fn wait_for_gateway_ready<F>(docker: &Docker, name: &str, mut on_log: F) -> Result<()>
where
F: FnMut(String) + Send,
{
let container_name = container_name(name);
let (log_tx, mut log_rx) = unbounded_channel();
let log_docker = docker.clone();
let log_container_name = container_name.clone();
let log_task = tokio::spawn(async move {
stream_container_logs(&log_docker, &log_container_name, &log_tx).await;
});
let mut recent_logs = VecDeque::with_capacity(15);
let attempts = 180;
let mut result = None;
let mut dns_failure_seen_count: u32 = 0;
let mut node_pressure_seen_count: u32 = 0;
for attempt in 0..attempts {
drain_logs(&mut log_rx, &mut recent_logs, &mut on_log);
// -- Early DNS failure detection ---------------------------------
// Check recent logs for DNS failure markers emitted by the
// entrypoint or health-check scripts. If the marker persists for
// several consecutive iterations the DNS proxy is broken and
// waiting longer won't help.
let dns_failing = recent_logs
.iter()
.any(|line| DNS_FAILURE_MARKERS.iter().any(|m| line.contains(m)));
if dns_failing {
dns_failure_seen_count += 1;
if dns_failure_seen_count >= DNS_FAILURE_GRACE_ITERATIONS {
result = Some(Err(miette::miette!(
"dial tcp: lookup registry: Try again\n\
DNS resolution is failing inside the gateway container.\n{}",
format_recent_logs(&recent_logs)
)));
break;
}
} else {
dns_failure_seen_count = 0;
}
// -- Early node pressure detection -------------------------------
// Check for DiskPressure / MemoryPressure / PIDPressure markers
// emitted by the health-check script. Under pressure the kubelet
// evicts pods and blocks new scheduling, so waiting won't help.
let pressure_lines: Vec<&str> = recent_logs
.iter()
.filter(|line| line.contains(NODE_PRESSURE_MARKER))
.map(String::as_str)
.collect();
if pressure_lines.is_empty() {
node_pressure_seen_count = 0;
} else {
node_pressure_seen_count += 1;
if node_pressure_seen_count >= NODE_PRESSURE_GRACE_ITERATIONS {
// Extract the specific pressure type(s) from the marker lines
let conditions: Vec<String> = pressure_lines
.iter()
.filter_map(|line| {
line.find(NODE_PRESSURE_MARKER)
.map(|pos| &line[pos + NODE_PRESSURE_MARKER.len()..])
.map(|rest| rest.trim_start_matches(':').trim().to_string())
})
.filter(|s| !s.is_empty())
.collect();
let condition_list = if conditions.is_empty() {
"unknown pressure condition".to_string()
} else {
conditions.join(", ")
};
result = Some(Err(miette::miette!(
"HEALTHCHECK_NODE_PRESSURE: {condition_list}\n\
The cluster node is under resource pressure. \
The kubelet is evicting pods and rejecting new scheduling.\n{}",
format_recent_logs(&recent_logs)
)));
break;
}
}
// -- Missing supervisor binary detection ----------------------------
// The health-check script verifies that /opt/openshell/bin/openshell-sandbox
// exists on the node filesystem. If missing, every sandbox pod will crash.
// This is a permanent error — fail immediately with actionable guidance.
if recent_logs
.iter()
.any(|line| line.contains(MISSING_SUPERVISOR_MARKER))
{
result = Some(Err(miette::miette!(
"The sandbox supervisor binary is missing from the cluster image.\n\
The file /opt/openshell/bin/openshell-sandbox was not found in the gateway \
container. Without it, sandbox pods cannot start.\n\n\
This usually means the cluster image was built or published without the \
supervisor-builder stage.\n\n\
To fix:\n \
1. Rebuild the cluster image: mise run docker:build:cluster\n \
2. Or update to a cluster image that includes the supervisor binary\n \
3. Then recreate the gateway: openshell gateway destroy && openshell gateway start\n\n{}",
format_recent_logs(&recent_logs)
)));
break;
}
let inspect = docker
.inspect_container(&container_name, None::<InspectContainerOptions>)
.await
.into_diagnostic()?;
// Check if the container has exited before checking health
let running = inspect
.state
.as_ref()
.and_then(|s| s.running)
.unwrap_or(false);
if !running {
drain_logs(&mut log_rx, &mut recent_logs, &mut on_log);
let exit_code = inspect
.state
.as_ref()
.and_then(|s| s.exit_code)
.unwrap_or(-1);
let error_msg = inspect
.state
.as_ref()
.and_then(|s| s.error.as_deref())
.unwrap_or("");
let mut detail =
format!("gateway container exited unexpectedly (exit_code={exit_code})");
if !error_msg.is_empty() {
use std::fmt::Write;
let _ = write!(detail, ", error={error_msg}");
}
result = Some(Err(miette::miette!(
"{detail}\n{}",
format_recent_logs(&recent_logs)
)));
break;
}
let status = inspect
.state
.and_then(|state| state.health)
.and_then(|health| health.status);
match status {
Some(HealthStatusEnum::HEALTHY) => {
result = Some(Ok(()));
break;
}
Some(HealthStatusEnum::UNHEALTHY) if attempt + 1 == attempts => {
result = Some(Err(miette::miette!(
"gateway health check reported unhealthy\n{}",
format_recent_logs(&recent_logs)
)));
break;
}
Some(HealthStatusEnum::NONE | HealthStatusEnum::EMPTY) | None if attempt == 0 => {
result = Some(Err(miette::miette!(
"gateway container does not expose a health check\n{}",
format_recent_logs(&recent_logs)
)));
break;
}
_ => {}
}
tokio::time::sleep(Duration::from_secs(2)).await;
}
if result.is_none() {
drain_logs(&mut log_rx, &mut recent_logs, &mut on_log);
result = Some(Err(miette::miette!(
"timed out waiting for gateway health check\n{}",
format_recent_logs(&recent_logs)
)));
}
log_task.abort();
result.unwrap_or_else(|| Err(miette::miette!("gateway health status unavailable")))
}
async fn stream_container_logs(
docker: &Docker,
container_name: &str,
tx: &UnboundedSender<String>,
) {
let options = LogsOptionsBuilder::new()
.follow(true)
.stdout(true)
.stderr(true)
.tail("0")
.build();
let mut stream = docker.logs(container_name, Some(options));
let mut stdout_partial = String::new();
let mut stderr_partial = String::new();
let mut console_partial = String::new();
while let Some(item) = stream.next().await {
match item {
Ok(log) => match log {
LogOutput::StdOut { message } => {
append_log_chunk(tx, &mut stdout_partial, &String::from_utf8_lossy(&message));
}
LogOutput::StdErr { message } => {
append_log_chunk(tx, &mut stderr_partial, &String::from_utf8_lossy(&message));
}
LogOutput::Console { message } => {
append_log_chunk(tx, &mut console_partial, &String::from_utf8_lossy(&message));
}
LogOutput::StdIn { .. } => {}
},
Err(err) => {
let _ = tx.send(format!("[log stream error] {err}"));
return;
}
}
}
flush_partial(tx, &mut stdout_partial);
flush_partial(tx, &mut stderr_partial);
flush_partial(tx, &mut console_partial);
}
fn append_log_chunk(tx: &UnboundedSender<String>, partial: &mut String, chunk: &str) {
partial.push_str(chunk);
while let Some(pos) = partial.find('\n') {
let line = partial[..pos].trim_end_matches('\r').to_string();
if !line.is_empty() {
let _ = tx.send(line);
}
partial.drain(..=pos);
}
}
fn flush_partial(tx: &UnboundedSender<String>, partial: &mut String) {
let line = partial.trim();
if !line.is_empty() {
let _ = tx.send(line.to_string());
}
partial.clear();
}
fn drain_logs<F>(
rx: &mut UnboundedReceiver<String>,
recent_logs: &mut VecDeque<String>,
on_log: &mut F,
) where
F: FnMut(String),
{
while let Ok(line) = rx.try_recv() {
if recent_logs.len() == 15 {
recent_logs.pop_front();
}
recent_logs.push_back(line.clone());
on_log(line);
}
}
fn format_recent_logs(recent_logs: &VecDeque<String>) -> String {
if recent_logs.is_empty() {
return "container logs: none received".to_string();
}
let mut rendered = String::from("container logs:");
for line in recent_logs {
rendered.push('\n');
rendered.push_str(" ");
rendered.push_str(line);
}
rendered
}
/// Fetch the last `n` lines of container logs (non-streaming, for error context).
pub async fn fetch_recent_logs(docker: &Docker, container_name: &str, n: usize) -> String {
let options = LogsOptionsBuilder::new()
.follow(false)
.stdout(true)
.stderr(true)
.tail(&n.to_string())
.build();
let mut stream = docker.logs(container_name, Some(options));
let mut lines = Vec::new();
while let Some(item) = stream.next().await {
match item {
Ok(log) => {
let text = match log {
LogOutput::StdOut { message }
| LogOutput::StdErr { message }
| LogOutput::Console { message } => {
String::from_utf8_lossy(&message).to_string()
}
LogOutput::StdIn { .. } => continue,
};
for line in text.lines() {
let trimmed = line.trim();
if !trimmed.is_empty() {
lines.push(trimmed.to_string());
}
}
}
Err(_) => break,
}
}
if lines.is_empty() {
return "container logs: none available".to_string();
}
let mut rendered = String::from("container logs:");
for line in &lines {
rendered.push('\n');
rendered.push_str(" ");
rendered.push_str(line);
}
rendered
}
/// Remove stale k3s nodes and their orphaned pods from a resumed cluster.
///
/// When a cluster container is recreated but the volume is reused, k3s registers
/// a new node (using the container ID as the hostname) while old node entries
/// persist in etcd. Pods scheduled on those stale `NotReady` nodes will never run,
/// causing health checks to fail.
///
/// This function retries with backoff until `kubectl` becomes available (k3s may
/// still be initialising), then:
/// 1. Deletes all `NotReady` nodes so k3s stops tracking them.
/// 2. Force-deletes any pods stuck in `Terminating` so `StatefulSets` and
/// Deployments can reschedule replacements on the current (Ready) node.
///
/// Returns the number of stale nodes removed.
pub async fn clean_stale_nodes(docker: &Docker, name: &str) -> Result<usize> {
// Retry until kubectl is responsive. k3s can take 10-20 s to start the
// API server after a container restart, so we allow up to ~45 s.
const MAX_ATTEMPTS: u32 = 15;
const RETRY_DELAY: Duration = Duration::from_secs(3);
let container_name = container_name(name);
let mut stale_nodes: Vec<String> = Vec::new();
// Determine the current node name. With the deterministic `--node-name`
// entrypoint change the k3s node is `openshell-{gateway}`. However, older
// cluster images (built before that change) still use the container hostname
// (= Docker container ID) as the node name. We must handle both:
//
// 1. If the expected deterministic name appears in the node list, use it.
// 2. Otherwise fall back to the container hostname (old behaviour).
//
// This ensures backward compatibility during upgrades where the bootstrap
// CLI is newer than the cluster image.
let deterministic_node = node_name(name);
for attempt in 1..=MAX_ATTEMPTS {
let (output, exit_code) = exec_capture_with_exit(
docker,
&container_name,
vec![
"sh".to_string(),
"-c".to_string(),
format!(
"KUBECONFIG={KUBECONFIG_PATH} kubectl get nodes \
--no-headers -o custom-columns=NAME:.metadata.name \
2>/dev/null"
),
],
)
.await?;
if exit_code == 0 {
let all_nodes: Vec<&str> = output
.lines()
.map(str::trim)
.filter(|l| !l.is_empty())
.collect();
// Pick the current node identity: prefer the deterministic name,
// fall back to the container hostname for older cluster images.
let current_node = if all_nodes.contains(&deterministic_node.as_str()) {
deterministic_node.clone()
} else {
// Older cluster image without --node-name: read hostname.
let (hostname_out, _) =
exec_capture_with_exit(docker, &container_name, vec!["hostname".to_string()])
.await?;
hostname_out.trim().to_string()
};
stale_nodes = all_nodes
.into_iter()
.filter(|n| *n != current_node)
.map(ToString::to_string)
.collect();
break;
}
if attempt < MAX_ATTEMPTS {
tracing::debug!(
"kubectl not ready yet (attempt {attempt}/{MAX_ATTEMPTS}), retrying in {}s",
RETRY_DELAY.as_secs()
);
tokio::time::sleep(RETRY_DELAY).await;
}
}
if stale_nodes.is_empty() {
return Ok(0);
}
let node_list = stale_nodes.join(" ");
let count = stale_nodes.len();
tracing::info!("removing {} stale node(s): {}", count, node_list);
// Step 1: delete the stale node objects.
let (_output, exit_code) = exec_capture_with_exit(
docker,
&container_name,
vec![
"sh".to_string(),
"-c".to_string(),
format!(
"KUBECONFIG={KUBECONFIG_PATH} kubectl delete node {node_list} --ignore-not-found"
),
],
)
.await?;
if exit_code != 0 {
tracing::warn!("failed to delete stale nodes (exit code {exit_code})");
}
// Step 2: force-delete pods stuck in Terminating. After the stale node is
// removed, pods that were scheduled on it transition to Terminating but
// will never complete graceful shutdown (the node is gone). StatefulSets
// will not create a replacement until the old pod is fully deleted.
let (_output, exit_code) = exec_capture_with_exit(
docker,
&container_name,
vec![
"sh".to_string(),
"-c".to_string(),
format!(
"KUBECONFIG={KUBECONFIG_PATH} kubectl get pods --all-namespaces \
--field-selector=status.phase=Running -o name 2>/dev/null; \
for pod_line in $(KUBECONFIG={KUBECONFIG_PATH} kubectl get pods --all-namespaces \
--no-headers 2>/dev/null | awk '$4 == \"Terminating\" {{print $1\"/\"$2}}'); do \
ns=${{pod_line%%/*}}; pod=${{pod_line#*/}}; \
KUBECONFIG={KUBECONFIG_PATH} kubectl delete pod \"$pod\" -n \"$ns\" \
--force --grace-period=0 --ignore-not-found 2>/dev/null; \
done"
),
],
)
.await?;
if exit_code != 0 {
tracing::debug!(
"force-delete of terminating pods returned exit code {exit_code} (non-fatal)"
);
}
// Step 3: delete PersistentVolumeClaims in the openshell namespace whose
// backing PV has node affinity for a stale node. local-path-provisioner
// creates PVs tied to the original node; when the node changes, the PV is
// unschedulable and the `StatefulSet` pod stays Pending. Deleting the PVC
// (and its PV) lets the provisioner create a fresh one on the current node.
let (_output, exit_code) = exec_capture_with_exit(
docker,
&container_name,
vec![
"sh".to_string(),
"-c".to_string(),
format!(
r#"KUBECONFIG={KUBECONFIG_PATH}; export KUBECONFIG; \
CURRENT_NODE=$(kubectl get nodes --no-headers -o custom-columns=NAME:.metadata.name 2>/dev/null | head -1); \
[ -z "$CURRENT_NODE" ] && exit 0; \
for pv in $(kubectl get pv -o jsonpath='{{.items[*].metadata.name}}' 2>/dev/null); do \
NODE=$(kubectl get pv "$pv" -o jsonpath='{{.spec.nodeAffinity.required.nodeSelectorTerms[0].matchExpressions[0].values[0]}}' 2>/dev/null); \
[ "$NODE" = "$CURRENT_NODE" ] && continue; \
NS=$(kubectl get pv "$pv" -o jsonpath='{{.spec.claimRef.namespace}}' 2>/dev/null); \
PVC=$(kubectl get pv "$pv" -o jsonpath='{{.spec.claimRef.name}}' 2>/dev/null); \
[ -n "$PVC" ] && kubectl delete pvc "$PVC" -n "$NS" --ignore-not-found 2>/dev/null; \
kubectl delete pv "$pv" --ignore-not-found 2>/dev/null; \
done"#
),
],
)
.await?;
if exit_code != 0 {
tracing::debug!("PV/PVC cleanup returned exit code {exit_code} (non-fatal)");
}
Ok(count)
}
/// Restart the openshell workload so pods pick up updated images or secrets.
///
/// Probes for a `StatefulSet` first, then falls back to a `Deployment`, matching
/// the same detection pattern used by `cluster-deploy-fast.sh`.
pub async fn restart_openshell_deployment(docker: &Docker, name: &str) -> Result<()> {
let cname = container_name(name);
// Detect which workload kind exists in the cluster.
let workload_kind = detect_openshell_workload_kind(docker, &cname).await?;
let workload_ref = format!("{workload_kind}/openshell");
let (restart_output, restart_exit) = exec_capture_with_exit(
docker,
&cname,
vec![
"sh".to_string(),
"-c".to_string(),
format!(
"KUBECONFIG={KUBECONFIG_PATH} kubectl rollout restart {workload_ref} -n openshell"
),
],
)
.await?;
if restart_exit != 0 {
return Err(miette::miette!(
"failed to restart openshell {workload_ref} (exit code {restart_exit})\n{restart_output}"
));
}
let (status_output, status_exit) = exec_capture_with_exit(
docker,
&cname,
vec![
"sh".to_string(),
"-c".to_string(),
format!(
"KUBECONFIG={KUBECONFIG_PATH} kubectl rollout status {workload_ref} -n openshell --timeout=180s"
),
],
)
.await?;
if status_exit != 0 {
return Err(miette::miette!(
"openshell rollout status failed for {workload_ref} (exit code {status_exit})\n{status_output}"
));
}
Ok(())
}
/// Check whether an openshell workload exists in the cluster (`StatefulSet` or `Deployment`).
pub async fn openshell_workload_exists(docker: &Docker, name: &str) -> Result<bool> {
let cname = container_name(name);
match detect_openshell_workload_kind(docker, &cname).await {
Ok(_) => Ok(true),
Err(_) => Ok(false),
}
}
/// Detect whether openshell is deployed as a `StatefulSet` or `Deployment`.
/// Returns "statefulset" or "deployment".
async fn detect_openshell_workload_kind(docker: &Docker, container_name: &str) -> Result<String> {
// Check StatefulSet first (primary workload type for fresh deploys)
let (_, ss_exit) = exec_capture_with_exit(
docker,
container_name,
vec![
"sh".to_string(),
"-c".to_string(),
format!(
"KUBECONFIG={KUBECONFIG_PATH} kubectl get statefulset/openshell -n openshell -o name 2>/dev/null"
),
],
)
.await?;
if ss_exit == 0 {
return Ok("statefulset".to_string());
}
// Fall back to Deployment
let (_, dep_exit) = exec_capture_with_exit(
docker,
container_name,
vec![
"sh".to_string(),
"-c".to_string(),
format!(
"KUBECONFIG={KUBECONFIG_PATH} kubectl get deployment/openshell -n openshell -o name 2>/dev/null"
),
],
)
.await?;
if dep_exit == 0 {
return Ok("deployment".to_string());
}
Err(miette::miette!(
"no openshell workload (statefulset or deployment) found in namespace 'openshell'"
))
}
pub async fn exec_capture_with_exit(
docker: &Docker,
container_name: &str,
cmd: Vec<String>,
) -> Result<(String, i64)> {
let exec = docker
.create_exec(
container_name,
CreateExecOptions {
attach_stdout: Some(true),
attach_stderr: Some(true),
cmd: Some(cmd),
..Default::default()
},
)
.await
.into_diagnostic()?
.id;
let start = docker.start_exec(&exec, None).await.into_diagnostic()?;
let mut buffer = String::new();
if let bollard::exec::StartExecResults::Attached { mut output, .. } = start {
while let Some(item) = output.next().await {
let log = item.into_diagnostic()?;
match log {
LogOutput::StdOut { message }
| LogOutput::StdErr { message }
| LogOutput::Console { message } => {
buffer.push_str(&String::from_utf8_lossy(&message));
}
LogOutput::StdIn { .. } => {}
}
}
}
let mut exit_code = None;
for _ in 0..20 {
let inspect = docker.inspect_exec(&exec).await.into_diagnostic()?;
if let Some(code) = inspect.exit_code {
exit_code = Some(code);
break;
}
tokio::time::sleep(Duration::from_millis(200)).await;
}
Ok((buffer, exit_code.unwrap_or(1)))
}