From b678ba0b7ce6eb828b106b610c54c7e8fe48a652 Mon Sep 17 00:00:00 2001 From: Michal Rostecki Date: Sun, 7 Jan 2024 17:28:07 +0100 Subject: [PATCH 1/3] fix: Handle containers which were running before Pulsar Always check whether a detected process belongs to a container-related cgroup. This way we ensure that we are adding container info even when the container was created before Pulsar was launched. --- crates/bpf-common/src/parsing/containers.rs | 9 ++++++ crates/pulsar-core/src/pdk/process_tracker.rs | 32 +++++++++++-------- 2 files changed, 28 insertions(+), 13 deletions(-) diff --git a/crates/bpf-common/src/parsing/containers.rs b/crates/bpf-common/src/parsing/containers.rs index 88d9257b..ea95defb 100644 --- a/crates/bpf-common/src/parsing/containers.rs +++ b/crates/bpf-common/src/parsing/containers.rs @@ -43,6 +43,15 @@ pub enum ContainerId { Libpod(String), } +impl fmt::Display for ContainerId { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + ContainerId::Docker(id) => write!(f, "{id}"), + ContainerId::Libpod(id) => write!(f, "{id}"), + } + } +} + #[derive(Debug, Deserialize)] struct DockerConfig { #[serde(rename = "Config")] diff --git a/crates/pulsar-core/src/pdk/process_tracker.rs b/crates/pulsar-core/src/pdk/process_tracker.rs index ed9c8e58..d214b110 100644 --- a/crates/pulsar-core/src/pdk/process_tracker.rs +++ b/crates/pulsar-core/src/pdk/process_tracker.rs @@ -263,13 +263,23 @@ impl ProcessTracker { } } - fn handle_container(&mut self, pid: Pid, namespaces: Namespaces) -> Result<(), ContainerError> { + fn handle_container_info( + &mut self, + pid: Pid, + namespaces: Namespaces, + is_new_container: bool, + ) -> Result<(), ContainerError> { let container_id = procfs::get_process_container_id(pid)?; if let Some(id) = container_id { - let container_info = ContainerInfo::from_container_id(id)?; - self.containers.insert(namespaces, container_info); - } else { - log::warn!("could not determine a continer ID of a new containerized process {pid}"); + let container_info = ContainerInfo::from_container_id(id.clone())?; + self.containers.entry(namespaces).or_insert_with(|| { + if is_new_container { + log::debug!("Detected a new container {id}"); + } else { + log::debug!("Detected an already existing container {id}"); + } + container_info + }); } Ok(()) } @@ -283,10 +293,8 @@ impl ProcessTracker { namespaces, is_new_container, } => { - if is_new_container { - self.handle_container(pid, namespaces) - .unwrap_or_else(|err| log::error!("{err}")); - } + self.handle_container_info(pid, namespaces, is_new_container) + .unwrap_or_else(|err| log::error!("{err}")); self.processes.insert( pid, ProcessData { @@ -317,10 +325,8 @@ impl ProcessTracker { namespaces, is_new_container, } => { - if is_new_container { - self.handle_container(pid, namespaces) - .unwrap_or_else(|err| log::error!("{err}")); - } + self.handle_container_info(pid, namespaces, is_new_container) + .unwrap_or_else(|err| log::error!("{err}")); if let Some(p) = self.processes.get_mut(&pid) { p.exec_changes.insert(timestamp, std::mem::take(image)); p.argv = std::mem::take(argv) From ffd99aca9d620656af550986b21facb69517d8a1 Mon Sep 17 00:00:00 2001 From: Michal Rostecki Date: Mon, 8 Jan 2024 14:30:16 +0100 Subject: [PATCH 2/3] fix: Ensure that the container info of each process is up to date Before this change, we were storing a map of namespaces and corresponding container info, but that didn't work - namespaces used by one container might be reused by a next container later. This change drops that solution and fetches the container info always. --- crates/pulsar-core/src/pdk/process_tracker.rs | 48 +++++++++++-------- 1 file changed, 29 insertions(+), 19 deletions(-) diff --git a/crates/pulsar-core/src/pdk/process_tracker.rs b/crates/pulsar-core/src/pdk/process_tracker.rs index d214b110..e9c421be 100644 --- a/crates/pulsar-core/src/pdk/process_tracker.rs +++ b/crates/pulsar-core/src/pdk/process_tracker.rs @@ -140,8 +140,6 @@ struct ProcessTracker { rx: mpsc::UnboundedReceiver, /// current processes processes: HashMap, - /// current containers - containers: HashMap, /// scheduled removal of exited processes next_cleanup: Timestamp, /// pending info requests arrived before the process was created @@ -162,6 +160,7 @@ struct ProcessData { >, argv: Vec, namespaces: Namespaces, + container: Option, } /// Cleanup timeout in nanoseconds. This is how long an exited process @@ -186,12 +185,12 @@ impl ProcessTracker { exec_changes: BTreeMap::new(), argv: Vec::new(), namespaces: Namespaces::default(), + container: None, }, ); Self { rx, processes, - containers: HashMap::new(), next_cleanup: Timestamp::now() + CLEANUP_TIMEOUT, pending_requests: Vec::new(), pending_updates: HashMap::new(), @@ -263,25 +262,24 @@ impl ProcessTracker { } } - fn handle_container_info( + fn get_container_info( &mut self, pid: Pid, - namespaces: Namespaces, is_new_container: bool, - ) -> Result<(), ContainerError> { + ) -> Result, ContainerError> { let container_id = procfs::get_process_container_id(pid)?; - if let Some(id) = container_id { - let container_info = ContainerInfo::from_container_id(id.clone())?; - self.containers.entry(namespaces).or_insert_with(|| { + match container_id { + Some(id) => { + let container_info = ContainerInfo::from_container_id(id.clone())?; if is_new_container { log::debug!("Detected a new container {id}"); } else { log::debug!("Detected an already existing container {id}"); } - container_info - }); + Ok(Some(container_info)) + } + None => Ok(None), } - Ok(()) } fn handle_update(&mut self, mut update: TrackerUpdate) { @@ -293,8 +291,13 @@ impl ProcessTracker { namespaces, is_new_container, } => { - self.handle_container_info(pid, namespaces, is_new_container) - .unwrap_or_else(|err| log::error!("{err}")); + let container = match self.get_container_info(pid, is_new_container) { + Ok(container) => container, + Err(err) => { + log::error!("{err}"); + None + } + }; self.processes.insert( pid, ProcessData { @@ -309,6 +312,7 @@ impl ProcessTracker { .map(|parent| parent.argv.clone()) .unwrap_or_default(), namespaces, + container, }, ); if let Some(pending_updates) = self.pending_updates.remove(&pid) { @@ -325,11 +329,18 @@ impl ProcessTracker { namespaces, is_new_container, } => { - self.handle_container_info(pid, namespaces, is_new_container) - .unwrap_or_else(|err| log::error!("{err}")); + let container = match self.get_container_info(pid, is_new_container) { + Ok(container) => container, + Err(err) => { + log::error!("{err}"); + None + } + }; if let Some(p) = self.processes.get_mut(&pid) { p.exec_changes.insert(timestamp, std::mem::take(image)); - p.argv = std::mem::take(argv) + p.argv = std::mem::take(argv); + p.namespaces = namespaces; + p.container = container; } else { // if exec arrived before the fork, we save the event as pending log::debug!("(exec) Process {pid} not found in process tree, saving for later"); @@ -360,7 +371,6 @@ impl ProcessTracker { .processes .get(&pid) .ok_or(TrackerError::ProcessNotFound)?; - let container: Option = self.containers.get(&process.namespaces).cloned(); if ts < process.fork_time { log::warn!( "{} not forked yet {} < {} ({}ms)", @@ -383,7 +393,7 @@ impl ProcessTracker { fork_time: process.fork_time, argv: process.argv.clone(), namespaces: process.namespaces, - container, + container: process.container.clone(), }) } From 510666cbbdfa5ce4495262ecb8fafad5afb8e248 Mon Sep 17 00:00:00 2001 From: Michal Rostecki Date: Wed, 10 Jan 2024 14:22:25 +0100 Subject: [PATCH 3/3] fix: Don't return errors or warnings for PID 0 The scheduler/idle task is not our object of interes and it has no cgroups or namespaces, it also never can belong to any container. Just ignore it. --- crates/bpf-common/src/parsing/procfs.rs | 4 ++++ crates/bpf-filtering/src/process_tree.rs | 14 ++++++++------ 2 files changed, 12 insertions(+), 6 deletions(-) diff --git a/crates/bpf-common/src/parsing/procfs.rs b/crates/bpf-common/src/parsing/procfs.rs index fb41bfce..b61d3113 100644 --- a/crates/bpf-common/src/parsing/procfs.rs +++ b/crates/bpf-common/src/parsing/procfs.rs @@ -178,6 +178,10 @@ fn get_container_id_from_cgroup(cgroup_info: &str) -> Option { } pub fn get_process_container_id(pid: Pid) -> Result, ProcfsError> { + if pid.as_raw() == 0 { + return Ok(None); + } + let path = format!("/proc/{pid}/cgroup"); let file = File::open(&path).map_err(|source| ProcfsError::ReadFile { source, path })?; diff --git a/crates/bpf-filtering/src/process_tree.rs b/crates/bpf-filtering/src/process_tree.rs index 0a902ef2..1d6c5506 100644 --- a/crates/bpf-filtering/src/process_tree.rs +++ b/crates/bpf-filtering/src/process_tree.rs @@ -66,12 +66,14 @@ fn get_process_namespace(pid: Pid, ns_type: &str) -> Result { fn get_process_namespace_or_log(pid: Pid, namespace_type: &str) -> u32 { get_process_namespace(pid, namespace_type).map_or_else( |e| { - log::warn!( - "Failed to determine {} namespace for process {:?}: {}", - namespace_type, - pid, - e - ); + if pid.as_raw() != 0 { + log::warn!( + "Failed to determine {} namespace for process {:?}: {}", + namespace_type, + pid, + e + ); + } u32::default() }, |v| v,