Source code for kelvin.sdk.services.docker

"""DockerService - Docker operations using buildx for all builds.

This service manages Docker operations for building and managing application images.
Uses buildx for all builds (both single and multi-arch) for consistency.
It is a leaf service with no dependencies.
"""

# pyright: reportUnknownMemberType=false

from __future__ import annotations

import contextlib
import tarfile
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
from tempfile import TemporaryDirectory
from typing import Final, Optional, cast

import structlog
from docker import APIClient
from docker.errors import DockerException, NotFound
from packaging.version import Version
from packaging.version import parse as parse_version
from python_on_whales import docker as docker_buildx

from kelvin.sdk.exceptions import CLIError

# Module-level structured logger named after this module. The cast only informs
# type checkers that the returned object is a stdlib-bound structlog logger.
logger = cast(structlog.stdlib.BoundLogger, structlog.get_logger(__name__))


# ============ Constants ============

# Docker server version window used for host-gateway handling. The build code
# adds a "host.docker.internal" -> "host-gateway" host entry only when
# HOST_GATEWAY_SUPPORT_START <= server version < HOST_GATEWAY_SUPPORT_END
# (half-open range) and the buildx driver is "docker".
HOST_GATEWAY_SUPPORT_START: Final[Version] = parse_version("24.0.3")
HOST_GATEWAY_SUPPORT_END: Final[Version] = parse_version("25.0.0")

# Platform mapping from common short names to Docker platform strings.
# Names that are not keys here are passed through unchanged when platforms
# are normalized.
PLATFORM_MAP: Final[dict[str, str]] = {
    "amd64": "linux/amd64",
    "arm64": "linux/arm64",
    "arm32": "linux/arm/v7",
}


# ============ Exceptions ============


class DockerServiceError(CLIError):
    """Base exception for Docker service operations.

    Every other Docker exception in this module derives from this class, so
    catching it covers all of them.
    """

    # Process exit code associated with this error family.
    exit_code: int = 75  # EX_TEMPFAIL - temporary failure
class DockerNotRunningError(DockerServiceError):
    """Raised when the Docker daemon cannot be reached.

    Parameters
    ----------
    message
        Optional custom message. When omitted (or empty), a default hint
        telling the user to start Docker is used instead.
    """

    def __init__(self, message: Optional[str] = None):
        fallback = (
            "Docker is not running. Please start Docker and try again.\n"
            "More info: https://docs.docker.com/config/daemon/"
        )
        super().__init__(message or fallback)
class DockerVersionError(DockerServiceError):
    """Raised when the Docker version does not meet requirements."""
class DockerBuildError(DockerServiceError):
    """Raised when a Docker build (or build-and-push) operation fails."""
class DockerImageNotFoundError(DockerServiceError):
    """Docker image not found.

    Overrides the base class exit code.
    """

    # Process exit code associated with a missing image.
    exit_code: int = 66  # EX_NOINPUT
class DockerPlatformError(DockerServiceError):
    """Raised when a requested build platform is not supported."""
class DockerRegistryError(DockerServiceError):
    """Raised when a Docker registry operation (login, pull) fails."""
