from __future__ import annotations
from datetime import datetime
from enum import Enum
from typing import Any, ClassVar, Optional, Union
from uuid import UUID
from pydantic import BaseModel, ConfigDict, Field, StrictBool, StrictFloat, StrictInt, StrictStr, field_serializer
from typing_extensions import Literal, TypeAlias
from kelvin.krn import KRN, KRNAsset, KRNDatastream
from kelvin.message.evidences import BaseEvidence
from kelvin.message.message import Message
from kelvin.message.msg_type import (
KMessageTypeAction,
KMessageTypeActionAck,
KMessageTypeControl,
KMessageTypeControlAck,
KMessageTypeControlStatus,
KMessageTypeData,
KMessageTypeDataTag,
KMessageTypeParameters,
KMessageTypeRecommendation,
)
from kelvin.message.utils import to_rfc3339_timestamp
# Scalar types allowed for a parameter value (see ``EdgeParameter.value``).
# The pydantic ``Strict*`` types are used so that values are not coerced
# between bool/int/float/str during validation.
ParameterType: TypeAlias = Union[StrictBool, StrictInt, StrictFloat, StrictStr]
# The same strict scalar types plus a list of each scalar type.
# NOTE(review): not referenced in the visible part of this module; presumably
# consumed by other modules — confirm before changing.
PropertyType: TypeAlias = Union[
StrictBool, StrictInt, StrictFloat, StrictStr, list[StrictBool], list[StrictInt], list[StrictFloat], list[StrictStr]
]
[docs]
class ControlChangeQueueing(str, Enum):
    """Queueing mode applied to a control change (CC).

    - `ENQUEUE`: the default; the new CC is added to the queue.
    - `NEXT`: every CC already in the queue is marked as superseded and the
      new CC enters the queue to be executed next.
    - `IMMEDIATE`: every CC already in the queue is marked as superseded,
      the CC currently in progress is marked as interrupted, and the new CC
      is immediately sent to the Bridge.
    """

    ENQUEUE = "enqueue"
    NEXT = "next"
    IMMEDIATE = "immediate"
[docs]
class ValuePoint(BaseModel):
    """A value paired with a timestamp and an optional source string."""

    value: Any
    timestamp: datetime
    source: Optional[str] = None

    @field_serializer("timestamp")
    def serialize_timestamp(self, ts: datetime) -> str:
        """Serialize ``timestamp`` by delegating to ``to_rfc3339_timestamp``."""
        rendered = to_rfc3339_timestamp(ts)
        return rendered
class QueueingMode(BaseModel):
    """Model wrapping the queueing strategy of a control change."""

    # Defaults to ENQUEUE when no mode is supplied.
    mode: ControlChangeQueueing = Field(
        default=ControlChangeQueueing.ENQUEUE,
        description="Queueing strategy for the control change",
    )
class BaseControlChangeFields(BaseModel):
    """Fields common to every control change payload.

    :class:`ControlChangePayload` (standalone) and
    :class:`RecommendationControlChange` both inherit from this class. Any
    field whose optionality or alias differs between those two is declared
    on the concrete subclass instead, so that Liskov Substitution is
    preserved.
    """

    # populate_by_name allows aliased fields (``from_value`` / ``from``) to be
    # populated through either the field name or the alias.
    model_config: ClassVar[ConfigDict] = ConfigDict(populate_by_name=True)

    timeout: Optional[int] = Field(default=None, description="Timeout for retries")
    payload: Any = Field(default=None, description="Control Change payload")
    from_value: Optional[ValuePoint] = Field(
        default=None,
        alias="from",
        description="Optional value of the datastream at the moment of the creation",
    )
    queueing: Optional[QueueingMode] = Field(default=None, description="Queueing mode for the control change")
[docs]
class ControlChangePayload(BaseControlChangeFields):
    """Payload of a standalone control change.

    Extends the shared control change fields with an optional retry count
    and a mandatory ``expiration_date``.
    """

    retries: Optional[int] = Field(default=None, description="Max retries")
    expiration_date: datetime = Field(description="Absolute expiration Date in UTC")

    @field_serializer("expiration_date")
    def serialize_timestamp(self, ts: datetime) -> str:
        """Serialize ``expiration_date`` by delegating to ``to_rfc3339_timestamp``."""
        rendered = to_rfc3339_timestamp(ts)
        return rendered
