diff --git a/ngsarchiver/archive.py b/ngsarchiver/archive.py index dfe30ec..923e213 100644 --- a/ngsarchiver/archive.py +++ b/ngsarchiver/archive.py @@ -2214,7 +2214,8 @@ def make_empty_archive(archive_name, root_dir, base_dir=None, f"to archive: {ex}") return archive_name -def unpack_archive_multitgz(archive_list,extract_dir=None): +def unpack_archive_multitgz(archive_list, extract_dir=None, + set_permissions=False, set_times=False): """ Unpack a multi-volume 'gztar' archive @@ -2223,6 +2224,12 @@ def unpack_archive_multitgz(archive_list,extract_dir=None): unpack extract_dir (str): specifies directory to unpack volumes into (default: current directory) + set_permissions (bool): if True then set permissions + on extracted files to those from the archive + (default: don't set permissions) + set_times (bool): if True then set times on extracted + files to those from the archive (default: don't set + times) """ if extract_dir is None: extract_dir = os.getcwd() @@ -2234,20 +2241,87 @@ def unpack_archive_multitgz(archive_list,extract_dir=None): # volumes) with tarfile.open(a,'r:gz',errorlevel=1) as tgz: for o in tgz: - try: - tgz.extract(o,path=extract_dir,set_attrs=False) - except Exception as ex: - print("Exception extracting '%s' from '%s': %s" - % (o.name,a,ex)) - raise ex - atime = time.time() + if not o.isdir(): + # Extract file without attributes + try: + tgz.extract(o, path=extract_dir, set_attrs=False) + except Exception as ex: + print(f"Exception extracting '{o.name}' from '{a}': " + f"{ex}") + raise ex + else: + # Explicitly create directories rather than + # extracting them (workaround for setting + # default permissions) + try: + os.makedirs(os.path.join(extract_dir, o.name)) + except Exception as ex: + print(f"Exception creating directory '{o.name}' " + f"from '{a}': {ex}") + raise ex + # Set attributes (time and mode) on extracted files + set_attributes_from_archive_multitgz(archive_list, + extract_dir=extract_dir, + set_permissions=set_permissions, + set_times=set_times) + +def set_attributes_from_archive_multitgz(archive_list, extract_dir=None, + set_permissions=False, + set_times=False): + """ + Update permissions and/or times on extracted files + + Arguments: + archive_list (list): list of archive volumes to + copy attributes from + extract_dir (str): specifies directory where unpacked + files and directories are (default: current directory) + set_permissions (bool): if True then set permissions + on extracted files to those from the archive + (default: don't set permissions) + set_times (bool): if True then set times on extracted + files to those from the archive (default: don't set + times) + """ + if set_permissions and set_times: + attr_types = "permissions and times" + elif set_permissions and not set_times: + attr_types = "permissions" + elif set_times and not set_permissions: + attr_types = "times" + else: + # Nothing to do + return + if extract_dir is None: + extract_dir = os.getcwd() + attributes = {} for a in archive_list: - print("Updating attributes from %s..." % a) - with tarfile.open(a,'r:gz',errorlevel=1) as tgz: - for o in tgz: - o_ = os.path.join(extract_dir,o.name) - chmod(o_,o.mode) - utime(o_,(atime,o.mtime)) + print(f"Collecting attributes from {a}...") + with tarfile.open(a,'r:gz', errorlevel=1) as tgz: + for src in tgz: + tgt = os.path.join(extract_dir, src.name) + if os.path.islink(tgt): + continue + attributes[src.name] = (src.mtime, src.mode) + atime = time.time() + print(f"Updating {attr_types} on files...") + for src in attributes: + tgt = os.path.join(extract_dir, src) + if not os.path.isdir(tgt): + attrs = attributes[src] + if set_times: + utime(tgt, (atime, attrs[0])) + if set_permissions: + chmod(tgt, attrs[1]) + print(f"Updating {attr_types} on directories...") + for src in attributes: + tgt = os.path.join(extract_dir, src) + if os.path.isdir(tgt): + attrs = attributes[src] + if set_times: + utime(tgt, (atime, attrs[0])) + if set_permissions: + chmod(tgt, attrs[1]) def make_copy(d, dest, replace_symlinks=False, transform_broken_symlinks=False, diff --git a/ngsarchiver/test/test_archive.py b/ngsarchiver/test/test_archive.py index 4a29d14..d3f7980 100644 --- a/ngsarchiver/test/test_archive.py +++ b/ngsarchiver/test/test_archive.py @@ -2738,6 +2738,245 @@ def test_archivedirectory_unpack_non_standard_name(self): self.assertTrue(os.path.exists( os.path.join(extract_dir,"example","ex1.txt"))) + def test_archivedirectory_unpack_ignores_missing_owner_rw_permissions(self): + """ + ArchiveDirectory: unpack archive ignores missing owner 'rw' permissions + """ + # Build example archive dir containing file and directory + # with 'rw' permissions stripped for user for ex1. + example_archive = UnittestDir(os.path.join(self.wd, + "example.archive")) + example_archive.add("example.tar.gz", + type="binary", + content=base64.b64decode(b'H4sIAMT5mWcAA+3VQQrCMBAF0Kw9RU6gM+mkOYFLD9FiFopiqRFyfNtqRQpWF6ai/rfJooEOfObHx2Jf7fxCJUQN52x7srN0f/YUi4ghccY095hy45S2KYfqnY6hqLVW1bYcvffs+5fy1/x95HmIIck/uoRzeZw/21v+7FyTv6GclaYk0wz8ef7LS/46+Bhmnx4GJtfv//FUrjd1mmeg2/FX+1+o3X8mi/6fwiB/H837n4Gu/kf73wzyzzIR9P8UVoca5Q8AAAAAAAAAAAAAAAAA8APOCW7Y2gAoAAA=')) + example_archive.add("example.md5", + type="file", + content="""8bcc714d327b74a95a166574d0103f5c example/ex1.txt +cfac359b4837003003a79a3b237f1d32 example/subdir/ex2.txt +""") + example_archive.add("ARCHIVE_METADATA/archive_checksums.md5", + type="file", + content="c0a5d4fff64a75c6fa10ce44ee172230 example.tar.gz\n") + example_archive.add("ARCHIVE_METADATA/archiver_metadata.json", + type="file", + content="""{ + "name": "example", + "source": "/original/path/to/example", + "source_date": "2019-11-27 17:19:02", + "type": "ArchiveDirectory", + "subarchives": [ + "example.tar.gz" + ], + "files": [], + "user": "anon", + "creation_date": "2023-06-16 09:58:39", + "multi_volume": false, + "volume_size": null, + "compression_level": 6, + "ngsarchiver_version": "1.9.0" +} +""") + example_archive.add("ARCHIVE_METADATA/manifest",type="file") + example_archive.add("ARCHIVE_README.txt",type="file") + example_archive.add("ARCHIVE_FILELIST.txt",type="file") + example_archive.add("ARCHIVE_TREE.txt",type="file") + example_archive.create() + p = example_archive.path + # Expected contents + expected = ('example/ex1.txt', + 'example/subdir', + 'example/subdir/ex2.txt',) + # Check example loads as ArchiveDirectory + a = ArchiveDirectory(p) + self.assertTrue(isinstance(a,ArchiveDirectory)) + # Check subset of metadata + metadata = a.archive_metadata + self.assertEqual(metadata['name'],"example") + self.assertEqual(metadata['subarchives'],["example.tar.gz"]) + self.assertEqual(metadata['files'],[]) + self.assertEqual(metadata['multi_volume'],False) + self.assertEqual(metadata['volume_size'],None) + # List contents + for item in a.list(): + self.assertTrue(item.path in expected, + "%s: unexpected item" % item.path) + # Search for items + self.assertEqual(sorted([x.path for x in a.search(name="ex1.*")]), + ["example/ex1.txt"]) + self.assertEqual(sorted([x.path for x in a.search( + name="ex1.*", + path="*/ex1.txt")]), + ["example/ex1.txt"]) + # Verify archive + self.assertTrue(a.verify_archive()) + # Unpack (& check no extra artefacts are created) + self.assertFalse(os.path.exists(os.path.join(self.wd,"example"))) + self.assertEqual(os.listdir(self.wd), ["example.archive"]) + try: + a.unpack(extract_dir=self.wd) + self.assertTrue(os.path.exists(os.path.join(self.wd,"example"))) + self.assertEqual(sorted(os.listdir(self.wd)), + ["example", "example.archive"]) + self.assertEqual(os.path.getmtime(os.path.join(self.wd,"example")), + os.path.getmtime(a.path)) + for item in expected: + self.assertTrue( + os.path.exists(os.path.join(self.wd,item)), + "missing '%s'" % item) + # Check extra items aren't present + for item in Directory(os.path.join(self.wd,"example")).walk(): + self.assertTrue(os.path.relpath(item,self.wd) in expected, + "'%s' not expected" % item) + # Check read-write permissions are present for + # specific items + for item in ["ex1.txt", "subdir"]: + self.assertTrue( + os.access(os.path.join(self.wd, "example", item), + os.R_OK)) + self.assertTrue( + os.access(os.path.join(self.wd, "example", item), + os.W_OK)) + # Extract items + extract_dir = os.path.join(self.wd,"test_extract") + os.mkdir(extract_dir) + a.extract_files(name="example/ex1.*",extract_dir=extract_dir) + self.assertTrue(os.path.exists( + os.path.join(extract_dir,"ex1.txt"))) + a.extract_files(name="example/ex1.*",extract_dir=extract_dir, + include_path=True) + self.assertTrue(os.path.exists( + os.path.join(extract_dir,"example","ex1.txt"))) + finally: + # Reset write permissions to allow deletion + example_dir = os.path.join(self.wd,"example") + if os.path.exists(example_dir): + for o in Directory(example_dir).walk(): + if os.path.isdir(o): + os.chmod(o, 0o755) + else: + os.chmod(o, 0o644) + + def test_archivedirectory_unpack_copies_missing_owner_rw_permissions(self): + """ + ArchiveDirectory: unpack archive copies missing owner 'rw' permissions + """ + # Build example archive dir containing file and directory + # with 'rw' permissions stripped for user for ex1. + example_archive = UnittestDir(os.path.join(self.wd, + "example.archive")) + example_archive.add("example.tar.gz", + type="binary", + content=base64.b64decode(b'H4sIAMT5mWcAA+3VQQrCMBAF0Kw9RU6gM+mkOYFLD9FiFopiqRFyfNtqRQpWF6ai/rfJooEOfObHx2Jf7fxCJUQN52x7srN0f/YUi4ghccY095hy45S2KYfqnY6hqLVW1bYcvffs+5fy1/x95HmIIck/uoRzeZw/21v+7FyTv6GclaYk0wz8ef7LS/46+Bhmnx4GJtfv//FUrjd1mmeg2/FX+1+o3X8mi/6fwiB/H837n4Gu/kf73wzyzzIR9P8UVoca5Q8AAAAAAAAAAAAAAAAA8APOCW7Y2gAoAAA=')) + example_archive.add("example.md5", + type="file", + content="""8bcc714d327b74a95a166574d0103f5c example/ex1.txt +cfac359b4837003003a79a3b237f1d32 example/subdir/ex2.txt +""") + example_archive.add("ARCHIVE_METADATA/archive_checksums.md5", + type="file", + content="c0a5d4fff64a75c6fa10ce44ee172230 example.tar.gz\n") + example_archive.add("ARCHIVE_METADATA/archiver_metadata.json", + type="file", + content="""{ + "name": "example", + "source": "/original/path/to/example", + "source_date": "2019-11-27 17:19:02", + "type": "ArchiveDirectory", + "subarchives": [ + "example.tar.gz" + ], + "files": [], + "user": "anon", + "creation_date": "2023-06-16 09:58:39", + "multi_volume": false, + "volume_size": null, + "compression_level": 6, + "ngsarchiver_version": "1.9.0" +} +""") + example_archive.add("ARCHIVE_METADATA/manifest",type="file") + example_archive.add("ARCHIVE_README.txt",type="file") + example_archive.add("ARCHIVE_FILELIST.txt",type="file") + example_archive.add("ARCHIVE_TREE.txt",type="file") + example_archive.create() + p = example_archive.path + # Expected contents + expected = ('example/ex1.txt', + 'example/subdir', + 'example/subdir/ex2.txt',) + # Readable contents + readable = ('example/ex1.txt', + 'example/subdir',) + # Check example loads as ArchiveDirectory + a = ArchiveDirectory(p) + self.assertTrue(isinstance(a,ArchiveDirectory)) + # Check subset of metadata + metadata = a.archive_metadata + self.assertEqual(metadata['name'],"example") + self.assertEqual(metadata['subarchives'],["example.tar.gz"]) + self.assertEqual(metadata['files'],[]) + self.assertEqual(metadata['multi_volume'],False) + self.assertEqual(metadata['volume_size'],None) + # List contents + for item in a.list(): + self.assertTrue(item.path in expected, + "%s: unexpected item" % item.path) + # Search for items + self.assertEqual(sorted([x.path for x in a.search(name="ex1.*")]), + ["example/ex1.txt"]) + self.assertEqual(sorted([x.path for x in a.search( + name="ex1.*", + path="*/ex1.txt")]), + ["example/ex1.txt"]) + # Verify archive + self.assertTrue(a.verify_archive()) + # Unpack (& check no extra artefacts are created) + self.assertFalse(os.path.exists(os.path.join(self.wd,"example"))) + self.assertEqual(os.listdir(self.wd), ["example.archive"]) + try: + a.unpack(extract_dir=self.wd, set_permissions=True) + self.assertTrue(os.path.exists(os.path.join(self.wd,"example"))) + self.assertEqual(sorted(os.listdir(self.wd)), + ["example", "example.archive"]) + self.assertEqual(os.path.getmtime(os.path.join(self.wd,"example")), + os.path.getmtime(a.path)) + for item in readable: + self.assertTrue( + os.path.exists(os.path.join(self.wd,item)), + "missing '%s'" % item) + # Check extra items aren't present + for item in Directory(os.path.join(self.wd,"example")).walk(): + self.assertTrue(os.path.relpath(item,self.wd) in expected, + "'%s' not expected" % item) + # Check read-write permissions are missing for + # specific files + for item in ["ex1.txt", "subdir"]: + self.assertFalse( + os.access(os.path.join(self.wd, "example", item), + os.R_OK)) + self.assertFalse( + os.access(os.path.join(self.wd, "example", item), + os.W_OK)) + # Extract items + extract_dir = os.path.join(self.wd,"test_extract") + os.mkdir(extract_dir) + a.extract_files(name="example/ex1.*",extract_dir=extract_dir) + self.assertTrue(os.path.exists( + os.path.join(extract_dir,"ex1.txt"))) + a.extract_files(name="example/ex1.*",extract_dir=extract_dir, + include_path=True) + self.assertTrue(os.path.exists( + os.path.join(extract_dir,"example","ex1.txt"))) + finally: + # Reset write permissions to allow deletion + example_dir = os.path.join(self.wd,"example") + if os.path.exists(example_dir): + for o in Directory(example_dir).walk(): + if os.path.isdir(o): + os.chmod(o, 0o755) + else: + os.chmod(o, 0o644) + class TestLegacyArchiveDirectory(unittest.TestCase):