Source code for kelvin.sdk.services.auth.keycloak_client

"""Keycloak OAuth/OIDC client implementation."""

from __future__ import annotations

from typing import Final, Optional

import httpx
from pydantic import ValidationError

from kelvin.sdk.services.auth.errors import KeycloakError
from kelvin.sdk.services.auth.models import DeviceAuthResponse, DevicePollStatus, TokenResponse


[docs] class KeycloakClient: """Client for interacting with Keycloak OAuth/OIDC endpoints.""" TIMEOUT: Final[float] = 10.0
[docs] def __init__( self, base_url: str, realm: str, client_id: str, client_secret: Optional[str] = None, ) -> None: """Initialize the Keycloak client. Args: base_url: The base URL of the Kelvin platform. realm: The Keycloak realm name. client_id: The OAuth client ID. client_secret: Optional client secret for confidential clients. """ self.base_url: str = base_url self.realm: str = realm self.client_id: str = client_id self.client_secret: Optional[str] = client_secret self.keycloak_base_url: str = f"{self.base_url}/auth/realms/{self.realm}" self._oidc_config: Optional[dict[str, object]] = None
[docs] def get_oidc_configuration(self) -> dict[str, object]: """Fetch the OIDC configuration from Keycloak's well-known endpoint. Returns: Dictionary containing OIDC configuration. Raises: KeycloakError: If the request fails. """ if self._oidc_config is not None: return self._oidc_config oidc_url = f"{self.keycloak_base_url}/.well-known/openid-configuration" try: response = httpx.get(oidc_url, timeout=self.TIMEOUT) _ = response.raise_for_status() config: object = response.json() if not isinstance(config, dict): raise KeycloakError("Invalid OIDC configuration format") self._oidc_config = config # ty: ignore[invalid-assignment] return self._oidc_config # ty: ignore[invalid-return-type] except httpx.HTTPError as e: raise KeycloakError(f"Failed to fetch OIDC configuration: {e}") from e
[docs] def get_authorization_endpoint(self) -> str: """Get the authorization endpoint URL. Returns: Authorization endpoint URL. Raises: KeycloakError: If endpoint is not found. """ config = self.get_oidc_configuration() endpoint = config.get("authorization_endpoint") if not isinstance(endpoint, str): raise KeycloakError("Authorization endpoint not found in OIDC configuration") return endpoint
[docs] def get_token_endpoint(self) -> str: """Get the token endpoint URL. Returns: Token endpoint URL. Raises: KeycloakError: If endpoint is not found. """ config = self.get_oidc_configuration() endpoint = config.get("token_endpoint") if not isinstance(endpoint, str): raise KeycloakError("Token endpoint not found in OIDC configuration") return endpoint
[docs] def get_userinfo_endpoint(self) -> str: """Get the userinfo endpoint URL. Returns: Userinfo endpoint URL. Raises: KeycloakError: If endpoint is not found. """ config = self.get_oidc_configuration() endpoint = config.get("userinfo_endpoint") if not isinstance(endpoint, str): raise KeycloakError("Userinfo endpoint not found in OIDC configuration") return endpoint
[docs] def get_logout_endpoint(self) -> str: """Get the logout endpoint URL. Returns: Logout endpoint URL. Raises: KeycloakError: If endpoint is not found. """ config = self.get_oidc_configuration() endpoint = config.get("end_session_endpoint") if not isinstance(endpoint, str): raise KeycloakError("Logout endpoint not found in OIDC configuration") return endpoint
[docs] def get_revocation_endpoint(self) -> str: """Get the token revocation endpoint URL (RFC 7009). Tries ``revocation_endpoint`` from OIDC config first, then falls back to the standard Keycloak path. Returns: Revocation endpoint URL. """ config = self.get_oidc_configuration() endpoint = config.get("revocation_endpoint") if isinstance(endpoint, str): return endpoint return f"{self.keycloak_base_url}/protocol/openid-connect/revoke"
[docs] def exchange_code_for_tokens( self, code: str, redirect_uri: str, code_verifier: str, ) -> TokenResponse: """Exchange an authorization code for access and refresh tokens. Args: code: The authorization code from the callback. redirect_uri: The redirect URI used in the authorization request. code_verifier: The PKCE code verifier. Returns: TokenResponse containing access and refresh tokens. Raises: KeycloakError: If the token exchange fails. """ token_endpoint = self.get_token_endpoint() data = { "grant_type": "authorization_code", "code": code, "redirect_uri": redirect_uri, "client_id": self.client_id, "code_verifier": code_verifier, } try: response = httpx.post(token_endpoint, data=data, timeout=self.TIMEOUT) _ = response.raise_for_status() result: object = response.json() if not isinstance(result, dict): raise KeycloakError("Invalid token response format") return TokenResponse.model_validate(result) except ValidationError as e: raise KeycloakError(f"Invalid token response: {e}") from e except httpx.HTTPStatusError as e: error_detail = e.response.text raise KeycloakError(f"Failed to exchange code for tokens: {e.response.status_code} - {error_detail}") from e except httpx.HTTPError as e: raise KeycloakError(f"Failed to exchange code for tokens: {e}") from e
[docs] def authenticate_with_password( self, username: str, password: str, totp: Optional[str] = None, ) -> TokenResponse: """Authenticate using Resource Owner Password Credentials Grant. Args: username: The user's username. password: The user's password. totp: Optional TOTP code for 2FA. Returns: TokenResponse containing access and refresh tokens. Raises: KeycloakError: If the authentication fails. """ token_endpoint = self.get_token_endpoint() data: dict[str, str] = { "grant_type": "password", "username": username, "password": password, "client_id": self.client_id, } if totp: data["totp"] = totp try: response = httpx.post(token_endpoint, data=data, timeout=self.TIMEOUT) _ = response.raise_for_status() result: object = response.json() if not isinstance(result, dict): raise KeycloakError("Invalid token response format") return TokenResponse.model_validate(result) except ValidationError as e: raise KeycloakError(f"Invalid token response: {e}") from e except httpx.HTTPStatusError as e: error_detail = e.response.text raise KeycloakError( f"Failed to authenticate with password: {e.response.status_code} - {error_detail}" ) from e except httpx.HTTPError as e: raise KeycloakError(f"Failed to authenticate with password: {e}") from e
[docs] def authenticate_with_client_credentials( self, client_id: str, client_secret: str, ) -> TokenResponse: """Authenticate using Client Credentials Grant. Args: client_id: The OAuth client ID. client_secret: The OAuth client secret. Returns: TokenResponse containing access token (no refresh token for this grant). Raises: KeycloakError: If the authentication fails. """ token_endpoint = self.get_token_endpoint() data = { "grant_type": "client_credentials", "client_id": client_id, "client_secret": client_secret, } try: response = httpx.post(token_endpoint, data=data, timeout=self.TIMEOUT) _ = response.raise_for_status() result: object = response.json() if not isinstance(result, dict): raise KeycloakError("Invalid token response format") return TokenResponse.model_validate(result) except ValidationError as e: raise KeycloakError(f"Invalid token response: {e}") from e except httpx.HTTPStatusError as e: error_detail = e.response.text raise KeycloakError( f"Failed to authenticate with client credentials: {e.response.status_code} - {error_detail}" ) from e except httpx.HTTPError as e: raise KeycloakError(f"Failed to authenticate with client credentials: {e}") from e
[docs] def refresh_access_token(self, refresh_token: str) -> TokenResponse: """Refresh the access token using a refresh token. Args: refresh_token: The refresh token. Returns: TokenResponse containing new tokens. Raises: KeycloakError: If the token refresh fails. """ token_endpoint = self.get_token_endpoint() data = { "grant_type": "refresh_token", "refresh_token": refresh_token, "client_id": self.client_id, } try: response = httpx.post(token_endpoint, data=data, timeout=self.TIMEOUT) _ = response.raise_for_status() result: object = response.json() if not isinstance(result, dict): raise KeycloakError("Invalid token response format") return TokenResponse.model_validate(result) except ValidationError as e: raise KeycloakError(f"Invalid token response: {e}") from e except httpx.HTTPStatusError as e: if e.response.status_code == 400: raise KeycloakError("Refresh token expired or invalid. Please log in again.") from e error_detail = e.response.text raise KeycloakError(f"Failed to refresh token: {e.response.status_code} - {error_detail}") from e except httpx.HTTPError as e: raise KeycloakError(f"Failed to refresh token: {e}") from e
[docs] def get_user_info(self, access_token: str) -> dict[str, object]: """Fetch user information from Keycloak's userinfo endpoint. Args: access_token: The access token. Returns: Dictionary containing user information. Raises: KeycloakError: If the request fails. """ userinfo_endpoint = self.get_userinfo_endpoint() headers = {"Authorization": f"Bearer {access_token}"} try: response = httpx.get(userinfo_endpoint, headers=headers, timeout=self.TIMEOUT) _ = response.raise_for_status() result: object = response.json() if not isinstance(result, dict): raise KeycloakError("Invalid userinfo response format") user_info: dict[str, object] = result # pyright: ignore[reportUnknownVariableType] # ty: ignore[invalid-assignment] return user_info except httpx.HTTPError as e: raise KeycloakError(f"Failed to fetch user info: {e}") from e
[docs] def logout(self, refresh_token: str) -> None: """Logout by revoking the refresh token (RFC 7009). Args: refresh_token: The refresh token to revoke. Raises: KeycloakError: If the revocation fails. """ revocation_endpoint = self.get_revocation_endpoint() data = { "client_id": self.client_id, "token": refresh_token, "token_type_hint": "refresh_token", } try: response = httpx.post(revocation_endpoint, data=data, timeout=self.TIMEOUT) _ = response.raise_for_status() except httpx.HTTPStatusError as e: error_detail = e.response.text raise KeycloakError(f"Failed to revoke token: {e.response.status_code} - {error_detail}") from e except httpx.HTTPError as e: raise KeycloakError(f"Failed to revoke token: {e}") from e
[docs] def request_device_code(self) -> DeviceAuthResponse: """Request a device code for the OAuth 2.0 Device Authorization Grant (RFC 8628). Returns: DeviceAuthResponse containing device_code, user_code, and verification URIs. Raises: KeycloakError: If the request fails. """ device_auth_url = f"{self.keycloak_base_url}/protocol/openid-connect/auth/device" try: response = httpx.post( device_auth_url, data={"client_id": self.client_id}, timeout=self.TIMEOUT, ) _ = response.raise_for_status() try: result: object = response.json() except ValueError as e: raise KeycloakError(f"Non-JSON response from device endpoint: {response.text[:200]}") from e if not isinstance(result, dict): raise KeycloakError("Invalid device authorization response format") return DeviceAuthResponse.model_validate(result) except ValidationError as e: raise KeycloakError(f"Invalid device authorization response: {e}") from e except httpx.HTTPStatusError as e: detail = e.response.text msg = f"Device authorization request failed: {e.response.status_code} - {detail}" raise KeycloakError(msg) from e except httpx.HTTPError as e: raise KeycloakError(f"Device authorization request failed: {e}") from e
[docs] def poll_device_token(self, device_code: str) -> TokenResponse | DevicePollStatus: """Poll the token endpoint for a device code grant. Returns the token response if the user has authorized, or a DevicePollStatus indicating the client should keep polling. Args: device_code: The device code from the device authorization response. Returns: TokenResponse if authorized, or DevicePollStatus if still pending. Raises: KeycloakError: If the polling encounters a terminal error (expired, denied, etc.). """ token_endpoint = self.get_token_endpoint() data = { "grant_type": "urn:ietf:params:oauth:grant-type:device_code", "device_code": device_code, "client_id": self.client_id, } try: response = httpx.post(token_endpoint, data=data, timeout=self.TIMEOUT) except httpx.HTTPError as e: raise KeycloakError(f"Failed to poll for device token: {e}") from e try: result: object = response.json() except ValueError as e: raise KeycloakError( f"Non-JSON response from token endpoint ({response.status_code}): {response.text[:200]}" ) from e if not isinstance(result, dict): raise KeycloakError("Invalid token response format") response_data: dict[str, object] = result # pyright: ignore[reportUnknownVariableType] # ty: ignore[invalid-assignment] if response.status_code == 200: try: return TokenResponse.model_validate(response_data) except ValidationError as e: raise KeycloakError(f"Invalid token response: {e}") from e error = str(response_data.get("error", "")) if error == "authorization_pending": return DevicePollStatus.AUTHORIZATION_PENDING if error == "slow_down": return DevicePollStatus.SLOW_DOWN error_description = str(response_data.get("error_description", error)) if error == "expired_token": raise KeycloakError("Device code has expired. Please try again.") if error == "access_denied": raise KeycloakError("Authorization request was denied by the user.") raise KeycloakError(f"Device authorization failed: {error_description}")