# Source code for kelvin.testing.manifest

"""ManifestBuilder for creating test RuntimeManifests."""

from __future__ import annotations

from pathlib import Path
from typing import Any, Optional, cast

import structlog
import yaml
from pydantic import BaseModel, Field
from typing_extensions import Self, override

from kelvin.krn import KRNAsset
from kelvin.message.base_messages import ParameterType, PropertyType
from kelvin.message.msg_type import PrimitiveTypes
from kelvin.message.runtime_manifest import (
    CAWayEnum,
    CustomAction,
    ManifestDatastream,
    Resource,
    ResourceDatastream,
    RuntimeManifest,
    RuntimeManifestPayload,
    WayEnum,
)

# Structured logger shared by every builder created in this module.
logger = structlog.stdlib.get_logger()

# Every accepted ``data_type`` string: the ``.value`` of each PrimitiveTypes member.
VALID_DATA_TYPES = {primitive_type.value for primitive_type in PrimitiveTypes}


class _PendingAsset(BaseModel):
    """Internal storage for asset data before Resource construction.

    ``ManifestBuilder.add_asset`` creates one of these per asset name, and
    ``ManifestBuilder._build_resources`` later turns each into a ``Resource``.
    """

    # Asset name; wrapped in ``KRNAsset`` when the Resource is built.
    name: str
    # Asset properties (string key -> PropertyType value); empty by default.
    properties: dict[str, PropertyType] = Field(default_factory=dict)
    # Asset parameters (string key -> ParameterType value); empty by default.
    parameters: dict[str, ParameterType] = Field(default_factory=dict)


class _PendingDatastream(BaseModel):
    """Internal storage for datastream data.

    Pairs the manifest-level datastream definition with the direction and
    configuration that ``ManifestBuilder._build_resources`` attaches to every
    asset's ``ResourceDatastream`` entry.
    """

    # Definition emitted in the manifest payload's top-level ``datastreams`` list.
    manifest_ds: ManifestDatastream
    # Direction applied to this datastream on every asset's resource entry.
    way: WayEnum
    # Per-datastream configuration passed to each asset's ResourceDatastream; empty by default.
    configuration: dict[str, Any] = Field(default_factory=dict)


