from __future__ import annotations
from datetime import datetime, timedelta
from typing import Any, Dict, Generic, List, Optional, Type, TypeVar, Union
from uuid import UUID, uuid4
from pydantic.dataclasses import dataclass
from pydantic.fields import Field
from kelvin.krn import KRN, KRNAppVersion, KRNAsset, KRNAssetDataStream, KRNAssetParameter
from kelvin.message import Message, ParameterType
from kelvin.message.base_messages import (
ControlChangeAck,
ControlChangeAckPayload,
ControlChangeMsg,
ControlChangePayload,
CustomActionMsg,
CustomActionPayload,
CustomActionResultMsg,
CustomActionResultPayload,
DataTagMsg,
DataTagPayload,
EdgeParameter,
ParametersMsg,
ParametersPayload,
RecommendationActions,
RecommendationControlChange,
RecommendationCustomAction,
RecommendationMsg,
RecommendationPayload,
ReportedValues,
ResourceParameters,
StateEnum,
ValuePoint,
)
from kelvin.message.evidences import BaseEvidence, Evidence
from kelvin.message.msg_type import KMessageTypeAction
# Message type a builder is parameterised with (what ``to_message`` returns).
TMessage = TypeVar("TMessage", bound=Message)
# Builder type variable, used so ``from_message`` is typed as returning the class it is called on.
TBuilder = TypeVar("TBuilder", bound="MessageBuilder")
class MessageBuilder(Generic[TMessage]):
    """Base class for helper objects that build Kelvin messages.

    Subclasses override :meth:`to_message` and, when the conversion can be reversed,
    :meth:`from_message`.

    Attributes:
        _msg: The message a builder was created from, set by ``from_message``
            implementations. ``None`` when the builder was created directly.
    """

    # Plain class-level default. This class is not a dataclass, so a ``Field(...)``
    # value here is never collected as a field by the subclasses' ``@dataclass``;
    # it would simply become the attribute value and defeat ``self._msg is None`` checks.
    _msg: Optional[TMessage] = None

    def to_message(self) -> TMessage:
        """Build the message described by this builder.

        Returns:
            The message, when implemented by a subclass.

        Raises:
            NotImplementedError: If the subclass does not override this method.
        """
        # ``NotImplemented`` is reserved for binary special methods; returning it
        # here would hand callers a sentinel object instead of a message.
        raise NotImplementedError(f"{type(self).__name__} does not implement to_message()")

    @classmethod
    def from_message(cls: Type[TBuilder], msg: TMessage) -> TBuilder:
        """Create a builder from an existing message.

        Args:
            msg: The message to convert.

        Returns:
            The builder, when implemented by a subclass.

        Raises:
            NotImplementedError: If the subclass does not override this method.
        """
        raise NotImplementedError(f"{cls.__name__} does not implement from_message()")
@dataclass
class ControlChange(MessageBuilder[ControlChangeMsg]):
    """Builder for a Kelvin Control Change.

    Args:
        resource (KRN): The Kelvin resource targeted by the control change, represented by a KRN
            (usually KRNAssetDataStream)
        expiration_date (datetime | timedelta): When the control change expires. Provide either an
            absolute datetime or a timedelta from now
        payload (bool, int, float, str): The desired target value for the control change
        retries (int): Optional number of retries
        timeout (int): Optional timeout time (for retries)
        control_change_id (UUID): Optional UUID to set a specific ID for the control change
        from_value (ValuePoint): Optional initial value for the control change
        trace_id (str): Optional trace ID for the control change
    """

    resource: KRN
    expiration_date: Union[datetime, timedelta]
    payload: Any
    retries: Optional[int] = None
    timeout: Optional[int] = None
    control_change_id: Optional[UUID] = None
    from_value: Optional[ValuePoint] = None
    trace_id: Optional[str] = None

    def to_message(self) -> ControlChangeMsg:
        """Build the ``ControlChangeMsg`` for this control change.

        A relative ``expiration_date`` is added to the current time, and a random
        UUID is generated when ``control_change_id`` is not set.

        Returns:
            ControlChangeMsg: The message carrying this control change.
        """
        deadline = self.expiration_date
        if not isinstance(deadline, datetime):
            # NOTE(review): datetime.now() is naive local time - confirm consumers expect that.
            deadline = datetime.now() + deadline

        message_id = self.control_change_id
        if message_id is None:
            message_id = uuid4()

        body = ControlChangePayload(
            timeout=self.timeout,
            retries=self.retries,
            expiration_date=deadline,
            payload=self.payload,
            from_value=self.from_value,
        )
        return ControlChangeMsg(
            id=message_id,
            trace_id=self.trace_id,
            resource=self.resource,
            payload=body,
        )

    @classmethod
    def from_message(cls, msg: ControlChangeMsg) -> ControlChange:
        """Create a builder from a received ``ControlChangeMsg``.

        Args:
            msg (ControlChangeMsg): The message to convert

        Returns:
            ControlChange: A builder mirroring the message, with the message kept in ``_msg``.
        """
        body = msg.payload
        builder = cls(
            resource=msg.resource,  # type: ignore
            expiration_date=body.expiration_date,
            payload=body.payload,
            retries=body.retries,
            timeout=body.timeout,
            control_change_id=msg.id,
            from_value=body.from_value,
            trace_id=msg.trace_id,
        )
        builder._msg = msg
        return builder
