"""Keycloak OAuth/OIDC client implementation."""
from __future__ import annotations
from typing import Final, Optional
import httpx
from pydantic import ValidationError
from kelvin.sdk.services.auth.errors import KeycloakError
from kelvin.sdk.services.auth.models import DeviceAuthResponse, DevicePollStatus, TokenResponse
[docs]
class KeycloakClient:
"""Client for interacting with Keycloak OAuth/OIDC endpoints."""
TIMEOUT: Final[float] = 10.0
[docs]
def __init__(
self,
base_url: str,
realm: str,
client_id: str,
client_secret: Optional[str] = None,
) -> None:
"""Initialize the Keycloak client.
Args:
base_url: The base URL of the Kelvin platform.
realm: The Keycloak realm name.
client_id: The OAuth client ID.
client_secret: Optional client secret for confidential clients.
"""
self.base_url: str = base_url
self.realm: str = realm
self.client_id: str = client_id
self.client_secret: Optional[str] = client_secret
self.keycloak_base_url: str = f"{self.base_url}/auth/realms/{self.realm}"
self._oidc_config: Optional[dict[str, object]] = None
[docs]
def get_oidc_configuration(self) -> dict[str, object]:
"""Fetch the OIDC configuration from Keycloak's well-known endpoint.
Returns:
Dictionary containing OIDC configuration.
Raises:
KeycloakError: If the request fails.
"""
if self._oidc_config is not None:
return self._oidc_config
oidc_url = f"{self.keycloak_base_url}/.well-known/openid-configuration"
try:
response = httpx.get(oidc_url, timeout=self.TIMEOUT)
_ = response.raise_for_status()
config: object = response.json()
if not isinstance(config, dict):
raise KeycloakError("Invalid OIDC configuration format")
self._oidc_config = config # ty: ignore[invalid-assignment]
return self._oidc_config # ty: ignore[invalid-return-type]
except httpx.HTTPError as e:
raise KeycloakError(f"Failed to fetch OIDC configuration: {e}") from e
[docs]
def get_authorization_endpoint(self) -> str:
"""Get the authorization endpoint URL.
Returns:
Authorization endpoint URL.
Raises:
KeycloakError: If endpoint is not found.
"""
config = self.get_oidc_configuration()
endpoint = config.get("authorization_endpoint")
if not isinstance(endpoint, str):
raise KeycloakError("Authorization endpoint not found in OIDC configuration")
return endpoint
[docs]
def get_token_endpoint(self) -> str:
"""Get the token endpoint URL.
Returns:
Token endpoint URL.
Raises:
KeycloakError: If endpoint is not found.
"""
config = self.get_oidc_configuration()
endpoint = config.get("token_endpoint")
if not isinstance(endpoint, str):
raise KeycloakError("Token endpoint not found in OIDC configuration")
return endpoint
[docs]
def get_userinfo_endpoint(self) -> str:
"""Get the userinfo endpoint URL.
Returns:
Userinfo endpoint URL.
Raises:
KeycloakError: If endpoint is not found.
"""
config = self.get_oidc_configuration()
endpoint = config.get("userinfo_endpoint")
if not isinstance(endpoint, str):
raise KeycloakError("Userinfo endpoint not found in OIDC configuration")
return endpoint
[docs]
def get_logout_endpoint(self) -> str:
"""Get the logout endpoint URL.
Returns:
Logout endpoint URL.
Raises:
KeycloakError: If endpoint is not found.
"""
config = self.get_oidc_configuration()
endpoint = config.get("end_session_endpoint")
if not isinstance(endpoint, str):
raise KeycloakError("Logout endpoint not found in OIDC configuration")
return endpoint
[docs]
def get_revocation_endpoint(self) -> str:
"""Get the token revocation endpoint URL (RFC 7009).
Tries ``revocation_endpoint`` from OIDC config first, then falls back
to the standard Keycloak path.
Returns:
Revocation endpoint URL.
"""
config = self.get_oidc_configuration()
endpoint = config.get("revocation_endpoint")
if isinstance(endpoint, str):
return endpoint
return f"{self.keycloak_base_url}/protocol/openid-connect/revoke"
[docs]
def exchange_code_for_tokens(
self,
code: str,
redirect_uri: str,
code_verifier: str,
) -> TokenResponse:
"""Exchange an authorization code for access and refresh tokens.
Args:
code: The authorization code from the callback.
redirect_uri: The redirect URI used in the authorization request.
code_verifier: The PKCE code verifier.
Returns:
TokenResponse containing access and refresh tokens.
Raises:
KeycloakError: If the token exchange fails.
"""
token_endpoint = self.get_token_endpoint()
data = {
"grant_type": "authorization_code",
"code": code,
"redirect_uri": redirect_uri,
"client_id": self.client_id,
"code_verifier": code_verifier,
}
try:
response = httpx.post(token_endpoint, data=data, timeout=self.TIMEOUT)
_ = response.raise_for_status()
result: object = response.json()
if not isinstance(result, dict):
raise KeycloakError("Invalid token response format")
return TokenResponse.model_validate(result)
except ValidationError as e:
raise KeycloakError(f"Invalid token response: {e}") from e
except httpx.HTTPStatusError as e:
error_detail = e.response.text
raise KeycloakError(f"Failed to exchange code for tokens: {e.response.status_code} - {error_detail}") from e
except httpx.HTTPError as e:
raise KeycloakError(f"Failed to exchange code for tokens: {e}") from e
[docs]
def authenticate_with_password(
self,
username: str,
password: str,
totp: Optional[str] = None,
) -> TokenResponse:
"""Authenticate using Resource Owner Password Credentials Grant.
Args:
username: The user's username.
password: The user's password.
totp: Optional TOTP code for 2FA.
Returns:
TokenResponse containing access and refresh tokens.
Raises:
KeycloakError: If the authentication fails.
"""
token_endpoint = self.get_token_endpoint()
data: dict[str, str] = {
"grant_type": "password",
"username": username,
"password": password,
"client_id": self.client_id,
}
if totp:
data["totp"] = totp
try:
response = httpx.post(token_endpoint, data=data, timeout=self.TIMEOUT)
_ = response.raise_for_status()
result: object = response.json()
if not isinstance(result, dict):
raise KeycloakError("Invalid token response format")
return TokenResponse.model_validate(result)
except ValidationError as e:
raise KeycloakError(f"Invalid token response: {e}") from e
except httpx.HTTPStatusError as e:
error_detail = e.response.text
raise KeycloakError(
f"Failed to authenticate with password: {e.response.status_code} - {error_detail}"
) from e
except httpx.HTTPError as e:
raise KeycloakError(f"Failed to authenticate with password: {e}") from e
[docs]
def authenticate_with_client_credentials(
self,
client_id: str,
client_secret: str,
) -> TokenResponse:
"""Authenticate using Client Credentials Grant.
Args:
client_id: The OAuth client ID.
client_secret: The OAuth client secret.
Returns:
TokenResponse containing access token (no refresh token for this grant).
Raises:
KeycloakError: If the authentication fails.
"""
token_endpoint = self.get_token_endpoint()
data = {
"grant_type": "client_credentials",
"client_id": client_id,
"client_secret": client_secret,
}
try:
response = httpx.post(token_endpoint, data=data, timeout=self.TIMEOUT)
_ = response.raise_for_status()
result: object = response.json()
if not isinstance(result, dict):
raise KeycloakError("Invalid token response format")
return TokenResponse.model_validate(result)
except ValidationError as e:
raise KeycloakError(f"Invalid token response: {e}") from e
except httpx.HTTPStatusError as e:
error_detail = e.response.text
raise KeycloakError(
f"Failed to authenticate with client credentials: {e.response.status_code} - {error_detail}"
) from e
except httpx.HTTPError as e:
raise KeycloakError(f"Failed to authenticate with client credentials: {e}") from e
[docs]
def refresh_access_token(self, refresh_token: str) -> TokenResponse:
"""Refresh the access token using a refresh token.
Args:
refresh_token: The refresh token.
Returns:
TokenResponse containing new tokens.
Raises:
KeycloakError: If the token refresh fails.
"""
token_endpoint = self.get_token_endpoint()
data = {
"grant_type": "refresh_token",
"refresh_token": refresh_token,
"client_id": self.client_id,
}
try:
response = httpx.post(token_endpoint, data=data, timeout=self.TIMEOUT)
_ = response.raise_for_status()
result: object = response.json()
if not isinstance(result, dict):
raise KeycloakError("Invalid token response format")
return TokenResponse.model_validate(result)
except ValidationError as e:
raise KeycloakError(f"Invalid token response: {e}") from e
except httpx.HTTPStatusError as e:
if e.response.status_code == 400:
raise KeycloakError("Refresh token expired or invalid. Please log in again.") from e
error_detail = e.response.text
raise KeycloakError(f"Failed to refresh token: {e.response.status_code} - {error_detail}") from e
except httpx.HTTPError as e:
raise KeycloakError(f"Failed to refresh token: {e}") from e
[docs]
def get_user_info(self, access_token: str) -> dict[str, object]:
"""Fetch user information from Keycloak's userinfo endpoint.
Args:
access_token: The access token.
Returns:
Dictionary containing user information.
Raises:
KeycloakError: If the request fails.
"""
userinfo_endpoint = self.get_userinfo_endpoint()
headers = {"Authorization": f"Bearer {access_token}"}
try:
response = httpx.get(userinfo_endpoint, headers=headers, timeout=self.TIMEOUT)
_ = response.raise_for_status()
result: object = response.json()
if not isinstance(result, dict):
raise KeycloakError("Invalid userinfo response format")
user_info: dict[str, object] = result # pyright: ignore[reportUnknownVariableType] # ty: ignore[invalid-assignment]
return user_info
except httpx.HTTPError as e:
raise KeycloakError(f"Failed to fetch user info: {e}") from e
[docs]
def logout(self, refresh_token: str) -> None:
"""Logout by revoking the refresh token (RFC 7009).
Args:
refresh_token: The refresh token to revoke.
Raises:
KeycloakError: If the revocation fails.
"""
revocation_endpoint = self.get_revocation_endpoint()
data = {
"client_id": self.client_id,
"token": refresh_token,
"token_type_hint": "refresh_token",
}
try:
response = httpx.post(revocation_endpoint, data=data, timeout=self.TIMEOUT)
_ = response.raise_for_status()
except httpx.HTTPStatusError as e:
error_detail = e.response.text
raise KeycloakError(f"Failed to revoke token: {e.response.status_code} - {error_detail}") from e
except httpx.HTTPError as e:
raise KeycloakError(f"Failed to revoke token: {e}") from e
[docs]
def request_device_code(self) -> DeviceAuthResponse:
"""Request a device code for the OAuth 2.0 Device Authorization Grant (RFC 8628).
Returns:
DeviceAuthResponse containing device_code, user_code, and verification URIs.
Raises:
KeycloakError: If the request fails.
"""
device_auth_url = f"{self.keycloak_base_url}/protocol/openid-connect/auth/device"
try:
response = httpx.post(
device_auth_url,
data={"client_id": self.client_id},
timeout=self.TIMEOUT,
)
_ = response.raise_for_status()
try:
result: object = response.json()
except ValueError as e:
raise KeycloakError(f"Non-JSON response from device endpoint: {response.text[:200]}") from e
if not isinstance(result, dict):
raise KeycloakError("Invalid device authorization response format")
return DeviceAuthResponse.model_validate(result)
except ValidationError as e:
raise KeycloakError(f"Invalid device authorization response: {e}") from e
except httpx.HTTPStatusError as e:
detail = e.response.text
msg = f"Device authorization request failed: {e.response.status_code} - {detail}"
raise KeycloakError(msg) from e
except httpx.HTTPError as e:
raise KeycloakError(f"Device authorization request failed: {e}") from e
[docs]
def poll_device_token(self, device_code: str) -> TokenResponse | DevicePollStatus:
"""Poll the token endpoint for a device code grant.
Returns the token response if the user has authorized, or a
DevicePollStatus indicating the client should keep polling.
Args:
device_code: The device code from the device authorization response.
Returns:
TokenResponse if authorized, or DevicePollStatus if still pending.
Raises:
KeycloakError: If the polling encounters a terminal error (expired, denied, etc.).
"""
token_endpoint = self.get_token_endpoint()
data = {
"grant_type": "urn:ietf:params:oauth:grant-type:device_code",
"device_code": device_code,
"client_id": self.client_id,
}
try:
response = httpx.post(token_endpoint, data=data, timeout=self.TIMEOUT)
except httpx.HTTPError as e:
raise KeycloakError(f"Failed to poll for device token: {e}") from e
try:
result: object = response.json()
except ValueError as e:
raise KeycloakError(
f"Non-JSON response from token endpoint ({response.status_code}): {response.text[:200]}"
) from e
if not isinstance(result, dict):
raise KeycloakError("Invalid token response format")
response_data: dict[str, object] = result # pyright: ignore[reportUnknownVariableType] # ty: ignore[invalid-assignment]
if response.status_code == 200:
try:
return TokenResponse.model_validate(response_data)
except ValidationError as e:
raise KeycloakError(f"Invalid token response: {e}") from e
error = str(response_data.get("error", ""))
if error == "authorization_pending":
return DevicePollStatus.AUTHORIZATION_PENDING
if error == "slow_down":
return DevicePollStatus.SLOW_DOWN
error_description = str(response_data.get("error_description", error))
if error == "expired_token":
raise KeycloakError("Device code has expired. Please try again.")
if error == "access_denied":
raise KeycloakError("Authorization request was denied by the user.")
raise KeycloakError(f"Device authorization failed: {error_description}")