Skip to content

Commit

Permalink
archive: split out attr setting from 'unpack_archive_multitgz' into new function.
Browse files Browse the repository at this point in the history
  • Loading branch information
pjbriggs committed Feb 7, 2025
1 parent 2dbe8b3 commit a44d438
Show file tree
Hide file tree
Showing 2 changed files with 327 additions and 14 deletions.
102 changes: 88 additions & 14 deletions ngsarchiver/archive.py
Original file line number Diff line number Diff line change
Expand Up @@ -2214,7 +2214,8 @@ def make_empty_archive(archive_name, root_dir, base_dir=None,
f"to archive: {ex}")
return archive_name

def unpack_archive_multitgz(archive_list,extract_dir=None):
def unpack_archive_multitgz(archive_list, extract_dir=None,
set_permissions=False, set_times=False):
"""
Unpack a multi-volume 'gztar' archive
Expand All @@ -2223,6 +2224,12 @@ def unpack_archive_multitgz(archive_list,extract_dir=None):
unpack
extract_dir (str): specifies directory to unpack
volumes into (default: current directory)
set_permissions (bool): if True then set permissions
on extracted files to those from the archive
(default: don't set permissions)
set_times (bool): if True then set times on extracted
files to those from the archive (default: don't set
times)
"""
if extract_dir is None:
extract_dir = os.getcwd()
Expand All @@ -2234,20 +2241,87 @@ def unpack_archive_multitgz(archive_list,extract_dir=None):
# volumes)
with tarfile.open(a,'r:gz',errorlevel=1) as tgz:
for o in tgz:
try:
tgz.extract(o,path=extract_dir,set_attrs=False)
except Exception as ex:
print("Exception extracting '%s' from '%s': %s"
% (o.name,a,ex))
raise ex
atime = time.time()
if not o.isdir():
# Extract file without attributes
try:
tgz.extract(o, path=extract_dir, set_attrs=False)
except Exception as ex:
print(f"Exception extracting '{o.name}' from '{a}': "
f"{ex}")
raise ex
else:
# Explicitly create directories rather than
# extracting them (workaround for setting
# default permissions)
try:
os.makedirs(os.path.join(extract_dir, o.name))
except Exception as ex:
print(f"Exception creating directory '{o.name}' "
f"from '{a}': {ex}")
raise ex
# Set attributes (time and mode) on extracted files
set_attributes_from_archive_multitgz(archive_list,
extract_dir=extract_dir,
set_permissions=set_permissions,
set_times=set_times)

def set_attributes_from_archive_multitgz(archive_list, extract_dir=None,
                                         set_permissions=False,
                                         set_times=False):
    """
    Update permissions and/or times on extracted files

    Reads the modification time and mode stored for each
    member of the supplied archive volumes and applies them
    to the matching paths under 'extract_dir'. Paths which
    are symbolic links on disk are left untouched.

    Attributes are applied to non-directories first and then
    to directories, deepest first, so that removing access
    permissions from a directory cannot prevent its contents
    (including nested directories) from being updated.

    Arguments:
      archive_list (list): list of archive volumes to
        copy attributes from
      extract_dir (str): specifies directory where unpacked
        files and directories are (default: current directory)
      set_permissions (bool): if True then set permissions
        on extracted files to those from the archive
        (default: don't set permissions)
      set_times (bool): if True then set times on extracted
        files to those from the archive (default: don't set
        times)
    """
    if set_permissions and set_times:
        attr_types = "permissions and times"
    elif set_permissions:
        attr_types = "permissions"
    elif set_times:
        attr_types = "times"
    else:
        # Nothing to do
        return
    if extract_dir is None:
        extract_dir = os.getcwd()
    # Collect the attributes for every member before changing
    # anything on disk; whether the target is a directory is
    # recorded now, while the whole tree is still accessible
    attributes = {}
    for a in archive_list:
        print(f"Collecting attributes from {a}...")
        with tarfile.open(a, 'r:gz', errorlevel=1) as tgz:
            for src in tgz:
                tgt = os.path.join(extract_dir, src.name)
                if os.path.islink(tgt):
                    continue
                attributes[src.name] = (src.mtime, src.mode,
                                        os.path.isdir(tgt))
    # Single access time applied to everything that is updated
    atime = time.time()

    def apply_attributes(name):
        # Set the time before the mode for an individual path
        mtime, mode, _ = attributes[name]
        tgt = os.path.join(extract_dir, name)
        if set_times:
            utime(tgt, (atime, mtime))
        if set_permissions:
            chmod(tgt, mode)

    print(f"Updating {attr_types} on files...")
    for name, attrs in attributes.items():
        if not attrs[2]:
            apply_attributes(name)
    print(f"Updating {attr_types} on directories...")
    # Deepest directories first: a parent which loses its search
    # permission would otherwise make its subdirectories
    # unreachable before they had been updated
    dir_names = [name for name, attrs in attributes.items() if attrs[2]]
    dir_names.sort(key=lambda name: os.path.normpath(name).count(os.sep),
                   reverse=True)
    for name in dir_names:
        apply_attributes(name)

