"""Keyring-backed storage of OAuth token credentials (source of ``kelvin.sdk.lib.auth.credential_storage``)."""
import dataclasses
import json
import time
from dataclasses import dataclass
from typing import Dict, Optional
import keyring
from kelvin.sdk.lib.utils.logger_utils import logger
from .errors import CredentialStorageError
[docs]
@dataclasses.dataclass
class TokenCredentials:
    """OAuth token set together with the timestamps at which each token expires.

    Instances are plain data holders: ``CredentialStorage`` serializes them
    with ``dataclasses.asdict`` and rebuilds them from the decoded JSON
    mapping, so every field must stay JSON-serializable.
    """

    # The OAuth access token string (required).
    access_token: str
    # Expiry of the access token as an integer timestamp; when produced by
    # ``CredentialStorage.store_credentials`` this is ``int(time.time())``
    # plus the token lifetime minus that class's EXPIRATION_THRESHOLD.
    access_expires_at: int
    # The OAuth refresh token, or None when none is available.
    refresh_token: Optional[str] = None
    # Expiry of the refresh token (same convention as ``access_expires_at``),
    # or None when there is no refresh token.
    refresh_expires_at: Optional[int] = None
[docs]
class CredentialStorage:
    """Store, load and delete OAuth token credentials in the system keyring.

    Every platform URL maps to one keyring entry: the service name is
    ``KEYRING_SERVICE_NAME``, the username is the URL, and the password is
    the JSON serialization of a ``TokenCredentials`` instance.
    """

    # Service name under which all entries are written to the keyring.
    KEYRING_SERVICE_NAME = "kelvin:kelvin-sdk"
    # Subtracted from every computed expiry timestamp, so stored timestamps
    # are this many seconds earlier than the nominal token expiry
    # (presumably a safety margin for consumers -- confirm against callers).
    EXPIRATION_THRESHOLD = 60  # seconds

    def store_credentials(self, url: str, oauth_tokens: Dict) -> TokenCredentials:
        """
        Store OAuth tokens in keyring.

        Parameters
        ----------
        url : str
            The platform URL to associate with these credentials
        oauth_tokens : Dict
            OAuth token response containing access_token, expires_in, refresh_token, refresh_expires_in, etc.

        Returns
        -------
        TokenCredentials
            The credentials that were written, carrying absolute expiry timestamps.

        Raises
        ------
        CredentialStorageError
            If ``access_token`` is missing from ``oauth_tokens``, or if building,
            serializing or writing the credentials fails. The original exception
            is attached as ``__cause__``.
        """
        try:
            # Convert the relative lifetimes into absolute timestamps.
            current_time = int(time.time())
            # A missing "expires_in" counts as a lifetime of 0 seconds.
            access_expires_at = current_time + oauth_tokens.get("expires_in", 0) - self.EXPIRATION_THRESHOLD

            refresh_token = oauth_tokens.get("refresh_token")
            refresh_expires_at = None
            if refresh_token:
                # Only meaningful when a refresh token was actually issued.
                refresh_expires_at = (
                    current_time + oauth_tokens.get("refresh_expires_in", 0) - self.EXPIRATION_THRESHOLD
                )

            credentials = TokenCredentials(
                access_token=oauth_tokens["access_token"],
                access_expires_at=access_expires_at,
                refresh_token=refresh_token,
                refresh_expires_at=refresh_expires_at,
            )

            # Serialize to JSON and store in keyring
            credentials_json = json.dumps(dataclasses.asdict(credentials))
            keyring.set_password(self.KEYRING_SERVICE_NAME, url, credentials_json)
            logger.debug(f"Stored OAuth credentials for URL '{url}' in keyring")
            return credentials
        except KeyError as e:
            # Chain explicitly so the traceback shows the lookup that failed.
            raise CredentialStorageError(f"Missing required OAuth token field: {e}") from e
        except Exception as e:
            logger.warning(f"Failed to store OAuth credentials in keyring: {e}")
            raise CredentialStorageError(f"Failed to store credentials: {e}") from e

    def retrieve_credentials(self, url: str) -> Optional[TokenCredentials]:
        """
        Retrieve OAuth tokens from keyring.

        This method is best-effort: every failure is logged as a warning and
        reported to the caller as ``None`` rather than raised.

        Parameters
        ----------
        url : str
            The platform URL to retrieve credentials for

        Returns
        -------
        Optional[TokenCredentials]
            TokenCredentials object if found, None otherwise (nothing stored,
            keyring read failure, or an unreadable stored payload)
        """
        # Stage 1: read the raw entry. Kept separate from parsing so that a
        # backend failure and a corrupt payload are logged with distinct messages.
        try:
            credentials_json = keyring.get_password(self.KEYRING_SERVICE_NAME, url)
        except Exception as e:
            logger.warning(f"Failed to retrieve credentials from keyring: {e}")
            return None

        if not credentials_json:
            return None

        # Stage 2: decode the payload. Besides invalid JSON this also covers
        # valid JSON that does not match the TokenCredentials fields (TypeError
        # from the constructor). Caught broadly because this method must not raise.
        try:
            credentials_data = json.loads(credentials_json)
            credentials = TokenCredentials(**credentials_data)
        except Exception as e:
            logger.warning(f"Failed to parse stored credentials for URL '{url}': {e}")
            return None

        logger.debug(f"Retrieved OAuth credentials for URL '{url}' from keyring")
        return credentials

    def clear_credentials(self, url: str) -> None:
        """
        Clear stored OAuth credentials for a specific URL.

        Does nothing when no entry exists for the URL. Failures are logged as
        warnings and never raised.

        Parameters
        ----------
        url : str
            The platform URL to clear credentials for
        """
        try:
            # Only call delete when an entry exists, so a missing entry is a
            # silent no-op instead of a logged warning.
            creds = keyring.get_password(self.KEYRING_SERVICE_NAME, url)
            if not creds:
                return
            keyring.delete_password(self.KEYRING_SERVICE_NAME, url)
            logger.debug(f"Cleared OAuth credentials for URL '{url}' from keyring")
        except Exception as e:
            logger.warning(f"Failed to clear credentials from keyring: {e}")