# ============ Models ============
@dataclass
class DockerImage:
    """Docker image information.

    Attributes
    ----------
    id
        Image ID without the ``sha256:`` prefix, truncated to 12 characters.
    name
        First repository tag, or the short ID when the image has no tags.
    tags
        All repository tags reported for the image.
    size
        Image size as reported by the API (0 when missing or not an int).
    created_at
        Creation time as a naive local ``datetime``, or None when unknown.
    labels
        Image labels with keys and values coerced to strings.
    """

    id: str
    name: str
    tags: list[str]
    size: int
    created_at: Optional[datetime] = None
    labels: dict[str, str] = field(default_factory=dict)

    @classmethod
    def from_api(cls, data: dict[str, object]) -> DockerImage:
        """Create DockerImage from a Docker API response.

        Accepts both payload shapes used in this module:

        - image *list* entries: ``Created`` is an integer Unix timestamp and
          labels are under the top-level ``Labels`` key;
        - image *inspect* results: ``Created`` is an RFC 3339 string and
          labels are under ``Config.Labels``.

        Parameters
        ----------
        data
            Raw image dictionary returned by the Docker API.

        Returns
        -------
        DockerImage
            The populated image model. Missing or malformed fields fall back
            to empty/None values instead of raising.
        """
        raw_id = data.get("Id", "")
        image_id = str(raw_id).replace("sha256:", "")[:12]

        raw_tags = data.get("RepoTags")
        tags: list[str] = [str(t) for t in raw_tags] if isinstance(raw_tags, list) else []  # pyright: ignore[reportUnknownVariableType,reportUnknownArgumentType]
        name: str = tags[0] if tags else image_id

        created = data.get("Created")
        created_at: Optional[datetime] = None
        if isinstance(created, int):
            created_at = datetime.fromtimestamp(created)
        elif isinstance(created, str):
            # Inspect payloads carry the creation time as text.
            created_at = cls._parse_created(created)

        raw_size = data.get("Size", 0)
        size: int = raw_size if isinstance(raw_size, int) else 0

        raw_labels = data.get("Labels")
        if not isinstance(raw_labels, dict):
            # Inspect payloads nest labels under the image config.
            config = data.get("Config")
            raw_labels = config.get("Labels") if isinstance(config, dict) else None  # pyright: ignore[reportUnknownVariableType,reportUnknownMemberType]
        labels: dict[str, str] = {str(k): str(v) for k, v in raw_labels.items()} if isinstance(raw_labels, dict) else {}  # pyright: ignore[reportUnknownVariableType,reportUnknownArgumentType]

        return cls(
            id=image_id,
            name=name,
            tags=tags,
            size=size,
            created_at=created_at,
            labels=labels,
        )

    @staticmethod
    def _parse_created(value: str) -> Optional[datetime]:
        """Parse an RFC 3339 timestamp into a naive local datetime.

        The result uses the same convention as ``datetime.fromtimestamp`` so
        that both payload shapes yield comparable ``created_at`` values.
        Returns None when the text cannot be parsed.
        """
        from datetime import timezone  # local import: the module only imports ``datetime``

        text = value.strip()
        if text[-1:] in ("Z", "z"):
            text = f"{text[:-1]}+00:00"

        # Timestamps may carry nanoseconds; ``fromisoformat`` accepts at most
        # microseconds, so normalise the fraction to exactly six digits.
        head, dot, tail = text.partition(".")
        if dot:
            digit_count = len(tail) - len(tail.lstrip("0123456789"))
            fraction, offset = tail[:digit_count], tail[digit_count:]
            text = f"{head}.{fraction[:6].ljust(6, '0')}{offset}"

        try:
            parsed = datetime.fromisoformat(text)
            if parsed.tzinfo is None:
                parsed = parsed.replace(tzinfo=timezone.utc)
            return datetime.fromtimestamp(parsed.timestamp())
        except (ValueError, OverflowError, OSError):
            return None
@dataclass
class BuildConfig:
    """Configuration for a Docker build operation.

    Optional fields left as None are treated as "not provided" by the build
    code (empty build args/labels, default platform).
    """

    # Build context directory passed to buildx.
    context_path: Path
    # Dockerfile to build from.
    dockerfile_path: Path
    # Image name (tag); the registry URL, when given, is prefixed at build time.
    image_name: str
    # Build-time arguments.
    build_args: Optional[dict[str, str]] = None
    # Target platforms; short names such as "amd64" are also accepted.
    platforms: Optional[list[str]] = None  # e.g., ["linux/amd64", "linux/arm64"]
    # Labels applied to the built image.
    labels: Optional[dict[str, str]] = None
    fresh_build: bool = False  # If True, no cache
@dataclass
class BuildResult:
    """Result of a Docker build operation."""

    # True when the build completed without raising.
    success: bool
    # Final image name, including the registry prefix when one was used.
    image_name: str
    # Build log lines; empty by default.
    logs: list[str] = field(default_factory=list)
# ============ Service ============
class DockerService:
    """Docker operations using buildx for all builds.

    This is a leaf service with no dependencies. It manages:
    - Docker daemon status checks
    - Registry authentication
    - Image builds (via buildx)
    - Image management (list, get, remove, tag, pull)
    - File extraction from images

    All builds use buildx for consistency, even for single-architecture builds.

    Example
    -------
    >>> docker = DockerService()
    >>> if docker.is_running():
    ...     result = docker.build(config, registry_url="registry.example.com")
    """

    # Timeout handed to the Docker API client when it is created.
    CLIENT_TIMEOUT: Final[int] = 300  # seconds

    def __init__(self) -> None:
        """Initialize DockerService.

        No connection is made here; the API client starts as None and is
        created on first use.
        """
        self._client: Optional[APIClient] = None
