Source code for kelvin.sdk.services.credential_store

"""Credential store service for secure OAuth token storage.

This service manages secure storage and retrieval of OAuth tokens using keyring.
It is a leaf service with no dependencies.
"""

from __future__ import annotations

import json
import time
from dataclasses import asdict, dataclass
from typing import TYPE_CHECKING, Final, Optional, cast

import keyring
import structlog

from kelvin.sdk.exceptions import CLIError

if TYPE_CHECKING:
    from kelvin.sdk.services.auth import TokenResponse

logger = cast(structlog.stdlib.BoundLogger, structlog.get_logger(__name__))


# ============ Exceptions ============


class CredentialStoreError(CLIError):
    """Root of the credential store exception hierarchy.

    Every error raised by the credential store derives from this class, so
    callers can handle all credential store failures with a single
    ``except CredentialStoreError`` clause.
    """

    # 77 is EX_NOPERM in the BSD sysexits.h convention.
    exit_code: int = 77  # EX_NOPERM
class CredentialStorageError(CredentialStoreError):
    """Raised when credentials could not be written to the credential store."""
class CredentialRetrievalError(CredentialStoreError):
    """Raised when stored credentials cannot be retrieved in a usable form."""
# ============ Models ============
@dataclass
class TokenCredentials:
    """OAuth token credentials as persisted by the credential store.

    Attributes
    ----------
    access_token
        The OAuth access token.
    access_expires_at
        Unix timestamp (seconds) from which the access token is treated as expired.
    refresh_token
        The OAuth refresh token, or None when none was issued.
    refresh_expires_at
        Unix timestamp (seconds) from which the refresh token is treated as
        expired, or None when unknown.
    oauth_client_id
        The OAuth client ID the tokens were obtained with, if recorded.
    """

    access_token: str
    access_expires_at: int
    refresh_token: Optional[str] = None
    refresh_expires_at: Optional[int] = None
    oauth_client_id: Optional[str] = None

    @property
    def is_service_account(self) -> bool:
        """Whether these credentials came from a client credentials grant.

        Service account tokens have no refresh token. The Keycloak userinfo
        endpoint rejects them, so callers should skip remote token validation.
        """
        # Only a missing (None) refresh token counts; an empty string does not.
        has_no_refresh_token = self.refresh_token is None
        return has_no_refresh_token
