Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/dippy/cli/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ class HandlerContext:
"""Context passed to handlers."""

tokens: list[str]
opaque_positions: frozenset[int] = frozenset()


@dataclass(frozen=True)
Expand Down
119 changes: 116 additions & 3 deletions src/dippy/cli/kubectl.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,98 @@
}


SECRET_RESOURCES = frozenset({"secret", "secrets"})

SAFE_OUTPUT_FORMATS = frozenset({"name", "wide"})

# Flags (after the verb) that consume the next token as a value
_POST_VERB_FLAGS_WITH_ARG = frozenset(
{
"-o",
"--output",
"-n",
"--namespace",
"-l",
"--selector",
"-f",
"--filename",
"--field-selector",
"--sort-by",
"--template",
"--context",
"--cluster",
}
)


def _is_secret_data_exposure(
tokens: list[str],
rest: list[str],
opaque_positions: frozenset[int] = frozenset(),
rest_offset: int = 0,
) -> bool:
"""Check if a get command targets secrets with a data-exposing output format.

Scans rest for the resource type and full tokens for -o (which can appear
before or after the verb). When opaque_positions is provided, conservatively
flags commands where opaque tokens could expand to secret resources or
data-exposing formats.
"""
# Find resource type: first non-flag token in rest
resource_type = None
resource_abs_pos = None
i = 0
while i < len(rest):
token = rest[i]
if token in _POST_VERB_FLAGS_WITH_ARG:
i += 2
continue
if token.startswith("-"):
i += 1
continue
resource_type = token
resource_abs_pos = rest_offset + i
break

if resource_type is None:
return False

# If resource position is opaque, it could expand to "secret" (or anything)
if resource_abs_pos in opaque_positions:
return True

# Handle comma-separated resources (e.g., "secret,configmap") and
# type/name syntax (e.g., "secret/my-secret")
parts = resource_type.split(",")
if not any(p.split("/")[0] in SECRET_RESOURCES for p in parts):
return False

# Resource IS secrets -- if any remaining token is opaque, it could inject
# a data-exposing format like -o yaml
if _has_opaque_after(opaque_positions, resource_abs_pos + 1):
return True

# Find output format from full token list (-o can appear before or after verb)
output_format = None
for j, token in enumerate(tokens):
if token in ("-o", "--output") and j + 1 < len(tokens):
output_format = tokens[j + 1]
break
if token.startswith("--output="):
output_format = token[len("--output=") :]
break
if len(token) > 2 and token[:2] == "-o" and token[2] != "-":
output_format = token[2:]
break

if output_format is None:
return False

# Extract format name before any = (e.g., "jsonpath='{.data}'" -> "jsonpath")
format_name = output_format.split("=")[0]
return format_name not in SAFE_OUTPUT_FORMATS


def _extract_exec_inner_command(tokens: list[str]) -> list[str] | None:
"""Extract command from kubectl exec args (after -- separator)."""
try:
Expand All @@ -108,9 +200,15 @@ def _extract_exec_inner_command(tokens: list[str]) -> list[str] | None:
return None # No -- separator


def _has_opaque_after(opaque_positions: frozenset[int], start: int) -> bool:
"""Check if any token position >= start is opaque."""
return any(p >= start for p in opaque_positions)


def classify(ctx: HandlerContext) -> Classification:
"""Classify kubectl command."""
tokens = ctx.tokens
opaque = ctx.opaque_positions
base = tokens[0] if tokens else "kubectl"
if len(tokens) < 2:
return Classification("ask", description=base)
Expand Down Expand Up @@ -147,23 +245,38 @@ def classify(ctx: HandlerContext) -> Classification:
return Classification("ask", description=base)

rest = tokens[action_idx + 1 :] if action_idx + 1 < len(tokens) else []
rest_offset = action_idx + 1
desc = f"{base} {action}"