def _get_client(self) -> APIClient:
    """Get or create the Docker API client.

    The client is created once and cached on the instance.

    Returns
    -------
    APIClient
        The Docker API client.

    Raises
    ------
    DockerNotRunningError
        If Docker is not accessible.
    """
    if self._client is None:
        try:
            self._client = APIClient(timeout=self.CLIENT_TIMEOUT)
        except Exception as e:
            logger.warning("failed to create docker client", error=str(e))
            raise DockerNotRunningError(
                f"Error accessing Docker: {e}\nPlease ensure Docker is running and accessible."
            ) from e
    return self._client


def _ensure_running(self) -> None:
    """Ensure Docker daemon is running.

    Raises
    ------
    DockerNotRunningError
        If the daemon does not answer the ping, answers negatively, or the
        ping itself fails for any reason.
    """
    client = self._get_client()
    try:
        alive = bool(client.ping())
    except Exception as e:
        # An unreachable daemon can surface as a transport-level error rather
        # than a DockerException, so every failure is translated here. This
        # matches is_running(), which also treats any exception as "down".
        raise DockerNotRunningError(str(e)) from e
    if not alive:
        raise DockerNotRunningError()


# ========== Docker Status ==========
def is_running(self) -> bool:
    """Report whether the Docker daemon answers a ping.

    Returns
    -------
    bool
        True if Docker is running, False otherwise. Never raises: any
        failure to obtain the client or to ping is reported as False.
    """
    try:
        reachable = bool(self._get_client().ping())
    except Exception:
        reachable = False
    return reachable
def get_version(self) -> Optional[str]:
    """Return the Docker version string reported by the daemon.

    Returns
    -------
    Optional[str]
        The version with its last ``-suffix`` (such as ``-ce``) removed, or
        None when the version is empty or cannot be retrieved.
    """
    try:
        info = cast(dict[str, object], self._get_client().version())
        reported: str = str(info.get("Version", ""))
        if not reported:
            return None
        # Keep everything before the last "-"; leave dash-free versions as is.
        base, dash, _suffix = reported.rpartition("-")
        return base if dash else reported
    except Exception as e:
        logger.debug("failed to get docker version", error=str(e))
        return None
def validate_version(self, minimum_version: str) -> bool:
    """Check if Docker version meets minimum requirement.

    Parameters
    ----------
    minimum_version
        Minimum required Docker version.

    Returns
    -------
    bool
        True if current version meets requirement.

    Raises
    ------
    DockerVersionError
        If the version cannot be determined, cannot be parsed, or is below
        the minimum.
    """
    current_version = self.get_version()
    if not current_version:
        raise DockerVersionError("Unable to determine Docker version")

    try:
        current = parse_version(current_version)
    except ValueError as e:
        # The version parser signals an invalid version with a ValueError
        # subclass. The daemon-reported string is external input, so report
        # it as a version-check failure instead of leaking the parser error.
        raise DockerVersionError(
            f"Unable to parse Docker version {current_version!r}: {e}"
        ) from e
    minimum = parse_version(minimum_version)

    if current < minimum:
        raise DockerVersionError(
            f"Docker version {current_version} is below minimum {minimum_version}. "
            + "Please update Docker: https://docs.docker.com/engine/install/"
        )

    logger.debug(
        "docker version validated",
        current=current_version,
        minimum=minimum_version,
    )
    return True
def get_supported_platforms(self) -> list[str]:
    """Return the platforms offered by the current buildx builder.

    Platforms are collected from every builder node, with ``*`` markers
    stripped from the names.

    Returns
    -------
    List[str]
        List of supported platforms (e.g., ["linux/amd64", "linux/arm64"]).
        Falls back to ``["linux/amd64"]`` when no platform is reported or
        the builder cannot be inspected.
    """
    try:
        builder = docker_buildx.buildx.inspect()
        # Platforms live on the nodes, not on the builder itself.
        discovered: set[str] = {
            platform.strip("*")
            for node in (builder.nodes or [])
            if node.platforms
            for platform in node.platforms
        }
        if not discovered:
            logger.debug("no platforms found in builder nodes, using default")
            return ["linux/amd64"]
        return list(discovered)
    except Exception as e:
        logger.warning("failed to get buildx platforms", error=str(e))
        return ["linux/amd64"]  # Default fallback
