forked from wal-e/wal-e
-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Now, `file:///` URIs can be used to store on a file system. Includes the following original commits by @colmaengus (Colm Aengus Murphy <[email protected]>), rebased onto 1.1.0b1 by [email protected]: - First pass at file blobstore (6cddec1) - Fixed style check errors (4dada39) - Fixed permissions (9af2498) - Added tests for file blobstore (963d1f1) - Fixed code check and added file store to README (69905a1)
- Loading branch information
1 parent
63dcd81
commit c12cf97
Showing
17 changed files
with
566 additions
and
13 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -10,10 +10,12 @@ Bo Shi <[email protected]> | |
Brian Oldfield <[email protected]> | ||
Brian Rosner <[email protected]> | ||
Carlo Cabanilla <[email protected]> | ||
Charles Duffy <[email protected]> | ||
Chris Armstrong <[email protected]> | ||
Christian Pedersen <[email protected]> | ||
Christophe Pettus <[email protected]> | ||
Christopher Weibel <[email protected]> | ||
Colm Aengus Murphy <[email protected]> | ||
Dan Milstein <[email protected]> | ||
Dan Robinson <[email protected]> | ||
Daniel Farina <[email protected]> | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,164 @@ | ||
import pytest | ||
import os | ||
import errno | ||
|
||
from subprocess import call | ||
|
||
from wal_e.storage import StorageLayout | ||
from wal_e import exception | ||
from wal_e.operator.file_operator import FileBackup | ||
|
||
from wal_e.blobstore.file import uri_put_file | ||
from wal_e.blobstore.file import uri_get_file | ||
from wal_e.blobstore.file import do_lzop_get | ||
from wal_e.blobstore.file import write_and_return_error | ||
|
||
|
||
def create_files(files): | ||
"""Helper function to create a test directory structure. | ||
File path is used as file contents""" | ||
for f in files: | ||
dir_path = os.path.dirname(f) | ||
if not os.path.exists(dir_path): | ||
os.makedirs(dir_path) | ||
fp = open(f, "wb") | ||
fp.write(f.encode("utf-8")) | ||
fp.close() | ||
|
||
|
||
def test_valid_prefix(): | ||
store = StorageLayout("file://localhost/tmp") | ||
assert store.is_file is True | ||
|
||
|
||
def test_invalid_prefix(): | ||
with pytest.raises(exception.UserException): | ||
StorageLayout("notfile://localhost/tmp") | ||
|
||
|
||
def test_uri_put_file_writes_key_file(tmpdir): | ||
base = str(tmpdir.mkdir("base")) | ||
file_list = [base + "/src.txt"] | ||
create_files(file_list) | ||
with open(base + "/src.txt", "rb") as f: | ||
uri_put_file("", "file://localhost/" + base + "/dst.txt", f) | ||
|
||
with open(base + "/dst.txt", "rb") as dst_file: | ||
assert dst_file.read() == file_list[0].encode('utf-8') | ||
|
||
|
||
def test_uri_put_file_creates_key_dir(tmpdir): | ||
"""Verify file upload""" | ||
base = str(tmpdir.mkdir("base")) | ||
file_list = [base + "/src.txt"] | ||
create_files(file_list) | ||
with open(file_list[0], "rb") as f: | ||
uri_put_file("", "file://localhost/" + base + "/subdir/dst.txt", f) | ||
|
||
with open(base + "/subdir//dst.txt", "rb") as dst_file: | ||
assert dst_file.read() == file_list[0].encode('utf-8') | ||
|
||
|
||
def test_uri_get_file(tmpdir): | ||
"""Verify file download""" | ||
base = str(tmpdir.mkdir("base")) | ||
file_list = [base + "/src.txt"] | ||
create_files(file_list) | ||
file_contents = uri_get_file("", "file://localhost/" + base + "/src.txt") | ||
assert file_contents == file_list[0].encode('utf-8') | ||
|
||
|
||
def test_bucket_list(tmpdir): | ||
"""Verify bucket keys can be listed""" | ||
base = str(tmpdir.mkdir("base")) | ||
file_list = [base + "/subdirfile.txt", | ||
base + "/subdir/file.txt"] | ||
create_files(file_list) | ||
store = StorageLayout("file://localhost/" + base) | ||
backup = FileBackup(store, "", "") | ||
conn = backup.cinfo.connect("") | ||
bucket = conn.get_bucket("") | ||
result = bucket.list(base) | ||
assert len(result) == len(file_list) | ||
for f in file_list: | ||
matches = [x for x in result if x.path == f] | ||
assert len(matches) == 1 | ||
assert hasattr(matches[0], 'size') is True | ||
assert hasattr(matches[0], 'last_modified') is True | ||
|
||
|
||
def test_delete_keys(tmpdir): | ||
"""Verify keys are deleted and bucket is trimmed""" | ||
base = str(tmpdir.mkdir("base")) | ||
file_list = [base + "/subdir1/file.txt", | ||
base + "/subdir2/file.txt"] | ||
create_files(file_list) | ||
store = StorageLayout("file://localhost/" + base) | ||
backup = FileBackup(store, "", "") | ||
conn = backup.cinfo.connect("") | ||
bucket = conn.get_bucket("") | ||
bucket.delete_keys(file_list) | ||
assert len(os.listdir(base)) == 0 | ||
|
||
|
||
def test_do_lzop_get(tmpdir): | ||
"""Create a dummy lzo file and confirm it is download/decompression""" | ||
base = str(tmpdir.mkdir("base")) | ||
file_list = [base + "/src.txt"] | ||
create_files(file_list) | ||
call(["lzop", base + "/src.txt"]) | ||
do_lzop_get("", "file://localhost/" + base + "/src.txt.lzo", | ||
base + "/dst.txt", False, do_retry=True) | ||
|
||
with open(base + "/dst.txt", "rb") as dst_file: | ||
assert dst_file.read() == file_list[0].encode('utf-8') | ||
|
||
|
||
def test_do_lzop_get_missing_key(tmpdir): | ||
"""Verify exception is raised for missing key""" | ||
base = str(tmpdir.mkdir("base")) | ||
with pytest.raises(IOError) as e: | ||
do_lzop_get("", "file://localhost/" + base + "/src.txt.lzo", | ||
base + "/dst.txt", False, do_retry=True) | ||
|
||
assert e.value.errno == errno.ENOENT | ||
|
||
|
||
def test_write_and_return_error(tmpdir): | ||
"""Verify None as result in normal operation""" | ||
base = str(tmpdir.mkdir("base")) | ||
file_list = [base + "/src.txt"] | ||
create_files(file_list) | ||
|
||
store = StorageLayout("file://localhost/" + base) | ||
backup = FileBackup(store, "", "") | ||
conn = backup.cinfo.connect("") | ||
bucket = conn.get_bucket("") | ||
f = open(base + "/dst.txt", "wb") | ||
key = bucket.get_key(base + "/src.txt") | ||
|
||
result = write_and_return_error(key, f) | ||
assert result is None | ||
|
||
with open(base + "/dst.txt", "rb") as dst_file: | ||
assert dst_file.read() == file_list[0].encode('utf-8') | ||
|
||
|
||
def test_write_and_return_error_with_error(tmpdir): | ||
"""Verify exception as result in error operation""" | ||
base = str(tmpdir.mkdir("base")) | ||
file_list = [base + "/src.txt"] | ||
create_files(file_list) | ||
|
||
store = StorageLayout("file://localhost/" + base) | ||
backup = FileBackup(store, "", "") | ||
conn = backup.cinfo.connect("") | ||
bucket = conn.get_bucket("") | ||
f = open(base + "/dst.txt", "wb") | ||
key = bucket.get_key(base + "/missing.txt") | ||
|
||
with pytest.raises(IOError) as e: | ||
result = write_and_return_error(key, f) | ||
raise result | ||
|
||
assert e.value.errno == errno.ENOENT |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,13 @@ | ||
from wal_e.blobstore.file.file_credentials import Credentials | ||
from wal_e.blobstore.file.file_util import do_lzop_get | ||
from wal_e.blobstore.file.file_util import uri_get_file | ||
from wal_e.blobstore.file.file_util import uri_put_file | ||
from wal_e.blobstore.file.file_util import write_and_return_error | ||
|
||
__all__ = [ | ||
'Credentials', | ||
'do_lzop_get', | ||
'uri_get_file', | ||
'uri_put_file', | ||
'write_and_return_error', | ||
] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,87 @@ | ||
import os | ||
import shutil | ||
from datetime import datetime | ||
|
||
|
||
def remove_empty_dirs(path): | ||
""" removes empty dirs under a given path """ | ||
for root, dirs, files in os.walk(path): | ||
for d in dirs: | ||
dir_path = os.path.join(root, d) | ||
if not os.listdir(dir_path): | ||
os.rmdir(dir_path) | ||
|
||
|
||
def ensure_dir_exists(path): | ||
""" create a directory if required """ | ||
dir_path = os.path.dirname(path) | ||
if not os.path.exists(dir_path): | ||
os.makedirs(dir_path) | ||
|
||
|
||
def common_dir_path(args, sep='/'): | ||
""" return the highest common directory given a list of files """ | ||
return os.path.commonprefix(args).rpartition(sep)[0] | ||
|
||
|
||
def epoch_to_iso8601(timestamp): | ||
return datetime.utcfromtimestamp(timestamp).isoformat() | ||
|
||
|
||
class FileKey(object): | ||
def __init__(self, bucket, name): | ||
self.bucket = bucket | ||
self.name = name | ||
self.path = os.path.join("/", name.strip("/")) | ||
if os.path.isfile(self.path): | ||
stat = os.stat(self.path) | ||
self.last_modified = epoch_to_iso8601(stat.st_mtime) | ||
self.size = stat.st_size | ||
|
||
def get_contents_as_string(self): | ||
with open(self.path, 'rb') as fp: | ||
contents = fp.read() | ||
return contents | ||
|
||
def set_contents_from_file(self, fp): | ||
ensure_dir_exists(self.path) | ||
with open(self.path, 'wb') as f: | ||
shutil.copyfileobj(fp, f) | ||
setattr(self, 'size', os.path.getsize(self.path)) | ||
|
||
def get_contents_to_file(self, fp): | ||
with open(self.path, 'rb') as f: | ||
shutil.copyfileobj(f, fp) | ||
|
||
|
||
class Bucket(object): | ||
def __init__(self, name): | ||
self.name = name | ||
|
||
def get_key(self, name): | ||
return FileKey(bucket=self, name=name) | ||
|
||
def delete_keys(self, keys): | ||
for k in keys: | ||
key_path = os.path.join("/", k.strip("/")) | ||
os.remove(key_path) | ||
# deleting files can leave empty dirs => trim them | ||
common_path = os.path.join("/", common_dir_path(keys).strip("/")) | ||
remove_empty_dirs(common_path) | ||
|
||
def list(self, prefix): | ||
path = "/" + prefix | ||
file_paths = [os.path.join(root, f) | ||
for root, dirs, files in os.walk(path) for f in files] | ||
# convert to an array of Keys | ||
return [FileKey(bucket=self, name=f) for f in file_paths] | ||
|
||
|
||
class Connection(object): | ||
|
||
def get_bucket(self, name, validate=False): | ||
return Bucket(name) | ||
|
||
|
||
def connect(creds): | ||
return Connection() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
class Credentials(object): | ||
def __init__(self): | ||
pass |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,69 @@ | ||
from urllib.parse import urlparse | ||
import gevent | ||
|
||
from . import calling_format | ||
from wal_e import files | ||
from wal_e import log_help | ||
from wal_e.pipeline import get_download_pipeline | ||
from wal_e.piper import PIPE | ||
|
||
logger = log_help.WalELogger(__name__) | ||
|
||
|
||
def _uri_to_key(creds, uri, conn=None): | ||
assert uri.startswith('file://') | ||
url_tup = urlparse(uri) | ||
bucket_name = url_tup.netloc | ||
if conn is None: | ||
conn = calling_format.connect(creds) | ||
return conn.get_bucket(bucket_name).get_key(url_tup.path) | ||
|
||
|
||
def uri_put_file(creds, uri, fp, content_type=None, conn=None): | ||
assert fp.tell() == 0 | ||
|
||
k = _uri_to_key(creds, uri, conn=conn) | ||
|
||
k.set_contents_from_file(fp) | ||
return k | ||
|
||
|
||
def uri_get_file(creds, uri, conn=None): | ||
k = _uri_to_key(creds, uri, conn=conn) | ||
return k.get_contents_as_string() | ||
|
||
|
||
def do_lzop_get(creds, url, path, decrypt, do_retry): | ||
""" | ||
Get and decompress a URL | ||
This streams the content directly to lzop; the compressed version | ||
is never stored on disk. | ||
""" | ||
assert url.endswith('.lzo'), 'Expect an lzop-compressed file' | ||
|
||
with files.DeleteOnError(path) as decomp_out: | ||
key = _uri_to_key(creds, url) | ||
with get_download_pipeline(PIPE, decomp_out.f, decrypt) as pl: | ||
g = gevent.spawn(write_and_return_error, key, pl.stdin) | ||
exc = g.get() | ||
if exc is not None: | ||
raise exc | ||
|
||
logger.info( | ||
msg='completed download and decompression', | ||
detail='Downloaded and decompressed "{url}" to "{path}"' | ||
.format(url=url, path=path)) | ||
|
||
return True | ||
|
||
|
||
def write_and_return_error(key, stream): | ||
try: | ||
key.get_contents_to_file(stream) | ||
stream.flush() | ||
except Exception as e: | ||
return e | ||
finally: | ||
stream.close() |
Oops, something went wrong.