Source code for kelvin.config.smart_app

from __future__ import annotations

from pathlib import Path
from typing import Any, Literal, Optional

from pydantic import Field

from kelvin.config.common import (
    AppBaseConfig,
    AppTypes,
    ConfigBaseModel,
    ConfigError,
    CustomActionsIO,
    read_schema_file,
    resolve_schema_path,
)
from kelvin.message import ParameterType
from kelvin.message.msg_type import PrimitiveTypes

from .manifest import (
    AppDefaults,
    AppManifest,
    CustomActionWay,
    DataTagWay,
    DefaultsDefinition,
    DQWay,
    Flags,
    IODatastreamMapping,
    IODefinition,
    IOWay,
    ManifCustomAction,
    ManifCustomDataQuality,
    ManifDataTag,
    ParamDefinition,
    RuntimeUpdateFlags,
    SchemasDefinition,
)

# =============================================================================
# Configuration Models
# =============================================================================


class RuntimeUpdateConfig(ConfigBaseModel):
    """Switches selecting which parts of a smart app may be updated at runtime."""

    # Runtime updates of the app configuration: disabled unless opted in.
    configuration: bool = False
    # Runtime updates of parameters: enabled by default.
    parameters: bool = True
    # Runtime updates of resource properties: enabled by default.
    resource_properties: bool = True
class SmartAppFlags(ConfigBaseModel):
    """Feature flags that tune how a smart app behaves."""

    # Runtime-update switches; a default-constructed RuntimeUpdateConfig when omitted.
    enable_runtime_update: RuntimeUpdateConfig = RuntimeUpdateConfig()
class IOConfig(ConfigBaseModel):
    """One IO channel, described by its name, data type and an optional unit."""

    name: str
    data_type: str
    # The unit may be omitted, in which case it stays None.
    unit: Optional[str] = None
class DataIo(ConfigBaseModel):
    """Groups the input and output IO configurations of one section."""

    # Both directions default to a fresh empty list.
    inputs: list[IOConfig] = Field(default_factory=list)
    outputs: list[IOConfig] = Field(default_factory=list)
class DataQualityConfig(ConfigBaseModel):
    """A data quality metric together with the data streams associated with it."""

    name: str
    data_type: str
    # Names of the associated data streams; empty when none are listed.
    data_streams: list[str] = Field(default_factory=list)
class DataQualityIO(ConfigBaseModel):
    """Groups the input and output data quality configurations."""

    # Both directions default to a fresh empty list.
    inputs: list[DataQualityConfig] = Field(default_factory=list)
    outputs: list[DataQualityConfig] = Field(default_factory=list)
class SmartAppParams(ConfigBaseModel):
    """A smart app parameter: a name plus a primitive data type."""

    name: str
    # Only the number, string and boolean primitive types are accepted.
    data_type: Literal[PrimitiveTypes.number, PrimitiveTypes.string, PrimitiveTypes.boolean]
class SchemasConfig(ConfigBaseModel):
    """Locations of the JSON schema files used to validate configuration and parameters."""

    # Each path is optional; None means no schema file is configured for that part.
    configuration: Optional[str] = None
    parameters: Optional[str] = None
class DatastreamMapping(ConfigBaseModel):
    """Associates one IO of the app with a datastream."""

    # Name of the IO on the app side.
    app: str
    # Name of the datastream the IO is mapped to.
    datastream: str
class DeploymentDefaults(ConfigBaseModel):
    """Default values that are applied when the smart app is deployed."""

    # Free-form system defaults.
    system: dict[str, Any] = Field(default_factory=dict)
    # Default IO-to-datastream mappings.
    datastream_mapping: list[DatastreamMapping] = Field(default_factory=list)
    # Free-form default app configuration.
    configuration: dict[str, Any] = Field(default_factory=dict)
    # Default parameter values, keyed by parameter name.
    parameters: dict[str, ParameterType] = Field(default_factory=dict)
class DataTagsIO(ConfigBaseModel):
    """Container for input and output data tag names."""

    # default_factory mirrors the other IO containers in this module (DataIo,
    # DataQualityIO) instead of declaring a list literal as the class-level default.
    inputs: list[str] = Field(default_factory=list)
    outputs: list[str] = Field(default_factory=list)
