"""ManifestBuilder for creating test RuntimeManifests."""
from __future__ import annotations
from pathlib import Path
from typing import Any, Optional, cast
import structlog
import yaml
from pydantic import BaseModel, Field
from typing_extensions import Self, override
from kelvin.krn import KRNAsset
from kelvin.message.base_messages import ParameterType, PropertyType
from kelvin.message.msg_type import PrimitiveTypes
from kelvin.message.runtime_manifest import (
CAWayEnum,
CustomAction,
ManifestDatastream,
Resource,
ResourceDatastream,
RuntimeManifest,
RuntimeManifestPayload,
WayEnum,
)
# Module-level structured logger used by every builder method below.
logger = structlog.stdlib.get_logger()
# Accepted ``data_type`` strings: the value of every PrimitiveTypes member.
VALID_DATA_TYPES = {primitive.value for primitive in PrimitiveTypes}
class _PendingAsset(BaseModel):
    """Internal storage for asset data before Resource construction.

    Holds the values passed to ``ManifestBuilder.add_asset`` until the builder
    turns them into a ``Resource`` at build time.
    """

    # Asset name; wrapped in a KRNAsset when the Resource is built.
    name: str
    # Asset properties, validated by pydantic against PropertyType.
    properties: dict[str, PropertyType] = Field(default_factory=dict)
    # Asset parameters, validated by pydantic against ParameterType.
    parameters: dict[str, ParameterType] = Field(default_factory=dict)
class _PendingDatastream(BaseModel):
    """Internal storage for datastream data.

    Pairs the manifest-level datastream definition with the per-resource
    settings (direction and configuration) that the builder attaches to each
    asset when it builds resources.
    """

    # Manifest-level definition (name, primitive/data type name, unit name).
    manifest_ds: ManifestDatastream
    # Direction used for the ResourceDatastream entry on each asset.
    way: WayEnum
    # Per-datastream configuration passed to each ResourceDatastream.
    configuration: dict[str, Any] = Field(default_factory=dict)
