-
Notifications
You must be signed in to change notification settings - Fork 0
/
docker_filter.py
83 lines (70 loc) · 1.88 KB
/
docker_filter.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
import attach.attach as attach
import docker
import click
import json
from bcc import BPF
def log(level, message, color):
    """Print ``"<level>: <message>"`` to the terminal in the given colour.

    Args:
        level: Label placed before the colon (e.g. ``"Info"``).
        message: Text placed after the colon.
        color: Foreground colour name handed to ``click.style`` as ``fg``.
    """
    styled = click.style(f"{level}: {message}", fg=color)
    click.echo(styled)
def fatal(message):
    """Log *message* at the "Fatal" level in red.

    This only prints; it does not raise or exit, so a caller that wants to
    stop after a fatal error has to return on its own.
    """
    log("Fatal", message, color="red")
def info(message):
    """Log *message* at the "Info" level in white."""
    log("Info", message, color="white")
@click.command()
@click.option("-i", type=str, help="Pass container id")
@click.option("-n", type=str, help="Pass container name")
@click.option("-d", is_flag=True, help="Detach trace print from terminal")
@click.argument("action")
@click.argument("allowlistfile")
def cli(i, n, d, action, allowlistfile):
    """Attach or detach the BPF handlers for one Docker container.

    \f
    (click stops rendering ``--help`` text at the form feed above, so the
    notes below are for maintainers only.)

    Args:
        i: Container id (``-i``). When empty, the id is looked up from ``n``.
        n: Container name (``-n``); only consulted when ``i`` is empty.
        d: ``-d`` flag. When set, the command returns right after attaching
            instead of blocking in ``trace_print()``.
        action: ``"attach"`` or ``"detach"``; anything else is rejected.
        allowlistfile: Path to a JSON file; its parsed content is passed to
            ``attach.setAllowHash``.

    Returns:
        None. Validation failures are reported through ``fatal`` followed by
        an early return.

    Raises:
        OSError, json.JSONDecodeError: Propagated unchanged when the
            allowlist file cannot be opened or parsed.
    """
    # fatal() only logs, so every validation failure must return explicitly;
    # otherwise an invalid action would still load BPF and detach handlers.
    if not action:
        fatal("No action passed")
        return
    if action not in ("attach", "detach"):
        fatal("Illegal action")
        return

    if not i:
        if not n:
            fatal("No args passed")
            return
        # Only a name was supplied: ask the Docker daemon for the container id.
        client = docker.from_env()
        try:
            container = client.containers.get(n)
        except docker.errors.NotFound:
            fatal(f"No such container: {n}")
            return
        i = container.id

    # NOTE(review): this path layout presumably matches the systemd cgroup
    # driver on cgroup v2 and the full (untruncated) container id -- confirm.
    cgroup2_path = f"/sys/fs/cgroup/system.slice/docker-{i}.scope"

    # Create the bcc BPF object and load the allowlist into it.
    b = attach.getBPF()
    with open(allowlistfile, "r") as allow_file:
        allow_dict = json.load(allow_file)
    print(allow_dict)
    attach.setAllowHash(b, allow_dict)

    # Best-effort removal of handlers left over from a previous run; a
    # failure here is reported but must not abort the command.
    info("Detatching existing handlers...")
    try:
        # The other attach.sock calls in this function pass the cgroup path
        # as the second argument, so the detach flag goes third here as well.
        attach.sock(b, cgroup2_path, attach.DETACH)
    except Exception as exc:
        info("No sock to detach")
        print(exc)
    try:
        attach.kprobe(b, attach.DETACH)
    except Exception as exc:
        info("No kprobe to detach")
        print(exc)

    if action != "attach":
        return

    info("Attaching handlers...")
    attach.kprobe(b)
    attach.sock(b, cgroup2_path)
    if d:
        return

    # Foreground mode: stream trace output until Ctrl-C, then clean up.
    try:
        b.trace_print()
    except KeyboardInterrupt:
        info("Detatching...")
        attach.sock(b, cgroup2_path, attach.DETACH)
        attach.kprobe(b, attach.DETACH)
# Run the click command only when this file is executed as a script,
# not when it is imported.
if __name__ == "__main__":
    cli()