Source code for kelvin.message.runtime_manifest

from __future__ import annotations

from enum import Enum
from typing import Any, Dict, List, Literal, Optional

from pydantic import BaseModel, model_validator
from typing_extensions import TypeAlias

from kelvin.krn import KRN, KRNAsset
from kelvin.message import Message
from kelvin.message.base_messages import ParameterType, PropertyType
from kelvin.message.msg_type import KMessageTypeRuntimeManifest


class ManifestDatastream(BaseModel):
    """Datastream declaration listed in a runtime manifest payload.

    Keys that are not declared as fields are accepted and kept (``extra="allow"``).
    """

    model_config = {"extra": "allow"}

    name: str
    data_type_name: Optional[str] = None
    semantic_type_name: Optional[str] = None
    unit_name: Optional[str] = None

    # legacy fields for backward compatibility
    primitive_type_name: Optional[str] = None

    @model_validator(mode="before")
    @classmethod
    def prefill_resource(cls, data: Any) -> Any:
        """Default ``primitive_type_name`` to ``data_type_name`` when it is missing or empty.

        Args:
            data: Raw input given to the model. Only ``dict`` input is inspected;
                anything else is passed through untouched.

        Returns:
            ``data`` itself when nothing has to be filled in, otherwise a shallow
            copy of it with ``primitive_type_name`` set.
        """
        if not isinstance(data, dict) or data.get("primitive_type_name"):
            return data
        # Build a new dict instead of assigning into ``data`` so that validating a
        # model never modifies the caller's input as a side effect.
        return {**data, "primitive_type_name": data.get("data_type_name")}


class IOStorage(str, Enum):
    """Storage options for a resource datastream.

    Members are also ``str`` instances, so they compare equal to their string values.
    """

    # NOTE(review): the member names suggest where values are persisted (on the node,
    # on node and cloud, or nowhere) — confirm against the platform documentation.
    node_and_cloud = "node-and-cloud"
    node = "node"
    none = "none"


class WayEnum(str, Enum):
    """Direction ("way") of a resource datastream.

    Members are also ``str`` instances, so they compare equal to their string values.
    """

    # NOTE(review): the ``cc`` suffix presumably stands for "control change" — confirm.
    output = "output"
    input_cc = "input-cc"
    input_cc_output = "input-cc+output"
    input = "input"
    output_cc = "output-cc"
    input_output_cc = "input+output-cc"
class SimpleValueGuardrail(BaseModel):
    """A single numeric bound: a ``value`` plus an ``inclusive`` flag."""

    value: float
    inclusive: bool = False


class RelativeValueType(str, Enum):
    """How the ``value`` of a relative guardrail is expressed."""

    value = "value"
    percentage = "percentage"


class RelativeValueGuardrail(BaseModel):
    """A relative bound: a ``value``, an ``inclusive`` flag and the value's ``type``."""

    value: float
    inclusive: bool = False
    type: RelativeValueType = RelativeValueType.value


class RelativeMinMax(BaseModel):
    """Optional ``min`` / ``max`` pair of relative bounds."""

    min: Optional[RelativeValueGuardrail] = None
    max: Optional[RelativeValueGuardrail] = None


class RelativeGuardrail(BaseModel):
    """Optional relative bounds, kept separately for ``increase`` and ``decrease``."""

    increase: Optional[RelativeMinMax] = None
    decrease: Optional[RelativeMinMax] = None


class NumericGuardrail(BaseModel):
    """Optional absolute ``min`` / ``max`` bounds plus optional ``relative`` bounds."""

    min: Optional[SimpleValueGuardrail] = None
    max: Optional[SimpleValueGuardrail] = None
    relative: Optional[RelativeGuardrail] = None


class Guardrail(BaseModel):
    """Guardrail settings of a datastream: a ``control_disabled`` flag and numeric limits."""

    control_disabled: bool = False
    numeric: Optional[NumericGuardrail] = None


def _legacy_way(owned: Any, access: Any) -> Optional[WayEnum]:
    """Translate the legacy ``owned`` / ``access`` pair into a ``WayEnum`` member.

    Returns ``None`` when ``access`` is not one of ``"RO"``, ``"RW"`` or ``"WO"``.
    """
    if owned:
        if access == "RO":
            return WayEnum.output
        if access == "RW":
            return WayEnum.input_cc_output
        if access == "WO":
            return WayEnum.input_cc
        return None
    if access == "RO":
        return WayEnum.input
    if access == "RW":
        return WayEnum.input_output_cc
    if access == "WO":
        return WayEnum.output_cc
    return None


