Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import fnmatch
from typing import Optional

from fastapi import HTTPException, Request
Expand Down Expand Up @@ -27,15 +28,31 @@ def __init__(self, *args, **kwargs):
self.oauth2_proxy_role_header = config(
"KEEP_OAUTH2_PROXY_ROLE_HEADER", default="x-forwarded-groups"
)
self.auto_create_user = config(
"KEEP_OAUTH2_PROXY_AUTO_CREATE_USER", default=True
)
self.role_mappings = {
config("KEEP_OAUTH2_PROXY_ADMIN_ROLE", default=""): "admin",
config("KEEP_OAUTH2_PROXY_NOC_ROLE", default=""): "noc",
config("KEEP_OAUTH2_PROXY_WEBHOOK_ROLE", default=""): "webhook",
}
self.logger.info("Oauth2proxy Auth Verifier initialized")
self.auto_create_user = config("KEEP_OAUTH2_PROXY_AUTO_CREATE_USER", default=True)

# Helper: parse comma-separated groups (supports wildcards)
def parse_role_env(var_name: str, mapped_role: str):
raw_value = config(var_name, default="")
if not raw_value:
return []
groups = [v.strip() for v in raw_value.split(",") if v.strip()]
return [(pattern, mapped_role) for pattern in groups]

# Collect mappings (pattern β†’ role)
self.role_patterns = []
self.role_patterns += parse_role_env("KEEP_OAUTH2_PROXY_ADMIN_ROLE", "admin")
self.role_patterns += parse_role_env("KEEP_OAUTH2_PROXY_NOC_ROLE", "noc")
self.role_patterns += parse_role_env("KEEP_OAUTH2_PROXY_WEBHOOK_ROLE", "webhook")

self.logger.info(f"Oauth2proxy Auth Verifier initialized with patterns: {self.role_patterns}")

def _match_role_pattern(self, role_name: str) -> Optional[str]:
"""Check if given role_name matches any configured wildcard pattern."""
for pattern, mapped_role in self.role_patterns:
if fnmatch.fnmatch(role_name, pattern):
self.logger.debug(f"Matched role '{role_name}' to pattern '{pattern}' β†’ {mapped_role}")
return mapped_role
return None

def authenticate(
self,
Expand All @@ -46,89 +63,59 @@ def authenticate(
*args,
**kwargs,
) -> AuthenticatedEntity:
# If we have an api key or an authorization header, we need to authenticate using that
# API key authentication first
if api_key or request.headers.get("Authorization"):
try:
api_key = self._extract_api_key(request, api_key, authorization)

if api_key:
self.logger.info("Attempting to authenticate with API key")
try:
return self._verify_api_key(request, api_key, authorization)
except HTTPException:
raise
except Exception:
self.logger.exception("Failed to validate API Key")
raise HTTPException(
status_code=401, detail="Invalid authentication credentials"
)
return self._verify_api_key(request, api_key, authorization)
except Exception:
# If we fail to validate the API key, we need to try to authenticate with the user and role headers
# We will either way return a 401 status code if it fails, so we don't need to handle it here
pass

# https://github.com/keephq/keep/issues/1203
# get user name
self.logger.info(
f"Authenticating user with {self.oauth2_proxy_user_header} header"
)
user_name = request.headers.get(self.oauth2_proxy_user_header)
pass # fallback to header-based authentication

# Header-based auth (via oauth2-proxy)
user_name = request.headers.get(self.oauth2_proxy_user_header)
if not user_name:
raise HTTPException(
status_code=401,
detail=f"Unauthorized - no user in {self.oauth2_proxy_user_header} header found",
)

role = request.headers.get(self.oauth2_proxy_role_header)
if not role:
role_header = request.headers.get(self.oauth2_proxy_role_header)
if not role_header:
raise HTTPException(
status_code=401,
detail=f"Unauthorized - no role in {self.oauth2_proxy_role_header} header found",
)

# else, if its a list seperated by comma e.g. org:admin, org:foobar or role:admin, role:foobar
if "," in role:
# split the roles by comma
roles = role.split(",")
# trim
roles = [r.strip() for r in roles]
else:
roles = [role]
# Multiple roles allowed in header
roles = [r.strip() for r in role_header.split(",") if r.strip()]
self.logger.debug(f"User {user_name} has roles from header: {roles}")

# Define the priority order of roles
# Define priority order
role_priority = ["admin", "noc", "webhook"]

mapped_role = None

# Try to match according to priority
for priority_role in role_priority:
self.logger.debug(f"Checking for role {priority_role}")
for role in roles:
self.logger.debug(f"Checking for role {role}")
# map the role if its a mapped one, or just use the role
mapped_role_name = self.role_mappings.get(role, role)
self.logger.debug(f"Checking for mapped role {mapped_role_name}")
if mapped_role_name == priority_role:
matched_role = self._match_role_pattern(role)
if matched_role == priority_role:
try:
self.logger.debug(f"Getting role {mapped_role_name}")
mapped_role = get_role_by_role_name(mapped_role_name)
self.logger.debug(f"Role {mapped_role_name} found")
mapped_role = get_role_by_role_name(priority_role)
break
except HTTPException:
self.logger.debug(f"Role {mapped_role_name} not found")
continue
if mapped_role:
self.logger.debug(f"Role {mapped_role_name} found")
break

# if no valid role was found, throw a 403 exception
if not mapped_role:
self.logger.debug(f"No valid role found among {roles}")
raise HTTPException(
status_code=403,
detail=f"No valid role found among {roles}",
)

# auto provision user
# Auto-create or update user
if self.auto_create_user and not user_exists(
tenant_id=SINGLE_TENANT_UUID, username=user_name
):
Expand All @@ -139,34 +126,18 @@ def authenticate(
role=mapped_role.get_name(),
password="",
)
self.logger.info(f"User {user_name} created")
elif user_exists(tenant_id=SINGLE_TENANT_UUID, username=user_name):
# update last login
self.logger.debug(f"Updating last login for user: {user_name}")
try:
update_user_last_sign_in(
tenant_id=SINGLE_TENANT_UUID, username=user_name
)
self.logger.debug(f"Last login updated for user: {user_name}")
except Exception:
self.logger.warning(
f"Failed to update last login for user: {user_name}"
)
pass
# update role
self.logger.debug(f"Updating role for user: {user_name}")
else:
try:
update_user_last_sign_in(tenant_id=SINGLE_TENANT_UUID, username=user_name)
update_user_role(
tenant_id=SINGLE_TENANT_UUID,
username=user_name,
role=mapped_role.get_name(),
)
self.logger.debug(f"Role updated for user: {user_name}")
except Exception:
self.logger.warning(f"Failed to update role for user: {user_name}")
pass
except Exception as e:
self.logger.warning(f"Failed updating user data: {e}")

self.logger.info(f"User {user_name} authenticated with role {mapped_role}")
self.logger.info(f"User {user_name} authenticated with role {mapped_role.get_name()}")
return AuthenticatedEntity(
tenant_id=SINGLE_TENANT_UUID,
email=user_name,
Expand Down
Loading