from __future__ import annotations
from pathlib import Path
from typing import Any, Literal, Optional
from pydantic import Field
from kelvin.config.common import (
AppBaseConfig,
AppTypes,
ConfigBaseModel,
ConfigError,
CustomActionsIO,
read_schema_file,
resolve_schema_path,
)
from kelvin.message import ParameterType
from kelvin.message.msg_type import PrimitiveTypes
from .manifest import (
AppDefaults,
AppManifest,
CustomActionWay,
DataTagWay,
DefaultsDefinition,
DQWay,
Flags,
IODatastreamMapping,
IODefinition,
IOWay,
ManifCustomAction,
ManifCustomDataQuality,
ManifDataTag,
ParamDefinition,
RuntimeUpdateFlags,
SchemasDefinition,
)
# =============================================================================
# Configuration Models
# =============================================================================
class RuntimeUpdateConfig(ConfigBaseModel):
    """Controls which parts of the smart app can be updated at runtime.

    The three flags are copied into the manifest's ``RuntimeUpdateFlags`` by
    ``_build_flags``.
    """

    # Whether the app configuration may be updated at runtime (off by default).
    configuration: bool = False
    # Whether parameters may be updated at runtime; exported to the manifest
    # under the name ``resource_parameters``.
    parameters: bool = True
    # Whether resource properties may be updated at runtime.
    resource_properties: bool = True
class SmartAppFlags(ConfigBaseModel):
    """Feature flags for smart app behavior."""

    # Per-area switches for runtime updates; defaults to a RuntimeUpdateConfig
    # built with its own field defaults.
    enable_runtime_update: RuntimeUpdateConfig = RuntimeUpdateConfig()
class IOConfig(ConfigBaseModel):
    """Defines a single IO channel with name, data type, and optional unit."""

    # Name of the IO; used as the merge key when building manifest IO definitions.
    name: str
    # Data type of the IO, carried into the manifest as-is.
    data_type: str
    # Optional unit; must match when the same IO name appears in both
    # data streams and control changes.
    unit: Optional[str] = None
class DataIo(ConfigBaseModel):
    """Container for input and output IO configurations.

    Used for both the ``data_streams`` and ``control_changes`` sections of
    ``SmartAppConfig``.
    """

    # IOs consumed by the app; empty list when omitted.
    inputs: list[IOConfig] = Field(default_factory=list)
    # IOs produced by the app; empty list when omitted.
    outputs: list[IOConfig] = Field(default_factory=list)
class DataQualityConfig(ConfigBaseModel):
    """Defines a data quality metric and its associated data streams."""

    # Name of the data quality metric.
    name: str
    # Data type of the metric, carried into the manifest as-is.
    data_type: str
    # Data streams associated with the metric; exported to the manifest
    # as ``datastreams``. Empty list when omitted.
    data_streams: list[str] = Field(default_factory=list)
class DataQualityIO(ConfigBaseModel):
    """Container for input and output data quality configurations."""

    # Data quality metrics consumed by the app; empty list when omitted.
    inputs: list[DataQualityConfig] = Field(default_factory=list)
    # Data quality metrics produced by the app; empty list when omitted.
    outputs: list[DataQualityConfig] = Field(default_factory=list)
class SmartAppParams(ConfigBaseModel):
    """Defines a smart app parameter with name and primitive data type."""

    # Parameter name; also the key used to look up its default value in
    # ``DeploymentDefaults.parameters``.
    name: str
    # Only the number, string and boolean primitive types are accepted.
    data_type: Literal[PrimitiveTypes.number, PrimitiveTypes.string, PrimitiveTypes.boolean]
class SchemasConfig(ConfigBaseModel):
    """Paths to JSON schema files for configuration and parameters validation.

    Each path, when set, is handed to ``resolve_schema_path`` together with the
    working directory before the file is read into the manifest.
    """

    # Path to the configuration schema file; ``None`` means no schema.
    configuration: Optional[str] = None
    # Path to the parameters schema file; ``None`` means no schema.
    parameters: Optional[str] = None
