-
Notifications
You must be signed in to change notification settings - Fork 42
/
Copy path_repl.py
170 lines (136 loc) · 5.3 KB
/
_repl.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
from __future__ import annotations
import sys
from typing import Any, MutableMapping, cast
import click
from prompt_toolkit.history import InMemoryHistory
from ._completer import ClickCompleter
from .core import ReplContext
from .exceptions import ClickExit # type: ignore[attr-defined]
from .exceptions import CommandLineParserError, ExitReplException, InvalidGroupFormat
from .globals_ import ISATTY, get_current_repl_ctx
from .utils import _execute_internal_and_sys_cmds
# Public names of this module (what ``from <module> import *`` exports).
__all__ = ["bootstrap_prompt", "register_repl", "repl"]
def bootstrap_prompt(
    group: click.MultiCommand,
    prompt_kwargs: dict[str, Any],
    ctx: click.Context,
) -> dict[str, Any]:
    """
    Build the prompt_toolkit kwargs, letting user supplied values win.

    Defaults are provided for the ``history``, ``completer`` and ``message``
    keys; every key present in *prompt_kwargs* overrides (or extends) them.

    :param group: click.MultiCommand object given to the default completer.
    :param prompt_kwargs: The user specified prompt kwargs.
    :param ctx: click.Context given to the default completer as ``ctx``.
    :return: A new dict with the defaults overlaid by *prompt_kwargs*.
    """
    # The default objects are always created, even when the caller overrides
    # them, to keep construction order and side effects unchanged.
    history = InMemoryHistory()
    completer = ClickCompleter(group, ctx=ctx)

    return {
        "history": history,
        "completer": completer,
        "message": "> ",
        **prompt_kwargs,
    }
def repl(
    old_ctx: click.Context,
    prompt_kwargs: dict[str, Any] | None = None,
    allow_system_commands: bool = True,
    allow_internal_commands: bool = True,
) -> None:
    """
    Start an interactive shell. All subcommands are available in it.

    If stdin is not a TTY, no prompt will be printed, but only commands read
    from stdin. In that mode blank lines are skipped and the loop ends when
    stdin reaches end-of-file.

    :param old_ctx: The current Click context.
    :param prompt_kwargs: Parameters passed to
        :py:func:`prompt_toolkit.PromptSession`. ``None`` (the default)
        means "no overrides".
    :param allow_system_commands: Forwarded unchanged to
        ``_execute_internal_and_sys_cmds`` for every input line.
    :param allow_internal_commands: Forwarded unchanged to
        ``_execute_internal_and_sys_cmds`` for every input line.
    :raises InvalidGroupFormat: If the group has a non-required
        ``click.Argument`` whose parsed value is ``None``.
    """
    # Avoid a shared mutable default; an omitted value means "no overrides".
    if prompt_kwargs is None:
        prompt_kwargs = {}

    group_ctx = old_ctx
    # Switching to the parent context that has a Group as its command
    # as a Group acts as a CLI for all of its subcommands
    if old_ctx.parent is not None and not isinstance(
        old_ctx.command, click.MultiCommand
    ):
        group_ctx = old_ctx.parent

    group = cast(click.MultiCommand, group_ctx.command)

    # An Optional click.Argument in the CLI Group, that has no value
    # will consume the first word from the REPL input, causing issues in
    # executing the command.
    # So, if there's an empty Optional Argument, refuse to start the REPL.
    for param in group.params:
        if (
            isinstance(param, click.Argument)
            and group_ctx.params[param.name] is None  # type: ignore[index]
            and not param.required
        ):
            raise InvalidGroupFormat(
                f"{type(group).__name__} '{group.name}' requires value for "
                f"an optional argument '{param.name}' in REPL mode"
            )

    # Delete the REPL command from those available, as we don't want to allow
    # nesting REPLs (note: pass `None` to `pop` as we don't want to error if
    # REPL command already not present for some reason).
    repl_command_name = old_ctx.command.name

    available_commands: MutableMapping[str, click.Command] = {}
    if isinstance(group, click.CommandCollection):
        # NOTE(review): this is a freshly built dict, so popping from it does
        # not modify the collection's sources -- confirm whether the REPL
        # command is actually hidden for a CommandCollection.
        available_commands = {
            cmd_name: source.get_command(group_ctx, cmd_name)  # type: ignore[misc]
            for source in group.sources
            for cmd_name in source.list_commands(group_ctx)
        }
    elif isinstance(group, click.Group):
        # Same mapping object as the group's own registry, so the pop below
        # really removes the command from the group until it is restored.
        available_commands = group.commands

    original_command = available_commands.pop(repl_command_name, None)  # type: ignore

    try:
        repl_ctx = ReplContext(
            group_ctx,
            bootstrap_prompt(group, prompt_kwargs, group_ctx),
            get_current_repl_ctx(silent=True),
        )

        if ISATTY:
            # If stdin is a TTY, prompt the user for input using PromptSession.
            def get_command() -> str:
                return repl_ctx.session.prompt()  # type: ignore

        else:
            # If stdin is not a TTY, read input from stdin directly.
            def get_command() -> str:
                raw = sys.stdin.readline()
                if not raw:
                    # readline() returns "" only at end-of-file; a blank line
                    # still carries its newline. Signal EOF explicitly so a
                    # blank line in piped input does not end the session.
                    raise EOFError

                inp = raw.strip()
                if inp:
                    repl_ctx._history.append(inp)
                return inp

        with repl_ctx:
            while True:
                try:
                    command = get_command()
                except KeyboardInterrupt:
                    continue
                except EOFError:
                    break

                if not command:
                    # Nothing to execute; end of input is reported through
                    # EOFError above, so an empty command is just skipped.
                    continue

                try:
                    args = _execute_internal_and_sys_cmds(
                        command, allow_internal_commands, allow_system_commands
                    )
                    if args is None:
                        # No arguments were produced for the group to run.
                        continue
                except CommandLineParserError:
                    continue
                except ExitReplException:
                    break

                try:
                    # The group command will dispatch based on args.
                    old_protected_args = group_ctx.protected_args
                    try:
                        group_ctx.protected_args = args
                        group.invoke(group_ctx)
                    finally:
                        group_ctx.protected_args = old_protected_args
                except click.ClickException as e:
                    e.show()
                except (ClickExit, SystemExit):
                    # A sub-command asking to exit must not terminate the REPL.
                    pass
                except ExitReplException:
                    break
    finally:
        # Always put the REPL command back, even when an unexpected exception
        # escapes the loop, so the group is not left permanently modified.
        if original_command is not None:
            available_commands[repl_command_name] = original_command  # type: ignore[index]
def register_repl(group: click.Group, name="repl") -> None:
    """
    Add :func:`repl()` to *group* as a sub-command called *name*.

    :param group: The click.Group that receives the new sub-command.
    :param name: Name under which the sub-command is registered.
    """
    # Build the command decorator first, then the context-passing wrapper,
    # and finally apply one to the other.
    as_subcommand = group.command(name=name)
    repl_with_ctx = click.pass_context(repl)
    as_subcommand(repl_with_ctx)