"""Credential store service for secure OAuth token storage.
This service manages secure storage and retrieval of OAuth tokens using keyring.
It is a leaf service with no dependencies.
"""
from __future__ import annotations
import json
import time
from dataclasses import asdict, dataclass
from typing import TYPE_CHECKING, Final, Optional, cast
import keyring
import structlog
from kelvin.sdk.exceptions import CLIError
if TYPE_CHECKING:
from kelvin.sdk.services.auth import TokenResponse
# Module-level structured logger, named after this module. ``cast`` only tells
# the type checker to treat the object as a stdlib ``BoundLogger``; it performs
# no conversion or check at runtime.
logger = cast(structlog.stdlib.BoundLogger, structlog.get_logger(__name__))
# ============ Exceptions ============
[docs]
class CredentialStoreError(CLIError):
    """Common base class for credential store failures.

    The storage and retrieval errors defined in this module both derive
    from it, so callers can handle either with a single ``except`` clause.
    """

    # 77 is EX_NOPERM in the sysexits convention.
    exit_code: int = 77
[docs]
class CredentialStorageError(CredentialStoreError):
    """Raised when credentials could not be stored."""
[docs]
class CredentialRetrievalError(CredentialStoreError):
    """Raised when stored credentials could not be retrieved."""
# ============ Models ============
[docs]
@dataclass
class TokenCredentials:
    """A set of OAuth tokens together with their expiry timestamps.

    Only the access token and its expiry are mandatory; the refresh token,
    its expiry and the OAuth client ID all default to ``None``.
    """

    access_token: str
    # Expiry timestamps are compared against ``int(time.time())`` elsewhere in
    # this module, i.e. they are integer seconds.
    access_expires_at: int
    refresh_token: Optional[str] = None
    refresh_expires_at: Optional[int] = None
    oauth_client_id: Optional[str] = None

    @property
    def is_service_account(self) -> bool:
        """Whether these credentials came from a client credentials grant.

        Service account tokens carry no refresh token, so the absence of one
        is used as the marker. The Keycloak userinfo endpoint rejects such
        tokens, so callers should skip remote token validation for them.
        """
        has_refresh_token = self.refresh_token is not None
        return not has_refresh_token
