Skip to content

Commit

Permalink
support selecting events from specific containers
Browse files Browse the repository at this point in the history
  • Loading branch information
piax93 committed Nov 6, 2024
1 parent 174b113 commit 7bd32f5
Show file tree
Hide file tree
Showing 15 changed files with 257 additions and 5 deletions.
4 changes: 4 additions & 0 deletions example_config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ _net_filters: &net_filters
# Some configuration fields supported by all probes:
# filters: list of network filters (see above for schema); they act on the destination
# address for tcp_connect and udp_session, local listening address for net_listen
# container_labels: list of label filters to only capture events generated from processes in containers
# container_poll_interval: how often to poll for running containers (in seconds)
# excludeports: list of ports to be filtered out (cannot be used with includeports)
# includeports: list of ports for which events will be logged (filters out all the others) (cannot be used with excludeports)
# plugins: map of plugins to enable for the probe (check README for more details)
Expand All @@ -31,6 +33,8 @@ udp_session:
filters: *net_filters
tcp_connect:
filters: *net_filters
container_labels:
- key=value
plugins:
sourceipmap:
enabled: True
Expand Down
2 changes: 1 addition & 1 deletion pidtree_bcc/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

HOTSWAP_CALLBACK_NAMESPACE = get_namespace('__change_callbacks')
LOADED_CONFIG_FILES_NAMESPACE = get_namespace('__loaded_configs')
HOT_SWAPPABLE_SETTINGS = ('filters', 'excludeports', 'includeports')
HOT_SWAPPABLE_SETTINGS = ('filters', 'excludeports', 'includeports', 'container_labels')
NON_PROBE_NAMESPACES = (DEFAULT_NAMESPACE, HOTSWAP_CALLBACK_NAMESPACE.name, LOADED_CONFIG_FILES_NAMESPACE.name)


Expand Down
82 changes: 82 additions & 0 deletions pidtree_bcc/containers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
import logging
import os
import subprocess
from functools import lru_cache
from itertools import chain
from typing import List
from typing import Set


@lru_cache(maxsize=1)
def detect_containerizer_client() -> str:
""" Detect whether the system is using Docker or Containerd.
Since containerd does not have a proper python API implementation, we rely on
CLI tools to query container information in both cases.
This method is very opinionated towards detecting Containerd usage in Kubernetes,
so in most cases it will fall back to standard Docker.
:return: CLI tool to query containerizer
"""
return 'nerdctl' if os.path.exists('/var/run/containerd/io.containerd.runtime.v2.task/k8s.io') else 'docker'


def list_containers(filter_labels: List[str] = None) -> List[str]:
""" List running containers matching filter
:param List[str] filter_labels: list of label values, either `<label_name>` or `<label_name>=<label_value>`
:return: yields container hash IDs
"""
filter_args = chain.from_iterable(('--filter', f'label={label}') for label in (filter_labels or []))
try:
output = subprocess.check_output(
(
detect_containerizer_client(), 'ps',
'--no-trunc', '--quiet', *filter_args,
), encoding='utf8',
)
return output.splitlines()
except Exception as e:
logging.error(f'Issue listing running containers: {e}')
return []


@lru_cache(maxsize=2048)
def get_container_mntns_id(sha: str) -> int:
""" Get mount namespace ID for a container
:param str sha: container hash ID
:return: mount namespace ID
"""
try:
output = subprocess.check_output(
(
detect_containerizer_client(), 'inspect',
'-f', r'{{.State.Pid}}', sha,
), encoding='utf8',
)
pid = int(output.splitlines()[0])
except Exception as e:
logging.error(f'Issue inspecting container {sha}: {e}')
return -1
try:
return os.stat(f'/proc/{pid}/ns/mnt').st_ino
except Exception as e:
logging.error(f'Issue reading mntns ID for {pid}: {e}')
return -1