[docs]
class ControlChangeMsg(Message):
    """Control change message: type ``KMessageTypeControl``, payload ``ControlChangePayload``."""

    # TYPE_ is the class-level message type; ``type`` is the per-instance field
    # defaulting to the same message type.
    TYPE_: ClassVar[KMessageTypeControl] = KMessageTypeControl()  # pyright: ignore[reportIncompatibleVariableOverride]
    type: KMessageTypeControl = KMessageTypeControl()  # pyright: ignore[reportIncompatibleVariableOverride]

    payload: ControlChangePayload  # pyright: ignore[reportGeneralTypeIssues]
[docs]
class StateEnum(str, Enum):
    """String-valued states used by ``ControlChangeStatusPayload.state``."""

    ready = "ready"
    sent = "sent"
    failed = "failed"
    processed = "processed"
    applied = "applied"
    rejected = "rejected"
class ReportedValues(BaseModel):
    """Optional pair of value points named ``before`` and ``after``."""

    before: Optional[ValuePoint] = Field(default=None)
    after: Optional[ValuePoint] = Field(default=None)
class ControlChangeStatusPayload(BaseModel):
    """Payload describing the status of a control change.

    Only ``state`` is mandatory; every other field defaults to ``None``.
    """

    state: StateEnum
    message: Optional[str] = None
    reported: Optional[ReportedValues] = None

    # control_change_id is logically required; it remains Optional only for
    # backward compatibility.
    control_change_id: Optional[UUID] = None

    metadata: Optional[dict[str, Any]] = None
[docs]
class ControlChangeStatus(Message):
    """Control change status message: type ``KMessageTypeControlStatus``, payload ``ControlChangeStatusPayload``."""

    # Class-level message type and the matching per-instance default.
    TYPE_: ClassVar[KMessageTypeControlStatus] = KMessageTypeControlStatus()  # pyright: ignore[reportIncompatibleVariableOverride]
    type: KMessageTypeControlStatus = KMessageTypeControlStatus()  # pyright: ignore[reportIncompatibleVariableOverride]

    payload: ControlChangeStatusPayload  # pyright: ignore[reportGeneralTypeIssues]
[docs]
class ControlChangeAckPayload(ControlChangeStatusPayload):
    """Payload of a control change ack.

    Adds no fields of its own; it has exactly the fields of
    :class:`ControlChangeStatusPayload`.
    """
[docs]
class ControlChangeAck(Message):
    """Control change ack message: type ``KMessageTypeControlAck``, payload ``ControlChangeAckPayload``."""

    # Class-level message type and the matching per-instance default.
    TYPE_: ClassVar[KMessageTypeControlAck] = KMessageTypeControlAck()  # pyright: ignore[reportIncompatibleVariableOverride]
    type: KMessageTypeControlAck = KMessageTypeControlAck()  # pyright: ignore[reportIncompatibleVariableOverride]

    payload: ControlChangeAckPayload  # pyright: ignore[reportGeneralTypeIssues]
class SensorDataPayload(BaseModel):
    """Sensor measurements together with their sample rate.

    ``data`` must contain at least one element and ``sample_rate`` must be
    strictly greater than zero.
    """

    data: list[float] = Field(
        min_length=1,
        description="Array of sensor measurements.",
    )
    sample_rate: float = Field(
        gt=0.0,
        description="Sensor sample-rate in Hertz.",
    )
class SensorDataMsg(Message):
    """Sensor data message carrying a ``SensorDataPayload``.

    The message type is ``KMessageTypeData("object", "kelvin.sensor_data")``.
    """

    # Class-level message type and the matching per-instance default.
    TYPE_: ClassVar[KMessageTypeData] = KMessageTypeData("object", "kelvin.sensor_data")  # pyright: ignore[reportIncompatibleVariableOverride]
    type: KMessageTypeData = KMessageTypeData("object", "kelvin.sensor_data")  # pyright: ignore[reportIncompatibleVariableOverride]

    payload: SensorDataPayload  # pyright: ignore[reportGeneralTypeIssues]