class DatastreamMapping(ConfigBaseModel):
    """Maps an app IO to a datastream."""

    # Name on the app side; exported to the manifest as the ``io`` field
    # of ``IODatastreamMapping``.
    app: str
    # Name of the datastream the app IO is mapped to.
    datastream: str
class DeploymentDefaults(ConfigBaseModel):
    """Default values applied when deploying the smart app.

    ``system``, ``datastream_mapping`` and ``configuration`` are exported to the
    manifest only when they were explicitly set on the model (see
    ``_build_defaults``); ``parameters`` feeds the per-parameter defaults.
    """

    # System-level defaults, exported as ``defaults.system``.
    system: dict[str, Any] = Field(default_factory=dict)
    # App IO to datastream mappings, exported as ``defaults.app.io_datastream_mapping``.
    datastream_mapping: list[DatastreamMapping] = Field(default_factory=list)
    # Default app configuration, exported as ``defaults.app.configuration``.
    configuration: dict[str, Any] = Field(default_factory=dict)
    # Default parameter values keyed by parameter name.
    parameters: dict[str, ParameterType] = Field(default_factory=dict)
class SmartAppConfig(AppBaseConfig):
    """Complete configuration for a smart app application.

    Extends ``AppBaseConfig`` with the smart-app specific sections (IOs, data
    quality, parameters, schemas, defaults, custom actions, data tags) and can
    be converted to an ``AppManifest`` via :meth:`to_manifest`.
    """

    type: Literal[AppTypes.app]  # pyright: ignore[reportIncompatibleVariableOverride]
    # Exported to the manifest flags as ``spec_version``.
    spec_version: str = "5.0.0"
    flags: SmartAppFlags = SmartAppFlags()
    # Regular data stream inputs/outputs.
    data_streams: DataIo = DataIo()
    # Control change inputs/outputs; may share names with data streams
    # under the rules enforced by ``_build_io_definitions``.
    control_changes: DataIo = DataIo()
    data_quality: DataQualityIO = DataQualityIO()
    parameters: list[SmartAppParams] = Field(default_factory=list)
    # Paths to the UI schema files (see ``SchemasConfig``).
    ui_schemas: SchemasConfig = SchemasConfig()
    defaults: DeploymentDefaults = DeploymentDefaults()
    custom_actions: CustomActionsIO = CustomActionsIO()
    # ``None`` means "not specified"; any other value is exported with the defaults.
    api_permissions: Optional[list[str]] = None
    # NOTE(review): ``DataTagsIO`` is not among the names imported at the top of
    # this file as visible here — confirm where it is defined/imported, otherwise
    # evaluating this default raises NameError when the class body runs.
    data_tags: DataTagsIO = DataTagsIO()

    def to_manifest(self, workdir: Path, read_schemas: bool) -> AppManifest:
        """Convert this configuration into its ``AppManifest`` representation.

        Args:
            workdir: Directory passed to schema path resolution.
            read_schemas: When false, schema files are not read and the
                manifest carries an empty schemas definition.

        Returns:
            The manifest produced by ``convert_smart_app_to_manifest``.
        """
        return convert_smart_app_to_manifest(self, workdir=workdir, read_schemas=read_schemas)
