Application¶
The application module provides the core framework for building Kelvin applications.
Kelvin Application SDK.
This module provides the core components for building Kelvin applications that connect to and interact with the Kelvin platform.
- Main Components:
KelvinApp: The main application class for connecting to Kelvin and processing messages. KelvinStream: Low-level stream interface for Kelvin communication. AssetInfo: Information about configured assets. Datastream: Datastream configuration. ResourceDatastream: Asset datastream configuration with access control.
Example
>>> from kelvin.application import KelvinApp
>>> async with KelvinApp() as app:
... async for msg in app.stream_filter(filters.is_asset_data_message):
... print(msg)
- class kelvin.application.KelvinApp(config=None, api=None, clock=None)[source]¶
Bases:
objectKelvin App to connect and interface with the KelvinStream.
After connecting, the connection is handled automatically in the background.
Use filters or filter_stream to easily listen for specific messages. Use register_callback methods to register callbacks for events like connect and disconnect. Use tasks to run background functions that can be async or sync. Use timers to run functions at regular intervals.
The app can be used as an async context manager, which will automatically connect on enter and disconnect on exit.
- Example usage:
- async with KelvinApp() as app:
await app.publish(Number(resource=KRNAssetDataStream(‘my-asset’, ‘my-input’), payload=1.0)) async for msg in app.stream_filter(filters.is_asset_data_message):
print(msg)
- Parameters:
config (Optional[KelvinStreamConfig])
api (Optional[AsyncClient])
clock (Optional[ClockInterface])
- property on_connect: CallbackSlot¶
Called when the connection is established.
- property on_disconnect: CallbackSlot¶
Called when the connection is closed.
- property on_message: CallbackSlot¶
Called on receipt of any message.
- property on_asset_input: CallbackSlot¶
Called when an asset data message is received.
- property on_control_change: CallbackSlot¶
Called when a control change message is received.
- property on_control_status: CallbackSlot¶
Called when a control status is received.
- property on_custom_action: CallbackSlot¶
Called when a custom action is received.
- property on_data_tag: CallbackSlot¶
Called when a data tag message is received.
- property on_asset_change: CallbackSlot¶
Called when an asset is added, removed, or modified. First arg is the new asset (None if removed); second is the previous asset (None if newly added).
- property on_app_configuration: CallbackSlot¶
Called when the app configuration changes.
- property is_connected: bool¶
Indicates whether the read loop is active, implying an established connection.
- property connection: StreamInterface¶
The stream connection interface for Kelvin messaging.
- property api: AsyncClient¶
The API client for direct connection with Kelvin API.
- property clock: ClockInterface¶
The clock interface for time operations.
- property assets: dict[str, AssetInfo]¶
Get all assets configured for this application.
It returns a dictionary where each key is the asset name, and the value is an AssetInfo object describing that asset’s properties, parameters, and datastreams.
The assets dictionary is dynamically updated whenever the application receives updates to asset properties or parameters, ensuring it always reflects the latest configuration.
- Returns:
- A dictionary where keys are asset names (strings) and values are AssetInfo instances
representing the current configuration and state of each asset.
- Return type:
Example
- {
- “asset1”: AssetInfo(
name=”asset1”, properties={
“tubing_length”: 25.0, “area”: 11.0
}, parameters={
“param-bool”: False, “param-number”: 7.5, “param-string”: “hello”
}, datastreams={
- “output1”: ResourceDatastream(
asset=KRNAsset(“asset1”), io_name=”output1”, datastream=Datastream(
name=”datastream1”, type=KMessageTypeData(“float”), unit=”m”
), access=”RO”, owned=True, configuration={}
)
}
)
}
- property app_configuration: dict[str, Any]¶
Get the application configuration.
- Returns:
A mapping of configuration sections to their values, matching the structure in app.yaml.
- Return type:
Example
- {
- “foo”: {
“conf_string”: “value1”, “conf_number”: 25, “conf_bool”: False,
}
}
- property config_received: Event¶
Event set when the initial configuration is received.
Use this asyncio.Event to wait until the application has loaded its initial app/asset parameters.
- Returns:
Event that becomes set once the initial configuration arrives.
- Return type:
Example
await app.config_received.wait()
- property loop: AbstractEventLoop¶
Return the running event loop once the app has started.
- property inputs: list[AppIO]¶
List all input metrics configured for the application.
- Each AppIO has:
name (str): the metric identifier.
data_type (str): the data type of the input.
- Returns:
Read-only list of configured input metrics.
- Return type:
List[AppIO]
- property outputs: list[AppIO]¶
List all output metrics configured for the application.
- Each AppIO has:
name (str): the metric identifier.
data_type (str): the data type of the output.
- Returns:
Read-only list of configured output metrics.
- Return type:
List[AppIO]
- property tasks: dict[str, Callable[[], Awaitable[Any]]]¶
Retrieve registered asynchronous tasks.
- Returns:
Dict of task names and callable that produce awaitables when called. Represents tasks scheduled or pending.
- Return type:
Dict[str, Callable[[], Awaitable]]
- async __aexit__(_exc_type, _exc_value, _tb)[source]¶
Support async context: disconnect on exit.
- Return type:
- Parameters:
_exc_type (type[E] | None)
_exc_value (E | None)
_tb (TracebackType | None)
- async connect()[source]¶
Establish connection, retrying indefinitely until success. Starts the read loop and waits for App Configuration before firing on_connect callback and tasks/timers.
- Return type:
- Returns:
None.
- Raises:
RuntimeError – If called without a running event loop.
- async disconnect()[source]¶
Cancel read loop, stop all tasks, fire on_disconnect, and close stream.
- Return type:
- async wait_for_processing()[source]¶
Wait for all stream processing to complete.
Joins all filter queues, blocking until every message that was routed has been processed (task_done called).
Note: Only works for @app.stream() handlers, windowing streams, and stream_filter() consumers. Raw filter() queue users must call task_done() themselves.
- Return type:
- async wait_for_tasks(timeout=5.0)[source]¶
Wait for finite tasks to settle.
Gives running tasks up to timeout real-time seconds to complete. Long-running (never-ending) tasks that are still pending after the timeout are silently ignored — they will be cancelled on disconnect.
- async publish(msg)[source]¶
Publish a message to KelvinStream.
Accepts either a Message instance or a MessageBuilder. If given a MessageBuilder, it is converted to a Message via to_message() before sending. Returns True on success, or False if the connection is unavailable.
- Parameters:
msg (
Union[Message,MessageBuilder[Any]]) –A Message to send directly, or
A MessageBuilder which will be converted to Message.
- Returns:
True if the message was sent successfully.
False if sending failed due to a ConnectionError.
- Return type:
Examples
message = Number(resource=KRNAssetDataStream(‘my-asset’, ‘my-input’), payload=1.0) success = await client.publish(message) if success:
print(“Published message”)
- else:
print(“Publish failed”)
- stream_filter(func)[source]¶
Creates a stream for the received Kelvin Messages based on a filter function. See filter.
- Parameters:
func (
Callable[[Message],TypeGuard[TypeVar(T, bound=Message)]]) – Filter function, it should receive a Message as argument and return bool.- Returns:
Async Generator that can be async iterated to receive filtered messages.
- Return type:
AsyncGenerator[Message, None]
- Yields:
Iterator[AsyncGenerator[Message, None]] – Yields the filtered messages.
- tumbling_window(window_size, assets=None, inputs=None, round_to=None)[source]¶
Creates a fixed, non-overlapping windowing.
- Parameters:
window_size (
timedelta) – Duration of each window.assets (
Optional[list[str]]) – Optional list of asset names to filter on.inputs (
Optional[list[str]]) – Optional list of input names (data streams) to include as columns in the window.round_to (
Optional[timedelta]) – Optional interval to which window boundaries are aligned.
- Returns:
An instance configured with the given parameters.
- Return type:
- hopping_window(window_size, hop_size, assets=None, inputs=None, round_to=None)[source]¶
Creates a window with fixed size and overlap.
- Parameters:
window_size (
timedelta) – Duration of each window.hop_size (
timedelta) – Step between window starts (defines overlap).assets (
Optional[list[str]]) – Optional list of asset names to filter on.inputs (
Optional[list[str]]) – Optional list of input names (data streams) to include as columns in the window.round_to (
Optional[timedelta]) – Optional interval to which window boundaries are aligned.
- Returns:
An instance configured with the given parameters.
- Return type:
- rolling_window(count_size, assets=None, inputs=None, round_to=None, slide=1)[source]¶
Creates a sliding count-based window over incoming data.
- Parameters:
count_size (
int) – Number of records per window.assets (
Optional[list[str]]) – Optional list of asset names to filter on.inputs (
Optional[list[str]]) – Optional list of input names (data streams) to include as columns in the window.round_to (
Optional[timedelta]) – Optional interval to which window boundaries are aligned.slide (
int) – Number of records to slide the window forward on each update.
- Returns:
An instance configured with the given parameters.
- Return type:
- stream(fn=None, *, assets=None, inputs=None)[source]¶
Register a per-message stream handler that is invoked for each incoming AssetDataMessage matching the optional assets/inputs filters.
Usage patterns:
@app.stream() async def my_stream(msg: AssetDataMessage): …
@app.stream(inputs=[“in1”]) async def my_stream(msg: AssetDataMessage): …
@app.stream(assets=[“asset1”]) async def my_stream(msg: AssetDataMessage): …
@app.stream(assets=[“asset1”], inputs=[“in1”, “in2”]) async def my_stream(msg: AssetDataMessage): …
@app.stream(assets=[“asset1”], inputs=[“in1”, “in2”]) def my_stream(msg: AssetDataMessage): …
def my_stream(msg: AssetDataMessage): … app.stream(my_stream, assets=[“asset1”], inputs=[“in1”])
The registered stream runs as an app task when the app connects.
- task(fn=None, *, name=None)[source]¶
Register a function as a task, either sync or async.
This method acts as both a decorator and a decorator factory. It supports the following usage patterns:
@app.task async def my_async_task(…): …
@app.task() def my_sync_task(…): …
@app.task(name=”custom.task.name”) def another_task(…): …
app.task(some_function)
- Parameters:
fn (
Union[Callable[[ParamSpec(P, bound=None)],TypeVar(R)],Callable[[ParamSpec(P, bound=None)],Awaitable[TypeVar(R)]],None]) – The function to register. Can be sync or async. If not provided, a decorator is returned.name (
Optional[str]) – Optional name to register the task under. If not provided, the fully-qualified function name is used.
- Return type:
Union[Callable[[ParamSpec(P, bound=None)],Awaitable[TypeVar(R)]],Callable[[Union[Callable[[ParamSpec(P, bound=None)],TypeVar(R)],Callable[[ParamSpec(P, bound=None)],Awaitable[TypeVar(R)]]]],Callable[[ParamSpec(P, bound=None)],Awaitable[TypeVar(R)]]]]- Returns:
If fn is provided, returns the async-compatible task wrapper. If fn is None, returns a decorator that can be applied to a function.
- timer(fn=None, *, interval, name=None)[source]¶
- Overloads:
self, fn (TaskFunc[[], Any]), interval (float), name (Optional[str]) → AsyncFunc[[], Any]
self, interval (float), name (Optional[str]) → Callable[[TaskFunc[[], Any]], AsyncFunc[[], Any]]
- Parameters:
- Return type:
Callable[[], Awaitable[Any]] | Callable[[Callable[[], Any] | Callable[[], Awaitable[Any]]], Callable[[], Awaitable[Any]]]
Register a function to be called at a repeating interval.
Usage patterns:
@app.timer(interval=5) def foo(): …
@app.timer(interval=5, name=”my timer”) async def foo(): …
def bar(): … app.timer(bar, interval=10) app.timer(bar, interval=10, name=”bar.timer”)
- schedule(fn=None, *, every=None, at=None, cron=None, timezone='UTC', start_time=None, interval=None, name=None)[source]¶
- Overloads:
self, fn (TaskFunc[[], Any]), every (Optional[str]), at (Optional[Union[str, list[str]]]), cron (Optional[str]), timezone (str), start_time (Optional[datetime]), interval (Optional[int]), name (Optional[str]) → AsyncFunc[[], Any]
self, every (Optional[str]), at (Optional[Union[str, list[str]]]), cron (Optional[str]), timezone (str), start_time (Optional[datetime]), interval (Optional[int]), name (Optional[str]) → Callable[[TaskFunc[[], Any]], AsyncFunc[[], Any]]
- Parameters:
- Return type:
Callable[[], Awaitable[Any]] | Callable[[Callable[[], Any] | Callable[[], Awaitable[Any]]], Callable[[], Awaitable[Any]]]
Register a function to run on a cron-like schedule.
Supports two styles of schedule definition:
Human-readable:
@app.schedule(every=”day”, at=”19:00”, timezone=”Australia/Sydney”) async def daily_report(): …
@app.schedule(every=”monday”, at=”09:00”) async def weekly_meeting(): …
@app.schedule(every=”day”, at=[“09:00”, “17:00”]) async def twice_daily(): …
Cron expression:
@app.schedule(cron=”0 9 * * MON-FRI”, timezone=”Europe/London”) async def weekday_job(): …
With interval (every Nth occurrence):
@app.schedule(every=”monday”, at=”09:00”, interval=2, start_time=datetime(2026, 4, 6)) async def biweekly_sync(): …
- Parameters:
fn (
Union[Callable[[],Any],Callable[[],Awaitable[Any]],None]) – The function to register. Can be sync or async.every (
Optional[str]) – Human-readable recurrence (“day”, “monday”, “weekday”, etc.).at (
Union[str,list[str],None]) – Time(s) of day in “HH:MM” format. Required foreveryschedules. Can be a single string or a list of strings for multiple daily times.cron (
Optional[str]) – A standard 5-field cron expression. Mutually exclusive with every/at.timezone (
str) – IANA timezone name (default “UTC”).start_time (
Optional[datetime]) – Reference datetime for deterministic schedule alignment. When set, the scheduler computes fire times from this point.interval (
Optional[int]) – Fire every Nth cron match. When set withstart_time, the count survives restarts. Withoutstart_time, the first match always fires and the count resets on restart.
- Return type:
Callable[[], Awaitable[Any]] | Callable[[Callable[[], Any] | Callable[[], Awaitable[Any]]], Callable[[], Awaitable[Any]]]
- async run_forever()[source]¶
Connects to the service and then waits indefinitely until cancelled. On cancellation, disconnects cleanly before propagating the cancellation.
- Return type:
- run()[source]¶
Synchronous entry point: - Starts an asyncio event loop to run run_forever(). - Blocks until run_forever() completes or is cancelled. - Allows Ctrl-C (KeyboardInterrupt) to stop cleanly.
- Return type:
- msg_is_control_change(msg)[source]¶
Check if a message is a control change type.
A control change is an asset data message where the datastream’s way is configured as input_cc_output or input_cc.
- Parameters:
msg (
Message) – The message to check.- Return type:
TypeGuard[AssetDataMessage]- Returns:
True if the message is a control change, False otherwise.
- final class kelvin.application.KelvinStream(config=None)[source]¶
Bases:
StreamInterfaceTCP stream implementation for Kelvin platform communication.
This class manages a persistent TCP connection to the Kelvin stream server, handling message serialization/deserialization and connection lifecycle.
- Parameters:
config (Optional[KelvinStreamConfig])
- _config¶
The stream configuration settings.
- _reader¶
The async stream reader (None when disconnected).
- _writer¶
The async stream writer (None when disconnected).
- __init__(config=None)[source]¶
Initialize the Kelvin Stream.
- Parameters:
config (
Optional[KelvinStreamConfig]) – Stream configuration settings. Uses defaults if not provided.- Return type:
None
- async connect()[source]¶
Connect to the Kelvin Stream server.
- Raises:
ConnectionError – If the stream server is unreachable.
- Return type:
- async read()[source]¶
Reads the next Kelvin Message
- Raises:
ConnectionError – When connection is unavailable.
- Returns:
the read Message
- Return type:
- async write(msg)[source]¶
Writes a Message to the Kelvin Stream
- Parameters:
msg (
Message) – Kelvin message to write- Raises:
ConnectionError – If the connection is lost.
- Returns:
True if the message was sent with success.
- Return type:
- class kelvin.application.AssetInfo(*args, **kwargs)[source]¶
Bases:
object- Parameters:
name (str)
properties (dict[str, Annotated[bool, Strict(strict=True)] | Annotated[int, Strict(strict=True)] | Annotated[float, Strict(strict=True)] | Annotated[str, Strict(strict=True)] | list[Annotated[bool, Strict(strict=True)]] | list[Annotated[int, Strict(strict=True)]] | list[Annotated[float, Strict(strict=True)]] | list[Annotated[str, Strict(strict=True)]]])
parameters (dict[str, Annotated[bool, Strict(strict=True)] | Annotated[int, Strict(strict=True)] | Annotated[float, Strict(strict=True)] | Annotated[str, Strict(strict=True)]])
datastreams (dict[str, ResourceDatastream])
title (str | None)
asset_type_name (str | None)
asset_type_title (str | None)
- properties: dict[str, Annotated[bool, Strict(strict=True)] | Annotated[int, Strict(strict=True)] | Annotated[float, Strict(strict=True)] | Annotated[str, Strict(strict=True)] | list[Annotated[bool, Strict(strict=True)]] | list[Annotated[int, Strict(strict=True)]] | list[Annotated[float, Strict(strict=True)]] | list[Annotated[str, Strict(strict=True)]]] = FieldInfo(annotation=NoneType, required=False, default_factory=dict)¶
- parameters: dict[str, Annotated[bool, Strict(strict=True)] | Annotated[int, Strict(strict=True)] | Annotated[float, Strict(strict=True)] | Annotated[str, Strict(strict=True)]] = FieldInfo(annotation=NoneType, required=False, default_factory=dict)¶
- datastreams: dict[str, ResourceDatastream] = FieldInfo(annotation=NoneType, required=False, default_factory=dict)¶
- class kelvin.application.ResourceDatastream(*args, **kwargs)[source]¶
Bases:
object- Parameters:
- asset: KRNAsset¶
- datastream: Datastream¶
- class kelvin.application.Datastream(*args, **kwargs)[source]¶
Bases:
object- Parameters:
name (str)
type (KMessageType)
unit (str | None)
- type: KMessageType¶
Client¶
- class kelvin.application.client.CallbackSlot[source]¶
Bases:
objectA slot that stores an async callback and supports decorator registration.
- Supports two usage patterns:
# Direct assignment (backwards compatible) app.on_connect = my_handler
# Decorator @app.on_connect async def my_handler(): …
- class kelvin.application.client.Datastream(*args, **kwargs)[source]¶
Bases:
object- Parameters:
name (str)
type (KMessageType)
unit (str | None)
- type: KMessageType¶
- class kelvin.application.client.AppIO(*args, **kwargs)[source]¶
Bases:
Datastream- Parameters:
name (str)
type (KMessageType)
unit (str | None)
- class kelvin.application.client.ResourceDatastream(*args, **kwargs)[source]¶
Bases:
object- Parameters:
- asset: KRNAsset¶
- datastream: Datastream¶
- class kelvin.application.client.AssetInfo(*args, **kwargs)[source]¶
Bases:
object- Parameters:
name (str)
properties (dict[str, Annotated[bool, Strict(strict=True)] | Annotated[int, Strict(strict=True)] | Annotated[float, Strict(strict=True)] | Annotated[str, Strict(strict=True)] | list[Annotated[bool, Strict(strict=True)]] | list[Annotated[int, Strict(strict=True)]] | list[Annotated[float, Strict(strict=True)]] | list[Annotated[str, Strict(strict=True)]]])
parameters (dict[str, Annotated[bool, Strict(strict=True)] | Annotated[int, Strict(strict=True)] | Annotated[float, Strict(strict=True)] | Annotated[str, Strict(strict=True)]])
datastreams (dict[str, ResourceDatastream])
title (str | None)
asset_type_name (str | None)
asset_type_title (str | None)
- properties: dict[str, Annotated[bool, Strict(strict=True)] | Annotated[int, Strict(strict=True)] | Annotated[float, Strict(strict=True)] | Annotated[str, Strict(strict=True)] | list[Annotated[bool, Strict(strict=True)]] | list[Annotated[int, Strict(strict=True)]] | list[Annotated[float, Strict(strict=True)]] | list[Annotated[str, Strict(strict=True)]]] = FieldInfo(annotation=NoneType, required=False, default_factory=dict)¶
- parameters: dict[str, Annotated[bool, Strict(strict=True)] | Annotated[int, Strict(strict=True)] | Annotated[float, Strict(strict=True)] | Annotated[str, Strict(strict=True)]] = FieldInfo(annotation=NoneType, required=False, default_factory=dict)¶
- datastreams: dict[str, ResourceDatastream] = FieldInfo(annotation=NoneType, required=False, default_factory=dict)¶
- class kelvin.application.client.KelvinApp(config=None, api=None, clock=None)[source]¶
Bases:
objectKelvin App to connect and interface with the KelvinStream.
After connecting, the connection is handled automatically in the background.
Use filters or filter_stream to easily listen for specific messages. Use register_callback methods to register callbacks for events like connect and disconnect. Use tasks to run background functions that can be async or sync. Use timers to run functions at regular intervals.
The app can be used as an async context manager, which will automatically connect on enter and disconnect on exit.
- Example usage:
- async with KelvinApp() as app:
await app.publish(Number(resource=KRNAssetDataStream(‘my-asset’, ‘my-input’), payload=1.0)) async for msg in app.stream_filter(filters.is_asset_data_message):
print(msg)
- Parameters:
config (Optional[KelvinStreamConfig])
api (Optional[AsyncClient])
clock (Optional[ClockInterface])
- property on_connect: CallbackSlot¶
Called when the connection is established.
- property on_disconnect: CallbackSlot¶
Called when the connection is closed.
- property on_message: CallbackSlot¶
Called on receipt of any message.
- property on_asset_input: CallbackSlot¶
Called when an asset data message is received.
- property on_control_change: CallbackSlot¶
Called when a control change message is received.
- property on_control_status: CallbackSlot¶
Called when a control status is received.
- property on_custom_action: CallbackSlot¶
Called when a custom action is received.
- property on_data_tag: CallbackSlot¶
Called when a data tag message is received.
- property on_asset_change: CallbackSlot¶
Called when an asset is added, removed, or modified. First arg is the new asset (None if removed); second is the previous asset (None if newly added).
- property on_app_configuration: CallbackSlot¶
Called when the app configuration changes.
- property is_connected: bool¶
Indicates whether the read loop is active, implying an established connection.
- property connection: StreamInterface¶
The stream connection interface for Kelvin messaging.
- property api: AsyncClient¶
The API client for direct connection with Kelvin API.
- property clock: ClockInterface¶
The clock interface for time operations.
- property assets: dict[str, AssetInfo]¶
Get all assets configured for this application.
It returns a dictionary where each key is the asset name, and the value is an AssetInfo object describing that asset’s properties, parameters, and datastreams.
The assets dictionary is dynamically updated whenever the application receives updates to asset properties or parameters, ensuring it always reflects the latest configuration.
- Returns:
- A dictionary where keys are asset names (strings) and values are AssetInfo instances
representing the current configuration and state of each asset.
- Return type:
Example
- {
- “asset1”: AssetInfo(
name=”asset1”, properties={
“tubing_length”: 25.0, “area”: 11.0
}, parameters={
“param-bool”: False, “param-number”: 7.5, “param-string”: “hello”
}, datastreams={
- “output1”: ResourceDatastream(
asset=KRNAsset(“asset1”), io_name=”output1”, datastream=Datastream(
name=”datastream1”, type=KMessageTypeData(“float”), unit=”m”
), access=”RO”, owned=True, configuration={}
)
}
)
}
- property app_configuration: dict[str, Any]¶
Get the application configuration.
- Returns:
A mapping of configuration sections to their values, matching the structure in app.yaml.
- Return type:
Example
- {
- “foo”: {
“conf_string”: “value1”, “conf_number”: 25, “conf_bool”: False,
}
}
- property config_received: Event¶
Event set when the initial configuration is received.
Use this asyncio.Event to wait until the application has loaded its initial app/asset parameters.
- Returns:
Event that becomes set once the initial configuration arrives.
- Return type:
Example
await app.config_received.wait()
- property loop: AbstractEventLoop¶
Return the running event loop once the app has started.
- property inputs: list[AppIO]¶
List all input metrics configured for the application.
- Each AppIO has:
name (str): the metric identifier.
data_type (str): the data type of the input.
- Returns:
Read-only list of configured input metrics.
- Return type:
List[AppIO]
- property outputs: list[AppIO]¶
List all output metrics configured for the application.
- Each AppIO has:
name (str): the metric identifier.
data_type (str): the data type of the output.
- Returns:
Read-only list of configured output metrics.
- Return type:
List[AppIO]
- property tasks: dict[str, Callable[[], Awaitable[Any]]]¶
Retrieve registered asynchronous tasks.
- Returns:
Dict of task names and callable that produce awaitables when called. Represents tasks scheduled or pending.
- Return type:
Dict[str, Callable[[], Awaitable]]
- async __aexit__(_exc_type, _exc_value, _tb)[source]¶
Support async context: disconnect on exit.
- Return type:
- Parameters:
_exc_type (type[E] | None)
_exc_value (E | None)
_tb (TracebackType | None)
- async connect()[source]¶
Establish connection, retrying indefinitely until success. Starts the read loop and waits for App Configuration before firing on_connect callback and tasks/timers.
- Return type:
- Returns:
None.
- Raises:
RuntimeError – If called without a running event loop.
- async disconnect()[source]¶
Cancel read loop, stop all tasks, fire on_disconnect, and close stream.
- Return type:
- async wait_for_processing()[source]¶
Wait for all stream processing to complete.
Joins all filter queues, blocking until every message that was routed has been processed (task_done called).
Note: Only works for @app.stream() handlers, windowing streams, and stream_filter() consumers. Raw filter() queue users must call task_done() themselves.
- Return type:
- async wait_for_tasks(timeout=5.0)[source]¶
Wait for finite tasks to settle.
Gives running tasks up to timeout real-time seconds to complete. Long-running (never-ending) tasks that are still pending after the timeout are silently ignored — they will be cancelled on disconnect.
- async publish(msg)[source]¶
Publish a message to KelvinStream.
Accepts either a Message instance or a MessageBuilder. If given a MessageBuilder, it is converted to a Message via to_message() before sending. Returns True on success, or False if the connection is unavailable.
- Parameters:
msg (
Union[Message,MessageBuilder[Any]]) –A Message to send directly, or
A MessageBuilder which will be converted to Message.
- Returns:
True if the message was sent successfully.
False if sending failed due to a ConnectionError.
- Return type:
Examples
message = Number(resource=KRNAssetDataStream(‘my-asset’, ‘my-input’), payload=1.0) success = await client.publish(message) if success:
print(“Published message”)
- else:
print(“Publish failed”)
- stream_filter(func)[source]¶
Creates a stream for the received Kelvin Messages based on a filter function. See filter.
- Parameters:
func (
Callable[[Message],TypeGuard[TypeVar(T, bound=Message)]]) – Filter function, it should receive a Message as argument and return bool.- Returns:
Async Generator that can be async iterated to receive filtered messages.
- Return type:
AsyncGenerator[Message, None]
- Yields:
Iterator[AsyncGenerator[Message, None]] – Yields the filtered messages.
- tumbling_window(window_size, assets=None, inputs=None, round_to=None)[source]¶
Creates a fixed, non-overlapping windowing.
- Parameters:
window_size (
timedelta) – Duration of each window.assets (
Optional[list[str]]) – Optional list of asset names to filter on.inputs (
Optional[list[str]]) – Optional list of input names (data streams) to include as columns in the window.round_to (
Optional[timedelta]) – Optional interval to which window boundaries are aligned.
- Returns:
An instance configured with the given parameters.
- Return type:
- hopping_window(window_size, hop_size, assets=None, inputs=None, round_to=None)[source]¶
Creates a window with fixed size and overlap.
- Parameters:
window_size (
timedelta) – Duration of each window.hop_size (
timedelta) – Step between window starts (defines overlap).assets (
Optional[list[str]]) – Optional list of asset names to filter on.inputs (
Optional[list[str]]) – Optional list of input names (data streams) to include as columns in the window.round_to (
Optional[timedelta]) – Optional interval to which window boundaries are aligned.
- Returns:
An instance configured with the given parameters.
- Return type:
- rolling_window(count_size, assets=None, inputs=None, round_to=None, slide=1)[source]¶
Creates a sliding count-based window over incoming data.
- Parameters:
count_size (
int) – Number of records per window.assets (
Optional[list[str]]) – Optional list of asset names to filter on.inputs (
Optional[list[str]]) – Optional list of input names (data streams) to include as columns in the window.round_to (
Optional[timedelta]) – Optional interval to which window boundaries are aligned.slide (
int) – Number of records to slide the window forward on each update.
- Returns:
An instance configured with the given parameters.
- Return type:
- stream(fn=None, *, assets=None, inputs=None)[source]¶
Register a per-message stream handler that is invoked for each incoming AssetDataMessage matching the optional assets/inputs filters.
Usage patterns:
@app.stream() async def my_stream(msg: AssetDataMessage): …
@app.stream(inputs=[“in1”]) async def my_stream(msg: AssetDataMessage): …
@app.stream(assets=[“asset1”]) async def my_stream(msg: AssetDataMessage): …
@app.stream(assets=[“asset1”], inputs=[“in1”, “in2”]) async def my_stream(msg: AssetDataMessage): …
@app.stream(assets=[“asset1”], inputs=[“in1”, “in2”]) def my_stream(msg: AssetDataMessage): …
def my_stream(msg: AssetDataMessage): … app.stream(my_stream, assets=[“asset1”], inputs=[“in1”])
The registered stream runs as an app task when the app connects.
- task(fn=None, *, name=None)[source]¶
Register a function as a task, either sync or async.
This method acts as both a decorator and a decorator factory. It supports the following usage patterns:
@app.task async def my_async_task(…): …
@app.task() def my_sync_task(…): …
@app.task(name=”custom.task.name”) def another_task(…): …
app.task(some_function)
- Parameters:
fn (
Union[Callable[[ParamSpec(P, bound=None)],TypeVar(R)],Callable[[ParamSpec(P, bound=None)],Awaitable[TypeVar(R)]],None]) – The function to register. Can be sync or async. If not provided, a decorator is returned.name (
Optional[str]) – Optional name to register the task under. If not provided, the fully-qualified function name is used.
- Return type:
Union[Callable[[ParamSpec(P, bound=None)],Awaitable[TypeVar(R)]],Callable[[Union[Callable[[ParamSpec(P, bound=None)],TypeVar(R)],Callable[[ParamSpec(P, bound=None)],Awaitable[TypeVar(R)]]]],Callable[[ParamSpec(P, bound=None)],Awaitable[TypeVar(R)]]]]- Returns:
If fn is provided, returns the async-compatible task wrapper. If fn is None, returns a decorator that can be applied to a function.
- timer(fn=None, *, interval, name=None)[source]¶
- Overloads:
self, fn (TaskFunc[[], Any]), interval (float), name (Optional[str]) → AsyncFunc[[], Any]
self, interval (float), name (Optional[str]) → Callable[[TaskFunc[[], Any]], AsyncFunc[[], Any]]
- Parameters:
- Return type:
Callable[[], Awaitable[Any]] | Callable[[Callable[[], Any] | Callable[[], Awaitable[Any]]], Callable[[], Awaitable[Any]]]
Register a function to be called at a repeating interval.
Usage patterns:
@app.timer(interval=5) def foo(): …
@app.timer(interval=5, name=”my timer”) async def foo(): …
def bar(): … app.timer(bar, interval=10) app.timer(bar, interval=10, name=”bar.timer”)
- schedule(fn=None, *, every=None, at=None, cron=None, timezone='UTC', start_time=None, interval=None, name=None)[source]¶
- Overloads:
self, fn (TaskFunc[[], Any]), every (Optional[str]), at (Optional[Union[str, list[str]]]), cron (Optional[str]), timezone (str), start_time (Optional[datetime]), interval (Optional[int]), name (Optional[str]) → AsyncFunc[[], Any]
self, every (Optional[str]), at (Optional[Union[str, list[str]]]), cron (Optional[str]), timezone (str), start_time (Optional[datetime]), interval (Optional[int]), name (Optional[str]) → Callable[[TaskFunc[[], Any]], AsyncFunc[[], Any]]
- Parameters:
- Return type:
Callable[[], Awaitable[Any]] | Callable[[Callable[[], Any] | Callable[[], Awaitable[Any]]], Callable[[], Awaitable[Any]]]
Register a function to run on a cron-like schedule.
Supports two styles of schedule definition:
Human-readable:
@app.schedule(every=”day”, at=”19:00”, timezone=”Australia/Sydney”) async def daily_report(): …
@app.schedule(every=”monday”, at=”09:00”) async def weekly_meeting(): …
@app.schedule(every=”day”, at=[“09:00”, “17:00”]) async def twice_daily(): …
Cron expression:
@app.schedule(cron=”0 9 * * MON-FRI”, timezone=”Europe/London”) async def weekday_job(): …
With interval (every Nth occurrence):
@app.schedule(every=”monday”, at=”09:00”, interval=2, start_time=datetime(2026, 4, 6)) async def biweekly_sync(): …
- Parameters:
fn (
Union[Callable[[],Any],Callable[[],Awaitable[Any]],None]) – The function to register. Can be sync or async.every (
Optional[str]) – Human-readable recurrence (“day”, “monday”, “weekday”, etc.).at (
Union[str,list[str],None]) – Time(s) of day in “HH:MM” format. Required foreveryschedules. Can be a single string or a list of strings for multiple daily times.cron (
Optional[str]) – A standard 5-field cron expression. Mutually exclusive with every/at.timezone (
str) – IANA timezone name (default “UTC”).start_time (
Optional[datetime]) – Reference datetime for deterministic schedule alignment. When set, the scheduler computes fire times from this point.interval (
Optional[int]) – Fire every Nth cron match. When set withstart_time, the count survives restarts. Withoutstart_time, the first match always fires and the count resets on restart.
- Return type:
Callable[[], Awaitable[Any]] | Callable[[Callable[[], Any] | Callable[[], Awaitable[Any]]], Callable[[], Awaitable[Any]]]
- async run_forever()[source]¶
Connects to the service and then waits indefinitely until cancelled. On cancellation, disconnects cleanly before propagating the cancellation.
- Return type:
- run()[source]¶
Synchronous entry point: - Starts an asyncio event loop to run run_forever(). - Blocks until run_forever() completes or is cancelled. - Allows Ctrl-C (KeyboardInterrupt) to stop cleanly.
- Return type:
- msg_is_control_change(msg)[source]¶
Check if a message is a control change type.
A control change is an asset data message where the datastream’s way is configured as input_cc_output or input_cc.
- Parameters:
msg (
Message) – The message to check.- Return type:
TypeGuard[AssetDataMessage]- Returns:
True if the message is a control change, False otherwise.
API Client¶
Bases:
ExceptionRaised when attempting to use an unavailable API client.
- kelvin.application.api_client.initialize_api_client()[source]¶
Validate required environment vars are available to create the API client Returns UnavailableApiClient mock client if requirements are not met.
- Returns:
The instantiated API client or an UnavailableApiClient if requirements are not met
- Return type:
AsyncClient
Config¶
Filters¶
Message filters for Kelvin applications.
This module provides filter functions for routing and filtering Kelvin messages based on their type, resource, or content. Filters return TypeGuard predicates that can be used with KelvinApp.filter() or KelvinApp.stream_filter().
Example
>>> from kelvin.application import filters
>>> queue = app.filter(filters.is_asset_data_message)
>>> async for msg in app.stream_filter(filters.asset_equals("my-asset")):
... print(msg)
- kelvin.application.filters.is_asset_data_message(msg)[source]¶
Check if the message is an Asset Data Message.
An Asset Data Message has a KRNAssetDataStream resource and a data type.
- Parameters:
msg (
Message) – The message to check.- Return type:
TypeGuard[AssetDataMessage]- Returns:
True if the message is an Asset Data Message.
- kelvin.application.filters.is_data_message(msg)[source]¶
Check if the message contains data (has a data type).
- kelvin.application.filters.is_control_status_message(msg)[source]¶
Check if the message is a Control Change Status.
- Return type:
TypeGuard[ControlChangeStatus]- Parameters:
msg (Message)
- kelvin.application.filters.resource_equals(resource)[source]¶
Create a filter that matches messages with the specified resource(s).
- kelvin.application.filters.input_equals(data)[source]¶
Create a filter that matches Asset Data Messages with the specified datastream(s).
- kelvin.application.filters.asset_equals(asset)[source]¶
Create a filter that matches messages for the specified asset(s).
- kelvin.application.filters.is_custom_action(msg)[source]¶
Check if the message is a Custom Action Message.
- Return type:
TypeGuard[CustomAction]- Parameters:
msg (Message)
- kelvin.application.filters.is_asset_data_quality_message(msg)[source]¶
Check if the message is an Asset Data Quality Message.
- Return type:
TypeGuard[AssetDataQualityMessage]- Parameters:
msg (Message)
- kelvin.application.filters.is_asset_data_stream_quality_message(msg)[source]¶
Check if the message is an Asset Data Stream Data Quality Message.
- Return type:
TypeGuard[AssetDataStreamDataQualityMessage]- Parameters:
msg (Message)
- kelvin.application.filters.is_data_quality_message(msg)[source]¶
Check if the message is a Data Quality Message.
- Return type:
TypeGuard[AssetDataQualityMessage|AssetDataStreamDataQualityMessage]- Parameters:
msg (Message)
Stream¶
Kelvin Stream interface for message communication.
This module provides the low-level stream interface for connecting to and communicating with the Kelvin platform via TCP sockets.
- Main Components:
StreamInterface: Abstract base class defining the stream protocol. KelvinStream: Concrete implementation for Kelvin platform communication. KelvinStreamConfig: Configuration settings for the stream connection.
- class kelvin.application.stream.StreamInterface[source]¶
Bases:
ABCAbstract interface for a connection to a Kelvin system.
This class defines the protocol for stream-based communication with the Kelvin platform. Implementations must provide connect, disconnect, read, and write operations.
- class kelvin.application.stream.KelvinStreamConfig(_case_sensitive=None, _nested_model_default_partial_update=None, _env_prefix=None, _env_prefix_target=None, _env_file=PosixPath('.'), _env_file_encoding=None, _env_ignore_empty=None, _env_nested_delimiter=None, _env_nested_max_split=None, _env_parse_none_str=None, _env_parse_enums=None, _cli_prog_name=None, _cli_parse_args=None, _cli_settings_source=None, _cli_parse_none_str=None, _cli_hide_none_type=None, _cli_avoid_json=None, _cli_enforce_required=None, _cli_use_class_docs_for_groups=None, _cli_exit_on_error=None, _cli_prefix=None, _cli_flag_prefix_char=None, _cli_implicit_flags=None, _cli_ignore_unknown_args=None, _cli_kebab_case=None, _cli_shortcuts=None, _secrets_dir=None, _build_sources=None, **values)[source]¶
Bases:
BaseSettingsConfiguration for the Kelvin Stream connection.
- Parameters:
_case_sensitive (bool | None)
_nested_model_default_partial_update (bool | None)
_env_prefix (str | None)
_env_prefix_target (EnvPrefixTarget | None)
_env_file (DotenvType | None)
_env_file_encoding (str | None)
_env_ignore_empty (bool | None)
_env_nested_delimiter (str | None)
_env_nested_max_split (int | None)
_env_parse_none_str (str | None)
_env_parse_enums (bool | None)
_cli_prog_name (str | None)
_cli_settings_source (CliSettingsSource[Any] | None)
_cli_parse_none_str (str | None)
_cli_hide_none_type (bool | None)
_cli_avoid_json (bool | None)
_cli_enforce_required (bool | None)
_cli_use_class_docs_for_groups (bool | None)
_cli_exit_on_error (bool | None)
_cli_prefix (str | None)
_cli_flag_prefix_char (str | None)
_cli_implicit_flags (bool | Literal['dual', 'toggle'] | None)
_cli_ignore_unknown_args (bool | None)
_cli_kebab_case (bool | Literal['all', 'no_enums'] | None)
_secrets_dir (PathType | None)
_build_sources (tuple[tuple[PydanticBaseSettingsSource, ...], dict[str, Any]] | None)
ip (str)
port (int)
limit (int)
- ip¶
The IP address of the Kelvin stream server. Default: “127.0.0.1”.
- port¶
The port number for the stream connection. Default: 49167.
- limit¶
Maximum buffer size in bytes for the stream reader. Default: 4MB.
- Environment Variables:
KELVIN_STREAM_IP: Override the IP address. KELVIN_STREAM_PORT: Override the port number. KELVIN_STREAM_LIMIT: Override the buffer limit.
- model_config: ClassVar[SettingsConfigDict] = {'arbitrary_types_allowed': True, 'case_sensitive': False, 'cli_avoid_json': False, 'cli_enforce_required': False, 'cli_exit_on_error': True, 'cli_flag_prefix_char': '-', 'cli_hide_none_type': False, 'cli_ignore_unknown_args': False, 'cli_implicit_flags': False, 'cli_kebab_case': False, 'cli_parse_args': None, 'cli_parse_none_str': None, 'cli_prefix': '', 'cli_prog_name': None, 'cli_shortcuts': None, 'cli_use_class_docs_for_groups': False, 'enable_decoding': True, 'env_file': None, 'env_file_encoding': None, 'env_ignore_empty': False, 'env_nested_delimiter': None, 'env_nested_max_split': None, 'env_parse_enums': None, 'env_parse_none_str': None, 'env_prefix': 'KELVIN_STREAM_', 'env_prefix_target': 'variable', 'extra': 'forbid', 'json_file': None, 'json_file_encoding': None, 'nested_model_default_partial_update': False, 'protected_namespaces': ('model_validate', 'model_dump', 'settings_customise_sources'), 'secrets_dir': None, 'toml_file': None, 'validate_default': True, 'yaml_config_section': None, 'yaml_file': None, 'yaml_file_encoding': None}¶
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- final class kelvin.application.stream.KelvinStream(config=None)[source]¶
Bases:
StreamInterfaceTCP stream implementation for Kelvin platform communication.
This class manages a persistent TCP connection to the Kelvin stream server, handling message serialization/deserialization and connection lifecycle.
- Parameters:
config (Optional[KelvinStreamConfig])
- _config¶
The stream configuration settings.
- _reader¶
The async stream reader (None when disconnected).
- _writer¶
The async stream writer (None when disconnected).
- __init__(config=None)[source]¶
Initialize the Kelvin Stream.
- Parameters:
config (
Optional[KelvinStreamConfig]) – Stream configuration settings. Uses defaults if not provided.- Return type:
None
- async connect()[source]¶
Connect to the Kelvin Stream server.
- Raises:
ConnectionError – If the stream server is unreachable.
- Return type:
- async read()[source]¶
Reads the next Kelvin Message
- Raises:
ConnectionError – When connection is unavailable.
- Returns:
the read Message
- Return type:
- async write(msg)[source]¶
Writes a Message to the Kelvin Stream
- Parameters:
msg (
Message) – Kelvin message to write- Raises:
ConnectionError – If the connection is lost.
- Returns:
True if the message was sent with success.
- Return type:
Timer¶
Async timer with drift correction.
This module provides a Timer class for executing periodic tasks with automatic clock drift correction to maintain consistent intervals.
- final class kelvin.application.timer.Timer(interval, name, max_drift_correction=0.1, clock=None)[source]¶
Bases:
objectA repeating async timer that corrects clock drift.
The Timer automatically adjusts sleep intervals to compensate for execution time and system clock drift, ensuring consistent timing over extended periods.
- Parameters:
- interval¶
The target interval between timer fires in seconds.
- name¶
A descriptive name for the timer (used in logging).
- max_drift¶
Maximum drift before logging a warning.
- min_interval¶
Minimum allowed sleep interval.
- max_interval¶
Maximum allowed sleep interval.
- iteration¶
Number of times the timer has fired.
- overlaps¶
Number of times execution exceeded the interval.
Example
>>> timer = Timer(interval=5.0, name="my-timer") >>> async for _ in timer: ... await do_periodic_work()
Scheduler¶
Async scheduler for cron-like recurring tasks.
This module provides a Scheduler class for executing tasks on a cron-like schedule with timezone support, and utilities for converting human-readable schedule parameters into cron expressions.
- kelvin.application.scheduler.parse_at(at)[source]¶
Parse and validate at time values.
- Parameters:
at (
Union[str,list[str]]) – A single “HH:MM” string or a list of them.- Return type:
- Returns:
A sorted list of (hour, minute) tuples, deduplicated and validated.
- Raises:
ValueError – If any value has an invalid format or is out of range, or if there are duplicate entries.
- kelvin.application.scheduler.build_cron_expressions(*, every=None, at=None, cron=None)[source]¶
Convert schedule parameters into one or more cron expression strings.
When
atcontains times with different minutes (e.g.["09:30", "17:45"]), multiple cron expressions are returned since a single expression cannot represent different minute values for different hours.- Parameters:
- Return type:
- Returns:
A list of valid cron expression strings.
- Raises:
ValueError – On invalid or conflicting parameters.
- final class kelvin.application.scheduler.Scheduler(cron_expressions, name, tz, start_time=None, interval=None, clock=None)[source]¶
Bases:
objectAn async iterator that yields at each scheduled fire time.
Computes the next fire time using one or more cron expressions with timezone support. Supports optional
start_timefor deterministic alignment andintervalfor Nth-occurrence scheduling.On startup the scheduler uses
start_timeas the scheduling base when provided. Ifstart_timeis in the past, it fast-forwards through all past fire times (counting for interval alignment) without yielding, so a far-paststart_timenever triggers past executions. Ifstart_timeis in the future, scheduling is aligned from that future time and no fire time will be yielded before it.- Parameters:
- cron_expressions¶
The cron expressions defining the schedule.
- tz¶
The timezone for interpreting the schedule.
- name¶
A descriptive name for the scheduler (used in logging).
- interval¶
Fire every Nth cron match (
Nonemeans every match).
- iteration¶
Number of times the scheduler has actually fired.
Version¶
Window¶
- kelvin.application.window.quantize_time(dt, step=None, *, mode='nearest', anchor=None)[source]¶
Quantize a datetime to a fixed step.
Accepts naive datetimes and treats them as UTC.
Uses integer microseconds to avoid floating-point rounding issues.
If step is None, return the input unchanged.
Parameters¶
- dtdatetime
Timestamp to quantize. Naive values are interpreted as UTC.
- steptimedelta or None
Quantization step. When None, no quantization is applied.
- mode{“nearest”, “floor”, “ceil”}
Rounding mode. Defaults to “nearest”.
- anchordatetime or None
Grid anchor. When None, use the Unix epoch at UTC.
Returns¶
- datetime
Quantized datetime with the same tzinfo semantics as input, normalized to UTC if naive.
- rtype:
- kelvin.application.window.round_nearest_time(dt, round_to=None)[source]¶
Backwards-compatible wrapper that rounds to the nearest tick.
- class kelvin.application.window.BaseWindow(assets, inputs, queue, *, round_to=None, buffer_size=None, clock=None)[source]¶
Bases:
objectManage per-asset in-memory buffers and DataFrames for streaming data.
Design¶
Each asset has a deque of (timestamp, {stream: payload}). The deque can be bounded by buffer_size.
Timestamps are normalized to UTC and optionally quantized.
DataFrames are per asset, indexed by UTC timestamps, with columns equal to inputs.
Duplicate timestamps within a buffer are merged into a single row before appending.
- Parameters:
queue (asyncio.Queue[AssetDataMessage])
round_to (Optional[timedelta])
buffer_size (Optional[int])
clock (Optional['ClockInterface'])
- class kelvin.application.window.BaseTimeWindow(assets, inputs, queue, window_size, hop_size, *, round_to=None, align_step=None, allowed_lateness=None, buffer_size=None, clock=None)[source]¶
Bases:
BaseWindowTime-based windowing of data streams.
Parameters¶
- assetslist[str]
Asset identifiers to include.
- inputslist[str]
Data stream identifiers to include as DataFrame columns.
- queueasyncio.Queue[AssetDataMessage]
Source of incoming messages.
- window_sizetimedelta
Duration of each window.
- hop_sizetimedelta
Time between the start of consecutive windows.
- round_totimedelta or None
Optional timestamp quantization step.
- align_steptimedelta or None
Optional alignment step for initial window start. If None, no extra alignment.
- allowed_latenesstimedelta or None
If provided, accept events older than window_start by up to this amount.
- buffer_sizeint or None
Optional bound for per-asset buffer deques.
Notes¶
Naive window_start values are interpreted as UTC.
Message timestamps that are naive are also treated as UTC.
- async stream(window_start=None)[source]¶
Continuously emit windowed DataFrames per asset.
Behavior¶
Align the first emission to window_start + window_size.
Sleep until that first boundary, then drain and emit.
Continue emitting every hop_size using a Timer.
Parameters¶
- window_startdatetime or None
Start of the first window. When None, use current UTC time. Naive values are interpreted as UTC. The start is optionally aligned by align_step.
- Parameters:
queue (asyncio.Queue[AssetDataMessage])
window_size (timedelta)
hop_size (timedelta)
round_to (Optional[timedelta])
align_step (Optional[timedelta])
allowed_lateness (Optional[timedelta])
buffer_size (Optional[int])
clock (Optional['ClockInterface'])
- class kelvin.application.window.TumblingWindow(assets, inputs, queue, window_size, *, round_to=None, align_step=None, allowed_lateness=None, buffer_size=None, clock=None)[source]¶
Bases:
BaseTimeWindowNon-overlapping time windows where hop_size equals window_size.
- Parameters:
queue (asyncio.Queue[AssetDataMessage])
window_size (timedelta)
round_to (Optional[timedelta])
align_step (Optional[timedelta])
allowed_lateness (Optional[timedelta])
buffer_size (Optional[int])
clock (Optional['ClockInterface'])
- class kelvin.application.window.HoppingWindow(assets, inputs, queue, window_size, hop_size, *, round_to=None, align_step=None, allowed_lateness=None, buffer_size=None, clock=None)[source]¶
Bases:
BaseTimeWindowOverlapping time windows when hop_size is smaller than window_size.
- Parameters:
queue (asyncio.Queue[AssetDataMessage])
window_size (timedelta)
hop_size (timedelta)
round_to (Optional[timedelta])
align_step (Optional[timedelta])
allowed_lateness (Optional[timedelta])
buffer_size (Optional[int])
clock (Optional['ClockInterface'])
- class kelvin.application.window.RollingWindow(assets, inputs, queue, count_size, *, slide=1, round_to=None, buffer_size=None, clock=None)[source]¶
Bases:
BaseWindowRolling window based on a fixed count of messages per asset.
Notes¶
The internal per-asset deque is bounded to prevent unbounded growth.
The DataFrame yielded for an asset represents the last count_size items after merging duplicate timestamps inside that slice.
- Parameters:
queue (asyncio.Queue[AssetDataMessage])
count_size (int)
slide (int)
round_to (Optional[timedelta])
buffer_size (Optional[int])
clock (Optional['ClockInterface'])