class SmartAppConfig(AppBaseConfig):
    """Top-level configuration model describing a complete smart app."""

    # --- identity -----------------------------------------------------------
    type: Literal[AppTypes.app]  # pyright: ignore[reportIncompatibleVariableOverride]
    spec_version: str = "5.0.0"

    # --- behavior and IO ----------------------------------------------------
    flags: SmartAppFlags = SmartAppFlags()
    data_streams: DataIo = DataIo()
    control_changes: DataIo = DataIo()
    data_quality: DataQualityIO = DataQualityIO()
    parameters: list[SmartAppParams] = Field(default_factory=list)

    # --- schemas, defaults and extras ---------------------------------------
    ui_schemas: SchemasConfig = SchemasConfig()
    defaults: DeploymentDefaults = DeploymentDefaults()
    custom_actions: CustomActionsIO = CustomActionsIO()
    # None means no API permissions were declared.
    api_permissions: Optional[list[str]] = None
    data_tags: DataTagsIO = DataTagsIO()

    def to_manifest(self, workdir: Path, read_schemas: bool) -> AppManifest:
        """Convert this configuration into an ``AppManifest``.

        Args:
            workdir: Directory handed to the schema path resolution.
            read_schemas: Whether the configured schema files are read from disk.

        Returns:
            The manifest produced by ``convert_smart_app_to_manifest``.
        """
        manifest = convert_smart_app_to_manifest(config=self, workdir=workdir, read_schemas=read_schemas)
        return manifest
# =============================================================================
# Manifest Conversion
# =============================================================================


def _validate_io_compatibility(existing: IODefinition, new: IOConfig, context: str) -> None:
    """Reject an IO redefinition whose data type or unit differs from the registered one.

    Args:
        existing: Definition already registered under the IO's name.
        new: Incoming IO configuration carrying the same name.
        context: Label of the section ``new`` comes from; used in the error text.

    Raises:
        ConfigError: If the data types differ or, checked second, the units differ.
    """
    # Data type is compared before unit, so a data type mismatch is reported first.
    for attribute, label in (("data_type", "data type"), ("unit", "unit")):
        if getattr(existing, attribute) != getattr(new, attribute):
            raise ConfigError(f"io {new.name} has different {label} in data streams and {context}")


def _ensure_unique_io_names(items: list[IOConfig], context: str) -> None:
    """Ensure that no IO name occurs more than once in ``items``.

    Args:
        items: IO configurations belonging to a single section.
        context: Label of that section; used in the error text.

    Raises:
        ConfigError: If any name is repeated; the message lists the repeated
            names in sorted order.
    """
    encountered: set[str] = set()
    repeated: set[str] = set()
    for name in (io.name for io in items):
        if name in encountered:
            repeated.add(name)
        else:
            encountered.add(name)

    if not repeated:
        return

    listing = ", ".join(sorted(repeated))
    raise ConfigError(f"duplicate io name(s) in {context}: {listing}")


def _merge_control_change(
    registry: dict[str, IODefinition],
    io: IOConfig,
    *,
    standalone: IOWay,
    clashing: IOWay,
    required: IOWay,
    merged: IOWay,
    side: str,
) -> None:
    """Register one control change IO, combining it with a same-named data stream IO.

    Args:
        registry: IO definitions collected so far, keyed by name; updated in place.
        io: Control change IO being added.
        standalone: Way used when no definition with this name exists yet.
        clashing: Way of an existing definition that makes the combination invalid.
        required: The only way of an existing definition that can be combined.
        merged: Way written to the existing definition after a valid combination.
        side: ``"input"`` or ``"output"``; used in the error text.

    Raises:
        ConfigError: If the existing definition has a different data type or
            unit, has the ``clashing`` way, or has any way other than ``required``.
    """
    existing = registry.get(io.name)
    if not existing:
        registry[io.name] = IODefinition(name=io.name, data_type=io.data_type, way=standalone, unit=io.unit)
        return

    # Compatibility is verified before the way checks, so a type/unit mismatch wins.
    _validate_io_compatibility(existing, io, "control changes")
    if existing.way == clashing:
        raise ConfigError(f"io {io.name} is defined as {side} and {side} control changes")
    if existing.way != required:
        raise ConfigError(f"unexpected configuration of IO {io.name} way, previous way: {existing.way}")
    existing.way = merged