# Check for subcommands first
# Check for subcommands first (config/auth/rollout)
if action in SAFE_SUBCOMMANDS and rest:
for token in rest:
for idx, token in enumerate(rest):
if not token.startswith("-"):
abs_pos = rest_offset + idx
if abs_pos in opaque:
return Classification("ask", description=desc)
if token in SAFE_SUBCOMMANDS[action]:
# config view --raw exposes unredacted kubeconfig credentials
if action == "config" and token == "view":
if "--raw" in rest or _has_opaque_after(opaque, abs_pos + 1):
return Classification("ask", description=f"{desc} {token}")
return Classification("allow", description=f"{desc} {token}")
break

if action in UNSAFE_SUBCOMMANDS and rest:
for token in rest:
for idx, token in enumerate(rest):
if not token.startswith("-"):
abs_pos = rest_offset + idx
if abs_pos in opaque:
return Classification("ask", description=desc)
if token in UNSAFE_SUBCOMMANDS[action]:
return Classification("ask", description=f"{desc} {token}")
break

# Sensitive data checks (before blanket safe-action approval)
if action == "get" and _is_secret_data_exposure(tokens, rest, opaque, rest_offset):
return Classification("ask", description=f"{desc} (secret data)")

# Simple safe actions
if action in SAFE_ACTIONS:
return Classification("allow", description=desc)
Expand Down
42 changes: 37 additions & 5 deletions src/dippy/core/analyzer.py
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,18 @@ def _analyze_command(

# Get base command for injection check
words = [_get_word_value(w) for w in node.words]
# Pre-compute which word positions have runtime-determined values (opaque).
# The parser identifies these as cmdsub ($(...), `...`), param ($VAR, ${VAR}),
# or param-indirect (${!var}) parts. A word consisting entirely of one such
# expansion is opaque — handlers can't statically analyze its value.
_OPAQUE_PART_KINDS = {"cmdsub", "param", "param-indirect"}
opaque_positions = frozenset(
i
for i, word in enumerate(node.words)
if len(getattr(word, "parts", [])) == 1
and getattr(getattr(word, "parts", [None])[0], "kind", None)
in _OPAQUE_PART_KINDS
)
# Skip env var assignments to find base command
base_idx = 0
while (
Expand Down Expand Up @@ -287,7 +299,14 @@ def _analyze_command(
and position > base_idx
):
handler = get_handler(base)
outer_result = handler.classify(HandlerContext(words[base_idx:]))
adjusted_opaque = frozenset(
p - base_idx for p in opaque_positions if p >= base_idx
)
outer_result = handler.classify(
HandlerContext(
words[base_idx:], opaque_positions=adjusted_opaque
)
)
if outer_result.action != "allow":
inner_cmd = _get_word_value(word).strip("$()")
return Decision("ask", f"cmdsub injection risk: {inner_cmd}")
Expand Down Expand Up @@ -319,7 +338,9 @@ def _analyze_command(
decisions.append(Decision("allow", "conditional test"))
return _combine(decisions)

cmd_decision = _analyze_simple_command(words, config, cwd, remote=remote)
cmd_decision = _analyze_simple_command(
words, config, cwd, remote=remote, opaque_positions=opaque_positions
)
decisions.append(cmd_decision)

return _combine(decisions)
Expand Down Expand Up @@ -382,7 +403,12 @@ def _analyze_redirects(


def _analyze_simple_command(
words: list[str], config: Config, cwd: Path, *, remote: bool = False
words: list[str],
config: Config,
cwd: Path,
*,
remote: bool = False,
opaque_positions: frozenset[int] = frozenset(),
) -> Decision:
"""Analyze a simple command (list of words)."""
if not words:
Expand All @@ -398,6 +424,7 @@ def _analyze_simple_command(

base = words[i]
tokens = words[i:]
adjusted_opaque = frozenset(p - i for p in opaque_positions if p >= i)

# 1. Check config rules first (highest priority)
from dippy.core.config import SimpleCommand, match_command
Expand Down Expand Up @@ -434,7 +461,10 @@ def _analyze_simple_command(
break

if j < len(tokens):
return _analyze_simple_command(tokens[j:], config, cwd, remote=remote)
wrapper_opaque = frozenset(p - j for p in adjusted_opaque if p >= j)
return _analyze_simple_command(
tokens[j:], config, cwd, remote=remote, opaque_positions=wrapper_opaque
)
return Decision("ask", base)

# 3. Simple safe commands
Expand All @@ -448,7 +478,9 @@ def _analyze_simple_command(
# 5. CLI-specific handlers
handler = get_handler(base)
if handler:
result = handler.classify(HandlerContext(tokens))
result = handler.classify(
HandlerContext(tokens, opaque_positions=adjusted_opaque)
)
desc = result.description or get_description(tokens, base)
# Check handler-provided redirect targets against config (skip in remote mode)
if result.redirect_targets and not remote:
Expand Down
54 changes: 54 additions & 0 deletions tests/cli/test_kubectl.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
("kubectl get all -A", True),
("kubectl get configmaps", True),
("kubectl get secrets", True),
("kubectl get secret my-secret", True), # default table view, no values
("kubectl get ingress", True),
("kubectl get pv", True),
("kubectl get pvc", True),
Expand Down Expand Up @@ -205,6 +206,59 @@
("kubectl expose deployment nginx --port=80 --target-port=8080", False),
("kubectl expose pod nginx --port=80 --type=NodePort", False),
#
# kubectl get secret - sensitive data exposure
# Output formats that could expose secret values require confirmation
#
("kubectl get secret my-secret -o yaml", False),
("kubectl get secret my-secret -o json", False),
("kubectl get secrets -o yaml", False),
("kubectl get secrets -o json", False),
("kubectl get secret my-secret -o jsonpath='{.data.password}'", False),
("kubectl get secret my-secret -o go-template='{{.data}}'", False),
(
"kubectl get secret my-secret -o custom-columns=NAME:.metadata.name,DATA:.data",
False,
),
("kubectl get secret my-secret --output=yaml", False),
("kubectl get secret my-secret --output json", False),
("kubectl get secret/my-secret -o yaml", False), # type/name syntax
("kubectl get secret,configmap -o yaml", False), # comma-separated includes secret
("kubectl -n kube-system get secret my-secret -o yaml", False), # flags before verb
("kubectl get secret my-secret -o name", True), # -o name is safe (no values)
("kubectl get secret my-secret -o wide", True), # -o wide is safe (no values)
("kubectl describe secret my-secret", True), # describe never shows values
("kubectl get pods -o yaml", True), # non-secret resource is fine
("kubectl get configmap my-config -o yaml", True), # non-secret resource is fine
#
# kubectl get secret - opaque tokens (cmdsubs, param expansions) in arguments
#
(
'kubectl get secret somesecret $(echo "-o yaml")',
False,
), # cmdsub could inject format
("kubectl get secret somesecret `echo '-o yaml'`", False), # backtick cmdsub
("kubectl get $(echo secret) -o yaml", False), # cmdsub resource, could be secrets
("kubectl get $RESOURCE -o yaml", False), # param expansion resource
("kubectl get secret somesecret -o $FORMAT", False), # param expansion format
("kubectl get secret somesecret -o ${FORMAT}", False), # braced param expansion
("kubectl get pods $(echo '-o yaml')", True), # pods aren't secrets, always safe
("kubectl get pods -o $FORMAT", True), # pods aren't secrets, always safe
#
# kubectl config view --raw - exposes unredacted kubeconfig credentials
#
("kubectl config view --raw", False),
("kubectl config view --raw --minify", False),
('kubectl config view $(echo "--raw")', False), # cmdsub could be --raw
("kubectl config view $RAW_FLAG", False), # param expansion could be --raw
#
# kubectl config/auth/rollout - opaque subcommands
#
('kubectl config $(echo "set-context") production', False), # opaque subcommand
('kubectl describe $(echo "pod nginx")', True), # describe is always safe
("kubectl describe $RESOURCE", True), # describe is always safe
('kubectl logs $(echo "nginx")', True), # logs is always safe
("kubectl logs $POD", True), # logs is always safe
#
# kubectl exec - delegation to inner command
#
("kubectl exec pod -- cat /etc/passwd", True), # cat is safe
Expand Down
Loading