|
| 1 | +import json |
| 2 | +import subprocess |
| 3 | +import yaml |
| 4 | +from functools import partial |
| 5 | + |
| 6 | +def hex2int(hex): |
| 7 | + return int(hex.replace("0x", ""), 16) |
| 8 | + |
| 9 | + |
| 10 | +def parse_ipv4(rule): |
| 11 | + k = ".".join(reversed([str(hex2int(k)) for k in rule["key"]])) |
| 12 | + chunks_v = [rule["value"][idx : idx + 2] for idx in range(0, len(rule["value"]), 2)] |
| 13 | + ports = [hex2int(f"{chunk[1]}{chunk[0]}") for chunk in chunks_v] |
| 14 | + v = ( |
| 15 | + "[{}, ...]".format(", ".join([str(k) for k in ports if k != 0])) |
| 16 | + if not all(map(lambda x: x == 0, ports)) |
| 17 | + else "*" |
| 18 | + ) |
| 19 | + return {k: v} |
| 20 | + |
| 21 | +def parse_ipv6(rule): |
| 22 | + chunks_k = [rule["key"][idx : idx + 2] for idx in range(0, len(rule["key"]), 2)] |
| 23 | + k = ":".join( |
| 24 | + reversed([str(hex2int(f"{chunk[1]}{chunk[0]}")) for chunk in chunks_k]) |
| 25 | + ) |
| 26 | + chunks_v = [rule["value"][idx : idx + 2] for idx in range(0, len(rule["value"]), 2)] |
| 27 | + ports = [hex2int(f"{chunk[1]}{chunk[0]}") for chunk in chunks_v] |
| 28 | + v = ( |
| 29 | + "[{}, ...]".format(", ".join([str(k) for k in ports if k != 0])) |
| 30 | + if not all(map(lambda x: x == 0, ports)) |
| 31 | + else "*" |
| 32 | + ) |
| 33 | + return {k: v} |
| 34 | + |
| 35 | + |
| 36 | + |
| 37 | + |
| 38 | +filter_idx_map = dict( |
| 39 | + transport={0: "tcp", 1: "udp"}, |
| 40 | + network={0: "ipv4", 1: "ipv6", 2: "icmp"}, |
| 41 | + link={0: "arp"}, |
| 42 | +) |
| 43 | + |
| 44 | + |
| 45 | +def parse_filter(rule, filter_type): |
| 46 | + idx = hex2int(rule["key"][0]) |
| 47 | + flag = hex2int(rule["value"][0]) |
| 48 | + idx_map = filter_idx_map[filter_type] |
| 49 | + if idx in idx_map: |
| 50 | + filter_name = idx_map[idx] |
| 51 | + return {filter_name: "❌" if flag else "✔️"} |
| 52 | + |
| 53 | + |
| 54 | +def parse_traffic_filter(rule): |
| 55 | + flag = hex2int(rule["value"][0]) |
| 56 | + return {"status": "❌" if flag else "✔️"} |
| 57 | + |
| 58 | + |
| 59 | + |
| 60 | +def parse_ingress_egress(rule): |
| 61 | + return {"type":"ingress" if rule["value"][0] == "0xff" else "egress"} |
| 62 | + |
| 63 | + |
| 64 | +def fmt_maps(ebpfs,maps): |
| 65 | + ingress_maps = [] |
| 66 | + egress_maps = [] |
| 67 | + ingress_egress = {k: v for k,v in maps.items() if v["name"]==".rodata"} |
| 68 | + |
| 69 | + for idx,in_eg_map in ingress_egress.items(): |
| 70 | + map_type = in_eg_map["elements"][0]["type"] |
| 71 | + |
| 72 | + out_map = ingress_maps if map_type =="ingress" else egress_maps |
| 73 | + for map_group in ebpfs: |
| 74 | + |
| 75 | + if idx in map_group: |
| 76 | + for map_idx in map_group: |
| 77 | + if map_idx in maps: |
| 78 | + out_map.append(maps[map_idx]) |
| 79 | + |
| 80 | + outputs = {} |
| 81 | + for fmted_map in (ingress_maps,egress_maps): |
| 82 | + res = {m["name"]: {k:v for el in m["elements"] for k,v in el.items() } for m in fmted_map} |
| 83 | + filters = {"direction":res["TRAFFIC_DIRECTI"],"link":res["LINK_FILTERS"],"transport":res["TRANSPORT_FILTE"],"network":res["NETWORK_FILTERS"]} |
| 84 | + firewall = {"ipv4":res["BLOCKLIST_IPV4"],"ipv6":res["BLOCKLIST_IPV6"]} |
| 85 | + outputs[res[".rodata"]["type"]]=dict(filters=filters,firewall=firewall) |
| 86 | + return outputs |
| 87 | + |
| 88 | + |
| 89 | + |
| 90 | + |
| 91 | + |
| 92 | + |
| 93 | + |
| 94 | +to_display_maps = { |
| 95 | + ".rodata": parse_ingress_egress, |
| 96 | + "BLOCKLIST_IPV4": parse_ipv4, |
| 97 | + "BLOCKLIST_IPV6": parse_ipv6, |
| 98 | + "LINK_FILTERS": partial(parse_filter, filter_type="link"), |
| 99 | + "TRANSPORT_FILTE": partial(parse_filter, filter_type="transport"), |
| 100 | + "NETWORK_FILTERS": partial(parse_filter, filter_type="network"), |
| 101 | + "TRAFFIC_DIRECTI": parse_traffic_filter, |
| 102 | + |
| 103 | +} |
| 104 | + |
| 105 | + |
| 106 | +map_info_cmd = lambda _id: ["bpftool", "--json", "map", "dump", "name", str(_id)] |
| 107 | +prog_map_cmd = lambda _id: ["bpftool", "--json", "prog", "show", "name", str(_id)] |
| 108 | + |
| 109 | + |
| 110 | +try: |
| 111 | + ebpfs = [] |
| 112 | + ebpfs = json.loads(subprocess.check_output(prog_map_cmd("oryx")).decode()) |
| 113 | + ebpfs=[prog["map_ids"] for prog in ebpfs] |
| 114 | + |
| 115 | + |
| 116 | + maps = {} |
| 117 | + for map_name, func in to_display_maps.items(): |
| 118 | + active_maps = json.loads( |
| 119 | + subprocess.check_output(map_info_cmd(map_name)).decode() |
| 120 | + ) |
| 121 | + |
| 122 | + for active_map in active_maps: |
| 123 | + elements = active_map.get("elements", []) |
| 124 | + _id = active_map.get("id") |
| 125 | + maps[_id] = dict(name=map_name,elements=[]) |
| 126 | + for el in elements: |
| 127 | + if x := func(el): |
| 128 | + maps[_id]["elements"].append(x) |
| 129 | + out = fmt_maps(ebpfs,maps) |
| 130 | + print(yaml.dump(out,indent=2,allow_unicode=True)) |
| 131 | + |
| 132 | + |
| 133 | +except: |
| 134 | + print("") |
0 commit comments