Source code for kelvin.application.client

from __future__ import annotations

import asyncio
import functools
import signal
from asyncio import Event, Queue
from collections.abc import AsyncGenerator, Awaitable, Callable, Coroutine
from datetime import datetime, timedelta
from types import TracebackType
from typing import (
    TYPE_CHECKING,
    Any,
    Optional,
    TypeVar,
    Union,
    cast,
    overload,
)
from zoneinfo import ZoneInfo

from pydantic import Field
from pydantic.dataclasses import dataclass
from typing_extensions import Literal, ParamSpec, Self, TypeGuard

import kelvin.application.filters as filters
from kelvin.application.api_client import initialize_api_client
from kelvin.application.clock import RealClock
from kelvin.application.scheduler import Scheduler, build_cron_expressions
from kelvin.application.stream import KelvinStream, KelvinStreamConfig, StreamInterface
from kelvin.application.timer import Timer
from kelvin.krn import KRNAsset, KRNAssetDataStream
from kelvin.logs import configure_logger, logger
from kelvin.message import AssetDataMessage, ControlChangeStatus, KMessageType, KMessageTypeData, Message
from kelvin.message.base_messages import ParameterType, PropertyType
from kelvin.message.msg_builders import CustomAction, DataTag, MessageBuilder, convert_message
from kelvin.message.runtime_manifest import ManifestDatastream, Resource, RuntimeManifest, WayEnum

if TYPE_CHECKING:
    from kelvin.api.client import AsyncClient
    from kelvin.application.clock import ClockInterface
    from kelvin.application.window import HoppingWindow, RollingWindow, TumblingWindow

E = TypeVar("E", bound=Exception)
T = TypeVar("T", bound=Message)

# Task Types
P = ParamSpec("P")
R = TypeVar("R")

if TYPE_CHECKING:
    SyncFunc = Callable[P, R]
    AsyncFunc = Callable[P, Awaitable[R]]
    TaskFunc = Union[SyncFunc[P, R], AsyncFunc[P, R]]

    # Stream Types
    StreamSyncFunc = Callable[[AssetDataMessage], Any]
    StreamAsyncFunc = Callable[[AssetDataMessage], Awaitable[Any]]
    StreamFunc = Union[StreamSyncFunc, StreamAsyncFunc]

_CANCEL_TASK_TIMEOUT = 5.0


class CallbackSlot:
    """A slot that stores an async callback and supports decorator registration.

    Supports two usage patterns:

        # Direct assignment (backwards compatible)
        app.on_connect = my_handler

        # Decorator
        @app.on_connect
        async def my_handler(): ...

    Synchronous handlers are accepted too: they are wrapped so that they run
    in a worker thread via ``asyncio.to_thread`` and can be awaited like any
    other handler.
    """

    def __init__(self) -> None:
        # The stored handler is always awaitable-returning (or None when unset).
        self._handler: Optional[Callable[..., Awaitable[Any]]] = None

    def __bool__(self) -> bool:
        """Return True when a handler is currently registered."""
        return self._handler is not None

    @property
    def __name__(self) -> str:
        """Name of the stored handler, or ``"unset_callback"`` when unavailable."""
        if self._handler is None:
            return "unset_callback"
        return getattr(self._handler, "__name__", "unset_callback")

    def set(self, fn: Optional[Callable[..., Any]]) -> None:
        """Store handler, wrapping sync -> async if needed.

        Parameters:
            fn: The handler to store. ``None`` clears the slot. Coroutine
                functions are stored as-is; any other callable is wrapped so
                it executes in a worker thread.
        """
        if fn is None or asyncio.iscoroutinefunction(fn):
            self._handler = fn
            return

        @functools.wraps(fn)
        async def wrapper(*args: Any, **kwargs: Any) -> Any:
            result = await asyncio.to_thread(fn, *args, **kwargs)
            # A callable that is not detected as a coroutine function (for
            # example an object whose __call__ is ``async def``) hands back a
            # coroutine here. Await it on the event loop; otherwise the
            # handler body would never run.
            if asyncio.iscoroutine(result):
                result = await result
            return result

        self._handler = wrapper

    def __call__(self, fn: Callable[..., Any]) -> Callable[..., Any]:
        """Decorator use: @app.on_connect

        Registers ``fn`` in the slot and returns ``fn`` unchanged, so the
        decorated name still refers to the original function.
        """
        self.set(fn)
        return fn

    async def invoke(self, *args: Any, **kwargs: Any) -> None:
        """Internal use only. Invoke the stored handler if set.

        Does nothing when no handler is registered. The handler's return
        value is discarded; exceptions raised by the handler propagate.
        """
        if self._handler is None:
            return
        await self._handler(*args, **kwargs)
@dataclass
class Datastream:
    """Description of a datastream: its name, message type and optional unit."""

    # Datastream name.
    name: str
    # Kelvin message type carried by the datastream.
    type: KMessageType
    # Unit name, when one is defined for the datastream.
    unit: Optional[str] = None
class AppIO(Datastream):
    """An application input or output; adds no fields of its own to Datastream."""

    pass
@dataclass
class ResourceDatastream:
    """Binding between an asset and one of the application's IOs."""

    # Asset this datastream binding belongs to.
    asset: KRNAsset
    # Name of the application IO the datastream is exposed as.
    io_name: str
    # The underlying datastream definition (name, type, unit).
    datastream: Datastream
    # Free-form per-datastream configuration.
    configuration: dict[str, Any] = Field(default_factory=dict)
    way: WayEnum = WayEnum.output  # deprecated
    owned: bool = False
    # Access mode for the datastream: one of "RO", "RW" or "WO".
    access: Literal["RO", "RW", "WO"] = "RO"
@dataclass
class AssetInfo:
    """Snapshot of one asset: its properties, parameters and datastreams."""

    # Asset name; also used as the key in KelvinApp.assets.
    name: str
    # Asset properties keyed by property name.
    properties: dict[str, PropertyType] = Field(default_factory=dict)
    # Asset parameters keyed by parameter name.
    parameters: dict[str, ParameterType] = Field(default_factory=dict)
    # Datastream bindings keyed by IO name.
    datastreams: dict[str, ResourceDatastream] = Field(default_factory=dict)
    # Optional display title of the asset.
    title: Optional[str] = None
    # Optional name and display title of the asset's type.
    asset_type_name: Optional[str] = None
    asset_type_title: Optional[str] = None