def _build_io_definitions(config: SmartAppConfig) -> list[IODefinition]:
    """Merge data streams and control changes into one list of IO definitions.

    Args:
        config: Smart app configuration supplying the four IO sections.

    Returns:
        One definition per distinct IO name, in first-registration order.

    Raises:
        ConfigError: On duplicate names within a section, on a name used as
            both data stream input and output, or on an invalid combination of
            a control change with an existing definition.
    """
    sections = (
        (config.data_streams.inputs, "data_streams.inputs"),
        (config.data_streams.outputs, "data_streams.outputs"),
        (config.control_changes.inputs, "control_changes.inputs"),
        (config.control_changes.outputs, "control_changes.outputs"),
    )
    for entries, label in sections:
        _ensure_unique_io_names(entries, label)

    registry: dict[str, IODefinition] = {}

    # Data stream inputs are registered first and cannot clash with anything yet.
    for stream in config.data_streams.inputs:
        registry[stream.name] = IODefinition(
            name=stream.name, data_type=stream.data_type, way=IOWay.input, unit=stream.unit
        )

    # A data stream output may not reuse the name of a data stream input.
    for stream in config.data_streams.outputs:
        if stream.name in registry:
            raise ConfigError(f"io {stream.name} is defined as input and output")
        registry[stream.name] = IODefinition(
            name=stream.name, data_type=stream.data_type, way=IOWay.output, unit=stream.unit
        )

    # Control change inputs may only be combined with an existing data stream output.
    for change in config.control_changes.inputs:
        _merge_control_change(
            registry,
            change,
            standalone=IOWay.input_cc,
            clashing=IOWay.input,
            required=IOWay.output,
            merged=IOWay.input_cc_output,
            side="input",
        )

    # Control change outputs may only be combined with an existing data stream input.
    for change in config.control_changes.outputs:
        _merge_control_change(
            registry,
            change,
            standalone=IOWay.output_cc,
            clashing=IOWay.output,
            required=IOWay.input,
            merged=IOWay.input_output_cc,
            side="output",
        )

    return list(registry.values())


def _build_schemas(config: SmartAppConfig, workdir: Path, read_schemas: bool) -> SchemasDefinition:
    """Create the manifest schema section, loading schema files only on request.

    Args:
        config: Smart app configuration holding the schema paths.
        workdir: Directory passed to ``resolve_schema_path`` with each path.
        read_schemas: When false, an empty ``SchemasDefinition`` is returned.

    Returns:
        The schema definitions; a part is filled only if its path is configured.
    """
    result = SchemasDefinition()
    if read_schemas:
        ui_schemas = config.ui_schemas
        if ui_schemas.configuration:
            result.configuration = read_schema_file(resolve_schema_path(workdir, ui_schemas.configuration))
        if ui_schemas.parameters:
            result.parameters = read_schema_file(resolve_schema_path(workdir, ui_schemas.parameters))
    return result


def _build_custom_actions(config: SmartAppConfig) -> list[ManifCustomAction]:
    """Translate custom actions into manifest entries, inputs first, then outputs."""
    directions = (
        (CustomActionWay.input_ca, config.custom_actions.inputs),
        (CustomActionWay.output_ca, config.custom_actions.outputs),
    )
    manifest_actions: list[ManifCustomAction] = []
    for way, actions in directions:
        manifest_actions.extend(ManifCustomAction(type=action.type, way=way) for action in actions)
    return manifest_actions


def _build_data_quality(config: SmartAppConfig) -> list[ManifCustomDataQuality]:
    """Translate data quality definitions into manifest entries, inputs first, then outputs."""

    def _as_manifest(entries: list[DataQualityConfig], way: DQWay) -> list[ManifCustomDataQuality]:
        return [
            ManifCustomDataQuality(
                name=entry.name,
                data_type=entry.data_type,
                way=way,
                datastreams=entry.data_streams,
            )
            for entry in entries
        ]

    quality = config.data_quality
    return _as_manifest(quality.inputs, DQWay.input) + _as_manifest(quality.outputs, DQWay.output)


