This repository was archived by the owner on Nov 15, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathget_underlying_pcap.py
179 lines (134 loc) · 4.97 KB
/
get_underlying_pcap.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
#!/usr/bin/env python3
import sys
import zlib
from scapy.layers.dns import DNS, DNSQR, DNSRR
from scapy.layers.inet import IP
from scapy.utils import rdpcap, wrpcap
# See https://github.com/yarrick/iodine/blob/master/src/base128.c for details
ALPHABET = b"abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789\274\275\276\277\300\301\302\303\304\305\306\307\310\311\312\313\314\315\316\317\320\321\322\323\324\325\326\327\330\331\332\333\334\335\336\337\340\341\342\343\344\345\346\347\350\351\352\353\354\355\356\357\360\361\362\363\364\365\366\367\370\371\372\373\374\375"
# useful shorthand
code = ALPHABET.index
def b128decode(data):
if type(data) == str:
data = data.encode()
if not isinstance(data, (bytes, bytearray)):
raise ValueError("str or bytes should be passed into function")
index = 0
result = bytearray()
while True:
if index + 1 >= len(data):
break
coefs = [
2 ** (7 - index % 8) - 1, # data[i] &
2 ** 7 - 2 ** (6 - index % 8) # data[i + 1] &
]
shifts = [
index % 8 + 1, # <<
6 - index % 8 # >>
]
result.append(
((code(data[index]) & coefs[0]) << shifts[0])
|
((code(data[index + 1]) & coefs[1]) >> shifts[1])
)
index += 1
if index % 8 == 7:
index += 1
return bytes(result)
def get_base16_code(c):
if c in range(65, 71):
return c - 55
if c in range(97, 103):
return c - 87
if c in range(48, 58):
return c - 48
raise ValueError("Invalid base16 character")
def get_base32_code(c):
if c in range(65, 91): # uppercase
return c - 65
if c in range(97, 123): # lowercase
return c - 97
if c in range(48, 54):
return c - 22
raise ValueError("Invalid base32 character")
def b32decode_int(c):
result = 0
for i in c:
result <<= 5
result |= get_base32_code(i)
return result
class ClientHeader:
def __init__(self, pkt):
self.uid = get_base16_code(pkt[0])
data = b32decode_int(pkt[1:4])
# SSS FFFF DDD GGGG L
self.useq = (data >> 12) & 0b111
self.ufrag = (data >> 8) & 0b1111
self.dseq = (data >> 5) & 0b111
self.dfrag = (data >> 1) & 0b1111
self.last = data & 1
class ServerHeader:
def __init__(self, pkt):
# C SSS FFFF DDD GGGG L
data = pkt[0] << 8 | pkt[1]
self.compress = (data >> 15) & 1
self.useq = (data >> 12) & 0b111
self.ufrag = (data >> 8) & 0b1111
self.dseq = (data >> 5) & 0b1111
self.dfrag = (data >> 1) & 0b1111
self.last = data & 1
def extract_iodine(root, source, destination):
dns_packets = rdpcap(source)
downstream = b''
upstream = b''
is_transfer = False
real_packets = []
for packet in dns_packets:
if not packet.haslayer(DNS):
continue
if DNSRR in packet and len(packet[DNSRR].rdata) > 0:
# downstream
data = packet[DNSRR].rdata
if isinstance(data, str):
# should be some bug in scapy?
data = data.encode()
if not is_transfer:
continue
downstream += data[2:]
headers = ServerHeader(data)
if headers.last and len(downstream) > 0:
try:
raw_data = zlib.decompress(downstream)
real_packets.append(IP(raw_data[4:]))
except zlib.error:
pass
downstream = b''
elif DNSQR in packet:
# client
hostname = packet[DNSQR].qname
if hostname[0] not in b"0123456789abcdefABCDEF":
continue
is_transfer = True
if not hostname.endswith(root):
print("Warning: skipped upstream packet:", hostname, file=sys.stderr)
continue
upstream += hostname[5:-len(root)].replace(b".", b"")
headers = ClientHeader(hostname)
if headers.last and len(upstream) > 0:
try:
raw_data = zlib.decompress(b128decode(upstream))
real_packets.append(IP(raw_data[4:]))
except zlib.error:
pass
upstream = b''
wrpcap(destination, real_packets)
if __name__ == "__main__":
if len(sys.argv) != 4:
print("Usage: ./get_underlying_pcap.py .domain.tld. dump.pcap output.pcap", file=sys.stderr)
sys.exit(1)
root = sys.argv[1].encode()
if not root.startswith(b".") or not root.endswith(b"."):
print("Warning: TLD should start and end with dot.", file=sys.stderr)
source = sys.argv[2]
destination = sys.argv[3]
extract_iodine(root, source, destination)