class KelvinApp:
    """
    Kelvin App to connect and interface with the KelvinStream.

    After connecting, the connection is handled automatically in the background.
    Use filter or stream_filter to easily listen for specific messages.
    Assign to (or decorate with) the on_* callback slots to react to events like connect and disconnect.
    Use tasks to run background functions that can be async or sync.
    Use timers to run functions at regular intervals.

    The app can be used as an async context manager, which will automatically
    connect on enter and disconnect on exit.

    Example usage:
        async with KelvinApp() as app:
            await app.publish(Number(resource=KRNAssetDataStream('my-asset', 'my-input'), payload=1.0))
            async for msg in app.stream_filter(filters.is_asset_data_message):
                print(msg)
    """

    # Upper bound, in seconds, for the reconnect delay used by _retry_connect.
    _MAX_BACKOFF: int = 60

    def __init__(
        self,
        config: Optional[KelvinStreamConfig] = None,
        api: Optional[AsyncClient] = None,
        clock: Optional[ClockInterface] = None,
    ) -> None:
        """
        Create the app without connecting.

        Parameters:
            config: Stream configuration; a default KelvinStreamConfig is used when omitted.
            api: API client; one is created with initialize_api_client() when omitted.
            clock: Clock implementation; RealClock is used when omitted.
        """
        self._connection: StreamInterface = KelvinStream(config or KelvinStreamConfig())
        self._api: AsyncClient = api or initialize_api_client()
        """API client for direct connection with Kelvin API"""
        self._clock: ClockInterface = clock or RealClock()
        """Clock for time operations"""

        # App Filters
        # Each entry pairs a delivery queue with the predicate that selects its messages.
        self._filters: list[tuple[Queue[Message], Callable[[Message], TypeGuard[Message]]]] = []

        # App Configuration
        self._app_configuration: dict[str, Any] = {}

        # App Assets
        self._assets: dict[str, AssetInfo] = {}

        # App IO
        self._inputs: list[AppIO] = []
        self._outputs: list[AppIO] = []

        # App Tasks
        self._tasks: dict[str, AsyncFunc[..., Any]] = {}
        self._running_tasks: set[asyncio.Task[Any]] = set()

        # App Internal State
        # The lock and event are created lazily (see _connect_lock / _config_received).
        self._connect_lock_lazy: Optional[asyncio.Lock] = None
        self._read_loop_task: Optional[asyncio.Task[Any]] = None
        self._config_received_lazy: Optional[Event] = None
        self._loop: Optional[asyncio.AbstractEventLoop] = None

        # App Runtime Manifest
        self._runtime_manifest: Optional[RuntimeManifest] = None

        # App Callbacks
        self._on_connect: CallbackSlot = CallbackSlot()
        self._on_disconnect: CallbackSlot = CallbackSlot()
        self._on_message: CallbackSlot = CallbackSlot()
        self._on_asset_input: CallbackSlot = CallbackSlot()
        self._on_control_change: CallbackSlot = CallbackSlot()
        self._on_control_status: CallbackSlot = CallbackSlot()
        self._on_custom_action: CallbackSlot = CallbackSlot()
        self._on_data_tag: CallbackSlot = CallbackSlot()
        self._on_asset_change: CallbackSlot = CallbackSlot()
        self._on_app_configuration: CallbackSlot = CallbackSlot()

        configure_logger()

    # ----------------
    # Callback Properties
    # ----------------
    # Each getter returns the CallbackSlot itself (so it can be used as a
    # decorator); each setter stores the assigned function in that slot.

    @property
    def on_connect(self) -> CallbackSlot:
        """Called when the connection is established."""
        return self._on_connect

    @on_connect.setter
    def on_connect(
        self,
        fn: Optional[Union[Callable[[], Awaitable[None]], Callable[[], None]]],  # pyright: ignore[reportPropertyTypeMismatch]
    ) -> None:
        self._on_connect.set(fn)

    @property
    def on_disconnect(self) -> CallbackSlot:
        """Called when the connection is closed."""
        return self._on_disconnect

    @on_disconnect.setter
    def on_disconnect(
        self,
        fn: Optional[Union[Callable[[], Awaitable[None]], Callable[[], None]]],  # pyright: ignore[reportPropertyTypeMismatch]
    ) -> None:
        self._on_disconnect.set(fn)

    @property
    def on_message(self) -> CallbackSlot:
        """Called on receipt of any message."""
        return self._on_message

    @on_message.setter
    def on_message(
        self,
        fn: Optional[Union[Callable[[Message], Awaitable[None]], Callable[[Message], None]]],  # pyright: ignore[reportPropertyTypeMismatch]
    ) -> None:
        self._on_message.set(fn)

    @property
    def on_asset_input(self) -> CallbackSlot:
        """Called when an asset data message is received."""
        return self._on_asset_input

    @on_asset_input.setter
    def on_asset_input(
        self,
        fn: Optional[Union[Callable[[AssetDataMessage], Awaitable[None]], Callable[[AssetDataMessage], None]]],  # pyright: ignore[reportPropertyTypeMismatch]
    ) -> None:
        self._on_asset_input.set(fn)

    @property
    def on_control_change(self) -> CallbackSlot:
        """Called when a control change message is received."""
        return self._on_control_change

    @on_control_change.setter
    def on_control_change(
        self,
        fn: Optional[Union[Callable[[AssetDataMessage], Awaitable[None]], Callable[[AssetDataMessage], None]]],  # pyright: ignore[reportPropertyTypeMismatch]
    ) -> None:
        self._on_control_change.set(fn)

    @property
    def on_control_status(self) -> CallbackSlot:
        """Called when a control status is received."""
        return self._on_control_status

    @on_control_status.setter
    def on_control_status(
        self,
        fn: Optional[Union[Callable[[ControlChangeStatus], Awaitable[None]], Callable[[ControlChangeStatus], None]]],  # pyright: ignore[reportPropertyTypeMismatch]
    ) -> None:
        self._on_control_status.set(fn)

    @property
    def on_custom_action(self) -> CallbackSlot:
        """Called when a custom action is received."""
        return self._on_custom_action

    @on_custom_action.setter
    def on_custom_action(
        self,
        fn: Optional[Union[Callable[[CustomAction], Awaitable[None]], Callable[[CustomAction], None]]],  # pyright: ignore[reportPropertyTypeMismatch]
    ) -> None:
        self._on_custom_action.set(fn)

    @property
    def on_data_tag(self) -> CallbackSlot:
        """Called when a data tag message is received."""
        return self._on_data_tag

    @on_data_tag.setter
    def on_data_tag(
        self,
        fn: Optional[Union[Callable[[DataTag], Awaitable[None]], Callable[[DataTag], None]]],  # pyright: ignore[reportPropertyTypeMismatch]
    ) -> None:
        self._on_data_tag.set(fn)

    @property
    def on_asset_change(self) -> CallbackSlot:
        """Called when an asset is added, removed, or modified.

        First arg is the new asset (None if removed); second is the previous asset (None if newly added)."""
        return self._on_asset_change

    @on_asset_change.setter
    def on_asset_change(
        self,
        fn: Optional[  # pyright: ignore[reportPropertyTypeMismatch]
            Union[
                Callable[[Optional[AssetInfo], Optional[AssetInfo]], Awaitable[None]],
                Callable[[Optional[AssetInfo], Optional[AssetInfo]], None],
            ]
        ],
    ) -> None:
        self._on_asset_change.set(fn)

    @property
    def on_app_configuration(self) -> CallbackSlot:
        """Called when the app configuration changes."""
        return self._on_app_configuration

    @on_app_configuration.setter
    def on_app_configuration(
        self,
        fn: Optional[Union[Callable[[dict[str, Any]], Awaitable[None]], Callable[[dict[str, Any]], None]]],  # pyright: ignore[reportPropertyTypeMismatch]
    ) -> None:
        self._on_app_configuration.set(fn)

    # ----------------
    # Properties
    # ----------------

    @property
    def is_connected(self) -> bool:
        """
        Indicates whether the read loop is active, implying an established connection.

        True only while a read loop task exists and has not finished.
        """
        return bool(self._read_loop_task and not self._read_loop_task.done())

    @property
    def connection(self) -> StreamInterface:
        """The stream connection interface for Kelvin messaging."""
        return self._connection

    @property
    def api(self) -> AsyncClient:
        """The API client for direct connection with Kelvin API."""
        return self._api

    @property
    def clock(self) -> "ClockInterface":
        """The clock interface for time operations."""
        return self._clock

    @property
    def _connect_lock(self) -> asyncio.Lock:
        """Lazy initialization of connect lock for Python 3.9 compatibility."""
        if self._connect_lock_lazy is None:
            self._connect_lock_lazy = asyncio.Lock()
        return self._connect_lock_lazy

    @property
    def _config_received(self) -> Event:
        """Lazy initialization of config received event for Python 3.9 compatibility."""
        if self._config_received_lazy is None:
            self._config_received_lazy = Event()
        return self._config_received_lazy

    @property
    def assets(self) -> dict[str, AssetInfo]:
        """
        Get all assets configured for this application.

        It returns a dictionary where each key is the asset name, and the value is an
        `AssetInfo` object describing that asset's properties, parameters, and datastreams.
        The internal mapping is refreshed whenever a runtime manifest is processed; the
        value returned here is a shallow copy of that mapping taken at call time.

        Returns:
            Dict[str, AssetInfo]: A dictionary where keys are asset names (strings) and values
            are AssetInfo instances representing the current configuration and state of each asset.

        Raises:
            RuntimeError: If the app is not connected.

        Example:
            {
                "asset1": AssetInfo(
                    name="asset1",
                    properties={
                        "tubing_length": 25.0,
                        "area": 11.0
                    },
                    parameters={
                        "param-bool": False,
                        "param-number": 7.5,
                        "param-string": "hello"
                    },
                    datastreams={
                        "output1": ResourceDatastream(
                            asset=KRNAsset("asset1"),
                            io_name="output1",
                            datastream=Datastream(
                                name="datastream1",
                                type=KMessageTypeData("float"),
                                unit="m"
                            ),
                            access="RO",
                            owned=True,
                            configuration={}
                        )
                    }
                )
            }
        """
        if not self.is_connected:
            raise RuntimeError("cannot get assets: KelvinApp is not connected")
        return self._assets.copy()

    @property
    def app_configuration(self) -> dict[str, Any]:
        """
        Get the application configuration.

        The internal dictionary is returned directly (no copy is made).

        Returns:
            dict: A mapping of configuration sections to their values, matching the structure in app.yaml.

        Raises:
            RuntimeError: If the app is not connected.

        Example:
            {
                "foo": {
                    "conf_string": "value1",
                    "conf_number": 25,
                    "conf_bool": False,
                }
            }
        """
        if not self.is_connected:
            raise RuntimeError("cannot get app_configuration: KelvinApp is not connected")
        return self._app_configuration

    @property
    def config_received(self) -> Event:
        """
        Event set when the initial configuration is received.

        Use this asyncio.Event to wait until the application has loaded its initial app/asset parameters.

        Returns:
            asyncio.Event: Event that becomes set once the initial configuration arrives.

        Example:
            await app.config_received.wait()
        """
        return self._config_received

    @property
    def loop(self) -> asyncio.AbstractEventLoop:
        """
        Return the running event loop once the app has started.

        Raises:
            RuntimeError: If no loop has been captured yet (connect has not been called).
        """
        if self._loop is None:
            raise RuntimeError("event loop not set yet, call connect first")
        return self._loop

    @property
    def inputs(self) -> list[AppIO]:
        """
        List all input metrics configured for the application.

        Each AppIO has:
        - name (str): the metric identifier.
        - type (KMessageType): the message type of the input.
        - unit (Optional[str]): the unit, when set.

        Returns:
            List[AppIO]: The configured input metrics. The internal list is returned
            directly, so treat it as read-only.

        Raises:
            RuntimeError: If the app is not connected.
        """
        if not self.is_connected:
            raise RuntimeError("cannot get inputs: KelvinApp is not connected")
        return self._inputs

    @property
    def outputs(self) -> list[AppIO]:
        """
        List all output metrics configured for the application.

        Each AppIO has:
        - name (str): the metric identifier.
        - type (KMessageType): the message type of the output.
        - unit (Optional[str]): the unit, when set.

        Returns:
            List[AppIO]: The configured output metrics. The internal list is returned
            directly, so treat it as read-only.

        Raises:
            RuntimeError: If the app is not connected.
        """
        if not self.is_connected:
            raise RuntimeError("cannot get outputs: KelvinApp is not connected")
        return self._outputs

    @property
    def tasks(self) -> dict[str, Callable[[], Awaitable[Any]]]:
        """
        Retrieve registered asynchronous tasks.

        Returns:
            Dict[str, Callable[[], Awaitable]]: Dict of task names and the async callables
            registered under them (the internal registry itself, not a copy).
        """
        return self._tasks

    # ----------------
    # Async context‐manager support
    # ----------------