# =============================================================================
# Manifest Conversion
# =============================================================================
def _validate_io_compatibility(existing: IODefinition, new: IOConfig, context: str) -> None:
"""Validate that an existing IO definition is compatible with a new one."""
if existing.data_type != new.data_type:
raise ConfigError(f"io {new.name} has different data type in data streams and {context}")
if existing.unit != new.unit:
raise ConfigError(f"io {new.name} has different unit in data streams and {context}")
def _ensure_unique_io_names(items: list[IOConfig], context: str) -> None:
duplicates: set[str] = set()
seen: set[str] = set()
for io in items:
if io.name in seen:
duplicates.add(io.name)
seen.add(io.name)
if duplicates:
raise ConfigError(f"duplicate io name(s) in {context}: {', '.join(sorted(duplicates))}")
def _build_io_definitions(config: SmartAppConfig) -> list[IODefinition]:
    """Merge data streams and control changes into one list of IO definitions.

    An IO name may appear once as a data stream and once as a control change,
    but only in opposite directions (data stream output + control change input,
    or data stream input + control change output); the two declarations must
    agree on data type and unit.

    Args:
        config: Smart app configuration providing ``data_streams`` and
            ``control_changes``.

    Returns:
        One ``IODefinition`` per distinct IO name, in first-declaration order.

    Raises:
        ConfigError: On duplicate names within a list, on a name declared as
            both data stream input and output, on mismatched data type/unit,
            or on a direction combination that cannot be merged.
    """
    streams = config.data_streams
    changes = config.control_changes

    # Each of the four lists must be free of duplicates before merging.
    _ensure_unique_io_names(streams.inputs, "data_streams.inputs")
    _ensure_unique_io_names(streams.outputs, "data_streams.outputs")
    _ensure_unique_io_names(changes.inputs, "control_changes.inputs")
    _ensure_unique_io_names(changes.outputs, "control_changes.outputs")

    # Data stream inputs seed the registry.
    registry: dict[str, IODefinition] = {
        io.name: IODefinition(name=io.name, data_type=io.data_type, way=IOWay.input, unit=io.unit)
        for io in streams.inputs
    }

    # Data stream outputs may never reuse an input name.
    for io in streams.outputs:
        if io.name in registry:
            raise ConfigError(f"io {io.name} is defined as input and output")
        registry[io.name] = IODefinition(name=io.name, data_type=io.data_type, way=IOWay.output, unit=io.unit)

    # Control change inputs: new name -> input_cc; existing output -> input_cc_output.
    for io in changes.inputs:
        current = registry.get(io.name)
        if current is None:
            registry[io.name] = IODefinition(name=io.name, data_type=io.data_type, way=IOWay.input_cc, unit=io.unit)
            continue

        _validate_io_compatibility(current, io, "control changes")
        if current.way == IOWay.output:
            current.way = IOWay.input_cc_output
        elif current.way == IOWay.input:
            raise ConfigError(f"io {io.name} is defined as input and input control changes")
        else:
            raise ConfigError(f"unexpected configuration of IO {io.name} way, previous way: {current.way}")

    # Control change outputs: new name -> output_cc; existing input -> input_output_cc.
    for io in changes.outputs:
        current = registry.get(io.name)
        if current is None:
            registry[io.name] = IODefinition(name=io.name, data_type=io.data_type, way=IOWay.output_cc, unit=io.unit)
            continue

        _validate_io_compatibility(current, io, "control changes")
        if current.way == IOWay.input:
            current.way = IOWay.input_output_cc
        elif current.way == IOWay.output:
            raise ConfigError(f"io {io.name} is defined as output and output control changes")
        else:
            raise ConfigError(f"unexpected configuration of IO {io.name} way, previous way: {current.way}")

    return list(registry.values())
def _build_schemas(config: SmartAppConfig, workdir: Path, read_schemas: bool) -> SchemasDefinition:
    """Create the manifest schemas section, loading schema files when requested.

    Args:
        config: Smart app configuration whose ``ui_schemas`` holds the schema paths.
        workdir: Directory handed to ``resolve_schema_path`` for each path.
        read_schemas: When false, no file is touched and an empty
            ``SchemasDefinition`` is returned.

    Returns:
        A ``SchemasDefinition`` with ``configuration`` and/or ``parameters``
        filled in for every schema path that is set.
    """
    result = SchemasDefinition()

    if read_schemas:
        ui = config.ui_schemas
        # Unset (or empty) paths are skipped, leaving the field at its default.
        if ui.configuration:
            result.configuration = read_schema_file(resolve_schema_path(workdir, ui.configuration))
        if ui.parameters:
            result.parameters = read_schema_file(resolve_schema_path(workdir, ui.parameters))

    return result
