Source code for kelvin.message.msg_builders

from __future__ import annotations

from datetime import datetime, timedelta
from typing import Any, Generic, Optional, TypeVar, Union
from uuid import UUID, uuid4

from pydantic.dataclasses import dataclass
from pydantic.fields import Field
from typing_extensions import Annotated, override

from kelvin.krn import KRN, KRNAppVersion, KRNAsset, KRNAssetDataStream, KRNAssetParameter, KRNDatastream
from kelvin.message.base_messages import (
    ControlChangeAck,
    ControlChangeAckPayload,
    ControlChangeMsg,
    ControlChangePayload,
    ControlChangeQueueing,
    CustomActionMsg,
    CustomActionPayload,
    CustomActionResultMsg,
    CustomActionResultPayload,
    DataTagMsg,
    DataTagPayload,
    EdgeParameter,
    ParametersMsg,
    ParametersPayload,
    ParameterType,
    QueueingMode,
    RecommendationActions,
    RecommendationControlChange,
    RecommendationCustomAction,
    RecommendationMsg,
    RecommendationPayload,
    ReportedValues,
    ResourceParameters,
    StateEnum,
    ValuePoint,
)
from kelvin.message.evidences import BaseEvidence, Evidence
from kelvin.message.message import Message
from kelvin.message.msg_type import KMessageTypeAction

TMessage = TypeVar("TMessage", bound=Message)
TBuilder = TypeVar("TBuilder", bound="MessageBuilder[Any]")


class MessageBuilder(Generic[TMessage]):
    """Base class for the helper builders that produce Kelvin messages.

    Subclasses override :meth:`to_message` to build the concrete message and,
    where a reverse conversion makes sense, :meth:`from_message` to rebuild the
    helper from a received message.
    """

    # Message the builder was created from; the ``from_message`` overrides in this
    # module assign it on the instance they return.
    _msg: Optional[TMessage] = Field(default=None, init=False, repr=False)

    def to_message(self) -> TMessage:
        """Build the message represented by this builder.

        Raises:
            NotImplementedError: If the subclass does not override this method.
        """
        # ``NotImplemented`` is reserved for binary special methods; returning it here
        # would hand callers a non-message object instead of failing loudly.
        raise NotImplementedError(f"{type(self).__name__} does not implement to_message()")

    @classmethod
    def from_message(cls: type[TBuilder], msg: TMessage) -> TBuilder:  # pyright: ignore[reportUnusedParameter]
        """Create a builder from an existing message.

        Args:
            msg: The message to convert.

        Raises:
            NotImplementedError: If the subclass does not override this method.
        """
        raise NotImplementedError(f"{cls.__name__} does not implement from_message()")