@dataclass
class Recommendation(MessageBuilder[RecommendationMsg]):
    """Builder for a Kelvin Recommendation.

    Args:
        resource (KRNAsset): The Kelvin asset resource targeted by the recommendation
        type (str): The type of the recommendation; choose one of those available on the Kelvin
            platform (e.g. generic, speed_inc, speed_dec, ...)
        expiration_date (datetime | timedelta): Optional expiration of the recommendation. Provide
            either an absolute datetime or a timedelta from now
        description (str): An optional description for the recommendation
        confidence (int): Optional confidence of the recommendation (from 1 to 4)
        control_changes (List[ControlChange]): The control changes associated with the recommendation
        metadata (Dict[str, Any]): Optional metadata for the recommendation
        auto_accepted (bool): Sets the recommendation as auto accepted. Default is False
        evidences (List[Evidence]): List of evidences associated with the recommendation
        custom_identifier (str): Optional custom identifier for the recommendation
        actions (List[CustomAction]): List of custom actions associated with the recommendation
        trace_id (str): Optional trace ID for the recommendation
    """

    resource: KRNAsset
    type: str
    expiration_date: Optional[Union[datetime, timedelta]] = None
    description: Optional[str] = None
    confidence: Optional[int] = Field(default=None, ge=1, le=4)
    control_changes: List[ControlChange] = Field(default_factory=list)
    metadata: Optional[Dict] = None
    auto_accepted: bool = False
    evidences: List[Evidence] = Field(default_factory=list)
    custom_identifier: Optional[str] = None
    actions: List[CustomAction] = Field(default_factory=list)
    trace_id: Optional[str] = None

    def to_message(self) -> RecommendationMsg:
        """Build the ``RecommendationMsg`` for this recommendation.

        Every relative expiration (the recommendation's own, and those of its control
        changes and custom actions) is resolved against the same "now" instant.

        Returns:
            RecommendationMsg: The message carrying this recommendation.
        """
        # Single reference instant shared by all relative expirations below.
        # NOTE(review): datetime.now() is naive local time - confirm consumers expect that.
        now = datetime.now()

        def absolute(when: Union[datetime, timedelta]) -> datetime:
            # Datetimes pass through untouched; anything else is treated as an offset from ``now``.
            return when if isinstance(when, datetime) else now + when

        own_expiration = None if self.expiration_date is None else absolute(self.expiration_date)

        control_changes = []
        for cc in self.control_changes:
            control_changes.append(
                RecommendationControlChange(
                    retry=cc.retries,
                    timeout=cc.timeout,
                    expiration_date=absolute(cc.expiration_date),
                    payload=cc.payload,
                    resource=cc.resource,
                    control_change_id=cc.control_change_id,
                    from_value=cc.from_value,
                    trace_id=cc.trace_id,
                )
            )

        custom_actions = []
        for action in self.actions:
            custom_actions.append(
                RecommendationCustomAction(
                    type=action.type,
                    resource=action.resource,
                    trace_id=action.trace_id,
                    title=action.title,
                    description=action.description,
                    expiration_date=absolute(action.expiration_date),
                    payload=action.payload,
                )
            )

        bundled_actions = RecommendationActions(control_changes=control_changes, custom_actions=custom_actions)
        # Only an explicit ``True`` marks the recommendation as auto accepted.
        state = "auto_accepted" if self.auto_accepted is True else None
        # Each evidence is wrapped together with the type tag it declares in ``_TYPE``.
        wrapped_evidences = [BaseEvidence(type=ev._TYPE, payload=ev) for ev in self.evidences]

        body = RecommendationPayload(
            resource=self.resource,
            type=self.type,
            description=self.description,
            expiration_date=own_expiration,
            confidence=self.confidence,
            actions=bundled_actions,
            metadata=self.metadata,
            state=state,
            evidences=wrapped_evidences,
            custom_identifier=self.custom_identifier,
            trace_id=self.trace_id,
        )
        return RecommendationMsg(resource=self.resource, trace_id=self.trace_id, payload=body)
