-
Notifications
You must be signed in to change notification settings - Fork 0
/
dns.py
146 lines (113 loc) · 3.44 KB
/
dns.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
import socket, glob, json
# UDP endpoint the server binds to: loopback address, standard DNS port.
ip, port = "127.0.0.1", 53
def bind_socket():
    """Create an IPv4 UDP socket bound to the module-level ``ip``/``port``.

    Returns:
        The bound ``socket.socket`` object.
    """
    listener = socket.socket(family=socket.AF_INET, type=socket.SOCK_DGRAM)
    endpoint = (ip, port)
    listener.bind(endpoint)
    return listener
# Listening socket, created when the module is loaded; the serve loop at the
# bottom of the file receives queries from it and sends replies through it.
sock = bind_socket()
# add data structures for testing.
def load_zones():
    """Read every ``zone/*.zone`` JSON file into a dictionary.

    Each file is parsed as JSON and stored under the value of its
    ``"$origin"`` key.

    Returns:
        dict mapping origin name -> parsed zone document.

    Raises:
        KeyError: if a zone file has no ``"$origin"`` entry.
    """
    zones = {}
    for path in glob.glob("zone/*.zone"):
        with open(path) as handle:
            parsed = json.load(handle)
        zones[parsed["$origin"]] = parsed
    return zones
# Zone table, loaded once when the module is loaded; getzone() looks names up in it.
zonedata = load_zones()
def getflags(flags):
    """Build the two flag bytes of a DNS response header.

    The response is always marked as an authoritative answer (QR=1, AA=1)
    with TC, RD, RA, Z and RCODE all zero.  The 4-bit OPCODE is copied from
    the query so the reply matches the kind of request that was made.

    Args:
        flags: the two flag bytes of the incoming query header (bytes 2-3
            of the DNS message).  Only the first byte is inspected.

    Returns:
        A 2-byte ``bytes`` object holding the response flags.
    """
    # Header byte layout (MSB first): QR | OPCODE(4) | AA | TC | RD.
    # An empty slice decodes to 0, i.e. a standard query.
    first_byte = int.from_bytes(flags[:1], byteorder="big")
    opcode = (first_byte >> 3) & 0x0F

    qr = 1  # this message is a response
    aa = 1  # authoritative answer
    tc = 0  # not truncated
    rd = 0  # recursion desired is not echoed
    high = (qr << 7) | (opcode << 3) | (aa << 2) | (tc << 1) | rd

    # Second byte: RA (1 bit) | Z (3 bits) | RCODE (4 bits), all zero.
    low = 0
    return bytes([high, low])
def get_query(data):
    """Parse the question name and type from a DNS question section.

    The name is a sequence of length-prefixed labels terminated by a zero
    length byte.  The two bytes following the terminator are the QTYPE.

    Args:
        data: the DNS message with the 12-byte header already removed.

    Returns:
        A ``(domainparts, question_type)`` tuple.  ``domainparts`` is the
        list of labels followed by an empty string for the root label
        (e.g. ``["example", "com", ""]``); getzone() and build_question()
        rely on that trailing empty label.  ``question_type`` is the raw
        2-byte QTYPE (shorter or empty if the message is truncated).
        If the name is cut off before its terminator, the labels parsed so
        far are returned without the trailing empty label.
    """
    domainparts = []
    size = len(data)
    pos = 0
    while pos < size:
        length = data[pos]
        pos += 1
        if length == 0:
            # Root label: the name is complete and QTYPE starts at ``pos``.
            domainparts.append("")
            break
        if pos + length > size:
            # Label runs past the end of the message; drop the partial label.
            pos = size
            break
        domainparts.append("".join(map(chr, data[pos:pos + length])))
        pos += length
    question_type = data[pos:pos + 2]
    return (domainparts, question_type)
def getzone(domain):
    """Return the zone document for a parsed question name.

    Args:
        domain: list of labels as produced by get_query(); they are joined
            with dots to form the lookup key.

    Raises:
        KeyError: if no loaded zone has that name.
    """
    # Read-only access to the module-level table needs no ``global`` statement.
    return zonedata[".".join(domain)]