@dataclass
class ControlChange(MessageBuilder[ControlChangeMsg]):
    """Helper used to build a Kelvin control change.

    A standalone control change needs exactly one of ``expiration_date`` or
    ``expiration_delta``. When the control change is nested in a
    :class:`Recommendation`, both may be left unset and the recommendation's
    ``expiration_date`` is used instead.

    Args:
        resource (KRN): The Kelvin resource targeted by the control change
            (usually a KRNAssetDataStream).
        payload (bool, int, float, str): The desired target value.
        expiration_date (Optional[datetime | timedelta]): When the control change
            expires, either as an absolute datetime or as a timedelta counted from
            the moment the message is built. Mutually exclusive with
            ``expiration_delta``.
        expiration_delta (Optional[timedelta]): Positive expiration interval.
            Mutually exclusive with ``expiration_date``. Standalone, it is turned
            into an absolute datetime; inside a recommendation it is forwarded as a
            number of seconds.
        retries (int): Optional number of retries.
        timeout (int): Deprecated; used as the message timeout when set. Mutually
            exclusive with ``retry_interval``.
        retry_interval (int): Optional timeout time (for retries).
        control_change_id (UUID): Optional UUID to set a specific ID for the
            control change; a random one is generated otherwise.
        from_value (ValuePoint): Optional initial value for the control change.
        trace_id (str): Optional trace ID for the control change.
        queueing (ControlChangeQueueing): Optional queueing behaviour.
    """

    resource: KRN
    payload: Any
    expiration_date: Optional[Union[datetime, timedelta]] = None
    expiration_delta: Optional[timedelta] = Field(
        default=None, description="Expiration delta in seconds", gt=timedelta(seconds=0)
    )
    retries: Optional[int] = None
    timeout: Annotated[Optional[int], Field(deprecated="Field 'timeout' is deprecated")] = None
    retry_interval: Optional[int] = None
    control_change_id: Optional[UUID] = None
    from_value: Optional[ValuePoint] = None
    trace_id: Optional[str] = None
    queueing: Optional[ControlChangeQueueing] = None

    def __post_init__(self) -> None:
        """Reject mutually exclusive field combinations."""
        if self.expiration_date is not None:
            if self.expiration_delta is not None:
                raise ValueError("Cannot set both 'expiration_date' and 'expiration_delta'")
        if self.timeout is not None:
            if self.retry_interval is not None:
                raise ValueError("Cannot set both 'timeout' and 'retry_interval'")

    @override
    def to_message(self) -> ControlChangeMsg:
        """Build the standalone control change message.

        Raises:
            ValueError: If neither ``expiration_date`` nor ``expiration_delta``
                yields an expiration.
        """
        relative = self.expiration_delta
        absolute = self.expiration_date
        # A timedelta given through ``expiration_date`` is handled like a delta,
        # but an explicit ``expiration_delta`` always takes precedence.
        if relative is None and isinstance(absolute, timedelta):
            relative = absolute

        if relative is not None:
            expires_at = datetime.now() + relative
        elif isinstance(absolute, datetime):
            expires_at = absolute
        else:
            raise ValueError(
                "Either 'expiration_date' or 'expiration_delta' must be set for a standalone control change"
            )

        message_id = uuid4() if self.control_change_id is None else self.control_change_id
        body = ControlChangePayload(
            # The deprecated ``timeout`` wins over ``retry_interval`` when it is set.
            timeout=self.retry_interval if self.timeout is None else self.timeout,
            retries=self.retries,
            expiration_date=expires_at,
            payload=self.payload,
            from_value=self.from_value,  # type: ignore[assignment]
            queueing=None if self.queueing is None else QueueingMode(mode=self.queueing),
        )
        return ControlChangeMsg(
            id=message_id,
            trace_id=self.trace_id,
            resource=self.resource,
            payload=body,
        )

    @classmethod
    @override
    def from_message(cls, msg: ControlChangeMsg) -> ControlChange:
        """Rebuild the helper from a control change message and remember the message."""
        body = msg.payload
        queue_mode = None if body.queueing is None else body.queueing.mode
        builder = cls(
            resource=msg.resource,  # type: ignore[arg-type]
            expiration_date=body.expiration_date,
            payload=body.payload,
            retries=body.retries,
            retry_interval=body.timeout,
            control_change_id=msg.id,
            from_value=body.from_value,
            trace_id=msg.trace_id,
            queueing=queue_mode,
        )
        builder._msg = msg
        return builder