@dataclass
class AppParameter:
    """A single application parameter value for one asset.

    Args:
        resource (KRNAssetParameter): Kelvin Resource Name of the target asset parameter
        value (Union[bool, int, float, str]): The parameter value
        comment (Optional[str]): Optional comment for the parameter change
    """

    resource: KRNAssetParameter
    value: ParameterType
    comment: Optional[str] = None
class AssetParameter(AppParameter):
    """[Deprecated] Use AppParameter instead.

    Kept as an alias of AppParameter; it adds no fields or behaviour.

    Args:
        resource (KRNAssetParameter): Kelvin Resource Name of the target asset parameter
        value (Union[bool, int, float, str]): The parameter value
        comment (Optional[str]): Optional comment for the parameter change
    """
@dataclass
class AppParameters(MessageBuilder):
    """Builder that sets application parameters in bulk.

    Args:
        parameters (List[AppParameter]): The individual asset parameters to set
        resource (Optional[KRNAppVersion]): Optional Kelvin Resource Name of the target app version.
            Defaults to the current app.
    """

    parameters: List[AppParameter]
    resource: Optional[KRNAppVersion] = None

    def to_message(self) -> ParametersMsg:
        """Build the ``ParametersMsg``, grouping the parameters by asset.

        Returns:
            ParametersMsg: A message with one ``ResourceParameters`` entry per distinct asset.
        """
        # Asset name -> parameters for that asset, in first-seen order.
        grouped: Dict[str, List[EdgeParameter]] = {}
        for item in self.parameters:
            target = item.resource
            bucket = grouped.setdefault(target.asset, [])
            bucket.append(EdgeParameter(name=target.parameter, value=item.value, comment=item.comment))

        per_asset = []
        for asset_name, edge_params in grouped.items():
            per_asset.append(ResourceParameters(resource=KRNAsset(asset_name), parameters=edge_params))

        body = ParametersPayload(resource_parameters=per_asset)
        return ParametersMsg(resource=self.resource, payload=body)
class AssetParameters(AppParameters):
    """[Deprecated] Use AppParameters instead.

    Kept as an alias of AppParameters; it adds no fields or behaviour.

    Args:
        parameters (List[AppParameter]): The individual asset parameters to set
        resource (Optional[KRNAppVersion]): Optional Kelvin Resource Name of the target app version.
            Defaults to the current app.
    """
@dataclass
class DataTag(MessageBuilder):
    """Builder for a Data Tag.

    Args:
        start_date (datetime): The start date of the data tag
        tag_name (str): The name of the data tag
        resource (KRNAsset): The asset resource associated with the data tag
        end_date (Optional[datetime]): The end date of the data tag. If not specified, the data tag
            is considered a single point in time, the start_date
        contexts (Optional[List[KRN]]): The list of contexts associated with the data tag
        description (Optional[str]): The description of the data tag. Truncated to 256 characters
    """

    start_date: datetime
    tag_name: str
    resource: KRNAsset
    end_date: Optional[datetime] = None
    contexts: Optional[List[KRN]] = None
    description: Optional[str] = None

    def to_message(self) -> DataTagMsg:
        """Build the ``DataTagMsg`` for this tag.

        Returns:
            DataTagMsg: The message carrying this data tag.
        """
        # A tag without an end date covers just its start instant.
        tag_end = self.end_date if self.end_date else self.start_date

        # Missing or empty descriptions are sent as None; others are cut to 256 characters.
        if self.description:
            summary = self.description[:256]
        else:
            summary = None

        body = DataTagPayload(
            start_date=self.start_date,
            end_date=tag_end,
            tag_name=self.tag_name,
            resource=self.resource,
            description=summary,
            contexts=self.contexts,
        )
        return DataTagMsg(resource=self.resource, payload=body)
