
Mcb 1.0 cleanup v7 #94

Merged: 41 commits, Apr 5, 2017
Commits
e8ba302
Improved logging + balancer stop timer
timvaillancourt Jan 26, 2017
f721be9
Many small fixes
timvaillancourt Jan 26, 2017
8d5d8ea
Committing changes
timvaillancourt Feb 3, 2017
d0371a2
move LocalCommand mkdir command to use os package
timvaillancourt Feb 10, 2017
e28d8e9
fix port num
timvaillancourt Feb 10, 2017
95e2239
pass Replset classes, not host strings
timvaillancourt Feb 10, 2017
98912cc
More code cleanup
timvaillancourt Mar 13, 2017
56c4d04
Safe oplog tailing stop, nearly completed
timvaillancourt Mar 16, 2017
0362ac8
some cleanup of tweaks for safe oplog tailer stopping
timvaillancourt Mar 16, 2017
dc65dcb
Added flag to bypass safe-oplog-stop if we're closing on error
timvaillancourt Mar 16, 2017
9fe5678
More cleanup
timvaillancourt Mar 16, 2017
515afec
More cleanup of tailer stop logic
timvaillancourt Mar 16, 2017
9b1579c
Changes to support using OplogState+multiprocessing.Manager for Mongo…
timvaillancourt Mar 21, 2017
ddf89fd
Method to compute lag and include heartbeat lag, also fixed PRIMARY b…
timvaillancourt Mar 21, 2017
49f6b72
Whitespace fix and logline update
timvaillancourt Mar 21, 2017
9de79e1
Make sure Oplog/Tailer.py forces update of primary ts
timvaillancourt Mar 21, 2017
cb8b3e3
Added generic class for parsing/handling mongodb uris
timvaillancourt Mar 22, 2017
d69f997
Fixed resolver issue when no mongodump changes + there are tailer cha…
timvaillancourt Mar 22, 2017
0cf9602
More raise fixes and support configsvr as seed host
timvaillancourt Mar 23, 2017
cbc250e
More error handlind, moved Oplog/Resolver/ResolverThread.py to use Op…
timvaillancourt Mar 26, 2017
716b3d4
Print config to logs and check tailers are running before backup starts
timvaillancourt Mar 26, 2017
30feadb
shorter log messages
timvaillancourt Mar 26, 2017
f666999
shorter log messages #2
timvaillancourt Mar 26, 2017
89affc0
Realtime thread output for mongodump
timvaillancourt Mar 27, 2017
b733857
Connection fixes to use MongoUri, fixed locking order
timvaillancourt Mar 27, 2017
ee6fa39
Use w='majority'
timvaillancourt Mar 27, 2017
12f9475
fixed Common/DB.py
timvaillancourt Mar 29, 2017
badfd88
State file fixes
timvaillancourt Mar 29, 2017
df34ccf
Write cmdline to state
timvaillancourt Mar 29, 2017
99932bd
Moved to central timer class
timvaillancourt Apr 3, 2017
9b8c58c
\t -> 8 spaces
timvaillancourt Apr 3, 2017
55d9848
Exit with plain error if notify fails, not exception
timvaillancourt Apr 3, 2017
866ad72
Make hosts that are defauly priority (1) and 0 lag score=100 instead …
timvaillancourt Apr 3, 2017
ccb843f
Created pipeline module for simplifying workflow of the tool
timvaillancourt Apr 4, 2017
1bd589e
Cleanup \t chars and consistent-end-ts method of Resolver to be safer
timvaillancourt Apr 4, 2017
740fd8d
Fixes for problems when sending Ctrl+C
timvaillancourt Apr 5, 2017
6b4e895
adding subclasses for "tasks" and "stages" for uniform code execution
timvaillancourt Apr 5, 2017
d06d2e4
typo
timvaillancourt Apr 5, 2017
09ef5fd
cleaner handling of oplogs in resolver thread
timvaillancourt Apr 5, 2017
73170cc
\t to 8 space
timvaillancourt Apr 5, 2017
a63998d
cleanup method for resolver thread
timvaillancourt Apr 5, 2017
2 changes: 1 addition & 1 deletion Makefile
@@ -2,7 +2,7 @@
 #
 
 NAME=mongodb_consistent_backup
-VERSION=$(shell cat VERSION)
+VERSION=$(shell cat VERSION | cut -d- -f1)
 PREFIX?=/usr/local
 BASEDIR?=$(DESTDIR)$(PREFIX)
 BINDIR?=$(BASEDIR)/bin
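The Makefile change above pipes the contents of the VERSION file through `cut -d- -f1`, so a pre-release suffix such as `-rc1` is stripped before packaging. The same transformation as a quick Python sketch (the version strings are illustrative):

```python
def base_version(version):
    # Equivalent of `cut -d- -f1`: keep everything before the first "-".
    return version.split("-", 1)[0]

print(base_version("1.0.0-rc1"))  # -> 1.0.0
print(base_version("1.0.0"))      # -> 1.0.0
```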
20 changes: 10 additions & 10 deletions conf/example.yml → conf/mongodb-consistent-backup.example.yml
@@ -1,7 +1,7 @@
 production:
   host: localhost
   port: 27017
-  #user:
+  #username:
   #password:
   #authdb: admin
   backup:
@@ -10,12 +10,12 @@ production:
     location: /opt/mongodb/backup
     mongodump:
       binary: /usr/bin/mongodump
-      # compression: gzip
+      # compression: gzip (default: true - if supported)
   replication:
     max_lag_secs: 5
     min_priority: 0
     max_priority: 1000
-    use_hidden: false
+    hidden_only: false
   sharding:
     balancer:
       wait_secs: 300
@@ -31,16 +31,16 @@ production:
   notify:
     method: none
     #nsca:
-    #  server:
-    #  password:
-    #  check_host:
+    #  server: localhost:5667
+    #  password: secret
+    #  check_host: localhost
     #  check_name: mongodb_consistent_backup
   upload:
     method: none
     remove_uploaded: false
     #s3:
-    #  access_key:
-    #  secret_key:
-    #  bucket_name:
+    #  access_key: <AWS S3 Access Key>
+    #  secret_key: <AWS S3 Secret Key>
+    #  bucket_name: <AWS S3 Bucket Name>
     #  bucket_prefix: /
     #  remove_uploaded: true
     #  threads: 4
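The renamed example config keeps every setting under a single `production:` root. A minimal sketch of how that nested structure reads once parsed, using a plain dict as a stand-in for the loaded YAML (only keys shown in the hunks above are included):

```python
# Dict stand-in for the parsed YAML above; values mirror the example file.
config = {
    "production": {
        "host": "localhost",
        "port": 27017,
        "backup": {
            "location": "/opt/mongodb/backup",
            "mongodump": {"binary": "/usr/bin/mongodump"},
        },
        "replication": {
            "max_lag_secs": 5,
            "min_priority": 0,
            "max_priority": 1000,
            "hidden_only": False,
        },
    }
}

env = config["production"]
print(env["backup"]["mongodump"]["binary"])  # -> /usr/bin/mongodump
print(env["replication"]["max_lag_secs"])    # -> 5
```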
58 changes: 6 additions & 52 deletions mongodb_consistent_backup/Archive/Archive.py
@@ -1,55 +1,9 @@
-import logging
+from mongodb_consistent_backup.Archive.Tar import Tar
+from mongodb_consistent_backup.Pipeline import Stage
 
-from Tar import Tar
-from mongodb_consistent_backup.Common import Timer, config_to_string, parse_method
 
 
-class Archive:
-    def __init__(self, config, backup_dir):
-        self.config = config
-        self.backup_dir = backup_dir
-
-        self.method = None
-        self._archiver = None
-        self.timer = Timer()
+class Archive(Stage):
+    def __init__(self, manager, config, timer, base_dir, backup_dir):
+        super(Archive, self).__init__(self.__class__.__name__, manager, config, timer, base_dir, backup_dir)
+        self.task = self.config.archive.method
         self.init()
 
-    def init(self):
-        archive_method = self.config.archive.method
-        if not archive_method or parse_method(archive_method) == "none":
-            logging.info("Archiving disabled, skipping")
-        else:
-            self.method = parse_method(archive_method)
-            logging.info("Using archiving method: %s" % self.method)
-            try:
-                self._archiver = globals()[self.method.capitalize()](
-                    self.config,
-                    self.backup_dir
-                )
-            except LookupError, e:
-                raise Exception, 'No archiving method: %s' % self.method, None
-            except Exception, e:
-                raise e
-
-    def compression(self, method=None):
-        if self._archiver:
-            return self._archiver.compression(method)
-
-    def threads(self, threads=None):
-        if self._archiver:
-            return self._archiver.threads(threads)
-
-    def archive(self):
-        if self._archiver:
-            config_string = config_to_string(self.config.archive[self.method])
-            logging.info("Archiving with method: %s (options: %s)" % (self.method, config_string))
-            self.timer.start()
-
-            self._archiver.run()
-
-            self.timer.stop()
-            logging.info("Archiving completed in %s seconds" % self.timer.duration())
-
-    def close(self):
-        if self._archiver:
-            return self._archiver.close()
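The rewrite replaces Archive's hand-rolled lookup (`globals()[self.method.capitalize()]`) with a shared `Stage` base class. A minimal sketch of that dispatch-by-name pattern — the class and argument names here are illustrative, not the real `mongodb_consistent_backup.Pipeline` API:

```python
import logging


class Stage(object):
    """Sketch of a pipeline stage that resolves its task implementation by name."""

    def __init__(self, stage_name, config, task_classes):
        self.stage_name = stage_name
        self.config = config
        self.task_classes = task_classes  # e.g. {"tar": Tar}
        self.task = None

    def init(self, task_name):
        if not task_name or task_name == "none":
            logging.info("%s disabled, skipping", self.stage_name)
            return
        try:
            # Same idea as globals()[method.capitalize()] in the removed code,
            # but against an explicit registry instead of module globals.
            self.task = self.task_classes[task_name](self.config)
        except KeyError:
            raise Exception("No %s method: %s" % (self.stage_name, task_name))

    def run(self):
        if self.task:
            return self.task.run()


class Tar(object):
    def __init__(self, config):
        self.config = config

    def run(self):
        return "archived"


stage = Stage("Archive", {}, {"tar": Tar})
stage.init("tar")
print(stage.run())  # -> archived
```

An unknown task name raises, while `"none"` simply leaves the stage as a no-op — mirroring the "Archiving disabled, skipping" branch in the removed code.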
62 changes: 25 additions & 37 deletions mongodb_consistent_backup/Archive/Tar/Tar.py
@@ -2,11 +2,13 @@
 import logging
 
 from copy_reg import pickle
-from multiprocessing import Pool, cpu_count
+from multiprocessing import Pool
 from types import MethodType
 
 from TarThread import TarThread
 from mongodb_consistent_backup.Common import parse_method
+from mongodb_consistent_backup.Errors import Error
+from mongodb_consistent_backup.Pipeline import Task
 
 
 # Allows pooled .apply_async()s to work on Class-methods:
@@ -19,32 +21,13 @@ def _reduce_method(m):
 pickle(MethodType, _reduce_method)
 
 
-class Tar:
-    def __init__(self, config, backup_base_dir):
-        self.config = config
-        self.backup_base_dir = backup_base_dir
-        self.verbose = self.config.verbose
-        self.binary = "tar"
-        self._pool = None
+class Tar(Task):
+    def __init__(self, manager, config, timer, base_dir, backup_dir, **kwargs):
+        super(Tar, self).__init__(self.__class__.__name__, manager, config, timer, base_dir, backup_dir, **kwargs)
+        self.compression_method = self.config.archive.tar.compression
+        self.binary = "tar"
 
-    def compression(self, method=None):
-        if method:
-            self.config.archive.tar.compression = parse_method(method)
-            logging.info("Setting tar compression method to: %s" % self.config.archive.tar.compression)
-        return parse_method(self.config.archive.tar.compression)
-
-    def do_gzip(self):
-        if self.compression() == 'gzip':
-            return True
-        return False
-
-    def threads(self, thread_count=None):
-        if thread_count:
-            self.config.archive.tar.threads = int(thread_count)
-            logging.info("Setting tar thread count to: %i" % self.config.archive.tar.threads)
-        if self.config.archive.tar.threads is None or self.config.archive.tar.threads < 1:
-            self.config.archive.tar.threads = cpu_count()
-        return int(self.config.archive.tar.threads)
+        self._pool = None
 
     def run(self):
         try:
@@ -53,28 +36,33 @@ def run(self):
             logging.info("Archiving backup directories with pool of %i thread(s)" % thread_count)
         except Exception, e:
             logging.fatal("Could not start pool! Error: %s" % e)
-            raise e
+            raise Error(e)
 
-        if os.path.isdir(self.backup_base_dir):
+        if os.path.isdir(self.backup_dir):
             try:
-                for backup_dir in os.listdir(self.backup_base_dir):
-                    subdir_name = "%s/%s" % (self.backup_base_dir, backup_dir)
+                self.running = True
+                for backup_dir in os.listdir(self.backup_dir):
+                    subdir_name = os.path.join(self.backup_dir, backup_dir)
+                    if not os.path.isdir(os.path.join(subdir_name, "dump")):
+                        continue
                     output_file = "%s.tar" % subdir_name
 
                     if self.do_gzip():
                         output_file = "%s.tgz" % subdir_name
 
                     self._pool.apply_async(TarThread(subdir_name, output_file, self.do_gzip(), self.verbose, self.binary).run)
             except Exception, e:
                 self._pool.terminate()
                 logging.fatal("Could not create tar archiving thread! Error: %s" % e)
-                raise e
-        self._pool.close()
-        self._pool.join()
+                raise Error(e)
+            finally:
+                self._pool.close()
+                self._pool.join()
+                self.stopped = True
+        self.completed = True
 
     def close(self):
         logging.debug("Stopping tar archiving threads")
-        if self._pool is not None:
+        if not self.stopped and self._pool is not None:
             self._pool.terminate()
             self._pool.join()
-            logging.info("Stopped all tar archiving threads")
+        logging.info("Stopped all tar archiving threads")
+        self.stopped = True
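In Python 2, bound methods are not picklable, hence the `copy_reg.pickle(MethodType, _reduce_method)` shim kept in the diff above before handing `TarThread(...).run` to `Pool.apply_async`. A runnable sketch of the same submit-then-reap pattern; it uses `multiprocessing.dummy` (the thread-backed Pool with an identical API) so it runs anywhere without the pickling concern, and `ArchiveJob` is a hypothetical stand-in for `TarThread`:

```python
from multiprocessing.dummy import Pool  # thread-backed, same API as multiprocessing.Pool


class ArchiveJob(object):
    """Hypothetical stand-in for TarThread: one directory to archive."""

    def __init__(self, name):
        self.name = name

    def run(self):
        return "%s.tar" % self.name


def archive_all(names, threads=2):
    # Mirror Tar.run(): submit one job per backup subdirectory, then always
    # close + join the pool in a finally block (the cleanup this PR adds).
    pool = Pool(threads)
    try:
        results = [pool.apply_async(ArchiveJob(n).run) for n in names]
        return [r.get() for r in results]
    finally:
        pool.close()
        pool.join()


print(archive_all(["shard0", "shard1"]))  # -> ['shard0.tar', 'shard1.tar']
```

With a real process `Pool`, `terminate()` (as in `Tar.close()`) kills workers immediately, whereas `close()` + `join()` lets queued jobs finish — the distinction the new `self.stopped` flag guards.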
37 changes: 17 additions & 20 deletions mongodb_consistent_backup/Archive/Tar/TarThread.py
@@ -1,5 +1,6 @@
 import os
 import logging
+import sys
 
 from signal import signal, SIGINT, SIGTERM
 
@@ -30,26 +31,22 @@ def run(self):
         if os.path.isdir(self.backup_dir):
             if not os.path.isfile(self.output_file):
                 try:
-                    backup_base_dir = os.path.dirname(self.backup_dir)
-                    backup_base_name = os.path.basename(self.backup_dir)
-
-                    log_msg = "Archiving and compressing directory: %s" % self.backup_dir
-                    cmd_flags = ["-C", backup_base_dir, "-cf", self.output_file, "--remove-files", backup_base_name]
-
-                    if self.do_gzip:
-                        log_msg = "Archiving directory: %s" % self.backup_dir
-                        cmd_flags = ["-C", backup_base_dir, "-czf", self.output_file, "--remove-files", backup_base_name]
-
-                    try:
-                        logging.info(log_msg)
-                        self._command = LocalCommand(self.binary, cmd_flags, self.verbose)
-                        self._command.run()
-                    except Exception, e:
-                        raise e
+                    backup_base_dir = os.path.dirname(self.backup_dir)
+                    backup_base_name = os.path.basename(self.backup_dir)
+
+                    log_msg = "Archiving and compressing directory: %s" % self.backup_dir
+                    cmd_flags = ["-C", backup_base_dir, "-cf", self.output_file, "--remove-files", backup_base_name]
+
+                    if self.do_gzip:
+                        log_msg = "Archiving directory: %s" % self.backup_dir
+                        cmd_flags = ["-C", backup_base_dir, "-czf", self.output_file, "--remove-files", backup_base_name]
+
+                    logging.info(log_msg)
+                    self._command = LocalCommand(self.binary, cmd_flags, self.verbose)
+                    self._command.run()
                 except Exception, e:
                     logging.fatal("Failed archiving file: %s! Error: %s" % (self.output_file, e))
-                    raise e
-            else:
+                    sys.exit(1)
+            elif os.path.isfile(self.output_file):
                 logging.fatal("Output file: %s already exists!" % self.output_file)
-                raise Exception, "Output file %s already exists!" % self.output_file, None
+                sys.exit(1)
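TarThread builds its tar invocation from the backup directory's parent and base name, so the archive contains only the leaf directory rather than the full path. A sketch of that flag assembly (paths illustrative; `tar --remove-files` deletes inputs as they are archived):

```python
import os


def tar_cmd(backup_dir, output_file, do_gzip):
    # Mirror TarThread.run(): -C into the parent dir so only the base name
    # lands in the archive; -z only when gzip compression is requested.
    base_dir = os.path.dirname(backup_dir)
    base_name = os.path.basename(backup_dir)
    mode = "-czf" if do_gzip else "-cf"
    return ["tar", "-C", base_dir, mode, output_file, "--remove-files", base_name]


print(tar_cmd("/backups/20170405", "/backups/20170405.tgz", True))
# -> ['tar', '-C', '/backups', '-czf', '/backups/20170405.tgz', '--remove-files', '20170405']
```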
58 changes: 6 additions & 52 deletions mongodb_consistent_backup/Backup/Backup.py
@@ -1,55 +1,9 @@
-import logging
+from mongodb_consistent_backup.Backup.Mongodump import Mongodump
+from mongodb_consistent_backup.Pipeline import Stage
 
-from Mongodump import Mongodump
-from mongodb_consistent_backup.Common import Timer, config_to_string, parse_method
 
 
-class Backup:
-    def __init__(self, config, backup_dir, secondaries, config_server=None):
-        self.config = config
-        self.backup_dir = backup_dir
-        self.secondaries = secondaries
-        self.config_server = config_server
-
-        self.method = None
-        self._method = None
-        self.timer = Timer()
+class Backup(Stage):
+    def __init__(self, manager, config, timer, base_dir, backup_dir, replsets, sharding=None):
+        super(Backup, self).__init__(self.__class__.__name__, manager, config, timer, base_dir, backup_dir, replsets=replsets, sharding=sharding)
+        self.task = self.config.backup.method
         self.init()
 
-    def init(self):
-        backup_method = self.config.backup.method
-        if not backup_method or parse_method(backup_method) == "none":
-            raise Exception, 'Must specify a backup method!', None
-        self.method = parse_method(backup_method)
-        try:
-            self._method = globals()[self.method.capitalize()](
-                self.config,
-                self.backup_dir,
-                self.secondaries,
-                self.config_server
-            )
-        except LookupError, e:
-            raise Exception, 'No backup method: %s' % self.method, None
-        except Exception, e:
-            raise Exception, "Problem performing %s! Error: %s" % (self.method, e), None
-
-    def is_compressed(self):
-        if self._method:
-            return self._method.is_compressed()
-
-    def backup(self):
-        if self._method:
-            config_string = config_to_string(self.config.backup[self.method])
-            logging.info("Using backup method: %s (options: %s)" % (self.method, config_string))
-            self.timer.start()
-
-            info = self._method.run()
-
-            self.timer.stop()
-            logging.info("Backup completed in %s seconds" % self.timer.duration())
-
-            return info
-
-    def close(self):
-        if self._method:
-            return self._method.close()