class EdgeParameter(BaseModel):
    """A named parameter value with an optional comment."""

    name: str
    # One of the strict scalar types in ``ParameterType`` (bool/int/float/str).
    value: ParameterType
    comment: Optional[str] = Field(default=None)
class ResourceParameters(BaseModel):
    """A resource (KRN) together with its list of ``EdgeParameter`` entries."""

    resource: KRN
    parameters: list[EdgeParameter]
class ParametersPayload(BaseModel):
    """Payload listing parameters per resource, with an optional source KRN."""

    source: Optional[KRN] = Field(default=None)
    resource_parameters: list[ResourceParameters]
class ParametersMsg(Message):
    """Parameters message: type ``KMessageTypeParameters``, payload ``ParametersPayload``."""

    # Class-level message type and the matching per-instance default.
    TYPE_: ClassVar[KMessageTypeParameters] = KMessageTypeParameters()  # pyright: ignore[reportIncompatibleVariableOverride]
    type: KMessageTypeParameters = KMessageTypeParameters()  # pyright: ignore[reportIncompatibleVariableOverride]

    payload: ParametersPayload  # pyright: ignore[reportGeneralTypeIssues]
class DataTagPayload(BaseModel):
    """Payload of a data tag: a named tag on an asset between two dates.

    ``description`` is limited to 256 characters. ``contexts`` is an optional
    list of datastream KRNs and ``source`` an optional KRN.
    """

    start_date: datetime
    end_date: datetime
    tag_name: str
    resource: KRNAsset
    description: Optional[str] = Field(default=None, max_length=256)
    contexts: Optional[list[KRNDatastream]] = None
    source: Optional[KRN] = None

    @field_serializer("start_date", "end_date")
    def serialize_timestamp(self, ts: datetime) -> str:
        """Serialize ``start_date`` / ``end_date`` via ``to_rfc3339_timestamp``."""
        rendered = to_rfc3339_timestamp(ts)
        return rendered
[docs]
class DataTagMsg(Message):
    """Data tag message: type ``KMessageTypeDataTag``, payload ``DataTagPayload``.

    ``resource`` is declared as a required ``KRNAsset`` on this class.
    """

    # Class-level message type and the matching per-instance default.
    TYPE_: ClassVar[KMessageTypeDataTag] = KMessageTypeDataTag()  # pyright: ignore[reportIncompatibleVariableOverride]
    type: KMessageTypeDataTag = KMessageTypeDataTag()  # pyright: ignore[reportIncompatibleVariableOverride]

    resource: KRNAsset  # pyright: ignore[reportIncompatibleVariableOverride, reportGeneralTypeIssues]
    payload: DataTagPayload  # pyright: ignore[reportGeneralTypeIssues]
class CustomActionPayload(BaseModel):
    """Payload of a custom action.

    ``title`` and ``expiration_date`` are required; ``payload`` defaults to a
    fresh empty dict per instance.
    """

    title: str
    expiration_date: datetime
    description: Optional[str] = None
    payload: dict[str, Any] = Field(default_factory=dict)

    @field_serializer("expiration_date")
    def serialize_timestamp(self, ts: datetime) -> str:
        """Serialize ``expiration_date`` via ``to_rfc3339_timestamp``."""
        rendered = to_rfc3339_timestamp(ts)
        return rendered
[docs]
class CustomActionMsg(Message):
    """Custom action message: type ``KMessageTypeAction``, payload ``CustomActionPayload``."""

    # Class-level message type and the matching per-instance default.
    TYPE_: ClassVar[KMessageTypeAction] = KMessageTypeAction()  # pyright: ignore[reportIncompatibleVariableOverride]
    type: KMessageTypeAction = KMessageTypeAction()  # pyright: ignore[reportIncompatibleVariableOverride]

    payload: CustomActionPayload  # pyright: ignore[reportGeneralTypeIssues]
class CustomActionResultPayload(BaseModel):
    """Result of a custom action: an id, a success flag and optional details."""

    id: UUID
    success: bool
    message: Optional[str] = Field(default=None)
    metadata: Optional[dict[str, Any]] = Field(default=None)
