Source code for kelvin.message.runtime_manifest

from __future__ import annotations

from enum import Enum
from typing import Any, ClassVar, Literal, Optional, cast

from pydantic import BaseModel, ConfigDict, Field, model_validator
from typing_extensions import TypeAlias

from kelvin.krn import KRN, KRNAsset
from kelvin.message.base_messages import ParameterType, PropertyType
from kelvin.message.message import Message
from kelvin.message.msg_type import KMessageTypeRuntimeManifest


class ManifestDatastream(BaseModel):
    model_config: ClassVar[ConfigDict] = ConfigDict(extra="allow")

    name: str
    data_type_name: Optional[str] = None
    semantic_type_name: Optional[str] = None
    unit_name: Optional[str] = None

    # legacy fields for backward compatibility
    primitive_type_name: Optional[str] = None

    @model_validator(mode="before")
    @classmethod
    def prefill_resource(cls, data: Any) -> Any:
        if isinstance(data, dict):
            # If primitive_type_name is not set, try to infer it from data_type_name
            data = cast(dict[str, Any], data)
            if not data.get("primitive_type_name"):
                data["primitive_type_name"] = data.get("data_type_name")
        return data


class IOStorage(str, Enum):
    node_and_cloud = "node-and-cloud"
    node = "node"
    none = "none"


[docs] class WayEnum(str, Enum): output = "output" input_cc = "input-cc" input_cc_output = "input-cc+output" input = "input" output_cc = "output-cc" input_output_cc = "input+output-cc"
class SimpleValueGuardrail(BaseModel): value: float inclusive: bool = False class RelativeValueType(str, Enum): value = "value" # pyright: ignore[reportIncompatibleMethodOverride, reportAssignmentType] percentage = "percentage" class RelativeValueGuardrail(BaseModel): value: float inclusive: bool = False type: RelativeValueType = RelativeValueType.value # pyright: ignore[reportAssignmentType] class RelativeMinMax(BaseModel): min: Optional[RelativeValueGuardrail] = None max: Optional[RelativeValueGuardrail] = None class RelativeGuardrail(BaseModel): increase: Optional[RelativeMinMax] = None decrease: Optional[RelativeMinMax] = None class NumericGuardrail(BaseModel): min: Optional[SimpleValueGuardrail] = None max: Optional[SimpleValueGuardrail] = None relative: Optional[RelativeGuardrail] = None class Guardrail(BaseModel): control_disabled: bool = False numeric: Optional[NumericGuardrail] = None class ResourceDatastream(BaseModel): map_to: Optional[str] = None remote: Optional[bool] = None configuration: dict[str, Any] = Field(default_factory=dict) way: WayEnum = WayEnum.output storage: IOStorage = IOStorage.node_and_cloud guardrail: Optional[Guardrail] = None # legacy fields access: Literal["RO", "RW", "WO"] = "RO" owned: Optional[bool] = False @model_validator(mode="before") @classmethod def prefill_resource(cls, data: Any) -> Any: if isinstance(data, dict): data = cast(dict[str, Any], data) # set "way" if not set for backward compatibility if data.get("way"): return data owned = data.get("owned", False) access = data.get("access", "RO") if owned: if access == "RO": data["way"] = WayEnum.output elif access == "RW": data["way"] = WayEnum.input_cc_output elif access == "WO": data["way"] = WayEnum.input_cc else: if access == "RO": data["way"] = WayEnum.input elif access == "RW": data["way"] = WayEnum.input_output_cc elif access == "WO": data["way"] = WayEnum.output_cc return data class DQWayEnum(str, Enum): output = "output" input = "input" class DataTagWayEnum(str, Enum): output = "output" input = "input" class ResourceDataTag(BaseModel): way: DataTagWayEnum = DataTagWayEnum.output DQConfiguration: TypeAlias = dict[str, Any] class DataQuality(BaseModel): way: DQWayEnum = DQWayEnum.output datastreams: dict[str, DQConfiguration] = Field(default_factory=dict) configuration: DQConfiguration = Field(default_factory=dict) class AssetTypeDetail(BaseModel): name: str title: str class AssetDetail(BaseModel): name: str title: str type: AssetTypeDetail class Resource(BaseModel): resource: KRN datastreams: dict[str, ResourceDatastream] = Field(default_factory=dict) properties: dict[str, PropertyType] = Field(default_factory=dict) parameters: dict[str, ParameterType] = Field(default_factory=dict) data_quality: dict[str, Any] = Field(default_factory=dict) data_tags: dict[str, ResourceDataTag] = Field(default_factory=dict) asset: Optional[AssetDetail] = None # Deprecated fields for backward compatibility type: str = "" name: str = "" @model_validator(mode="before") @classmethod def prefill_resource(cls, data: Any) -> Any: if isinstance(data, dict): data = cast(dict[str, Any], data) # If resource is not provided, try to infer it from type and name if not data.get("resource") and data.get("type") == "asset" and data.get("name"): data["resource"] = KRNAsset(data["name"]) return data class CAWayEnum(str, Enum): input_ca = "input-ca" output_ca = "output-ca" class CustomAction(BaseModel): type: str way: CAWayEnum class RuntimeManifestPayload(BaseModel): model_config: ClassVar[ConfigDict] = ConfigDict(extra="allow") resources: list[Resource] = Field(default_factory=list) configuration: dict[str, Any] = Field(default_factory=dict) datastreams: list[ManifestDatastream] = Field(default_factory=list) custom_actions: list[CustomAction] = Field(default_factory=list)
[docs] class RuntimeManifest(Message): TYPE_: ClassVar[KMessageTypeRuntimeManifest] = KMessageTypeRuntimeManifest() # pyright: ignore[reportIncompatibleVariableOverride] type: KMessageTypeRuntimeManifest = KMessageTypeRuntimeManifest() # pyright: ignore[reportIncompatibleVariableOverride] payload: RuntimeManifestPayload # pyright: ignore[reportGeneralTypeIssues]