Skip to content

Commit c2fc521

Browse files
vadorovskybanditopazzo
authored andcommitted
fix: Ensure that the container info of each process is up to date
Before this change, we were storing a map of namespaces and corresponding container info, but that didn't work - namespaces used by one container might be reused by a next container later. This change drops that solution and fetches the container info always.
1 parent a1822d6 commit c2fc521

File tree

1 file changed

+29
-19
lines changed

1 file changed

+29
-19
lines changed

crates/pulsar-core/src/pdk/process_tracker.rs

Lines changed: 29 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -140,8 +140,6 @@ struct ProcessTracker {
140140
rx: mpsc::UnboundedReceiver<TrackerRequest>,
141141
/// current processes
142142
processes: HashMap<Pid, ProcessData>,
143-
/// current containers
144-
containers: HashMap<Namespaces, ContainerInfo>,
145143
/// scheduled removal of exited processes
146144
next_cleanup: Timestamp,
147145
/// pending info requests arrived before the process was created
@@ -162,6 +160,7 @@ struct ProcessData {
162160
>,
163161
argv: Vec<String>,
164162
namespaces: Namespaces,
163+
container: Option<ContainerInfo>,
165164
}
166165

167166
/// Cleanup timeout in nanoseconds. This is how long an exited process
@@ -186,12 +185,12 @@ impl ProcessTracker {
186185
exec_changes: BTreeMap::new(),
187186
argv: Vec::new(),
188187
namespaces: Namespaces::default(),
188+
container: None,
189189
},
190190
);
191191
Self {
192192
rx,
193193
processes,
194-
containers: HashMap::new(),
195194
next_cleanup: Timestamp::now() + CLEANUP_TIMEOUT,
196195
pending_requests: Vec::new(),
197196
pending_updates: HashMap::new(),
@@ -263,25 +262,24 @@ impl ProcessTracker {
263262
}
264263
}
265264

266-
fn handle_container_info(
265+
fn get_container_info(
267266
&mut self,
268267
pid: Pid,
269-
namespaces: Namespaces,
270268
is_new_container: bool,
271-
) -> Result<(), ContainerError> {
269+
) -> Result<Option<ContainerInfo>, ContainerError> {
272270
let container_id = procfs::get_process_container_id(pid)?;
273-
if let Some(id) = container_id {
274-
let container_info = ContainerInfo::from_container_id(id.clone())?;
275-
self.containers.entry(namespaces).or_insert_with(|| {
271+
match container_id {
272+
Some(id) => {
273+
let container_info = ContainerInfo::from_container_id(id.clone())?;
276274
if is_new_container {
277275
log::debug!("Detected a new container {id}");
278276
} else {
279277
log::debug!("Detected an already existing container {id}");
280278
}
281-
container_info
282-
});
279+
Ok(Some(container_info))
280+
}
281+
None => Ok(None),
283282
}
284-
Ok(())
285283
}
286284

287285
fn handle_update(&mut self, mut update: TrackerUpdate) {
@@ -293,8 +291,13 @@ impl ProcessTracker {
293291
namespaces,
294292
is_new_container,
295293
} => {
296-
self.handle_container_info(pid, namespaces, is_new_container)
297-
.unwrap_or_else(|err| log::error!("{err}"));
294+
let container = match self.get_container_info(pid, is_new_container) {
295+
Ok(container) => container,
296+
Err(err) => {
297+
log::error!("{err}");
298+
None
299+
}
300+
};
298301
self.processes.insert(
299302
pid,
300303
ProcessData {
@@ -309,6 +312,7 @@ impl ProcessTracker {
309312
.map(|parent| parent.argv.clone())
310313
.unwrap_or_default(),
311314
namespaces,
315+
container,
312316
},
313317
);
314318
if let Some(pending_updates) = self.pending_updates.remove(&pid) {
@@ -325,11 +329,18 @@ impl ProcessTracker {
325329
namespaces,
326330
is_new_container,
327331
} => {
328-
self.handle_container_info(pid, namespaces, is_new_container)
329-
.unwrap_or_else(|err| log::error!("{err}"));
332+
let container = match self.get_container_info(pid, is_new_container) {
333+
Ok(container) => container,
334+
Err(err) => {
335+
log::error!("{err}");
336+
None
337+
}
338+
};
330339
if let Some(p) = self.processes.get_mut(&pid) {
331340
p.exec_changes.insert(timestamp, std::mem::take(image));
332-
p.argv = std::mem::take(argv)
341+
p.argv = std::mem::take(argv);
342+
p.namespaces = namespaces;
343+
p.container = container;
333344
} else {
334345
// if exec arrived before the fork, we save the event as pending
335346
log::debug!("(exec) Process {pid} not found in process tree, saving for later");
@@ -360,7 +371,6 @@ impl ProcessTracker {
360371
.processes
361372
.get(&pid)
362373
.ok_or(TrackerError::ProcessNotFound)?;
363-
let container: Option<ContainerInfo> = self.containers.get(&process.namespaces).cloned();
364374
if ts < process.fork_time {
365375
log::warn!(
366376
"{} not forked yet {} < {} ({}ms)",
@@ -383,7 +393,7 @@ impl ProcessTracker {
383393
fork_time: process.fork_time,
384394
argv: process.argv.clone(),
385395
namespaces: process.namespaces,
386-
container,
396+
container: process.container.clone(),
387397
})
388398
}
389399

0 commit comments

Comments
 (0)