"""AuthService - Pure authentication service for OAuth flows.
This service handles OAuth flows and token operations but does NOT
store credentials. The calling code (Commands layer) is responsible
for deciding when/if to persist tokens using CredentialStore.
"""
from __future__ import annotations
import time
import webbrowser
from collections.abc import Callable
from typing import Final, Optional
import httpx
from kelvin.sdk.services.auth.callback_server import CallbackServer
from kelvin.sdk.services.auth.errors import KeycloakError, OAuthFlowError
from kelvin.sdk.services.auth.keycloak_client import KeycloakClient
from kelvin.sdk.services.auth.models import DeviceAuthResponse, DevicePollStatus, TokenResponse
from kelvin.sdk.services.auth.pkce import (
build_authorization_url,
generate_code_challenge,
generate_code_verifier,
generate_state,
)
from kelvin.sdk.services.session_models import PlatformMetadata
class AuthService:
    """Pure authentication service - returns tokens, no storage.

    This service handles OAuth flows and token operations but does NOT
    store credentials. The calling code (Commands layer) is responsible
    for deciding when/if to persist tokens using CredentialStore.
    """

    # Keycloak realm used for every client built by this service.
    REALM: Final[str] = "kelvin"
    # Client ID used by the browser (PKCE) and device-code flows.
    BROWSER_CLIENT_ID: Final[str] = "kelvin-sdk"
    # Client ID used by password / client-credentials / user-info calls, and
    # the fallback for refresh and logout when no client ID is supplied.
    USER_CLIENT_ID: Final[str] = "kelvin-client"
    # Local port the OAuth callback server listens on.
    CALLBACK_SERVER_PORT: Final[int] = 38638

    def __init__(self) -> None:
        """Initialize AuthService. No dependencies required."""

    def login_browser(
        self,
        url: str,
        open_browser: bool = True,
        on_auth_url: Optional[Callable[[str], None]] = None,
    ) -> TokenResponse:
        """Start browser-based OAuth flow, return tokens.

        Args:
            url: The base URL of the Kelvin platform.
            open_browser: Whether to automatically open the browser.
            on_auth_url: Optional callback invoked with the authorization URL
                before waiting for the browser callback.

        Returns:
            TokenResponse containing access and refresh tokens.

        Raises:
            OAuthFlowError: If the OAuth flow fails.
        """
        keycloak_client = KeycloakClient(
            base_url=url,
            realm=self.REALM,
            client_id=self.BROWSER_CLIENT_ID,
        )
        callback_server = CallbackServer(port=self.CALLBACK_SERVER_PORT)
        redirect_uri = callback_server.get_callback_url()

        # Generate PKCE parameters
        code_verifier = generate_code_verifier()
        code_challenge = generate_code_challenge(code_verifier)
        state = generate_state()

        # Get the authorization endpoint
        try:
            authorization_endpoint = keycloak_client.get_authorization_endpoint()
        except KeycloakError as e:
            raise OAuthFlowError(f"Failed to get authorization endpoint: {e}") from e

        # Build the authorization URL
        auth_url = build_authorization_url(
            authorization_endpoint=authorization_endpoint,
            client_id=self.BROWSER_CLIENT_ID,
            redirect_uri=redirect_uri,
            code_challenge=code_challenge,
            state=state,
        )

        callback_server.start()
        # From here on the server is running; the finally clause guarantees
        # it is stopped on every exit path (success or error).
        try:
            if open_browser:
                _ = webbrowser.open(auth_url)
            if on_auth_url is not None:
                on_auth_url(auth_url)

            # Wait for the callback
            callback_result = callback_server.wait_for_callback()

            # Verify the state parameter (CSRF protection)
            if callback_result.state != state:
                raise OAuthFlowError("State mismatch in OAuth callback")

            authorization_code = callback_result.code
            if not authorization_code:
                raise OAuthFlowError("No authorization code received")

            # Exchange the authorization code for tokens
            try:
                token_data = keycloak_client.exchange_code_for_tokens(
                    code=authorization_code,
                    redirect_uri=redirect_uri,
                    code_verifier=code_verifier,
                )
            except KeycloakError as e:
                raise OAuthFlowError(f"Token exchange failed: {e}") from e
            return token_data
        finally:
            callback_server.stop()

    def login_device_code(
        self,
        url: str,
        open_browser: bool = True,
        on_device_code: Optional[Callable[[DeviceAuthResponse], None]] = None,
    ) -> TokenResponse:
        """Start the OAuth 2.0 Device Authorization Grant flow (RFC 8628).

        Requests a device code, optionally reports it to the caller and opens
        the verification page, then polls the token endpoint until a token is
        issued or the device code's lifetime has elapsed.

        Args:
            url: The base URL of the Kelvin platform.
            open_browser: Whether to automatically open the verification URI
                in the browser.
            on_device_code: Optional callback invoked with the device auth
                response so the caller can display the user code and
                verification URI.

        Returns:
            TokenResponse containing access and refresh tokens.

        Raises:
            OAuthFlowError: If the device code flow fails or the device code
                expires before authorization completes.
        """
        keycloak_client = KeycloakClient(
            base_url=url,
            realm=self.REALM,
            client_id=self.BROWSER_CLIENT_ID,
        )
        try:
            device_auth = keycloak_client.request_device_code()
        except KeycloakError as e:
            raise OAuthFlowError(str(e)) from e

        if on_device_code is not None:
            on_device_code(device_auth)

        if open_browser:
            # Prefer the URI with the user code embedded when it is provided.
            target_url = device_auth.verification_uri_complete or device_auth.verification_uri
            _ = webbrowser.open(target_url)

        interval = device_auth.interval
        # Use the monotonic clock for the expiry deadline so that wall-clock
        # adjustments during the wait cannot shorten or extend the polling
        # window.
        deadline = time.monotonic() + device_auth.expires_in
        while time.monotonic() < deadline:
            # Wait one interval before every poll, including the first.
            time.sleep(interval)
            try:
                result = keycloak_client.poll_device_token(device_auth.device_code)
            except KeycloakError as e:
                raise OAuthFlowError(f"Device authorization failed: {e}") from e
            if isinstance(result, TokenResponse):
                return result
            if result == DevicePollStatus.SLOW_DOWN:
                # Server asked us to back off: lengthen the polling interval.
                interval += 5
        raise OAuthFlowError("Device code expired before authorization was completed.")

    def get_authorization_url(self, url: str) -> str:
        """Get the authorization URL for manual browser-based login.

        This is useful when automatic browser opening is not desired.

        Args:
            url: The base URL of the Kelvin platform.

        Returns:
            The authorization URL to open in a browser.

        Raises:
            OAuthFlowError: If fetching the authorization endpoint fails.
        """
        keycloak_client = KeycloakClient(
            base_url=url,
            realm=self.REALM,
            client_id=self.BROWSER_CLIENT_ID,
        )
        redirect_uri = f"http://localhost:{self.CALLBACK_SERVER_PORT}/callback"

        # NOTE(review): the code verifier and state generated here are not
        # returned or stored by this method, so a caller has no way to use
        # them for a later token exchange or state check - confirm intent.
        code_verifier = generate_code_verifier()
        code_challenge = generate_code_challenge(code_verifier)
        state = generate_state()

        try:
            authorization_endpoint = keycloak_client.get_authorization_endpoint()
        except KeycloakError as e:
            raise OAuthFlowError(f"Failed to get authorization endpoint: {e}") from e

        return build_authorization_url(
            authorization_endpoint=authorization_endpoint,
            client_id=self.BROWSER_CLIENT_ID,
            redirect_uri=redirect_uri,
            code_challenge=code_challenge,
            state=state,
        )

    def login_password(
        self,
        url: str,
        username: str,
        password: str,
        totp: Optional[str] = None,
    ) -> TokenResponse:
        """Authenticate with username/password, return tokens.

        Args:
            url: The base URL of the Kelvin platform.
            username: The user's username.
            password: The user's password.
            totp: Optional TOTP code for 2FA.

        Returns:
            TokenResponse containing access and refresh tokens.

        Raises:
            OAuthFlowError: If authentication fails.
        """
        try:
            keycloak_client = KeycloakClient(
                base_url=url,
                realm=self.REALM,
                client_id=self.USER_CLIENT_ID,
            )
            return keycloak_client.authenticate_with_password(
                username=username,
                password=password,
                totp=totp,
            )
        except KeycloakError as e:
            raise OAuthFlowError(f"Password authentication failed: {e}") from e

    def login_client_credentials(
        self,
        url: str,
        client_id: str,
        client_secret: str,
    ) -> TokenResponse:
        """Authenticate with client credentials, return tokens.

        Args:
            url: The base URL of the Kelvin platform.
            client_id: The OAuth client ID.
            client_secret: The OAuth client secret.

        Returns:
            TokenResponse containing access token (no refresh token for this grant).

        Raises:
            OAuthFlowError: If authentication fails.
        """
        try:
            keycloak_client = KeycloakClient(
                base_url=url,
                realm=self.REALM,
                client_id=self.USER_CLIENT_ID,
            )
            return keycloak_client.authenticate_with_client_credentials(
                client_id=client_id,
                client_secret=client_secret,
            )
        except KeycloakError as e:
            raise OAuthFlowError(f"Client credentials authentication failed: {e}") from e

    def refresh_tokens(self, url: str, refresh_token: str, oauth_client_id: Optional[str] = None) -> TokenResponse:
        """Refresh expired tokens, return new tokens.

        Args:
            url: The base URL of the Kelvin platform.
            refresh_token: The refresh token.
            oauth_client_id: The Keycloak OAuth client ID that was used to
                obtain the tokens. Falls back to USER_CLIENT_ID if not provided.

        Returns:
            TokenResponse containing new tokens.

        Raises:
            OAuthFlowError: If token refresh fails.
        """
        try:
            keycloak_client = KeycloakClient(
                base_url=url,
                realm=self.REALM,
                client_id=oauth_client_id or self.USER_CLIENT_ID,
            )
            return keycloak_client.refresh_access_token(refresh_token=refresh_token)
        except KeycloakError as e:
            raise OAuthFlowError(f"Token refresh failed: {e}") from e

    def verify_token(self, url: str, access_token: str) -> dict[str, object]:
        """Verify token validity, return user info.

        Args:
            url: The base URL of the Kelvin platform.
            access_token: The access token to verify.

        Returns:
            Dictionary containing user information (sub, email, name, etc.).

        Raises:
            OAuthFlowError: If the token is invalid or verification fails.
        """
        try:
            keycloak_client = KeycloakClient(
                base_url=url,
                realm=self.REALM,
                client_id=self.USER_CLIENT_ID,
            )
            return keycloak_client.get_user_info(access_token)
        except KeycloakError as e:
            raise OAuthFlowError(f"Failed to verify token: {e}") from e

    def logout(self, url: str, refresh_token: str, oauth_client_id: Optional[str] = None) -> None:
        """Revoke tokens on the server.

        Args:
            url: The base URL of the Kelvin platform.
            refresh_token: The refresh token to revoke.
            oauth_client_id: The Keycloak OAuth client ID that was used to
                obtain the tokens. Falls back to USER_CLIENT_ID if not provided.

        Raises:
            OAuthFlowError: If logout fails.
        """
        try:
            keycloak_client = KeycloakClient(
                base_url=url,
                realm=self.REALM,
                client_id=oauth_client_id or self.USER_CLIENT_ID,
            )
            keycloak_client.logout(refresh_token=refresh_token)
        except KeycloakError as e:
            raise OAuthFlowError(f"Logout failed: {e}") from e