"""DockerService - Docker operations using buildx for all builds.
This service manages Docker operations for building and managing application images.
Uses buildx for all builds (both single and multi-arch) for consistency.
It is a leaf service with no dependencies.
"""
# pyright: reportUnknownMemberType=false
from __future__ import annotations
import contextlib
import tarfile
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
from tempfile import TemporaryDirectory
from typing import Final, Optional, cast
import structlog
from docker import APIClient
from docker.errors import DockerException, NotFound
from packaging.version import Version
from packaging.version import parse as parse_version
from python_on_whales import docker as docker_buildx
from kelvin.sdk.exceptions import CLIError
logger = cast(structlog.stdlib.BoundLogger, structlog.get_logger(__name__))
# ============ Constants ============
# Docker versions with host-gateway support
HOST_GATEWAY_SUPPORT_START: Final[Version] = parse_version("24.0.3")
HOST_GATEWAY_SUPPORT_END: Final[Version] = parse_version("25.0.0")
# Platform mapping from common names to Docker platform strings
PLATFORM_MAP: Final[dict[str, str]] = {
"amd64": "linux/amd64",
"arm64": "linux/arm64",
"arm32": "linux/arm/v7",
}
# ============ Exceptions ============
[docs]
class DockerServiceError(CLIError):
"""Base exception for Docker service operations."""
exit_code: int = 75 # EX_TEMPFAIL - temporary failure
[docs]
class DockerNotRunningError(DockerServiceError):
"""Docker daemon is not running."""
def __init__(self, message: Optional[str] = None):
super().__init__(
message
or (
"Docker is not running. Please start Docker and try again.\n"
"More info: https://docs.docker.com/config/daemon/"
)
)
[docs]
class DockerVersionError(DockerServiceError):
"""Docker version does not meet requirements."""
pass
[docs]
class DockerBuildError(DockerServiceError):
"""Docker build operation failed."""
pass
[docs]
class DockerImageNotFoundError(DockerServiceError):
"""Docker image not found."""
exit_code: int = 66 # EX_NOINPUT
[docs]
class DockerRegistryError(DockerServiceError):
"""Docker registry operation failed."""
pass
# ============ Models ============
[docs]
@dataclass
class DockerImage:
"""Docker image information."""
id: str
name: str
tags: list[str]
size: int
created_at: Optional[datetime] = None
labels: dict[str, str] = field(default_factory=dict)
[docs]
@classmethod
def from_api(cls, data: dict[str, object]) -> DockerImage:
"""Create DockerImage from Docker API response."""
raw_id = data.get("Id", "")
image_id = str(raw_id).replace("sha256:", "")[:12]
raw_tags = data.get("RepoTags")
tags: list[str] = [str(t) for t in raw_tags] if isinstance(raw_tags, list) else [] # pyright: ignore[reportUnknownVariableType,reportUnknownArgumentType]
name: str = str(tags[0]) if tags else image_id
created = data.get("Created")
created_at = None
if isinstance(created, int):
created_at = datetime.fromtimestamp(created)
raw_size = data.get("Size", 0)
size: int = raw_size if isinstance(raw_size, int) else 0
raw_labels = data.get("Labels")
labels: dict[str, str] = {str(k): str(v) for k, v in raw_labels.items()} if isinstance(raw_labels, dict) else {} # pyright: ignore[reportUnknownVariableType,reportUnknownArgumentType]
return cls(
id=image_id,
name=name,
tags=tags,
size=size,
created_at=created_at,
labels=labels,
)
[docs]
@dataclass
class BuildConfig:
"""Configuration for a Docker build operation."""
context_path: Path
dockerfile_path: Path
image_name: str
build_args: Optional[dict[str, str]] = None
platforms: Optional[list[str]] = None # e.g., ["linux/amd64", "linux/arm64"]
labels: Optional[dict[str, str]] = None
fresh_build: bool = False # If True, no cache
[docs]
@dataclass
class BuildResult:
"""Result of a Docker build operation."""
success: bool
image_name: str
logs: list[str] = field(default_factory=list)
# ============ Service ============
[docs]
class DockerService:
"""Docker operations using buildx for all builds.
This is a leaf service with no dependencies. It manages:
- Docker daemon status checks
- Registry authentication
- Image builds (via buildx)
- Image management (list, get, remove, tag, pull)
- File extraction from images
All builds use buildx for consistency, even for single-architecture builds.
Example
-------
>>> docker = DockerService()
>>> if docker.is_running():
... result = docker.build(config, registry_url="registry.example.com")
"""
CLIENT_TIMEOUT: Final[int] = 300 # seconds
[docs]
def __init__(self) -> None:
"""Initialize DockerService."""
self._client: Optional[APIClient] = None
def _get_client(self) -> APIClient:
"""Get or create the Docker API client.
Returns
-------
APIClient
The Docker API client.
Raises
------
DockerNotRunningError
If Docker is not accessible.
"""
if self._client is None:
try:
self._client = APIClient(timeout=self.CLIENT_TIMEOUT)
except Exception as e:
logger.warning("failed to create docker client", error=str(e))
raise DockerNotRunningError(
f"Error accessing Docker: {e}\nPlease ensure Docker is running and accessible."
) from e
return self._client
def _ensure_running(self) -> None:
"""Ensure Docker daemon is running.
Raises
------
DockerNotRunningError
If Docker is not running.
"""
client = self._get_client()
try:
if not client.ping():
raise DockerNotRunningError()
except DockerException as e:
raise DockerNotRunningError(str(e)) from e
# ========== Docker Status ==========
[docs]
def is_running(self) -> bool:
"""Check if Docker daemon is running.
Returns
-------
bool
True if Docker is running, False otherwise.
"""
try:
client = self._get_client()
return bool(client.ping())
except Exception:
return False
[docs]
def get_version(self) -> Optional[str]:
"""Get Docker version string.
Returns
-------
Optional[str]
Docker version string, or None if unavailable.
"""
try:
client = self._get_client()
version_info = cast(dict[str, object], client.version())
version_str: str = str(version_info.get("Version", ""))
# Remove any suffix like "-ce"
return version_str.rsplit("-", 1)[0] if version_str else None
except Exception as e:
logger.debug("failed to get docker version", error=str(e))
return None
[docs]
def validate_version(self, minimum_version: str) -> bool:
"""Check if Docker version meets minimum requirement.
Parameters
----------
minimum_version
Minimum required Docker version.
Returns
-------
bool
True if current version meets requirement.
Raises
------
DockerVersionError
If version check fails or version is insufficient.
"""
current_version = self.get_version()
if not current_version:
raise DockerVersionError("Unable to determine Docker version")
current = parse_version(current_version)
minimum = parse_version(minimum_version)
if current < minimum:
raise DockerVersionError(
f"Docker version {current_version} is below minimum {minimum_version}. "
+ "Please update Docker: https://docs.docker.com/engine/install/"
)
logger.debug(
"docker version validated",
current=current_version,
minimum=minimum_version,
)
return True
# ========== Registry Authentication ==========
[docs]
def login(self, registry_url: str, username: str, password: str) -> None:
"""Login to Docker registry (both docker and buildx).
Parameters
----------
registry_url
The registry URL to authenticate with.
username
Registry username.
password
Registry password or token.
Raises
------
DockerRegistryError
If login fails.
"""
self._ensure_running()
logger.info("logging in to docker registry", registry=registry_url)
try:
client = self._get_client()
_ = client.login(
username=username,
password=password,
registry=registry_url,
)
# Also login with buildx
docker_buildx.login(
server=registry_url,
username=username,
password=password,
)
logger.debug("docker registry login successful", registry=registry_url)
except Exception as e:
logger.warning("docker registry login failed", registry=registry_url, error=str(e))
raise DockerRegistryError(f"Failed to login to registry {registry_url}: {e}") from e
# ========== Build Operations (all use buildx) ==========
[docs]
def build(self, config: BuildConfig, registry_url: Optional[str] = None) -> BuildResult:
"""Build image locally using buildx.
- Single platform: loads image to local daemon (--load)
- Multi platform: builds but does NOT load (multi-arch can't be loaded locally)
- If registry_url provided, image name is prefixed with it
Parameters
----------
config
Build configuration.
registry_url
Optional registry URL to prefix image name.
Returns
-------
BuildResult
Result of the build operation.
Raises
------
DockerBuildError
If build fails.
"""
self._ensure_running()
return self._execute_build(config, registry_url, push=False)
[docs]
def build_and_push(self, config: BuildConfig, registry_url: str) -> BuildResult:
"""Build and push image to registry using buildx --push.
- Supports multi-arch builds
- Image name is prefixed with registry_url
- Image is NOT loaded locally (pushed directly to registry)
Parameters
----------
config
Build configuration.
registry_url
Registry URL to push to.
Returns
-------
BuildResult
Result of the build operation.
Raises
------
DockerBuildError
If build or push fails.
"""
self._ensure_running()
return self._execute_build(config, registry_url, push=True)
def _execute_build(
self,
config: BuildConfig,
registry_url: Optional[str],
push: bool,
) -> BuildResult:
"""Internal build execution using buildx.
Parameters
----------
config
Build configuration.
registry_url
Optional registry URL.
push
Whether to push to registry.
Returns
-------
BuildResult
Result of the build operation.
"""
# Normalize platforms
platforms = self._normalize_platforms(config.platforms)
# Validate platforms
self.validate_platforms(platforms)
# Determine image name
image_name = config.image_name
if registry_url:
image_name = f"{registry_url}/{config.image_name}"
# Determine load/push behavior
# Multi-arch builds cannot be loaded locally
is_single_platform = len(platforms) == 1
load = is_single_platform and not push
logger.info(
"building docker image",
image=image_name,
platforms=platforms,
push=push,
load=load,
)
# Get buildx configuration
builder = docker_buildx.buildx.inspect()
driver = builder.driver
# Check for host-gateway support
add_hosts: dict[str, str] = {}
server_version = docker_buildx.info().server_version
if server_version and driver == "docker":
version = parse_version(server_version)
if HOST_GATEWAY_SUPPORT_START <= version < HOST_GATEWAY_SUPPORT_END:
add_hosts = {"host.docker.internal": "host-gateway"}
# Clean up if fresh build requested
if config.fresh_build:
self._cleanup_for_fresh_build(image_name)
try:
_ = docker_buildx.buildx.build(
str(config.context_path),
build_args=config.build_args or {},
cache=not config.fresh_build,
file=str(config.dockerfile_path),
labels=config.labels or {},
progress="auto",
load=load,
push=push,
tags=[image_name],
platforms=platforms,
add_hosts=add_hosts,
provenance=False, # Disable to avoid cross-repo token scope issues (buildx#2364)
sbom=False, # Disable to avoid cross-repo token scope issues (buildx#2364)
)
# Clean up local image after push with docker driver
if driver == "docker" and push:
with contextlib.suppress(DockerImageNotFoundError):
self.remove_image(image_name)
logger.info("docker image built successfully", image=image_name)
return BuildResult(success=True, image_name=image_name)
except Exception as e:
logger.error("docker build failed", image=image_name, error=str(e))
if push and self._is_auth_error(e):
msg = (
f"Failed to push image {image_name}: {e}\n"
+ "This may be caused by an expired or missing Docker session.\n"
+ "Please run 'docker login' to authenticate with the registry."
)
raise DockerBuildError(msg) from e
raise DockerBuildError(f"Failed to build image {image_name}: {e}") from e
def _normalize_platforms(self, platforms: Optional[list[str]]) -> list[str]:
"""Normalize platform names to Docker format.
Parameters
----------
platforms
List of platform names (can be short form like "amd64").
Returns
-------
list[str]
Normalized platform names.
"""
if not platforms:
return ["linux/amd64"]
normalized: list[str] = []
for platform in platforms:
# Map common names to Docker platform strings
mapped = PLATFORM_MAP.get(platform, platform)
normalized.append(mapped)
return list(set(normalized)) # Remove duplicates
def _cleanup_for_fresh_build(self, image_name: str) -> None:
"""Clean up before a fresh build.
Parameters
----------
image_name
Image name to clean up.
"""
with contextlib.suppress(DockerImageNotFoundError):
self.remove_image(image_name)
# Prune stopped containers and dangling images
try:
client = self._get_client()
client.prune_containers()
client.prune_images()
except Exception as e:
logger.debug("prune operation failed", error=str(e))
@staticmethod
def _is_auth_error(exc: Exception) -> bool:
"""Check if an exception looks like a registry authentication error."""
error_str = str(exc).lower()
return any(keyword in error_str for keyword in ("unauthorized", "denied", "authentication required"))
# ========== Image Management ==========
[docs]
def list_images(self, labels: Optional[dict[str, str]] = None) -> list[DockerImage]:
"""List local images, optionally filtered by labels.
Parameters
----------
labels
Optional labels to filter images.
Returns
-------
List[DockerImage]
List of matching Docker images.
"""
self._ensure_running()
client = self._get_client()
try:
all_images = cast(list[dict[str, object]], client.images())
images = [DockerImage.from_api(img) for img in all_images]
if not labels:
return images
# Filter by labels
filtered: list[DockerImage] = []
for image in images:
if all(image.labels.get(k) == v for k, v in labels.items()):
filtered.append(image)
return filtered
except Exception as e:
logger.warning("failed to list images", error=str(e))
return []
[docs]
def get_image(self, name: str) -> Optional[DockerImage]:
"""Get a specific image by name/tag.
Parameters
----------
name
Image name or tag.
Returns
-------
Optional[DockerImage]
The image if found, None otherwise.
"""
self._ensure_running()
client = self._get_client()
try:
image_data = cast(dict[str, object], client.inspect_image(name))
return DockerImage.from_api(image_data)
except NotFound:
return None
except Exception as e:
logger.debug("failed to get image", name=name, error=str(e))
return None
[docs]
def image_exists(self, name: str) -> bool:
"""Check if image exists locally.
Parameters
----------
name
Image name or tag.
Returns
-------
bool
True if image exists, False otherwise.
"""
return self.get_image(name) is not None
[docs]
def remove_image(self, name: str) -> None:
"""Remove a local image.
Parameters
----------
name
Image name or tag to remove.
Raises
------
DockerImageNotFoundError
If image does not exist.
"""
self._ensure_running()
client = self._get_client()
if not self.image_exists(name):
raise DockerImageNotFoundError(f"Image not found: {name}")
try:
# First stop any containers using this image
self._stop_containers_for_image(name)
client.remove_image(name, force=True)
logger.debug("removed image", name=name)
except Exception as e:
logger.warning("failed to remove image", name=name, error=str(e))
raise DockerServiceError(f"Failed to remove image {name}: {e}") from e
[docs]
def tag_image(self, source: str, target: str) -> None:
"""Tag an existing image with a new name.
Parameters
----------
source
Source image name.
target
Target tag name.
Raises
------
DockerImageNotFoundError
If source image does not exist.
"""
self._ensure_running()
client = self._get_client()
if not self.image_exists(source):
raise DockerImageNotFoundError(f"Source image not found: {source}")
try:
# Parse target into repository and tag
if ":" in target:
repository, tag = target.rsplit(":", 1)
else:
repository, tag = target, "latest"
client.tag(source, repository, tag)
logger.debug("tagged image", source=source, target=target)
except Exception as e:
logger.warning("failed to tag image", source=source, target=target, error=str(e))
raise DockerServiceError(f"Failed to tag image: {e}") from e
[docs]
def pull_image(self, name: str) -> DockerImage:
"""Pull image from registry.
Parameters
----------
name
Image name with optional tag.
Returns
-------
DockerImage
The pulled image.
Raises
------
DockerImageNotFoundError
If image not found in registry.
DockerRegistryError
If pull fails.
"""
self._ensure_running()
client = self._get_client()
logger.info("pulling docker image", image=name)
try:
# Pull using streaming to show progress
for _chunk in client.pull(name, stream=True, decode=True): # pyright: ignore[reportUnknownVariableType]
pass # Progress handled by docker
image = self.get_image(name)
if not image:
raise DockerImageNotFoundError(f"Image not found after pull: {name}")
logger.info("pulled docker image", image=name)
return image
except NotFound:
raise DockerImageNotFoundError(f"Image not found in registry: {name}") from None
except Exception as e:
logger.warning("failed to pull image", image=name, error=str(e))
raise DockerRegistryError(f"Failed to pull image {name}: {e}") from e
def _stop_containers_for_image(self, image_name: str) -> None:
"""Stop all containers running a specific image.
Parameters
----------
image_name
Image name to stop containers for.
"""
client = self._get_client()
try:
containers = cast(list[dict[str, object]], client.containers(all=True))
for container in containers:
if container.get("Image") == image_name:
container_id = str(container.get("Id", ""))
if container_id:
try:
client.stop(container_id)
client.remove_container(container_id, force=True)
except Exception:
pass
except Exception as e:
logger.debug("failed to stop containers for image", image=image_name, error=str(e))