-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdo-dyn-dns.py
executable file
·236 lines (183 loc) · 7.17 KB
/
do-dyn-dns.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
#!/usr/bin/python3
"""
Use a DigitalOcean (sub)domain for a dynamic DNS service.
Updates a Digital Ocean domain A record with the current external IP address.
Optionally verifies the HTTPS connection. Optionally creates the record if it
doesn't exist.
Brad Conte ([email protected])
"""
from http.client import HTTPSConnection
from http.client import HTTPConnection
import ssl
import json
import sys
# =========================================================================
# Globals
# =========================================================================
# The domain registered with Digital Ocean, e.g. "name.com". It is compared
# against the "name" field of each domain returned by the API.
DOMAIN_NAME = ""
# The record name, e.g. "dynamicip" for "dynamicip.name.com". The record that
# gets updated (or created) is the "A" record with this name.
RECORD_NAME = ""
# API credentials sent with every API request.
# Get these from: https://www.digitalocean.com/api_access
CLIENT_ID = ""
API_KEY = ""
# Verify the HTTPS certificate presented by the API server?
CHECK_CERT = True
# Path to a directory of SSL CA certificates, e.g. "/etc/ssl/certs/". It is
# handed to DigitalOceanAPI as its "capath" argument.
CERT_PATH = ""
# =========================================================================
# Classes and Methods
# =========================================================================
class NoRecordException(Exception):
    """Raised when the record lookup finds no DNS record with the wanted name."""
class APIException(Exception):
    """Error reported by a Digital Ocean API request.

    Attributes:
    api_msg - the error message returned by the API
    api_call - the API call that generated the error
    """

    # Template used to render the exception as text.
    _MESSAGE_FORMAT = 'API failed. Error msg = "{}", attempted API = "{}".'

    def __init__(self, api_msg, api_call):
        self.api_call = api_call
        self.api_msg = api_msg

    def __str__(self):
        details = (self.api_msg, self.api_call)
        return self._MESSAGE_FORMAT.format(*details)
class DigitalOceanAPI(object):
    """A general Digital Ocean API wrapper.

    An HTTPS connection object for the API host is created when the wrapper is
    instantiated and kept until close() is called (directly, by leaving a
    "with" block, or when the object is deleted). Successful API responses
    are returned as parsed JSON; API failures raise APIException. Requires the
    D.O. Client ID and API Key for the account.

    Arguments:
    client_id - the account's Client ID
    api_key - the account's API Key

    Keyword Arguments:
    check_cert - whether the HTTPS connection's cert (and host name) should
                 be verified
    pemfile - path to PEM file, such as "/etc/ssl/certs/ca-certificates.pem"
    capath - path to list of CA certs, such as "/etc/ssl/certs/"

    Requires:
    ssl
    json
    http.client
    """

    api_host = "api.digitalocean.com"

    # "check_cert" is True by default because we have API keys we're sending.
    # When neither "pemfile" nor "capath" is supplied (None or an empty
    # string), the system's default CA certificates are used.
    def __init__(self, client_id, api_key, check_cert=True, pemfile=None,
                 capath=None):
        self.connection = None
        self.auth_data = ""

        if check_cert:
            # create_default_context() requires a valid certificate, checks
            # the host name and negotiates the TLS version. With no explicit
            # CA location it loads the system's default CA certificates.
            ssl_ctx = ssl.create_default_context(
                cafile=pemfile or None, capath=capath or None)
        else:
            ssl_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
            # Host name checking has to be switched off before verify_mode
            # may be set to CERT_NONE.
            ssl_ctx.check_hostname = False
            ssl_ctx.verify_mode = ssl.CERT_NONE

        self.connection = HTTPSConnection(
            DigitalOceanAPI.api_host, context=ssl_ctx)
        self.auth_data = "&client_id={}&api_key={}".format(client_id, api_key)

    def __enter__(self):
        return self

    def request(self, url):
        """Perform an API request and return the parsed JSON data.

        See api.digitalocean.com for API usage.

        Arguments:
        url - API call, minus auth, ex: "/domains/new?data=127.0.0.1&name=home"

        Raises:
        APIException - if the wrapper has been closed, the response is not a
                       JSON object, or its "status" field is not "OK". The
                       exception carries "url" without the credentials.
        """
        if self.connection is None:
            raise APIException("connection is closed", url)

        # If the URL doesn't contain arguments, add the "?" so we can append
        # the self-auth arguments.
        separator = "" if "?" in url else "?"
        final_url = url + separator + self.auth_data

        self.connection.request("GET", final_url)
        response = self.connection.getresponse()
        try:
            http_status = response.status
            raw_body = response.read()
        finally:
            # Release the response even if reading it fails.
            response.close()

        try:
            response_data = json.loads(raw_body.decode("utf-8"))
        except ValueError as err:
            # Covers both undecodable bytes and malformed JSON.
            raise APIException(
                "invalid response (HTTP status {}): {}".format(
                    http_status, err),
                url)

        if not isinstance(response_data, dict):
            raise APIException(
                "unexpected response (HTTP status {}): {!r}".format(
                    http_status, response_data),
                url)
        if response_data.get("status") != "OK":
            raise APIException(
                response_data.get("message", "no error message in response"),
                url)
        return response_data

    def close(self):
        """Close the connection and forget the credentials. Safe to repeat."""
        # getattr() keeps __del__ harmless on a partially built instance.
        connection = getattr(self, "connection", None)
        if connection is not None:
            connection.close()
        self.connection = None
        self.auth_data = ""

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    def __del__(self):
        self.close()