async def __aenter__(self) -> Self:
    """
    Enter the async context: connect the app, then hand it back to the caller.

    Returns:
        The connected app instance itself, so `async with KelvinApp() as app` works.
    """
    app = self
    await app.connect()
    return app
async def __aexit__(
    self,
    _exc_type: Optional[type[E]],
    _exc_value: Optional[E],
    _tb: Optional[TracebackType],
) -> Optional[bool]:
    """
    Leave the async context: disconnect the app.

    The exception details are ignored. The method always returns False, so an
    exception raised inside the `async with` body is never suppressed.
    """
    suppress_exception = False
    await self.disconnect()
    return suppress_exception
# ----------------
# Internal Helpers
# ----------------
async def _callback(self, slot: CallbackSlot, *args: Any) -> None:
    """
    Safely invoke a callback slot, catching and logging any exceptions
    (but allow cancellation to propagate).

    Parameters:
        slot: The callback slot to invoke; nothing happens when it is unset.
        *args: Positional arguments forwarded to the stored handler.

    Raises:
        asyncio.CancelledError: Propagated unchanged so task cancellation works.
    """
    if not slot:
        return

    try:
        await slot.invoke(*args)
    except asyncio.CancelledError:
        raise  # Propagate cancellation
    except Exception as ex:
        # A failing user callback is logged and contained here. Re-raising
        # would abort message processing in the read loop (skipping filter
        # routing and forcing a reconnect) and interrupt connect/disconnect.
        logger.exception("Error invoking callback", callback_name=slot.__name__, ex=ex)


def _default_assets_and_inputs(
    self, assets: Optional[list[str]] = None, inputs: Optional[list[str]] = None
) -> tuple[list[str], list[str]]:
    """
    Fill in missing asset/input selections with everything the app knows.

    An empty or missing `assets` becomes all current asset names; an empty or
    missing `inputs` becomes the names of all configured inputs.

    Returns:
        The (assets, inputs) pair to filter on.
    """
    if not assets:
        assets = list(self.assets.keys())
    if not inputs:
        inputs = [i.name for i in self.inputs]
    return assets, inputs