class ManifestBuilder:
    """Fluent builder for creating RuntimeManifest instances for testing.

    Provides a chainable API for constructing RuntimeManifest objects with
    datastreams, assets, and configuration for use in testing scenarios.

    Examples:
        >>> manifest = (ManifestBuilder()
        ...     .add_datastream("temperature", "number", WayEnum.input)
        ...     .add_datastream("alert", "boolean", WayEnum.output)
        ...     .add_asset("pump-001", properties={"location": "A1"})
        ...     .set_configuration({"threshold": 100})
        ...     .build())
    """

    def __init__(self) -> None:
        """Initialize an empty manifest builder."""
        # Each store is keyed by name/type, so re-adding an item overwrites it.
        self._pending_assets: dict[str, _PendingAsset] = {}
        self._pending_datastreams: dict[str, _PendingDatastream] = {}
        self._pending_custom_actions: dict[str, CustomAction] = {}
        self._configuration: dict[str, Any] = {}
        logger.debug("manifest_builder_initialized")

    @override
    def __repr__(self) -> str:
        """Return a string representation of the builder state."""
        asset_names = list(self._pending_assets.keys())
        ds_names = list(self._pending_datastreams.keys())
        ca_types = list(self._pending_custom_actions.keys())
        config_keys = list(self._configuration.keys())
        return (
            f"ManifestBuilder(assets={asset_names}, datastreams={ds_names}, "
            f"custom_actions={ca_types}, configuration_keys={config_keys})"
        )

    # ====================================================
    # Factory Methods
    # ====================================================

    @classmethod
    def from_app_yaml(cls, path: Path | str | None = None) -> ManifestBuilder:
        """Load base configuration from app.yaml.

        Supports both legacy format (inputs/outputs at top level) and modern
        format (spec_version 5.0.0 with data_streams section), plus the
        ``control_changes`` inputs/outputs section.

        Each datastream section may be a list, whose items are plain names or
        dicts with ``name`` and optional ``data_type``. It may instead be a
        mapping of ``name -> spec``. A missing or empty ``data_type`` defaults
        to ``"number"``. List entries without a usable ``name`` are skipped
        with a warning. Configuration comes from ``defaults.configuration`` if
        present, otherwise from a top-level ``configuration`` key.

        Parameters:
            path: Path to app.yaml file, as a ``Path`` or ``str``. Defaults to
                ``app.yaml`` relative to the current working directory.

        Returns:
            ManifestBuilder populated from app.yaml.

        Raises:
            FileNotFoundError: If the specified path does not exist.
            yaml.YAMLError: If the YAML file is malformed.
            ValueError: If the document's top level is not a mapping, a
                datastream section is neither a list nor a mapping, or a
                declared data_type is invalid.
        """
        path = Path("app.yaml") if path is None else Path(path)
        builder = cls()

        if not path.exists():
            logger.error("app_yaml_not_found", path=str(path))
            raise FileNotFoundError(f"app.yaml not found at {path}")

        # Explicit encoding: YAML files are UTF-8 regardless of the platform default.
        loaded: Any = yaml.safe_load(path.read_text(encoding="utf-8"))  # pyright: ignore[reportUnknownMemberType]
        if not loaded:
            return builder
        if not isinstance(loaded, dict):
            logger.error("app_yaml_not_a_mapping", path=str(path), type=type(loaded).__name__)
            raise ValueError(f"app.yaml at {path} must contain a mapping at the top level")
        config = cast(dict[str, Any], loaded)

        # A YAML key with no value parses as None; treat it as an empty section.
        data_streams: dict[str, Any] = config.get("data_streams") or {}
        control_changes: dict[str, Any] = config.get("control_changes") or {}

        # Order matters: a later section overwrites an earlier datastream of the same name.
        sections: list[tuple[str, Any, WayEnum]] = [
            ("data_streams.inputs", data_streams.get("inputs"), WayEnum.input),
            ("data_streams.outputs", data_streams.get("outputs"), WayEnum.output),
            ("inputs", config.get("inputs"), WayEnum.input),
            ("outputs", config.get("outputs"), WayEnum.output),
            ("control_changes.inputs", control_changes.get("inputs"), WayEnum.input_cc),
            ("control_changes.outputs", control_changes.get("outputs"), WayEnum.output_cc),
        ]
        for section, entries, way in sections:
            for name, data_type in cls._parse_datastream_entries(entries, section):
                _ = builder.add_datastream(name, data_type, way)

        # Configuration from defaults, falling back to the top level.
        defaults = config.get("defaults")
        if isinstance(defaults, dict) and "configuration" in defaults:
            defaults_dict = cast(dict[str, Any], defaults)
            _ = builder.set_configuration(defaults_dict["configuration"] or {})
        elif "configuration" in config:
            _ = builder.set_configuration(config["configuration"] or {})

        logger.debug(
            "manifest_builder_from_app_yaml",
            path=str(path),
            datastreams=len(builder._pending_datastreams),
            config_keys=len(builder._configuration),
        )
        return builder

    @staticmethod
    def _parse_datastream_entries(entries: Any, section: str) -> list[tuple[str, str]]:
        """Normalize one app.yaml datastream section into ``(name, data_type)`` pairs.

        Parameters:
            entries: The raw section value. Accepted forms are ``None`` or
                empty, a list of plain names or ``{"name", "data_type"}``
                dicts, or a mapping of ``name -> spec``, where spec may be a
                dict with ``data_type``.
            section: Dotted section name, used only in log/error messages.

        Returns:
            Pairs in document order. A missing or empty ``data_type`` becomes
            ``"number"``. List entries without a usable name are skipped.

        Raises:
            ValueError: If ``entries`` is neither a list nor a mapping.
        """
        if not entries:
            return []

        pairs: list[tuple[str, str]] = []
        if isinstance(entries, dict):
            # Mapping form (legacy): the key is the datastream name.
            for key, spec in cast(dict[Any, Any], entries).items():
                spec_dict = cast(dict[str, Any], spec) if isinstance(spec, dict) else {}
                pairs.append((str(key), str(spec_dict.get("data_type") or "number")))
            return pairs

        if not isinstance(entries, list):
            logger.error("app_yaml_invalid_datastream_section", section=section, type=type(entries).__name__)
            raise ValueError(
                f"app.yaml section '{section}' must be a list or mapping, got {type(entries).__name__}"
            )

        for entry in cast(list[Any], entries):
            if isinstance(entry, dict):
                entry_dict = cast(dict[str, Any], entry)
                raw_name = entry_dict.get("name")
                data_type = str(entry_dict.get("data_type") or "number")
            else:
                raw_name = entry
                data_type = "number"
            name = "" if raw_name is None else str(raw_name)
            if not name.strip():
                # A nameless entry cannot be addressed. Skip it rather than
                # registering a datastream literally called "None" or "{...}".
                logger.warning("app_yaml_datastream_missing_name", section=section, entry=repr(entry))
                continue
            pairs.append((name, data_type))
        return pairs

    @classmethod
    def from_dict(cls, config: dict[str, Any]) -> ManifestBuilder:
        """Create a ManifestBuilder from a dictionary.

        Recognized keys:
            resources: list of dicts with ``name`` (or ``asset``), and optional
                ``properties``/``parameters``. Entries without a name are skipped.
            datastreams: list of dicts with ``name``, ``data_type`` (or
                ``primitive_type_name``), ``way`` (a WayEnum member or its value;
                unknown values fall back to ``WayEnum.input``), ``unit`` (or
                ``unit_name``), and ``configuration``.
            configuration: app configuration dict.

        Parameters:
            config: Dictionary with manifest configuration.

        Returns:
            ManifestBuilder populated from the dictionary.

        Raises:
            ValueError: If a datastream has a missing/empty name or an invalid data_type.
        """
        builder = cls()

        # Handle resources
        for resource_data in config.get("resources") or []:
            asset_name = resource_data.get("name", resource_data.get("asset", ""))
            if asset_name:
                _ = builder.add_asset(
                    asset_name,
                    properties=resource_data.get("properties", {}),
                    parameters=resource_data.get("parameters", {}),
                )

        # Handle datastreams
        valid_way_values = [e.value for e in WayEnum]
        for ds_data in config.get("datastreams") or []:
            name: str = ds_data.get("name", "")
            data_type: str = ds_data.get("data_type", ds_data.get("primitive_type_name", "number"))
            way_value = ds_data.get("way", "input")
            # Accept enum members directly. A plain Enum member does not compare
            # equal to its value, so a value check alone would drop it to `input`.
            if isinstance(way_value, WayEnum):
                way = way_value
            elif way_value in valid_way_values:
                way = WayEnum(way_value)
            else:
                way = WayEnum.input
            _ = builder.add_datastream(
                name,
                data_type,
                way,
                unit=ds_data.get("unit", ds_data.get("unit_name")),
                configuration=ds_data.get("configuration"),
            )

        # Handle configuration
        if "configuration" in config:
            _ = builder.set_configuration(config["configuration"] or {})

        return builder

    # ====================================================
    # Single Item Methods
    # ====================================================

    def add_asset(
        self,
        name: str,
        properties: dict[str, Any] | None = None,
        parameters: dict[str, Any] | None = None,
    ) -> Self:
        """Add an asset to the manifest.

        Note:
            If an asset with the same name already exists, it will be overwritten.

        Parameters:
            name: Asset name (cannot be None, empty, or whitespace-only).
            properties: Optional asset properties.
            parameters: Optional asset parameters.

        Returns:
            Self for chaining.

        Raises:
            ValueError: If name is None, empty, or whitespace-only.
        """
        if not name or not name.strip():
            logger.error("invalid_asset_name", name=repr(name))
            raise ValueError("asset name cannot be None, empty, or whitespace-only")

        self._pending_assets[name] = _PendingAsset(
            name=name,
            properties=properties or {},
            parameters=parameters or {},
        )
        logger.debug("asset_added", name=name)
        return self

    def add_datastream(
        self,
        name: str,
        data_type: str = "number",
        way: WayEnum = WayEnum.input,
        unit: str | None = None,
        configuration: dict[str, Any] | None = None,
    ) -> Self:
        """Add a datastream definition to the manifest.

        Note:
            If a datastream with the same name already exists, it will be overwritten.

        Parameters:
            name: Datastream name (cannot be None, empty, or whitespace-only).
            data_type: Data type (number, string, boolean, object). Defaults to "number".
            way: Input/output direction (input, output, input_cc, output_cc,
                input_cc_output, input_output_cc). Defaults to WayEnum.input.
            unit: Optional unit name (e.g., "celsius", "meters").
            configuration: Optional per-datastream configuration dict.

        Returns:
            Self for chaining.

        Raises:
            ValueError: If name is None, empty, or whitespace-only.
            ValueError: If data_type is not one of: boolean, number, object, string.
        """
        if not name or not name.strip():
            logger.error("invalid_datastream_name", name=repr(name))
            raise ValueError("datastream name cannot be None, empty, or whitespace-only")

        if data_type not in VALID_DATA_TYPES:
            logger.error("invalid_datastream_data_type", data_type=data_type, valid_types=sorted(VALID_DATA_TYPES))
            raise ValueError(f"invalid data_type '{data_type}', must be one of: {sorted(VALID_DATA_TYPES)}")

        self._pending_datastreams[name] = _PendingDatastream(
            manifest_ds=ManifestDatastream(
                name=name,
                primitive_type_name=data_type,
                data_type_name=data_type,
                unit_name=unit,
            ),
            way=way,
            configuration=configuration or {},
        )
        logger.debug("datastream_added", name=name, data_type=data_type, way=way.value, unit=unit)
        return self

    def add_input(
        self,
        name: str,
        data_type: str = "number",
        unit: str | None = None,
        configuration: dict[str, Any] | None = None,
    ) -> Self:
        """Add an input datastream.

        Convenience method that calls add_datastream with WayEnum.input.

        Parameters:
            name: Datastream name (cannot be None, empty, or whitespace-only).
            data_type: Data type (number, string, boolean, object).
            unit: Optional unit name.
            configuration: Optional per-datastream configuration dict.

        Returns:
            Self for chaining.

        Raises:
            ValueError: If name is None, empty, or whitespace-only.
            ValueError: If data_type is not one of: boolean, number, object, string.
        """
        return self.add_datastream(name, data_type, WayEnum.input, unit, configuration)

    def add_output(
        self,
        name: str,
        data_type: str = "number",
        unit: str | None = None,
        configuration: dict[str, Any] | None = None,
    ) -> Self:
        """Add an output datastream.

        Convenience method that calls add_datastream with WayEnum.output.

        Parameters:
            name: Datastream name (cannot be None, empty, or whitespace-only).
            data_type: Data type (number, string, boolean, object).
            unit: Optional unit name.
            configuration: Optional per-datastream configuration dict.

        Returns:
            Self for chaining.

        Raises:
            ValueError: If name is None, empty, or whitespace-only.
            ValueError: If data_type is not one of: boolean, number, object, string.
        """
        return self.add_datastream(name, data_type, WayEnum.output, unit, configuration)

    def add_control_change_input(
        self,
        name: str,
        data_type: str = "number",
        unit: str | None = None,
        configuration: dict[str, Any] | None = None,
    ) -> Self:
        """Add a control change input datastream.

        Convenience method that calls add_datastream with WayEnum.input_cc.

        Parameters:
            name: Datastream name (cannot be None, empty, or whitespace-only).
            data_type: Data type (number, string, boolean, object).
            unit: Optional unit name.
            configuration: Optional per-datastream configuration dict.

        Returns:
            Self for chaining.

        Raises:
            ValueError: If name is None, empty, or whitespace-only.
            ValueError: If data_type is not one of: boolean, number, object, string.
        """
        return self.add_datastream(name, data_type, WayEnum.input_cc, unit, configuration)

    def add_control_change_output(
        self,
        name: str,
        data_type: str = "number",
        unit: str | None = None,
        configuration: dict[str, Any] | None = None,
    ) -> Self:
        """Add a control change output datastream.

        Convenience method that calls add_datastream with WayEnum.output_cc.

        Parameters:
            name: Datastream name (cannot be None, empty, or whitespace-only).
            data_type: Data type (number, string, boolean, object).
            unit: Optional unit name.
            configuration: Optional per-datastream configuration dict.

        Returns:
            Self for chaining.

        Raises:
            ValueError: If name is None, empty, or whitespace-only.
            ValueError: If data_type is not one of: boolean, number, object, string.
        """
        return self.add_datastream(name, data_type, WayEnum.output_cc, unit, configuration)

    def add_input_cc_output(
        self,
        name: str,
        data_type: str = "number",
        unit: str | None = None,
        configuration: dict[str, Any] | None = None,
    ) -> Self:
        """Add a datastream that is both input control change and output.

        Convenience method that calls add_datastream with WayEnum.input_cc_output.
        Use for owned datastreams with read-write access.

        Parameters:
            name: Datastream name (cannot be None, empty, or whitespace-only).
            data_type: Data type (number, string, boolean, object).
            unit: Optional unit name.
            configuration: Optional per-datastream configuration dict.

        Returns:
            Self for chaining.

        Raises:
            ValueError: If name is None, empty, or whitespace-only.
            ValueError: If data_type is not one of: boolean, number, object, string.
        """
        return self.add_datastream(name, data_type, WayEnum.input_cc_output, unit, configuration)

    def add_input_output_cc(
        self,
        name: str,
        data_type: str = "number",
        unit: str | None = None,
        configuration: dict[str, Any] | None = None,
    ) -> Self:
        """Add a datastream that is both input and output control change.

        Convenience method that calls add_datastream with WayEnum.input_output_cc.
        Use for remote datastreams with read-write access.

        Parameters:
            name: Datastream name (cannot be None, empty, or whitespace-only).
            data_type: Data type (number, string, boolean, object).
            unit: Optional unit name.
            configuration: Optional per-datastream configuration dict.

        Returns:
            Self for chaining.

        Raises:
            ValueError: If name is None, empty, or whitespace-only.
            ValueError: If data_type is not one of: boolean, number, object, string.
        """
        return self.add_datastream(name, data_type, WayEnum.input_output_cc, unit, configuration)

    def add_custom_action(self, action_type: str, way: CAWayEnum = CAWayEnum.output_ca) -> Self:
        """Add a custom action to the manifest.

        Note:
            If a custom action with the same type already exists, it will be overwritten.

        Parameters:
            action_type: Custom action type identifier (cannot be None, empty, or whitespace-only).
            way: Direction of the custom action (input_ca or output_ca). Defaults to output_ca.

        Returns:
            Self for chaining.

        Raises:
            ValueError: If action_type is None, empty, or whitespace-only.
        """
        if not action_type or not action_type.strip():
            logger.error("invalid_custom_action_type", action_type=repr(action_type))
            raise ValueError("custom action type cannot be None, empty, or whitespace-only")

        self._pending_custom_actions[action_type] = CustomAction(type=action_type, way=way)
        logger.debug("custom_action_added", action_type=action_type, way=way.value)
        return self

    def add_custom_action_input(self, action_type: str) -> Self:
        """Add an input custom action.

        Convenience method that calls add_custom_action with CAWayEnum.input_ca.

        Parameters:
            action_type: Custom action type identifier (cannot be None, empty, or whitespace-only).

        Returns:
            Self for chaining.

        Raises:
            ValueError: If action_type is None, empty, or whitespace-only.
        """
        return self.add_custom_action(action_type, CAWayEnum.input_ca)

    def add_custom_action_output(self, action_type: str) -> Self:
        """Add an output custom action.

        Convenience method that calls add_custom_action with CAWayEnum.output_ca.

        Parameters:
            action_type: Custom action type identifier (cannot be None, empty, or whitespace-only).

        Returns:
            Self for chaining.

        Raises:
            ValueError: If action_type is None, empty, or whitespace-only.
        """
        return self.add_custom_action(action_type, CAWayEnum.output_ca)

    # ====================================================
    # Bulk Methods
    # ====================================================

    def add_assets(self, assets: list[dict[str, Any]]) -> Self:
        """Add multiple assets to the manifest.

        Note:
            If an asset with the same name already exists, it will be overwritten.

        Parameters:
            assets: List of dicts with 'name' (required), 'properties', 'parameters' keys.

        Returns:
            Self for chaining.

        Raises:
            KeyError: If any asset dict is missing the required 'name' key.
            ValueError: If any asset name is None, empty, or whitespace-only.
        """
        for i, asset in enumerate(assets):
            # Edge case: require 'name' key to be present
            if "name" not in asset:
                logger.error("asset_missing_name", index=i, keys=list(asset.keys()))
                raise KeyError(f"asset at index {i} is missing required 'name' key")
            _ = self.add_asset(
                name=asset["name"],
                properties=asset.get("properties"),
                parameters=asset.get("parameters"),
            )
        logger.debug("assets_bulk_added", count=len(assets))
        return self

    def add_datastreams(self, datastreams: list[dict[str, Any]]) -> Self:
        """Add multiple datastreams to the manifest.

        Note:
            If a datastream with the same name already exists, it will be overwritten.

        Parameters:
            datastreams: List of dicts with 'name' (required), 'data_type', 'way',
                'unit', 'configuration' keys.

        Returns:
            Self for chaining.

        Raises:
            KeyError: If any datastream dict is missing the required 'name' key.
            ValueError: If any datastream name is None, empty, or whitespace-only.
            ValueError: If 'way' is an invalid string or 'data_type' is invalid.
        """
        for i, ds in enumerate(datastreams):
            # Edge case: require 'name' key to be present
            if "name" not in ds:
                logger.error("datastream_missing_name", index=i, keys=list(ds.keys()))
                raise KeyError(f"datastream at index {i} is missing required 'name' key")

            way = ds.get("way", WayEnum.input)
            if isinstance(way, str):
                # Edge case: invalid way string will raise ValueError
                way = WayEnum(way)

            _ = self.add_datastream(
                name=ds["name"],
                data_type=ds.get("data_type", "number"),
                way=way,
                unit=ds.get("unit"),
                configuration=ds.get("configuration"),
            )
        logger.debug("datastreams_bulk_added", count=len(datastreams))
        return self

    def add_custom_actions(self, actions: list[dict[str, Any]]) -> Self:
        """Add multiple custom actions to the manifest.

        Note:
            If a custom action with the same type already exists, it will be overwritten.

        Parameters:
            actions: List of dicts with 'type' (required) and 'way' (optional) keys.
                The 'way' can be a CAWayEnum or string ('input-ca' or 'output-ca').

        Returns:
            Self for chaining.

        Raises:
            KeyError: If any action dict is missing the required 'type' key.
            ValueError: If any action type is None, empty, or whitespace-only.
            ValueError: If 'way' is an invalid string.
        """
        for i, action in enumerate(actions):
            # Edge case: require 'type' key to be present
            if "type" not in action:
                logger.error("custom_action_missing_type", index=i, keys=list(action.keys()))
                raise KeyError(f"custom action at index {i} is missing required 'type' key")

            way = action.get("way", CAWayEnum.output_ca)
            if isinstance(way, str):
                # Edge case: invalid way string will raise ValueError
                way = CAWayEnum(way)

            _ = self.add_custom_action(action_type=action["type"], way=way)
        logger.debug("custom_actions_bulk_added", count=len(actions))
        return self

    # ====================================================
    # Configuration Methods
    # ====================================================

    def set_configuration(self, config: dict[str, Any]) -> Self:
        """Set the app configuration.

        Note:
            This replaces any existing configuration entirely. A shallow copy
            of ``config`` is stored, so later changes to the caller's dict's
            top-level keys do not leak into subsequent builds.

        Parameters:
            config: Configuration dictionary.

        Returns:
            Self for chaining.
        """
        self._configuration = dict(config)
        logger.debug("configuration_set", keys=list(self._configuration.keys()))
        return self

    def clear(self) -> Self:
        """Clear all builder state to allow reuse.

        Resets assets, datastreams, custom actions, and configuration to empty state.
        Useful for building multiple manifests with the same builder instance.

        Returns:
            Self for chaining.
        """
        self._pending_assets.clear()
        self._pending_datastreams.clear()
        self._pending_custom_actions.clear()
        self._configuration = {}
        logger.debug("manifest_builder_cleared")
        return self

    # ====================================================
    # Build Methods
    # ====================================================

    def _build_resources(self) -> list[Resource]:
        """Build Resource objects from pending assets.

        Every asset receives an entry for every pending datastream, carrying
        that datastream's way and configuration.

        Returns:
            List of Resource instances with current datastreams, in asset insertion order.
        """
        resources: list[Resource] = []
        for asset in self._pending_assets.values():
            asset_datastreams: dict[str, ResourceDatastream] = {
                pending_ds.manifest_ds.name: ResourceDatastream(
                    way=pending_ds.way,
                    # Per-resource copy so assets never share one mutable config dict.
                    configuration=dict(pending_ds.configuration),
                )
                for pending_ds in self._pending_datastreams.values()
            }
            resources.append(
                Resource(
                    resource=KRNAsset(asset.name),
                    properties=asset.properties,
                    parameters=asset.parameters,
                    datastreams=asset_datastreams,
                )
            )
        return resources

    def build(self) -> RuntimeManifest:
        """Build the RuntimeManifest.

        Constructs the final RuntimeManifest from all configured assets,
        datastreams, custom actions, and configuration.

        Returns:
            RuntimeManifest instance ready for use in testing.
        """
        manifest = RuntimeManifest(
            payload=RuntimeManifestPayload(
                resources=self._build_resources(),
                datastreams=[ds.manifest_ds for ds in self._pending_datastreams.values()],
                configuration=self._configuration,
                custom_actions=list(self._pending_custom_actions.values()),
            )
        )
        logger.debug(
            "manifest_built",
            resources=len(manifest.payload.resources),
            datastreams=len(manifest.payload.datastreams),
            custom_actions=len(manifest.payload.custom_actions),
            config_keys=len(manifest.payload.configuration),
        )
        return manifest