@dataclass
class Recommendation(MessageBuilder[RecommendationMsg]):
    """Helper used to build a Kelvin recommendation.

    Args:
        resource (KRNAsset): The Kelvin asset targeted by the recommendation.
        type (str): The recommendation type; choose one of the types available on
            the Kelvin platform (e.g. generic, speed_inc, speed_dec, ...).
        expiration_date (Optional[datetime | timedelta]): When the recommendation
            expires, either as an absolute datetime or as a timedelta counted from
            the moment the message is built.
        description (str): An optional description for the recommendation.
        confidence (int): Optional confidence of the recommendation (from 1 to 4).
        control_changes (list[ControlChange]): The control changes associated with
            the recommendation.
        metadata (dict[str, Any]): Optional metadata for the recommendation.
        auto_accepted (bool): Marks the recommendation as auto accepted. Default is
            False.
        evidences (list[Evidence]): Evidences associated with the recommendation.
        custom_identifier (str): Optional custom identifier for the recommendation.
        actions (list[CustomAction]): Custom actions associated with the
            recommendation.
        trace_id (str): Optional trace ID for the recommendation.
    """

    resource: KRNAsset
    type: str
    expiration_date: Optional[Union[datetime, timedelta]] = None
    description: Optional[str] = None
    confidence: Optional[int] = Field(default=None, ge=1, le=4)
    control_changes: list[ControlChange] = Field(default_factory=list)
    metadata: Optional[dict[str, Any]] = None
    auto_accepted: bool = False
    evidences: list[Evidence] = Field(default_factory=list)
    custom_identifier: Optional[str] = None
    actions: list[CustomAction] = Field(default_factory=list)
    trace_id: Optional[str] = None

    @override
    def to_message(self) -> RecommendationMsg:
        """Build the recommendation message.

        Raises:
            ValueError: If a control change carries no expiration of its own and the
                recommendation has no ``expiration_date`` to inherit.
        """
        # One reference instant so every relative expiration resolves consistently.
        now = datetime.now()

        if self.expiration_date is None or isinstance(self.expiration_date, datetime):
            own_expiry = self.expiration_date
        else:
            own_expiry = now + self.expiration_date

        nested_changes: list[RecommendationControlChange] = []
        for change in self.control_changes:
            expires_at = None
            delta_seconds = None
            if change.expiration_delta is not None:
                # A delta is forwarded as seconds rather than resolved here.
                delta_seconds = change.expiration_delta.total_seconds()
            elif change.expiration_date is not None:
                if isinstance(change.expiration_date, datetime):
                    expires_at = change.expiration_date
                else:
                    expires_at = now + change.expiration_date
            elif own_expiry is not None:
                # Nothing set on the control change: inherit from the recommendation.
                expires_at = own_expiry
            else:
                raise ValueError(
                    "Control change has no expiration and the recommendation has no expiration_date to inherit from"
                )

            nested_changes.append(
                RecommendationControlChange(
                    retry=change.retries,
                    timeout=change.retry_interval if change.timeout is None else change.timeout,
                    expiration_date=expires_at,
                    expiration_delta=delta_seconds,
                    payload=change.payload,
                    resource=change.resource,
                    control_change_id=change.control_change_id,
                    from_value=change.from_value,  # type: ignore[assignment]
                    trace_id=change.trace_id,
                    queueing=None if change.queueing is None else QueueingMode(mode=change.queueing),
                )
            )

        nested_actions = []
        for action in self.actions:
            due = action.expiration_date
            if not isinstance(due, datetime):
                due = now + due
            nested_actions.append(
                RecommendationCustomAction(
                    type=action.type,
                    resource=action.resource,
                    trace_id=action.trace_id,
                    title=action.title,
                    description=action.description,
                    expiration_date=due,
                    payload=action.payload,
                )
            )

        recommended = RecommendationActions(control_changes=nested_changes, custom_actions=nested_actions)
        wrapped_evidences = [
            BaseEvidence(type=ev._TYPE, payload=ev)  # pyright: ignore[reportPrivateUsage]
            for ev in self.evidences
        ]
        body = RecommendationPayload(
            resource=self.resource,
            type=self.type,
            description=self.description,
            expiration_date=own_expiry,
            confidence=self.confidence,
            actions=recommended,
            metadata=self.metadata,
            state="auto_accepted" if self.auto_accepted is True else None,
            evidences=wrapped_evidences,
            custom_identifier=self.custom_identifier,
            trace_id=self.trace_id,
        )
        return RecommendationMsg(resource=self.resource, trace_id=self.trace_id, payload=body)
@dataclass
class AppParameter:
    """A single app parameter value for one asset.

    Args:
        resource (KRNAssetParameter): Kelvin Resource Name of the target asset
            parameter.
        value (Union[bool, int, float, str]): The parameter value.
        comment (Optional[str]): Optional comment for the parameter change.
    """

    resource: KRNAssetParameter
    value: ParameterType
    comment: Optional[str] = None
class AssetParameter(AppParameter):
    """[Deprecated] Use :class:`AppParameter` instead.

    Kept as a subclass of :class:`AppParameter` with no additional fields.

    Args:
        resource (KRNAssetParameter): Kelvin Resource Name of the target asset
            parameter.
        value (Union[bool, int, float, str]): The parameter value.
        comment (Optional[str]): Optional comment for the parameter change.
    """
@dataclass
class AppParameters(MessageBuilder[ParametersMsg]):
    """Builder that sets application parameters in bulk.

    Args:
        parameters (list[AppParameter]): The individual asset parameters to set.
        resource (Optional[KRNAppVersion]): Optional Kelvin Resource Name of the
            target app version. Defaults to the current app.
    """

    parameters: list[AppParameter]
    resource: Optional[KRNAppVersion] = None

    @override
    def to_message(self) -> ParametersMsg:
        """Build the parameters message, grouping the parameters by asset."""
        # Insertion-ordered dict: assets appear in the order they are first seen.
        per_asset: dict[str, list[EdgeParameter]] = {}
        for item in self.parameters:
            target = item.resource
            bucket = per_asset.setdefault(target.asset, [])
            bucket.append(EdgeParameter(name=target.parameter, value=item.value, comment=item.comment))

        resource_parameters = []
        for asset_name, edge_params in per_asset.items():
            resource_parameters.append(ResourceParameters(resource=KRNAsset(asset_name), parameters=edge_params))

        body = ParametersPayload(resource_parameters=resource_parameters)
        return ParametersMsg(resource=self.resource, payload=body)
class AssetParameters(AppParameters):
    """[Deprecated] Use :class:`AppParameters` instead.

    Kept as a subclass of :class:`AppParameters` with no additional fields or
    behaviour.

    Args:
        parameters (list[AppParameter]): The individual asset parameters to set.
        resource (Optional[KRNAppVersion]): Optional Kelvin Resource Name of the
            target app version. Defaults to the current app.
    """
