Source code for kelvin.config.manifest

from __future__ import annotations

from enum import Enum
from typing import Any, Dict, List, Literal, Optional

from pydantic import Field

from kelvin.config.common import AppBaseConfig, ConfigBaseModel, CustomActionTypeStr, VersionStr
from kelvin.krn import KRN
from kelvin.message import ParameterType
from kelvin.message.msg_type import PrimitiveTypes
from kelvin.message.runtime_manifest import IOStorage


[docs] class RuntimeUpdateFlags(ConfigBaseModel): io: bool = False configuration: bool = False resource_parameters: bool = True resource_properties: bool = True
[docs] class DeploymentFlags(ConfigBaseModel): allowed_resources: List[KRN] = []
[docs] class Flags(ConfigBaseModel): spec_version: VersionStr enable_runtime_update: RuntimeUpdateFlags = RuntimeUpdateFlags() deployment: DeploymentFlags = DeploymentFlags() resources_required: Optional[bool] = None
[docs] class IOWay(str, Enum): output = "output" input_cc = "input-cc" input_cc_output = "input-cc+output" input = "input" output_cc = "output-cc" input_output_cc = "input+output-cc"
[docs] class DQWay(str, Enum): output = "output" input = "input"
[docs] class IODefinition(ConfigBaseModel): name: str data_type: str unit: Optional[str] way: IOWay = IOWay.output storage: IOStorage = IOStorage.node_and_cloud
[docs] class DynamicIoOwnership(str, Enum): both = "both" owned = "owned" remote = "remote"
[docs] class DynamicIoType(str, Enum): both = "both" data = "data" control = "control"
[docs] class DynamicIODataTypes(ConfigBaseModel): name: str
[docs] class DynamicIODefinition(ConfigBaseModel): type_name: str ownership: DynamicIoOwnership = DynamicIoOwnership.both type: DynamicIoType = DynamicIoType.both data_types: List[str] = []
[docs] class CustomActionWay(str, Enum): input_ca = "input-ca" output_ca = "output-ca"
[docs] class ManifCustomAction(ConfigBaseModel): type: CustomActionTypeStr way: CustomActionWay
ParameterDataTypes = Literal[PrimitiveTypes.number, PrimitiveTypes.string, PrimitiveTypes.boolean] # type: ignore
[docs] class ParamDefinition(ConfigBaseModel): name: str title: Optional[str] = None data_type: Optional[ParameterDataTypes] = None default: Optional[ParameterType] = None
[docs] class IOSchema(ConfigBaseModel): type_name: str io_schema: dict = Field(default_factory=dict, alias="schema")
[docs] class SchemasDefinition(ConfigBaseModel): parameters: dict = {} configuration: dict = {} io_configurations: List[IOSchema] = []
[docs] class DeploymentType(str, Enum): standard = "standard" staged_instant_apply = "staged+instant-apply" staged_only = "staged-only"
[docs] class ClusterDefinition(ConfigBaseModel): name: str
[docs] class DeploymentTargetDefaults(ConfigBaseModel): type: Optional[str] = None cluster: Optional[ClusterDefinition] = None
[docs] class DeploymentDefaults(ConfigBaseModel): max_resources: Optional[int] = None deployment_type: Optional[DeploymentType] = None target: Optional[DeploymentTargetDefaults] = None
[docs] class IODatastreamMapping(ConfigBaseModel): io: str datastream: str
[docs] class AppDefaults(ConfigBaseModel): configuration: Dict[str, Any] = {} io_datastream_mapping: Optional[List[IODatastreamMapping]] = None
[docs] class DefaultsDefinition(ConfigBaseModel): deployment: Optional[DeploymentDefaults] = None app: Optional[AppDefaults] = None system: Optional[dict] = None
[docs] class ManifCustomDataQuality(ConfigBaseModel): name: str data_type: str way: DQWay datastreams: List[str] = []
[docs] class AppManifest(AppBaseConfig): flags: Optional[Flags] = None io: List[IODefinition] = [] dynamic_io: List[DynamicIODefinition] = [] parameters: List[ParamDefinition] = [] schemas: Optional[SchemasDefinition] = None defaults: Optional[DefaultsDefinition] = None custom_actions: List[ManifCustomAction] = [] data_quality: List[ManifCustomDataQuality] = []