def make_copy(d, dest, replace_symlinks=False,
transform_broken_symlinks=False,
Expand Down
239 changes: 239 additions & 0 deletions ngsarchiver/test/test_archive.py
Original file line number Diff line number Diff line change
Expand Up @@ -2738,6 +2738,245 @@ def test_archivedirectory_unpack_non_standard_name(self):
self.assertTrue(os.path.exists(
os.path.join(extract_dir,"example","ex1.txt")))

def test_archivedirectory_unpack_ignores_missing_owner_rw_permissions(self):
    """
    ArchiveDirectory: unpack archive ignores missing owner 'rw' permissions
    """
    # Assemble an archive directory on disk; the tarball payload
    # holds a file and a subdirectory whose owner 'rw' bits were
    # stripped before archiving
    tgz_payload = base64.b64decode(b'H4sIAMT5mWcAA+3VQQrCMBAF0Kw9RU6gM+mkOYFLD9FiFopiqRFyfNtqRQpWF6ai/rfJooEOfObHx2Jf7fxCJUQN52x7srN0f/YUi4ghccY095hy45S2KYfqnY6hqLVW1bYcvffs+5fy1/x95HmIIck/uoRzeZw/21v+7FyTv6GclaYk0wz8ef7LS/46+Bhmnx4GJtfv//FUrjd1mmeg2/FX+1+o3X8mi/6fwiB/H837n4Gu/kf73wzyzzIR9P8UVoca5Q8AAAAAAAAAAAAAAAAA8APOCW7Y2gAoAAA=')
    archive_dir = UnittestDir(os.path.join(self.wd, "example.archive"))
    archive_dir.add("example.tar.gz", type="binary", content=tgz_payload)
    archive_dir.add(
        "example.md5",
        type="file",
        content="""8bcc714d327b74a95a166574d0103f5c example/ex1.txt
cfac359b4837003003a79a3b237f1d32 example/subdir/ex2.txt
""")
    archive_dir.add(
        "ARCHIVE_METADATA/archive_checksums.md5",
        type="file",
        content="c0a5d4fff64a75c6fa10ce44ee172230 example.tar.gz\n")
    archive_dir.add(
        "ARCHIVE_METADATA/archiver_metadata.json",
        type="file",
        content="""{
"name": "example",
"source": "/original/path/to/example",
"source_date": "2019-11-27 17:19:02",
"type": "ArchiveDirectory",
"subarchives": [
"example.tar.gz"
],
"files": [],
"user": "anon",
"creation_date": "2023-06-16 09:58:39",
"multi_volume": false,
"volume_size": null,
"compression_level": 6,
"ngsarchiver_version": "1.9.0"
}
""")
    # Remaining archive artefacts are added without content
    for placeholder in ("ARCHIVE_METADATA/manifest",
                        "ARCHIVE_README.txt",
                        "ARCHIVE_FILELIST.txt",
                        "ARCHIVE_TREE.txt"):
        archive_dir.add(placeholder, type="file")
    archive_dir.create()
    # Paths the archive is expected to contain
    expected = ('example/ex1.txt',
                'example/subdir',
                'example/subdir/ex2.txt',)
    # The directory must load as an ArchiveDirectory
    a = ArchiveDirectory(archive_dir.path)
    self.assertIsInstance(a, ArchiveDirectory)
    # Spot-check the metadata
    info = a.archive_metadata
    self.assertEqual(info['name'], "example")
    self.assertEqual(info['subarchives'], ["example.tar.gz"])
    self.assertEqual(info['files'], [])
    self.assertEqual(info['multi_volume'], False)
    self.assertEqual(info['volume_size'], None)
    # Listing must only report expected paths
    for member in a.list():
        self.assertIn(member.path, expected,
                      "%s: unexpected item" % member.path)
    # Searching by name, and by name plus path
    by_name = sorted(x.path for x in a.search(name="ex1.*"))
    self.assertEqual(by_name, ["example/ex1.txt"])
    by_name_and_path = sorted(x.path for x in a.search(name="ex1.*",
                                                       path="*/ex1.txt"))
    self.assertEqual(by_name_and_path, ["example/ex1.txt"])
    # Archive must verify
    self.assertTrue(a.verify_archive())
    # Working directory must hold only the archive before unpacking
    unpacked_dir = os.path.join(self.wd, "example")
    self.assertFalse(os.path.exists(unpacked_dir))
    self.assertEqual(os.listdir(self.wd), ["example.archive"])
    try:
        # Unpack without requesting permissions from the archive
        a.unpack(extract_dir=self.wd)
        self.assertTrue(os.path.exists(unpacked_dir))
        self.assertEqual(sorted(os.listdir(self.wd)),
                         ["example", "example.archive"])
        self.assertEqual(os.path.getmtime(unpacked_dir),
                         os.path.getmtime(a.path))
        for rel_path in expected:
            self.assertTrue(os.path.exists(os.path.join(self.wd, rel_path)),
                            "missing '%s'" % rel_path)
        # Nothing beyond the expected contents should exist
        for found in Directory(unpacked_dir).walk():
            self.assertIn(os.path.relpath(found, self.wd), expected,
                          "'%s' not expected" % found)
        # These items must be both readable and writable
        for leaf in ["ex1.txt", "subdir"]:
            leaf_path = os.path.join(self.wd, "example", leaf)
            self.assertTrue(os.access(leaf_path, os.R_OK))
            self.assertTrue(os.access(leaf_path, os.W_OK))
        # Extract a single item, without and then with its path
        extract_dir = os.path.join(self.wd, "test_extract")
        os.mkdir(extract_dir)
        a.extract_files(name="example/ex1.*", extract_dir=extract_dir)
        self.assertTrue(os.path.exists(
            os.path.join(extract_dir, "ex1.txt")))
        a.extract_files(name="example/ex1.*", extract_dir=extract_dir,
                        include_path=True)
        self.assertTrue(os.path.exists(
            os.path.join(extract_dir, "example", "ex1.txt")))
    finally:
        # Restore permissions so the unpacked tree can be deleted
        example_dir = os.path.join(self.wd, "example")
        if os.path.exists(example_dir):
            for o in Directory(example_dir).walk():
                os.chmod(o, 0o755 if os.path.isdir(o) else 0o644)

def test_archivedirectory_unpack_copies_missing_owner_rw_permissions(self):
    """
    ArchiveDirectory: unpack archive copies missing owner 'rw' permissions
    """
    # Assemble an archive directory on disk; the tarball payload
    # holds a file and a subdirectory whose owner 'rw' bits were
    # stripped before archiving
    tgz_payload = base64.b64decode(b'H4sIAMT5mWcAA+3VQQrCMBAF0Kw9RU6gM+mkOYFLD9FiFopiqRFyfNtqRQpWF6ai/rfJooEOfObHx2Jf7fxCJUQN52x7srN0f/YUi4ghccY095hy45S2KYfqnY6hqLVW1bYcvffs+5fy1/x95HmIIck/uoRzeZw/21v+7FyTv6GclaYk0wz8ef7LS/46+Bhmnx4GJtfv//FUrjd1mmeg2/FX+1+o3X8mi/6fwiB/H837n4Gu/kf73wzyzzIR9P8UVoca5Q8AAAAAAAAAAAAAAAAA8APOCW7Y2gAoAAA=')
    archive_dir = UnittestDir(os.path.join(self.wd, "example.archive"))
    archive_dir.add("example.tar.gz", type="binary", content=tgz_payload)
    archive_dir.add(
        "example.md5",
        type="file",
        content="""8bcc714d327b74a95a166574d0103f5c example/ex1.txt
cfac359b4837003003a79a3b237f1d32 example/subdir/ex2.txt
""")
    archive_dir.add(
        "ARCHIVE_METADATA/archive_checksums.md5",
        type="file",
        content="c0a5d4fff64a75c6fa10ce44ee172230 example.tar.gz\n")
    archive_dir.add(
        "ARCHIVE_METADATA/archiver_metadata.json",
        type="file",
        content="""{
"name": "example",
"source": "/original/path/to/example",
"source_date": "2019-11-27 17:19:02",
"type": "ArchiveDirectory",
"subarchives": [
"example.tar.gz"
],
"files": [],
"user": "anon",
"creation_date": "2023-06-16 09:58:39",
"multi_volume": false,
"volume_size": null,
"compression_level": 6,
"ngsarchiver_version": "1.9.0"
}
""")
    # Remaining archive artefacts are added without content
    for placeholder in ("ARCHIVE_METADATA/manifest",
                        "ARCHIVE_README.txt",
                        "ARCHIVE_FILELIST.txt",
                        "ARCHIVE_TREE.txt"):
        archive_dir.add(placeholder, type="file")
    archive_dir.create()
    # Paths the archive is expected to contain
    expected = ('example/ex1.txt',
                'example/subdir',
                'example/subdir/ex2.txt',)
    # Subset of paths checked for existence after unpacking
    readable = ('example/ex1.txt',
                'example/subdir',)
    # The directory must load as an ArchiveDirectory
    a = ArchiveDirectory(archive_dir.path)
    self.assertIsInstance(a, ArchiveDirectory)
    # Spot-check the metadata
    info = a.archive_metadata
    self.assertEqual(info['name'], "example")
    self.assertEqual(info['subarchives'], ["example.tar.gz"])
    self.assertEqual(info['files'], [])
    self.assertEqual(info['multi_volume'], False)
    self.assertEqual(info['volume_size'], None)
    # Listing must only report expected paths
    for member in a.list():
        self.assertIn(member.path, expected,
                      "%s: unexpected item" % member.path)
    # Searching by name, and by name plus path
    by_name = sorted(x.path for x in a.search(name="ex1.*"))
    self.assertEqual(by_name, ["example/ex1.txt"])
    by_name_and_path = sorted(x.path for x in a.search(name="ex1.*",
                                                       path="*/ex1.txt"))
    self.assertEqual(by_name_and_path, ["example/ex1.txt"])
    # Archive must verify
    self.assertTrue(a.verify_archive())
    # Working directory must hold only the archive before unpacking
    unpacked_dir = os.path.join(self.wd, "example")
    self.assertFalse(os.path.exists(unpacked_dir))
    self.assertEqual(os.listdir(self.wd), ["example.archive"])
    try:
        # Unpack, asking for permissions to be copied from the archive
        a.unpack(extract_dir=self.wd, set_permissions=True)
        self.assertTrue(os.path.exists(unpacked_dir))
        self.assertEqual(sorted(os.listdir(self.wd)),
                         ["example", "example.archive"])
        self.assertEqual(os.path.getmtime(unpacked_dir),
                         os.path.getmtime(a.path))
        for rel_path in readable:
            self.assertTrue(os.path.exists(os.path.join(self.wd, rel_path)),
                            "missing '%s'" % rel_path)
        # Nothing beyond the expected contents should exist
        for found in Directory(unpacked_dir).walk():
            self.assertIn(os.path.relpath(found, self.wd), expected,
                          "'%s' not expected" % found)
        # These items must be neither readable nor writable
        for leaf in ["ex1.txt", "subdir"]:
            leaf_path = os.path.join(self.wd, "example", leaf)
            self.assertFalse(os.access(leaf_path, os.R_OK))
            self.assertFalse(os.access(leaf_path, os.W_OK))
        # Extract a single item, without and then with its path
        extract_dir = os.path.join(self.wd, "test_extract")
        os.mkdir(extract_dir)
        a.extract_files(name="example/ex1.*", extract_dir=extract_dir)
        self.assertTrue(os.path.exists(
            os.path.join(extract_dir, "ex1.txt")))
        a.extract_files(name="example/ex1.*", extract_dir=extract_dir,
                        include_path=True)
        self.assertTrue(os.path.exists(
            os.path.join(extract_dir, "example", "ex1.txt")))
    finally:
        # Restore permissions so the unpacked tree can be deleted
        example_dir = os.path.join(self.wd, "example")
        if os.path.exists(example_dir):
            for o in Directory(example_dir).walk():
                os.chmod(o, 0o755 if os.path.isdir(o) else 0o644)


class TestLegacyArchiveDirectory(unittest.TestCase):

Expand Down

0 comments on commit a44d438

Please sign in to comment.