def _apply_asset_input_filters(self, assets: list[str], inputs: list[str]) -> Queue[AssetDataMessage]:
    """
    Register a filter that accepts messages matching both the asset and the
    input selection, and return the queue that receives them.
    """

    def _checker(msg: Message) -> TypeGuard[AssetDataMessage]:
        return filters.asset_equals(assets)(msg) and filters.input_equals(inputs)(msg)

    return self.filter(_checker)


# ----------------
# Connection Handling
# ----------------
async def connect(self) -> None:
    """
    Establish connection, retrying indefinitely until success.

    Starts the read loop and waits for App Configuration before firing
    `on_connect` callback and tasks/timers. The whole sequence runs under the
    connect lock, and the call returns immediately when the app is already
    connected.

    Returns:
        None.

    Raises:
        RuntimeError: If called without a running event loop.
    """
    async with self._connect_lock:
        # Prevent duplicate connects
        if self.is_connected:
            return

        # Capture the running event loop
        self._loop = asyncio.get_running_loop()

        # Reset and start read loop
        # The event is cleared first so the wait below blocks until the read
        # loop started here has processed a runtime manifest.
        self._config_received.clear()
        self._read_loop_task = asyncio.create_task(self._read_loop(), name="app-internal:read-loop")

        # Wait for the App Configuration before firing on_connect callback
        _ = await self._config_received.wait()

        # Invoke on_connect callback
        await self._callback(self.on_connect)

        # Start all registered tasks
        await self._start_tasks()
async def disconnect(self) -> None:
    """
    Cancel read loop, stop all tasks, fire on_disconnect, and close stream.

    Teardown order: the read loop is cancelled first so no further messages
    are processed, then background tasks are stopped, the stream is closed,
    the `on_disconnect` callback runs, and finally the loop-bound state and
    all registered filters are dropped.
    """
    logger.debug("Disconnecting from KelvinStream...")
    async with self._connect_lock:
        # Cancel the read loop task
        await self._cancel_task(self._read_loop_task)
        self._read_loop_task = None

        # Cancel all background tasks
        await self._stop_tasks()

        # Disconnect from stream
        try:
            await self._connection.disconnect()
        except ConnectionError:
            # Best effort: a ConnectionError while closing the stream is ignored.
            pass

        # Invoke on_disconnect callback
        await self._callback(self.on_disconnect)

        # Reset lazy asyncio primitives so the app can reconnect on a new loop
        self._loop = None
        self._connect_lock_lazy = None
        self._config_received_lazy = None
        # Filter queues registered so far are discarded as well.
        self._filters.clear()
    logger.debug("Disconnected from KelvinStream")
async def _retry_connect(self) -> None:
    """
    Keep calling stream.connect() with exponential backoff until success.

    The delay starts at 1 second and doubles after every failed attempt, up
    to `_MAX_BACKOFF`. Cancellation is propagated immediately.
    """
    delay = 1
    while True:
        try:
            logger.debug("Connecting to KelvinStream...")
            await self._connection.connect()
            logger.debug("Successfully connected to KelvinStream")
            return
        except asyncio.CancelledError:
            raise
        except ConnectionError:
            logger.error(f"Connection to KelvinStream failed, retrying in {delay}s...")
            await asyncio.sleep(delay)
            delay = min(delay * 2, self._MAX_BACKOFF)


async def _read_loop(self) -> None:
    """
    Persistent read loop: ensures connection, reads messages, triggers callbacks,
    and reconnects on error.

    Every message is first processed (callbacks), then routed to the
    registered filters. Cancellation while reading ends the loop quietly.
    """
    logger.debug("Read Loop: starting...")
    try:
        while True:
            await self._retry_connect()
            try:
                while True:
                    msg = await self._connection.read()
                    await self._process_message(msg)
                    self._route_to_filters(msg)
            except asyncio.CancelledError:
                return
            except ConnectionError:
                logger.error("Read Loop: lost connection to KelvinStream, reconnecting...")
            except Exception as ex:
                logger.error("Read Loop: unexpected error", ex=ex)
    finally:
        logger.debug("Read Loop: exiting")


# ----------------
# Message Handling
# ----------------
async def _process_app_configuration(self, configuration: dict[str, Any]) -> None:
    """
    Store a new app configuration and notify `on_app_configuration`.

    Nothing happens when the configuration is unchanged. The callback is
    skipped for the initial configuration (before `config_received` is set).
    """
    if configuration == self.app_configuration:
        return

    self._app_configuration = configuration

    # Invoke callback if it's not initial configuration
    if self._config_received.is_set():
        await self._callback(self.on_app_configuration, self.app_configuration)


async def _process_resources(
    self,
    resources: list[Resource],
    datastreams: list[ManifestDatastream],
) -> None:
    """
    Synchronize self._assets, self._inputs, self._outputs based on the incoming
    resources list and the manifest datastream definitions; fire on_asset_change
    callbacks for any added, updated, or removed assets, but only once at the end.

    Assets whose rebuilt AssetInfo equals the stored one are left untouched and
    do not produce a callback.
    """
    # 1) Build manifest lookup
    manifest_ds_map: dict[str, ManifestDatastream] = {ds.name: ds for ds in datastreams}

    # 2) Filter only real assets and warn on non-assets
    asset_resources: dict[str, Resource] = {}
    for res in resources:
        if isinstance(res.resource, KRNAsset):
            asset_resources[res.resource.asset] = res
        else:
            logger.warning("Skipping non-asset resource %r", res.resource)

    # 3) Prepare new IO maps and collect change events
    new_inputs: dict[str, AppIO] = {}
    new_outputs: dict[str, AppIO] = {}
    # [(new_info, old_info), ...]
    callbacks: list[tuple[Optional[AssetInfo], Optional[AssetInfo]]] = []

    # 4) Remember which assets existed before
    previous_assets = set(self._assets.keys())

    # 5) Process each current asset
    for asset_name, asset_cfg in asset_resources.items():
        asset_detail = asset_cfg.asset
        new_info = AssetInfo(
            name=asset_name,
            properties=asset_cfg.properties,
            parameters=asset_cfg.parameters,
            datastreams={},
            title=asset_detail.title if asset_detail else None,
            asset_type_name=asset_detail.type.name if asset_detail else None,
            asset_type_title=asset_detail.type.title if asset_detail else None,
        )

        for ds_name, ds_cfg in asset_cfg.datastreams.items():
            manif_ds = manifest_ds_map.get(ds_name)
            if manif_ds is None:
                logger.error("No manifest entry for datastream %s on asset %s", ds_name, asset_name)
                continue

            # A datastream may be exposed to the app under a mapped IO name.
            io_name = ds_cfg.map_to or ds_name
            msg_type = KMessageTypeData(cast(str, manif_ds.primitive_type_name))

            new_info.datastreams[io_name] = ResourceDatastream(
                asset=asset_cfg.resource,  # type: ignore[arg-type]
                io_name=io_name,
                access=ds_cfg.access,
                way=ds_cfg.way,
                owned=bool(ds_cfg.owned),
                configuration=ds_cfg.configuration,
                datastream=Datastream(name=ds_name, type=msg_type, unit=manif_ds.unit_name),
            )

            if ds_cfg.way in [WayEnum.input, WayEnum.input_output_cc]:
                new_inputs[io_name] = AppIO(name=io_name, type=msg_type)
            elif ds_cfg.way in [WayEnum.output, WayEnum.input_cc_output]:
                new_outputs[io_name] = AppIO(name=io_name, type=msg_type)

        # record change -- only when the asset is new or actually differs, so
        # an unrelated manifest update does not report every asset as changed
        old_info: Optional[AssetInfo] = self._assets.get(asset_name)
        if old_info != new_info:
            self._assets[asset_name] = new_info
            callbacks.append((new_info, old_info))

    # 6) Detect removed assets
    removed = previous_assets - set(asset_resources.keys())
    for name in removed:
        old_info_existing: AssetInfo = self._assets.pop(name)
        callbacks.append((None, old_info_existing))

    # 7) Update IO lists
    self._inputs = list(new_inputs.values())
    self._outputs = list(new_outputs.values())

    # 8) Invoke callback if it's not initial configuration
    if self._config_received.is_set():
        for new_info_opt, old_info_opt in callbacks:  # do not shadow the AssetInfo-typed new_info above
            await self._callback(self.on_asset_change, new_info_opt, old_info_opt)