# ============ Service ============
[docs]
class CredentialStore:
    """Secure credential storage using keyring.

    This service handles storage and retrieval of OAuth tokens in the system's
    secure credential store (keyring). It does not perform authentication -
    that responsibility belongs to AuthService.

    Credentials are serialized to JSON and saved under the keyring service
    name ``KEYRING_SERVICE_NAME``, keyed by platform URL.

    Example
    -------
    >>> store = CredentialStore()
    >>> creds = store.retrieve("https://api.example.com")
    >>> if creds and not store.is_token_expired(creds):
    ...     print("Valid credentials found")
    """

    KEYRING_SERVICE_NAME: Final[str] = "kelvin:kelvin-sdk"
    EXPIRATION_THRESHOLD: Final[int] = 60  # seconds before actual expiry to consider expired

    def store(
        self,
        url: str,
        access_token: str,
        expires_in: int,
        refresh_token: Optional[str] = None,
        refresh_expires_in: Optional[int] = None,
        oauth_client_id: Optional[str] = None,
    ) -> TokenCredentials:
        """Store OAuth tokens in the secure keyring.

        Parameters
        ----------
        url
            The platform URL to associate with these credentials.
        access_token
            The OAuth access token.
        expires_in
            Number of seconds until the access token expires.
        refresh_token
            Optional OAuth refresh token.
        refresh_expires_in
            Number of seconds until the refresh token expires.
        oauth_client_id
            Optional OAuth client ID to store alongside the tokens.

        Returns
        -------
        TokenCredentials
            The stored credentials with calculated expiration timestamps.

        Raises
        ------
        CredentialStorageError
            If storage fails.
        """
        try:
            current_time = int(time.time())

            # Subtract EXPIRATION_THRESHOLD so tokens are considered expired
            # slightly before their actual expiry.
            access_expires_at = current_time + expires_in - self.EXPIRATION_THRESHOLD

            # A missing or zero refresh_expires_in leaves refresh_expires_at
            # unset, which is_refresh_token_expired() reports as expired.
            refresh_expires_at: Optional[int] = None
            if refresh_token and refresh_expires_in:
                refresh_expires_at = current_time + refresh_expires_in - self.EXPIRATION_THRESHOLD

            credentials = TokenCredentials(
                access_token=access_token,
                access_expires_at=access_expires_at,
                refresh_token=refresh_token,
                refresh_expires_at=refresh_expires_at,
                oauth_client_id=oauth_client_id,
            )
            credentials_json = json.dumps(asdict(credentials))
            keyring.set_password(self.KEYRING_SERVICE_NAME, url, credentials_json)
            logger.debug("stored credentials", url=url)
            return credentials
        except Exception as e:
            # Any failure (bad input, serialization, keyring backend) is
            # reported to callers as a single domain error.
            logger.warning("failed to store credentials", url=url, error=str(e))
            raise CredentialStorageError(f"Failed to store credentials: {e}") from e

    def store_token_response(
        self, url: str, tokens: TokenResponse, oauth_client_id: Optional[str] = None
    ) -> TokenCredentials:
        """Store OAuth tokens from a TokenResponse.

        Thin convenience wrapper that unpacks ``tokens`` and delegates to
        :meth:`store`.

        Parameters
        ----------
        url
            The platform URL to associate with these credentials.
        tokens
            The TokenResponse from AuthService.
        oauth_client_id
            The Keycloak OAuth client ID used to obtain these tokens.

        Returns
        -------
        TokenCredentials
            The stored credentials with calculated expiration timestamps.

        Raises
        ------
        CredentialStorageError
            If storage fails.
        """
        return self.store(
            url=url,
            access_token=tokens.access_token,
            expires_in=tokens.expires_in,
            refresh_token=tokens.refresh_token,
            refresh_expires_in=tokens.refresh_expires_in,
            oauth_client_id=oauth_client_id,
        )

    def retrieve(self, url: str) -> Optional[TokenCredentials]:
        """Retrieve OAuth tokens from the secure keyring.

        Parameters
        ----------
        url
            The platform URL to retrieve credentials for.

        Returns
        -------
        Optional[TokenCredentials]
            The stored credentials if found, None if nothing is stored for
            ``url``.

        Raises
        ------
        CredentialRetrievalError
            If the keyring cannot be read, or if the stored value is not
            valid JSON, is not a JSON object, or lacks valid required fields.
        """
        try:
            credentials_json = keyring.get_password(self.KEYRING_SERVICE_NAME, url)
        except Exception as e:
            # Mirror store(): surface keyring backend failures as a domain
            # error instead of leaking the backend's own exception type.
            logger.warning("failed to retrieve credentials", url=url, error=str(e))
            raise CredentialRetrievalError(f"Failed to retrieve credentials: {e}") from e

        if not credentials_json:
            logger.debug("no credentials found", url=url)
            return None

        try:
            data: object = json.loads(credentials_json)  # pyright: ignore[reportAny]
        except json.JSONDecodeError as e:
            logger.warning("failed to parse credentials", url=url, error=str(e))
            raise CredentialRetrievalError(f"Failed to parse stored credentials: {e}") from e

        if not isinstance(data, dict):
            logger.warning("invalid credentials format", url=url)
            raise CredentialRetrievalError("Invalid credentials format: expected dict")

        return self._parse_credentials(data, url)  # pyright: ignore[reportUnknownArgumentType]

    def clear(self, url: str) -> bool:
        """Clear stored credentials for a specific URL.

        This is a best-effort operation: keyring failures are logged and
        reported as ``False`` rather than raised.

        Parameters
        ----------
        url
            The platform URL to clear credentials for.

        Returns
        -------
        bool
            True if credentials were cleared, False if none existed or the
            keyring operation failed.
        """
        try:
            existing = keyring.get_password(self.KEYRING_SERVICE_NAME, url)
            if not existing:
                logger.debug("no credentials to clear", url=url)
                return False
            keyring.delete_password(self.KEYRING_SERVICE_NAME, url)
            logger.debug("cleared credentials", url=url)
            return True
        except Exception as e:
            logger.warning("failed to clear credentials", url=url, error=str(e))
            return False

    def _parse_credentials(self, data: dict[str, object], url: str) -> TokenCredentials:
        """Parse and validate credentials data from JSON.

        Parameters
        ----------
        data
            The parsed JSON data as a dict.
        url
            The URL for logging purposes.

        Returns
        -------
        TokenCredentials
            The parsed credentials. Optional fields with an unexpected type
            are replaced by None.

        Raises
        ------
        CredentialRetrievalError
            If ``access_token`` is not a string or ``access_expires_at`` is
            not an integer.
        """
        access_token = data.get("access_token")
        access_expires_at = data.get("access_expires_at")

        # bool is a subclass of int, so a JSON true/false would otherwise be
        # accepted as a timestamp; reject it explicitly.
        if (
            not isinstance(access_token, str)
            or isinstance(access_expires_at, bool)
            or not isinstance(access_expires_at, int)
        ):
            logger.warning("invalid credentials data types", url=url)
            raise CredentialRetrievalError("Invalid credentials: missing or invalid required fields")

        refresh_token = data.get("refresh_token")
        if not isinstance(refresh_token, str):
            refresh_token = None

        refresh_expires_at = data.get("refresh_expires_at")
        if isinstance(refresh_expires_at, bool) or not isinstance(refresh_expires_at, int):
            refresh_expires_at = None

        oauth_client_id = data.get("oauth_client_id")
        if not isinstance(oauth_client_id, str):
            oauth_client_id = None

        credentials = TokenCredentials(
            access_token=access_token,
            access_expires_at=access_expires_at,
            refresh_token=refresh_token,
            refresh_expires_at=refresh_expires_at,
            oauth_client_id=oauth_client_id,
        )
        logger.debug("retrieved credentials", url=url)
        return credentials

    def is_token_expired(self, credentials: TokenCredentials) -> bool:
        """Check if the access token is expired.

        Parameters
        ----------
        credentials
            The credentials to check.

        Returns
        -------
        bool
            True if the current time has reached ``access_expires_at``,
            False otherwise.
        """
        current_time = int(time.time())
        return current_time >= credentials.access_expires_at

    def is_refresh_token_expired(self, credentials: TokenCredentials) -> bool:
        """Check if the refresh token is expired.

        Parameters
        ----------
        credentials
            The credentials to check.

        Returns
        -------
        bool
            True if there is no refresh token, no refresh expiry timestamp,
            or the timestamp has been reached; False otherwise.
        """
        if not credentials.refresh_token or not credentials.refresh_expires_at:
            return True
        current_time = int(time.time())
        return current_time >= credentials.refresh_expires_at