@dataclass
class DataTag(MessageBuilder[DataTagMsg]):
    """Builder for a data tag.

    Args:
        start_date (datetime): The start date of the data tag.
        tag_name (str): The name of the data tag.
        resource (KRNAsset): The asset resource associated with the data tag.
        end_date (Optional[datetime]): The end date of the data tag. When omitted,
            the tag describes a single point in time and ``start_date`` is used.
        contexts (Optional[list[KRNDatastream]]): The datastream contexts associated
            with the data tag.
        description (Optional[str]): The description of the data tag. Truncated to
            256 characters.
    """

    start_date: datetime
    tag_name: str
    resource: KRNAsset
    end_date: Optional[datetime] = None
    contexts: Optional[list[KRNDatastream]] = None
    description: Optional[str] = None

    @override
    def to_message(self) -> DataTagMsg:
        """Build the data tag message."""
        # A missing end date collapses the tag to the start instant.
        closing_date = self.end_date if self.end_date else self.start_date

        # A missing or empty description is sent as None; otherwise it is capped.
        if self.description:
            short_description = self.description[:256]
        else:
            short_description = None

        body = DataTagPayload(
            start_date=self.start_date,
            end_date=closing_date,
            tag_name=self.tag_name,
            resource=self.resource,
            description=short_description,
            contexts=self.contexts,
        )
        return DataTagMsg(resource=self.resource, payload=body)

    @classmethod
    @override
    def from_message(cls, msg: DataTagMsg) -> DataTag:
        """Rebuild the helper from a data tag message and remember the message."""
        body = msg.payload
        builder = cls(
            start_date=body.start_date,
            tag_name=body.tag_name,
            resource=body.resource,
            end_date=body.end_date,
            contexts=body.contexts,
            description=body.description,
        )
        builder._msg = msg
        return builder
@dataclass
class ControlAck(MessageBuilder[ControlChangeAck]):
    """Builder for a control change acknowledgement.

    Args:
        resource (KRNAssetDataStream): The resource associated with the control
            change.
        state (StateEnum): The state of the control change.
        message (Optional[str]): Optional message.
        before (Optional[ValuePoint]): Optional value point before the control
            change.
        after (Optional[ValuePoint]): Optional value point after the control change.
        metadata (Optional[dict]): Optional metadata for the control change ack.
    """

    resource: KRNAssetDataStream
    state: StateEnum
    message: Optional[str] = None
    before: Optional[ValuePoint] = None
    after: Optional[ValuePoint] = None
    metadata: Optional[dict[str, Any]] = None

    @override
    def to_message(self) -> ControlChangeAck:
        """Build the control change ack message."""
        # Reported values are attached only when at least one value point is truthy.
        if self.before or self.after:
            reported = ReportedValues(before=self.before, after=self.after)
        else:
            reported = None

        body = ControlChangeAckPayload(
            state=self.state,
            message=self.message,
            reported=reported,
            metadata=self.metadata,
        )
        return ControlChangeAck(resource=self.resource, payload=body)