async def _process_runtime_manifest(self, msg: RuntimeManifest) -> None:
    """
    Apply a runtime manifest: resources first, then app configuration, and
    finally mark the configuration as received.
    """
    logger.debug(f"Processing RuntimeManifest. Initial Manifest: {not self.config_received.is_set()}")

    await self._process_resources(msg.payload.resources, msg.payload.datastreams)
    await self._process_app_configuration(msg.payload.configuration)

    # Mark config received
    self._config_received.set()


async def _process_message(self, msg: Message) -> None:
    """
    Route an incoming Message (or RuntimeManifest) to the proper async handler.

    A RuntimeManifest is handled on its own and triggers no message callbacks.
    Any other message fires `on_message`, then at most one of the specific
    callbacks, chosen by the first matching check below.
    """
    # Handle RuntimeManifest
    if isinstance(msg, RuntimeManifest):
        await self._process_runtime_manifest(msg)
        return

    # Invoke callbacks
    await self._callback(self.on_message, msg)

    if self.msg_is_control_change(msg):
        await self._callback(self.on_control_change, msg)
        return

    if filters.is_asset_data_message(msg):
        await self._callback(self.on_asset_input, msg)
        return

    if filters.is_control_status_message(msg):
        await self._callback(self.on_control_status, msg)
        return

    if filters.is_custom_action(msg):
        await self._callback(self.on_custom_action, convert_message(msg))  # type: ignore[arg-type]
        return

    if filters.is_data_tag(msg):
        await self._callback(self.on_data_tag, convert_message(msg))  # type: ignore[arg-type]
        return


def _route_to_filters(self, msg: Message) -> None:
    """
    Route a message to all registered filters.

    For each (queue, predicate) in self._filters:
      - If predicate(msg) returns True, convert the message if possible,
        then put it into the queue without waiting.

    A predicate (or conversion) that raises is logged and does not stop the
    remaining filters from being evaluated.
    """
    for queue, predicate in self._filters:
        try:
            if predicate(msg):
                converted = convert_message(msg) or msg
                # TODO: check if the message is reference
                queue.put_nowait(converted)  # type: ignore[arg-type]
        except Exception:
            # If a filter raises, we choose to ignore it or log if desired.
            logger.exception(f"Filter {predicate!r} raised on message: {msg!r}")
async def wait_for_processing(self) -> None:
    """Block until every routed message has been marked as processed.

    Each registered filter queue is joined in registration order, so this
    returns only once `task_done()` has been called for every message that
    was put on those queues.

    Note:
        This works out of the box for @app.stream() handlers, windowing
        streams and stream_filter() consumers. Code that reads a raw
        filter() queue has to call task_done() itself.
    """
    for entry in self._filters:
        pending_queue = entry[0]
        await pending_queue.join()
async def wait_for_tasks(self, timeout: float = 5.0) -> None:
    """Give running tasks a bounded amount of time to finish.

    Waits up to *timeout* real-time seconds for all currently running tasks
    to complete. Tasks that are still pending afterwards (typically
    never-ending ones) are left alone; they get cancelled on disconnect.
    Returns immediately when no task is running.
    """
    running = self._running_tasks
    if running:
        await asyncio.wait(running, timeout=timeout, return_when=asyncio.ALL_COMPLETED)
# ---------------- # Publish # ----------------
async def publish(self, msg: Union[Message, MessageBuilder[Any]]) -> bool:
    """
    Send a message to KelvinStream.

    A MessageBuilder is turned into a Message with `to_message()` first;
    a Message is written as-is.

    Parameters:
        msg (Union[Message, MessageBuilder]): The message, or a builder for it.

    Returns:
        bool: The result of the stream write, or False when the write failed
        with a ConnectionError.

    Raises:
        RuntimeError: If the app is not connected.

    Examples:
        message = Number(resource=KRNAssetDataStream('my-asset', 'my-input'), payload=1.0)
        success = await client.publish(message)
        if success:
            print("Published message")
        else:
            print("Publish failed")
    """
    if not self.is_connected:
        raise RuntimeError("cannot publish message: KelvinApp is not connected")

    try:
        outgoing = msg.to_message() if isinstance(msg, MessageBuilder) else msg
        return await self._connection.write(outgoing)
    except ConnectionError:
        logger.error("Failed to publish message: Connection is unavailable")
        return False
# ---------------- # Filters # ----------------
def filter(self, func: Callable[[Message], TypeGuard[T]]) -> Queue[T]:
    """
    Register a message filter and return the queue it feeds.

    Parameters:
        func (filters.KelvinFilterType): Predicate that takes a Message and returns
            a bool; messages for which it is true are delivered to the queue.

    Returns:
        Queue[Message]: A new asyncio queue that receives the matching messages.
    """
    sink: Queue[T] = Queue()
    registration = (sink, func)
    self._filters.append(registration)  # type: ignore[arg-type]
    return sink
def stream_filter(self, func: Callable[[Message], TypeGuard[T]]) -> AsyncGenerator[T, None]:
    """
    Like `filter`, but exposes the matching messages as an async generator.

    The filter is registered immediately, at call time. Every message handed
    to the consumer is marked done on the underlying queue once the consumer
    comes back for the next one (or closes the generator).

    Parameters:
        func (filters.KelvinFilterType): Predicate that takes a Message and returns a bool.

    Returns:
        AsyncGenerator[Message, None]: Async generator to iterate with `async for`.

    Yields:
        The messages accepted by `func`, in arrival order.
    """
    source: Queue[T] = self.filter(func)

    async def _generator() -> AsyncGenerator[T, None]:
        while True:
            item = await source.get()
            try:
                yield item
            finally:
                source.task_done()

    return _generator()
