Source code for kelvin.sdk.lib.auth.credential_storage

import dataclasses
import json
import time
from dataclasses import dataclass
from typing import Dict, Optional

import keyring

from kelvin.sdk.lib.utils.logger_utils import logger

from .errors import CredentialStorageError


[docs] @dataclass class TokenCredentials: access_token: str access_expires_at: int refresh_token: Optional[str] = None refresh_expires_at: Optional[int] = None
[docs] class CredentialStorage: KEYRING_SERVICE_NAME = "kelvin:kelvin-sdk" EXPIRATION_THRESHOLD = 60 # seconds
[docs] def store_credentials(self, url: str, oauth_tokens: Dict) -> TokenCredentials: """ Store OAuth tokens in keyring. Parameters ---------- url : str The platform URL to associate with these credentials oauth_tokens : Dict OAuth token response containing access_token, expires_in, refresh_token, refresh_expires_in, etc. """ try: # Calculate expiration timestamps current_time = int(time.time()) access_expires_at = current_time + oauth_tokens.get("expires_in", 0) - self.EXPIRATION_THRESHOLD refresh_token = oauth_tokens.get("refresh_token") refresh_expires_at = None if refresh_token: refresh_expires_at = ( current_time + oauth_tokens.get("refresh_expires_in", 0) - self.EXPIRATION_THRESHOLD ) # Create TokenCredentials object credentials = TokenCredentials( access_token=oauth_tokens["access_token"], access_expires_at=access_expires_at, refresh_token=refresh_token, refresh_expires_at=refresh_expires_at, ) # Serialize to JSON and store in keyring credentials_json = json.dumps(dataclasses.asdict(credentials)) keyring.set_password(self.KEYRING_SERVICE_NAME, url, credentials_json) logger.debug(f"Stored OAuth credentials for URL '{url}' in keyring") return credentials except KeyError as e: raise CredentialStorageError(f"Missing required OAuth token field: {e}") except Exception as e: logger.warning(f"Failed to store OAuth credentials in keyring: {e}") raise CredentialStorageError(f"Failed to store credentials: {e}")
[docs] def retrieve_credentials(self, url: str) -> Optional[TokenCredentials]: """ Retrieve OAuth tokens from keyring. Parameters ---------- url : str The platform URL to retrieve credentials for Returns ------- Optional[TokenCredentials] TokenCredentials object if found, None otherwise """ try: credentials_json = keyring.get_password(self.KEYRING_SERVICE_NAME, url) if not credentials_json: return None # Parse JSON and create TokenCredentials object credentials_data = json.loads(credentials_json) credentials = TokenCredentials(**credentials_data) logger.debug(f"Retrieved OAuth credentials for URL '{url}' from keyring") return credentials except json.JSONDecodeError as e: logger.warning(f"Failed to parse stored credentials for URL '{url}': {e}") return None except Exception as e: logger.warning(f"Failed to retrieve credentials from keyring: {e}") return None
[docs] def clear_credentials(self, url: str) -> None: """ Clear stored OAuth credentials for a specific URL. Parameters ---------- url : str The platform URL to clear credentials for """ try: creds = keyring.get_password(self.KEYRING_SERVICE_NAME, url) if not creds: return keyring.delete_password(self.KEYRING_SERVICE_NAME, url) logger.debug(f"Cleared OAuth credentials for URL '{url}' from keyring") except Exception as e: logger.warning(f"Failed to clear credentials from keyring: {e}")