def get_external_ip(timeout=10):
    """Return this machine's external IPv4 address as a string.

    The address is fetched from http://ifconfig.me/ip. The request is plain
    HTTP, so the reply is validated before it is returned: it must come with
    HTTP status 200 and must parse as an IPv4 address (the value ends up in a
    DNS "A" record).

    Keyword Arguments:
    timeout - seconds to wait on the network before giving up

    Raises:
    Exception - if the service answers with a non-200 status or with
                something that is not an IPv4 address
    OSError - on network failures, including timeouts
    """
    # Local import so this function does not depend on the module's imports.
    import ipaddress

    connection = HTTPConnection("ifconfig.me", timeout=timeout)
    try:
        connection.request("GET", "/ip")
        response = connection.getresponse()
        try:
            status = response.status
            ip = response.read().decode("utf-8", "replace").strip()
        finally:
            response.close()
    finally:
        # Always release the socket, even when the request fails.
        connection.close()

    if status != 200:
        raise Exception(
            "External IP lookup failed: HTTP status {}".format(status))
    try:
        ipaddress.IPv4Address(ip)
    except ValueError:
        # Truncate the echo of the bad reply; it may be a whole error page.
        raise Exception(
            "External IP lookup returned an invalid IPv4 address: "
            "'{}'".format(ip[:64]))
    return ip
def get_domain_info(api):
    """Return the API's entry for the domain named DOMAIN_NAME.

    Arguments:
    api - object whose request(url) method returns the parsed "/domains/"
          listing

    Raises:
    Exception - if no listed domain has the name DOMAIN_NAME
    """
    listing = api.request("/domains/")
    match = next(
        (entry for entry in listing["domains"]
         if entry["name"] == DOMAIN_NAME),
        None)
    if match is None:
        raise Exception("Didn't find domain '{}'".format(DOMAIN_NAME))
    return match
def get_record_info(domain_id, api):
    """Return the "A" record named RECORD_NAME for the given domain.

    Every record of the domain is examined, so an "A" record is found even
    when records of other types with the same name are listed before it.

    Arguments:
    domain_id - ID of the domain whose records are listed
    api - object whose request(url) method returns the parsed record listing

    Raises:
    Exception - if records named RECORD_NAME exist but none is of type "A"
    NoRecordException - if no record is named RECORD_NAME at all
    """
    url = "/domains/{}/records".format(domain_id)
    response = api.request(url)

    # Type of the first same-named record that is not an "A" record, kept so
    # the error can be reported if no "A" record turns up.
    mismatched_type = None
    for record in response["records"]:
        if record["name"] != RECORD_NAME:
            continue
        if record["record_type"] == "A":
            return record
        if mismatched_type is None:
            mismatched_type = record["record_type"]

    if mismatched_type is not None:
        raise Exception("Record type is '{}' but should be 'A'".format(
            mismatched_type))
    raise NoRecordException
def update_record_ip(domain_id, record_id, ip, api):
    """Ask the API to set the data of an existing record to the given IP.

    Arguments:
    domain_id - ID of the domain that owns the record
    record_id - ID of the record to edit
    ip - new value for the record's data
    api - object whose request(url) method performs the API call
    """
    path = "/domains/{}/records/{}/edit".format(domain_id, record_id)
    query = "?data={}".format(ip)
    api.request("".join((path, query)))
def create_record(domain_id, ip, api):
    """Ask the API to create an "A" record named RECORD_NAME holding the IP.

    Arguments:
    domain_id - ID of the domain that receives the new record
    ip - value for the new record's data
    api - object whose request(url) method performs the API call
    """
    path = "/domains/{}/records/new".format(domain_id)
    query = "?record_type=A&name={}&data={}".format(RECORD_NAME, ip)
    api.request("".join((path, query)))
def main(should_create_record):
    """Bring the DNS record in line with the current external IP address.

    Looks up the external IP, then the configured domain and record. The
    record is updated when its data differs from the IP and left alone when
    it already matches. A missing record is created only on request.

    Arguments:
    should_create_record - create the record if it doesn't exist

    Returns:
    True; every failure is reported by raising an exception.
    """
    current_ip = get_external_ip()

    with DigitalOceanAPI(CLIENT_ID, API_KEY, CHECK_CERT, capath=CERT_PATH) \
            as api:
        domain_id = get_domain_info(api)["id"]

        try:
            record = get_record_info(domain_id, api)
        except NoRecordException:
            if not should_create_record:
                raise Exception("Didn't find record '{}'".format(RECORD_NAME))
            create_record(domain_id, current_ip, api)
            print("Created record '{}': IP = {}".format(
                RECORD_NAME, current_ip))
        else:
            previous_ip = record["data"]
            if previous_ip == current_ip:
                print("No change: IP = {}".format(previous_ip))
            else:
                update_record_ip(domain_id, record["id"], current_ip, api)
                print("Updated IP: old = {}, new = {}".format(
                    previous_ip, current_ip))

    # All errors throw an exception.
    return True
def help():
    """Print the command-line usage summary to standard output."""
    usage_text = """Usage: digitalocean-dyn-dns [-h] [-c]
-h : Show this help message
-c : Create the DNS A record if it doesn't already exist
"""
    print(usage_text)
# Script entry point: parse the flags, run the update and turn the outcome
# into a process exit status (0 on success, 1 on any failure).
if __name__ == "__main__":
    if "-h" in sys.argv:
        help()
        # sys.exit() rather than the interactive-only exit() helper, which is
        # installed by the "site" module and may be absent.
        sys.exit(0)

    should_create_record = "-c" in sys.argv

    success = False
    try:
        success = main(should_create_record)
    except KeyboardInterrupt:
        # Report Ctrl-C for what it is instead of as a generic error.
        sys.stderr.write("Interrupted\n")
    except Exception as ex:
        sys.stderr.write("Error: {}\n".format(ex))

    sys.exit(0 if success else 1)