# ---------------- # Windowing # ----------------
def tumbling_window(
    self,
    window_size: timedelta,
    assets: Optional[list[str]] = None,
    inputs: Optional[list[str]] = None,
    round_to: Optional[timedelta] = None,
) -> TumblingWindow:
    """
    Build a fixed-size, non-overlapping window over incoming asset data.

    Parameters:
        window_size: Duration of each window.
        assets: Asset names to filter on; all known assets when omitted or empty.
        inputs: Input names (data streams) to include as columns; all configured
            inputs when omitted or empty.
        round_to: Optional interval to which window boundaries are aligned.

    Returns:
        TumblingWindow: A window fed by a newly registered asset/input filter.
    """
    # Imported here rather than at module level (the module only imports it for type checking).
    from kelvin.application.window import TumblingWindow

    selected_assets, selected_inputs = self._default_assets_and_inputs(assets, inputs)
    return TumblingWindow(
        assets=selected_assets,
        inputs=selected_inputs,
        window_size=window_size,
        queue=self._apply_asset_input_filters(selected_assets, selected_inputs),
        round_to=round_to,
        clock=self.clock,
    )
def hopping_window(
    self,
    window_size: timedelta,
    hop_size: timedelta,
    assets: Optional[list[str]] = None,
    inputs: Optional[list[str]] = None,
    round_to: Optional[timedelta] = None,
) -> HoppingWindow:
    """
    Build a fixed-size window that advances by `hop_size`, so windows may overlap.

    Parameters:
        window_size: Duration of each window.
        hop_size: Step between window starts (defines overlap).
        assets: Asset names to filter on; all known assets when omitted or empty.
        inputs: Input names (data streams) to include as columns; all configured
            inputs when omitted or empty.
        round_to: Optional interval to which window boundaries are aligned.

    Returns:
        HoppingWindow: A window fed by a newly registered asset/input filter.
    """
    # Imported here rather than at module level (the module only imports it for type checking).
    from kelvin.application.window import HoppingWindow

    selected_assets, selected_inputs = self._default_assets_and_inputs(assets, inputs)
    return HoppingWindow(
        assets=selected_assets,
        inputs=selected_inputs,
        window_size=window_size,
        hop_size=hop_size,
        queue=self._apply_asset_input_filters(selected_assets, selected_inputs),
        round_to=round_to,
        clock=self.clock,
    )
def rolling_window(
    self,
    count_size: int,
    assets: Optional[list[str]] = None,
    inputs: Optional[list[str]] = None,
    round_to: Optional[timedelta] = None,
    slide: int = 1,
) -> RollingWindow:
    """
    Build a count-based sliding window over incoming asset data.

    Parameters:
        count_size: Number of records per window.
        assets: Asset names to filter on; all known assets when omitted or empty.
        inputs: Input names (data streams) to include as columns; all configured
            inputs when omitted or empty.
        round_to: Optional interval to which window boundaries are aligned.
        slide: Number of records to slide the window forward on each update.

    Returns:
        RollingWindow: A window fed by a newly registered asset/input filter.
    """
    # Imported here rather than at module level (the module only imports it for type checking).
    from kelvin.application.window import RollingWindow

    selected_assets, selected_inputs = self._default_assets_and_inputs(assets, inputs)
    return RollingWindow(
        assets=selected_assets,
        inputs=selected_inputs,
        count_size=count_size,
        queue=self._apply_asset_input_filters(selected_assets, selected_inputs),
        round_to=round_to,
        slide=slide,
        clock=self.clock,
    )
# ---------------- # Stream # ----------------
def stream(
    self,
    fn: Optional[StreamFunc] = None,
    *,
    assets: Optional[list[str]] = None,
    inputs: Optional[list[str]] = None,
) -> Union[AsyncFunc[P, R], Callable[[TaskFunc[P, R]], AsyncFunc[P, R]]]:
    """
    Register a handler that is called once per incoming AssetDataMessage
    matching the optional assets/inputs selection.

    Works as a decorator, a decorator factory, or a plain call:

        @app.stream()
        async def my_stream(msg: AssetDataMessage): ...

        @app.stream(inputs=["in1"])
        async def my_stream(msg: AssetDataMessage): ...

        @app.stream(assets=["asset1"])
        async def my_stream(msg: AssetDataMessage): ...

        @app.stream(assets=["asset1"], inputs=["in1", "in2"])
        async def my_stream(msg: AssetDataMessage): ...

        @app.stream(assets=["asset1"], inputs=["in1", "in2"])
        def my_stream(msg: AssetDataMessage): ...

        def my_stream(msg: AssetDataMessage): ...
        app.stream(my_stream, assets=["asset1"], inputs=["in1"])

    The handler is wrapped in a runner that is stored in the task registry
    under the handler's qualified name. Exceptions raised by the handler are
    logged and do not stop the runner.

    Raises:
        RuntimeError: If the app is already connected.
    """
    if self.is_connected:
        raise RuntimeError("cannot register streams: KelvinApp is already connected")

    if fn is None:
        # Called with keyword arguments only: hand back a decorator that
        # completes the registration once it receives the handler.
        def _register(inner: StreamFunc) -> AsyncFunc[P, R]:
            return self.stream(inner, assets=assets, inputs=inputs)  # type: ignore[return-value]

        return _register  # type: ignore[return-value]

    # Used both as the registry key and in log records
    stream_name = f"{fn.__module__}.{fn.__qualname__}"

    async def _runner() -> None:
        # The filter queue is created when the runner starts, not at
        # registration time, using the same helpers as the windowing methods.
        wanted_assets, wanted_inputs = self._default_assets_and_inputs(assets, inputs)
        queue: Queue[AssetDataMessage] = self._apply_asset_input_filters(wanted_assets, wanted_inputs)

        while True:
            msg = await queue.get()
            try:
                if asyncio.iscoroutinefunction(fn):
                    await fn(msg)
                else:
                    # Sync handlers run in a worker thread to keep the loop free
                    _ = await asyncio.to_thread(fn, msg)
            except Exception as ex:
                logger.error("Stream handler raised an exception", stream_name=stream_name, ex=ex)
            finally:
                queue.task_done()

    # Registered like any other task; the async runner is also what the caller gets back
    self._tasks[stream_name] = _runner
    return _runner  # type: ignore[return-value]