def validate_platforms(self, platforms: list[str]) -> None:
    """Check that every requested platform is offered by buildx.

    Parameters
    ----------
    platforms
        List of platforms to validate.

    Raises
    ------
    DockerPlatformError
        If any platform is not supported.
    """
    available = set(self.get_supported_platforms())
    unsupported = {platform for platform in platforms if platform not in available}
    if not unsupported:
        return

    raise DockerPlatformError(
        f"Unsupported platforms: {', '.join(sorted(unsupported))}. "
        + f"Available: {', '.join(sorted(available))}. "
        + "Please enable them in buildx."
    )
# ========== Registry Authentication ==========
def login(self, registry_url: str, username: str, password: str) -> None:
    """Authenticate against a Docker registry (both docker and buildx).

    Parameters
    ----------
    registry_url
        The registry URL to authenticate with.
    username
        Registry username.
    password
        Registry password or token.

    Raises
    ------
    DockerRegistryError
        If login fails.
    """
    self._ensure_running()

    logger.info("logging in to docker registry", registry=registry_url)

    credentials = {"username": username, "password": password}
    try:
        # The API client and buildx are logged in separately, with the same
        # credentials.
        api_client = self._get_client()
        _ = api_client.login(registry=registry_url, **credentials)
        docker_buildx.login(server=registry_url, **credentials)

        logger.debug("docker registry login successful", registry=registry_url)
    except Exception as e:
        logger.warning("docker registry login failed", registry=registry_url, error=str(e))
        raise DockerRegistryError(f"Failed to login to registry {registry_url}: {e}") from e
# ========== Build Operations (all use buildx) ==========
def build(self, config: BuildConfig, registry_url: Optional[str] = None) -> BuildResult:
    """Build an image locally with buildx, without pushing.

    - Single platform: loads image to local daemon (--load)
    - Multi platform: builds but does NOT load (multi-arch can't be loaded locally)
    - If registry_url provided, image name is prefixed with it

    Parameters
    ----------
    config
        Build configuration.
    registry_url
        Optional registry URL to prefix image name.

    Returns
    -------
    BuildResult
        Result of the build operation.

    Raises
    ------
    DockerBuildError
        If build fails.
    """
    self._ensure_running()
    outcome = self._execute_build(config, registry_url, push=False)
    return outcome
def build_and_push(self, config: BuildConfig, registry_url: str) -> BuildResult:
    """Build an image with buildx and push it straight to the registry.

    - Supports multi-arch builds
    - Image name is prefixed with registry_url
    - Image is NOT loaded locally (pushed directly to registry)

    Parameters
    ----------
    config
        Build configuration.
    registry_url
        Registry URL to push to.

    Returns
    -------
    BuildResult
        Result of the build operation.

    Raises
    ------
    DockerBuildError
        If build or push fails.
    """
    self._ensure_running()
    outcome = self._execute_build(config, registry_url, push=True)
    return outcome
