Skip to content

Commit

Permalink
Add subjectAccessReview test for resource attributes
Browse files Browse the repository at this point in the history
Signed-off-by: averevki <[email protected]>
  • Loading branch information
averevki committed Jan 29, 2025
1 parent d646efd commit 2de1f9c
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@
import pytest

from testsuite.httpx.auth import HeaderApiKeyAuth
from testsuite.kuadrant.policy.authorization import ValueFrom
from testsuite.kubernetes.cluster_role import ClusterRole, ClusterRoleBinding
from testsuite.kubernetes.cluster_role import ClusterRoleBinding


@pytest.fixture(scope="module")
Expand All @@ -17,8 +16,6 @@ def audience(hostname):
def authorization(authorization):
"""Add kubernetes token-review and subject-access-review identity"""
authorization.identity.add_kubernetes("token-review-host")
user = ValueFrom("auth.identity.user.username")
authorization.authorization.add_kubernetes("subject-access-review-host", user)
return authorization


Expand All @@ -38,37 +35,27 @@ def _create_cluster_role_binding(cluster_role, service_accounts):


@pytest.fixture(scope="module")
def cluster_role(request, cluster, blame, module_label):
"""Creates and returns a ClusterRole"""
rules = [{"nonResourceURLs": ["/get"], "verbs": ["get"]}]
cluster_role = ClusterRole.create_instance(cluster, blame("cr"), rules, labels={"app": module_label})
request.addfinalizer(cluster_role.delete)
cluster_role.commit()
return cluster_role


@pytest.fixture(scope="module")
def bound_service_account_token(cluster_role, create_service_account, create_cluster_role_binding, audience):
def service_account_token(create_service_account, create_cluster_role_binding, cluster_role, audience):
"""Create a ServiceAccount, bind it to a ClusterRole and return its token with a given audience"""
service_account = create_service_account("tkn-auth")
create_cluster_role_binding(cluster_role.model.metadata.name, [service_account.model.metadata.name])
service_account = create_service_account("auth")
create_cluster_role_binding(cluster_role.name(), [service_account.name()])
return service_account.get_auth_token(audience)


@pytest.fixture(scope="module")
def auth(bound_service_account_token):
def auth(service_account_token):
"""Create request auth with service account token as API key"""
return HeaderApiKeyAuth(bound_service_account_token, "Bearer")
return HeaderApiKeyAuth(service_account_token, "Bearer")


@pytest.fixture(scope="module")
def service_account_token(create_service_account, audience):
def service_account_token2(create_service_account, audience):
"""Create a non-authorized service account and request its bound token with the hostname as audience"""
service_account = create_service_account("tkn-non-auth")
service_account = create_service_account("no-auth")
return service_account.get_auth_token(audience)


@pytest.fixture(scope="module")
def auth2(service_account_token):
def auth2(service_account_token2):
"""Create request auth with service account token as API key"""
return HeaderApiKeyAuth(service_account_token, "Bearer")
return HeaderApiKeyAuth(service_account_token2, "Bearer")
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
"""Test kubernetes SubjectAccessReview non-resource attributes authorization by verifying only a
ServiceAccount bound to a ClusterRole is authorized to access a resource"""

import pytest

from testsuite.kuadrant.policy.authorization import ValueFrom
from testsuite.kubernetes.cluster_role import ClusterRole, Rule

pytestmark = [pytest.mark.authorino]


@pytest.fixture(scope="module")
def authorization(authorization):
"""Add kubernetes subject-access-review authorization with non-resource attributes (omit resource_attributes)"""
authorization.authorization.add_kubernetes(
"subject-access-review-username", ValueFrom("auth.identity.user.username")
)
return authorization


@pytest.fixture(scope="module")
def cluster_role(request, cluster, blame, module_label):
"""Creates and returns a ClusterRole"""
rules = [Rule(verbs=["get"], nonResourceURLs=["/get"])]
cluster_role = ClusterRole.create_instance(cluster, blame("cr"), rules, labels={"app": module_label})
request.addfinalizer(cluster_role.delete)
cluster_role.commit()
return cluster_role


def test_subject_access_review_non_resource_attributes(client, auth, auth2):
"""Test Kubernetes SubjectAccessReview functionality by setting up authentication and authorization for an endpoint
and querying it with authorized and non-authorized ServiceAccount."""
response = client.get("/get", auth=auth)
assert response.status_code == 200

response = client.get("/get", auth=auth2)
assert response.status_code == 403
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
"""Test kubernetes SubjectAccessReview with resource attributes"""

import pytest

from testsuite.kuadrant.policy import CelExpression
from testsuite.kuadrant.policy.authorization import ValueFrom, Value, ResourceAttributes
from testsuite.kubernetes.cluster_role import ClusterRole, Rule

pytestmark = [pytest.mark.authorino]


@pytest.fixture(scope="module")
def authorization(authorization):
"""Add kubernetes subject-access-review identity with resource attributes for authpolicy resource"""
authorization.authorization.add_kubernetes(
"subject-access-review-host",
ValueFrom("auth.identity.user.username"),
ResourceAttributes(
resource=Value("authpolicy"), group=Value("kuadrant.io"), verb=CelExpression("request.method.lowerAscii()")
),
)
return authorization


@pytest.fixture(scope="module")
def cluster_role(request, cluster, blame, module_label):
"""Creates ClusterRole with rules only for accessing authpolicy resource"""
rules = [Rule(verbs=["get"], resources=["authpolicy"], apiGroups=["kuadrant.io"])]
cluster_role = ClusterRole.create_instance(cluster, blame("cr"), rules, labels={"app": module_label})
request.addfinalizer(cluster_role.delete)
cluster_role.commit()
return cluster_role


def test_subject_access_review_resource_attributes(client, auth, auth2):
"""Test if the client is authorized to access the api based on the service account token resource attributes"""
response = client.get("/get", auth=auth)
assert response.status_code == 200

response = client.post("/post", auth=auth)
assert response.status_code == 403

response = client.get("/get", auth=auth2)
assert response.status_code == 403

This file was deleted.

0 comments on commit 2de1f9c

Please sign in to comment.