[docs]
class CustomActionResultMsg(Message):
    """Custom action result message: type ``KMessageTypeActionAck``, payload ``CustomActionResultPayload``."""

    # Class-level message type and the matching per-instance default.
    TYPE_: ClassVar[KMessageTypeActionAck] = KMessageTypeActionAck()  # pyright: ignore[reportIncompatibleVariableOverride]
    type: KMessageTypeActionAck = KMessageTypeActionAck()  # pyright: ignore[reportIncompatibleVariableOverride]

    payload: CustomActionResultPayload  # pyright: ignore[reportGeneralTypeIssues]
[docs]
class RecommendationControlChange(BaseControlChangeFields):
    """Control change embedded in a recommendation.

    Unlike :class:`ControlChangePayload`, ``expiration_date`` is optional
    here and ``retries`` carries the alias ``retry``. An ``expiration_delta``
    (seconds, must be > 0) may be given as well.
    """

    retries: Optional[int] = Field(default=None, alias="retry", description="Max retries")
    expiration_date: Optional[datetime] = Field(default=None, description="Absolute expiration Date in UTC")
    expiration_delta: Optional[float] = Field(default=None, gt=0, description="Expiration delta in seconds")
    state: Optional[str] = None
    resource: Optional[KRN] = None
    control_change_id: Optional[UUID] = Field(default=None, description="Control Change ID")
    trace_id: Optional[str] = None

    @field_serializer("expiration_date")
    def serialize_timestamp(self, ts: Optional[datetime]) -> Optional[str]:
        """Serialize ``expiration_date``; an unset (``None``) date stays ``None``."""
        return None if ts is None else to_rfc3339_timestamp(ts)
class RecommendationCustomAction(CustomActionPayload):
    """Custom action embedded in a recommendation.

    Adds a required ``type`` and ``resource`` plus optional
    ``custom_action_id`` and ``trace_id`` to :class:`CustomActionPayload`.
    """

    type: str
    resource: KRN
    custom_action_id: Optional[UUID] = Field(default=None)
    trace_id: Optional[str] = Field(default=None)
[docs]
class RecommendationActions(BaseModel):
    """Actions attached to a recommendation; both lists default to empty."""

    control_changes: list[RecommendationControlChange] = Field(
        default_factory=list,
    )
    custom_actions: list[RecommendationCustomAction] = Field(
        default_factory=list,
    )
class RecommendationPayload(BaseModel):
    """Payload of a recommendation.

    ``resource`` and ``type`` are required. ``confidence``, when given, must
    be between 1 and 4 inclusive; ``state``, when given, must be ``"pending"``
    or ``"auto_accepted"``. ``actions`` defaults to an empty
    :class:`RecommendationActions` and ``evidences`` to an empty list.
    """

    source: Optional[KRN] = None
    resource: KRN
    type: str
    description: Optional[str] = None
    confidence: Optional[int] = Field(None, ge=1, le=4)
    expiration_date: Optional[datetime] = None
    actions: RecommendationActions = Field(default_factory=RecommendationActions)
    metadata: Optional[dict[str, Any]] = None
    state: Optional[Literal["pending", "auto_accepted"]] = None
    evidences: list[BaseEvidence] = Field(default_factory=list)
    custom_identifier: Optional[str] = None
    trace_id: Optional[str] = None

    @field_serializer("expiration_date")
    def serialize_timestamp(self, ts: Optional[datetime]) -> Optional[str]:
        """Serialize ``expiration_date`` via ``to_rfc3339_timestamp``.

        ``expiration_date`` is optional and defaults to ``None``, so the
        serializer can receive ``None``; that case is passed through unchanged
        instead of being handed to ``to_rfc3339_timestamp`` (the same guard
        ``RecommendationControlChange`` uses for its optional date).
        """
        if ts is None:
            return None
        return to_rfc3339_timestamp(ts)
[docs]
class RecommendationMsg(Message):
    """Recommendation message: type ``KMessageTypeRecommendation``, payload ``RecommendationPayload``."""

    # Class-level message type and the matching per-instance default.
    TYPE_: ClassVar[KMessageTypeRecommendation] = KMessageTypeRecommendation()  # pyright: ignore[reportIncompatibleVariableOverride]
    type: KMessageTypeRecommendation = KMessageTypeRecommendation()  # pyright: ignore[reportIncompatibleVariableOverride]

    payload: RecommendationPayload  # pyright: ignore[reportGeneralTypeIssues]