def _execute_build(
    self,
    config: BuildConfig,
    registry_url: Optional[str],
    push: bool,
) -> BuildResult:
    """Internal build execution using buildx.

    Parameters
    ----------
    config
        Build configuration.
    registry_url
        Optional registry URL.
    push
        Whether to push to registry.

    Returns
    -------
    BuildResult
        Result of the build operation.

    Raises
    ------
    DockerPlatformError
        If a requested platform is not supported by buildx.
    DockerBuildError
        If the buildx build (or push) fails.
    """
    platforms = self._normalize_platforms(config.platforms)
    self.validate_platforms(platforms)

    # Determine image name
    image_name = config.image_name
    if registry_url:
        image_name = f"{registry_url}/{config.image_name}"

    # Multi-arch builds cannot be loaded locally, and pushed images are not
    # loaded either.
    is_single_platform = len(platforms) == 1
    load = is_single_platform and not push

    logger.info(
        "building docker image",
        image=image_name,
        platforms=platforms,
        push=push,
        load=load,
    )

    # Get buildx configuration
    builder = docker_buildx.buildx.inspect()
    driver = builder.driver

    # Check for host-gateway support
    add_hosts: dict[str, str] = {}
    server_version = docker_buildx.info().server_version
    if server_version and driver == "docker":
        try:
            version = parse_version(server_version)
        except ValueError:
            # An unparseable server version must not abort the build; just
            # skip the optional host-gateway entry.
            logger.debug("unparseable docker server version", version=server_version)
        else:
            if HOST_GATEWAY_SUPPORT_START <= version < HOST_GATEWAY_SUPPORT_END:
                add_hosts = {"host.docker.internal": "host-gateway"}

    # Clean up if fresh build requested
    if config.fresh_build:
        self._cleanup_for_fresh_build(image_name)

    try:
        _ = docker_buildx.buildx.build(
            str(config.context_path),
            build_args=config.build_args or {},
            cache=not config.fresh_build,
            file=str(config.dockerfile_path),
            labels=config.labels or {},
            progress="auto",
            load=load,
            push=push,
            tags=[image_name],
            platforms=platforms,
            add_hosts=add_hosts,
            provenance=False,  # Disable to avoid cross-repo token scope issues (buildx#2364)
            sbom=False,  # Disable to avoid cross-repo token scope issues (buildx#2364)
        )
    except Exception as e:
        logger.error("docker build failed", image=image_name, error=str(e))
        if push and self._is_auth_error(e):
            msg = (
                f"Failed to push image {image_name}: {e}\n"
                + "This may be caused by an expired or missing Docker session.\n"
                + "Please run 'docker login' to authenticate with the registry."
            )
            raise DockerBuildError(msg) from e
        raise DockerBuildError(f"Failed to build image {image_name}: {e}") from e

    # Clean up local image after push with docker driver. This is best-effort
    # and runs outside the build error handling, so a cleanup problem can no
    # longer turn a successful push into a reported build failure.
    if driver == "docker" and push:
        try:
            self.remove_image(image_name)
        except Exception as e:
            logger.debug("post-push local image cleanup skipped", image=image_name, error=str(e))

    logger.info("docker image built successfully", image=image_name)
    return BuildResult(success=True, image_name=image_name)


def _normalize_platforms(self, platforms: Optional[list[str]]) -> list[str]:
    """Normalize platform names to Docker format.

    Parameters
    ----------
    platforms
        List of platform names (can be short form like "amd64").

    Returns
    -------
    list[str]
        Normalized platform names without duplicates, in first-seen order.
        ``["linux/amd64"]`` when no platform is given.
    """
    if not platforms:
        return ["linux/amd64"]

    # Map common names to Docker platform strings; unknown names pass through.
    mapped = (PLATFORM_MAP.get(platform, platform) for platform in platforms)
    # dict.fromkeys removes duplicates while keeping a deterministic order.
    return list(dict.fromkeys(mapped))


def _cleanup_for_fresh_build(self, image_name: str) -> None:
    """Clean up before a fresh build.

    Removes the existing image (a missing image is ignored), then prunes
    containers and images through the API client. Prune failures are logged
    at debug level and otherwise ignored.

    Parameters
    ----------
    image_name
        Image name to clean up.
    """
    with contextlib.suppress(DockerImageNotFoundError):
        self.remove_image(image_name)

    # Prune stopped containers and dangling images
    try:
        client = self._get_client()
        client.prune_containers()
        client.prune_images()
    except Exception as e:
        logger.debug("prune operation failed", error=str(e))


@staticmethod
def _is_auth_error(exc: Exception) -> bool:
    """Check if an exception looks like a registry authentication error.

    The check is a case-insensitive keyword search in the exception text.
    """
    auth_keywords = ("unauthorized", "denied", "authentication required")
    error_str = str(exc).lower()
    return any(keyword in error_str for keyword in auth_keywords)


# ========== Image Management ==========
def list_images(self, labels: Optional[dict[str, str]] = None) -> list[DockerImage]:
    """Return local images, optionally restricted to matching labels.

    Parameters
    ----------
    labels
        Optional labels to filter images. An image matches only when every
        given key is present with exactly the given value.

    Returns
    -------
    List[DockerImage]
        List of matching Docker images. An empty list is also returned when
        listing fails (the failure is logged as a warning).
    """
    self._ensure_running()
    client = self._get_client()

    try:
        raw_images = cast(list[dict[str, object]], client.images())
        images = [DockerImage.from_api(entry) for entry in raw_images]

        if not labels:
            return images

        return [
            image
            for image in images
            if all(image.labels.get(key) == wanted for key, wanted in labels.items())
        ]
    except Exception as e:
        logger.warning("failed to list images", error=str(e))
        return []
