Commit f721be9: Many small fixes

Committed: Jan 26, 2017
1 parent: e8ba302

14 files changed, +269 -200 lines


conf/example.yml → conf/mongodb-consistent-backup.example.yml (renamed)

+8 -8

@@ -10,7 +10,7 @@ production:
   location: /opt/mongodb/backup
   mongodump:
     binary: /usr/bin/mongodump
-    # compression: gzip
+    # compression: gzip (default: true - if supported)
   replication:
     max_lag_secs: 5
     min_priority: 0
@@ -31,16 +31,16 @@ production:
   notify:
     method: none
     #nsca:
-      # server:
-      # password:
-      # check_host:
+      # server: localhost:5666
+      # password: secret
+      # check_host: localhost
       # check_name: mongodb_consistent_backup
   upload:
     method: none
+    remove_uploaded: false
     #s3:
-      # access_key:
-      # secret_key:
-      # bucket_name:
+      # access_key: <AWS S3 Access Key>
+      # secret_key: <AWS S3 Secret Key>
+      # bucket_name: <AWS S3 Bucket Name>
       # bucket_prefix: /
-      # remove_uploaded: true
       # threads: 4
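
As a side note, a minimal sketch of reading these example defaults back with PyYAML; the loader is illustrative and not part of this commit, and the path simply matches the renamed example above.

    import yaml

    # Illustrative only: load the renamed example config and read the new
    # upload.remove_uploaded default introduced in this commit.
    with open("conf/mongodb-consistent-backup.example.yml") as f:
        config = yaml.safe_load(f)["production"]

    # Commented-out keys (#nsca, #s3) do not survive parsing, so only the
    # uncommented defaults are visible here.
    print(config["upload"]["method"])           # none
    print(config["upload"]["remove_uploaded"])  # False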

mongodb_consistent_backup/Backup/Mongodump/Mongodump.py

+17 -12

@@ -27,8 +27,9 @@ def __init__(self, config, base_dir, secondaries, config_server=None):
         self.response_queue = Queue()
         self.threads = []
         self._summary = {}
-        self.mongodump_version = None

+        with hide('running', 'warnings'), settings(warn_only=True):
+            self.version = local("%s --version|awk 'NR >1 {exit}; /version/{print $NF}'" % self.binary, capture=True)
         self.do_gzip = self.can_gzip()

         if not self.do_gzip and self.config.backup.mongodump.compression == 'gzip':
@@ -40,18 +41,11 @@ def __init__(self, config, base_dir, secondaries, config_server=None):
         if not isinstance(self.secondaries, dict):
             raise Exception, "Field 'secondaries' must be a dictionary of secondary info (by shard)!", None

-        with hide('running', 'warnings'), settings(warn_only=True):
-            self.version = local("%s --version|awk 'NR >1 {exit}; /version/{print $NF}'" % self.binary, capture=True)
-
     def can_gzip(self):
         if os.path.isfile(self.binary) and os.access(self.binary, os.X_OK):
-            with hide('running', 'warnings'), settings(warn_only=True):
-                self.mongodump_version = tuple(
-                    local("%s --version|awk 'NR >1 {exit}; /version/{print $NF}'" % self.binary,
-                          capture=True).split("."))
-            if tuple("3.2.0".split(".")) < self.mongodump_version:
-                return True
-            return False
+            if tuple("3.2.0".split(".")) <= tuple(self.version.split(".")):
+                return True
+            return False
         else:
             logging.fatal("Cannot find or execute the mongodump binary file %s!" % self.binary)
             sys.exit(1)
@@ -88,6 +82,17 @@ def wait(self):
         else:
             raise Exception, "Not all mongodump threads completed successfully!", None

+    def threads_per_dump(self, threads=None):
+        if threads:
+            self.threads_per_dump = int(threads)
+        elif not self.threads_per_dump:
+            if tuple(self.version.split(".")) >= tuple("3.2.0".split(".")):
+                self.threads_per_dump = 1
+                if self.cpu_count > len(self.secondaries):
+                    self.threads_per_dump = int(floor(self.cpu_count / len(self.secondaries)))
+                if self.threads_per_dump > self.threads_per_dump_max:
+                    self.threads_per_dump = self.threads_per_dump_max
+
     def run(self):
         # decide how many parallel dump workers to use based on cpu count vs # of shards (if 3.2+), 8 max workers max to protect the db
         self.threads_per_dump = 0
@@ -124,7 +129,7 @@ def run(self):

         # start all threads and wait
         logging.info(
-            "Starting backups using mongodump %s (inline gzip: %s, threads per dump: %i)" % (self.version, str(self.do_gzip), self.threads_per_dump))
+            "Starting backups using mongodump %s (options: gzip=%s,threads_per_dump=%i)" % (self.version, str(self.do_gzip), self.threads_per_dump))
         for thread in self.threads:
             thread.start()
         self.wait()
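
The can_gzip() and threads_per_dump() changes both lean on comparing dotted version strings as tuples. A small standalone sketch of that comparison and of the worker sizing; the helper names and the int cast are illustrative additions, not part of the commit.

    from math import floor

    # As in the diff: tuples of *strings* compare lexicographically, which works
    # for current 3.x.y versions but would misorder a hypothetical "3.10.0".
    print(tuple("3.2.0".split(".")) <= tuple("3.4.10".split(".")))  # True
    print(tuple("3.2.0".split(".")) <= tuple("3.10.0".split(".")))  # False ("10" < "2" as strings)

    # Hypothetical safer variant: compare integer tuples instead.
    def version_tuple(v):
        return tuple(int(p) for p in v.split("."))

    print(version_tuple("3.2.0") <= version_tuple("3.10.0"))  # True

    # Rough equivalent of the new threads_per_dump() sizing: parallel collection
    # dumps only for mongodump 3.2+, spreading CPU cores across shards, capped.
    def pick_threads_per_dump(version, cpu_count, num_shards, max_threads=8):
        threads = 1
        if version_tuple(version) >= (3, 2, 0) and cpu_count > num_shards:
            threads = min(int(floor(cpu_count / num_shards)), max_threads)
        return threads

    print(pick_threads_per_dump("3.4.1", cpu_count=16, num_shards=3))  # 5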

mongodb_consistent_backup/Backup/Mongodump/MongodumpThread.py

+1

@@ -79,6 +79,7 @@ def run(self):
             return None

         oplog = Oplog(self.oplog_file, self.dump_gzip)
+        oplog.read()
         self.completed = True
         self.response_queue.put({
             'host': self.host,
'host': self.host,

mongodb_consistent_backup/Common/Config.py

+3 -8

@@ -6,17 +6,12 @@
 from yconf import BaseConfiguration


-__version__ = '#.#.#'
-git_commit = 'GIT_COMMIT_HASH'
-prog_name = 'mongodb-consistent-backup'
-
-
 class PrintVersions(Action):
     def __init__(self, option_strings, dest, nargs=0, **kwargs):
         super(PrintVersions, self).__init__(option_strings=option_strings, dest=dest, nargs=nargs, **kwargs)

     def __call__(self, parser, namespace, values, option_string=None):
-        print "%s version: %s, git commit hash: %s" % (prog_name, __version__, git_commit)
+        print "%s version: %s, git commit hash: %s" % (mongodb_consistent_backup.prog_name, mongodb_consistent_backup.__version__, mongodb_consistent_backup.git_commit)

         import platform
         print "Python version: %s" % platform.python_version()
@@ -59,8 +54,8 @@ def __init__(self):
         self.parse_submodules()
         self.parse()

-        self.version = __version__
-        self.git_commit = git_commit
+        self.version = mongodb_consistent_backup.__version__
+        self.git_commit = mongodb_consistent_backup.git_commit

     def _get(self, keys, data=None):
         if not data:
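
The version metadata moves out of Config.py and is read from the top-level package instead. A sketch of what the package __init__.py would need to export for those references to resolve; the file location and placeholder values are assumptions, not shown in this commit.

    # mongodb_consistent_backup/__init__.py (assumed location, not part of this diff)
    __version__ = '#.#.#'            # replaced with the real version at packaging time
    git_commit = 'GIT_COMMIT_HASH'   # replaced with the build's commit hash
    prog_name = 'mongodb-consistent-backup'

Config.py itself would also need an import mongodb_consistent_backup statement for the qualified references above to work; that import falls outside the hunks shown here.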

mongodb_consistent_backup/Main.py

+5 -4

@@ -107,10 +107,11 @@ def cleanup_and_exit(self, code, frame):
         if submodule:
             submodule.close()

-        self.notify.notify("%s: backup '%s' failed!" % (
-            self.config,
-            self.program_name
-        ), False)
+        if self.notify:
+            self.notify.notify("%s: backup '%s' failed!" % (
+                self.config,
+                self.program_name
+            ), False)

         if self.db:
             self.db.close()

mongodb_consistent_backup/Oplog/Oplog.py

+48 -22

@@ -2,43 +2,69 @@
 import logging

 from gzip import GzipFile
-# Skip bson in requirements , pymongo provides
-# noinspection PyPackageRequirements
-from bson import decode_file_iter
+from bson import BSON, decode_file_iter


 class Oplog:
-    def __init__(self, oplog_file, dump_gzip=False):
+    def __init__(self, oplog_file, do_gzip=False):
         self.oplog_file = oplog_file
-        self.dump_gzip = dump_gzip
+        self.do_gzip = do_gzip

         self._count = 0
         self._first_ts = None
         self._last_ts = None
+        self._oplog = None

-        self.read()
+        self.open()

-    def read(self):
-        if os.path.isfile(self.oplog_file):
+    def open(self):
+        if not self._oplog and os.path.isfile(self.oplog_file):
             try:
-                logging.debug("Reading oplog file %s" % self.oplog_file)
-
-                if self.dump_gzip:
-                    oplog = GzipFile(self.oplog_file)
+                logging.debug("Opening oplog file %s" % self.oplog_file)
+                if self.do_gzip:
+                    self._oplog = GzipFile(self.oplog_file)
                 else:
-                    oplog = open(self.oplog_file)
-
-                for change in decode_file_iter(oplog):
-                    if 'ts' in change:
-                        self._last_ts = change['ts']
-                    if self._first_ts is None and self._last_ts is not None:
-                        self._first_ts = self._last_ts
-                    self._count += 1
-                oplog.close()
+                    self._oplog = open(self.oplog_file)
+            except Exception, e:
+                logging.fatal("Error opening oplog file %s! Error: %s" % (self.oplog_file, e))
+                raise e
+        return self._oplog
+
+    def read(self):
+        try:
+            oplog = self.open()
+            logging.debug("Reading oplog file %s" % self.oplog_file)
+            for change in decode_file_iter(oplog):
+                if 'ts' in change:
+                    self._last_ts = change['ts']
+                if self._first_ts is None and self._last_ts is not None:
+                    self._first_ts = self._last_ts
+                self._count += 1
+            oplog.close()
+        except Exception, e:
+            logging.fatal("Error reading oplog file %s! Error: %s" % (self.oplog_file, e))
+            raise e
+
+    def write(self, doc):
+        if self._oplog:
+            try:
+                self._oplog.write(BSON.encode(doc))
+                self._count += 1
+                if not self._first_ts:
+                    self._first_ts = doc['ts']
+                self._last_ts = doc['ts']
             except Exception, e:
-                logging.fatal("Error reading oplog file %s! Error: %s" % (self.oplog_file, e))
+                logging.fatal("Cannot write to oplog file %s! Error: %s" % (self.oplog_file, e))
                 raise e

+    def flush(self):
+        if self._oplog:
+            return self._oplog.flush()
+
+    def close(self):
+        if self._oplog:
+            return self._oplog.close()
+
     def count(self):
         return self._count
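
The class now covers both directions: decode_file_iter() for read() and BSON.encode() for write(). A minimal round trip of those two pymongo primitives; the temp path and documents below are placeholders, not values from this commit.

    from bson import BSON, decode_file_iter
    from bson.timestamp import Timestamp

    # Placeholder docs shaped like oplog entries ('ts' plus an operation type).
    docs = [{'ts': Timestamp(1485400000, i), 'op': 'n'} for i in range(3)]

    with open('/tmp/oplog-example.bson', 'wb') as f:
        for doc in docs:
            f.write(BSON.encode(doc))        # what Oplog.write() does per document

    with open('/tmp/oplog-example.bson', 'rb') as f:
        changes = list(decode_file_iter(f))  # what Oplog.read() iterates

    print("%i changes, first ts: %s, last ts: %s" % (len(changes), changes[0]['ts'], changes[-1]['ts']))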

mongodb_consistent_backup/Oplog/Resolver/Resolver.py

+1 -1 (the removed and added logging lines differ only in whitespace as rendered here)

@@ -67,7 +67,7 @@ def get_consistent_end_ts(self):
         return ts

     def run(self):
-        logging.info("Resolving oplogs (options: threads=%s,compression=%s)" % (self.threads(), self.compression()))
+        logging.info("Resolving oplogs (options: threads=%s,compression=%s)" % (self.threads(), self.compression()))
         self.timer.start()

         self.end_ts = self.get_consistent_end_ts()
New file, +149 lines:

@@ -0,0 +1,149 @@
+import os
+import logging
+
+# Skip bson in requirements , pymongo provides
+# noinspection PyPackageRequirements
+from multiprocessing import Process, Event
+from pymongo import CursorType
+from pymongo.errors import AutoReconnect
+from signal import signal, SIGINT, SIGTERM
+from time import sleep, time
+
+from mongodb_consistent_backup.Common import DB, LocalCommand
+from mongodb_consistent_backup.Oplog import Oplog
+
+
+class TailThread(Process):
+    def __init__(self, state, backup_name, base_dir, host, port, dump_gzip=False, user=None,
+                 password=None, authdb='admin', update_secs=10):
+        Process.__init__(self)
+        self.state = state
+        self.backup_name = backup_name
+        self.base_dir = base_dir
+        self.host = host
+        self.port = int(port)
+        self.dump_gzip = dump_gzip
+        self.user = user
+        self.password = password
+        self.authdb = authdb
+        self.update_secs = update_secs
+
+        self.last_update = time()
+        self.last_update_str = ""
+        self._connection = None
+        self._oplog = None
+        self._stop = Event()
+        self.stop_ts = None
+
+        self.out_dir = "%s/%s" % (self.base_dir, self.backup_name)
+        self.oplog_file = "%s/oplog-tailed.bson" % self.out_dir
+        if not os.path.isdir(self.out_dir):
+            try:
+                LocalCommand("mkdir", ["-p", self.out_dir]).run()
+            except Exception, e:
+                logging.error("Cannot make directory %s! Error: %s" % (self.out_dir, e))
+                raise e
+
+        # init thread state
+        self.state['host'] = self.host
+        self.state['port'] = self.port
+        self.state['file'] = self.oplog_file
+        self.state['count'] = 0
+        self.state['first_ts'] = None
+        self.state['last_ts'] = None
+        self.state['stop_ts'] = self.stop_ts
+
+        signal(SIGINT, self.close)
+        signal(SIGTERM, self.close)
+
+    # the DB connection has to be made outside of __init__ due to threading:
+    def connection(self):
+        try:
+            self._connection = DB(self.host, self.port, self.user, self.password, self.authdb).connection()
+        except Exception, e:
+            logging.fatal("Cannot get connection - %s" % e)
+            raise e
+        return self._connection
+
+    def oplog(self):
+        if not self._oplog:
+            try:
+                self._oplog = Oplog(self.oplog_file, self.dump_gzip)
+            except Exception, e:
+                logging.fatal("Could not open oplog tailing file %s! Error: %s" % (self.oplog_file, e))
+                raise e
+        return self._oplog
+
+    def stop(self, timestamp=None):
+        if timestamp:
+            try:
+                self.state['stop_ts'] = timestamp
+                logging.info("Set oplog tail thread stop position to timestamp: %s" % timestamp)
+            except Exception, e:
+                logging.fatal("Cannot create bson.timestamp.Timestamp object! Error: %s" % e)
+                raise e
+        else:
+            self._stop.set()
+
+    def close(self, exit_code=None, frame=None):
+        del exit_code
+        del frame
+        self.stop()
+        if self._oplog:
+            self._oplog.flush()
+            self._oplog.close()
+
+    def do_stop(self):
+        if self.state['stop_ts'] and self.state['last_ts'] and self.state['last_ts'] >= self.state['stop_ts']:
+            return True
+        elif self._stop.is_set():
+            logging.warn("Oplog tail thread killed at timestamp: %s" % self.state['last_ts'])
+            return True
+        return False
+
+    def status(self):
+        update_str = "Oplog tailing of %s:%i current position: %s" % (self.host, self.port, self.state['last_ts'])
+        if update_str != self.last_update_str:
+            logging.info(update_str)
+            self.last_update = time()
+            self.last_update_str = update_str
+
+    def do_status(self):
+        return (time() - self.last_update) >= self.update_secs
+
+    def run(self):
+        conn = self.connection()
+        db = conn['local']
+
+        logging.info("Tailing oplog on %s:%i for changes (options: gzip=%s)" % (self.host, self.port, self.dump_gzip))
+
+        oplog = self.oplog()
+        tail_start_ts = db.oplog.rs.find().sort('$natural', -1)[0]['ts']
+        while not self.do_stop():
+            query = {'ts': {'$gt': tail_start_ts}}
+            oplog = self.oplog()
+            cursor = db.oplog.rs.find(query, cursor_type=CursorType.TAILABLE_AWAIT)
+            try:
+                while not self.do_stop():
+                    try:
+                        # get the next oplog doc and write it
+                        doc = cursor.next()
+                        if doc:
+                            oplog.write(doc)
+                            self.state['count'] = oplog.count()
+                            self.state['first_ts'] = oplog.first_ts()
+                            self.state['last_ts'] = oplog.last_ts()
+                        self.state['stop_ts'] = self.stop_ts
+                        if self.do_status():
+                            self.status()
+                    except (AutoReconnect, StopIteration):
+                        if self.do_stop():
+                            break
+                        sleep(1)
+            finally:
+                logging.debug("Stopping oplog cursor on %s:%i" % (self.host, self.port))
+                cursor.close()
+                oplog.flush()
+                oplog.close()
+
+        logging.info("Done tailing oplog on %s:%i, %i changes captured to: %s" % (self.host, self.port, self.state['count'], self.state['last_ts']))
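
For orientation, a hypothetical driver for this tailer from a parent process. The import path, host, port, directories, and the stop timestamp are placeholders; in the real backup flow the stop position would come from the consistent end point chosen by the resolver.

    from multiprocessing import Manager
    from bson.timestamp import Timestamp
    # Assumed module path for the new file; it is not shown in this commit view.
    from mongodb_consistent_backup.Oplog.Tailer.TailThread import TailThread

    manager = Manager()
    state = manager.dict()  # shared dict the thread fills with host/port/file/count/first_ts/last_ts

    tailer = TailThread(state, "testbackup", "/opt/mongodb/backup", "localhost", 27017,
                        dump_gzip=False, user=None, password=None)
    tailer.start()

    # ... run the per-shard mongodump backups here ...

    # Ask the tailer to stop once it has written changes up to a chosen timestamp
    # (placeholder value), then wait for the process and inspect the shared state.
    tailer.stop(Timestamp(1485400000, 1))
    tailer.join()
    print("captured %i oplog changes, last ts: %s" % (state['count'], state['last_ts']))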

0 commit comments
