Source code for kelvin.config.smart_app

from __future__ import annotations

from pathlib import Path
from typing import Dict, List, Literal, Optional

from kelvin.config.common import (
    AppBaseConfig,
    AppTypes,
    ConfigBaseModel,
    ConfigError,
    CustomActionsIO,
    read_schema_file,
)
from kelvin.message import ParameterType
from kelvin.message.msg_type import PrimitiveTypes

from .manifest import (
    AppDefaults,
    AppManifest,
    CustomActionWay,
    DefaultsDefinition,
    DQWay,
    Flags,
    IODatastreamMapping,
    IODefinition,
    IOWay,
    ManifCustomAction,
    ManifCustomDataQuality,
    ParamDefinition,
    RuntimeUpdateFlags,
    SchemasDefinition,
)


[docs] class RuntimeUpdateConfig(ConfigBaseModel): configuration: bool = False resources: bool = False parameters: bool = True resource_properties: bool = True
[docs] class SmartAppFlags(ConfigBaseModel): enable_runtime_update: RuntimeUpdateConfig = RuntimeUpdateConfig()
[docs] class IOConfig(ConfigBaseModel): name: str data_type: str unit: Optional[str] = None
[docs] class DataIo(ConfigBaseModel): inputs: List[IOConfig] = [] outputs: List[IOConfig] = []
[docs] class DataQualityConfig(ConfigBaseModel): name: str data_type: str data_streams: List[str] = []
[docs] class DataQualityIO(ConfigBaseModel): inputs: List[DataQualityConfig] = [] outputs: List[DataQualityConfig] = []
[docs] class SmartAppParams(ConfigBaseModel): name: str data_type: Literal[PrimitiveTypes.number, PrimitiveTypes.string, PrimitiveTypes.boolean] # type: ignore
[docs] class SchemasConfig(ConfigBaseModel): configuration: Optional[str] = None parameters: Optional[str] = None
[docs] class DatastreamMapping(ConfigBaseModel): app: str datastream: str
[docs] class DeploymentDefaults(ConfigBaseModel): system: Dict = {} datastream_mapping: List[DatastreamMapping] = [] configuration: Dict = {} parameters: Dict[str, ParameterType] = {}
[docs] class SmartAppConfig(AppBaseConfig): type: Literal[AppTypes.app] # type: ignore spec_version: str = "5.0.0" flags: SmartAppFlags = SmartAppFlags() data_streams: DataIo = DataIo() control_changes: DataIo = DataIo() data_quality: DataQualityIO = DataQualityIO() parameters: List[SmartAppParams] = [] ui_schemas: SchemasConfig = SchemasConfig() defaults: DeploymentDefaults = DeploymentDefaults() custom_actions: CustomActionsIO = CustomActionsIO()
[docs] def to_manifest(self, read_schemas: bool = True, workdir: Path = Path(".")) -> AppManifest: return convert_smart_app_to_manifest(self, read_schemas=read_schemas, workdir=workdir)
[docs] def convert_smart_app_to_manifest( config: SmartAppConfig, read_schemas: bool = True, workdir: Path = Path(".") ) -> AppManifest: ios_map: Dict[str, IODefinition] = {} for io in config.data_streams.inputs: ios_map[io.name] = IODefinition(name=io.name, data_type=io.data_type, way=IOWay.input, unit=io.unit) for io in config.data_streams.outputs: if io.name in ios_map: raise ConfigError(f"IO {io.name} is defined as input and output") ios_map[io.name] = IODefinition(name=io.name, data_type=io.data_type, way=IOWay.output, unit=io.unit) for io in config.control_changes.inputs: io_exist = ios_map.get(io.name) if not io_exist: ios_map[io.name] = IODefinition(name=io.name, data_type=io.data_type, way=IOWay.input_cc, unit=io.unit) continue if io_exist.data_type != io.data_type: raise ConfigError(f"IO {io.name} has different data type in data streams and control changes") if io_exist.unit != io.unit: raise ConfigError(f"IO {io.name} has different unit in data streams and control changes") if io_exist.way == IOWay.input: raise ConfigError(f"IO {io.name} is defined as input and input control changes") if io_exist.way != IOWay.output: raise ConfigError(f"Unexpected configuration of IO {io.name} way. Previous way: {io_exist.way}") io_exist.way = IOWay.input_cc_output for io in config.control_changes.outputs: io_exist = ios_map.get(io.name) if not io_exist: ios_map[io.name] = IODefinition(name=io.name, data_type=io.data_type, way=IOWay.output_cc, unit=io.unit) continue if io_exist.data_type != io.data_type: raise ConfigError(f"IO {io.name} has different data type in data streams and control changes") if io_exist.unit != io.unit: raise ConfigError(f"IO {io.name} has different unit in data streams and control changes") if io_exist.way == IOWay.output: raise ConfigError(f"IO {io.name} is defined as output and output control changes") if io_exist.way != IOWay.input: raise ConfigError(f"Unexpected configuration of IO {io.name} way. Previous way: {io_exist.way}") io_exist.way = IOWay.input_output_cc io_map = [ IODatastreamMapping(io=smart_io.app, datastream=smart_io.datastream) for smart_io in config.defaults.datastream_mapping ] schemas = SchemasDefinition() if read_schemas: schemas.configuration = ( read_schema_file(workdir / config.ui_schemas.configuration) if config.ui_schemas.configuration else {} ) schemas.parameters = ( read_schema_file(workdir / config.ui_schemas.parameters) if config.ui_schemas.parameters else {} ) manif_custom_actions = [ ManifCustomAction(type=cai.type, way=CustomActionWay.input_ca) for cai in config.custom_actions.inputs ] + [ManifCustomAction(type=cai.type, way=CustomActionWay.output_ca) for cai in config.custom_actions.outputs] manif_data_qualities = [ ManifCustomDataQuality(name=dq.name, data_type=dq.data_type, way=DQWay.input, datastreams=dq.data_streams) for dq in config.data_quality.inputs ] + [ ManifCustomDataQuality(name=dq.name, data_type=dq.data_type, way=DQWay.output, datastreams=dq.data_streams) for dq in config.data_quality.outputs ] defaults = DefaultsDefinition() defaults_has_content = False if "configuration" in config.defaults.model_fields_set or "datastream_mapping" in config.defaults.model_fields_set: defaults_has_content = True defaults.app = AppDefaults() if "configuration" in config.defaults.model_fields_set: defaults.app.configuration = config.defaults.configuration if "datastream_mapping" in config.defaults.model_fields_set: defaults.app.io_datastream_mapping = io_map if "system" in config.defaults.model_fields_set: defaults_has_content = True defaults.system = config.defaults.system manif_defaults = defaults if defaults_has_content else None return AppManifest( name=config.name, title=config.title, description=config.description, type=config.type, version=config.version, category=config.category, flags=Flags( spec_version=config.spec_version, enable_runtime_update=RuntimeUpdateFlags( io=config.flags.enable_runtime_update.resources, resource_parameters=config.flags.enable_runtime_update.parameters, resource_properties=config.flags.enable_runtime_update.resource_properties, configuration=config.flags.enable_runtime_update.configuration, ), resources_required=True, ), parameters=[ ParamDefinition(name=p.name, data_type=p.data_type, default=config.defaults.parameters.get(p.name)) for p in config.parameters ], io=list(ios_map.values()), schemas=schemas, defaults=manif_defaults, custom_actions=manif_custom_actions, data_quality=manif_data_qualities, )