# ============ Service ============
class CredentialStore:
    """Secure credential storage using keyring.

    This service handles storage and retrieval of OAuth tokens in the
    system's secure credential store (keyring). It does not perform
    authentication - that responsibility belongs to AuthService.

    Credentials are serialized to a single JSON document per platform URL and
    stored under ``KEYRING_SERVICE_NAME`` with the URL as the keyring username.

    Example
    -------
    >>> store = CredentialStore()
    >>> creds = store.retrieve("https://api.example.com")
    >>> if creds and not store.is_token_expired(creds):
    ...     print("Valid credentials found")
    """

    KEYRING_SERVICE_NAME: Final[str] = "kelvin:kelvin-sdk"
    EXPIRATION_THRESHOLD: Final[int] = 60  # seconds before actual expiry to consider expired

    def store(
        self,
        url: str,
        access_token: str,
        expires_in: int,
        refresh_token: Optional[str] = None,
        refresh_expires_in: Optional[int] = None,
        oauth_client_id: Optional[str] = None,
    ) -> TokenCredentials:
        """Store OAuth tokens in the secure keyring.

        Parameters
        ----------
        url
            The platform URL to associate with these credentials.
        access_token
            The OAuth access token.
        expires_in
            Number of seconds until the access token expires.
        refresh_token
            Optional OAuth refresh token.
        refresh_expires_in
            Number of seconds until the refresh token expires.
        oauth_client_id
            Optional OAuth client ID used to obtain these tokens; stored
            alongside them.

        Returns
        -------
        TokenCredentials
            The stored credentials with calculated expiration timestamps.

        Raises
        ------
        CredentialStorageError
            If storage fails.
        """
        try:
            current_time = int(time.time())

            # The safety threshold is baked into the stored timestamps, so the
            # expiry checks can compare against the clock directly.
            access_expires_at = current_time + expires_in - self.EXPIRATION_THRESHOLD

            # A refresh expiry is only recorded when both values are truthy;
            # a refresh_expires_in of 0 is therefore treated like a missing one.
            refresh_expires_at: Optional[int] = None
            if refresh_token and refresh_expires_in:
                refresh_expires_at = current_time + refresh_expires_in - self.EXPIRATION_THRESHOLD

            credentials = TokenCredentials(
                access_token=access_token,
                access_expires_at=access_expires_at,
                refresh_token=refresh_token,
                refresh_expires_at=refresh_expires_at,
                oauth_client_id=oauth_client_id,
            )

            credentials_json = json.dumps(asdict(credentials))
            keyring.set_password(self.KEYRING_SERVICE_NAME, url, credentials_json)
            logger.debug("stored credentials", url=url)
            return credentials
        except Exception as e:
            logger.warning("failed to store credentials", url=url, error=str(e))
            raise CredentialStorageError(f"Failed to store credentials: {e}") from e

    def store_token_response(
        self, url: str, tokens: TokenResponse, oauth_client_id: Optional[str] = None
    ) -> TokenCredentials:
        """Store OAuth tokens from a TokenResponse.

        Parameters
        ----------
        url
            The platform URL to associate with these credentials.
        tokens
            The TokenResponse from AuthService.
        oauth_client_id
            The Keycloak OAuth client ID used to obtain these tokens.

        Returns
        -------
        TokenCredentials
            The stored credentials with calculated expiration timestamps.

        Raises
        ------
        CredentialStorageError
            If storage fails.
        """
        return self.store(
            url=url,
            access_token=tokens.access_token,
            expires_in=tokens.expires_in,
            refresh_token=tokens.refresh_token,
            refresh_expires_in=tokens.refresh_expires_in,
            oauth_client_id=oauth_client_id,
        )

    def retrieve(self, url: str) -> Optional[TokenCredentials]:
        """Retrieve OAuth tokens from the secure keyring.

        Parameters
        ----------
        url
            The platform URL to retrieve credentials for.

        Returns
        -------
        Optional[TokenCredentials]
            The stored credentials if found, None if nothing is stored for ``url``.

        Raises
        ------
        CredentialRetrievalError
            If the keyring cannot be read, or the stored value is not valid
            JSON, is not a JSON object, or lacks valid required fields.
        """
        # Wrap backend failures the same way store() does, so callers only
        # ever see CredentialStoreError subclasses from this service.
        try:
            credentials_json = keyring.get_password(self.KEYRING_SERVICE_NAME, url)
        except Exception as e:
            logger.warning("failed to read credentials", url=url, error=str(e))
            raise CredentialRetrievalError(f"Failed to retrieve credentials: {e}") from e

        if not credentials_json:
            logger.debug("no credentials found", url=url)
            return None

        try:
            data: object = json.loads(credentials_json)  # pyright: ignore[reportAny]
        except json.JSONDecodeError as e:
            logger.warning("failed to parse credentials", url=url, error=str(e))
            raise CredentialRetrievalError(f"Failed to parse stored credentials: {e}") from e

        if not isinstance(data, dict):
            logger.warning("invalid credentials format", url=url)
            raise CredentialRetrievalError("Invalid credentials format: expected dict")

        return self._parse_credentials(data, url)  # pyright: ignore[reportUnknownArgumentType]

    def clear(self, url: str) -> bool:
        """Clear stored credentials for a specific URL.

        This is best-effort: keyring failures are logged and reported as
        False rather than raised.

        Parameters
        ----------
        url
            The platform URL to clear credentials for.

        Returns
        -------
        bool
            True if credentials were cleared, False if none existed or the
            keyring operation failed.
        """
        try:
            existing = keyring.get_password(self.KEYRING_SERVICE_NAME, url)
            if not existing:
                logger.debug("no credentials to clear", url=url)
                return False

            keyring.delete_password(self.KEYRING_SERVICE_NAME, url)
            logger.debug("cleared credentials", url=url)
            return True
        except Exception as e:
            logger.warning("failed to clear credentials", url=url, error=str(e))
            return False

    def _parse_credentials(self, data: dict[str, object], url: str) -> TokenCredentials:
        """Parse and validate credentials data from JSON.

        Parameters
        ----------
        data
            The parsed JSON data as a dict.
        url
            The URL for logging purposes.

        Returns
        -------
        TokenCredentials
            The parsed credentials. Optional fields with an unexpected type
            are replaced by None.

        Raises
        ------
        CredentialRetrievalError
            If ``access_token`` is not a string or ``access_expires_at`` is
            not an integer.
        """
        # Validate required fields. bool is a subclass of int, so a JSON
        # true/false would otherwise pass as a timestamp; reject it explicitly.
        access_token = data.get("access_token")
        access_expires_at = data.get("access_expires_at")
        if (
            not isinstance(access_token, str)
            or isinstance(access_expires_at, bool)
            or not isinstance(access_expires_at, int)
        ):
            logger.warning("invalid credentials data types", url=url)
            raise CredentialRetrievalError("Invalid credentials: missing or invalid required fields")

        refresh_token = data.get("refresh_token")
        refresh_expires_at = data.get("refresh_expires_at")
        oauth_client_id = data.get("oauth_client_id")

        # Optional fields degrade to None instead of failing the whole parse.
        if isinstance(refresh_expires_at, bool) or not isinstance(refresh_expires_at, int):
            refresh_expires_at = None

        credentials = TokenCredentials(
            access_token=access_token,
            access_expires_at=access_expires_at,
            refresh_token=refresh_token if isinstance(refresh_token, str) else None,
            refresh_expires_at=refresh_expires_at,
            oauth_client_id=oauth_client_id if isinstance(oauth_client_id, str) else None,
        )
        logger.debug("retrieved credentials", url=url)
        return credentials

    def is_token_expired(self, credentials: TokenCredentials) -> bool:
        """Check if the access token is expired.

        Parameters
        ----------
        credentials
            The credentials to check.

        Returns
        -------
        bool
            True if the current time has reached ``access_expires_at``,
            False otherwise.
        """
        current_time = int(time.time())
        return current_time >= credentials.access_expires_at

    def is_refresh_token_expired(self, credentials: TokenCredentials) -> bool:
        """Check if the refresh token is expired.

        Parameters
        ----------
        credentials
            The credentials to check.

        Returns
        -------
        bool
            True if there is no refresh token, no recorded refresh expiry, or
            the expiry has been reached; False otherwise.
        """
        # Without both a token and an expiry there is nothing usable to refresh with.
        if not credentials.refresh_token or not credentials.refresh_expires_at:
            return True
        current_time = int(time.time())
        return current_time >= credentials.refresh_expires_at