@dataclass
class CustomAction(MessageBuilder[CustomActionMsg]):
    """Helper used to build a Kelvin custom action.

    Args:
        resource (KRN): The Kelvin resource targeted by the action, represented by a
            KRN.
        type (str): The type of the action.
        title (str): The title of the action.
        expiration_date (Union[datetime, timedelta]): Expiration for the action,
            either as an absolute datetime or as a timedelta counted from the moment
            the message is built.
        description (Optional[str]): Optional description for the action.
        payload (dict): Optional payload for the action.
        trace_id (Optional[str]): Optional trace ID for the action.
        custom_action_id (UUID): Optional UUID to set a specific ID for the custom
            action; a random one is generated otherwise.
    """

    resource: KRN
    type: str
    title: str
    expiration_date: Union[datetime, timedelta]
    description: Optional[str] = None
    payload: dict[str, Any] = Field(default_factory=dict)
    trace_id: Optional[str] = None
    custom_action_id: Optional[UUID] = None

    @override
    def to_message(self) -> CustomActionMsg:
        """Build the custom action message."""
        if isinstance(self.expiration_date, timedelta):
            expiration = datetime.now() + self.expiration_date
        else:
            expiration = self.expiration_date

        return CustomActionMsg(
            id=self.custom_action_id if self.custom_action_id is not None else uuid4(),
            type=KMessageTypeAction(action_type=self.type),
            trace_id=self.trace_id,
            resource=self.resource,
            payload=CustomActionPayload(
                title=self.title,
                description=self.description,
                expiration_date=expiration,
                payload=self.payload,
            ),
        )

    @classmethod
    @override
    def from_message(cls, msg: CustomActionMsg) -> CustomAction:
        """Rebuild the helper from a custom action message and remember the message.

        Raises:
            ValueError: If the message type is not a ``KMessageTypeAction``.
        """
        if not isinstance(msg.type, KMessageTypeAction):  # pyright: ignore[reportUnnecessaryIsInstance]
            raise ValueError(f"invalid message type: {msg.type}")

        obj = cls(
            resource=msg.resource,  # type: ignore[arg-type]
            type=msg.type.type,
            title=msg.payload.title,
            description=msg.payload.description,
            expiration_date=msg.payload.expiration_date,
            payload=msg.payload.payload,
            trace_id=msg.trace_id,
            custom_action_id=msg.id,
        )
        obj._msg = msg
        return obj

    def result(
        self, success: bool, message: Optional[str] = None, metadata: Optional[dict[str, Any]] = None
    ) -> CustomActionResult:
        """Create a result for the custom action.

        The result references the ID of the message this builder was created from.
        When the builder was not created from a message, ``custom_action_id`` is
        used instead, since that is the ID ``to_message`` assigns to the action.

        Args:
            success (bool): Whether the action was successful.
            message (Optional[str]): Optional message.
            metadata (Optional[dict]): Optional metadata for the action result.

        Returns:
            CustomActionResult: The custom action result builder.

        Raises:
            ValueError: If neither an originating message nor ``custom_action_id``
                provides an action ID.
        """
        # ``_msg`` is only assigned by ``from_message``; read its ``id`` defensively so
        # a directly constructed builder falls through to ``custom_action_id``.
        action_id = getattr(self._msg, "id", None)
        if action_id is None:
            action_id = self.custom_action_id
        if action_id is None:
            raise ValueError(
                "cannot build CustomActionResult: no originating message and 'custom_action_id' is not set"
            )

        return CustomActionResult(
            resource=self.resource,
            action_id=action_id,
            success=success,
            message=message,
            metadata=metadata,
            trace_id=self.trace_id,
        )
@dataclass
class CustomActionResult(MessageBuilder[CustomActionResultMsg]):
    """Helper used to build the result of a Kelvin custom action.

    Args:
        resource (KRN): The Kelvin resource targeted by the action, represented by a
            KRN (usually KRNAssetDataStream).
        action_id (UUID): The ID of the action the result refers to.
        success (bool): Whether the action was successful.
        message (Optional[str]): Optional message.
        metadata (Optional[dict[str, Any]]): Optional metadata for the action result.
        trace_id (Optional[str]): Optional trace ID for the action.
    """

    resource: KRN
    action_id: UUID
    success: bool
    message: Optional[str] = None
    metadata: Optional[dict[str, Any]] = None
    trace_id: Optional[str] = None

    @override
    def to_message(self) -> CustomActionResultMsg:
        """Build the custom action result message."""
        body = CustomActionResultPayload(
            id=self.action_id,
            success=self.success,
            message=self.message,
            metadata=self.metadata,
        )
        return CustomActionResultMsg(resource=self.resource, trace_id=self.trace_id, payload=body)

    @classmethod
    @override
    def from_message(cls, msg: CustomActionResultMsg) -> CustomActionResult:
        """Rebuild the helper from a result message and remember the message."""
        body = msg.payload
        builder = cls(
            resource=msg.resource,  # type: ignore[arg-type]
            action_id=body.id,
            success=body.success,
            message=body.message,
            metadata=body.metadata,
            trace_id=msg.trace_id,
        )
        builder._msg = msg
        return builder
# Maps a concrete message class to the builder able to rebuild itself from it.
BUILDER_REGISTRY: dict[type[Message], type[MessageBuilder[Any]]] = {
    ControlChangeMsg: ControlChange,
    CustomActionMsg: CustomAction,
    CustomActionResultMsg: CustomActionResult,
    DataTagMsg: DataTag,
}


def convert_message(msg: Message) -> Optional[MessageBuilder[Any]]:
    """Convert a message to its builder class.

    The lookup uses the exact class of ``msg``; a message whose class is not a key
    of ``BUILDER_REGISTRY`` has no builder.

    Args:
        msg (Message): The message to convert.

    Returns:
        Optional[MessageBuilder]: The builder created from the message, or None when
        no builder is registered for its class.
    """
    builder_cls = BUILDER_REGISTRY.get(msg.__class__)
    if builder_cls is None:
        return None
    return builder_cls.from_message(msg)