# ---------------- # Tasks # ----------------
def task(
    self, fn: Optional[TaskFunc[P, R]] = None, *, name: Optional[str] = None
) -> Union[AsyncFunc[P, R], Callable[[TaskFunc[P, R]], AsyncFunc[P, R]]]:
    """
    Register a sync or async function as a task.

    Works both as a decorator and as a decorator factory:

        @app.task
        async def my_async_task(...): ...

        @app.task()
        def my_sync_task(...): ...

        @app.task(name="custom.task.name")
        def another_task(...): ...

        app.task(some_function)

    Parameters:
        fn (Optional[TaskFunc[P, R]]): The function to register. Can be sync or async.
            If not provided, a decorator is returned.
        name (Optional[str]): Optional name to register the task under.
            If not provided, the fully-qualified function name is used.

    Returns:
        If `fn` is provided, the coroutine function stored in the registry: `fn` itself
        when it is already async, otherwise an async wrapper that runs it in a thread.
        If `fn` is None, a decorator that can be applied to a function.

    Raises:
        RuntimeError: If the app is already connected.
    """
    if self.is_connected:
        raise RuntimeError("cannot register tasks: KelvinApp is already connected")

    if fn is None:
        # Called with keyword arguments only: hand back a decorator bound to them.
        def _decorator(inner: TaskFunc[P, R]) -> AsyncFunc[P, R]:
            return self.task(inner, name=name)  # type: ignore[return-value]

        return _decorator

    task_name = name or f"{fn.__module__}.{fn.__qualname__}"

    if asyncio.iscoroutinefunction(fn):
        # Already awaitable: register as-is.
        registered = fn
    else:
        # Sync callables are pushed to a worker thread so they cannot block the loop.
        @functools.wraps(fn)
        async def _threaded(*args: P.args, **kwargs: P.kwargs) -> R:
            return await asyncio.to_thread(fn, *args, **kwargs)  # type: ignore[arg-type, return-value]

        registered = _threaded

    self._tasks[task_name] = registered
    return registered
def _handle_task_result(self, task: asyncio.Task[Any]) -> None:
    """Log how a task ended and drop it from ``self._running_tasks``.

    Attached as the done callback of every task created by ``_start_tasks``.

    Args:
        task: The completed asyncio task.
    """
    label = task.get_name()
    try:
        # result() re-raises whatever terminated the task, which selects the branch below.
        task.result()
        logger.info("Task completed successfully", task_name=label)
    except asyncio.CancelledError:
        logger.info("Task was cancelled", task_name=label)
    except Exception as ex:
        logger.error("Task raised an exception", task_name=label, ex=ex)

    # The task is finished either way, so stop tracking it.
    self._running_tasks.discard(task)


async def _start_tasks(self) -> None:
    """
    Create an asyncio task for every coroutine function in ``self._tasks``.

    Each task is named ``app-task:<task_name>``, gets ``_handle_task_result`` as
    its done callback and is added to ``self._running_tasks``.
    """
    for task_name, coro_fn in self._tasks.items():
        logger.debug("Starting task", task_name=task_name)
        coroutine = cast(Coroutine[Any, Any, Any], coro_fn())
        running: asyncio.Task[Any] = asyncio.create_task(coroutine, name=f"app-task:{task_name}")
        running.add_done_callback(self._handle_task_result)
        self._running_tasks.add(running)


async def _stop_tasks(self) -> None:
    """
    Cancel every tracked task, one after another, through ``_cancel_task``.

    The tracking set itself is not cleared here; entries are removed by the
    done callback registered in ``_start_tasks``.
    """
    # Iterate over a snapshot: the set is mutated by done callbacks while we await.
    for running in tuple(self._running_tasks):
        await self._cancel_task(running)


async def _cancel_task(self, task: Optional[asyncio.Task[Any]]) -> None:
    """
    Cancel one task and wait up to ``_CANCEL_TASK_TIMEOUT`` for it to finish.

    ``None`` and already-finished tasks are ignored. A timeout, the resulting
    cancellation, and any other exception raised by the task are logged or
    swallowed here rather than propagated.
    """
    if not task or task.done():
        return

    label = task.get_name() or task
    logger.debug("Cancelling task", task_name=label)
    _ = task.cancel()
    try:
        await asyncio.wait_for(task, timeout=_CANCEL_TASK_TIMEOUT)
    except asyncio.TimeoutError:
        logger.warning("Task did not cancel within timeout", task_name=label)
    except asyncio.CancelledError:
        # Expected outcome of a successful cancellation.
        pass
    except Exception as ex:
        logger.error("Task raised after cancellation", task_name=label, ex=ex)


# ----------------
# Tasks Timers
# ----------------
@overload
def timer(self, fn: TaskFunc[[], Any], *, interval: float, name: Optional[str] = None) -> AsyncFunc[[], Any]: ...


@overload
def timer(
    self, *, interval: float, name: Optional[str] = None
) -> Callable[[TaskFunc[[], Any]], AsyncFunc[[], Any]]: ...
def timer(
    self, fn: Optional[TaskFunc[[], Any]] = None, *, interval: float, name: Optional[str] = None
) -> Union[AsyncFunc[[], Any], Callable[[TaskFunc[[], Any]], AsyncFunc[[], Any]]]:
    """
    Register a function to be called repeatedly, once per ``Timer`` tick.

    Usage patterns:

        @app.timer(interval=5)
        def foo(): ...

        @app.timer(interval=5, name="my timer")
        async def foo(): ...

        def bar(): ...
        app.timer(bar, interval=10)
        app.timer(bar, interval=10, name="bar.timer")

    Parameters:
        fn: The sync or async zero-argument callable to invoke. If omitted, a decorator is returned.
        interval: Interval handed to ``Timer``.
        name: Registry key and log label; defaults to the function's fully-qualified name.

    Raises:
        RuntimeError: If the app is already connected.
    """
    if self.is_connected:
        raise RuntimeError("cannot register timers: KelvinApp is already connected")

    if fn is None:
        # Keyword-only call: return a decorator that completes the registration.
        def _decorator(inner: TaskFunc[[], Any]) -> AsyncFunc[[], Any]:
            return self.timer(inner, interval=interval, name=name)

        return _decorator

    timer_name = name or f"{fn.__module__}.{fn.__qualname__}"

    async def _wrapper() -> None:
        callback_is_async = asyncio.iscoroutinefunction(fn)
        ticks = Timer(interval=interval, name=timer_name, clock=self.clock)
        async for _ in ticks:
            try:
                if callback_is_async:
                    await fn()
                else:
                    # Sync callbacks run in a worker thread to keep the loop responsive.
                    _ = await asyncio.to_thread(fn)
            except asyncio.CancelledError:
                # Cancellation must end the timer, not be logged as a failure.
                raise
            except Exception as ex:
                # One failed tick does not stop the timer.
                logger.error("Timer raised an exception", timer_name=timer_name, ex=ex)

    self._tasks[timer_name] = _wrapper
    return _wrapper
# ----------------
# Task Schedules
# ----------------
@overload
def schedule(
    self,
    fn: TaskFunc[[], Any],
    *,
    every: Optional[str] = None,
    at: Optional[Union[str, list[str]]] = None,
    cron: Optional[str] = None,
    timezone: str = "UTC",
    start_time: Optional[datetime] = None,
    interval: Optional[int] = None,
    name: Optional[str] = None,
) -> AsyncFunc[[], Any]: ...


@overload
def schedule(
    self,
    *,
    every: Optional[str] = None,
    at: Optional[Union[str, list[str]]] = None,
    cron: Optional[str] = None,
    timezone: str = "UTC",
    start_time: Optional[datetime] = None,
    interval: Optional[int] = None,
    name: Optional[str] = None,
) -> Callable[[TaskFunc[[], Any]], AsyncFunc[[], Any]]: ...