@dataclass
class ControlAck(MessageBuilder):
    """Builder for a Control Change acknowledgement.

    Args:
        resource (KRNAssetDataStream): The resource associated with the control change
        state (StateEnum): The state of the control change
        message (Optional[str]): Optional message
        before (Optional[ValuePoint]): Optional value point before the control change
        after (Optional[ValuePoint]): Optional value point after the control change
        metadata (Optional[Dict]): Optional metadata for the control change ack
    """

    resource: KRNAssetDataStream
    state: StateEnum
    message: Optional[str] = None
    before: Optional[ValuePoint] = None
    after: Optional[ValuePoint] = None
    metadata: Optional[Dict] = None

    def to_message(self) -> ControlChangeAck:
        """Build the ``ControlChangeAck`` message.

        Reported values are attached only when at least one of ``before`` / ``after``
        was provided.

        Returns:
            ControlChangeAck: The acknowledgement message.
        """
        # Test for presence, not truthiness, so a provided value point is never
        # dropped just because the object happens to evaluate as falsy.
        has_reported = self.before is not None or self.after is not None
        reported = ReportedValues(before=self.before, after=self.after) if has_reported else None

        return ControlChangeAck(
            resource=self.resource,
            payload=ControlChangeAckPayload(
                state=self.state,
                message=self.message,
                reported=reported,
                metadata=self.metadata,
            ),
        )
@dataclass
class CustomAction(MessageBuilder[CustomActionMsg]):
    """Builder for a Kelvin custom action.

    Args:
        resource (KRN): The Kelvin resource targeted by the action, represented by a KRN
        type (str): The type of the action
        title (str): The title of the action
        expiration_date (Union[datetime, timedelta]): Expiration date for the action.
            Provide either an absolute datetime or a timedelta from now
        description (Optional[str]): Optional description for the action
        payload (Dict): Optional payload for the action
        trace_id (Optional[str]): Optional trace ID for the action
        custom_action_id (UUID): Optional UUID to set a specific ID for the custom action
    """

    resource: KRN
    type: str
    title: str
    expiration_date: Union[datetime, timedelta]
    description: Optional[str] = None
    payload: Dict = Field(default_factory=dict)
    trace_id: Optional[str] = None
    custom_action_id: Optional[UUID] = None

    def to_message(self) -> CustomActionMsg:
        """Build the ``CustomActionMsg`` for this action.

        A relative ``expiration_date`` is added to the current time, and a random
        UUID is generated when ``custom_action_id`` is not set.

        Returns:
            CustomActionMsg: The message carrying this custom action.
        """
        if isinstance(self.expiration_date, timedelta):
            # NOTE(review): datetime.now() is naive local time - confirm consumers expect that.
            expiration = datetime.now() + self.expiration_date
        else:
            expiration = self.expiration_date

        return CustomActionMsg(
            id=self.custom_action_id if self.custom_action_id is not None else uuid4(),
            type=KMessageTypeAction(action_type=self.type),
            trace_id=self.trace_id,
            resource=self.resource,
            payload=CustomActionPayload(
                title=self.title,
                description=self.description,
                expiration_date=expiration,
                payload=self.payload,
            ),
        )

    @classmethod
    def from_message(cls, msg: CustomActionMsg) -> CustomAction:
        """Create a builder from a received ``CustomActionMsg``.

        Args:
            msg (CustomActionMsg): The message to convert

        Returns:
            CustomAction: A builder mirroring the message, with the message kept in ``_msg``.

        Raises:
            ValueError: If the message type is not a ``KMessageTypeAction``.
        """
        if not isinstance(msg.type, KMessageTypeAction):
            raise ValueError(f"Invalid message type: {msg.type}")

        obj = cls(
            resource=msg.resource,  # type: ignore
            # NOTE(review): to_message() passes this value as ``action_type=``; confirm that
            # KMessageTypeAction exposes it back through ``.type``.
            type=msg.type.type,
            title=msg.payload.title,
            description=msg.payload.description,
            expiration_date=msg.payload.expiration_date,
            payload=msg.payload.payload,
            trace_id=msg.trace_id,
            custom_action_id=msg.id,
        )
        obj._msg = msg
        return obj

    def result(
        self, success: bool, message: Optional[str] = None, metadata: Optional[Dict] = None
    ) -> CustomActionResult:
        """Create a result for the custom action.

        Only available on builders created through :meth:`from_message`, because the
        result refers to the ID of the originating message.

        Args:
            success (bool): Whether the action was successful
            message (Optional[str]): Optional message
            metadata (Optional[Dict]): Optional metadata for the action result

        Returns:
            CustomActionResult: The result builder for this action

        Raises:
            ValueError: If this builder does not hold the message it was created from.
        """
        source_msg = self._msg
        # Require an actual CustomActionMsg rather than testing ``is None``, so that any
        # placeholder default inherited from the base class is rejected with the intended
        # ValueError instead of failing later on ``.id``.
        if not isinstance(source_msg, CustomActionMsg):
            raise ValueError("Can't build CustomActionAck, internal _msg not set")

        return CustomActionResult(
            resource=self.resource,
            action_id=source_msg.id,
            success=success,
            message=message,
            metadata=metadata,
            trace_id=self.trace_id,
        )
