Source code for kelvin.config.manifest
from __future__ import annotations
from enum import Enum
from typing import Any, Dict, List, Literal, Optional
from pydantic import Field
from kelvin.config.common import AppBaseConfig, ConfigBaseModel, CustomActionTypeStr, VersionStr
from kelvin.krn import KRN
from kelvin.message import ParameterType
from kelvin.message.msg_type import PrimitiveTypes
from kelvin.message.runtime_manifest import IOStorage
[docs]
class RuntimeUpdateFlags(ConfigBaseModel):
io: bool = False
configuration: bool = False
resource_parameters: bool = True
resource_properties: bool = True
[docs]
class DeploymentFlags(ConfigBaseModel):
allowed_resources: List[KRN] = []
[docs]
class Flags(ConfigBaseModel):
spec_version: VersionStr
enable_runtime_update: RuntimeUpdateFlags = RuntimeUpdateFlags()
deployment: DeploymentFlags = DeploymentFlags()
resources_required: Optional[bool] = None
[docs]
class IOWay(str, Enum):
output = "output"
input_cc = "input-cc"
input_cc_output = "input-cc+output"
input = "input"
output_cc = "output-cc"
input_output_cc = "input+output-cc"
[docs]
class DQWay(str, Enum):
output = "output"
input = "input"
[docs]
class IODefinition(ConfigBaseModel):
name: str
data_type: str
unit: Optional[str]
way: IOWay = IOWay.output
storage: IOStorage = IOStorage.node_and_cloud
[docs]
class DynamicIoOwnership(str, Enum):
both = "both"
owned = "owned"
remote = "remote"
[docs]
class DynamicIoType(str, Enum):
both = "both"
data = "data"
control = "control"
[docs]
class DynamicIODataTypes(ConfigBaseModel):
name: str
[docs]
class DynamicIODefinition(ConfigBaseModel):
type_name: str
ownership: DynamicIoOwnership = DynamicIoOwnership.both
type: DynamicIoType = DynamicIoType.both
data_types: List[str] = []
[docs]
class CustomActionWay(str, Enum):
input_ca = "input-ca"
output_ca = "output-ca"
[docs]
class ManifCustomAction(ConfigBaseModel):
type: CustomActionTypeStr
way: CustomActionWay
ParameterDataTypes = Literal[PrimitiveTypes.number, PrimitiveTypes.string, PrimitiveTypes.boolean] # type: ignore
[docs]
class ParamDefinition(ConfigBaseModel):
name: str
title: Optional[str] = None
data_type: Optional[ParameterDataTypes] = None
default: Optional[ParameterType] = None
[docs]
class IOSchema(ConfigBaseModel):
type_name: str
io_schema: dict = Field(default_factory=dict, alias="schema")
[docs]
class SchemasDefinition(ConfigBaseModel):
parameters: dict = {}
configuration: dict = {}
io_configurations: List[IOSchema] = []
[docs]
class DeploymentType(str, Enum):
standard = "standard"
staged_instant_apply = "staged+instant-apply"
staged_only = "staged-only"
[docs]
class ClusterDefinition(ConfigBaseModel):
name: str
[docs]
class DeploymentTargetDefaults(ConfigBaseModel):
type: Optional[str] = None
cluster: Optional[ClusterDefinition] = None
[docs]
class DeploymentDefaults(ConfigBaseModel):
max_resources: Optional[int] = None
deployment_type: Optional[DeploymentType] = None
target: Optional[DeploymentTargetDefaults] = None
[docs]
class IODatastreamMapping(ConfigBaseModel):
io: str
datastream: str
[docs]
class AppDefaults(ConfigBaseModel):
configuration: Dict[str, Any] = {}
io_datastream_mapping: Optional[List[IODatastreamMapping]] = None
[docs]
class DefaultsDefinition(ConfigBaseModel):
deployment: Optional[DeploymentDefaults] = None
app: Optional[AppDefaults] = None
system: Optional[dict] = None
[docs]
class ManifCustomDataQuality(ConfigBaseModel):
name: str
data_type: str
way: DQWay
datastreams: List[str] = []
[docs]
class AppManifest(AppBaseConfig):
flags: Optional[Flags] = None
io: List[IODefinition] = []
dynamic_io: List[DynamicIODefinition] = []
parameters: List[ParamDefinition] = []
schemas: Optional[SchemasDefinition] = None
defaults: Optional[DefaultsDefinition] = None
custom_actions: List[ManifCustomAction] = []
data_quality: List[ManifCustomDataQuality] = []