diff --git a/django_mongodb_cli/repo.py b/django_mongodb_cli/repo.py index c696aae..a3eaacd 100644 --- a/django_mongodb_cli/repo.py +++ b/django_mongodb_cli/repo.py @@ -4,6 +4,8 @@ from .utils import Package, Repo, Test repo = typer.Typer() +repo_remote = typer.Typer() +repo.add_typer(repo_remote, name="remote") def repo_command( @@ -33,52 +35,153 @@ def main( list_repos: bool = typer.Option( False, "--list-repos", "-l", help="List available repositories." ), + quiet: bool = typer.Option( + False, "--quiet", "-q", help="Suppress output messages." + ), ): if list_repos: Repo().list_repos() raise typer.Exit() # End, no further action + ctx.ensure_object(dict) + ctx.obj["quiet"] = quiet + if ctx.invoked_subcommand is None: typer.echo(ctx.get_help()) raise typer.Exit() +@repo_remote.callback(invoke_without_command=True) +def remote( + ctx: typer.Context, + repo_name: str = typer.Argument(None), + all_repos: bool = typer.Option( + False, "--all-repos", "-a", help="Show remotes of all repositories" + ), +): + """ + Show the git remotes for the specified repository. + If --all-repos is used, show remotes for all repositories. + """ + repo = Repo() + repo.ctx = ctx + repo.ctx.obj["repo_name"] = repo_name + repo_command( + all_repos, + repo_name, + all_msg="Showing remotes for all repositories...", + missing_msg="Please specify a repository name or use -a,--all-repos to show remotes of all repositories.", + single_func=lambda repo_name: repo.get_repo_remote(repo_name), + all_func=lambda repo_name: repo.get_repo_remote(repo_name), + ) + + +@repo_remote.command("add") +def remote_add( + ctx: typer.Context, + remote_name: str = typer.Argument(..., help="Name of the remote to add"), + remote_url: str = typer.Argument(..., help="URL of the remote to add"), +): + """ + Add a git remote to the specified repository. + """ + repo = Repo() + repo.ctx = ctx + repo.remote_add(remote_name, remote_url) + + +@repo_remote.command("remove") +def remote_remove( + ctx: typer.Context, + remote_name: str = typer.Argument(..., help="Name of the remote to remove"), +): + """ + Remove a git remote from the specified repository. + """ + repo = Repo() + repo.ctx = ctx + repo.remote_remove(remote_name) + + @repo.command() def branch( + ctx: typer.Context, repo_name: str = typer.Argument(None), - branch_name: str = typer.Argument(None), + branch_name: str = typer.Argument(None, help="Branch name"), list_branches: bool = typer.Option( False, "--list-branches", "-l", help="List branches of the repository" ), all_repos: bool = typer.Option( False, "--all-repos", "-a", help="Show branches of all repositories" ), + delete_branch: bool = typer.Option( + False, "--delete-branch", "-d", help="Delete the specified branch" + ), + cloned_only: bool = typer.Option( + False, "--cloned-only", "-c", help="Show branches only for cloned repositories" + ), ): """ Checkout or create a branch in a repository. - If branch_name is provided, switch to that branch or create it if it doesn't exist. If --all-repos is used, show branches for all repositories. """ repo = Repo() - if branch_name: - repo.set_branch(branch_name) - - # else: - # typer.echo( - # typer.style( - # "Create or set branch cannot be used with --all-repos.", - # fg=typer.colors.RED, - # ) - # ) - # return - + repo.ctx = ctx + repo_list = repo.map + if delete_branch and branch_name: + repo.delete_branch(repo_name, branch_name) + raise typer.Exit() + if cloned_only: + _, fs_repos = repo._list_repos() + repo_list = sorted(fs_repos) repo_command( all_repos, repo_name, - all_msg="Showing branches for all repositories...", - missing_msg="Please specify a repository name or use --all-repos to show branches of all repositories.", - single_func=lambda repo_name: repo.get_repo_branch(repo_name), - all_func=lambda repo_name: repo.get_repo_branch(repo_name), + all_msg=None, + missing_msg="Please specify a repository name or use -a,--all-repos to show branches of all repositories.", + single_func=lambda repo_name: repo.get_repo_branch(repo_name, branch_name), + all_func=lambda repo_name: repo.get_repo_branch(repo_name, branch_name), + repo_list=repo_list, + ) + + +@repo.command() +def cd( + repo_name: str = typer.Argument(None), +): + """ + Change directory to the specified repository. + """ + + repo_command( + False, + repo_name, + all_msg=None, + missing_msg="Please specify a repository name.", + single_func=Repo().cd_repo, + all_func=Repo().cd_repo, + ) + + +@repo.command() +def checkout( + repo_name: str = typer.Argument(None), + branch_name: str = typer.Argument(None, help="Branch name to checkout"), +): + """ + Checkout a branch in a repository. + """ + + def checkout_branch(name): + Repo().checkout_branch(repo_name, branch_name) + + repo_command( + False, + repo_name, + all_msg=None, + missing_msg="Please specify a repository and branch name.", + single_func=checkout_branch, + all_func=checkout_branch, ) @@ -107,7 +210,7 @@ def clone_repo(name): all_repos, repo_name, all_msg="Cloning all repositories...", - missing_msg="Please specify a repository name or use --all-repos to clone all repositories.", + missing_msg="Please specify a repository name or use -a,--all-repos to clone all repositories.", single_func=clone_repo, all_func=clone_repo, ) @@ -144,7 +247,7 @@ def do_commit(name): @repo.command() -def delete( +def rm( repo_name: str = typer.Argument(None), all_repos: bool = typer.Option( False, "--all-repos", "-a", help="Delete all repositories" @@ -168,13 +271,34 @@ def do_delete(name): all_repos, repo_name, all_msg="Deleting all repositories...", - missing_msg="Please specify a repository name or use --all-repos to delete all repositories.", + missing_msg="Please specify a repository name or use -a,--all-repos to delete all repositories.", single_func=do_delete, all_func=do_delete, fg=typer.colors.RED, # Red for delete ) +@repo.command() +def diff( + repo_name: str = typer.Argument(None), + all_repos: bool = typer.Option( + False, "--all-repos", "-a", help="Show diffs of all repositories" + ), +): + """ + Show the git diff for the specified repository. + If --all-repos is used, show diffs for all repositories. + """ + repo_command( + all_repos, + repo_name, + all_msg="Showing diffs for all repositories...", + missing_msg="Please specify a repository name or use -a,--all-repos to show diffs of all repositories.", + single_func=lambda repo_name: Repo().get_repo_diff(repo_name), + all_func=lambda repo_name: Repo().get_repo_diff(repo_name), + ) + + @repo.command() def install( repo_name: str = typer.Argument(None), @@ -190,7 +314,7 @@ def install( all_repos, repo_name, all_msg="Installing all repositories...", - missing_msg="Please specify a repository name or use --all-repos to install all repositories.", + missing_msg="Please specify a repository name or use -a,--all-repos to install all repositories.", single_func=lambda repo_name: Package().install_package(repo_name), all_func=lambda repo_name: Package().install_package(repo_name), ) @@ -238,39 +362,6 @@ def open( ) -@repo.command() -def origin( - repo_name: str = typer.Argument(None), - repo_user: str = typer.Argument(None), - all_repos: bool = typer.Option( - False, "--all-repos", "-a", help="Show origin of all repositories" - ), -): - """ - Show or set the origin of a repository. - """ - repo = Repo() - if repo_user and all_repos: - typer.echo( - typer.style( - "Set origin cannot be used with --all-repos.", - fg=typer.colors.RED, - ) - ) - return - if repo_user: - repo.set_user(repo_user) - - repo_command( - all_repos, - repo_name, - all_msg="Showing origin for all repositories...", - missing_msg="Please specify a repository name or use --all-repos to show origins of all repositories.", - single_func=lambda name: repo.get_repo_origin(name), - all_func=lambda repo_name: repo.get_repo_origin(repo_name), - ) - - @repo.command() def patch( repo_name: str = typer.Argument(None), @@ -327,7 +418,7 @@ def reset_repo(name): all_repos, repo_name, all_msg="Resetting all repositories...", - missing_msg="Please specify a repository name or use --all-repos to reset all repositories.", + missing_msg="Please specify a repository name or use -a,--all-repos to reset all repositories.", single_func=reset_repo, all_func=reset_repo, ) @@ -335,6 +426,7 @@ def reset_repo(name): @repo.command() def status( + ctx: typer.Context, repo_name: str = typer.Argument(None), all_repos: bool = typer.Option( False, "--all-repos", "-a", help="Show status of all repos" @@ -345,11 +437,12 @@ def status( If --all-repos is used, show the status for all repositories. """ repo = Repo() + repo.ctx = ctx repo_command( all_repos, repo_name, all_msg="Showing status for all repositories...", - missing_msg="Please specify a repository name or use --all-repos to show all repositories.", + missing_msg="Please specify a repository name or use -a,--all-repos to show all repositories.", single_func=lambda repo_name: repo.get_repo_status(repo_name), all_func=lambda repo_name: repo.get_repo_status(repo_name), ) @@ -357,6 +450,7 @@ def status( @repo.command() def sync( + ctx: typer.Context, repo_name: str = typer.Argument(None), all_repos: bool = typer.Option( False, "--all-repos", "-a", help="Sync all repositories" @@ -367,6 +461,7 @@ def sync( If --all-repos is used, sync all repositories. """ repo = Repo() + repo.ctx = ctx if not repo.map: typer.echo( typer.style( @@ -380,7 +475,7 @@ def sync( all_repos, repo_name, all_msg="Syncing all repositories...", - missing_msg="Please specify a repository name or use --all-repos to sync all repositories.", + missing_msg="Please specify a repository name or use -a,--all-repos to sync all repositories.", single_func=lambda repo_name: repo.sync_repo(repo_name), all_func=lambda repo_name: repo.sync_repo(repo_name), ) @@ -388,6 +483,7 @@ def sync( @repo.command() def test( + ctx: typer.Context, repo_name: str = typer.Argument(None), modules: list[str] = typer.Argument(None), keep_db: bool = typer.Option( @@ -414,6 +510,7 @@ def test( If --setenv is used, set the DJANGO_SETTINGS_MODULE environment variable. """ test_runner = Test() + test_runner.ctx = ctx if modules: test_runner.set_modules(modules) if keep_db: diff --git a/django_mongodb_cli/utils.py b/django_mongodb_cli/utils.py index 5d3db21..b6b2244 100644 --- a/django_mongodb_cli/utils.py +++ b/django_mongodb_cli/utils.py @@ -1,14 +1,14 @@ -import toml -import typer import os import re import shutil import subprocess import tempfile +from pathlib import Path -from git import Repo as GitRepo +import toml +import typer from git import GitCommandError -from pathlib import Path +from git import Repo as GitRepo URL_PATTERN = re.compile(r"git\+ssh://(?:[^@]+@)?([^/]+)/([^@]+)") BRANCH_PATTERN = re.compile( @@ -26,382 +26,381 @@ class Repo: def __init__(self, pyproject_file: Path = Path("pyproject.toml")): self.pyproject_file = pyproject_file self.config = self._load_config() - self.path = Path( - self.config.get("tool", {}).get("django-mongodb-cli", {}).get("path", ".") - ).resolve() + self._tool_cfg = self.config.get("tool", {}).get("django-mongodb-cli", {}) or {} + self.path = Path(self._tool_cfg.get("path", ".")).resolve() self.map = self.get_map() - self.branch = None self.user = None + self.reset = False + # ----------------------------- + # Core utilities / helpers + # ----------------------------- def _load_config(self) -> dict: return toml.load(self.pyproject_file) + def _msg(self, text: str, fg) -> None: + typer.echo(typer.style(text, fg=fg)) + + def info(self, text: str) -> None: + self._msg(text, typer.colors.CYAN) + + def warn(self, text: str) -> None: + self._msg(text, typer.colors.YELLOW) + + def ok(self, text: str) -> None: + self._msg(text, typer.colors.GREEN) + + def err(self, text: str) -> None: + self._msg(text, typer.colors.RED) + + def run(self, args, cwd: Path | str | None = None, check: bool = True) -> bool: + try: + subprocess.run(args, cwd=str(cwd) if cwd else None, check=check) + return True + except subprocess.CalledProcessError as e: + self.err(f"Command failed: {' '.join(str(a) for a in args)} ({e})") + return False + + def ensure_repo( + self, repo_name: str, must_exist: bool = True + ) -> tuple[Path | None, GitRepo | None]: + path = self.get_repo_path(repo_name) + if must_exist and not path.exists(): + if not self.ctx.obj.get("quiet", False): + self.err(f"Repository '{repo_name}' not found at path: {path}") + return None, None + repo = self.get_repo(str(path)) if path.exists() else None + return path, repo + + @property + def tool_cfg(self) -> dict: + return self._tool_cfg + + def test_cfg(self, repo_name: str) -> dict: + return self.tool_cfg.get("test", {}).get(repo_name, {}) or {} + + def evergreen_cfg(self, repo_name: str) -> dict: + return self.tool_cfg.get("evergreen", {}).get(repo_name, {}) or {} + + def origin_cfg(self) -> dict: + return self.tool_cfg.get("origin", {}) or {} + + @staticmethod + def parse_git_url(raw: str) -> tuple[str, str]: + m_branch = BRANCH_PATTERN.search(raw) + branch = m_branch.group(1) if m_branch else "main" + m_url = URL_PATTERN.search(raw) + url = m_url.group(0) if m_url else raw + return url, branch + + def copy_file( + self, src: str | Path, dst: str | Path, what: str, repo_name: str + ) -> None: + shutil.copyfile(src, dst) + self.info(f"Copied {what} from {src} to {dst} for {repo_name}.") + + # ----------------------------- + # Repo operations + # ----------------------------- + + def cd_repo(self, repo_name: str) -> None: + """ + Change directory to the specified repository. + """ + self.info(f"Changing directory to repository: {repo_name}") + path, _ = self.ensure_repo(repo_name) + if not path: + return + + try: + os.chdir(path) + self.ok(f"✅ Changed directory to {path}.") + subprocess.run(os.environ.get("SHELL", "/bin/zsh")) + except Exception as e: + self.err(f"❌ Failed to change directory: {e}") + + def checkout_branch(self, repo_name: str, branch_name) -> None: + _, repo = self.ensure_repo(repo_name) + try: + self.info(f"Checking out branch: {branch_name}") + repo.git.checkout(branch_name) + self.ok(branch_name) + except GitCommandError: + self.warn(f"Branch '{branch_name}' does not exist. Creating new branch.") + repo.git.checkout("-b", branch_name) + self.err(branch_name) + def clone_repo(self, repo_name: str) -> None: """ Clone a repository into the specified path. If the repository already exists, it will skip cloning. """ - typer.echo( - typer.style(f"Cloning repository: {repo_name}", fg=typer.colors.CYAN) - ) + self.info(f"Cloning repository: {repo_name}") if repo_name not in self.map: - typer.echo( - typer.style( - f"Repository '{repo_name}' not found in configuration.", - fg=typer.colors.RED, - ) - ) + self.err(f"Repository '{repo_name}' not found in configuration.") return - url = self.map[repo_name] - path = self.get_repo_path(repo_name) - branch = ( - BRANCH_PATTERN.search(url).group(1) - if BRANCH_PATTERN.search(url) - else "main" - ) - url = URL_PATTERN.search(url).group(0) - if os.path.exists(path): - typer.echo( - typer.style( - f"Repository '{repo_name}' already exists at path: {path}", - fg=typer.colors.YELLOW, - ) - ) + raw = self.map[repo_name] + url, branch = self.parse_git_url(raw) + + path, _ = self.ensure_repo(repo_name, must_exist=False) + if not path: + return + if path.exists(): + self.warn(f"Repository '{repo_name}' already exists at path: {path}") return - typer.echo( - typer.style( - f"Cloning {url} into {path} (branch: {branch})", fg=typer.colors.CYAN - ) - ) - GitRepo.clone_from(url, path, branch=branch) + self.info(f"Cloning {url} into {path} (branch: {branch})") + GitRepo.clone_from(url, str(path), branch=branch) # Install pre-commit hooks if config exists - pre_commit_config = os.path.join(path, ".pre-commit-config.yaml") - if os.path.exists(pre_commit_config): - typer.echo( - typer.style("Installing pre-commit hooks...", fg=typer.colors.CYAN) - ) - try: - subprocess.run(["pre-commit", "install"], cwd=path, check=True) - typer.echo( - typer.style("Pre-commit hooks installed!", fg=typer.colors.GREEN) - ) - except subprocess.CalledProcessError as e: - typer.echo( - typer.style( - f"Failed to install pre-commit hooks for {repo_name}: {e}", - fg=typer.colors.RED, - ) - ) + pc_cfg = path / ".pre-commit-config.yaml" + if pc_cfg.exists(): + self.info("Installing pre-commit hooks...") + if self.run(["pre-commit", "install", "-t", "pre-commit"], cwd=path): + self.ok("Pre-commit hooks installed!") else: - typer.echo( - typer.style( - "No .pre-commit-config.yaml found. Skipping pre-commit hook installation.", - fg=typer.colors.YELLOW, - ) + self.warn( + "No .pre-commit-config.yaml found. Skipping pre-commit hook installation." ) + def _compose_commit_message(self, initial: str = "") -> str | None: + msg = initial.strip() + if msg: + return msg + editor = os.environ.get("EDITOR", "vi") + with tempfile.NamedTemporaryFile(suffix=".tmp", delete=False) as tf: + tf.write( + b"# Enter commit message. Lines starting with '#' will be ignored.\n" + ) + tf.flush() + temp_name = tf.name + try: + subprocess.call([editor, temp_name]) + with open(temp_name, "r", encoding="utf-8") as f: + lines = [ln for ln in f.readlines() if not ln.startswith("#")] + msg = "".join(lines).strip() + if not msg: + self.warn("Aborting commit due to empty commit message.") + return None + return msg + finally: + try: + os.unlink(temp_name) + except OSError: + pass + def commit_repo(self, repo_name: str, message: str = "") -> None: """ Commit changes to the specified repository with a commit message. If no message is given, open editor for the commit message. """ - typer.echo( - typer.style( - f"Committing changes to repository: {repo_name}", fg=typer.colors.CYAN - ) - ) - - path = self.get_repo_path(repo_name) - if not os.path.exists(path): - typer.echo( - typer.style( - f"Repository '{repo_name}' not found at path: {path}", - fg=typer.colors.RED, - ) - ) + self.info(f"Committing changes to repository: {repo_name}") + _, repo = self.ensure_repo(repo_name) + if not repo: return - if not message.strip(): - # Open editor - editor = os.environ.get("EDITOR", "vi") - with tempfile.NamedTemporaryFile(suffix=".tmp") as tf: - tf.write( - b"# Enter commit message. Lines starting with '#' will be ignored.\n" - ) - tf.flush() - subprocess.call([editor, tf.name]) - tf.seek(0) - # Read and filter lines - content = [] - for line in tf: - # skip comment lines like in git - if not line.decode().startswith("#"): - content.append(line.decode()) - # Join lines, strip whitespace - message = "".join(content).strip() - if not message: - typer.echo( - typer.style( - "Aborting commit due to empty commit message.", - fg=typer.colors.YELLOW, - ) - ) - return + msg = self._compose_commit_message(message) + if msg is None: + return - repo = self.get_repo(path) - repo.git.add(A=True) - repo.git.commit(m=message) + try: + repo.git.add(A=True) + repo.git.commit(m=msg) + self.ok("✅ Commit created.") + except GitCommandError as e: + self.err(f"❌ Failed to commit changes: {e}") def create_pr(self, repo_name: str) -> None: """ Create a pull request for the specified repository. """ - typer.echo( - typer.style( - f"Creating pull request for repository: {repo_name}", - fg=typer.colors.CYAN, - ) - ) - - path = self.get_repo_path(repo_name) - if not os.path.exists(path): - typer.echo( - typer.style( - f"Repository '{repo_name}' not found at path: {path}", - fg=typer.colors.RED, - ) - ) + self.info(f"Creating pull request for repository: {repo_name}") + path, repo = self.ensure_repo(repo_name) + if not repo or not path: return - repo = self.get_repo(path) try: repo.git.push("origin", repo.active_branch.name) - subprocess.run( - ["gh", "pr", "create"], - check=True, - cwd=path, - ) - typer.echo( - typer.style( - f"✅ Pull request created for {repo_name}.", fg=typer.colors.GREEN - ) - ) - except subprocess.CalledProcessError as e: - typer.echo( - typer.style( - f"❌ Failed to create pull request: {e}", fg=typer.colors.RED - ) - ) + except GitCommandError as e: + self.err(f"❌ Failed to push branch: {e}") + return + + if self.run(["gh", "pr", "create"], cwd=path): + self.ok(f"✅ Pull request created for {repo_name}.") + + def delete_branch(self, repo_name: str, branch_name: str) -> None: + """ + Delete the specified branch from the repository. + """ + self.info(f"Deleting branch '{branch_name}' from repository: {repo_name}") + _, repo = self.ensure_repo(repo_name) + if not repo: + return + try: + repo.git.branch("-D", branch_name) + self.ok(f"✅ Successfully deleted branch '{branch_name}' from {repo_name}.") + except GitCommandError as e: + self.err(f"❌ Failed to delete branch '{branch_name}': {e}") def delete_repo(self, repo_name: str) -> None: """ Delete the specified repository. """ - typer.echo( - typer.style(f"Deleting repository: {repo_name}", fg=typer.colors.CYAN) - ) - - path = self.get_repo_path(repo_name) - if not os.path.exists(path): - typer.echo( - typer.style( - f"Repository '{repo_name}' not found at path: {path}", - fg=typer.colors.RED, - ) - ) + self.info(f"Deleting repository: {repo_name}") + path, _ = self.ensure_repo(repo_name) + if not path: return - try: shutil.rmtree(path) - typer.echo( - typer.style( - f"✅ Successfully deleted {repo_name}.", fg=typer.colors.GREEN - ) - ) + self.ok(f"✅ Successfully deleted {repo_name}.") except Exception as e: - typer.echo( - typer.style( - f"❌ Failed to delete {repo_name}: {e}", fg=typer.colors.RED - ) - ) + self.err(f"❌ Failed to delete {repo_name}: {e}") def get_repo_log(self, repo_name: str) -> None: """ Get the commit log for the specified repository. """ - typer.echo( - typer.style( - f"Getting commit log for repository: {repo_name}", fg=typer.colors.CYAN - ) - ) - - path = self.get_repo_path(repo_name) - if not os.path.exists(path): - typer.echo( - typer.style( - f"Repository '{repo_name}' not found at path: {path}", - fg=typer.colors.RED, - ) - ) + self.info(f"Getting commit log for repository: {repo_name}") + _, repo = self.ensure_repo(repo_name) + if not repo: + return + try: + log_entries = repo.git.log( + "--pretty=format:%h - %an, %ar : %s", + "--abbrev-commit", + "--date=relative", + "--graph", + ).splitlines() + for entry in log_entries: + typer.echo(f" - {entry}") + except GitCommandError as e: + self.err(f"❌ Failed to get log: {e}") + + def get_repo_remote(self, repo_name: str) -> None: + """ + Get the remote URL of the specified repository. + """ + _, repo = self.ensure_repo(repo_name) + if not repo: return - repo = self.get_repo(path) - log_entries = repo.git.log( - "--pretty=format:%h - %an, %ar : %s", - "--abbrev-commit", - "--date=relative", - "--graph", - ).splitlines() - for entry in log_entries: - typer.echo(f" - {entry}") + quiet = self.ctx.obj.get("quiet", False) + if not quiet: + self.info(f"Remotes for {repo_name}:") + for remote in repo.remotes: + try: + prefix = "" if quiet else f"- {remote.name} -> " + self.ok(f"{prefix}{remote.url}") + except Exception as e: + self.err(f"Could not get remote URL: {e}") def get_map(self) -> dict: """ Return a dict mapping repo_name to repo_url from repos in [tool.django-mongodb-cli.repos]. """ - return { - repo.split("@", 1)[0].strip(): repo.split("@", 1)[1].strip() - for repo in self.config.get("tool", {}) - .get("django-mongodb-cli", {}) - .get("repos", []) - if "@" in repo - } + repos = self.tool_cfg.get("repos", []) or [] + result = {} + for repo in repos: + if "@" in repo: + name, url = repo.split("@", 1) + result[name.strip()] = url.strip() + return result - def get_repo_branch(self, repo_name: str) -> list: + def get_repo_branch(self, repo_name: str, branch_name: str) -> list: """ - Get the current branch of the specified repository. + Get branches for the specified repository. + If no branch is specified, return all local and remote branches. If the repository does not exist, return an empty list. + If a branch is specified, switch to it (if it exists) or create it (checkout -b). """ - path = self.get_repo_path(repo_name) - repo = self.get_repo(path) + _, repo = self.ensure_repo(repo_name) + if not repo: + return [] - # Checkout or create branch if self.branch is set - if self.branch: + if branch_name: + # Specific branch requested → switch to it (existing) or create new (checkout -b) + self.info(f"Switching to branch '{branch_name}' for {repo_name}") try: - typer.echo( - typer.style( - f"Checking out branch: {self.branch}", fg=typer.colors.CYAN - ) - ) - repo.git.checkout(self.branch) - return - except GitCommandError: - typer.echo( - typer.style( - f"Branch '{self.branch}' does not exist. Creating new branch.", - fg=typer.colors.YELLOW, - ) - ) - repo.git.checkout("-b", self.branch) - return + self.checkout_branch(repo_name, branch_name) + return [branch_name] + except Exception as e: + self.err(f"Could not switch/create branch '{branch_name}': {e}") + return [] + + # No specific branch requested → list local + remote branches + self.info(f"{repo_name}:") + try: + # Local branches + local_branches = [head.name for head in repo.heads] - typer.echo( - typer.style(f"Getting current branch for {repo_name}", fg=typer.colors.CYAN) - ) + # Remote branches + remote_branches = [] + for remote in repo.remotes: + remote_branches.extend([ref.name for ref in remote.refs]) - if not os.path.exists(path): - typer.echo( - typer.style( - f"Repository '{repo_name}' not found at path: {path}", - fg=typer.colors.RED, - ) - ) + # Combine & remove duplicates + all_branches = sorted(set(local_branches + remote_branches)) + except Exception as e: + self.warn(f"Could not list branches: {e}") return [] - current_branch = repo.active_branch.name - typer.echo( - typer.style( - f"Current branch for {repo_name}: {current_branch}", - fg=typer.colors.GREEN, - ) - ) - return [current_branch] + self.ok("\n".join(f"- {branch}" for branch in all_branches)) + return all_branches def get_repo_branches(self, repo_name: str) -> list: """ Get a list of both local and remote branches for the specified repository. Optionally, if self.branch is set, switch to it (existing) or create new (checkout -b). """ - - path = self.get_repo_path(repo_name) - if not os.path.exists(path): - typer.echo( - typer.style( - f"Repository '{repo_name}' not found at path: {path}", - fg=typer.colors.RED, - ) - ) + _, repo = self.ensure_repo(repo_name) + if not repo: return [] - repo = self.get_repo(path) - # Get local branches local_branches = [branch.name for branch in repo.branches] # Get remote branches; skip HEAD pointer - remote_branches = [ - ref.name.replace("origin/", "") - for ref in repo.remotes.origin.refs - if ref.name != "origin/HEAD" - ] - - typer.echo( - typer.style( - f"Getting branches for repository: {repo_name}", fg=typer.colors.CYAN - ) - ) + remote_branches = [] + try: + remote_branches = [ + ref.name.replace("origin/", "") + for ref in repo.remotes.origin.refs + if ref.name != "origin/HEAD" + ] + except Exception: + pass - # Merge, deduplicate, and sort + self.info(f"Getting branches for repository: {repo_name}") all_branches = sorted(set(local_branches + remote_branches)) - for name in sorted(all_branches): + for name in all_branches: typer.echo(f" - {name}") + return all_branches def get_repo_origin(self, repo_name: str) -> str: """ Get the origin URL of the specified repository. """ - typer.echo( - typer.style( - f"Getting origin for repository: {repo_name}", fg=typer.colors.CYAN - ) - ) - - path = self.get_repo_path(repo_name) - if not os.path.exists(path): - typer.echo( - typer.style( - f"Repository '{repo_name}' not found at path: {path}", - fg=typer.colors.RED, - ) - ) + self.info(f"Getting origin for repository: {repo_name}") + _, repo = self.ensure_repo(repo_name) + if not repo: return "" - repo = self.get_repo(path) origin_url = repo.remotes.origin.url - origin_users = ( - self.config.get("tool").get("django-mongodb-cli").get("origin", []) - ) - if repo_name in origin_users and self.user: - for user in origin_users[repo_name]: - if user.get("user") == self.user: - typer.echo( - typer.style( - f"Setting origin URL for {repo_name}: {origin_url}", - fg=typer.colors.GREEN, - ) - ) - origin_url = user.get("repo") - origin = repo.remotes.origin - origin.set_url(origin_url) - typer.echo( - typer.style( - f"Origin URL for {repo_name}: {origin_url}", fg=typer.colors.GREEN - ) - ) + overrides = self.origin_cfg().get(repo_name, []) + if self.user and isinstance(overrides, list): + for entry in overrides: + if entry.get("user") == self.user: + new_url = entry.get("repo") + if new_url: + repo.remotes.origin.set_url(new_url) + origin_url = new_url + self.ok(f"Setting origin URL for {repo_name}: {origin_url}") + break + + self.ok(f"Origin URL for {repo_name}: {origin_url}") + return origin_url def get_repo_path(self, repo_name: str) -> Path: return (self.path / repo_name).resolve() @@ -409,120 +408,95 @@ def get_repo_path(self, repo_name: str) -> Path: def get_repo(self, path: str) -> GitRepo: return GitRepo(path) - def get_repo_status(self, repo_name: str) -> str: + def get_repo_status(self, repo_name: str) -> None: """ Get the status of a repository. """ - - path = self.get_repo_path(repo_name) - if not os.path.exists(path): - typer.echo( - typer.style( - f"Repository '{repo_name}' not found at path: {path}", - fg=typer.colors.RED, - ) - ) + path, repo = self.ensure_repo(repo_name) + if not repo or not path: return - typer.echo( - typer.style( - f"Repository '{repo_name}' found at path: {path}", fg=typer.colors.GREEN - ) - ) - repo = self.get_repo(path) - typer.echo( - typer.style(f"On branch: {repo.active_branch}", fg=typer.colors.CYAN) - ) + + self.ok(f"Repository '{repo_name}' found at path: {path}") + self.info(f"On branch: {repo.active_branch}") + + # Show origin self.get_repo_origin(repo_name) + def get_repo_diff(self, repo_name: str) -> None: + """ + Get the diff of a repository. + """ + + self.get_repo_status(repo_name) + + path, repo = self.ensure_repo(repo_name) + if not repo or not path: + return + unstaged = repo.index.diff(None) if unstaged: - typer.echo( - typer.style("\nChanges not staged for commit:", fg=typer.colors.YELLOW) - ) + self.warn("\nChanges not staged for commit:") for diff in unstaged: - typer.echo( - typer.style(f" modified: {diff.a_path}", fg=typer.colors.YELLOW) - ) + self.warn(f" modified: {diff.a_path}") + staged = repo.index.diff("HEAD") if staged: - typer.echo(typer.style("\nChanges to be committed:", fg=typer.colors.GREEN)) + self.ok("\nChanges to be committed:") for diff in staged: - typer.echo( - typer.style(f" staged: {diff.a_path}", fg=typer.colors.GREEN) - ) + self.ok(f" staged: {diff.a_path}") + if repo.untracked_files: - typer.echo(typer.style("\nUntracked files:", fg=typer.colors.MAGENTA)) + self._msg("\nUntracked files:", typer.colors.MAGENTA) for f in repo.untracked_files: - typer.echo(typer.style(f" {f}", fg=typer.colors.MAGENTA)) - if not unstaged and not staged and not repo.untracked_files: - typer.echo( - typer.style( - "\nNothing to commit, working tree clean.", fg=typer.colors.GREEN - ) - ) - # Diff the working tree - working_tree_diff = repo.git.diff() - if working_tree_diff: - typer.echo( - typer.style("\nWorking tree differences:", fg=typer.colors.YELLOW) - ) - typer.echo(working_tree_diff) + self._msg(f" {f}", typer.colors.MAGENTA) - def list_repos(self) -> None: - """ - List all repositories found either in self.map or as directories in self.path. - """ - typer.echo(typer.style("Listing repositories...", fg=typer.colors.CYAN)) + if not unstaged and not staged and not repo.untracked_files: + self.ok("\nNothing to commit, working tree clean.") - # Set from self.map + try: + working_tree_diff = repo.git.diff() + if working_tree_diff: + self.warn("\nWorking tree differences:") + typer.echo(working_tree_diff) + except GitCommandError as e: + self.err(f"❌ Failed to diff working tree: {e}") + + def _list_repos(self) -> set: map_repos = set(self.map.keys()) - # Set from filesystem try: fs_entries = os.listdir(self.path) - fs_repos = { - entry - for entry in fs_entries - if os.path.isdir(os.path.join(self.path, entry)) - } + fs_repos = {entry for entry in fs_entries if (self.path / entry).is_dir()} except Exception as e: - typer.echo( - typer.style( - f"❌ Failed to list repositories in filesystem: {e}", - fg=typer.colors.RED, - ) - ) + self.err(f"❌ Failed to list repositories in filesystem: {e}") return - # Compute differences + return map_repos, fs_repos + + def list_repos(self) -> None: + """ + List all repositories found either in self.map or as directories in self.path. + """ + self.info("Listing repositories...") + + map_repos, fs_repos = self._list_repos() + only_in_map = map_repos - fs_repos only_in_fs = fs_repos - map_repos in_both = map_repos & fs_repos - # Output if in_both: - typer.echo( - typer.style( - "Repositories in pyproject.toml and on filesystem:", - fg=typer.colors.GREEN, - ) - ) + self.ok("Repositories in pyproject.toml and on filesystem:") for name in sorted(in_both): typer.echo(f" - {name}") if only_in_map: - typer.echo( - typer.style( - "Repositories only in pyproject.toml:", fg=typer.colors.YELLOW - ) - ) + self.ok("Repositories only in pyproject.toml:") for name in sorted(only_in_map): typer.echo(f" - {name}") if only_in_fs: - typer.echo( - typer.style("Repositories only on filesystem:", fg=typer.colors.MAGENTA) - ) + self._msg("Repositories only on filesystem:", typer.colors.MAGENTA) for name in sorted(only_in_fs): typer.echo(f" - {name}") @@ -533,108 +507,24 @@ def open_repo(self, repo_name: str) -> None: """ Open the specified repository with `gh browse` command. """ - typer.echo( - typer.style(f"Opening repository: {repo_name}", fg=typer.colors.CYAN) - ) - - path = self.get_repo_path(repo_name) - if not os.path.exists(path): - typer.echo( - typer.style( - f"Repository '{repo_name}' not found at path: {path}", - fg=typer.colors.RED, - ) - ) - return - try: - subprocess.run( - ["gh", "browse"], - check=True, - cwd=path, - ) - typer.echo( - typer.style( - f"✅ Successfully opened {repo_name} in browser.", - fg=typer.colors.GREEN, - ) - ) - except subprocess.CalledProcessError as e: - typer.echo( - typer.style( - f"❌ Failed to open {repo_name} in browser: {e}", - fg=typer.colors.RED, - ) - ) - - def reset_repo(self, repo_name: str) -> None: - typer.echo( - typer.style(f"Resetting repository: {repo_name}", fg=typer.colors.CYAN) - ) - path = self.get_repo_path(repo_name) - if not os.path.exists(path): - typer.echo( - typer.style( - f"Repository '{repo_name}' not found at path: {path}", - fg=typer.colors.RED, - ) - ) + self.info(f"Opening repository: {repo_name}") + path, _ = self.ensure_repo(repo_name) + if not path: return - repo = self.get_repo(path) - repo.git.reset("--hard") - typer.echo( - typer.style( - f"✅ Repository {repo_name} has been reset.", fg=typer.colors.GREEN - ) - ) - def _list_tests(self, repo_name: str) -> None: - """ - List all tests for the specified repository. - """ - typer.echo( - typer.style( - f"Listing tests for repository: {repo_name}", fg=typer.colors.CYAN - ) - ) + if self.run(["gh", "browse"], cwd=path): + self.ok(f"✅ Successfully opened {repo_name} in browser.") - test_dir = ( - self.config.get("tool") - .get("django-mongodb-cli") - .get("test") - .get(repo_name) - .get("test_dir") - ) - if not os.path.exists(test_dir): - typer.echo( - typer.style( - f"Test directory '{test_dir}' does not exist for {repo_name}.", - fg=typer.colors.RED, - ) - ) + def reset_repo(self, repo_name: str) -> None: + self.info(f"Resetting repository: {repo_name}") + _, repo = self.ensure_repo(repo_name) + if not repo: return try: - test_files = [ - f - for f in os.listdir(test_dir) - if f.endswith(".py") and not f.startswith("__") - ] - if not test_files: - typer.echo( - typer.style( - f"No tests found in {test_dir} for {repo_name}.", - fg=typer.colors.YELLOW, - ) - ) - return - typer.echo(typer.style("Found tests:", fg=typer.colors.GREEN)) - for test_file in sorted(test_files): - typer.echo(f" - {test_file}") - except Exception as e: - typer.echo( - typer.style( - f"❌ Failed to list tests for {repo_name}: {e}", fg=typer.colors.RED - ) - ) + repo.git.reset("--hard") + self.ok(f"✅ Repository {repo_name} has been reset.") + except GitCommandError as e: + self.err(f"❌ Failed to reset {repo_name}: {e}") def set_branch(self, branch: str) -> None: self.branch = branch @@ -649,57 +539,55 @@ def sync_repo(self, repo_name: str) -> None: """ Synchronize the repository by pulling the latest changes and then pushing local changes. """ - typer.echo(typer.style("Synchronizing repository...", fg=typer.colors.CYAN)) - path = self.get_repo_path(repo_name) - if not os.path.exists(path): - typer.echo( - typer.style( - f"Repository '{repo_name}' not found at path: {path}", - fg=typer.colors.RED, - ) - ) + _, repo = self.ensure_repo(repo_name) + if not repo: return + try: - repo = self.get_repo(path) - typer.echo( - typer.style( - f"Pulling latest changes for {repo_name}...", fg=typer.colors.CYAN - ) - ) repo.remotes.origin.pull() - typer.echo( - typer.style( - f"✅ Successfully pulled latest changes for {repo_name}.", - fg=typer.colors.GREEN, - ) - ) - # Identify current branch + self.ok(f"✅ Successfully pulled latest changes for {repo_name}.") + current_branch = repo.active_branch.name - typer.echo( - typer.style( - f"Pushing local changes to origin/{current_branch}...", - fg=typer.colors.CYAN, - ) - ) repo.remotes.origin.push(refspec=current_branch) - typer.echo( - typer.style( - f"✅ Successfully pushed to origin/{current_branch}.", - fg=typer.colors.GREEN, - ) - ) - typer.echo( - typer.style( - f"✅ Repository {repo_name} is synchronized (pull & push complete).", - fg=typer.colors.GREEN, - ) - ) + self.ok(f"✅ Successfully pushed latest commits to {repo_name}.") except Exception as e: - typer.echo( - typer.style( - f"❌ Failed to synchronize {repo_name}: {e}", fg=typer.colors.RED - ) + self.err(f"❌ Failed to synchronize {repo_name}: {e}") + + def remote_add(self, remote_name: str, remote_url: str) -> None: + """ + Add a new remote to the specified repository. + """ + self.info( + f"Adding remote '{remote_name}' to repository: {self.ctx.obj.get('repo_name')}" + ) + _, repo = self.ensure_repo(self.ctx.obj.get("repo_name")) + if not repo: + return + + try: + repo.create_remote(remote_name, remote_url) + self.ok( + f"✅ Successfully added remote '{remote_name}' with URL '{remote_url}'." ) + except Exception as e: + self.err(f"❌ Failed to add remote '{remote_name}': {e}") + + def remote_remove(self, remote_name: str) -> None: + """ + Remove a remote from the specified repository. + """ + self.info( + f"Removing remote '{remote_name}' from repository: {self.ctx.obj.get('repo_name')}" + ) + _, repo = self.ensure_repo(self.ctx.obj.get("repo_name")) + if not repo: + return + + try: + repo.delete_remote(remote_name) + self.ok(f"✅ Successfully removed remote '{remote_name}'.") + except Exception as e: + self.err(f"❌ Failed to remove remote '{remote_name}': {e}") class Package(Repo): @@ -707,80 +595,32 @@ def install_package(self, repo_name: str) -> None: """ Install a package from the cloned repository. """ - typer.echo( - typer.style( - f"Installing package from repository: {repo_name}", fg=typer.colors.CYAN - ) - ) - - path = self.get_repo_path(repo_name) - if not os.path.exists(path): - typer.echo( - typer.style( - f"Repository '{repo_name}' not found at path: {path}", - fg=typer.colors.RED, - ) - ) + self.info(f"Installing package from repository: {repo_name}") + path, _ = self.ensure_repo(repo_name) + if not path: return - try: - subprocess.run( - [os.sys.executable, "-m", "pip", "install", "-e", path], - check=True, - ) - typer.echo( - typer.style( - f"✅ Successfully installed package from {repo_name}.", - fg=typer.colors.GREEN, - ) - ) - except subprocess.CalledProcessError as e: - typer.echo( - typer.style( - f"❌ Failed to install package from {repo_name}: {e}", - fg=typer.colors.RED, - ) - ) + install_dir = ( + self.tool_cfg.get("install", {}).get(repo_name, {}).get("install_dir") + ) + if install_dir: + path = Path(path / install_dir).resolve() + self.info(f"Using custom install directory: {path}") + + if self.run([os.sys.executable, "-m", "pip", "install", "-e", str(path)]): + self.ok(f"✅ Successfully installed package from {repo_name}.") def uninstall_package(self, repo_name: str) -> None: """ Uninstall a package from the cloned repository. """ - typer.echo( - typer.style( - f"Uninstalling package from repository: {repo_name}", - fg=typer.colors.CYAN, - ) - ) - - path = self.get_repo_path(repo_name) - if not os.path.exists(path): - typer.echo( - typer.style( - f"Repository '{repo_name}' not found at path: {path}", - fg=typer.colors.RED, - ) - ) + self.info(f"Uninstalling package from repository: {repo_name}") + path, _ = self.ensure_repo(repo_name) + if not path: return - try: - subprocess.run( - [os.sys.executable, "-m", "pip", "uninstall", "-y", repo_name], - check=True, - ) - typer.echo( - typer.style( - f"✅ Successfully uninstalled package from {repo_name}.", - fg=typer.colors.GREEN, - ) - ) - except subprocess.CalledProcessError as e: - typer.echo( - typer.style( - f"❌ Failed to uninstall package from {repo_name}: {e}", - fg=typer.colors.RED, - ) - ) + if self.run([os.sys.executable, "-m", "pip", "uninstall", "-y", repo_name]): + self.ok(f"✅ Successfully uninstalled package from {repo_name}.") class Test(Repo): @@ -798,176 +638,168 @@ def __init__(self, pyproject_file: Path = Path("pyproject.toml")): self.keyword = None self.setenv = False self.list_tests = False + self.test_settings = {} def copy_settings(self, repo_name: str) -> None: """ Copy test settings from this repository to the repository specified by repo_name. """ - source = self.test_settings["settings"]["test"]["source"] - target = self.test_settings["settings"]["test"]["target"] - shutil.copyfile(source, target) - typer.echo( - typer.style( - f"Copied test settings from {source} to {target} for {repo_name}.", - fg=typer.colors.CYAN, - ) - ) + settings = self.test_settings.get("settings") + if not settings: + self.warn("'settings' is missing in test_settings") + return + source = settings["test"]["source"] + target = settings["test"]["target"] + self.copy_file(source, target, "test settings", repo_name) def copy_apps(self, repo_name: str) -> None: """ - Copy test settings from this repository to the repository - specified by repo_name. + Copy apps file configuration for tests. """ - if "apps_file" not in self.test_settings: - typer.echo( - typer.style( - f"No apps_file settings found for {repo_name}.", - fg=typer.colors.YELLOW, - ) - ) + apps = self.test_settings.get("apps_file") + if not apps: + self.warn(f"No apps_file settings found for {repo_name}.") return - source = self.test_settings["apps_file"]["source"] - target = self.test_settings["apps_file"]["target"] - shutil.copyfile(source, target) - typer.echo( - typer.style( - f"Copied apps from {source} to {target} for {repo_name}.", - fg=typer.colors.CYAN, - ) - ) + self.copy_file(apps["source"], apps["target"], "apps", repo_name) def copy_migrations(self, repo_name: str) -> None: """ Copy migrations from this repository to the repository specified by repo_name. """ - source = self.test_settings["migrations_dir"]["source"] - target = self.test_settings["migrations_dir"]["target"] + migrations_dir = self.test_settings.get("migrations_dir") + if not migrations_dir: + self.warn("'migrations_dir' is missing in test_settings") + return + source = migrations_dir.get("source") + target = migrations_dir.get("target") + if not source or not target: + raise KeyError( + "'source' or 'target' is missing under 'migrations_dir' in test_settings" + ) + + self.info(f"Copying migrations from {source} to {target} for repo {repo_name}") if not os.path.exists(target): shutil.copytree(source, target) - typer.echo( - typer.style( - f"Copied migrations from {source} to {target} for {repo_name}.", - fg=typer.colors.CYAN, - ) - ) + self.info(f"Copied migrations from {source} to {target} for {repo_name}.") def patch_repo(self, repo_name: str) -> None: """ Run evergreen patching operations on the specified repository. """ - typer.echo( - typer.style( - f"Running `evergreen patch` for: {repo_name}", fg=typer.colors.CYAN - ) - ) - project_name = ( - self.config.get("tool", {}) - .get("django-mongodb-cli", {}) - .get("evergreen", {}) - .get(repo_name) - .get("project_name") - ) - subprocess.run( + self.info(f"Running `evergreen patch` for: {repo_name}") + project_name = self.evergreen_cfg(repo_name).get("project_name") + if not project_name: + self.err(f"❌ No evergreen project_name configured for {repo_name}.") + return + self.run( ["evergreen", "patch", "-p", project_name, "-u"], - check=True, cwd=self.get_repo_path(repo_name), ) + def _list_tests(self, repo_name: str) -> None: + """ + List all test files (recursively) and subdirectories for the specified repository. + """ + test_dir = self.tool_cfg.get("test", {}).get(repo_name, {}).get("test_dir") + + self.info(f"Listing tests for repository `{repo_name}` in `{test_dir}`:") + + if not test_dir or not os.path.exists(test_dir): + self.err(f"Test directory '{test_dir}' does not exist for {repo_name}.") + return + + try: + found_any = False + for root, dirs, files in os.walk(test_dir): + # Ignore __pycache__ dirs + dirs[:] = [d for d in dirs if not d.startswith("__")] + dirs.sort() + + rel_path = os.path.relpath(root, test_dir) + display_path = "." if rel_path == "." else rel_path + + test_files = [ + f for f in files if f.endswith(".py") and not f.startswith("__") + ] + quiet = self.ctx.obj.get("quiet", False) + + if not quiet or test_files: + self.ok(f"\n📂 {display_path}") + + if test_files: + found_any = True + for test_file in sorted(test_files): + typer.echo(f" - {test_file}") + else: + if not quiet: + typer.echo(" (no test files)") + + if not found_any: + self.warn( + f"No Python test files found in {test_dir} (including subdirectories) for {repo_name}." + ) + except Exception as e: + self.err(f"❌ Failed to list tests for {repo_name}: {e}") + def _run_tests(self, repo_name: str) -> None: - self.test_settings = ( - self.config.get("tool", {}) - .get("django-mongodb-cli", {}) - .get("test", {}) - .get(repo_name, {}) - ) + self.test_settings = self.test_cfg(repo_name) if not self.test_settings: - typer.echo( - typer.style( - f"No test settings found for {repo_name}.", fg=typer.colors.YELLOW - ) - ) + self.warn(f"No test settings found for {repo_name}.") return + test_dir = self.test_settings.get("test_dir") - if not os.path.exists(test_dir): - typer.echo( - typer.style( - f"Test directory '{test_dir}' does not exist for {repo_name}.", - fg=typer.colors.RED, - ) - ) + if not test_dir or not os.path.exists(test_dir): + self.err(f"Test directory '{test_dir}' does not exist for {repo_name}.") return + + # Prepare environment/files self.copy_apps(repo_name) self.copy_migrations(repo_name) self.copy_settings(repo_name) test_command_name = self.test_settings.get("test_command") test_command = [test_command_name] if test_command_name else ["pytest"] - test_options = self.test_settings.get("test_options") + + test_options = self.test_settings.get("test_options") or [] test_settings_module = ( self.test_settings.get("settings", {}).get("module", {}).get("test") ) - if test_command_name == "./runtests.py": + if test_command_name == "./runtests.py" and test_settings_module: test_command.extend(["--settings", test_settings_module]) + + if test_command_name == "pytest": + test_command.extend(["-v"]) + if test_options: test_command.extend(test_options) if self.keep_db: - test_command.extend("--keepdb") + test_command.extend(["--keepdb"]) if self.keyword: test_command.extend(["-k", self.keyword]) if self.modules: test_command.extend(self.modules) - typer.echo( - typer.style( - f"Running tests with command: {' '.join(test_command)}", - fg=typer.colors.CYAN, - ) - ) - subprocess.run( - test_command, - cwd=test_dir, - ) + + self.info(f"Running tests in {test_dir} with command: {' '.join(test_command)}") + subprocess.run(test_command, cwd=test_dir) def run_tests(self, repo_name: str) -> None: """ Run tests for the specified repository. """ - if self.list_tests: - typer.echo( - typer.style( - f"Listing tests for repository: {repo_name}", fg=typer.colors.CYAN - ) - ) self._list_tests(repo_name) return - typer.echo( - typer.style( - f"Running tests for repository: {repo_name}", fg=typer.colors.CYAN - ) - ) - - path = self.get_repo_path(repo_name) - if not os.path.exists(path): - typer.echo( - typer.style( - f"Repository '{repo_name}' not found at path: {path}", - fg=typer.colors.RED, - ) - ) + self.info(f"Running tests for repository: {repo_name}") + path, _ = self.ensure_repo(repo_name) + if not path: return self._run_tests(repo_name) - typer.echo( - typer.style( - f"✅ Tests completed successfully for {repo_name}.", - fg=typer.colors.GREEN, - ) - ) def set_modules(self, modules: list) -> None: self.modules = modules diff --git a/justfile b/justfile index 5ad8cef..fa9b806 100644 --- a/justfile +++ b/justfile @@ -9,12 +9,8 @@ alias i := install [group('git')] git-clone: dm repo clone django --install - dm repo clone django-mongodb-app dm repo clone django-mongodb-backend --install - dm repo origin django-mongodb-backend aclark4life - dm repo sync django-mongodb-backend - dm repo clone django-mongodb-extensions --install - dm repo clone django-mongodb-project + dm repo clone libmongocrypt --install dm repo clone mongo-python-driver --install # ---------------------------------------- django ---------------------------------------- diff --git a/pyproject.toml b/pyproject.toml index 0da43d2..f10eb45 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -32,6 +32,7 @@ dependencies = [ "pymongocrypt", "pymongo-auth-aws", "pytest", + "pytest-asyncio", "pytest-html", "python-webpack-boilerplate", "rich", @@ -84,8 +85,11 @@ repos = [ ] path = "src" +[tool.django-mongodb-cli.install.libmongocrypt] +install_dir = "bindings/python" + [tool.django-mongodb-cli.test.mongo-python-driver] -test_command = "just" +test_command = "pytest" test_dir = "src/mongo-python-driver/test" clone_dir = "src/mongo-python-driver" test_dirs = ["src/mongo-python-driver/test"] diff --git a/test/settings/django.py b/test/settings/django.py index 042a234..6f76df4 100644 --- a/test/settings/django.py +++ b/test/settings/django.py @@ -1,8 +1,10 @@ import os from bson.binary import Binary -from django_mongodb_backend import encryption, parse_uri -from pymongo.encryption import AutoEncryptionOpts + +# from django_mongodb_backend import encryption, parse_uri +from django_mongodb_backend import parse_uri +# from pymongo.encryption import AutoEncryptionOpts EXPECTED_ENCRYPTED_FIELDS_MAP = { "billing": { @@ -103,27 +105,29 @@ ] }, } -DATABASE_ROUTERS = [encryption.EncryptedRouter()] +# DATABASE_ROUTERS = [encryption.EncryptedRouter()] DATABASE_URL = os.environ.get("MONGODB_URI", "mongodb://localhost:27017") -KEY_VAULT_NAMESPACE = "encrypted.__keyvault" +# KEY_VAULT_NAMESPACE = "encrypted.__keyvault" DATABASES = { "default": parse_uri( DATABASE_URL, db_name="test", ), - "other": parse_uri( - DATABASE_URL, - options={ - "auto_encryption_opts": AutoEncryptionOpts( - key_vault_namespace=KEY_VAULT_NAMESPACE, - kms_providers=encryption.KMS_PROVIDERS, - # schema_map=EXPECTED_ENCRYPTED_FIELDS_MAP, - ) - }, - db_name="other", - ), + # "other": parse_uri( + # DATABASE_URL, + # options={ + # "auto_encryption_opts": AutoEncryptionOpts( + # key_vault_namespace=KEY_VAULT_NAMESPACE, + # kms_providers={ + # "local": {"key": encryption.KMS_CREDENTIALS["local"]["key"]} + # }, + # # schema_map=EXPECTED_ENCRYPTED_FIELDS_MAP, + # ) + # }, + # db_name="other", + # ), } -DATABASES["other"]["KMS_CREDENTIALS"] = encryption.KMS_CREDENTIALS +# DATABASES["other"]["KMS_CREDENTIALS"] = encryption.KMS_CREDENTIALS DEFAULT_AUTO_FIELD = "django_mongodb_backend.fields.ObjectIdAutoField" PASSWORD_HASHERS = ("django.contrib.auth.hashers.MD5PasswordHasher",)