Source code for kelvin.message.base_messages

from __future__ import annotations

from datetime import datetime
from enum import Enum
from typing import Any, ClassVar, Optional, Union
from uuid import UUID

from pydantic import BaseModel, ConfigDict, Field, StrictBool, StrictFloat, StrictInt, StrictStr, field_serializer
from typing_extensions import Literal, TypeAlias

from kelvin.krn import KRN, KRNAsset, KRNDatastream
from kelvin.message.evidences import BaseEvidence
from kelvin.message.message import Message
from kelvin.message.msg_type import (
    KMessageTypeAction,
    KMessageTypeActionAck,
    KMessageTypeControl,
    KMessageTypeControlAck,
    KMessageTypeControlStatus,
    KMessageTypeData,
    KMessageTypeDataTag,
    KMessageTypeParameters,
    KMessageTypeRecommendation,
)
from kelvin.message.utils import to_rfc3339_timestamp

ParameterType: TypeAlias = Union[StrictBool, StrictInt, StrictFloat, StrictStr]
PropertyType: TypeAlias = Union[
    StrictBool, StrictInt, StrictFloat, StrictStr, list[StrictBool], list[StrictInt], list[StrictFloat], list[StrictStr]
]


[docs] class ControlChangeQueueing(str, Enum): """Control change queueing mode. - `ENQUEUE`: default, add new CCs to the queue. - `NEXT`: marks all CCs in queue as superseded and enters the queue to be executed next. - `IMMEDIATE`: marks all CCs in queue as superseded, marks the current CC in progress as interrupted and is immediately sent to the Bridge. """ ENQUEUE = "enqueue" NEXT = "next" IMMEDIATE = "immediate"
[docs] class ValuePoint(BaseModel): value: Any timestamp: datetime source: Optional[str] = None
[docs] @field_serializer("timestamp") def serialize_timestamp(self, ts: datetime) -> str: return to_rfc3339_timestamp(ts)
class QueueingMode(BaseModel): mode: ControlChangeQueueing = Field( ControlChangeQueueing.ENQUEUE, description="Queueing strategy for the control change" ) class BaseControlChangeFields(BaseModel): """Shared fields for control change payloads. Both :class:`ControlChangePayload` (standalone) and :class:`RecommendationControlChange` derive from this base. Fields that differ in optionality or aliasing between the two are defined on the concrete subclasses so that Liskov Substitution is preserved. """ model_config: ClassVar[ConfigDict] = ConfigDict(populate_by_name=True) timeout: Optional[int] = Field(None, description="Timeout for retries") payload: Any = Field(None, description="Control Change payload") from_value: Optional[ValuePoint] = Field( None, description="Optional value of the datastream at the moment of the creation", alias="from" ) queueing: Optional[QueueingMode] = Field(None, description="Queueing mode for the control change")
[docs] class ControlChangePayload(BaseControlChangeFields): retries: Optional[int] = Field(None, description="Max retries") expiration_date: datetime = Field(description="Absolute expiration Date in UTC")
[docs] @field_serializer("expiration_date") def serialize_timestamp(self, ts: datetime) -> str: return to_rfc3339_timestamp(ts)
[docs] class ControlChangeMsg(Message): """Generic Control Change Message""" TYPE_: ClassVar[KMessageTypeControl] = KMessageTypeControl() # pyright: ignore[reportIncompatibleVariableOverride] type: KMessageTypeControl = KMessageTypeControl() # pyright: ignore[reportIncompatibleVariableOverride] payload: ControlChangePayload # pyright: ignore[reportGeneralTypeIssues]
[docs] class StateEnum(str, Enum): ready = "ready" sent = "sent" failed = "failed" processed = "processed" applied = "applied" rejected = "rejected"
class ReportedValues(BaseModel): before: Optional[ValuePoint] = None after: Optional[ValuePoint] = None class ControlChangeStatusPayload(BaseModel): state: StateEnum message: Optional[str] = None reported: Optional[ReportedValues] = None # control_change_id required, kept optional for backward compatibility control_change_id: Optional[UUID] = None metadata: Optional[dict[str, Any]] = None
[docs] class ControlChangeStatus(Message): """Control Change Status""" TYPE_: ClassVar[KMessageTypeControlStatus] = KMessageTypeControlStatus() # pyright: ignore[reportIncompatibleVariableOverride] type: KMessageTypeControlStatus = KMessageTypeControlStatus() # pyright: ignore[reportIncompatibleVariableOverride] payload: ControlChangeStatusPayload # pyright: ignore[reportGeneralTypeIssues]
[docs] class ControlChangeAckPayload(ControlChangeStatusPayload): """Control Change Ack Payload"""
[docs] class ControlChangeAck(Message): """Control Change Ack""" TYPE_: ClassVar[KMessageTypeControlAck] = KMessageTypeControlAck() # pyright: ignore[reportIncompatibleVariableOverride] type: KMessageTypeControlAck = KMessageTypeControlAck() # pyright: ignore[reportIncompatibleVariableOverride] payload: ControlChangeAckPayload # pyright: ignore[reportGeneralTypeIssues]
class SensorDataPayload(BaseModel): data: list[float] = Field(description="Array of sensor measurements.", min_length=1) sample_rate: float = Field(description="Sensor sample-rate in Hertz.", gt=0.0) class SensorDataMsg(Message): """Sensor data.""" TYPE_: ClassVar[KMessageTypeData] = KMessageTypeData("object", "kelvin.sensor_data") # pyright: ignore[reportIncompatibleVariableOverride] type: KMessageTypeData = KMessageTypeData("object", "kelvin.sensor_data") # pyright: ignore[reportIncompatibleVariableOverride] payload: SensorDataPayload # pyright: ignore[reportGeneralTypeIssues] class EdgeParameter(BaseModel): name: str value: ParameterType comment: Optional[str] = None class ResourceParameters(BaseModel): resource: KRN parameters: list[EdgeParameter] class ParametersPayload(BaseModel): source: Optional[KRN] = None resource_parameters: list[ResourceParameters] class ParametersMsg(Message): TYPE_: ClassVar[KMessageTypeParameters] = KMessageTypeParameters() # pyright: ignore[reportIncompatibleVariableOverride] type: KMessageTypeParameters = KMessageTypeParameters() # pyright: ignore[reportIncompatibleVariableOverride] payload: ParametersPayload # pyright: ignore[reportGeneralTypeIssues] class DataTagPayload(BaseModel): start_date: datetime end_date: datetime tag_name: str resource: KRNAsset description: Optional[str] = Field(None, max_length=256) contexts: Optional[list[KRNDatastream]] = None source: Optional[KRN] = None @field_serializer("start_date", "end_date") def serialize_timestamp(self, ts: datetime) -> str: return to_rfc3339_timestamp(ts)
[docs] class DataTagMsg(Message): TYPE_: ClassVar[KMessageTypeDataTag] = KMessageTypeDataTag() # pyright: ignore[reportIncompatibleVariableOverride] type: KMessageTypeDataTag = KMessageTypeDataTag() # pyright: ignore[reportIncompatibleVariableOverride] resource: KRNAsset # pyright: ignore[reportIncompatibleVariableOverride, reportGeneralTypeIssues] payload: DataTagPayload # pyright: ignore[reportGeneralTypeIssues]
class CustomActionPayload(BaseModel): title: str expiration_date: datetime description: Optional[str] = None payload: dict[str, Any] = Field(default_factory=dict) @field_serializer("expiration_date") def serialize_timestamp(self, ts: datetime) -> str: return to_rfc3339_timestamp(ts)
[docs] class CustomActionMsg(Message): TYPE_: ClassVar[KMessageTypeAction] = KMessageTypeAction() # pyright: ignore[reportIncompatibleVariableOverride] type: KMessageTypeAction = KMessageTypeAction() # pyright: ignore[reportIncompatibleVariableOverride] payload: CustomActionPayload # pyright: ignore[reportGeneralTypeIssues]
class CustomActionResultPayload(BaseModel): id: UUID success: bool message: Optional[str] = None metadata: Optional[dict[str, Any]] = None
[docs] class CustomActionResultMsg(Message): TYPE_: ClassVar[KMessageTypeActionAck] = KMessageTypeActionAck() # pyright: ignore[reportIncompatibleVariableOverride] type: KMessageTypeActionAck = KMessageTypeActionAck() # pyright: ignore[reportIncompatibleVariableOverride] payload: CustomActionResultPayload # pyright: ignore[reportGeneralTypeIssues]
[docs] class RecommendationControlChange(BaseControlChangeFields): retries: Optional[int] = Field(None, description="Max retries", alias="retry") expiration_date: Optional[datetime] = Field(None, description="Absolute expiration Date in UTC") expiration_delta: Optional[float] = Field(None, description="Expiration delta in seconds", gt=0) state: Optional[str] = None resource: Optional[KRN] = None control_change_id: Optional[UUID] = Field(None, description="Control Change ID") trace_id: Optional[str] = None
[docs] @field_serializer("expiration_date") def serialize_timestamp(self, ts: Optional[datetime]) -> Optional[str]: if ts is None: return None return to_rfc3339_timestamp(ts)
class RecommendationCustomAction(CustomActionPayload): type: str resource: KRN custom_action_id: Optional[UUID] = None trace_id: Optional[str] = None
[docs] class RecommendationActions(BaseModel): control_changes: list[RecommendationControlChange] = Field(default_factory=list) custom_actions: list[RecommendationCustomAction] = Field(default_factory=list)
class RecommendationPayload(BaseModel): source: Optional[KRN] = None resource: KRN type: str description: Optional[str] = None confidence: Optional[int] = Field(None, ge=1, le=4) expiration_date: Optional[datetime] = None actions: RecommendationActions = Field(default_factory=RecommendationActions) metadata: Optional[dict[str, Any]] = None state: Optional[Literal["pending", "auto_accepted"]] = None evidences: list[BaseEvidence] = Field(default_factory=list) custom_identifier: Optional[str] = None trace_id: Optional[str] = None @field_serializer("expiration_date") def serialize_timestamp(self, ts: datetime) -> str: return to_rfc3339_timestamp(ts)
[docs] class RecommendationMsg(Message): TYPE_: ClassVar[KMessageTypeRecommendation] = KMessageTypeRecommendation() # pyright: ignore[reportIncompatibleVariableOverride] type: KMessageTypeRecommendation = KMessageTypeRecommendation() # pyright: ignore[reportIncompatibleVariableOverride] payload: RecommendationPayload # pyright: ignore[reportGeneralTypeIssues]