Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -235,5 +235,5 @@ print(res.exit_code)
The following features are not supported at the moment:

* Special subresources `attach`, `portforward` and `proxy`.
* `auth-provider` authentication method is not supported. The supported authentication methods are `token`, `username` + `password` and `exec`.
* `auth-provider` authentication method is not supported. The supported authentication methods are `token`, `tokenFile`, `username` + `password` and `exec`.

5 changes: 5 additions & 0 deletions docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,11 @@ config = KubeConfig.from_service_account()
client = Client(config=config)
```

The in-cluster configuration automatically supports **token refresh**. Kubernetes rotates projected service account
tokens before they expire, and lightkube will re-read the token from the mounted file whenever the API server returns a
`401 Unauthorized` response. This ensures long-running controllers and operators continue to work without restarts, even
with short-lived tokens.

## Auto-detect configuration from the environment

By default lightkube will do his best to detect the configuration looking
Expand Down
2 changes: 1 addition & 1 deletion docs/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -435,5 +435,5 @@ Execute a command inside a pod
The following features are not supported at the moment:

* Special subresources `attach`, `portforward` and `proxy`.
* `auth-provider` authentication method is not supported. The supported authentication methods are `token`, `username` + `password` and `exec`.
* `auth-provider` authentication method is not supported. The supported authentication methods are `token`, `tokenFile`, `username` + `password` and `exec`.

37 changes: 37 additions & 0 deletions src/lightkube/config/client_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import ssl
import subprocess
from dataclasses import asdict, dataclass, field
from pathlib import Path
from typing import AsyncGenerator, Callable, Dict, Generator, List, Mapping, Optional, Sequence, Tuple, overload

import httpx
Expand Down Expand Up @@ -48,6 +49,39 @@ def auth_flow(self, request: httpx.Request) -> Generator[httpx.Request, httpx.Re
yield request


class BearerTokenFileAuth(httpx.Auth):
"""Auth that reads a bearer token from a file, re-reading on 401.

Used for in-cluster service account tokens which are periodically
rotated by the kubelet. Similar to Go client-go's tokenFileAuth.
"""

def __init__(self, token_file: str) -> None:
self._token_file = token_file
self._last_bearer: Optional[str] = None

def _read_token(self) -> str:
try:
token = Path(self._token_file).read_text().strip()
except FileNotFoundError as e:
raise ConfigError(f"Token file {self._token_file} not found") from e
return f"Bearer {token}"

def auth_flow(self, request: httpx.Request) -> Generator[httpx.Request, httpx.Response, None]:
if self._last_bearer is None:
self._last_bearer = self._read_token()

request.headers["Authorization"] = self._last_bearer
response = yield request
if response.status_code != 401:
return

# Token may have been rotated, re-read from file
self._last_bearer = self._read_token()
request.headers["Authorization"] = self._last_bearer
yield request


async def async_check_output(command: Sequence[str], env: Mapping[str, str]) -> bytes:
PIPE = asyncio.subprocess.PIPE
proc = await asyncio.create_subprocess_exec(*command, env=env, stdin=None, stdout=PIPE, stderr=PIPE)
Expand Down Expand Up @@ -125,6 +159,9 @@ def user_auth(user: Optional[User]) -> Optional[httpx.Auth]:
if user is None:
return None

if user.token_file is not None:
return BearerTokenFileAuth(user.token_file)

if user.token is not None:
return BearerAuth(user.token)

Expand Down
6 changes: 4 additions & 2 deletions src/lightkube/config/kubeconfig.py
Original file line number Diff line number Diff line change
Expand Up @@ -215,8 +215,10 @@ def from_service_account(cls, service_account: StrOrPath = SERVICE_ACCOUNT) -> "
"""
account_dir = Path(service_account)

token_file = str(account_dir.joinpath("token"))

try:
token = account_dir.joinpath("token").read_text()
token = Path(token_file).read_text()
namespace = account_dir.joinpath("namespace").read_text()
except FileNotFoundError as e:
raise exceptions.ConfigError(str(e)) from e
Expand All @@ -230,7 +232,7 @@ def from_service_account(cls, service_account: StrOrPath = SERVICE_ACCOUNT) -> "
server=f"https://{host}:{port}",
certificate_auth=str(account_dir.joinpath("ca.crt")),
),
user=User(token=token),
user=User(token=token, token_file=token_file),
namespace=namespace,
)

Expand Down
1 change: 1 addition & 0 deletions src/lightkube/config/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ class User(DataclassDictMixIn):
username: Optional[str] = None
password: Optional[str] = None
token: Optional[str] = None
token_file: Optional[str] = field(metadata={"json": "tokenFile"}, default=None)
auth_provider: Optional[Dict] = field(metadata={"json": "auth-provider"}, default=None)
client_cert: Optional[str] = field(metadata={"json": "client-certificate"}, default=None)
client_cert_data: Optional[str] = field(metadata={"json": "client-certificate-data"}, default=None)
Expand Down
141 changes: 141 additions & 0 deletions tests/test_client_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,147 @@ def test_user_auth_bearer():
assert m.headers["Authorization"] == "Bearer abcd"


def test_user_auth_bearer_token_file(tmp_path):
"""BearerTokenFileAuth is used when token_file is set"""
token_file = tmp_path / "token"
token_file.write_text("my-sa-token")
auth = client_adapter.user_auth(models.User(token_file=str(token_file)))
assert isinstance(auth, client_adapter.BearerTokenFileAuth)
m = Mock(headers={})
next(auth.auth_flow(m))
assert m.headers["Authorization"] == "Bearer my-sa-token"


def test_user_auth_bearer_token_file_priority(tmp_path):
"""token_file takes priority over static token"""
token_file = tmp_path / "token"
token_file.write_text("file-token")
auth = client_adapter.user_auth(models.User(token="static-token", token_file=str(token_file)))
assert isinstance(auth, client_adapter.BearerTokenFileAuth)
m = Mock(headers={})
next(auth.auth_flow(m))
assert m.headers["Authorization"] == "Bearer file-token"


def test_bearer_token_file_auth_refresh(tmp_path):
"""Token is re-read from file on 401"""
token_file = tmp_path / "token"
token_file.write_text("old-token")

auth = client_adapter.BearerTokenFileAuth(str(token_file))
m = Mock(headers={})

# First request uses the initial token
flow = auth.auth_flow(m)
next(flow)
assert m.headers["Authorization"] == "Bearer old-token"

# Simulate token rotation
token_file.write_text("new-token")

# Server returns 401
m.headers["Authorization"] = None
flow.send(httpx.Response(status_code=401, request=m))
assert m.headers["Authorization"] == "Bearer new-token"


def test_bearer_token_file_auth_no_refresh_on_success(tmp_path):
"""Token is NOT re-read on non-401 responses"""
token_file = tmp_path / "token"
token_file.write_text("my-token")

auth = client_adapter.BearerTokenFileAuth(str(token_file))
m = Mock(headers={})

flow = auth.auth_flow(m)
next(flow)
assert m.headers["Authorization"] == "Bearer my-token"

# Token changes on disk, but server returned 200
token_file.write_text("new-token")
with pytest.raises(StopIteration):
flow.send(httpx.Response(status_code=200, request=m))

# Cached token is still used on next request
m2 = Mock(headers={})
flow2 = auth.auth_flow(m2)
next(flow2)
assert m2.headers["Authorization"] == "Bearer my-token"


def test_bearer_token_file_auth_caching(tmp_path):
"""Token is cached between requests"""
token_file = tmp_path / "token"
token_file.write_text("cached-token")

auth = client_adapter.BearerTokenFileAuth(str(token_file))

# First request reads from file
m1 = Mock(headers={})
flow1 = auth.auth_flow(m1)
next(flow1)
assert m1.headers["Authorization"] == "Bearer cached-token"
with pytest.raises(StopIteration):
flow1.send(httpx.Response(status_code=200, request=m1))

# Token changes but cache is used
token_file.write_text("different-token")

m2 = Mock(headers={})
flow2 = auth.auth_flow(m2)
next(flow2)
assert m2.headers["Authorization"] == "Bearer cached-token"


def test_bearer_token_file_auth_file_not_found(tmp_path):
"""ConfigError raised if token file doesn't exist"""
auth = client_adapter.BearerTokenFileAuth(str(tmp_path / "nonexistent"))
with pytest.raises(ConfigError, match="not found"):
next(auth.auth_flow(Mock(headers={})))


@pytest.mark.asyncio
async def test_bearer_token_file_auth_async_refresh(tmp_path):
"""Token refresh works through the inherited async_auth_flow delegation"""
token_file = tmp_path / "token"
token_file.write_text("old-token")

auth = client_adapter.BearerTokenFileAuth(str(token_file))
m = Mock(headers={})

flow = auth.async_auth_flow(m)
await flow.__anext__()
assert m.headers["Authorization"] == "Bearer old-token"

# Simulate token rotation
token_file.write_text("new-token")

# Server returns 401
m.headers["Authorization"] = None
await flow.asend(httpx.Response(status_code=401, request=m))
assert m.headers["Authorization"] == "Bearer new-token"

with pytest.raises(StopAsyncIteration):
await flow.__anext__()


@pytest.mark.asyncio
async def test_bearer_token_file_auth_async_no_refresh_on_success(tmp_path):
"""No refresh on success through inherited async_auth_flow delegation"""
token_file = tmp_path / "token"
token_file.write_text("my-token")

auth = client_adapter.BearerTokenFileAuth(str(token_file))
m = Mock(headers={})

flow = auth.async_auth_flow(m)
await flow.__anext__()
assert m.headers["Authorization"] == "Bearer my-token"

with pytest.raises(StopAsyncIteration):
await flow.asend(httpx.Response(status_code=200, request=m))


def test_user_auth_provider():
"""Auth provider not supported"""
with pytest.raises(ConfigError):
Expand Down
1 change: 1 addition & 0 deletions tests/test_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ def test_from_service_account(service_account):
c = cfg.get()
assert c.namespace == "my-namespace"
assert c.user.token == "ABCD"
assert c.user.token_file == str(service_account.joinpath("token"))
assert c.cluster.server == "https://k8s.local:9443"


Expand Down
Loading