Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 45 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ poetry run ruff format src/ tests/

# Type checking
poetry run mypy src/

Note: The file `py.typed` has added to the package and specified in pyproject.toml to ensure mypy treats
tkseal as a typed package and avoids "Skipping analyzing 'tkseal': found module but no type hints or library stubs" warnings.
```

## Available Commands
Expand All @@ -63,7 +66,48 @@ poetry run mypy src/
- 💻 `tkseal seal PATH` - Convert plain_secrets.json to sealed_secrets.json
-

## Previous logic documentation
## Logic documentation

### Forbidden Secrets Warning

**Core Functionality**

This application allows users to pull different kinds of Kubernetes secrets into their local Tanka environment.
However, certain types of secrets are considered forbidden for pulling due to their sensitive nature or
system management roles.

The pull command includes the `forbidden secrets warning` feature that prevents accidental exposure of sensitive
system secrets while keeping users informed about what's being filtered out when pulling secrets from a Kubernetes
namespace into a local Tanka environment using the `tkseal pull` command.

**Implementation Details**

The detection method involves checking the types of secrets present in the specified Kubernetes namespace against
a predefined list of forbidden secret types. These forbidden types typically include:

- `kubernetes.io/service-account-token`
- `helm.sh/release.v1`
- Any other secret types deemed sensitive or system-managed
- The forbidden and allowed secret types are defined in `/src/tkseal/configuration.py`

The implementation uses the following logic:

1. Fetch all secrets from the specified Kubernetes namespace using `kubectl`.
2. Iterate through each secret and check its type.
3. If a secret's type matches any in the forbidden list, it is flagged.
4. Collect all flagged secrets and prepare a warning message.

**Usage Example**
When a user runs `tkseal pull`, if there are forbidden secrets (like service-account-tokens, helm releases, etc.)
in the namespace:

`tkseal pull environments/testing/`

This shows how "plain_secrets.json" would change based on what's in the Kubernetes cluster

`⚠ Warning: Found forbidden secrets in namespace that cannot be pulled:
- default-token-abc (type: kubernetes.io/service-account-token)
- helm-release-v1 (type: helm.sh/release.v1)`

### ready Command

Expand Down Expand Up @@ -196,5 +240,3 @@ The command will:
2. Display diff of what would change
3. Ask for confirmation
4. Seal secrets to sealed_secrets.json


186 changes: 93 additions & 93 deletions poetry.lock

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ description = "A command-line utility for managing sealed secrets in Kubernetes
readme = "README.md"
authors = ["K'Ron Spar <[email protected]>", "Lianet Sepulveda Torres <[email protected]>"]
packages = [{include = "tkseal", from = "src"}]
include = ["src/tkseal/py.typed"]
keywords = ["kubernetes", "secrets", "sealed-secrets", "tanka", "k8s"]
classifiers = [
"Development Status :: 4 - Beta",
Expand Down Expand Up @@ -73,6 +74,8 @@ ignore = [
line-ending = "auto"

[tool.mypy]
mypy_path = "src"
files = "src/tkseal"
python_version = "3.12"
warn_return_any = true
warn_unused_configs = true
Expand Down
51 changes: 30 additions & 21 deletions src/tkseal/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,16 +89,25 @@ def pull(path: str) -> None:
# Create SecretState from path
secret_state = SecretState.from_path(path)

# Create Pull instance and show differences
pull_obj = Pull(secret_state)
result = pull_obj.run()

# Check and warn about forbidden secrets
forbidden_secrets = secret_state.get_forbidden_secrets()
if forbidden_secrets:
click.secho(
"\nThese secrets are system-managed and will not be included in plain_secrets.json:",
fg="yellow",
)
for secret in forbidden_secrets:
click.secho(f" - {secret.name} (type: {secret.type})", fg="yellow")

# Show informational message
click.secho(
'This shows how "plain_secrets.json" would change based on what\'s in the Kubernetes cluster',
fg="yellow",
)

# Create Pull instance and show differences
pull_obj = Pull(secret_state)
result = pull_obj.run()

# Display diff results
if result.has_differences:
click.echo(result.diff_output)
Expand Down Expand Up @@ -130,26 +139,26 @@ def seal(path: str) -> None:
secret_state = SecretState.from_path(path)

# Show informational message
click.secho(
'This shows what would change in the cluster based on "plain_secrets.json"',
fg="yellow",
)
# click.secho(
# 'This shows what would change in the cluster based on "plain_secrets.json"',
# fg="yellow",
# )

# Show diff to preview changes
diff_obj = Diff(secret_state)
result = diff_obj.plain()
# diff_obj = Diff(secret_state)
# result = diff_obj.plain()

# Display diff results
if result.has_differences:
click.echo(result.diff_output)

# Confirm before sealing
if click.confirm("Are you sure?"):
seal_obj = Seal(secret_state)
seal_obj.run()
click.echo("Successfully sealed secrets to sealed_secrets.json")
else:
click.echo("No differences")
# if result.has_differences:
# click.echo(result.diff_output)

# Confirm before sealing
if click.confirm("Are you sure?"):
seal_obj = Seal(secret_state)
seal_obj.run()
click.echo("Successfully sealed secrets to sealed_secrets.json")
# else:
# click.echo("No differences")

except TKSealError as e:
click.echo(f"Error: {e}", err=True)
Expand Down
21 changes: 21 additions & 0 deletions src/tkseal/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,24 @@

# File name for sealed (encrypted) secrets JSON file
SEALED_SECRETS_FILE = "sealed_secrets.json"

# Allowed secret types that tkseal can manage
MANAGED_SECRET_TYPES = {
"Opaque", # Standard application secrets
"kubernetes.io/basic-auth", # HTTP basic auth
"kubernetes.io/ssh-auth", # SSH keys
}

# Allowed secret types that tkseal can manage but with extra caution - showing warnings in the CLI
MANAGED_SECRET_CAREFULLY_TYPES = {
"kubernetes.io/dockerconfigjson", # Docker registry credentials. Users manage all these secrets manually,
# so is safe to handle by tkseal.
}

# Never allow these (system-managed, high risk)
FORBIDDEN_SECRET_TYPES = {
"kubernetes.io/service-account-token", # Cluster API tokens
"bootstrap.kubernetes.io/token", # Bootstrap tokens
"helm.sh/release.v1", # Helm release data
"kubernetes.io/tls", # TLS certificates
}
3 changes: 2 additions & 1 deletion src/tkseal/kubectl.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import shutil

import yaml

from tkseal.exceptions import TKSealError
Expand Down Expand Up @@ -48,4 +49,4 @@ def get_secrets(context: str, namespace: str) -> dict:
try:
return yaml.safe_load(output)
except yaml.YAMLError as e:
raise TKSealError(f"Failed to parse secrets YAML: {str(e)}") from e
raise TKSealError(f"Failed to parse secrets YAML: {str(e)}") from e
18 changes: 9 additions & 9 deletions src/tkseal/kubeseal.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,17 @@ def seal(context: str, namespace: str, name: str, value: str) -> str:
"""
# Construct kubeseal command
cmd = [
"kubeseal",
"--raw",
"--namespace",
namespace,
"--name",
name,
"--context",
context,
"kubeseal",
"--raw",
"--namespace",
namespace,
"--name",
name,
"--context",
context,
]

# Execute kubeseal command with value piped via stdin
result = run_command(cmd, value=value)

return result
return result
Empty file added src/tkseal/py.typed
Empty file.
18 changes: 16 additions & 2 deletions src/tkseal/seal.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

import json

from tkseal import TKSealError
from tkseal.configuration import PLAIN_SECRETS_FILE
from tkseal.kubeseal import KubeSeal
from tkseal.secret_state import SecretState

Expand Down Expand Up @@ -56,7 +58,17 @@ def run(self) -> None:
"""
# Read and parse plain secrets
plain_secrets_text = self.secret_state.plain_secrets()
plain_secrets = json.loads(plain_secrets_text)

# Check if plain_secrets_text is empty or exists
if not plain_secrets_text or plain_secrets_text.strip() == "":
raise TKSealError(
f"No plain secrets found. Please create {PLAIN_SECRETS_FILE} "
f"or run 'tkseal pull' first."
)
try:
plain_secrets = json.loads(plain_secrets_text)
except json.decoder.JSONDecodeError as e:
raise TKSealError(f"Invalid JSON in {PLAIN_SECRETS_FILE}: {str(e)}") from e

# Process each secret
sealed_secrets = []
Expand All @@ -79,7 +91,9 @@ def run(self) -> None:
"metadata": {
"name": secret["name"],
"namespace": self.secret_state.namespace,
}
},
# Preserve secret type if specified in plain_secrets.json
**({"type": secret["type"]} if "type" in secret else {}),
},
"encryptedData": encrypted_data,
},
Expand Down
64 changes: 63 additions & 1 deletion src/tkseal/secret.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,11 @@
from typing import Any, cast

from tkseal import TKSealError
from tkseal.configuration import (
FORBIDDEN_SECRET_TYPES,
MANAGED_SECRET_CAREFULLY_TYPES,
MANAGED_SECRET_TYPES,
)
from tkseal.kubectl import KubeCtl
from tkseal.tk import TKEnvironment

Expand Down Expand Up @@ -36,14 +41,44 @@ def data(self) -> list[SecretDataPair]:
)
return result

@property
def type(self) -> str:
return cast(str, self._raw.get("type", ""))


class ForbiddenSecret(Secret):
"""Represents a secret that are not allowed to pull or process due to security policies."""

def __init__(self, raw: dict[str, Any]):
super().__init__(raw)

@property
def data(self) -> list[SecretDataPair]:
"""Forbidden secrets do not expose data, so accessing this raises an error."""
raise TKSealError(
f'Forbidden secret "{self.name}" data cannot be accessed or sealed'
)


class Secrets:
# Security check: prevent processing of forbidden types
# Store forbidden secrets for reporting - [{"secret1":"kubernetes.io/service-account-token"}]
forbidden_secrets: list[ForbiddenSecret]

def __init__(self, raw_secrets: dict[str, Any]):
"""Initialize Secrets from YAML-parsed kubectl output format.
Args:
raw_secrets: kubectl output dict with 'items' key
Raises:
TKSealError: If raw_secrets does not have the items key

Secret type - Filtering Strategy (to be implemented):

1. Default to Opaque only (safest, matches Ruby)
2. Add explicit allow-list for basic-auth and ssh-auth
3. Add explicit deny-list for service-account-token and bootstrap tokens
4. Raise error if user tries to include forbidden types (fail-safe)
5. Log warning when filtering secrets (visibility)
"""

# Handle kubectl format with the "items" key
Expand All @@ -53,7 +88,16 @@ def __init__(self, raw_secrets: dict[str, Any]):
f"Got keys: {list(raw_secrets.keys())}"
)

self.items = [Secret(raw) for raw in raw_secrets["items"]]
self.allowed_types = MANAGED_SECRET_TYPES.union(MANAGED_SECRET_CAREFULLY_TYPES)

self.forbidden_secrets = Secrets.get_forbidden_secrets(raw_secrets)

# TODO: If the secret does not have type, assume "Opaque" (Kubernetes default)
self.items = [
Secret(raw)
for raw in raw_secrets["items"]
if raw.get("type", "Opaque") in self.allowed_types
]

@classmethod
def for_tk_env(cls, path: str) -> "Secrets":
Expand All @@ -72,6 +116,23 @@ def for_tk_env(cls, path: str) -> "Secrets":
raw_secrets = KubeCtl.get_secrets(context=env.context, namespace=env.namespace)
return cls(raw_secrets)

@staticmethod
def get_forbidden_secrets(
raw_secrets: dict[str, Any],
) -> list[ForbiddenSecret]:
"""Create a list of ForbiddenSecret objects.

Args:
raw_secrets: kubectl output dict with 'items' key
Returns:
List of secrets with forbidden types
"""
filtered_items: list[ForbiddenSecret] = []
for raw in raw_secrets.get("items", []):
if raw.get("type") in FORBIDDEN_SECRET_TYPES:
filtered_items.append(ForbiddenSecret(raw))
return filtered_items

def to_json(self) -> str:
"""Convert secrets to JSON format with decoded plain values.

Expand All @@ -91,6 +152,7 @@ def to_json(self) -> str:
secret_dict = {
"name": secret.name,
"data": {pair.key: pair.plain_value for pair in secret.data},
"type": secret.type,
}
output.append(secret_dict)
return json.dumps(output, indent=2)
Loading