from __future__ import annotations
from datetime import datetime
from enum import Enum
from typing import Any, Dict, List, Optional, Union
from uuid import UUID
from pydantic import BaseModel, Field, StrictBool, StrictFloat, StrictInt, StrictStr, field_serializer
from typing_extensions import Literal, TypeAlias
from kelvin.krn import KRN, KRNAsset
from kelvin.message.evidences import BaseEvidence
from kelvin.message.message import Message
from kelvin.message.msg_type import (
KMessageTypeAction,
KMessageTypeActionAck,
KMessageTypeControl,
KMessageTypeControlAck,
KMessageTypeControlStatus,
KMessageTypeData,
KMessageTypeDataTag,
KMessageTypeParameters,
KMessageTypeRecommendation,
)
from kelvin.message.utils import to_rfc3339_timestamp
ParameterType: TypeAlias = Union[StrictBool, StrictInt, StrictFloat, StrictStr]
PropertyType: TypeAlias = Union[
StrictBool, StrictInt, StrictFloat, StrictStr, list[StrictBool], list[StrictInt], list[StrictFloat], list[StrictStr]
]
[docs]
class ValuePoint(BaseModel):
value: Any
timestamp: datetime
source: Optional[str] = None
[docs]
@field_serializer("timestamp")
def serialize_timestamp(self, ts: datetime) -> str:
return to_rfc3339_timestamp(ts)
[docs]
class ControlChangePayload(BaseModel):
model_config = {"populate_by_name": True}
timeout: Optional[int] = Field(None, description="Timeout for retries")
retries: Optional[int] = Field(None, description="Max retries")
expiration_date: datetime = Field(description="Absolute expiration Date in UTC")
payload: Any = Field(None, description="Control Change payload")
from_value: Optional[ValuePoint] = Field(
None, description="Optional value of the datastream at the moment of the creation", alias="from"
)
[docs]
@field_serializer("expiration_date")
def serialize_timestamp(self, ts: datetime) -> str:
return to_rfc3339_timestamp(ts)
[docs]
class ControlChangeMsg(Message):
"""Generic Control Change Message"""
TYPE_ = KMessageTypeControl()
type: KMessageTypeControl = KMessageTypeControl()
payload: ControlChangePayload
[docs]
class StateEnum(str, Enum):
ready = "ready"
sent = "sent"
failed = "failed"
processed = "processed"
applied = "applied"
rejected = "rejected"
class ReportedValues(BaseModel):
before: Optional[ValuePoint] = None
after: Optional[ValuePoint] = None
class ControlChangeStatusPayload(BaseModel):
state: StateEnum
message: Optional[str] = None
reported: Optional[ReportedValues] = None
# control_change_id required, kept optional for backward compatibility
control_change_id: Optional[UUID] = None
metadata: Optional[Dict] = None
[docs]
class ControlChangeStatus(Message):
"""Control Change Status"""
TYPE_ = KMessageTypeControlStatus()
type: KMessageTypeControlStatus = KMessageTypeControlStatus()
payload: ControlChangeStatusPayload
[docs]
class ControlChangeAckPayload(ControlChangeStatusPayload):
"""Control Change Ack Payload"""
[docs]
class ControlChangeAck(Message):
"""Control Change Ack"""
TYPE_ = KMessageTypeControlAck()
type: KMessageTypeControlAck = KMessageTypeControlAck()
payload: ControlChangeAckPayload
class SensorDataPayload(BaseModel):
data: List[float] = Field(..., description="Array of sensor measurements.", min_length=1)
sample_rate: float = Field(..., description="Sensor sample-rate in Hertz.", gt=0.0)
class SensorDataMsg(Message):
"""Sensor data."""
TYPE_ = KMessageTypeData("object", "kelvin.sensor_data")
type: KMessageTypeData = KMessageTypeData("object", "kelvin.sensor_data")
payload: SensorDataPayload
class EdgeParameter(BaseModel):
name: str
value: ParameterType
comment: Optional[str] = None
class ResourceParameters(BaseModel):
resource: KRN
parameters: List[EdgeParameter]
class ParametersPayload(BaseModel):
source: Optional[KRN] = None
resource_parameters: List[ResourceParameters]
class ParametersMsg(Message):
TYPE_ = KMessageTypeParameters()
type: KMessageTypeParameters = KMessageTypeParameters()
payload: ParametersPayload
class DataTagPayload(BaseModel):
start_date: datetime
end_date: datetime
tag_name: str
resource: KRNAsset
description: Optional[str] = Field(None, max_length=256)
contexts: Optional[List[KRN]] = None
source: Optional[KRN] = None
@field_serializer("start_date", "end_date")
def serialize_timestamp(self, ts: datetime) -> str:
return to_rfc3339_timestamp(ts)
[docs]
class DataTagMsg(Message):
TYPE_ = KMessageTypeDataTag()
type: KMessageTypeDataTag = KMessageTypeDataTag()
resource: KRNAsset
payload: DataTagPayload
class CustomActionPayload(BaseModel):
title: str
expiration_date: datetime
description: Optional[str] = None
payload: Dict = {}
@field_serializer("expiration_date")
def serialize_timestamp(self, ts: datetime) -> str:
return to_rfc3339_timestamp(ts)
[docs]
class CustomActionMsg(Message):
TYPE_ = KMessageTypeAction()
type: KMessageTypeAction = KMessageTypeAction()
payload: CustomActionPayload
class CustomActionResultPayload(BaseModel):
id: UUID
success: bool
message: Optional[str] = None
metadata: Optional[Dict] = None
[docs]
class CustomActionResultMsg(Message):
TYPE_ = KMessageTypeActionAck()
type: KMessageTypeActionAck = KMessageTypeActionAck()
payload: CustomActionResultPayload
[docs]
class RecommendationControlChange(ControlChangePayload):
retries: Optional[int] = Field(None, description="Max retries", alias="retry")
state: Optional[str] = None
resource: Optional[KRN] = None
control_change_id: Optional[UUID] = Field(None, description="Control Change ID")
trace_id: Optional[str] = None
class RecommendationCustomAction(CustomActionPayload):
type: str
resource: KRN
custom_action_id: Optional[UUID] = None
trace_id: Optional[str] = None
[docs]
class RecommendationActions(BaseModel):
control_changes: List[RecommendationControlChange] = []
custom_actions: List[RecommendationCustomAction] = []
class RecommendationPayload(BaseModel):
source: Optional[KRN] = None
resource: KRN
type: str
description: Optional[str] = None
confidence: Optional[int] = Field(None, ge=1, le=4)
expiration_date: Optional[datetime] = None
actions: RecommendationActions = RecommendationActions()
metadata: Optional[Dict] = None
state: Optional[Literal["pending", "auto_accepted"]] = None
evidences: List[BaseEvidence] = []
custom_identifier: Optional[str] = None
trace_id: Optional[str] = None
@field_serializer("expiration_date")
def serialize_timestamp(self, ts: datetime) -> str:
return to_rfc3339_timestamp(ts)
[docs]
class RecommendationMsg(Message):
TYPE_ = KMessageTypeRecommendation()
type: KMessageTypeRecommendation = KMessageTypeRecommendation()
payload: RecommendationPayload