def get_image(self, name: str) -> Optional[DockerImage]:
    """Look up a single local image by name/tag.

    Parameters
    ----------
    name
        Image name or tag.

    Returns
    -------
    Optional[DockerImage]
        The image if found, None otherwise. Lookup errors other than
        "not found" are logged at debug level and also yield None.
    """
    self._ensure_running()
    api = self._get_client()

    found: Optional[DockerImage]
    try:
        inspected = cast(dict[str, object], api.inspect_image(name))
        found = DockerImage.from_api(inspected)
    except NotFound:
        found = None
    except Exception as e:
        logger.debug("failed to get image", name=name, error=str(e))
        found = None
    return found
def image_exists(self, name: str) -> bool:
    """Tell whether an image is available locally.

    Parameters
    ----------
    name
        Image name or tag.

    Returns
    -------
    bool
        True if image exists, False otherwise.
    """
    found = self.get_image(name)
    return found is not None
def remove_image(self, name: str) -> None:
    """Delete a local image, removing containers that use it first.

    Parameters
    ----------
    name
        Image name or tag to remove.

    Raises
    ------
    DockerImageNotFoundError
        If image does not exist.
    DockerServiceError
        If the removal itself fails.
    """
    self._ensure_running()
    api = self._get_client()

    image_present = self.image_exists(name)
    if not image_present:
        raise DockerImageNotFoundError(f"Image not found: {name}")

    try:
        # Containers created from the image are cleared out before the
        # forced image removal.
        self._stop_containers_for_image(name)
        api.remove_image(name, force=True)
        logger.debug("removed image", name=name)
    except Exception as e:
        logger.warning("failed to remove image", name=name, error=str(e))
        raise DockerServiceError(f"Failed to remove image {name}: {e}") from e
def tag_image(self, source: str, target: str) -> None:
    """Tag an existing image with a new name.

    Parameters
    ----------
    source
        Source image name.
    target
        Target name, optionally ending in ``:tag``. Without a tag,
        ``latest`` is used. A colon that belongs to a registry port
        (``localhost:5000/app``) is not treated as a tag separator.

    Raises
    ------
    DockerImageNotFoundError
        If source image does not exist.
    DockerServiceError
        If tagging fails.
    """
    self._ensure_running()
    client = self._get_client()

    if not self.image_exists(source):
        raise DockerImageNotFoundError(f"Source image not found: {source}")

    # Parse target into repository and tag. The text after the last colon is
    # a tag only if it contains no "/"; otherwise that colon is part of the
    # registry host:port and the target carries no explicit tag.
    repository, separator, tag = target.rpartition(":")
    if not separator or "/" in tag:
        repository, tag = target, "latest"

    try:
        client.tag(source, repository, tag)
        logger.debug("tagged image", source=source, target=target)
    except Exception as e:
        logger.warning("failed to tag image", source=source, target=target, error=str(e))
        raise DockerServiceError(f"Failed to tag image: {e}") from e
def pull_image(self, name: str) -> DockerImage:
    """Pull image from registry.

    Parameters
    ----------
    name
        Image name with optional tag.

    Returns
    -------
    DockerImage
        The pulled image.

    Raises
    ------
    DockerImageNotFoundError
        If image not found in registry, or not present locally after the
        pull completed.
    DockerRegistryError
        If pull fails.
    """
    self._ensure_running()
    client = self._get_client()

    logger.info("pulling docker image", image=name)

    try:
        # Consume the progress stream so the pull runs to completion.
        for _chunk in client.pull(name, stream=True, decode=True):  # pyright: ignore[reportUnknownVariableType]
            pass
    except NotFound:
        raise DockerImageNotFoundError(f"Image not found in registry: {name}") from None
    except Exception as e:
        logger.warning("failed to pull image", image=name, error=str(e))
        raise DockerRegistryError(f"Failed to pull image {name}: {e}") from e

    # The post-pull lookup is deliberately outside the try block: raised
    # inside it, the not-found error would be caught by the generic handler
    # above and re-wrapped as a registry error.
    image = self.get_image(name)
    if image is None:
        raise DockerImageNotFoundError(f"Image not found after pull: {name}")

    logger.info("pulled docker image", image=name)
    return image
