Source code for kelvin.config.appyaml

from __future__ import annotations

from pathlib import Path
from typing import Any, ClassVar, Literal, Optional

from pydantic import BaseModel, ConfigDict, Field

from .common import AppTypes, ConfigBaseModel
from .manifest import AppManifest, Flags


[docs] class ConfigurationError(Exception): pass
[docs] class MetricInfo(BaseModel): model_config: ClassVar[ConfigDict] = ConfigDict(extra="allow") asset_names: Optional[list[str]] = Field(default_factory=list)
[docs] class Metric(BaseModel): model_config: ClassVar[ConfigDict] = ConfigDict(extra="allow") name: str data_type: str control_change: bool = False
[docs] class ParameterDefinition(BaseModel): model_config: ClassVar[ConfigDict] = ConfigDict(extra="allow") name: str data_type: str default: Optional[dict[str, Any]] = None
[docs] class MetricInput(Metric): model_config: ClassVar[ConfigDict] = ConfigDict(extra="allow") sources: Optional[list[MetricInfo]] = Field(default_factory=list)
[docs] class MetricOutput(Metric): model_config: ClassVar[ConfigDict] = ConfigDict(extra="allow") targets: Optional[list[MetricInfo]] = Field(default_factory=list)
[docs] class AssetsEntry(BaseModel): model_config: ClassVar[ConfigDict] = ConfigDict(extra="allow") name: str parameters: dict[str, Any] = Field(default_factory=dict) properties: dict[str, Any] = Field(default_factory=dict)
[docs] class MetricMap(BaseModel): model_config: ClassVar[ConfigDict] = ConfigDict(extra="allow") name: str asset_name: str data_type: str access: str = "RO" configuration: dict[str, Any] = Field(default_factory=dict)
[docs] class AppKelvin(BaseModel): model_config: ClassVar[ConfigDict] = ConfigDict(extra="allow") assets: list[AssetsEntry] = Field(default_factory=list) inputs: list[Metric] = Field(default_factory=list) outputs: list[Metric] = Field(default_factory=list) parameters: list[ParameterDefinition] = Field(default_factory=list) configuration: dict[str, Any] = Field(default_factory=dict)
[docs] class AppBridge(BaseModel): model_config: ClassVar[ConfigDict] = ConfigDict(extra="allow") metrics_map: list[MetricMap] = Field(default_factory=list) configuration: dict[str, Any] = Field(default_factory=dict)
[docs] class AppDocker(ConfigBaseModel): model_config: ClassVar[ConfigDict] = ConfigDict(extra="allow")
[docs] class EnvironmentConfig(ConfigBaseModel): workload_name: str node_name: str
[docs] class AppConfig(ConfigBaseModel): type: Literal[AppTypes.bridge, AppTypes.kelvin_app, AppTypes.docker] kelvin: Optional[AppKelvin] = None bridge: Optional[AppBridge] = None docker: Optional[AppDocker] = None
[docs] class AppYamlInfo(ConfigBaseModel): name: str title: str description: str version: str
[docs] class SystemConfig(ConfigBaseModel): model_config: ClassVar[ConfigDict] = ConfigDict(extra="allow") privileged: bool = False
[docs] class AppYaml(ConfigBaseModel): spec_version: str environment: Optional[EnvironmentConfig] = None info: AppYamlInfo app: AppConfig system: Optional[SystemConfig] = None
[docs] def to_manifest(self, workdir: Path, read_schemas: bool) -> AppManifest: _ = workdir, read_schemas return AppManifest( name=self.info.name, title=self.info.title, description=self.info.description, version=self.info.version, type=self.app.type, flags=Flags(spec_version=self.spec_version), )