Source code for kelvin.sdk.services.auth.auth_service

"""AuthService - Pure authentication service for OAuth flows.

This service handles OAuth flows and token operations but does NOT
store credentials. The calling code (Commands layer) is responsible
for deciding when/if to persist tokens using CredentialStore.
"""

from __future__ import annotations

import time
import webbrowser
from collections.abc import Callable
from typing import Final, Optional

import httpx

from kelvin.sdk.services.auth.callback_server import CallbackServer
from kelvin.sdk.services.auth.errors import KeycloakError, OAuthFlowError
from kelvin.sdk.services.auth.keycloak_client import KeycloakClient
from kelvin.sdk.services.auth.models import DeviceAuthResponse, DevicePollStatus, TokenResponse
from kelvin.sdk.services.auth.pkce import (
    build_authorization_url,
    generate_code_challenge,
    generate_code_verifier,
    generate_state,
)
from kelvin.sdk.services.session_models import PlatformMetadata


[docs] class AuthService: """Pure authentication service - returns tokens, no storage. This service handles OAuth flows and token operations but does NOT store credentials. The calling code (Commands layer) is responsible for deciding when/if to persist tokens using CredentialStore. """ REALM: Final[str] = "kelvin" BROWSER_CLIENT_ID: Final[str] = "kelvin-sdk" USER_CLIENT_ID: Final[str] = "kelvin-client" CALLBACK_SERVER_PORT: Final[int] = 38638
[docs] def __init__(self) -> None: """Initialize AuthService. No dependencies required."""
[docs] def login_browser( self, url: str, open_browser: bool = True, on_auth_url: Optional[Callable[[str], None]] = None, ) -> TokenResponse: """Start browser-based OAuth flow, return tokens. Args: url: The base URL of the Kelvin platform. open_browser: Whether to automatically open the browser. on_auth_url: Optional callback invoked with the authorization URL before waiting for the browser callback. Returns: TokenResponse containing access and refresh tokens. Raises: OAuthFlowError: If the OAuth flow fails. """ keycloak_client = KeycloakClient( base_url=url, realm=self.REALM, client_id=self.BROWSER_CLIENT_ID, ) callback_server = CallbackServer(port=self.CALLBACK_SERVER_PORT) redirect_uri = callback_server.get_callback_url() # Generate PKCE parameters code_verifier = generate_code_verifier() code_challenge = generate_code_challenge(code_verifier) state = generate_state() # Get the authorization endpoint try: authorization_endpoint = keycloak_client.get_authorization_endpoint() except KeycloakError as e: raise OAuthFlowError(f"Failed to get authorization endpoint: {e}") from e # Build the authorization URL auth_url = build_authorization_url( authorization_endpoint=authorization_endpoint, client_id=self.BROWSER_CLIENT_ID, redirect_uri=redirect_uri, code_challenge=code_challenge, state=state, ) callback_server.start() try: if open_browser: _ = webbrowser.open(auth_url) if on_auth_url is not None: on_auth_url(auth_url) # Wait for the callback callback_result = callback_server.wait_for_callback() # Verify the state parameter (CSRF protection) if callback_result.state != state: raise OAuthFlowError("State mismatch in OAuth callback") authorization_code = callback_result.code if not authorization_code: raise OAuthFlowError("No authorization code received") # Exchange the authorization code for tokens try: token_data = keycloak_client.exchange_code_for_tokens( code=authorization_code, redirect_uri=redirect_uri, code_verifier=code_verifier, ) except KeycloakError as e: raise OAuthFlowError(f"Token exchange failed: {e}") from e return token_data finally: callback_server.stop()
[docs] def login_device_code( self, url: str, open_browser: bool = True, on_device_code: Optional[Callable[[DeviceAuthResponse], None]] = None, ) -> TokenResponse: """Start the OAuth 2.0 Device Authorization Grant flow (RFC 8628). Args: url: The base URL of the Kelvin platform. open_browser: Whether to automatically open the verification URI in the browser. on_device_code: Optional callback invoked with the device auth response so the caller can display the user code and verification URI. Returns: TokenResponse containing access and refresh tokens. Raises: OAuthFlowError: If the device code flow fails. """ keycloak_client = KeycloakClient( base_url=url, realm=self.REALM, client_id=self.BROWSER_CLIENT_ID, ) try: device_auth = keycloak_client.request_device_code() except KeycloakError as e: raise OAuthFlowError(str(e)) from e if on_device_code is not None: on_device_code(device_auth) if open_browser: target_url = device_auth.verification_uri_complete or device_auth.verification_uri _ = webbrowser.open(target_url) interval = device_auth.interval deadline = time.time() + device_auth.expires_in while time.time() < deadline: time.sleep(interval) try: result = keycloak_client.poll_device_token(device_auth.device_code) except KeycloakError as e: raise OAuthFlowError(f"Device authorization failed: {e}") from e if isinstance(result, TokenResponse): return result if result == DevicePollStatus.SLOW_DOWN: interval += 5 raise OAuthFlowError("Device code expired before authorization was completed.")
[docs] def get_authorization_url(self, url: str) -> str: """Get the authorization URL for manual browser-based login. This is useful when automatic browser opening is not desired. Args: url: The base URL of the Kelvin platform. Returns: The authorization URL to open in a browser. Raises: OAuthFlowError: If fetching the authorization endpoint fails. """ keycloak_client = KeycloakClient( base_url=url, realm=self.REALM, client_id=self.BROWSER_CLIENT_ID, ) redirect_uri = f"http://localhost:{self.CALLBACK_SERVER_PORT}/callback" code_verifier = generate_code_verifier() code_challenge = generate_code_challenge(code_verifier) state = generate_state() try: authorization_endpoint = keycloak_client.get_authorization_endpoint() except KeycloakError as e: raise OAuthFlowError(f"Failed to get authorization endpoint: {e}") from e return build_authorization_url( authorization_endpoint=authorization_endpoint, client_id=self.BROWSER_CLIENT_ID, redirect_uri=redirect_uri, code_challenge=code_challenge, state=state, )
[docs] def login_password( self, url: str, username: str, password: str, totp: Optional[str] = None, ) -> TokenResponse: """Authenticate with username/password, return tokens. Args: url: The base URL of the Kelvin platform. username: The user's username. password: The user's password. totp: Optional TOTP code for 2FA. Returns: TokenResponse containing access and refresh tokens. Raises: OAuthFlowError: If authentication fails. """ try: keycloak_client = KeycloakClient( base_url=url, realm=self.REALM, client_id=self.USER_CLIENT_ID, ) return keycloak_client.authenticate_with_password( username=username, password=password, totp=totp, ) except KeycloakError as e: raise OAuthFlowError(f"Password authentication failed: {e}") from e
[docs] def login_client_credentials( self, url: str, client_id: str, client_secret: str, ) -> TokenResponse: """Authenticate with client credentials, return tokens. Args: url: The base URL of the Kelvin platform. client_id: The OAuth client ID. client_secret: The OAuth client secret. Returns: TokenResponse containing access token (no refresh token for this grant). Raises: OAuthFlowError: If authentication fails. """ try: keycloak_client = KeycloakClient( base_url=url, realm=self.REALM, client_id=self.USER_CLIENT_ID, ) return keycloak_client.authenticate_with_client_credentials( client_id=client_id, client_secret=client_secret, ) except KeycloakError as e: raise OAuthFlowError(f"Client credentials authentication failed: {e}") from e
[docs] def refresh_tokens(self, url: str, refresh_token: str, oauth_client_id: Optional[str] = None) -> TokenResponse: """Refresh expired tokens, return new tokens. Args: url: The base URL of the Kelvin platform. refresh_token: The refresh token. oauth_client_id: The Keycloak OAuth client ID that was used to obtain the tokens. Falls back to USER_CLIENT_ID if not provided. Returns: TokenResponse containing new tokens. Raises: OAuthFlowError: If token refresh fails. """ try: keycloak_client = KeycloakClient( base_url=url, realm=self.REALM, client_id=oauth_client_id or self.USER_CLIENT_ID, ) return keycloak_client.refresh_access_token(refresh_token=refresh_token) except KeycloakError as e: raise OAuthFlowError(f"Token refresh failed: {e}") from e
[docs] def verify_token(self, url: str, access_token: str) -> dict[str, object]: """Verify token validity, return user info. Args: url: The base URL of the Kelvin platform. access_token: The access token to verify. Returns: Dictionary containing user information (sub, email, name, etc.). Raises: OAuthFlowError: If the token is invalid or verification fails. """ try: keycloak_client = KeycloakClient( base_url=url, realm=self.REALM, client_id=self.USER_CLIENT_ID, ) return keycloak_client.get_user_info(access_token) except KeycloakError as e: raise OAuthFlowError(f"Failed to verify token: {e}") from e
[docs] def logout(self, url: str, refresh_token: str, oauth_client_id: Optional[str] = None) -> None: """Revoke tokens on the server. Args: url: The base URL of the Kelvin platform. refresh_token: The refresh token to revoke. oauth_client_id: The Keycloak OAuth client ID that was used to obtain the tokens. Falls back to USER_CLIENT_ID if not provided. Raises: OAuthFlowError: If logout fails. """ try: keycloak_client = KeycloakClient( base_url=url, realm=self.REALM, client_id=oauth_client_id or self.USER_CLIENT_ID, ) keycloak_client.logout(refresh_token=refresh_token) except KeycloakError as e: raise OAuthFlowError(f"Logout failed: {e}") from e
[docs] def fetch_platform_metadata(self, url: str) -> PlatformMetadata: """Fetch platform metadata from the /metadata endpoint. Args: url: The base URL of the Kelvin platform. Returns: PlatformMetadata containing docker registry info and other platform details. Raises: OAuthFlowError: If fetching metadata fails. """ try: with httpx.Client(base_url=url, timeout=30.0) as client: response = client.get("/metadata") _ = response.raise_for_status() metadata_json: dict[str, object] = response.json() return PlatformMetadata.from_api_response(url, metadata_json) except httpx.HTTPError as e: raise OAuthFlowError(f"Failed to fetch platform metadata: {e}") from e