def list_container_mnt_namespaces(filter_labels: List[str] = None) -> Set[int]:
""" Get collection of mount namespace IDs for running containers matching label filters
:param List[str] filter_labels: list of label values, either `<label_name>` or `<label_name>=<label_value>`
:return: set of mount namespace IDs
"""
return {
mntns_id
for mntns_id in (
get_container_mntns_id(container_id)
for container_id in list_containers(filter_labels)
if container_id
)
if mntns_id > 0
}
17 changes: 17 additions & 0 deletions pidtree_bcc/filtering.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from typing import Any
from typing import Iterable
from typing import List
from typing import Set
from typing import Union

from pidtree_bcc.ctypes_helper import ComparableCtStructure
Expand Down Expand Up @@ -131,6 +132,7 @@ def load_filters_into_map(filters: List[dict], ebpf_map: Any, do_diff: bool = Fa
'include_ports': [789], # optional
}
:param Any ebpf_map: reference to eBPF table where filters should be loaded.
:param bool do_diff: diff input with existing values, removing excess entries
"""
leftovers = set(
# The map returns keys using an auto-generated type.
Expand Down Expand Up @@ -179,6 +181,7 @@ def load_port_filters_into_map(
:param List[Union[int, str]] filters: list of ports or port ranges
:param PortFilterMode mode: include or exclude
:param Any ebpf_map: array in which filters are loaded
:param bool do_diff: diff input with existing values, removing excess entries
"""
if mode not in (PortFilterMode.include, PortFilterMode.exclude):
raise ValueError('Invalid global port filtering mode: {}'.format(mode))
Expand All @@ -198,3 +201,17 @@ def load_port_filters_into_map(
ebpf_map[ctypes.c_int(port)] = ctypes.c_uint8(0)
# 0-element of the map holds the filtering mode
ebpf_map[ctypes.c_int(0)] = ctypes.c_uint8(mode.value)


def load_intset_into_map(intset: Set[int], ebpf_map: Any, do_diff: bool = False):
""" Loads set of int values into eBPF map
:param Set[int] intset: input values
:param Any ebpf_map: array in which filters are loaded
:param bool do_diff: diff input with existing values, removing excess entries
"""
current_state = set((k.value for k, _ in ebpf_map.items()) if do_diff else [])
for val in intset:
ebpf_map[ctypes.c_int(val)] = ctypes.c_uint8(1)
for val in (current_state - intset):
del ebpf_map[ctypes.c_int(val)]
27 changes: 25 additions & 2 deletions pidtree_bcc/probes/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import os.path
import platform
import re
import time
from datetime import datetime
from functools import lru_cache
from multiprocessing import SimpleQueue
Expand All @@ -18,7 +19,9 @@
from jinja2 import FileSystemLoader

from pidtree_bcc.config import enumerate_probe_configs
from pidtree_bcc.containers import list_container_mnt_namespaces
from pidtree_bcc.filtering import load_filters_into_map
from pidtree_bcc.filtering import load_intset_into_map
from pidtree_bcc.filtering import load_port_filters_into_map
from pidtree_bcc.filtering import NET_FILTER_MAX_PORT_RANGES
from pidtree_bcc.filtering import PortFilterMode
Expand Down Expand Up @@ -52,6 +55,8 @@ class BPFProbe:
NET_FILTER_MAP_SIZE_MAX = 4 * 1024
NET_FILTER_MAP_SIZE_SCALING = 512
PORT_FILTER_MAP_NAME = 'port_filter_map'
MNTNS_FILTER_MAP_NAME = 'mntns_filter_map'
DEFAULT_CONTAINER_POLL_INTERVAL = 30 # seconds

def __init__(
self,
Expand Down Expand Up @@ -96,6 +101,12 @@ class variable defining a list of config fields.
self.net_filter_mutex = Lock()
if self.USES_DYNAMIC_FILTERS and config_change_queue:
self.SIDECARS.append((self._poll_config_changes, (config_change_queue,)))
self.container_labels_filter = template_config.get('container_labels')
if self.container_labels_filter:
self.SIDECARS.append((
self._monitor_running_containers,
(template_config.get('container_poll_interval', self.DEFAULT_CONTAINER_POLL_INTERVAL),),
))

def build_probe_config(self, probe_config: dict, hotswap_only: bool = False) -> dict:
""" Load probe configuration values
Expand Down Expand Up @@ -130,6 +141,7 @@ def build_probe_config(self, probe_config: dict, hotswap_only: bool = False) ->
self.NET_FILTER_MAP_SIZE_MAX,
round_nearest_multiple(len(self.net_filters), self.NET_FILTER_MAP_SIZE_SCALING, headroom=128),
)
template_config['MNTNS_FILTER_MAP_NAME'] = self.MNTNS_FILTER_MAP_NAME
return template_config

@lru_cache(maxsize=1)
Expand Down Expand Up @@ -199,6 +211,17 @@ def _poll_config_changes(self, config_queue: SimpleQueue):
self.build_probe_config(config_data, hotswap_only=True)
self.reload_filters()

@never_crash
def _monitor_running_containers(self, poll_interval: int):
""" Polls running containers, filtering by label to keep mntns filtering map updated """
monitored_mntns = set()
while True:
mntns = list_container_mnt_namespaces(self.container_labels_filter)
if mntns != monitored_mntns:
monitored_mntns = mntns
load_intset_into_map(mntns, self.bpf[self.MNTNS_FILTER_MAP_NAME], True)
time.sleep(poll_interval)

def reload_filters(self, is_init: bool = False):
""" Load filters
Expand All @@ -211,12 +234,12 @@ def reload_filters(self, is_init: bool = False):

def start_polling(self):
""" Start infinite loop polling BPF events """
for func, args in self.SIDECARS:
Thread(target=func, args=args, daemon=True).start()
self.bpf = BPF(
text=self.expanded_bpf_text,
cflags=['-Wno-macro-redefined'] if self._has_buggy_headers() else [],
)
for func, args in self.SIDECARS:
Thread(target=func, args=args, daemon=True).start()
if self.lost_event_telemetry > 0:
extra_args = {'lost_cb': self._lost_event_callback}
poll_func = self._poll_and_check_lost
Expand Down
14 changes: 14 additions & 0 deletions pidtree_bcc/probes/net_listen.j2
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ struct listen_bind_t {

{{ utils.get_proto_func() }}

{% if container_labels %}
{{ utils.mntns_filter_init(MNTNS_FILTER_MAP_NAME) }}
{% endif %}

static void net_listen_event(struct pt_regs *ctx)
{
u32 pid = bpf_get_current_pid_tgid();
Expand Down Expand Up @@ -61,6 +65,11 @@ int kprobe__inet_bind(
const struct sockaddr *addr,
int addrlen)
{
{% if container_labels -%}
if (!is_mntns_included()) {
return 0;
}
{% endif -%}
{% if exclude_random_bind -%}
struct sockaddr_in* inet_addr = (struct sockaddr_in*)addr;
if (inet_addr->sin_port == 0) {
Expand All @@ -86,6 +95,11 @@ int kretprobe__inet_bind(struct pt_regs *ctx)
{% if 'tcp' in protocols -%}
int kprobe__inet_listen(struct pt_regs *ctx, struct socket *sock, int backlog)
{
{% if container_labels -%}
if (!is_mntns_included()) {
return 0;
}
{% endif -%}
struct sock* sk = sock->sk;
if (sk->__sk_common.skc_family == AF_INET) {
u32 pid = bpf_get_current_pid_tgid();
Expand Down
1 change: 1 addition & 0 deletions pidtree_bcc/probes/net_listen.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ class NetListenProbe(BPFProbe):
'ip_to_int': ip_to_int,
'protocols': ['tcp'],
'filters': [],
'container_labels': [],
'excludeports': [],
'includeports': [],
'snapshot_periodicity': False,
Expand Down
9 changes: 9 additions & 0 deletions pidtree_bcc/probes/tcp_connect.j2
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,17 @@ struct connection_t {

{{ utils.net_filter_trie_init(NET_FILTER_MAP_NAME, PORT_FILTER_MAP_NAME, size=NET_FILTER_MAP_SIZE, max_ports=NET_FILTER_MAX_PORT_RANGES) }}

{% if container_labels %}
{{ utils.mntns_filter_init(MNTNS_FILTER_MAP_NAME) }}
{% endif %}

int kprobe__tcp_v4_connect(struct pt_regs *ctx, struct sock *sk)
{
{% if container_labels -%}
if (!is_mntns_included()) {
return 0;
}
{% endif -%}
u32 pid = bpf_get_current_pid_tgid();
currsock.update(&pid, &sk);
return 0;
Expand Down
1 change: 1 addition & 0 deletions pidtree_bcc/probes/tcp_connect.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ class TCPConnectProbe(BPFProbe):
CONFIG_DEFAULTS = {
'ip_to_int': ip_to_int,
'filters': [],
'container_labels': [],
'includeports': [],
'excludeports': [],
}
Expand Down
10 changes: 10 additions & 0 deletions pidtree_bcc/probes/udp_session.j2
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ BPF_HASH(tracing, u64, u8);

{{ utils.get_proto_func() }}

{% if container_labels %}
{{ utils.mntns_filter_init(MNTNS_FILTER_MAP_NAME) }}
{% endif %}

// We probe only the entrypoint as looking at return codes doesn't have much value
// since UDP does not do any checks for successfull communications. The only errors
// which may arise from this function would be due to the kernel running out of memory,
Expand All @@ -30,6 +34,12 @@ int kprobe__udp_sendmsg(struct pt_regs *ctx, struct sock *sk, struct msghdr *msg
{
if(sk->__sk_common.skc_family != AF_INET) return 0;

{% if container_labels -%}
if (!is_mntns_included()) {
return 0;
}
{% endif -%}

// Destination info will either be embedded in the socket if `connect`
// was called or specified in the message
struct sockaddr_in* sin = msg->msg_name;
Expand Down
1 change: 1 addition & 0 deletions pidtree_bcc/probes/udp_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ class UDPSessionProbe(BPFProbe):
CONFIG_DEFAULTS = {
'ip_to_int': ip_to_int,
'filters': [],
'container_labels': [],
'includeports': [],
'excludeports': [],
}
Expand Down
27 changes: 27 additions & 0 deletions pidtree_bcc/probes/utils.j2
Original file line number Diff line number Diff line change
Expand Up @@ -156,3 +156,30 @@ enum {
#define BPF_PSEUDO_FUNC 4
{%- endif %}
{%- endmacro %}

{% macro mntns_filter_init(mntns_filter_map_name, size=512) -%}

/* Original source: https://github.com/iovisor/bcc/blob/master/tools/mountsnoop.py#L32-L52
* `struct mnt_namespace` is defined in fs/mount.h, which is private
* to the VFS and not installed in any kernel-devel packages. So, let's
* duplicate the important part of the definition. There are actually
* more members in the real struct, but we don't need them, and they're
* more likely to change.
*/
struct mnt_namespace {
// This field was removed in https://github.com/torvalds/linux/commit/1a7b8969e664d6af328f00fe6eb7aabd61a71d13
#if LINUX_VERSION_CODE < KERNEL_VERSION(5, 11, 0)
atomic_t count;
#endif
struct ns_common ns;
};
BPF_HASH({{ mntns_filter_map_name }}, u64, bool, {{ size }});
static inline bool is_mntns_included() {
struct task_struct *task;
task = (struct task_struct *)bpf_get_current_task();
u64 mntns_id = task->nsproxy->mnt_ns->ns.inum;
return {{ mntns_filter_map_name }}.lookup(&mntns_id) != NULL;
}
{%- endmacro %}
Loading

0 comments on commit 7bd32f5

Please sign in to comment.