def get_rec(data):
    """Look up the records that answer the question in ``data``.

    Args:
        data: the DNS message with the 12-byte header already removed.

    Returns:
        ``(records, record_type, domain)`` where ``record_type`` is ``"a"``
        for a QTYPE of ``b"\\x00\\x01"`` and ``""`` for anything else, and
        ``records`` is the zone entry stored under that key.

    Raises:
        KeyError: if the zone is unknown or has no entry for the type key.
    """
    labels, raw_type = get_query(data)
    type_key = "a" if raw_type == b"\x00\x01" else ""
    records = getzone(labels)[type_key]
    return (records, type_key, labels)
def build_question(domain, rectype):
    """Serialise the question section of a DNS response.

    Args:
        domain: list of labels; each is written as a length byte followed
            by its characters, so an empty final label produces the zero
            terminator.
        rectype: record type key; ``"a"`` adds the 2-byte type value 1.

    Returns:
        The encoded name, the type field (only for ``"a"``) and a 2-byte
        class value of 1.
    """
    encoded_labels = []
    for label in domain:
        body = b"".join(ord(ch).to_bytes(1, byteorder="big") for ch in label)
        encoded_labels.append(bytes([len(label)]) + body)
    pieces = [b"".join(encoded_labels)]

    if rectype == "a":
        pieces.append((1).to_bytes(2, byteorder="big"))
    pieces.append((1).to_bytes(2, byteorder="big"))
    return b"".join(pieces)
def rectobytes(domain, rectype, recttl, recval):
    """Serialise one resource record for the answer section.

    Args:
        domain: question labels (unused; the name is emitted as a
            compression pointer to the question instead).
        rectype: record type key; only ``"a"`` produces TYPE and RDATA.
        recttl: time-to-live in seconds (anything ``int()`` accepts).
        recval: record value; for ``"a"`` a dotted-quad IPv4 string.

    Returns:
        The encoded record: NAME pointer, TYPE, CLASS, TTL, RDLENGTH and
        RDATA (TYPE, RDLENGTH and RDATA only for ``"a"`` records).

    Raises:
        ValueError: if an address octet is not an integer in 0..255.
    """
    # Compression pointer (top two bits set) to offset 12, where the
    # question name starts right after the fixed-size header.
    rbytes = b"\xc0\x0c"
    if rectype == "a":
        rbytes += b"\x00\x01"  # TYPE = A
    rbytes += b"\x00\x01"  # CLASS = IN
    rbytes += int(recttl).to_bytes(4, byteorder="big")
    if rectype == "a":
        # Append (not overwrite) RDLENGTH = 4 followed by the address octets.
        rbytes += b"\x00\x04"
        rbytes += bytes(int(octet) for octet in recval.split("."))
    return rbytes
def buildresponse(data):
    """Build the complete DNS response for one raw query datagram.

    Args:
        data: the raw query message as received from the socket.

    Returns:
        The response bytes: 12-byte header, the echoed question, and one
        answer record per matching zone entry.

    Raises:
        KeyError: propagated from get_rec() when the zone or record type
            is not available.
    """
    transaction_id = data[:2]
    flags = getflags(data[2:4])

    # Parse the question once; the header count and the body both use it.
    records, rectype, domain = get_rec(data[12:])

    qd_count = b"\x00\x01"
    ancount = len(records).to_bytes(2, byteorder="big")
    nscount = (0).to_bytes(2, byteorder="big")
    arcount = (0).to_bytes(2, byteorder="big")
    dns_header = b"".join(
        (transaction_id, flags, qd_count, ancount, nscount, arcount)
    )

    dnsquestion = build_question(domain, rectype)
    dnsbody = b"".join(
        rectobytes(domain, rectype, record["ttl"], record["value"])
        for record in records
    )
    return dns_header + dnsquestion + dnsbody
# Serve forever: one response per received datagram (512 bytes is the
# classic DNS-over-UDP message limit).
while True:
    data, addr = sock.recvfrom(512)
    try:
        r = buildresponse(data)
    except Exception as exc:
        # Top-level boundary: a malformed or unanswerable query must not
        # take the whole server down.  Report it and wait for the next one.
        print("dropping query from {}: {!r}".format(addr, exc))
        continue
    sock.sendto(r, addr)