def schedule(
    self,
    fn: Optional[TaskFunc[[], Any]] = None,
    *,
    every: Optional[str] = None,
    at: Optional[Union[str, list[str]]] = None,
    cron: Optional[str] = None,
    timezone: str = "UTC",
    start_time: Optional[datetime] = None,
    interval: Optional[int] = None,
    name: Optional[str] = None,
) -> Union[AsyncFunc[[], Any], Callable[[TaskFunc[[], Any]], AsyncFunc[[], Any]]]:
    """
    Register a function to run on a cron-like schedule.

    Supports two styles of schedule definition:

    Human-readable:

        @app.schedule(every="day", at="19:00", timezone="Australia/Sydney")
        async def daily_report(): ...

        @app.schedule(every="monday", at="09:00")
        async def weekly_meeting(): ...

        @app.schedule(every="day", at=["09:00", "17:00"])
        async def twice_daily(): ...

    Cron expression:

        @app.schedule(cron="0 9 * * MON-FRI", timezone="Europe/London")
        async def weekday_job(): ...

    With interval (every Nth occurrence):

        @app.schedule(every="monday", at="09:00", interval=2, start_time=datetime(2026, 4, 6))
        async def biweekly_sync(): ...

    Args:
        fn: The function to register. Can be sync or async.
        every: Human-readable recurrence ("day", "monday", "weekday", etc.).
        at: Time(s) of day in "HH:MM" format. Required for ``every`` schedules.
            Can be a single string or a list of strings for multiple daily times.
        cron: A standard 5-field cron expression. Mutually exclusive with every/at.
        timezone: IANA timezone name (default "UTC").
        start_time: Reference datetime for deterministic schedule alignment.
            When set, the scheduler computes fire times from this point.
        interval: Fire every Nth cron match. When set with ``start_time``,
            the count survives restarts. Without ``start_time``, the first
            match always fires and the count resets on restart.
        name: Optional task name for logging; defaults to the function's
            fully-qualified name.

    Returns:
        The registered async wrapper when ``fn`` is given, otherwise a decorator.

    Raises:
        RuntimeError: If the app is already connected.
        ValueError: If ``timezone`` cannot be loaded or ``interval`` is below 1.
            ``build_cron_expressions`` is also called eagerly, so whatever it raises
            for an invalid every/at/cron combination surfaces at registration time.
    """
    if self.is_connected:
        raise RuntimeError("You cannot register schedules after the KelvinApp is connected.")

    # Validate eagerly at decoration time so bad arguments fail at registration,
    # not when the schedule first runs.
    cron_expressions = build_cron_expressions(every=every, at=at, cron=cron)

    try:
        tz = ZoneInfo(timezone)
    except Exception as exc:
        # ZoneInfo signals bad keys with several exception types (a KeyError subclass
        # for unknown zones, ValueError for malformed keys, ...); they are all
        # reported uniformly as ValueError. Listing KeyError next to Exception was
        # redundant because Exception already covers it.
        raise ValueError(f"Invalid timezone: {timezone!r}") from exc

    if interval is not None and interval < 1:
        raise ValueError(f"Parameter 'interval' must be >= 1, got {interval}.")

    if fn is None:
        # Keyword-only call: return a decorator that re-enters with the function.
        def decorator(inner: TaskFunc[[], Any]) -> AsyncFunc[[], Any]:
            return self.schedule(
                inner,
                every=every,
                at=at,
                cron=cron,
                timezone=timezone,
                start_time=start_time,
                interval=interval,
                name=name,
            )

        return decorator

    schedule_name = name or f"{fn.__module__}.{fn.__qualname__}"

    async def wrapper() -> None:
        scheduler = Scheduler(
            cron_expressions=cron_expressions,
            name=schedule_name,
            tz=tz,
            start_time=start_time,
            interval=interval,
            # NOTE(review): timer() and rolling_window() pass ``self.clock``;
            # confirm ``self._clock`` is the intended attribute here.
            clock=self._clock,
        )
        async for _ in scheduler:
            try:
                if asyncio.iscoroutinefunction(fn):
                    await fn()
                else:
                    # Sync callbacks run in a worker thread to keep the loop responsive.
                    _ = await asyncio.to_thread(fn)
            except asyncio.CancelledError:
                # Cancellation must stop the schedule, not be logged as a failure.
                raise
            except Exception as ex:
                # One failed run does not stop later occurrences.
                logger.error("Schedule raised an exception", schedule_name=schedule_name, ex=ex)

    self._tasks[schedule_name] = wrapper
    return wrapper
# ---------------- # Run # ----------------
async def run_forever(self) -> None:
    """
    Connect, then block until SIGTERM/SIGINT is received or this coroutine is cancelled.

    ``disconnect()`` is always awaited once ``connect()`` has succeeded, including
    when the coroutine is cancelled or signal-handler registration fails. Signal
    handlers installed here are removed again before returning.

    Signal handling is best effort: event loops that do not implement
    ``add_signal_handler`` (non-Unix loops) or loops running outside the main
    thread cannot install handlers; in that case the method simply waits for
    cancellation.
    """
    loop = asyncio.get_running_loop()

    # Set by the signal handlers below; otherwise wait() blocks until cancellation.
    stop_event = asyncio.Event()
    installed: list[signal.Signals] = []

    await self.connect()

    try:
        # Registration happens inside the try block so that a failure here can
        # never leave the app connected without a matching disconnect().
        for sig in (signal.SIGTERM, signal.SIGINT):
            try:
                loop.add_signal_handler(sig, stop_event.set)
            except (NotImplementedError, RuntimeError, ValueError):
                # Unsupported platform/loop or not the main thread: fall back to
                # cancellation (e.g. KeyboardInterrupt via asyncio.run) as the
                # only way to stop.
                continue
            installed.append(sig)

        _ = await stop_event.wait()
    finally:
        try:
            # Ensure we disconnect cleanly
            await self.disconnect()
        finally:
            # Keep the handlers active during disconnect (as before), then remove
            # them so they do not outlive this call and point at a dead event.
            for sig in installed:
                loop.remove_signal_handler(sig)
def run(self) -> None:
    """
    Blocking entry point for the application.

    Hands ``run_forever()`` to ``asyncio.run``, which creates the event loop,
    runs the coroutine until it completes or is cancelled, and closes the loop.
    """
    main_coroutine = self.run_forever()
    asyncio.run(main_coroutine)
# ---------------- # Public Helper Methods # ----------------
def msg_is_control_change(self, msg: Message) -> TypeGuard[AssetDataMessage]:
    """Tell whether a message is a control change.

    A message qualifies when its resource is a ``KRNAssetDataStream``, its type is a
    ``KMessageTypeData``, and the matching datastream in ``self.assets`` has a way of
    ``input_cc_output`` or ``input_cc``.

    Parameters:
        msg: The message to check.

    Returns:
        True if the message is a control change, False otherwise (including when
        the asset or datastream is not found).
    """
    targets_datastream = isinstance(msg.resource, KRNAssetDataStream)
    if not (targets_datastream and isinstance(msg.type, KMessageTypeData)):
        return False

    try:
        datastream = self.assets[msg.resource.asset].datastreams[msg.resource.data_stream]
    except KeyError:
        # Unknown asset or datastream: cannot be a control change.
        return False

    return datastream.way in (WayEnum.input_cc_output, WayEnum.input_cc)