class ResourceDatastream(BaseModel):
    """Per-resource settings of a datastream.

    When the input is a dict without a truthy ``way``, ``way`` is derived from the
    legacy ``owned`` / ``access`` fields before validation (see ``prefill_resource``).
    """

    map_to: Optional[str] = None
    remote: Optional[bool] = None
    configuration: Dict = {}
    way: WayEnum = WayEnum.output
    storage: IOStorage = IOStorage.node_and_cloud
    guardrail: Optional[Guardrail] = None

    # legacy fields
    access: Literal["RO", "RW", "WO"] = "RO"
    owned: Optional[bool] = False

    @model_validator(mode="before")
    @classmethod
    def prefill_resource(cls, data: Any) -> Any:
        """Set ``way`` from the legacy ``owned`` / ``access`` fields when it is not given.

        Missing ``owned`` counts as ``False`` and missing ``access`` as ``"RO"``, so a
        dict that has none of the three keys ends up with ``way = WayEnum.input``.

        Args:
            data: Raw input given to the model. Only ``dict`` input is inspected;
                anything else is passed through untouched.

        Returns:
            ``data`` itself when ``way`` is already set or ``access`` is not a
            recognised legacy value, otherwise a shallow copy with ``way`` filled in.
        """
        # set "way" if not set for backward compatibility
        if not isinstance(data, dict) or data.get("way"):
            return data
        way = _legacy_way(data.get("owned", False), data.get("access", "RO"))
        if way is None:
            # Unrecognised ``access``: leave the input alone and let field validation report it.
            return data
        # Build a new dict instead of assigning into ``data`` so that validating a
        # model never modifies the caller's input as a side effect.
        return {**data, "way": way}


class DQWayEnum(str, Enum):
    """Direction ("way") of a data-quality entry."""

    output = "output"
    input = "input"


# Free-form configuration mapping used by data-quality entries.
DQConfiguration: TypeAlias = Dict[str, Any]


class DataQuality(BaseModel):
    """Data-quality entry: its ``way``, per-datastream configuration and own configuration."""

    way: DQWayEnum = DQWayEnum.output
    datastreams: Dict[str, DQConfiguration] = {}
    configuration: DQConfiguration = {}
class AssetTypeDetail(BaseModel):
    """Name and title of an asset type."""

    name: str
    title: str


class AssetDetail(BaseModel):
    """Name and title of an asset, together with its asset type."""

    name: str
    title: str
    type: AssetTypeDetail


class Resource(BaseModel):
    """A resource listed in the runtime manifest, with its per-resource settings.

    ``resource`` is required. For backward compatibility it is derived from the
    deprecated ``type`` / ``name`` pair when absent (see ``prefill_resource``).
    """

    resource: KRN
    datastreams: Dict[str, ResourceDatastream] = {}
    properties: Dict[str, PropertyType] = {}
    parameters: Dict[str, ParameterType] = {}
    data_quality: Dict[str, Any] = {}
    asset: Optional[AssetDetail] = None

    # Deprecated fields for backward compatibility
    type: str = ""
    name: str = ""

    @model_validator(mode="before")
    @classmethod
    def prefill_resource(cls, data: Any) -> Any:
        """Fill ``resource`` from the deprecated ``type`` / ``name`` fields when possible.

        Args:
            data: Raw input given to the model. Only ``dict`` input is inspected;
                anything else is passed through untouched.

        Returns:
            ``data`` itself when ``resource`` is already set or cannot be inferred,
            otherwise a shallow copy with ``resource`` set to ``KRNAsset(name)``.
        """
        if not isinstance(data, dict) or data.get("resource"):
            return data
        # If resource is not provided, try to infer it from type and name
        legacy_name = data.get("name")
        if data.get("type") != "asset" or not legacy_name:
            return data
        # Build a new dict instead of assigning into ``data`` so that validating a
        # model never modifies the caller's input as a side effect.
        return {**data, "resource": KRNAsset(legacy_name)}


class CAWayEnum(str, Enum):
    """Direction ("way") of a custom action."""

    # NOTE(review): the ``ca`` suffix presumably stands for "custom action" — confirm.
    input_ca = "input-ca"
    output_ca = "output-ca"


class CustomAction(BaseModel):
    """Custom action declaration: its ``type`` and its ``way``."""

    type: str
    way: CAWayEnum


class RuntimeManifestPayload(BaseModel):
    """Payload of a runtime manifest message.

    Keys that are not declared as fields are accepted and kept (``extra="allow"``).
    """

    model_config = {"extra": "allow"}

    resources: List[Resource] = []
    configuration: Dict = {}
    datastreams: List[ManifestDatastream] = []
    custom_actions: List[CustomAction] = []
class RuntimeManifest(Message):
    """Message whose payload is a ``RuntimeManifestPayload``."""

    # NOTE(review): ``TYPE_`` is presumably what the ``Message`` base class uses to
    # associate this class with its message type — confirm against ``kelvin.message``.
    TYPE_ = KMessageTypeRuntimeManifest()

    type: KMessageTypeRuntimeManifest = KMessageTypeRuntimeManifest()
    payload: RuntimeManifestPayload