@dataclass
class CustomActionResult(MessageBuilder[CustomActionResultMsg]):
    """Builder for the result of a Kelvin custom action.

    Args:
        resource (KRN): The Kelvin resource targeted by the action, represented by a KRN (usually
            KRNAssetDataStream)
        action_id (UUID): The ID of the action
        success (bool): Whether the action was successful
        message (Optional[str]): Optional message
        metadata (Optional[Dict]): Optional metadata for the action result
        trace_id (Optional[str]): Optional trace ID for the action
    """

    resource: KRN
    action_id: UUID
    success: bool
    message: Optional[str] = None
    metadata: Optional[Dict] = None
    trace_id: Optional[str] = None

    def to_message(self) -> CustomActionResultMsg:
        """Build the ``CustomActionResultMsg`` for this result.

        Returns:
            CustomActionResultMsg: The message carrying this result.
        """
        body = CustomActionResultPayload(
            id=self.action_id,
            success=self.success,
            message=self.message,
            metadata=self.metadata,
        )
        return CustomActionResultMsg(resource=self.resource, trace_id=self.trace_id, payload=body)

    @classmethod
    def from_message(cls, msg: CustomActionResultMsg) -> CustomActionResult:
        """Create a builder from a received ``CustomActionResultMsg``.

        Args:
            msg (CustomActionResultMsg): The message to convert

        Returns:
            CustomActionResult: A builder mirroring the message, with the message kept in ``_msg``.
        """
        body = msg.payload
        builder = cls(
            resource=msg.resource,  # type: ignore
            action_id=body.id,
            success=body.success,
            message=body.message,
            metadata=body.metadata,
            trace_id=msg.trace_id,
        )
        builder._msg = msg
        return builder
# Maps a message class to the builder class whose ``from_message`` can convert it.
# Only builders that implement ``from_message`` are listed.
BUILDER_REGISTRY: Dict[Type[Message], Type[MessageBuilder]] = {
    ControlChangeMsg: ControlChange,
    CustomActionMsg: CustomAction,
    CustomActionResultMsg: CustomActionResult,
}
def convert_message(msg: Message) -> Optional[MessageBuilder]:
    """Convert a message to its builder, if one is registered.

    The lookup uses the exact class of ``msg``; subclasses of a registered
    message class are not matched.

    Args:
        msg (Message): The message to convert

    Returns:
        Optional[MessageBuilder]: The builder created from the message, or None when
        no builder is registered for the message's class
    """
    builder_cls = BUILDER_REGISTRY.get(msg.__class__)
    if builder_cls is None:
        return None
    return builder_cls.from_message(msg)