def _build_defaults(config: SmartAppConfig) -> DefaultsDefinition | None:
    """Create the manifest defaults, or ``None`` when nothing was explicitly provided.

    A ``defaults`` field counts as provided when it appears in the model's
    ``model_fields_set``; API permissions count when they are not ``None``.
    Default parameter values are not handled here.

    Args:
        config: Smart app configuration to read defaults and API permissions from.

    Returns:
        The populated ``DefaultsDefinition``, or ``None`` if neither
        configuration, datastream mapping, system nor API permissions were set.
    """
    deployment = config.defaults
    explicitly_set = deployment.model_fields_set

    with_configuration = "configuration" in explicitly_set
    with_mapping = "datastream_mapping" in explicitly_set
    with_system = "system" in explicitly_set
    with_permissions = config.api_permissions is not None

    if not any((with_configuration, with_mapping, with_system, with_permissions)):
        return None

    result = DefaultsDefinition()

    # The app section is only created when one of its two members was provided.
    if with_configuration or with_mapping:
        result.app = AppDefaults()
        if with_configuration:
            result.app.configuration = deployment.configuration
        if with_mapping:
            mappings = []
            for entry in deployment.datastream_mapping:
                mappings.append(IODatastreamMapping(io=entry.app, datastream=entry.datastream))
            result.app.io_datastream_mapping = mappings

    if with_system:
        result.system = deployment.system

    if with_permissions:
        result.api_permissions = config.api_permissions

    return result


def _build_flags(config: SmartAppConfig) -> Flags:
    """Create the manifest flags from the spec version and runtime-update switches."""
    runtime = config.flags.enable_runtime_update
    runtime_flags = RuntimeUpdateFlags(
        # The config's "parameters" switch is named "resource_parameters" in the manifest.
        resource_parameters=runtime.parameters,
        resource_properties=runtime.resource_properties,
        configuration=runtime.configuration,
    )
    return Flags(
        spec_version=config.spec_version,
        enable_runtime_update=runtime_flags,
        resources_required=True,
    )


def _build_parameters(config: SmartAppConfig) -> list[ParamDefinition]:
    """Create the parameter definitions, attaching each parameter's default value.

    A parameter with no entry in ``config.defaults.parameters`` gets ``None``
    as its default.
    """
    default_values = config.defaults.parameters
    definitions: list[ParamDefinition] = []
    for param in config.parameters:
        definitions.append(
            ParamDefinition(name=param.name, data_type=param.data_type, default=default_values.get(param.name))
        )
    return definitions


def _build_data_tags(config: SmartAppConfig) -> list[ManifDataTag]:
    """Translate data tags into manifest entries, inputs first, then outputs."""
    tags = [ManifDataTag(tag_name=name, way=DataTagWay.input) for name in config.data_tags.inputs]
    tags.extend(ManifDataTag(tag_name=name, way=DataTagWay.output) for name in config.data_tags.outputs)
    return tags
def convert_smart_app_to_manifest(config: SmartAppConfig, workdir: Path, read_schemas: bool = True) -> AppManifest:
    """Produce the ``AppManifest`` equivalent of a ``SmartAppConfig``.

    Args:
        config: Smart app configuration to convert.
        workdir: Directory handed to the schema path resolution.
        read_schemas: Whether the configured schema files are read from disk;
            defaults to ``True``.

    Returns:
        The manifest assembled from the config's identity fields and the
        sections built by the ``_build_*`` helpers.
    """
    # The sections are built in the order they appear in the manifest call below.
    flags = _build_flags(config)
    parameters = _build_parameters(config)
    io_definitions = _build_io_definitions(config)
    schemas = _build_schemas(config, workdir, read_schemas)
    defaults = _build_defaults(config)
    custom_actions = _build_custom_actions(config)
    data_quality = _build_data_quality(config)
    data_tags = _build_data_tags(config)

    return AppManifest(
        name=config.name,
        title=config.title,
        description=config.description,
        type=config.type,
        version=config.version,
        category=config.category,
        flags=flags,
        parameters=parameters,
        io=io_definitions,
        schemas=schemas,
        defaults=defaults,
        custom_actions=custom_actions,
        data_quality=data_quality,
        data_tags=data_tags,
    )