diff --git a/CHANGELOG.md b/CHANGELOG.md
index 267c942b..837f4947 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -8,6 +8,15 @@ and this project adheres to [Semantic Versioning](http://semver.org/).
 
 ## Unreleased
 
+### Added
+
+- Added `filter_glob` and `exclude_glob` parameters to `fs.walk.Walker`.
+  Closes [#459](https://github.com/PyFilesystem/pyfilesystem2/issues/459).
+
+### Fixed
+- Elaborated documentation of `filter_dirs` and `exclude_dirs` in `fs.walk.Walker`.
+  Closes [#371](https://github.com/PyFilesystem/pyfilesystem2/issues/371).
+
 ## [2.4.16] - 2022-05-02
 
diff --git a/fs/base.py b/fs/base.py
index 07a16756..56e5cf99 100644
--- a/fs/base.py
+++ b/fs/base.py
@@ -21,7 +21,7 @@
 from contextlib import closing
 from functools import partial, wraps
 
-from . import copy, errors, fsencode, iotools, tools, walk, wildcard
+from . import copy, errors, fsencode, iotools, tools, walk, wildcard, glob
 from .copy import copy_modified_time
 from .glob import BoundGlobber
 from .mode import validate_open_mode
@@ -1696,6 +1696,63 @@ def match(self, patterns, name):
         matcher = wildcard.get_matcher(patterns, case_sensitive)
         return matcher(name)
 
+    def match_glob(self, patterns, path, accept_prefix=False):
+        # type: (Optional[Iterable[Text]], Text, bool) -> bool
+        """Check if a path matches any of a list of glob patterns.
+
+        If a filesystem is case *insensitive* (such as Windows) then
+        this method will perform a case insensitive match (i.e. ``*.py``
+        will match the same names as ``*.PY``). Otherwise the match will
+        be case sensitive (``*.py`` and ``*.PY`` will match different
+        names).
+
+        Arguments:
+            patterns (list, optional): A list of patterns, e.g.
+                ``['*.py']``, or `None` to match everything.
+            path (str): A resource path, starting with "/".
+            accept_prefix (bool): If ``True``, the path is
+                not required to match the patterns themselves
+                but only needs to be a prefix of a string that does.
+
+        Returns:
+            bool: `True` if ``path`` matches any of the patterns.
+
+        Raises:
+            TypeError: If ``patterns`` is a single string instead of
+                a list (or `None`).
+            ValueError: If ``path`` is not a string starting with "/".
+
+        Example:
+            >>> my_fs.match_glob(['*.py'], '/__init__.py')
+            True
+            >>> my_fs.match_glob(['*.jpg', '*.png'], '/foo.gif')
+            False
+            >>> my_fs.match_glob(['dir/file.txt'], '/dir/', accept_prefix=True)
+            True
+            >>> my_fs.match_glob(['dir/file.txt'], '/dir/', accept_prefix=False)
+            False
+            >>> my_fs.match_glob(['dir/file.txt'], '/dir/gile.txt', accept_prefix=True)
+            False
+
+        Note:
+            If ``patterns`` is `None` (or ``['*']``), then this
+            method will always return `True`.
+
+        """
+        if patterns is None:
+            return True
+        if not path or path[0] != "/":
+            raise ValueError("%s needs to be a string starting with /" % path)
+        if isinstance(patterns, six.text_type):
+            raise TypeError("patterns must be a list or sequence")
+        case_sensitive = not typing.cast(
+            bool, self.getmeta().get("case_insensitive", False)
+        )
+        matcher = glob.get_matcher(
+            patterns, case_sensitive, accept_prefix=accept_prefix
+        )
+        return matcher(path)
+
     def tree(self, **kwargs):
         # type: (**Any) -> None
         """Render a tree view of the filesystem to stdout or a file.
diff --git a/fs/errors.py b/fs/errors.py
index 400bac76..32c795d9 100644
--- a/fs/errors.py
+++ b/fs/errors.py
@@ -41,6 +41,7 @@
     "OperationFailed",
     "OperationTimeout",
     "PathError",
+    "PatternError",
     "PermissionDenied",
     "RemoteConnectionError",
     "RemoveRootError",
@@ -346,3 +347,22 @@ class UnsupportedHash(ValueError):
     not supported by hashlib.
""" + + +class PatternError(ValueError): + """A string pattern with invalid syntax was given.""" + + default_message = "pattern '{pattern}' is invalid at position {position}" + + def __init__(self, pattern, position, exc=None, msg=None): # noqa: D107 + # type: (Text, int, Optional[Exception], Optional[Text]) -> None + self.pattern = pattern + self.position = position + self.exc = exc + super(ValueError, self).__init__() + + def __reduce__(self): + return type(self), (self.path, self.position, self.exc, self._msg) diff --git a/fs/glob.py b/fs/glob.py index cd6473d2..4e783652 100644 --- a/fs/glob.py +++ b/fs/glob.py @@ -4,49 +4,135 @@ from __future__ import unicode_literals import typing +from functools import partial import re from collections import namedtuple -from . import wildcard from ._repr import make_repr from .lrucache import LRUCache from .path import iteratepath + GlobMatch = namedtuple("GlobMatch", ["path", "info"]) Counts = namedtuple("Counts", ["files", "directories", "data"]) LineCounts = namedtuple("LineCounts", ["lines", "non_blank"]) if typing.TYPE_CHECKING: - from typing import Iterator, List, Optional, Pattern, Text, Tuple - + from typing import ( + Iterator, + List, + Optional, + Pattern, + Text, + Tuple, + Iterable, + Callable, + ) from .base import FS _PATTERN_CACHE = LRUCache( 1000 -) # type: LRUCache[Tuple[Text, bool], Tuple[int, bool, Pattern]] +) # type: LRUCache[Tuple[Text, bool], Tuple[Optional[int], Pattern]] + + +def _split_pattern_by_sep(pattern): + # type: (Text) -> List[Text] + """Split a glob pattern at its directory seperators (/). + + Takes into account escaped cases like [/]. + """ + indices = [-1] + bracket_open = False + for i, c in enumerate(pattern): + if c == "/" and not bracket_open: + indices.append(i) + elif c == "[": + bracket_open = True + elif c == "]": + bracket_open = False + + indices.append(len(pattern)) + return [pattern[i + 1 : j] for i, j in zip(indices[:-1], indices[1:])] + + +def _translate(pattern): + # type: (Text) -> Text + """Translate a glob pattern without '**' to a regular expression. + + There is no way to quote meta-characters. + + Arguments: + pattern (str): A glob pattern. + + Returns: + str: A regex equivalent to the given pattern. + + """ + i, n = 0, len(pattern) + res = [] + while i < n: + c = pattern[i] + i = i + 1 + if c == "*": + if i < n and pattern[i] == "*": + raise ValueError("glob._translate does not support '**' patterns.") + res.append("[^/]*") + elif c == "?": + res.append("[^/]") + elif c == "[": + j = i + if j < n and pattern[j] == "!": + j = j + 1 + if j < n and pattern[j] == "]": + j = j + 1 + while j < n and pattern[j] != "]": + j = j + 1 + if j >= n: + res.append("\\[") + else: + stuff = pattern[i:j].replace("\\", "\\\\") + i = j + 1 + if stuff[0] == "!": + stuff = "^/" + stuff[1:] + elif stuff[0] == "^": + stuff = "\\" + stuff + res.append("[%s]" % stuff) + else: + res.append(re.escape(c)) + return "".join(res) + +def _translate_glob(pattern): + # type: (Text) -> Tuple[Optional[int], Text] + """Translate a glob pattern to a regular expression. -def _translate_glob(pattern, case_sensitive=True): - levels = 0 + There is no way to quote meta-characters. + + Arguments: + pattern (str): A glob pattern. + + Returns: + Tuple[Optional[int], Text]: The first component describes the levels + of depth this glob pattern goes to; basically the number of "/" in + the pattern. If there is a "**" in the glob pattern, the depth is + basically unbounded, and this component is `None` instead. 
+            The second component is the regular expression.
+
+    """
     recursive = False
     re_patterns = [""]
     for component in iteratepath(pattern):
-        if component == "**":
-            re_patterns.append(".*/?")
+        if "**" in component:
             recursive = True
+            split = component.split("**")
+            split_re = [_translate(s) for s in split]
+            re_patterns.append("/?" + ".*/?".join(split_re))
         else:
-            re_patterns.append(
-                "/" + wildcard._translate(component, case_sensitive=case_sensitive)
-            )
-            levels += 1
+            re_patterns.append("/" + _translate(component))
     re_glob = "(?ms)^" + "".join(re_patterns) + ("/$" if pattern.endswith("/") else "$")
-    return (
-        levels,
-        recursive,
-        re.compile(re_glob, 0 if case_sensitive else re.IGNORECASE),
-    )
+    return pattern.count("/") + 1 if not recursive else None, re_glob
 
 
 def match(pattern, path):
@@ -68,10 +154,13 @@
     """
     try:
-        levels, recursive, re_pattern = _PATTERN_CACHE[(pattern, True)]
+        levels, re_pattern = _PATTERN_CACHE[(pattern, True)]
     except KeyError:
-        levels, recursive, re_pattern = _translate_glob(pattern, case_sensitive=True)
-        _PATTERN_CACHE[(pattern, True)] = (levels, recursive, re_pattern)
+        levels, re_str = _translate_glob(pattern)
+        re_pattern = re.compile(re_str)
+        _PATTERN_CACHE[(pattern, True)] = (levels, re_pattern)
+    if path and path[0] != "/":
+        path = "/" + path
     return bool(re_pattern.match(path))
 
 
@@ -88,13 +177,101 @@
     """
     try:
-        levels, recursive, re_pattern = _PATTERN_CACHE[(pattern, False)]
+        levels, re_pattern = _PATTERN_CACHE[(pattern, False)]
     except KeyError:
-        levels, recursive, re_pattern = _translate_glob(pattern, case_sensitive=True)
-        _PATTERN_CACHE[(pattern, False)] = (levels, recursive, re_pattern)
+        levels, re_str = _translate_glob(pattern)
+        re_pattern = re.compile(re_str, re.IGNORECASE)
+        _PATTERN_CACHE[(pattern, False)] = (levels, re_pattern)
+    if path and path[0] != "/":
+        path = "/" + path
     return bool(re_pattern.match(path))
 
 
+def match_any(patterns, path):
+    # type: (Iterable[Text], Text) -> bool
+    """Test if a path matches any of a list of patterns.
+
+    Will return `True` if ``patterns`` is an empty list.
+
+    Arguments:
+        patterns (list): A list of glob patterns, e.g. ``["*.py",
+            "*.pyc"]``
+        path (str): A resource path.
+
+    Returns:
+        bool: `True` if the path matches at least one of the patterns.
+
+    """
+    if not patterns:
+        return True
+    return any(match(pattern, path) for pattern in patterns)
+
+
+def imatch_any(patterns, path):
+    # type: (Iterable[Text], Text) -> bool
+    """Test if a path matches any of a list of patterns (case insensitive).
+
+    Will return `True` if ``patterns`` is an empty list.
+
+    Arguments:
+        patterns (list): A list of glob patterns, e.g. ``["*.py",
+            "*.pyc"]``
+        path (str): A resource path.
+
+    Returns:
+        bool: `True` if the path matches at least one of the patterns.
+
+    """
+    if not patterns:
+        return True
+    return any(imatch(pattern, path) for pattern in patterns)
+
+
+def get_matcher(patterns, case_sensitive, accept_prefix=False):
+    # type: (Iterable[Text], bool, bool) -> Callable[[Text], bool]
+    """Get a callable that matches paths against the given patterns.
+
+    Arguments:
+        patterns (list): A list of glob patterns, e.g. ``["*.py",
+            "*.pyc"]``
+        case_sensitive (bool): If ``True``, then the callable will be case
+            sensitive, otherwise it will be case insensitive.
+        accept_prefix (bool): If ``True``, the path is
+            not required to match the patterns themselves
+            but only needs to be a prefix of a string that does.
+
+    Returns:
+        callable: a matcher that will return `True` if the path given as
+            an argument matches any of the given patterns, or if no
+            patterns exist.
+
+    Example:
+        >>> from fs import glob
+        >>> is_python = glob.get_matcher(['*.py'], True)
+        >>> is_python('__init__.py')
+        True
+        >>> is_python('foo.txt')
+        False
+
+    """
+    if not patterns:
+        return lambda path: True
+
+    if accept_prefix:
+        new_patterns = []
+        for pattern in patterns:
+            split = _split_pattern_by_sep(pattern)
+            for i in range(1, len(split)):
+                new_pattern = "/".join(split[:i])
+                new_patterns.append(new_pattern)
+                new_patterns.append(new_pattern + "/")
+            new_patterns.append(pattern)
+        patterns = new_patterns
+
+    matcher = match_any if case_sensitive else imatch_any
+    return partial(matcher, patterns)
+
+
 class Globber(object):
     """A generator of glob results."""
 
@@ -143,18 +320,15 @@ def __repr__(self):
     def _make_iter(self, search="breadth", namespaces=None):
         # type: (str, List[str]) -> Iterator[GlobMatch]
         try:
-            levels, recursive, re_pattern = _PATTERN_CACHE[
-                (self.pattern, self.case_sensitive)
-            ]
+            levels, re_pattern = _PATTERN_CACHE[(self.pattern, self.case_sensitive)]
         except KeyError:
-            levels, recursive, re_pattern = _translate_glob(
-                self.pattern, case_sensitive=self.case_sensitive
-            )
+            levels, re_str = _translate_glob(self.pattern)
+            re_pattern = re.compile(re_str, 0 if self.case_sensitive else re.IGNORECASE)
 
         for path, info in self.fs.walk.info(
             path=self.path,
             namespaces=namespaces or self.namespaces,
-            max_depth=None if recursive else levels,
+            max_depth=levels,
             search=search,
             exclude_dirs=self.exclude_dirs,
         ):
diff --git a/fs/walk.py b/fs/walk.py
index 31dc6b0e..b743e6f2 100644
--- a/fs/walk.py
+++ b/fs/walk.py
@@ -60,6 +60,8 @@ def __init__(
         filter_dirs=None,  # type: Optional[List[Text]]
         exclude_dirs=None,  # type: Optional[List[Text]]
         max_depth=None,  # type: Optional[int]
+        filter_glob=None,  # type: Optional[List[Text]]
+        exclude_glob=None,  # type: Optional[List[Text]]
     ):
         # type: (...) -> None
         """Create a new `Walker` instance.
@@ -82,12 +84,23 @@
                 a list of filename patterns, e.g. ``["~*"]``. Files matching
                 any of these patterns will be removed from the walk.
             filter_dirs (list, optional): A list of patterns that will be used
-                to match directories paths. The walk will only open directories
-                that match at least one of these patterns.
+                to match directory names. The walk will only open directories
+                that match at least one of these patterns. Directories will
+                only be returned if their final path component matches one
+                of the patterns.
             exclude_dirs (list, optional): A list of patterns that will be used
                 to filter out directories from the walk. e.g.
-                ``['*.svn', '*.git']``.
+                ``['*.svn', '*.git']``. Directory names matching any of these
+                patterns will be removed from the walk.
             max_depth (int, optional): Maximum directory depth to walk.
+            filter_glob (list, optional): If supplied, this parameter
+                should be a list of path patterns, e.g. ``["foo/**/*.py"]``.
+                Resources will only be returned if their full path or an
+                extension of it matches one of the patterns.
+            exclude_glob (list, optional): If supplied, this parameter
+                should be a list of path patterns, e.g. ``["foo/**/*.pyc"]``.
+                Resources will not be returned if their full path matches
+                one of the patterns.
""" if search not in ("breadth", "depth"): @@ -107,6 +120,8 @@ def __init__( self.exclude = exclude self.filter_dirs = filter_dirs self.exclude_dirs = exclude_dirs + self.filter_glob = filter_glob + self.exclude_glob = exclude_glob self.max_depth = max_depth super(Walker, self).__init__() @@ -178,6 +193,8 @@ def __repr__(self): filter_dirs=(self.filter_dirs, None), exclude_dirs=(self.exclude_dirs, None), max_depth=(self.max_depth, None), + filter_glob=(self.filter_glob, None), + exclude_glob=(self.exclude_glob, None), ) def _iter_walk( @@ -196,10 +213,19 @@ def _iter_walk( def _check_open_dir(self, fs, path, info): # type: (FS, Text, Info) -> bool """Check if a directory should be considered in the walk.""" + full_path = combine(path, info.name) if self.exclude_dirs is not None and fs.match(self.exclude_dirs, info.name): return False + if self.exclude_glob is not None and fs.match_glob( + self.exclude_glob, full_path + ): + return False if self.filter_dirs is not None and not fs.match(self.filter_dirs, info.name): return False + if self.filter_glob is not None and not fs.match_glob( + self.filter_glob, full_path, accept_prefix=True + ): + return False return self.check_open_dir(fs, path, info) def check_open_dir(self, fs, path, info): @@ -245,6 +271,26 @@ def check_scan_dir(self, fs, path, info): """ return True + def _check_file(self, fs, dir_path, info): + # type: (FS, Text, Info) -> bool + """Check if a filename should be included.""" + # Weird check required for backwards compatibility, + # when _check_file did not exist. + if Walker._check_file == type(self)._check_file: + if self.exclude is not None and fs.match(self.exclude, info.name): + return False + if self.exclude_glob is not None and fs.match_glob( + self.exclude_glob, dir_path + "/" + info.name + ): + return False + if self.filter is not None and not fs.match(self.filter, info.name): + return False + if self.filter_glob is not None and not fs.match_glob( + self.filter_glob, dir_path + "/" + info.name, accept_prefix=True + ): + return False + return self.check_file(fs, info) + def check_file(self, fs, info): # type: (FS, Info) -> bool """Check if a filename should be included. @@ -259,9 +305,7 @@ def check_file(self, fs, info): bool: `True` if the file should be included. 
""" - if self.exclude is not None and fs.match(self.exclude, info.name): - return False - return fs.match(self.filter, info.name) + return True def _scan( self, @@ -418,7 +462,7 @@ def _walk_breadth( _calculate_depth = self._calculate_depth _check_open_dir = self._check_open_dir _check_scan_dir = self._check_scan_dir - _check_file = self.check_file + _check_file = self._check_file depth = _calculate_depth(path) @@ -432,7 +476,7 @@ def _walk_breadth( if _check_scan_dir(fs, dir_path, info, _depth): push(_combine(dir_path, info.name)) else: - if _check_file(fs, info): + if _check_file(fs, dir_path, info): yield dir_path, info # Found a file yield dir_path, None # End of directory @@ -451,7 +495,7 @@ def _walk_depth( _calculate_depth = self._calculate_depth _check_open_dir = self._check_open_dir _check_scan_dir = self._check_scan_dir - _check_file = self.check_file + _check_file = self._check_file depth = _calculate_depth(path) stack = [ @@ -483,7 +527,7 @@ def _walk_depth( else: yield dir_path, info else: - if _check_file(fs, info): + if _check_file(fs, dir_path, info): yield dir_path, info diff --git a/fs/wildcard.py b/fs/wildcard.py index 7a34d6b7..cc6c0530 100644 --- a/fs/wildcard.py +++ b/fs/wildcard.py @@ -147,14 +147,14 @@ def _translate(pattern, case_sensitive=True): if not case_sensitive: pattern = pattern.lower() i, n = 0, len(pattern) - res = "" + res = [] while i < n: c = pattern[i] i = i + 1 if c == "*": - res = res + "[^/]*" + res.append("[^/]*") elif c == "?": - res = res + "." + res.append(".") elif c == "[": j = i if j < n and pattern[j] == "!": @@ -164,7 +164,7 @@ def _translate(pattern, case_sensitive=True): while j < n and pattern[j] != "]": j = j + 1 if j >= n: - res = res + "\\[" + res.append("\\[") else: stuff = pattern[i:j].replace("\\", "\\\\") i = j + 1 @@ -172,7 +172,7 @@ def _translate(pattern, case_sensitive=True): stuff = "^" + stuff[1:] elif stuff[0] == "^": stuff = "\\" + stuff - res = "%s[%s]" % (res, stuff) + res.append("[%s]" % stuff) else: - res = res + re.escape(c) - return res + res.append(re.escape(c)) + return "".join(res) diff --git a/tests/test_glob.py b/tests/test_glob.py index 51eb7779..9a5d8827 100644 --- a/tests/test_glob.py +++ b/tests/test_glob.py @@ -1,7 +1,10 @@ from __future__ import unicode_literals +import re import unittest +from parameterized import parameterized + from fs import glob, open_fs @@ -17,10 +20,11 @@ def setUp(self): fs.makedirs("a/b/c/").writetext("foo.py", "import fs") repr(fs.glob) - def test_match(self): - tests = [ + @parameterized.expand( + [ ("*.?y", "/test.py", True), ("*.py", "/test.py", True), + ("*.py", "__init__.py", True), ("*.py", "/test.pc", False), ("*.py", "/foo/test.py", False), ("foo/*.py", "/foo/test.py", True), @@ -28,21 +32,23 @@ def test_match(self): ("?oo/*.py", "/foo/test.py", True), ("*/*.py", "/foo/test.py", True), ("foo/*.py", "/bar/foo/test.py", False), + ("/foo/**", "/foo/test.py", True), ("**/foo/*.py", "/bar/foo/test.py", True), ("foo/**/bar/*.py", "/foo/bar/test.py", True), ("foo/**/bar/*.py", "/foo/baz/egg/bar/test.py", True), ("foo/**/bar/*.py", "/foo/baz/egg/bar/egg/test.py", False), ("**", "/test.py", True), + ("/**", "/test.py", True), ("**", "/test", True), ("**", "/test/", True), ("**/", "/test/", True), ("**/", "/test.py", False), ] - for pattern, path, expected in tests: - self.assertEqual(glob.match(pattern, path), expected) + ) + def test_match(self, pattern, path, expected): + self.assertEqual(glob.match(pattern, path), expected, msg=(pattern, path)) # Run a second time to test 
cache - for pattern, path, expected in tests: - self.assertEqual(glob.match(pattern, path), expected) + self.assertEqual(glob.match(pattern, path), expected, msg=(pattern, path)) def test_count_1dir(self): globber = glob.BoundGlobber(self.fs) @@ -96,3 +102,49 @@ def test_remove_all(self): globber = glob.BoundGlobber(self.fs) globber("**").remove() self.assertEqual(sorted(self.fs.listdir("/")), []) + + translate_test_cases = [ + ("foo.py", ["foo.py"], ["Foo.py", "foo_py", "foo", ".py"]), + ("foo?py", ["foo.py", "fooapy"], ["foo/py", "foopy", "fopy"]), + ("bar/foo.py", ["bar/foo.py"], []), + ("bar?foo.py", ["barafoo.py"], ["bar/foo.py"]), + ("???.py", ["foo.py", "bar.py", "FOO.py"], [".py", "foo.PY"]), + ("bar/*.py", ["bar/.py", "bar/foo.py"], ["bar/foo"]), + ("bar/foo*.py", ["bar/foo.py", "bar/foobaz.py"], ["bar/foo", "bar/.py"]), + ("*/[bar]/foo.py", ["/b/foo.py", "x/a/foo.py", "/r/foo.py"], ["b/foo.py", "/bar/foo.py"]), + ("[!bar]/foo.py", ["x/foo.py"], ["//foo.py"]), + ("[.py", ["[.py"], [".py", "."]), + ] + + @parameterized.expand(translate_test_cases) + def test_translate(self, glob_pattern, expected_matches, expected_not_matches): + translated = glob._translate(glob_pattern) + for m in expected_matches: + self.assertTrue(re.match(translated, m)) + for m in expected_not_matches: + self.assertFalse(re.match(translated, m)) + + @parameterized.expand(translate_test_cases) + def test_translate_glob_simple(self, glob_pattern, expected_matches, expected_not_matches): + levels, translated = glob._translate_glob(glob_pattern) + self.assertEqual(levels, glob_pattern.count("/") + 1) + for m in expected_matches: + self.assertTrue(re.match(translated, "/" + m)) + for m in expected_not_matches: + self.assertFalse(re.match(translated, m)) + self.assertFalse(re.match(translated, "/" + m)) + + @parameterized.expand( + [ + ("foo/**/bar", ["/foo/bar", "/foo/baz/bar", "/foo/baz/qux/bar"], ["/foo"]), + ("**/*/bar", ["/foo/bar", "/foo/bar"], ["/bar", "/bar"]), + ("/**/foo/**/bar", ["/baz/foo/qux/bar", "/foo/bar"], ["/bar"]), + ] + ) + def test_translate_glob(self, glob_pattern, expected_matches, expected_not_matches): + levels, translated = glob._translate_glob(glob_pattern) + self.assertIsNone(levels) + for m in expected_matches: + self.assertTrue(re.match(translated, m)) + for m in expected_not_matches: + self.assertFalse(re.match(translated, m)) diff --git a/tests/test_walk.py b/tests/test_walk.py index 5e05d054..e0146f2d 100644 --- a/tests/test_walk.py +++ b/tests/test_walk.py @@ -21,9 +21,28 @@ def test_create(self): walk.Walker(ignore_errors=True, on_error=lambda path, error: True) walk.Walker(ignore_errors=True) + def test_on_error_invalid(self): + with self.assertRaises(TypeError): + walk.Walker(on_error="nope") + -class TestWalk(unittest.TestCase): +class TestBoundWalkerBase(unittest.TestCase): def setUp(self): + """ + Sets up the following file system with empty files: + + / + -foo1/ + - -top1.txt + - -top2.txt + -foo2/ + - -bar1/ + - -bar2/ + - - -bar3/ + - - - -test.txt + - -top3.bin + -foo3/ + """ self.fs = MemoryFS() self.fs.makedir("foo1") @@ -37,21 +56,50 @@ def setUp(self): self.fs.create("foo2/bar2/bar3/test.txt") self.fs.create("foo2/top3.bin") - def test_invalid(self): - with self.assertRaises(ValueError): - self.fs.walk(search="random") +class TestBoundWalker(TestBoundWalkerBase): def test_repr(self): repr(self.fs.walk) - def test_walk(self): + def test_readonly_wrapper_uses_same_walker(self): + class CustomWalker(walk.Walker): + @classmethod + def bind(cls, fs): + return 
walk.BoundWalker(fs, walker_class=CustomWalker) + + class CustomizedMemoryFS(MemoryFS): + walker_class = CustomWalker + + base_fs = CustomizedMemoryFS() + base_walker = base_fs.walk + self.assertEqual(base_walker.walker_class, CustomWalker) + + readonly_fs = read_only(CustomizedMemoryFS()) + readonly_walker = readonly_fs.walk + self.assertEqual(readonly_walker.walker_class, CustomWalker) + + +class TestWalk(TestBoundWalkerBase): + def _walk_step_names(self, *args, **kwargs): + """Performs a walk with the given arguments and returns a list of steps. + + Each step is a triple of the path, list of directory names, and list of file names. + """ _walk = [] - for step in self.fs.walk(): + for step in self.fs.walk(*args, **kwargs): self.assertIsInstance(step, walk.Step) path, dirs, files = step _walk.append( (path, [info.name for info in dirs], [info.name for info in files]) ) + return _walk + + def test_invalid_search(self): + with self.assertRaises(ValueError): + self.fs.walk(search="random") + + def test_walk_simple(self): + _walk = self._walk_step_names() expected = [ ("/", ["foo1", "foo2", "foo3"], []), ("/foo1", ["bar1"], ["top1.txt", "top2.txt"]), @@ -63,14 +111,34 @@ def test_walk(self): ] self.assertEqual(_walk, expected) + def test_walk_filter(self): + _walk = self._walk_step_names(filter=["top*.txt"]) + expected = [ + ("/", ["foo1", "foo2", "foo3"], []), + ("/foo1", ["bar1"], ["top1.txt", "top2.txt"]), + ("/foo2", ["bar2"], []), + ("/foo3", [], []), + ("/foo1/bar1", [], []), + ("/foo2/bar2", ["bar3"], []), + ("/foo2/bar2/bar3", [], []), + ] + self.assertEqual(_walk, expected) + + def test_walk_exclude(self): + _walk = self._walk_step_names(exclude=["top*"]) + expected = [ + ("/", ["foo1", "foo2", "foo3"], []), + ("/foo1", ["bar1"], []), + ("/foo2", ["bar2"], []), + ("/foo3", [], []), + ("/foo1/bar1", [], []), + ("/foo2/bar2", ["bar3"], []), + ("/foo2/bar2/bar3", [], ["test.txt"]), + ] + self.assertEqual(_walk, expected) + def test_walk_filter_dirs(self): - _walk = [] - for step in self.fs.walk(filter_dirs=["foo*"]): - self.assertIsInstance(step, walk.Step) - path, dirs, files = step - _walk.append( - (path, [info.name for info in dirs], [info.name for info in files]) - ) + _walk = self._walk_step_names(filter_dirs=["foo*"]) expected = [ ("/", ["foo1", "foo2", "foo3"], []), ("/foo1", [], ["top1.txt", "top2.txt"]), @@ -79,14 +147,65 @@ def test_walk_filter_dirs(self): ] self.assertEqual(_walk, expected) + def test_walk_filter_glob_1(self): + _walk = self._walk_step_names(filter_glob=["/foo*/bar*/"]) + expected = [ + ("/", ["foo1", "foo2", "foo3"], []), + ("/foo1", ["bar1"], []), + ("/foo2", ["bar2"], []), + ("/foo3", [], []), + ("/foo1/bar1", [], []), + ("/foo2/bar2", [], []), + ] + self.assertEqual(_walk, expected) + + def test_walk_filter_glob_2(self): + _walk = self._walk_step_names(filter_glob=["/foo*/bar**"]) + expected = [ + ("/", ["foo1", "foo2", "foo3"], []), + ("/foo1", ["bar1"], []), + ("/foo2", ["bar2"], []), + ("/foo3", [], []), + ("/foo1/bar1", [], []), + ("/foo2/bar2", ["bar3"], []), + ("/foo2/bar2/bar3", [], ["test.txt"]), + ] + self.assertEqual(_walk, expected) + + def test_walk_filter_mix(self): + _walk = self._walk_step_names(filter_glob=["/foo2/bar**"], filter=["top1.txt"]) + expected = [ + ("/", ["foo2"], []), + ("/foo2", ["bar2"], []), + ("/foo2/bar2", ["bar3"], []), + ("/foo2/bar2/bar3", [], []), + ] + self.assertEqual(_walk, expected) + + def test_walk_exclude_dirs(self): + _walk = self._walk_step_names(exclude_dirs=["bar*", "foo2"]) + expected = [ + ("/", ["foo1", 
"foo3"], []), + ("/foo1", [], ["top1.txt", "top2.txt"]), + ("/foo3", [], []), + ] + self.assertEqual(_walk, expected) + + def test_walk_exclude_glob(self): + _walk = self._walk_step_names(exclude_glob=["**/top*", "test.txt"]) + expected = [ + ("/", ["foo1", "foo2", "foo3"], []), + ("/foo1", ["bar1"], []), + ("/foo2", ["bar2"], []), + ("/foo3", [], []), + ("/foo1/bar1", [], []), + ("/foo2/bar2", ["bar3"], []), + ("/foo2/bar2/bar3", [], ["test.txt"]), + ] + self.assertEqual(_walk, expected) + def test_walk_depth(self): - _walk = [] - for step in self.fs.walk(search="depth"): - self.assertIsInstance(step, walk.Step) - path, dirs, files = step - _walk.append( - (path, [info.name for info in dirs], [info.name for info in files]) - ) + _walk = self._walk_step_names(search="depth") expected = [ ("/foo1/bar1", [], []), ("/foo1", ["bar1"], ["top1.txt", "top2.txt"]), @@ -98,14 +217,8 @@ def test_walk_depth(self): ] self.assertEqual(_walk, expected) - def test_walk_directory(self): - _walk = [] - for step in self.fs.walk("foo2"): - self.assertIsInstance(step, walk.Step) - path, dirs, files = step - _walk.append( - (path, [info.name for info in dirs], [info.name for info in files]) - ) + def test_walk_path(self): + _walk = self._walk_step_names("foo2") expected = [ ("/foo2", ["bar2"], ["top3.bin"]), ("/foo2/bar2", ["bar3"], []), @@ -113,34 +226,22 @@ def test_walk_directory(self): ] self.assertEqual(_walk, expected) - def test_walk_levels_1(self): - results = list(self.fs.walk(max_depth=1)) - self.assertEqual(len(results), 1) - dirs = sorted(info.name for info in results[0].dirs) - self.assertEqual(dirs, ["foo1", "foo2", "foo3"]) - files = sorted(info.name for info in results[0].files) - self.assertEqual(files, []) + def test_walk_max_depth_1_breadth(self): + _walk = self._walk_step_names(max_depth=1, search="breadth") + expected = [ + ("/", ["foo1", "foo2", "foo3"], []), + ] + self.assertEqual(_walk, expected) - def test_walk_levels_1_depth(self): - results = list(self.fs.walk(max_depth=1, search="depth")) - self.assertEqual(len(results), 1) - dirs = sorted(info.name for info in results[0].dirs) - self.assertEqual(dirs, ["foo1", "foo2", "foo3"]) - files = sorted(info.name for info in results[0].files) - self.assertEqual(files, []) + def test_walk_max_depth_1_depth(self): + _walk = self._walk_step_names(max_depth=1, search="depth") + expected = [ + ("/", ["foo1", "foo2", "foo3"], []), + ] + self.assertEqual(_walk, expected) - def test_walk_levels_2(self): - _walk = [] - for step in self.fs.walk(max_depth=2): - self.assertIsInstance(step, walk.Step) - path, dirs, files = step - _walk.append( - ( - path, - sorted(info.name for info in dirs), - sorted(info.name for info in files), - ) - ) + def test_walk_max_depth_2(self): + _walk = self._walk_step_names(max_depth=2) expected = [ ("/", ["foo1", "foo2", "foo3"], []), ("/foo1", ["bar1"], ["top1.txt", "top2.txt"]), @@ -149,6 +250,30 @@ def test_walk_levels_2(self): ] self.assertEqual(_walk, expected) + +class TestDirs(TestBoundWalkerBase): + def test_walk_dirs(self): + dirs = list(self.fs.walk.dirs()) + self.assertEqual( + dirs, + ["/foo1", "/foo2", "/foo3", "/foo1/bar1", "/foo2/bar2", "/foo2/bar2/bar3"], + ) + + dirs = list(self.fs.walk.dirs(search="depth")) + self.assertEqual( + dirs, + ["/foo1/bar1", "/foo1", "/foo2/bar2/bar3", "/foo2/bar2", "/foo2", "/foo3"], + ) + + dirs = list(self.fs.walk.dirs(search="depth", exclude_dirs=["foo2"])) + self.assertEqual(dirs, ["/foo1/bar1", "/foo1", "/foo3"]) + + def test_foo(self): + dirs = 
list(self.fs.walk.dirs(search="depth", exclude_dirs=["foo2"])) + self.assertEqual(dirs, ["/foo1/bar1", "/foo1", "/foo3"]) + + +class TestFiles(TestBoundWalkerBase): def test_walk_files(self): files = list(self.fs.walk.files()) @@ -173,22 +298,6 @@ def test_walk_files(self): ], ) - def test_walk_dirs(self): - dirs = list(self.fs.walk.dirs()) - self.assertEqual( - dirs, - ["/foo1", "/foo2", "/foo3", "/foo1/bar1", "/foo2/bar2", "/foo2/bar2/bar3"], - ) - - dirs = list(self.fs.walk.dirs(search="depth")) - self.assertEqual( - dirs, - ["/foo1/bar1", "/foo1", "/foo2/bar2/bar3", "/foo2/bar2", "/foo2", "/foo3"], - ) - - dirs = list(self.fs.walk.dirs(search="depth", exclude_dirs=["foo2"])) - self.assertEqual(dirs, ["/foo1/bar1", "/foo1", "/foo3"]) - def test_walk_files_filter(self): files = list(self.fs.walk.files(filter=["*.txt"])) @@ -209,6 +318,16 @@ def test_walk_files_filter(self): self.assertEqual(files, []) + def test_walk_files_filter_glob(self): + files = list(self.fs.walk.files(filter_glob=["/foo2/**"])) + self.assertEqual( + files, + [ + "/foo2/top3.bin", + "/foo2/bar2/bar3/test.txt", + ], + ) + def test_walk_files_exclude(self): # Test exclude argument works files = list(self.fs.walk.files(exclude=["*.txt"])) @@ -222,25 +341,6 @@ def test_walk_files_exclude(self): files = list(self.fs.walk.files(exclude=["*"])) self.assertEqual(files, []) - def test_walk_info(self): - walk = [] - for path, info in self.fs.walk.info(): - walk.append((path, info.is_dir, info.name)) - - expected = [ - ("/foo1", True, "foo1"), - ("/foo2", True, "foo2"), - ("/foo3", True, "foo3"), - ("/foo1/top1.txt", False, "top1.txt"), - ("/foo1/top2.txt", False, "top2.txt"), - ("/foo1/bar1", True, "bar1"), - ("/foo2/bar2", True, "bar2"), - ("/foo2/top3.bin", False, "top3.bin"), - ("/foo2/bar2/bar3", True, "bar3"), - ("/foo2/bar2/bar3/test.txt", False, "test.txt"), - ] - self.assertEqual(walk, expected) - def test_broken(self): original_scandir = self.fs.scandir @@ -257,10 +357,6 @@ def broken_scandir(path, namespaces=None): with self.assertRaises(FSError): list(self.fs.walk.files(on_error=lambda path, error: False)) - def test_on_error_invalid(self): - with self.assertRaises(TypeError): - walk.Walker(on_error="nope") - def test_subdir_uses_same_walker(self): class CustomWalker(walk.Walker): @classmethod @@ -284,19 +380,32 @@ class CustomizedMemoryFS(MemoryFS): self.assertEqual(sub_walker.walker_class, CustomWalker) six.assertCountEqual(self, ["/c", "/d"], sub_walker.files()) - def test_readonly_wrapper_uses_same_walker(self): + def test_check_file_overwrite(self): class CustomWalker(walk.Walker): - @classmethod - def bind(cls, fs): - return walk.BoundWalker(fs, walker_class=CustomWalker) + def check_file(self, fs, info): + return False - class CustomizedMemoryFS(MemoryFS): - walker_class = CustomWalker + walker = CustomWalker() + files = list(walker.files(self.fs)) + self.assertEqual(files, []) - base_fs = CustomizedMemoryFS() - base_walker = base_fs.walk - self.assertEqual(base_walker.walker_class, CustomWalker) - readonly_fs = read_only(CustomizedMemoryFS()) - readonly_walker = readonly_fs.walk - self.assertEqual(readonly_walker.walker_class, CustomWalker) +class TestInfo(TestBoundWalkerBase): + def test_walk_info(self): + walk = [] + for path, info in self.fs.walk.info(): + walk.append((path, info.is_dir, info.name)) + + expected = [ + ("/foo1", True, "foo1"), + ("/foo2", True, "foo2"), + ("/foo3", True, "foo3"), + ("/foo1/top1.txt", False, "top1.txt"), + ("/foo1/top2.txt", False, "top2.txt"), + ("/foo1/bar1", True, 
"bar1"), + ("/foo2/bar2", True, "bar2"), + ("/foo2/top3.bin", False, "top3.bin"), + ("/foo2/bar2/bar3", True, "bar3"), + ("/foo2/bar2/bar3/test.txt", False, "test.txt"), + ] + self.assertEqual(walk, expected)