def extract_from_image(
    self,
    image_name: str,
    container_path: str,
    output_path: Path,
) -> None:
    """Extract files from an image to local path.

    A temporary container is created from the image (never started), the
    requested path is fetched as a tar archive, and regular files from that
    archive are written below ``output_path``. When ``container_path`` is a
    directory its contents are extracted without the leading directory name;
    when it is a single file, that file is extracted under its own name.

    Parameters
    ----------
    image_name
        Name of the image to extract from.
    container_path
        Path inside the container to extract.
    output_path
        Local path to extract files to.

    Raises
    ------
    DockerImageNotFoundError
        If image does not exist.
    DockerServiceError
        If extraction fails (Docker, archive or filesystem error).
    """
    self._ensure_running()
    client = self._get_client()

    if not self.image_exists(image_name):
        raise DockerImageNotFoundError(f"Image not found: {image_name}")

    # Ensure output directory exists
    output_path.mkdir(parents=True, exist_ok=True)

    container_name = f"ksdk-extract-{image_name.replace('/', '-').replace(':', '-')}"

    # Use the "data" extraction filter where this Python provides it.
    extract_options = {"filter": "data"} if hasattr(tarfile, "data_filter") else {}

    try:
        # Remove any existing container with this name
        with contextlib.suppress(NotFound):
            client.remove_container(container_name, force=True)

        # Create a temporary container
        container = cast(
            dict[str, object],
            client.create_container(
                image=image_name,
                name=container_name,
                entrypoint="tail",
                command=["-f", "/dev/null"],
            ),
        )
        container_id: str = str(container.get("Id", ""))

        # The first element of the archive result is the tar byte stream.
        archive_result = client.get_archive(container_id, container_path)
        stream = archive_result[0]

        with TemporaryDirectory() as temp_dir:
            temp_tar = Path(temp_dir) / "extract.tar"

            # Write stream to temp file
            with temp_tar.open("wb") as f:
                for chunk in stream:
                    _ = f.write(chunk)

            # Extract tar contents
            with tarfile.TarFile(temp_tar) as tf:
                # Archive members are rooted at the last component of the path.
                base_name = Path(container_path).name
                prefix = f"{base_name}/"

                for member in tf.getmembers():
                    # Only regular files are written; directories are created
                    # implicitly, links and devices are ignored.
                    if not member.isfile():
                        continue

                    # Adjust paths relative to container_path. A file member
                    # named exactly base_name is the requested file itself
                    # and keeps its name.
                    if member.name.startswith(prefix):
                        member.name = member.name[len(prefix) :]

                    # Security: skip empty names, absolute paths and parent
                    # traversal (checked per path component so that names
                    # such as "a..b" stay allowed).
                    if (
                        not member.name
                        or member.name.startswith("/")
                        or ".." in Path(member.name).parts
                    ):
                        continue

                    tf.extract(member, output_path, **extract_options)

        logger.debug(
            "extracted files from image",
            image=image_name,
            container_path=container_path,
            output_path=str(output_path),
        )

    except (DockerException, tarfile.TarError, OSError) as e:
        raise DockerServiceError(f"Failed to extract from image: {e}") from e
    finally:
        # Clean up container
        with contextlib.suppress(Exception):
            client.remove_container(container_name, force=True)
def _stop_containers_for_image(self, image_name: str) -> None:
    """Stop and remove all containers created from a specific image.

    This is best-effort: failures are logged at debug level and never
    raised. A container whose stop fails is still force-removed.

    Parameters
    ----------
    image_name
        Image name to stop containers for. Compared for equality with each
        container's ``Image`` field.
    """
    client = self._get_client()

    try:
        containers = cast(list[dict[str, object]], client.containers(all=True))
        for container in containers:
            if container.get("Image") != image_name:
                continue
            container_id = str(container.get("Id", ""))
            if not container_id:
                continue

            try:
                client.stop(container_id)
            except Exception as e:
                logger.debug("failed to stop container", container=container_id, error=str(e))

            # Forced removal is attempted even when the stop failed, so the
            # container does not keep blocking removal of the image.
            try:
                client.remove_container(container_id, force=True)
            except Exception as e:
                logger.debug("failed to remove container", container=container_id, error=str(e))
    except Exception as e:
        logger.debug("failed to stop containers for image", image=image_name, error=str(e))