class ManifestBuilder:
    """Fluent builder for creating RuntimeManifest instances for testing.

    Provides a chainable API for constructing RuntimeManifest objects with
    datastreams, assets, custom actions, and configuration for use in testing
    scenarios. Every datastream added to the builder is attached to every
    asset when the manifest is built.

    Examples:
        >>> manifest = (ManifestBuilder()
        ...     .add_datastream("temperature", "number", WayEnum.input)
        ...     .add_datastream("alert", "boolean", WayEnum.output)
        ...     .add_asset("pump-001", properties={"location": "A1"})
        ...     .set_configuration({"threshold": 100})
        ...     .build())
    """

    def __init__(self) -> None:
        """Initialize an empty manifest builder."""
        # Items are keyed by name/type, so adding one again overwrites it.
        self._pending_assets: dict[str, _PendingAsset] = {}
        self._pending_datastreams: dict[str, _PendingDatastream] = {}
        self._pending_custom_actions: dict[str, CustomAction] = {}
        self._configuration: dict[str, Any] = {}
        logger.debug("manifest_builder_initialized")

    @override
    def __repr__(self) -> str:
        """Return a string representation of the builder state."""
        asset_names = list(self._pending_assets.keys())
        ds_names = list(self._pending_datastreams.keys())
        ca_types = list(self._pending_custom_actions.keys())
        config_keys = list(self._configuration.keys())
        return (
            f"ManifestBuilder(assets={asset_names}, datastreams={ds_names}, "
            f"custom_actions={ca_types}, configuration_keys={config_keys})"
        )

    # ====================================================
    # Factory Methods
    # ====================================================

    @classmethod
    def from_app_yaml(cls, path: Optional[Path | str] = None) -> Self:
        """Load base configuration from app.yaml.

        Supports both legacy format (inputs/outputs at top level) and
        modern format (spec_version 5.0.0 with data_streams section).
        Control-change streams are read from ``control_changes``, and the
        app configuration from ``defaults.configuration`` (preferred) or a
        top-level ``configuration`` key.

        A stream section may be a list (entries are plain names or dicts with
        a ``name`` key) or a mapping of stream name to spec dict. Entries
        without a usable name are skipped with a warning. A missing or null
        ``data_type`` defaults to ``"number"``. Sections present but left
        empty (null) in the YAML are treated as empty.

        Parameters:
            path: Path to the app.yaml file, as ``Path`` or ``str``. Defaults
                to ``app.yaml`` in the current working directory.

        Returns:
            ManifestBuilder populated from app.yaml.

        Raises:
            FileNotFoundError: If the specified path does not exist.
            yaml.YAMLError: If the YAML file is malformed.
            ValueError: If the YAML document is not a mapping, or a stream
                declares an unsupported ``data_type``.
        """
        yaml_path = Path("app.yaml") if path is None else Path(path)
        if not yaml_path.exists():
            logger.error("app_yaml_not_found", path=str(yaml_path))
            raise FileNotFoundError(f"app.yaml not found at {yaml_path}")

        builder = cls()
        raw: Any = yaml.safe_load(yaml_path.read_text(encoding="utf-8"))  # pyright: ignore[reportUnknownMemberType]
        if not raw:
            # Empty document: nothing to load.
            return builder
        if not isinstance(raw, dict):
            logger.error("app_yaml_not_a_mapping", path=str(yaml_path), type=type(raw).__name__)
            raise ValueError(f"app.yaml at {yaml_path} must contain a mapping at the top level")
        config = cast(dict[str, Any], raw)

        # Modern format (spec_version 5.0.0): data_streams section.
        # ``or {}`` also covers sections present in the YAML but left null.
        data_streams: dict[str, Any] = config.get("data_streams") or {}
        builder._add_stream_definitions(data_streams.get("inputs"), WayEnum.input)
        builder._add_stream_definitions(data_streams.get("outputs"), WayEnum.output)

        # Legacy format: inputs/outputs at top level.
        builder._add_stream_definitions(config.get("inputs"), WayEnum.input)
        builder._add_stream_definitions(config.get("outputs"), WayEnum.output)

        # Control changes (modern format).
        control_changes: dict[str, Any] = config.get("control_changes") or {}
        builder._add_stream_definitions(control_changes.get("inputs"), WayEnum.input_cc)
        builder._add_stream_definitions(control_changes.get("outputs"), WayEnum.output_cc)

        # Configuration: defaults.configuration takes precedence over top level.
        defaults: dict[str, Any] = config.get("defaults") or {}
        if "configuration" in defaults:
            _ = builder.set_configuration(defaults["configuration"] or {})
        elif "configuration" in config:
            _ = builder.set_configuration(config["configuration"] or {})

        logger.debug(
            "manifest_builder_from_app_yaml",
            path=str(yaml_path),
            datastreams=len(builder._pending_datastreams),
            config_keys=len(builder._configuration),
        )
        return builder

    def _add_stream_definitions(self, definitions: Any, way: WayEnum) -> None:
        """Add the datastreams described by one app.yaml stream section.

        Parameters:
            definitions: ``None`` or empty (nothing is added), a mapping of
                stream name to spec dict (legacy format), or a list whose
                entries are plain names or dicts with a ``name`` key (modern
                format).
            way: Direction assigned to every stream in the section.

        Raises:
            ValueError: If an entry declares an unsupported ``data_type``.
        """
        if not definitions:
            return
        entries: list[tuple[Any, Any]]
        if isinstance(definitions, dict):
            # Legacy mapping: {stream_name: {"data_type": ...}, ...}
            entries = list(cast(dict[Any, Any], definitions).items())
        else:
            # Modern list: [{"name": ..., "data_type": ...}, "plain_name", ...]
            entries = [
                (cast(dict[str, Any], item).get("name"), item) if isinstance(item, dict) else (item, None)
                for item in definitions
            ]
        for raw_name, spec in entries:
            if raw_name is None or not str(raw_name).strip():
                # A nameless entry cannot be addressed; skip it instead of
                # inventing a name such as "None" or the dict's repr.
                logger.warning("app_yaml_stream_missing_name", way=way.value, spec=repr(spec))
                continue
            data_type = cast(dict[str, Any], spec).get("data_type") if isinstance(spec, dict) else None
            # A missing or null data_type falls back to the "number" default.
            _ = self.add_datastream(str(raw_name), str(data_type or "number"), way)

    @classmethod
    def from_dict(cls, config: dict[str, Any]) -> Self:
        """Create a ManifestBuilder from a dictionary.

        Recognized keys (all optional; null values are treated as empty):

        - ``resources``: list of dicts with ``name`` (or ``asset``),
          ``properties`` and ``parameters``. Entries without a name are skipped.
        - ``datastreams``: list of dicts with ``name``, ``data_type`` (or
          ``primitive_type_name``; default ``"number"``), ``way`` (default
          ``"input"``), ``unit`` (or ``unit_name``) and ``configuration``.
          An unrecognized ``way`` falls back to ``WayEnum.input`` with a warning.
        - ``custom_actions``: list of dicts with ``type`` and optional ``way``.
        - ``configuration``: app configuration dict.

        Parameters:
            config: Dictionary with manifest configuration.

        Returns:
            ManifestBuilder populated from the dictionary.

        Raises:
            ValueError: If a datastream has an empty name or unsupported
                data_type, or a custom action has an empty type or invalid way.
            KeyError: If a custom action dict is missing its ``type`` key.
        """
        builder = cls()

        for resource_data in config.get("resources") or []:
            asset_name = resource_data.get("name", resource_data.get("asset", ""))
            if asset_name:
                _ = builder.add_asset(
                    asset_name,
                    properties=resource_data.get("properties", {}),
                    parameters=resource_data.get("parameters", {}),
                )

        for ds_data in config.get("datastreams") or []:
            _ = builder.add_datastream(
                ds_data.get("name", ""),
                data_type=ds_data.get("data_type", ds_data.get("primitive_type_name", "number")),
                way=cls._parse_way(ds_data.get("way", "input")),
                unit=ds_data.get("unit", ds_data.get("unit_name")),
                configuration=ds_data.get("configuration"),
            )

        custom_actions = config.get("custom_actions")
        if custom_actions:
            _ = builder.add_custom_actions(list(custom_actions))

        if "configuration" in config:
            _ = builder.set_configuration(config["configuration"] or {})

        return builder

    @staticmethod
    def _parse_way(raw: Any) -> WayEnum:
        """Convert a datastream ``way`` value to WayEnum.

        Accepts a WayEnum member or any value WayEnum can be constructed from.
        Anything else falls back to ``WayEnum.input`` and logs a warning.
        """
        if isinstance(raw, WayEnum):
            return raw
        try:
            return WayEnum(raw)
        except ValueError:
            logger.warning("invalid_datastream_way", way=repr(raw), fallback=WayEnum.input.value)
            return WayEnum.input

    # ====================================================
    # Single Item Methods
    # ====================================================

    def add_asset(
        self,
        name: str,
        properties: dict[str, Any] | None = None,
        parameters: dict[str, Any] | None = None,
    ) -> Self:
        """Add an asset to the manifest.

        Note: If an asset with the same name already exists, it will be overwritten.

        Parameters:
            name: Asset name (cannot be None, empty, or whitespace-only).
            properties: Optional asset properties.
            parameters: Optional asset parameters.

        Returns:
            Self for chaining.

        Raises:
            ValueError: If name is None, empty, or whitespace-only.
        """
        if not name or not name.strip():
            logger.error("invalid_asset_name", name=repr(name))
            raise ValueError("asset name cannot be None, empty, or whitespace-only")
        self._pending_assets[name] = _PendingAsset(
            name=name,
            properties=properties or {},
            parameters=parameters or {},
        )
        logger.debug("asset_added", name=name)
        return self

    def add_datastream(
        self,
        name: str,
        data_type: str = "number",
        way: WayEnum = WayEnum.input,
        unit: str | None = None,
        configuration: dict[str, Any] | None = None,
    ) -> Self:
        """Add a datastream definition to the manifest.

        Note: If a datastream with the same name already exists, it will be overwritten.

        Parameters:
            name: Datastream name (cannot be None, empty, or whitespace-only).
            data_type: Data type (number, string, boolean, object). Defaults to "number".
                It is used for both the primitive type name and the data type name.
            way: Input/output direction (input, output, input_cc, output_cc, input_cc_output,
                input_output_cc). Defaults to WayEnum.input.
            unit: Optional unit name (e.g., "celsius", "meters").
            configuration: Optional per-datastream configuration dict.

        Returns:
            Self for chaining.

        Raises:
            ValueError: If name is None, empty, or whitespace-only.
            ValueError: If data_type is not one of: boolean, number, object, string.
        """
        if not name or not name.strip():
            logger.error("invalid_datastream_name", name=repr(name))
            raise ValueError("datastream name cannot be None, empty, or whitespace-only")
        if data_type not in VALID_DATA_TYPES:
            logger.error("invalid_datastream_data_type", data_type=data_type, valid_types=sorted(VALID_DATA_TYPES))
            raise ValueError(f"invalid data_type '{data_type}', must be one of: {sorted(VALID_DATA_TYPES)}")
        self._pending_datastreams[name] = _PendingDatastream(
            manifest_ds=ManifestDatastream(
                name=name,
                primitive_type_name=data_type,
                data_type_name=data_type,
                unit_name=unit,
            ),
            way=way,
            configuration=configuration or {},
        )
        logger.debug("datastream_added", name=name, data_type=data_type, way=way.value, unit=unit)
        return self

    def add_output(
        self,
        name: str,
        data_type: str = "number",
        unit: str | None = None,
        configuration: dict[str, Any] | None = None,
    ) -> Self:
        """Add an output datastream.

        Convenience method that calls add_datastream with WayEnum.output.

        Parameters:
            name: Datastream name (cannot be None, empty, or whitespace-only).
            data_type: Data type (number, string, boolean, object).
            unit: Optional unit name.
            configuration: Optional per-datastream configuration dict.

        Returns:
            Self for chaining.

        Raises:
            ValueError: If name is None, empty, or whitespace-only.
            ValueError: If data_type is not one of: boolean, number, object, string.
        """
        return self.add_datastream(name, data_type, WayEnum.output, unit, configuration)

    def add_control_change_output(
        self,
        name: str,
        data_type: str = "number",
        unit: str | None = None,
        configuration: dict[str, Any] | None = None,
    ) -> Self:
        """Add a control change output datastream.

        Convenience method that calls add_datastream with WayEnum.output_cc.

        Parameters:
            name: Datastream name (cannot be None, empty, or whitespace-only).
            data_type: Data type (number, string, boolean, object).
            unit: Optional unit name.
            configuration: Optional per-datastream configuration dict.

        Returns:
            Self for chaining.

        Raises:
            ValueError: If name is None, empty, or whitespace-only.
            ValueError: If data_type is not one of: boolean, number, object, string.
        """
        return self.add_datastream(name, data_type, WayEnum.output_cc, unit, configuration)

    def add_custom_action(self, action_type: str, way: CAWayEnum = CAWayEnum.output_ca) -> Self:
        """Add a custom action to the manifest.

        Note: If a custom action with the same type already exists, it will be overwritten.

        Parameters:
            action_type: Custom action type identifier (cannot be None, empty, or whitespace-only).
            way: Direction of the custom action (input_ca or output_ca). Defaults to output_ca.

        Returns:
            Self for chaining.

        Raises:
            ValueError: If action_type is None, empty, or whitespace-only.
        """
        if not action_type or not action_type.strip():
            logger.error("invalid_custom_action_type", action_type=repr(action_type))
            raise ValueError("custom action type cannot be None, empty, or whitespace-only")
        self._pending_custom_actions[action_type] = CustomAction(type=action_type, way=way)
        logger.debug("custom_action_added", action_type=action_type, way=way.value)
        return self

    def add_custom_action_output(self, action_type: str) -> Self:
        """Add an output custom action.

        Convenience method that calls add_custom_action with CAWayEnum.output_ca.

        Parameters:
            action_type: Custom action type identifier (cannot be None, empty, or whitespace-only).

        Returns:
            Self for chaining.

        Raises:
            ValueError: If action_type is None, empty, or whitespace-only.
        """
        return self.add_custom_action(action_type, CAWayEnum.output_ca)

    # ====================================================
    # Bulk Methods
    # ====================================================

    def add_assets(self, assets: list[dict[str, Any]]) -> Self:
        """Add multiple assets to the manifest.

        Note: If an asset with the same name already exists, it will be overwritten.

        Parameters:
            assets: List of dicts with 'name' (required), 'properties', 'parameters' keys.

        Returns:
            Self for chaining.

        Raises:
            KeyError: If any asset dict is missing the required 'name' key.
            ValueError: If any asset name is None, empty, or whitespace-only.
        """
        for i, asset in enumerate(assets):
            # Fail loudly (with the offending index) rather than adding a nameless asset.
            if "name" not in asset:
                logger.error("asset_missing_name", index=i, keys=list(asset.keys()))
                raise KeyError(f"asset at index {i} is missing required 'name' key")
            _ = self.add_asset(
                name=asset["name"],
                properties=asset.get("properties"),
                parameters=asset.get("parameters"),
            )
        logger.debug("assets_bulk_added", count=len(assets))
        return self

    def add_datastreams(self, datastreams: list[dict[str, Any]]) -> Self:
        """Add multiple datastreams to the manifest.

        Note: If a datastream with the same name already exists, it will be overwritten.

        Parameters:
            datastreams: List of dicts with 'name' (required), 'data_type', 'way',
                'unit', 'configuration' keys. 'way' may be a WayEnum or its string value.

        Returns:
            Self for chaining.

        Raises:
            KeyError: If any datastream dict is missing the required 'name' key.
            ValueError: If any datastream name is None, empty, or whitespace-only.
            ValueError: If 'way' is an invalid string or 'data_type' is invalid.
        """
        for i, ds in enumerate(datastreams):
            # Fail loudly (with the offending index) rather than adding a nameless datastream.
            if "name" not in ds:
                logger.error("datastream_missing_name", index=i, keys=list(ds.keys()))
                raise KeyError(f"datastream at index {i} is missing required 'name' key")
            way = ds.get("way", WayEnum.input)
            if isinstance(way, str):
                # An invalid way string raises ValueError from the enum lookup.
                way = WayEnum(way)
            _ = self.add_datastream(
                name=ds["name"],
                data_type=ds.get("data_type", "number"),
                way=way,
                unit=ds.get("unit"),
                configuration=ds.get("configuration"),
            )
        logger.debug("datastreams_bulk_added", count=len(datastreams))
        return self

    def add_custom_actions(self, actions: list[dict[str, Any]]) -> Self:
        """Add multiple custom actions to the manifest.

        Note: If a custom action with the same type already exists, it will be overwritten.

        Parameters:
            actions: List of dicts with 'type' (required) and 'way' (optional) keys.
                The 'way' can be a CAWayEnum or string ('input-ca' or 'output-ca').

        Returns:
            Self for chaining.

        Raises:
            KeyError: If any action dict is missing the required 'type' key.
            ValueError: If any action type is None, empty, or whitespace-only.
            ValueError: If 'way' is an invalid string.
        """
        for i, action in enumerate(actions):
            # Fail loudly (with the offending index) rather than adding an untyped action.
            if "type" not in action:
                logger.error("custom_action_missing_type", index=i, keys=list(action.keys()))
                raise KeyError(f"custom action at index {i} is missing required 'type' key")
            way = action.get("way", CAWayEnum.output_ca)
            if isinstance(way, str):
                # An invalid way string raises ValueError from the enum lookup.
                way = CAWayEnum(way)
            _ = self.add_custom_action(action_type=action["type"], way=way)
        logger.debug("custom_actions_bulk_added", count=len(actions))
        return self

    # ====================================================
    # Configuration Methods
    # ====================================================

    def set_configuration(self, config: dict[str, Any]) -> Self:
        """Set the app configuration.

        Note: This replaces any existing configuration entirely. A shallow copy
        of ``config`` is stored, so later changes to the caller's dict do not
        leak into the builder.

        Parameters:
            config: Configuration dictionary.

        Returns:
            Self for chaining.
        """
        self._configuration = dict(config)
        logger.debug("configuration_set", keys=list(self._configuration.keys()))
        return self

    def clear(self) -> Self:
        """Clear all builder state to allow reuse.

        Resets assets, datastreams, custom actions, and configuration
        to empty state. Useful for building multiple manifests with
        the same builder instance.

        Returns:
            Self for chaining.
        """
        self._pending_assets.clear()
        self._pending_datastreams.clear()
        self._pending_custom_actions.clear()
        self._configuration = {}
        logger.debug("manifest_builder_cleared")
        return self

    # ====================================================
    # Build Methods
    # ====================================================

    def _build_resources(self) -> list[Resource]:
        """Build one Resource per pending asset.

        Every pending datastream is attached to every asset, using the
        datastream's way and per-datastream configuration. Each Resource gets
        its own shallow copies of the dicts, so resources share no mutable
        state with each other or with the builder.

        Returns:
            List of Resource instances with current datastreams.
        """
        return [
            Resource(
                resource=KRNAsset(asset.name),
                properties=dict(asset.properties),
                parameters=dict(asset.parameters),
                datastreams={
                    pending_ds.manifest_ds.name: ResourceDatastream(
                        way=pending_ds.way,
                        configuration=dict(pending_ds.configuration),
                    )
                    for pending_ds in self._pending_datastreams.values()
                },
            )
            for asset in self._pending_assets.values()
        ]

    def build(self) -> RuntimeManifest:
        """Build the RuntimeManifest.

        Constructs the final RuntimeManifest from all configured assets,
        datastreams, custom actions, and configuration. The builder can keep
        being modified afterwards. The configuration is copied, so changes
        to the builder do not alter an already built manifest's configuration.

        Returns:
            RuntimeManifest instance ready for use in testing.
        """
        manifest = RuntimeManifest(
            payload=RuntimeManifestPayload(
                resources=self._build_resources(),
                datastreams=[ds.manifest_ds for ds in self._pending_datastreams.values()],
                configuration=dict(self._configuration),
                custom_actions=list(self._pending_custom_actions.values()),
            )
        )
        logger.debug(
            "manifest_built",
            resources=len(manifest.payload.resources),
            datastreams=len(manifest.payload.datastreams),
            custom_actions=len(manifest.payload.custom_actions),
            config_keys=len(manifest.payload.configuration),
        )
        return manifest