def _build_custom_actions(config: SmartAppConfig) -> list[ManifCustomAction]:
"""Convert input/output custom actions to manifest format."""
input_actions = [
ManifCustomAction(type=action.type, way=CustomActionWay.input_ca) for action in config.custom_actions.inputs
]
output_actions = [
ManifCustomAction(type=action.type, way=CustomActionWay.output_ca) for action in config.custom_actions.outputs
]
return input_actions + output_actions
def _build_data_quality(config: SmartAppConfig) -> list[ManifCustomDataQuality]:
"""Convert input/output data quality definitions to manifest format."""
input_dq = [
ManifCustomDataQuality(
name=dq.name,
data_type=dq.data_type,
way=DQWay.input,
datastreams=dq.data_streams,
)
for dq in config.data_quality.inputs
]
output_dq = [
ManifCustomDataQuality(
name=dq.name,
data_type=dq.data_type,
way=DQWay.output,
datastreams=dq.data_streams,
)
for dq in config.data_quality.outputs
]
return input_dq + output_dq
def _build_defaults(config: SmartAppConfig) -> DefaultsDefinition | None:
"""Build defaults definition if any defaults were explicitly set."""
fields_set = config.defaults.model_fields_set
has_configuration = "configuration" in fields_set
has_datastream_mapping = "datastream_mapping" in fields_set
has_system = "system" in fields_set
has_api_permissions = config.api_permissions is not None
if not (has_configuration or has_datastream_mapping or has_system or has_api_permissions):
return None
defaults = DefaultsDefinition()
if has_configuration or has_datastream_mapping:
defaults.app = AppDefaults()
if has_configuration:
defaults.app.configuration = config.defaults.configuration
if has_datastream_mapping:
defaults.app.io_datastream_mapping = [
IODatastreamMapping(io=mapping.app, datastream=mapping.datastream)
for mapping in config.defaults.datastream_mapping
]
if has_system:
defaults.system = config.defaults.system
if has_api_permissions:
defaults.api_permissions = config.api_permissions
return defaults
def _build_flags(config: SmartAppConfig) -> Flags:
    """Create the manifest flags section from the smart app configuration.

    Args:
        config: Smart app configuration providing ``spec_version`` and ``flags``.

    Returns:
        ``Flags`` carrying the spec version, the runtime-update switches and
        ``resources_required`` fixed to ``True``.
    """
    runtime = config.flags.enable_runtime_update

    # The config field ``parameters`` is named ``resource_parameters`` in the manifest.
    update_flags = RuntimeUpdateFlags(
        resource_parameters=runtime.parameters,
        resource_properties=runtime.resource_properties,
        configuration=runtime.configuration,
    )

    return Flags(
        spec_version=config.spec_version,
        enable_runtime_update=update_flags,
        resources_required=True,
    )
def _build_parameters(config: SmartAppConfig) -> list[ParamDefinition]:
"""Build parameter definitions with their default values."""
return [
ParamDefinition(name=p.name, data_type=p.data_type, default=config.defaults.parameters.get(p.name))
for p in config.parameters
]
def _build_data_tags(config: SmartAppConfig) -> list[ManifDataTag]:
"""Convert input/output data tags to manifest format."""
return [ManifDataTag(tag_name=tag, way=DataTagWay.input) for tag in config.data_tags.inputs] + [
ManifDataTag(tag_name=tag, way=DataTagWay.output) for tag in config.data_tags.outputs
]
def convert_smart_app_to_manifest(config: SmartAppConfig, workdir: Path, read_schemas: bool = True) -> AppManifest:
    """Convert a SmartAppConfig to its AppManifest representation.

    Identity fields (name, title, description, type, version, category) are
    copied straight from the config; every other manifest section is produced
    by the corresponding ``_build_*`` helper in this module.

    Args:
        config: The smart app configuration to convert.
        workdir: Directory used when resolving schema file paths.
        read_schemas: When false, schema files are not read and the manifest
            gets an empty schemas definition.

    Returns:
        The assembled ``AppManifest``.

    Raises:
        ConfigError: If the IO declarations are inconsistent (duplicate names,
            conflicting directions, or mismatched data type/unit).
    """
    return AppManifest(
        name=config.name,
        title=config.title,
        description=config.description,
        type=config.type,
        version=config.version,
        category=config.category,
        flags=_build_flags(config),
        parameters=_build_parameters(config),
        # IO validation runs before any schema file is read.
        io=_build_io_definitions(config),
        schemas=_build_schemas(config, workdir, read_schemas),
        defaults=_build_defaults(config),
        custom_actions=_build_custom_actions(config),
        data_quality=_build_data_quality(config),
        data_tags=_build_data_tags(config),
    )