Source code for kelvin.config.appyaml

from __future__ import annotations

from pathlib import Path
from typing import Any, Dict, List, Literal, Optional

from pydantic import BaseModel

from .common import AppTypes, ConfigBaseModel
from .manifest import AppManifest


[docs] class ConfigurationError(Exception): pass
[docs] class MetricInfo(BaseModel): model_config = {"extra": "allow"} asset_names: Optional[List[str]] = []
[docs] class Metric(BaseModel): model_config = {"extra": "allow"} name: str data_type: str control_change: bool = False
[docs] class ParameterDefinition(BaseModel): model_config = {"extra": "allow"} name: str data_type: str default: Optional[Dict] = None
[docs] class MetricInput(Metric): model_config = {"extra": "allow"} sources: Optional[List[MetricInfo]] = []
[docs] class MetricOutput(Metric): model_config = {"extra": "allow"} targets: Optional[List[MetricInfo]] = []
[docs] class AssetsEntry(BaseModel): model_config = {"extra": "allow"} name: str parameters: Dict[str, Any] = {} properties: Dict[str, Any] = {}
[docs] class MetricMap(BaseModel): model_config = {"extra": "allow"} name: str asset_name: str data_type: str access: str = "RO" configuration: Dict = {}
[docs] class AppKelvin(BaseModel): model_config = {"extra": "allow"} assets: List[AssetsEntry] = [] inputs: List[Metric] = [] outputs: List[Metric] = [] parameters: List[ParameterDefinition] = [] configuration: Dict = {}
[docs] class AppBridge(BaseModel): model_config = {"extra": "allow"} metrics_map: List[MetricMap] = [] configuration: Dict = {}
[docs] class AppDocker(ConfigBaseModel): model_config = {"extra": "allow"}
[docs] class EnvironmentConfig(ConfigBaseModel): workload_name: str node_name: str
[docs] class AppConfig(ConfigBaseModel): type: Literal[AppTypes.bridge, AppTypes.kelvin_app, AppTypes.docker] kelvin: Optional[AppKelvin] = None bridge: Optional[AppBridge] = None docker: Optional[AppDocker] = None
[docs] class AppYamlInfo(ConfigBaseModel): name: str title: str description: str version: str
[docs] class SystemConfig(ConfigBaseModel): model_config = {"extra": "allow"} privileged: bool = False
[docs] class AppYaml(ConfigBaseModel): spec_version: str environment: Optional[EnvironmentConfig] = None info: AppYamlInfo app: AppConfig system: Optional[SystemConfig] = None
[docs] def to_manifest(self, read_schemas: bool = True, workdir: Path = Path(".")) -> AppManifest: return AppManifest( name=self.info.name, title=self.info.title, description=self.info.description, version=self.info.version, type=self.app.type, spec_version=self.spec_version, )