Application

The application module provides the core framework for building Kelvin applications.

Kelvin Application SDK.

This module provides the core components for building Kelvin applications that connect to and interact with the Kelvin platform.

Main Components:

  • KelvinApp: The main application class for connecting to Kelvin and processing messages.

  • KelvinStream: Low-level stream interface for Kelvin communication.

  • AssetInfo: Information about configured assets.

  • Datastream: Datastream configuration.

  • ResourceDatastream: Asset datastream configuration with access control.

Example

>>> from kelvin.application import KelvinApp, filters
>>> async with KelvinApp() as app:
...     async for msg in app.stream_filter(filters.is_asset_data_message):
...         print(msg)

class kelvin.application.KelvinApp(config=None, api=None, clock=None)[source]

Bases: object

Kelvin App to connect and interface with the KelvinStream.

After connecting, the connection is handled automatically in the background.

  • Use filter or stream_filter to listen for specific messages.

  • Use the on_* callback slots (for example on_connect and on_disconnect) to register callbacks for events.

  • Use tasks to run background functions, which can be async or sync.

  • Use timers to run functions at regular intervals.

The app can be used as an async context manager, which will automatically connect on enter and disconnect on exit.

Example usage:

async with KelvinApp() as app:
    await app.publish(Number(resource=KRNAssetDataStream('my-asset', 'my-input'), payload=1.0))
    async for msg in app.stream_filter(filters.is_asset_data_message):
        print(msg)

Parameters:
  • config (Optional[KelvinStreamConfig])

  • api (Optional[AsyncClient])

  • clock (Optional[ClockInterface])

property on_connect: CallbackSlot

Called when the connection is established.

property on_disconnect: CallbackSlot

Called when the connection is closed.

property on_message: CallbackSlot

Called on receipt of any message.

property on_asset_input: CallbackSlot

Called when an asset data message is received.

property on_control_change: CallbackSlot

Called when a control change message is received.

property on_control_status: CallbackSlot

Called when a control status is received.

property on_custom_action: CallbackSlot

Called when a custom action is received.

property on_data_tag: CallbackSlot

Called when a data tag message is received.

property on_asset_change: CallbackSlot

Called when an asset is added, removed, or modified. First arg is the new asset (None if removed); second is the previous asset (None if newly added).

property on_app_configuration: CallbackSlot

Called when the app configuration changes.

property is_connected: bool

Indicates whether the read loop is active, implying an established connection.

property connection: StreamInterface

The stream connection interface for Kelvin messaging.

property api: AsyncClient

The API client for direct connection with Kelvin API.

property clock: ClockInterface

The clock interface for time operations.

property assets: dict[str, AssetInfo]

Get all assets configured for this application.

It returns a dictionary where each key is the asset name, and the value is an AssetInfo object describing that asset’s properties, parameters, and datastreams.

The assets dictionary is dynamically updated whenever the application receives updates to asset properties or parameters, ensuring it always reflects the latest configuration.

Returns:

A dictionary where keys are asset names (strings) and values are AssetInfo instances representing the current configuration and state of each asset.

Return type:

Dict[str, AssetInfo]

Example

{
    "asset1": AssetInfo(
        name="asset1",
        properties={
            "tubing_length": 25.0,
            "area": 11.0,
        },
        parameters={
            "param-bool": False,
            "param-number": 7.5,
            "param-string": "hello",
        },
        datastreams={
            "output1": ResourceDatastream(
                asset=KRNAsset("asset1"),
                io_name="output1",
                datastream=Datastream(
                    name="datastream1",
                    type=KMessageTypeData("float"),
                    unit="m",
                ),
                access="RO",
                owned=True,
                configuration={},
            )
        },
    )
}

property app_configuration: dict[str, Any]

Get the application configuration.

Returns:

A mapping of configuration sections to their values, matching the structure in app.yaml.

Return type:

dict

Example

{
    "foo": {
        "conf_string": "value1",
        "conf_number": 25,
        "conf_bool": False,
    }
}

property config_received: Event

Event set when the initial configuration is received.

Use this asyncio.Event to wait until the application has loaded its initial app/asset parameters.

Returns:

Event that becomes set once the initial configuration arrives.

Return type:

asyncio.Event

Example

await app.config_received.wait()
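
Waiting on the event without a bound can hang forever if the configuration never arrives. The sketch below shows one way to add a timeout using only asyncio. It uses a plain asyncio.Event as a stand-in for app.config_received, and wait_for_config is a hypothetical helper name, not part of the SDK.

```python
import asyncio


async def wait_for_config(config_received: asyncio.Event, timeout: float) -> bool:
    """Return True if the event was set within `timeout` seconds, else False."""
    try:
        await asyncio.wait_for(config_received.wait(), timeout=timeout)
        return True
    except asyncio.TimeoutError:
        return False


async def demo() -> tuple:
    # Stand-in for app.config_received; a real app would pass the property itself.
    never_set = asyncio.Event()
    timed_out = await wait_for_config(never_set, timeout=0.05)

    set_later = asyncio.Event()
    # Simulate the initial configuration arriving shortly after startup.
    asyncio.get_running_loop().call_later(0.01, set_later.set)
    arrived = await wait_for_config(set_later, timeout=1.0)
    return timed_out, arrived


result = asyncio.run(demo())
```

In an application, the same helper could be called as wait_for_config(app.config_received, timeout=30.0) before reading app.assets or app.app_configuration.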

property loop: AbstractEventLoop

Return the running event loop once the app has started.

property inputs: list[AppIO]

List all input metrics configured for the application.

Each AppIO has:
  • name (str): the metric identifier.

  • data_type (str): the data type of the input.

Returns:

Read-only list of configured input metrics.

Return type:

List[AppIO]

property outputs: list[AppIO]

List all output metrics configured for the application.

Each AppIO has:
  • name (str): the metric identifier.

  • data_type (str): the data type of the output.

Returns:

Read-only list of configured output metrics.

Return type:

List[AppIO]

property tasks: dict[str, Callable[[], Awaitable[Any]]]

Retrieve registered asynchronous tasks.

Returns:

Dict mapping task names to callables that produce awaitables when called. Represents tasks that are scheduled or pending.

Return type:

Dict[str, Callable[[], Awaitable]]

async __aenter__()[source]

Support async context: connect on enter.

Return type:

Self

async __aexit__(_exc_type, _exc_value, _tb)[source]

Support async context: disconnect on exit.

Return type:

Optional[bool]

async connect()[source]

Establish connection, retrying indefinitely until success. Starts the read loop and waits for App Configuration before firing on_connect callback and tasks/timers.

Return type:

None

Returns:

None.

Raises:

RuntimeError – If called without a running event loop.

async disconnect()[source]

Cancel read loop, stop all tasks, fire on_disconnect, and close stream.

Return type:

None

async wait_for_processing()[source]

Wait for all stream processing to complete.

Joins all filter queues, blocking until every message that was routed has been processed (task_done called).

Note: Only works for @app.stream() handlers, windowing streams, and stream_filter() consumers. Raw filter() queue users must call task_done() themselves.

Return type:

None
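
The join/task_done contract mentioned in the note is standard asyncio.Queue behaviour. The sketch below illustrates it with a plain queue and a hypothetical consumer; it does not use the SDK and only shows why a raw filter() queue user has to call task_done() for a join to return.

```python
import asyncio


async def demo() -> list:
    queue: asyncio.Queue = asyncio.Queue()
    processed = []

    async def consumer() -> None:
        while True:
            item = await queue.get()
            try:
                processed.append(item * 2)
            finally:
                # Without this call, queue.join() below would never return.
                queue.task_done()

    worker = asyncio.create_task(consumer())
    for value in (1, 2, 3):
        queue.put_nowait(value)

    # Blocks until task_done() has been called once per item put on the queue.
    await queue.join()
    worker.cancel()
    return processed


processed = asyncio.run(demo())
```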

async wait_for_tasks(timeout=5.0)[source]

Wait for finite tasks to settle.

Gives running tasks up to timeout real-time seconds to complete. Long-running (never-ending) tasks that are still pending after the timeout are silently ignored — they will be cancelled on disconnect.

Return type:

None

Parameters:

timeout (float)
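
The behaviour described above resembles asyncio.wait with a timeout: finished tasks are collected and anything still running is left alone. The following sketch shows that pattern in isolation. The settle helper and the two demo coroutines are hypothetical, and the SDK's actual implementation may differ.

```python
import asyncio


async def settle(tasks: set, timeout: float) -> tuple:
    """Wait up to `timeout` seconds; return (finished, still_pending) counts."""
    done, pending = await asyncio.wait(tasks, timeout=timeout)
    return len(done), len(pending)


async def demo() -> tuple:
    async def finite() -> None:
        await asyncio.sleep(0.01)

    async def endless() -> None:
        while True:
            await asyncio.sleep(0.01)

    tasks = {asyncio.create_task(finite()), asyncio.create_task(endless())}
    counts = await settle(tasks, timeout=0.2)
    # Leftover tasks are not cancelled by the wait; cancel them explicitly,
    # which is what the documentation says happens on disconnect.
    for task in tasks:
        task.cancel()
    return counts


counts = asyncio.run(demo())
```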

async publish(msg)[source]

Publish a message to KelvinStream.

Accepts either a Message instance or a MessageBuilder. If given a MessageBuilder, it is converted to a Message via to_message() before sending. Returns True on success, or False if the connection is unavailable.

Parameters:

msg (Union[Message, MessageBuilder[Any]]) –

  • A Message to send directly, or

  • A MessageBuilder which will be converted to Message.

Returns:

  • True if the message was sent successfully.

  • False if sending failed due to a ConnectionError.

Return type:

bool

Examples

message = Number(resource=KRNAssetDataStream('my-asset', 'my-input'), payload=1.0)
success = await app.publish(message)
if success:
    print("Published message")
else:
    print("Publish failed")

filter(func)[source]

Creates a filter for the received Kelvin Messages based on a filter function.

Parameters:

func (Callable[[Message], TypeGuard[TypeVar(T, bound= Message)]]) – Filter function. It receives a Message as its argument and returns a bool.

Returns:

An asyncio queue that receives the filtered messages.

Return type:

Queue[TypeVar(T, bound= Message)]

stream_filter(func)[source]

Creates a stream for the received Kelvin Messages based on a filter function. See filter.

Parameters:

func (Callable[[Message], TypeGuard[TypeVar(T, bound= Message)]]) – Filter function. It receives a Message as its argument and returns a bool.

Returns:

Async Generator that can be async iterated to receive filtered messages.

Return type:

AsyncGenerator[Message, None]

Yields:

Iterator[AsyncGenerator[Message, None]] – Yields the filtered messages.
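
To make the relationship between filter and stream_filter concrete, here is a minimal stand-alone sketch of the idea: a predicate selects messages into a queue, and the stream form wraps that queue in an async generator. MiniRouter, FakeMessage, and dispatch are hypothetical names invented for this illustration; this is not the SDK's implementation.

```python
import asyncio
from dataclasses import dataclass
from typing import Callable


@dataclass
class FakeMessage:
    """Hypothetical stand-in for a Kelvin Message."""
    kind: str
    payload: float


class MiniRouter:
    """Illustrative only: routes messages to one queue per registered filter."""

    def __init__(self) -> None:
        self._routes = []  # list of (predicate, queue) pairs

    def filter(self, func: Callable[[FakeMessage], bool]) -> asyncio.Queue:
        queue: asyncio.Queue = asyncio.Queue()
        self._routes.append((func, queue))
        return queue

    def stream_filter(self, func: Callable[[FakeMessage], bool]):
        queue = self.filter(func)  # register the filter eagerly

        async def _stream():
            while True:
                msg = await queue.get()
                try:
                    yield msg
                finally:
                    queue.task_done()

        return _stream()

    def dispatch(self, msg: FakeMessage) -> None:
        # Copy the message reference into every queue whose predicate accepts it.
        for func, queue in self._routes:
            if func(msg):
                queue.put_nowait(msg)


async def demo() -> list:
    router = MiniRouter()
    stream = router.stream_filter(lambda m: m.kind == "data")
    for msg in (FakeMessage("data", 1.0), FakeMessage("status", 0.0), FakeMessage("data", 2.0)):
        router.dispatch(msg)

    received = []
    async for msg in stream:
        received.append(msg.payload)
        if len(received) == 2:
            break
    return received


received = asyncio.run(demo())
```

With the real API, the predicate would typically be one of the functions in filters, such as filters.is_asset_data_message.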

tumbling_window(window_size, assets=None, inputs=None, round_to=None)[source]

Creates a fixed-size, non-overlapping window.

Parameters:
  • window_size (timedelta) – Duration of each window.

  • assets (Optional[list[str]]) – Optional list of asset names to filter on.

  • inputs (Optional[list[str]]) – Optional list of input names (data streams) to include as columns in the window.

  • round_to (Optional[timedelta]) – Optional interval to which window boundaries are aligned.

Returns:

An instance configured with the given parameters.

Return type:

TumblingWindow
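
As a rough illustration of what "fixed, non-overlapping" means, the sketch below assigns timestamped samples to tumbling windows using plain datetime arithmetic. It assumes windows are aligned to the Unix epoch, which may not match how the SDK or round_to aligns boundaries; window_start and tumble are hypothetical helpers.

```python
from collections import defaultdict
from datetime import datetime, timedelta, timezone

# Assumption for this sketch: window boundaries are aligned to the Unix epoch.
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def window_start(ts: datetime, window_size: timedelta) -> datetime:
    """Start of the fixed window that contains `ts`."""
    elapsed = ts - EPOCH
    return EPOCH + (elapsed // window_size) * window_size


def tumble(samples, window_size: timedelta) -> dict:
    """Group (timestamp, value) pairs into non-overlapping windows."""
    buckets = defaultdict(list)
    for ts, value in samples:
        buckets[window_start(ts, window_size)].append(value)
    return dict(buckets)


base = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
samples = [(base + timedelta(seconds=s), float(s)) for s in (0, 4, 5, 11)]
windows = tumble(samples, timedelta(seconds=5))
```

Each sample lands in exactly one window: the samples at 0 s and 4 s share the first window, while 5 s and 11 s fall into the second and third.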

hopping_window(window_size, hop_size, assets=None, inputs=None, round_to=None)[source]

Creates a window with fixed size and overlap.

Parameters:
  • window_size (timedelta) – Duration of each window.

  • hop_size (timedelta) – Step between window starts (defines overlap).

  • assets (Optional[list[str]]) – Optional list of asset names to filter on.

  • inputs (Optional[list[str]]) – Optional list of input names (data streams) to include as columns in the window.

  • round_to (Optional[timedelta]) – Optional interval to which window boundaries are aligned.

Returns:

An instance configured with the given parameters.

Return type:

HoppingWindow
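
Because hop_size is smaller than window_size in the overlapping case, a single sample can belong to several windows. The sketch below computes which window starts cover a given timestamp. It assumes epoch-aligned, half-open windows [start, start + window_size), which is an assumption of this example rather than documented SDK behaviour; hopping_window_starts is a hypothetical helper.

```python
from datetime import datetime, timedelta, timezone

# Assumption for this sketch: window starts are aligned to the Unix epoch.
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def hopping_window_starts(ts: datetime, window_size: timedelta, hop_size: timedelta) -> list:
    """Starts of all windows [start, start + window_size) that contain `ts`."""
    elapsed = ts - EPOCH
    start = EPOCH + (elapsed // hop_size) * hop_size  # latest start at or before ts
    starts = []
    while start + window_size > ts:
        starts.append(start)
        start -= hop_size
    return sorted(starts)


base = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
starts = hopping_window_starts(
    base + timedelta(seconds=7),
    window_size=timedelta(seconds=10),
    hop_size=timedelta(seconds=5),
)
```

With a 10-second window and a 5-second hop, every sample is covered by two windows, here the ones starting at 12:00:00 and 12:00:05.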

rolling_window(count_size, assets=None, inputs=None, round_to=None, slide=1)[source]

Creates a sliding count-based window over incoming data.

Parameters:
  • count_size (int) – Number of records per window.

  • assets (Optional[list[str]]) – Optional list of asset names to filter on.

  • inputs (Optional[list[str]]) – Optional list of input names (data streams) to include as columns in the window.

  • round_to (Optional[timedelta]) – Optional interval to which window boundaries are aligned.

  • slide (int) – Number of records to slide the window forward on each update.

Returns:

An instance configured with the given parameters.

Return type:

RollingWindow
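
The interaction between count_size and slide can be sketched with a bounded deque. This is an illustration of a count-based sliding window in general, under the assumption that the first window is emitted once count_size records have arrived and then again every slide records; the SDK's exact emission rule may differ, and rolling is a hypothetical function.

```python
from collections import deque
from typing import Iterable, Iterator


def rolling(values: Iterable[float], count_size: int, slide: int = 1) -> Iterator[list]:
    """Yield the last `count_size` values every `slide` records once the buffer is full."""
    buffer = deque(maxlen=count_size)
    seen = 0
    for value in values:
        buffer.append(value)
        seen += 1
        if seen >= count_size and (seen - count_size) % slide == 0:
            yield list(buffer)


every_record = list(rolling([1, 2, 3, 4, 5, 6], count_size=3, slide=1))
every_second = list(rolling([1, 2, 3, 4, 5, 6], count_size=3, slide=2))
```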

stream(fn=None, *, assets=None, inputs=None)[source]

Register a per-message stream handler that is invoked for each incoming AssetDataMessage matching the optional assets/inputs filters.

Usage patterns:

@app.stream()
async def my_stream(msg: AssetDataMessage): ...

@app.stream(inputs=["in1"])
async def my_stream(msg: AssetDataMessage): ...

@app.stream(assets=["asset1"])
async def my_stream(msg: AssetDataMessage): ...

@app.stream(assets=["asset1"], inputs=["in1", "in2"])
async def my_stream(msg: AssetDataMessage): ...

@app.stream(assets=["asset1"], inputs=["in1", "in2"])
def my_stream(msg: AssetDataMessage): ...

def my_stream(msg: AssetDataMessage): ...
app.stream(my_stream, assets=["asset1"], inputs=["in1"])

The registered stream runs as an app task when the app connects.

Return type:

Union[Callable[[ParamSpec(P, bound= None)], Awaitable[TypeVar(R)]], Callable[[Union[Callable[[ParamSpec(P, bound= None)], TypeVar(R)], Callable[[ParamSpec(P, bound= None)], Awaitable[TypeVar(R)]]]], Callable[[ParamSpec(P, bound= None)], Awaitable[TypeVar(R)]]]]

task(fn=None, *, name=None)[source]

Register a function as a task, either sync or async.

This method acts as both a decorator and a decorator factory. It supports the following usage patterns:

@app.task
async def my_async_task(...): ...

@app.task()
def my_sync_task(...): ...

@app.task(name="custom.task.name")
def another_task(...): ...

app.task(some_function)

Return type:

Union[Callable[[ParamSpec(P, bound= None)], Awaitable[TypeVar(R)]], Callable[[Union[Callable[[ParamSpec(P, bound= None)], TypeVar(R)], Callable[[ParamSpec(P, bound= None)], Awaitable[TypeVar(R)]]]], Callable[[ParamSpec(P, bound= None)], Awaitable[TypeVar(R)]]]]

Returns:

If fn is provided, returns the async-compatible task wrapper. If fn is None, returns a decorator that can be applied to a function.
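
The "decorator and decorator factory" behaviour follows a common Python pattern: when fn is passed the function is registered immediately, otherwise a decorator is returned. The sketch below reproduces that pattern in isolation. The module-level registry dict and the choice to run sync functions in a worker thread are assumptions of this example, not a description of the SDK internals.

```python
import asyncio
import functools
import inspect
from typing import Any, Callable, Optional

# Hypothetical registry standing in for the app's task table.
registry = {}


def task(fn: Optional[Callable[[], Any]] = None, *, name: Optional[str] = None):
    def register(func):
        if inspect.iscoroutinefunction(func):
            wrapper = func
        else:
            @functools.wraps(func)
            async def wrapper():
                # Assumption: run the sync function in a thread to keep the loop free.
                return await asyncio.to_thread(func)
        registry[name or func.__name__] = wrapper
        return wrapper

    if fn is not None:
        return register(fn)  # used as @task or task(some_function)
    return register          # used as @task() or @task(name=...)


@task
async def async_job():
    return "async"


@task()
def sync_job():
    return "sync"


@task(name="custom.task.name")
def named_job():
    return "named"


async def run_all() -> dict:
    return {key: await fn() for key, fn in registry.items()}


results = asyncio.run(run_all())
```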

timer(fn=None, *, interval, name=None)[source]
Overloads:
  • self, fn (TaskFunc[[], Any]), interval (float), name (Optional[str]) → AsyncFunc[[], Any]

  • self, interval (float), name (Optional[str]) → Callable[[TaskFunc[[], Any]], AsyncFunc[[], Any]]

Return type:

Callable[[], Awaitable[Any]] | Callable[[Callable[[], Any] | Callable[[], Awaitable[Any]]], Callable[[], Awaitable[Any]]]

Register a function to be called at a repeating interval.

Usage patterns:

@app.timer(interval=5)
def foo(): ...

@app.timer(interval=5, name="my timer")
async def foo(): ...

def bar(): ...
app.timer(bar, interval=10)
app.timer(bar, interval=10, name="bar.timer")

schedule(fn=None, *, every=None, at=None, cron=None, timezone='UTC', start_time=None, interval=None, name=None)[source]
Overloads:
  • self, fn (TaskFunc[[], Any]), every (Optional[str]), at (Optional[Union[str, list[str]]]), cron (Optional[str]), timezone (str), start_time (Optional[datetime]), interval (Optional[int]), name (Optional[str]) → AsyncFunc[[], Any]

  • self, every (Optional[str]), at (Optional[Union[str, list[str]]]), cron (Optional[str]), timezone (str), start_time (Optional[datetime]), interval (Optional[int]), name (Optional[str]) → Callable[[TaskFunc[[], Any]], AsyncFunc[[], Any]]

Return type:

Callable[[], Awaitable[Any]] | Callable[[Callable[[], Any] | Callable[[], Awaitable[Any]]], Callable[[], Awaitable[Any]]]

Register a function to run on a cron-like schedule.

Supports two styles of schedule definition:

Human-readable:

@app.schedule(every="day", at="19:00", timezone="Australia/Sydney")
async def daily_report(): ...

@app.schedule(every="monday", at="09:00")
async def weekly_meeting(): ...

@app.schedule(every="day", at=["09:00", "17:00"])
async def twice_daily(): ...

Cron expression:

@app.schedule(cron="0 9 * * MON-FRI", timezone="Europe/London")
async def weekday_job(): ...

With interval (every Nth occurrence):

@app.schedule(every="monday", at="09:00", interval=2, start_time=datetime(2026, 4, 6))
async def biweekly_sync(): ...

Parameters:
  • fn (Union[Callable[[], Any], Callable[[], Awaitable[Any]], None]) – The function to register. Can be sync or async.

  • every (Optional[str]) – Human-readable recurrence (“day”, “monday”, “weekday”, etc.).

  • at (Union[str, list[str], None]) – Time(s) of day in “HH:MM” format. Required for every schedules. Can be a single string or a list of strings for multiple daily times.

  • cron (Optional[str]) – A standard 5-field cron expression. Mutually exclusive with every/at.

  • timezone (str) – IANA timezone name (default “UTC”).

  • start_time (Optional[datetime]) – Reference datetime for deterministic schedule alignment. When set, the scheduler computes fire times from this point.

  • interval (Optional[int]) – Fire every Nth cron match. When set with start_time, the count survives restarts. Without start_time, the first match always fires and the count resets on restart.

  • name (Optional[str]) – Optional task name for logging.

Return type:

Callable[[], Awaitable[Any]] | Callable[[Callable[[], Any] | Callable[[], Awaitable[Any]]], Callable[[], Awaitable[Any]]]
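
One way to see why start_time makes an interval schedule survive restarts is that the decision to fire can be computed from the occurrence date alone, without keeping a counter in memory. The sketch below illustrates this for a weekly schedule. It assumes start_time is itself an occurrence and that occurrences are evenly spaced; the fires helper is hypothetical and the SDK's scheduler may compute this differently.

```python
from datetime import datetime, timedelta


def fires(occurrence: datetime, start_time: datetime, period: timedelta, interval: int) -> bool:
    """True if `occurrence` is a multiple of `interval` periods after `start_time`."""
    index = (occurrence - start_time) // period
    return index >= 0 and index % interval == 0


# 6 April 2026 is a Monday, matching the biweekly example above.
start = datetime(2026, 4, 6, 9, 0)
mondays = [start + timedelta(weeks=k) for k in range(5)]
fired = [d.date().isoformat() for d in mondays if fires(d, start, timedelta(weeks=1), interval=2)]
```

Because the result depends only on the dates, restarting the process between two Mondays does not change which of them fire.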

async run_forever()[source]

Connects to the service and then waits indefinitely until cancelled. On cancellation, disconnects cleanly before propagating the cancellation.

Return type:

None
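
The connect, wait, and disconnect-on-cancel sequence described above can be sketched with plain asyncio. MiniApp is a hypothetical stand-in that only records the order of events; it is meant to show the cancellation pattern, not the SDK's implementation.

```python
import asyncio


class MiniApp:
    """Hypothetical stand-in that records lifecycle events."""

    def __init__(self) -> None:
        self.events = []

    async def connect(self) -> None:
        self.events.append("connect")

    async def disconnect(self) -> None:
        self.events.append("disconnect")

    async def run_forever(self) -> None:
        await self.connect()
        try:
            # An event that is never set: sleeps until the task is cancelled.
            await asyncio.Event().wait()
        except asyncio.CancelledError:
            await self.disconnect()
            raise  # propagate the cancellation after cleaning up


async def demo() -> list:
    app = MiniApp()
    runner = asyncio.create_task(app.run_forever())
    await asyncio.sleep(0.01)  # give the task time to connect
    runner.cancel()
    try:
        await runner
    except asyncio.CancelledError:
        app.events.append("cancelled")
    return app.events


events = asyncio.run(demo())
```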

run()[source]

Synchronous entry point:

  • Starts an asyncio event loop to run run_forever().

  • Blocks until run_forever() completes or is cancelled.

  • Allows Ctrl-C (KeyboardInterrupt) to stop cleanly.

Return type:

None

msg_is_control_change(msg)[source]

Check if a message is a control change type.

A control change is an asset data message where the datastream’s way is configured as input_cc_output or input_cc.

Parameters:

msg (Message) – The message to check.

Return type:

TypeGuard[AssetDataMessage]

Returns:

True if the message is a control change, False otherwise.

final class kelvin.application.KelvinStream(config=None)[source]

Bases: StreamInterface

TCP stream implementation for Kelvin platform communication.

This class manages a persistent TCP connection to the Kelvin stream server, handling message serialization/deserialization and connection lifecycle.

Parameters:

config (Optional[KelvinStreamConfig])

_config

The stream configuration settings.

_reader

The async stream reader (None when disconnected).

_writer

The async stream writer (None when disconnected).

__init__(config=None)[source]

Initialize the Kelvin Stream.

Parameters:

config (Optional[KelvinStreamConfig]) – Stream configuration settings. Uses defaults if not provided.

Return type:

None

async connect()[source]

Connect to the Kelvin Stream server.

Raises:

ConnectionError – If the stream server is unreachable.

Return type:

None

async disconnect()[source]

Disconnect from the Kelvin Stream server.

Return type:

None

async read()[source]

Read the next Kelvin Message.

Raises:

ConnectionError – When connection is unavailable.

Returns:

The Message that was read.

Return type:

Message

async write(msg)[source]

Write a Message to the Kelvin Stream.

Parameters:

msg (Message) – Kelvin message to write

Raises:

ConnectionError – If the connection is lost.

Returns:

True if the message was sent successfully.

Return type:

bool

class kelvin.application.AssetInfo(*args, **kwargs)[source]

Bases: object

Parameters:
name: str
properties: dict[str, Annotated[bool, Strict(strict=True)] | Annotated[int, Strict(strict=True)] | Annotated[float, Strict(strict=True)] | Annotated[str, Strict(strict=True)] | list[Annotated[bool, Strict(strict=True)]] | list[Annotated[int, Strict(strict=True)]] | list[Annotated[float, Strict(strict=True)]] | list[Annotated[str, Strict(strict=True)]]] = FieldInfo(annotation=NoneType, required=False, default_factory=dict)
parameters: dict[str, Annotated[bool, Strict(strict=True)] | Annotated[int, Strict(strict=True)] | Annotated[float, Strict(strict=True)] | Annotated[str, Strict(strict=True)]] = FieldInfo(annotation=NoneType, required=False, default_factory=dict)
datastreams: dict[str, ResourceDatastream] = FieldInfo(annotation=NoneType, required=False, default_factory=dict)
title: str | None = None
asset_type_name: str | None = None
asset_type_title: str | None = None
class kelvin.application.ResourceDatastream(*args, **kwargs)[source]

Bases: object

Parameters:
asset: KRNAsset
io_name: str
datastream: Datastream
configuration: dict[str, Any] = FieldInfo(annotation=NoneType, required=False, default_factory=dict)
way: WayEnum = 'output'
owned: bool = False
access: Literal['RO', 'RW', 'WO'] = 'RO'
class kelvin.application.Datastream(*args, **kwargs)[source]

Bases: object

Parameters:
name: str
type: KMessageType
unit: str | None = None

Client

class kelvin.application.client.CallbackSlot[source]

Bases: object

A slot that stores an async callback and supports decorator registration.

Supports two usage patterns:

# Direct assignment (backwards compatible)
app.on_connect = my_handler

# Decorator
@app.on_connect
async def my_handler(): ...

set(fn)[source]

Store handler, wrapping sync -> async if needed.

Return type:

None

Parameters:

fn (Callable[[...], Any] | None)

__call__(fn)[source]

Decorator use: @app.on_connect

Return type:

Callable[..., Any]

Parameters:

fn (Callable[[...], Any])

async invoke(*args, **kwargs)[source]

Internal use only. Invoke the stored handler if set.

Return type:

None
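
The two registration styles can coexist because the slot is callable (decorator use) and the owning object can expose it through a property whose setter forwards to set (assignment use). The sketch below shows that mechanism with hypothetical MiniSlot and MiniApp classes; it is a simplified illustration, not the SDK's code.

```python
import asyncio
import inspect


class MiniSlot:
    """Hypothetical slot: stores one handler, wrapping sync functions as async."""

    def __init__(self) -> None:
        self._handler = None

    def set(self, fn) -> None:
        if fn is None or inspect.iscoroutinefunction(fn):
            self._handler = fn
            return

        async def wrapper(*args, **kwargs):
            return fn(*args, **kwargs)

        self._handler = wrapper

    def __call__(self, fn):
        self.set(fn)
        return fn  # return the original so the decorated name stays usable

    async def invoke(self, *args, **kwargs) -> None:
        if self._handler is not None:
            await self._handler(*args, **kwargs)


class MiniApp:
    def __init__(self) -> None:
        self._on_connect = MiniSlot()

    @property
    def on_connect(self) -> MiniSlot:
        return self._on_connect

    @on_connect.setter
    def on_connect(self, fn) -> None:
        self._on_connect.set(fn)  # direct assignment replaces the handler


app = MiniApp()
calls = []


@app.on_connect
async def decorated():
    calls.append("decorated")


asyncio.run(app.on_connect.invoke())


def plain():
    calls.append("assigned")


app.on_connect = plain
asyncio.run(app.on_connect.invoke())
```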

class kelvin.application.client.Datastream(*args, **kwargs)[source]

Bases: object

Parameters:
name: str
type: KMessageType
unit: str | None = None
class kelvin.application.client.AppIO(*args, **kwargs)[source]

Bases: Datastream

class kelvin.application.client.ResourceDatastream(*args, **kwargs)[source]

Bases: object

Parameters:
asset: KRNAsset
io_name: str
datastream: Datastream
configuration: dict[str, Any] = FieldInfo(annotation=NoneType, required=False, default_factory=dict)
way: WayEnum = 'output'
owned: bool = False
access: Literal['RO', 'RW', 'WO'] = 'RO'
class kelvin.application.client.AssetInfo(*args, **kwargs)[source]

Bases: object

Parameters:
name: str
properties: dict[str, Annotated[bool, Strict(strict=True)] | Annotated[int, Strict(strict=True)] | Annotated[float, Strict(strict=True)] | Annotated[str, Strict(strict=True)] | list[Annotated[bool, Strict(strict=True)]] | list[Annotated[int, Strict(strict=True)]] | list[Annotated[float, Strict(strict=True)]] | list[Annotated[str, Strict(strict=True)]]] = FieldInfo(annotation=NoneType, required=False, default_factory=dict)
parameters: dict[str, Annotated[bool, Strict(strict=True)] | Annotated[int, Strict(strict=True)] | Annotated[float, Strict(strict=True)] | Annotated[str, Strict(strict=True)]] = FieldInfo(annotation=NoneType, required=False, default_factory=dict)
datastreams: dict[str, ResourceDatastream] = FieldInfo(annotation=NoneType, required=False, default_factory=dict)
title: str | None = None
asset_type_name: str | None = None
asset_type_title: str | None = None
class kelvin.application.client.KelvinApp(config=None, api=None, clock=None)[source]

Bases: object

Kelvin App to connect and interface with the KelvinStream.

After connecting, the connection is handled automatically in the background.

  • Use filter or stream_filter to listen for specific messages.

  • Use the on_* callback slots (for example on_connect and on_disconnect) to register callbacks for events.

  • Use tasks to run background functions, which can be async or sync.

  • Use timers to run functions at regular intervals.

The app can be used as an async context manager, which will automatically connect on enter and disconnect on exit.

Example usage:

async with KelvinApp() as app:
    await app.publish(Number(resource=KRNAssetDataStream('my-asset', 'my-input'), payload=1.0))
    async for msg in app.stream_filter(filters.is_asset_data_message):
        print(msg)

Parameters:
  • config (Optional[KelvinStreamConfig])

  • api (Optional[AsyncClient])

  • clock (Optional[ClockInterface])

property on_connect: CallbackSlot

Called when the connection is established.

property on_disconnect: CallbackSlot

Called when the connection is closed.

property on_message: CallbackSlot

Called on receipt of any message.

property on_asset_input: CallbackSlot

Called when an asset data message is received.

property on_control_change: CallbackSlot

Called when a control change message is received.

property on_control_status: CallbackSlot

Called when a control status is received.

property on_custom_action: CallbackSlot

Called when a custom action is received.

property on_data_tag: CallbackSlot

Called when a data tag message is received.

property on_asset_change: CallbackSlot

Called when an asset is added, removed, or modified. First arg is the new asset (None if removed); second is the previous asset (None if newly added).

property on_app_configuration: CallbackSlot

Called when the app configuration changes.

property is_connected: bool

Indicates whether the read loop is active, implying an established connection.

property connection: StreamInterface

The stream connection interface for Kelvin messaging.

property api: AsyncClient

The API client for direct connection with Kelvin API.

property clock: ClockInterface

The clock interface for time operations.

property assets: dict[str, AssetInfo]

Get all assets configured for this application.

It returns a dictionary where each key is the asset name, and the value is an AssetInfo object describing that asset’s properties, parameters, and datastreams.

The assets dictionary is dynamically updated whenever the application receives updates to asset properties or parameters, ensuring it always reflects the latest configuration.

Returns:

A dictionary where keys are asset names (strings) and values are AssetInfo instances representing the current configuration and state of each asset.

Return type:

Dict[str, AssetInfo]

Example

{
    "asset1": AssetInfo(
        name="asset1",
        properties={
            "tubing_length": 25.0,
            "area": 11.0,
        },
        parameters={
            "param-bool": False,
            "param-number": 7.5,
            "param-string": "hello",
        },
        datastreams={
            "output1": ResourceDatastream(
                asset=KRNAsset("asset1"),
                io_name="output1",
                datastream=Datastream(
                    name="datastream1",
                    type=KMessageTypeData("float"),
                    unit="m",
                ),
                access="RO",
                owned=True,
                configuration={},
            )
        },
    )
}

property app_configuration: dict[str, Any]

Get the application configuration.

Returns:

A mapping of configuration sections to their values, matching the structure in app.yaml.

Return type:

dict

Example

{
    "foo": {
        "conf_string": "value1",
        "conf_number": 25,
        "conf_bool": False,
    }
}

property config_received: Event

Event set when the initial configuration is received.

Use this asyncio.Event to wait until the application has loaded its initial app/asset parameters.

Returns:

Event that becomes set once the initial configuration arrives.

Return type:

asyncio.Event

Example

await app.config_received.wait()

property loop: AbstractEventLoop

Return the running event loop once the app has started.

property inputs: list[AppIO]

List all input metrics configured for the application.

Each AppIO has:
  • name (str): the metric identifier.

  • data_type (str): the data type of the input.

Returns:

Read-only list of configured input metrics.

Return type:

List[AppIO]

property outputs: list[AppIO]

List all output metrics configured for the application.

Each AppIO has:
  • name (str): the metric identifier.

  • data_type (str): the data type of the output.

Returns:

Read-only list of configured output metrics.

Return type:

List[AppIO]

property tasks: dict[str, Callable[[], Awaitable[Any]]]

Retrieve registered asynchronous tasks.

Returns:

Dict mapping task names to callables that produce awaitables when called. Represents tasks that are scheduled or pending.

Return type:

Dict[str, Callable[[], Awaitable]]

async __aenter__()[source]

Support async context: connect on enter.

Return type:

Self

async __aexit__(_exc_type, _exc_value, _tb)[source]

Support async context: disconnect on exit.

Return type:

Optional[bool]

async connect()[source]

Establish connection, retrying indefinitely until success. Starts the read loop and waits for App Configuration before firing on_connect callback and tasks/timers.

Return type:

None

Returns:

None.

Raises:

RuntimeError – If called without a running event loop.

async disconnect()[source]

Cancel read loop, stop all tasks, fire on_disconnect, and close stream.

Return type:

None

async wait_for_processing()[source]

Wait for all stream processing to complete.

Joins all filter queues, blocking until every message that was routed has been processed (task_done called).

Note: Only works for @app.stream() handlers, windowing streams, and stream_filter() consumers. Raw filter() queue users must call task_done() themselves.

Return type:

None

async wait_for_tasks(timeout=5.0)[source]

Wait for finite tasks to settle.

Gives running tasks up to timeout real-time seconds to complete. Long-running (never-ending) tasks that are still pending after the timeout are silently ignored — they will be cancelled on disconnect.

Return type:

None

Parameters:

timeout (float)

async publish(msg)[source]

Publish a message to KelvinStream.

Accepts either a Message instance or a MessageBuilder. If given a MessageBuilder, it is converted to a Message via to_message() before sending. Returns True on success, or False if the connection is unavailable.

Parameters:

msg (Union[Message, MessageBuilder[Any]]) –

  • A Message to send directly, or

  • A MessageBuilder which will be converted to Message.

Returns:

  • True if the message was sent successfully.

  • False if sending failed due to a ConnectionError.

Return type:

bool

Examples

message = Number(resource=KRNAssetDataStream('my-asset', 'my-input'), payload=1.0)
success = await app.publish(message)
if success:
    print("Published message")
else:
    print("Publish failed")

filter(func)[source]

Creates a filter for the received Kelvin Messages based on a filter function.

Parameters:

func (Callable[[Message], TypeGuard[TypeVar(T, bound= Message)]]) – Filter function. It receives a Message as its argument and returns a bool.

Returns:

An asyncio queue that receives the filtered messages.

Return type:

Queue[TypeVar(T, bound= Message)]

stream_filter(func)[source]

Creates a stream for the received Kelvin Messages based on a filter function. See filter.

Parameters:

func (Callable[[Message], TypeGuard[TypeVar(T, bound= Message)]]) – Filter function. It receives a Message as its argument and returns a bool.

Returns:

Async Generator that can be async iterated to receive filtered messages.

Return type:

AsyncGenerator[Message, None]

Yields:

Iterator[AsyncGenerator[Message, None]] – Yields the filtered messages.

tumbling_window(window_size, assets=None, inputs=None, round_to=None)[source]

Creates a fixed-size, non-overlapping window.

Parameters:
  • window_size (timedelta) – Duration of each window.

  • assets (Optional[list[str]]) – Optional list of asset names to filter on.

  • inputs (Optional[list[str]]) – Optional list of input names (data streams) to include as columns in the window.

  • round_to (Optional[timedelta]) – Optional interval to which window boundaries are aligned.

Returns:

An instance configured with the given parameters.

Return type:

TumblingWindow

hopping_window(window_size, hop_size, assets=None, inputs=None, round_to=None)[source]

Creates a window with fixed size and overlap.

Parameters:
  • window_size (timedelta) – Duration of each window.

  • hop_size (timedelta) – Step between window starts (defines overlap).

  • assets (Optional[list[str]]) – Optional list of asset names to filter on.

  • inputs (Optional[list[str]]) – Optional list of input names (data streams) to include as columns in the window.

  • round_to (Optional[timedelta]) – Optional interval to which window boundaries are aligned.

Returns:

An instance configured with the given parameters.

Return type:

HoppingWindow

rolling_window(count_size, assets=None, inputs=None, round_to=None, slide=1)[source]

Creates a sliding count-based window over incoming data.

Parameters:
  • count_size (int) – Number of records per window.

  • assets (Optional[list[str]]) – Optional list of asset names to filter on.

  • inputs (Optional[list[str]]) – Optional list of input names (data streams) to include as columns in the window.

  • round_to (Optional[timedelta]) – Optional interval to which window boundaries are aligned.

  • slide (int) – Number of records to slide the window forward on each update.

Returns:

An instance configured with the given parameters.

Return type:

RollingWindow

stream(fn=None, *, assets=None, inputs=None)[source]

Register a per-message stream handler that is invoked for each incoming AssetDataMessage matching the optional assets/inputs filters.

Usage patterns:

@app.stream()
async def my_stream(msg: AssetDataMessage): ...

@app.stream(inputs=["in1"])
async def my_stream(msg: AssetDataMessage): ...

@app.stream(assets=["asset1"])
async def my_stream(msg: AssetDataMessage): ...

@app.stream(assets=["asset1"], inputs=["in1", "in2"])
async def my_stream(msg: AssetDataMessage): ...

@app.stream(assets=["asset1"], inputs=["in1", "in2"])
def my_stream(msg: AssetDataMessage): ...

def my_stream(msg: AssetDataMessage): ...
app.stream(my_stream, assets=["asset1"], inputs=["in1"])

The registered stream runs as an app task when the app connects.

Return type:

Union[Callable[[ParamSpec(P, bound= None)], Awaitable[TypeVar(R)]], Callable[[Union[Callable[[ParamSpec(P, bound= None)], TypeVar(R)], Callable[[ParamSpec(P, bound= None)], Awaitable[TypeVar(R)]]]], Callable[[ParamSpec(P, bound= None)], Awaitable[TypeVar(R)]]]]

task(fn=None, *, name=None)[source]

Register a function as a task, either sync or async.

This method acts as both a decorator and a decorator factory. It supports the following usage patterns:

@app.task
async def my_async_task(...): ...

@app.task()
def my_sync_task(...): ...

@app.task(name="custom.task.name")
def another_task(...): ...

app.task(some_function)

Return type:

Union[Callable[[ParamSpec(P, bound= None)], Awaitable[TypeVar(R)]], Callable[[Union[Callable[[ParamSpec(P, bound= None)], TypeVar(R)], Callable[[ParamSpec(P, bound= None)], Awaitable[TypeVar(R)]]]], Callable[[ParamSpec(P, bound= None)], Awaitable[TypeVar(R)]]]]

Returns:

If fn is provided, returns the async-compatible task wrapper. If fn is None, returns a decorator that can be applied to a function.
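A function that is both a decorator and a decorator factory has to inspect whether it received the target function directly. The following is a minimal sketch of that pattern in plain Python, not the SDK's implementation. The registry dictionary, the default of using the function name as the task name, and calling sync functions inline are all assumptions made for illustration.

```python
import asyncio
import functools
import inspect
from typing import Any, Callable, Optional

# Hypothetical stand-in for the table of tasks an app would keep.
registry: dict = {}


def task(fn: Optional[Callable[..., Any]] = None, *, name: Optional[str] = None):
    """Support @task, @task(), @task(name=...) and task(fn)."""

    def register(func: Callable[..., Any]) -> Callable[..., Any]:
        if inspect.iscoroutinefunction(func):
            wrapper = func  # already awaitable, register as-is
        else:
            @functools.wraps(func)
            async def wrapper(*args: Any, **kwargs: Any) -> Any:
                # Assumption: call the sync function inline. A real framework
                # might offload it to a thread to keep the event loop free.
                return func(*args, **kwargs)

        registry[name or func.__name__] = wrapper
        return wrapper

    # @task or task(fn): fn is the function, so register it right away.
    # @task() or @task(name=...): fn is None, so hand back the decorator.
    return register(fn) if fn is not None else register


@task
async def fetch() -> int:
    return 1


@task()
def compute() -> int:
    return 2


@task(name="custom.task.name")
def report() -> int:
    return 3
```

Making name keyword-only is what keeps the two call styles unambiguous: the only positional argument can be the function itself.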

timer(fn=None, *, interval, name=None)[source]
Overloads:
  • self, fn (TaskFunc[[], Any]), interval (float), name (Optional[str]) → AsyncFunc[[], Any]

  • self, interval (float), name (Optional[str]) → Callable[[TaskFunc[[], Any]], AsyncFunc[[], Any]]

Return type:

Callable[[], Awaitable[Any]] | Callable[[Callable[[], Any] | Callable[[], Awaitable[Any]]], Callable[[], Awaitable[Any]]]

Register a function to be called at a repeating interval.

Usage patterns:

@app.timer(interval=5)
def foo(): ...

@app.timer(interval=5, name="my timer")
async def foo(): ...

def bar(): ...
app.timer(bar, interval=10)
app.timer(bar, interval=10, name="bar.timer")

schedule(fn=None, *, every=None, at=None, cron=None, timezone='UTC', start_time=None, interval=None, name=None)[source]
Overloads:
  • self, fn (TaskFunc[[], Any]), every (Optional[str]), at (Optional[Union[str, list[str]]]), cron (Optional[str]), timezone (str), start_time (Optional[datetime]), interval (Optional[int]), name (Optional[str]) → AsyncFunc[[], Any]

  • self, every (Optional[str]), at (Optional[Union[str, list[str]]]), cron (Optional[str]), timezone (str), start_time (Optional[datetime]), interval (Optional[int]), name (Optional[str]) → Callable[[TaskFunc[[], Any]], AsyncFunc[[], Any]]

Register a function to run on a cron-like schedule.

Supports two styles of schedule definition:

Human-readable:

@app.schedule(every="day", at="19:00", timezone="Australia/Sydney")
async def daily_report(): ...

@app.schedule(every="monday", at="09:00")
async def weekly_meeting(): ...

@app.schedule(every="day", at=["09:00", "17:00"])
async def twice_daily(): ...

Cron expression:

@app.schedule(cron="0 9 * * MON-FRI", timezone="Europe/London")
async def weekday_job(): ...

With interval (every Nth occurrence):

@app.schedule(every="monday", at="09:00", interval=2, start_time=datetime(2026, 4, 6))
async def biweekly_sync(): ...

Parameters:
  • fn (Union[Callable[[], Any], Callable[[], Awaitable[Any]], None]) – The function to register. Can be sync or async.

  • every (Optional[str]) – Human-readable recurrence (“day”, “monday”, “weekday”, etc.).

  • at (Union[str, list[str], None]) – Time(s) of day in “HH:MM” format. Required for every schedules. Can be a single string or a list of strings for multiple daily times.

  • cron (Optional[str]) – A standard 5-field cron expression. Mutually exclusive with every/at.

  • timezone (str) – IANA timezone name (default “UTC”).

  • start_time (Optional[datetime]) – Reference datetime for deterministic schedule alignment. When set, the scheduler computes fire times from this point.

  • interval (Optional[int]) – Fire every Nth cron match. When set with start_time, the count survives restarts. Without start_time, the first match always fires and the count resets on restart.

  • name (Optional[str]) – Optional task name for logging.

Return type:

Callable[[], Awaitable[Any]] | Callable[[Callable[[], Any] | Callable[[], Awaitable[Any]]], Callable[[], Awaitable[Any]]]
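The reason start_time makes an interval count survive restarts is that the count can be recomputed from a fixed reference instead of from process start. The sketch below illustrates this for the biweekly Monday 09:00 case using only the standard library. It is not the SDK's scheduler: weekly_matches and next_fire are hypothetical helpers, the schedule is hard-coded rather than parsed from cron, and treating the first match at or after start_time as occurrence zero is an assumption.

```python
from datetime import datetime, timedelta, timezone
from typing import Iterator


def weekly_matches(start: datetime, weekday: int, hour: int, minute: int) -> Iterator[datetime]:
    """Yield every weekday/hour:minute occurrence at or after `start` (Monday is 0)."""
    candidate = start.replace(hour=hour, minute=minute, second=0, microsecond=0)
    candidate += timedelta(days=(weekday - candidate.weekday()) % 7)
    if candidate < start:
        candidate += timedelta(days=7)
    while True:
        yield candidate
        candidate += timedelta(days=7)


def next_fire(start_time: datetime, now: datetime, interval: int) -> datetime:
    """Count matches from start_time and return the next Nth match not before `now`."""
    for index, match in enumerate(weekly_matches(start_time, weekday=0, hour=9, minute=0)):
        # Every match advances the count, but only every `interval`-th one is
        # eligible, and eligible matches in the past are skipped without firing.
        if index % interval == 0 and match >= now:
            return match
    raise RuntimeError("unreachable: weekly_matches is infinite")


start = datetime(2026, 4, 6, tzinfo=timezone.utc)  # a Monday
after_restart = next_fire(start, now=datetime(2026, 4, 14, 12, 0, tzinfo=timezone.utc), interval=2)
```

Because the result depends only on start_time, now, and interval, a process restarted at any point computes the same next fire time.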

async run_forever()[source]

Connects to the service and then waits indefinitely until cancelled. On cancellation, disconnects cleanly before propagating the cancellation.

Return type:

None

run()[source]

Synchronous entry point:

  • Starts an asyncio event loop to run run_forever().

  • Blocks until run_forever() completes or is cancelled.

  • Allows Ctrl-C (KeyboardInterrupt) to stop cleanly.

Return type:

None

msg_is_control_change(msg)[source]

Check if a message is a control change type.

A control change is an asset data message where the datastream’s way is configured as input_cc_output or input_cc.

Parameters:

msg (Message) – The message to check.

Return type:

TypeGuard[AssetDataMessage]

Returns:

True if the message is a control change, False otherwise.

API Client

exception kelvin.application.api_client.UnavailableApiClientError[source]

Bases: Exception

Raised when attempting to use an unavailable API client.

kelvin.application.api_client.initialize_api_client()[source]

Validate that the required environment variables are available to create the API client. Returns an UnavailableApiClient mock client if the requirements are not met.

Returns:

The instantiated API client or an UnavailableApiClient if requirements are not met

Return type:

AsyncClient

Config

exception kelvin.application.config.ConfigurationError[source]

Bases: Exception

Filters

Message filters for Kelvin applications.

This module provides filter functions for routing and filtering Kelvin messages based on their type, resource, or content. Filters return TypeGuard predicates that can be used with KelvinApp.filter() or KelvinApp.stream_filter().

Example

>>> from kelvin.application import filters
>>> queue = app.filter(filters.is_asset_data_message)
>>> async for msg in app.stream_filter(filters.asset_equals("my-asset")):
...     print(msg)
kelvin.application.filters.is_asset_data_message(msg)[source]

Check if the message is an Asset Data Message.

An Asset Data Message has a KRNAssetDataStream resource and a data type.

Parameters:

msg (Message) – The message to check.

Return type:

TypeGuard[AssetDataMessage]

Returns:

True if the message is an Asset Data Message.

kelvin.application.filters.is_data_message(msg)[source]

Check if the message contains data (has a data type).

Parameters:

msg (Message) – The message to check.

Return type:

bool

Returns:

True if the message has a data type.

kelvin.application.filters.is_control_status_message(msg)[source]

Check if the message is a Control Change Status.

Return type:

TypeGuard[ControlChangeStatus]

Parameters:

msg (Message)

kelvin.application.filters.resource_equals(resource)[source]

Create a filter that matches messages with the specified resource(s).

Parameters:

resource (Union[KRN, list[KRN]]) – A single KRN or list of KRNs to match against.

Return type:

Callable[[Message], TypeGuard[Message]]

Returns:

A filter function that returns True if the message resource matches.

kelvin.application.filters.input_equals(data)[source]

Create a filter that matches Asset Data Messages with the specified datastream(s).

Parameters:

data (Union[str, list[str]]) – A single datastream name or list of datastream names to match.

Return type:

Callable[[Message], TypeGuard[AssetDataMessage]]

Returns:

A filter function that returns True if the message datastream matches.

kelvin.application.filters.asset_equals(asset)[source]

Create a filter that matches messages for the specified asset(s).

Parameters:

asset (Union[str, list[str]]) – A single asset name or list of asset names to match.

Return type:

Callable[[Message], TypeGuard[Message]]

Returns:

A filter function that returns True if the message asset matches.
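The factory filters above all follow the same shape: take a single name or a list of names and return a predicate over messages. The sketch below shows that shape with a hypothetical FakeMessage dataclass so it runs without the SDK. The names asset_equals_sketch and input_equals_sketch are illustrative stand-ins, and the real filters operate on Kelvin Message objects and return TypeGuard predicates rather than plain bool.

```python
from dataclasses import dataclass
from typing import Callable, Union


@dataclass(frozen=True)
class FakeMessage:
    """Hypothetical stand-in for a Kelvin message; only the fields used below."""
    asset: str
    datastream: str
    payload: float


Predicate = Callable[[FakeMessage], bool]


def _as_set(value: Union[str, list]) -> set:
    # Accept a single name or a list of names.
    return {value} if isinstance(value, str) else set(value)


def asset_equals_sketch(asset: Union[str, list]) -> Predicate:
    wanted = _as_set(asset)
    return lambda msg: msg.asset in wanted


def input_equals_sketch(data: Union[str, list]) -> Predicate:
    wanted = _as_set(data)
    return lambda msg: msg.datastream in wanted


messages = [
    FakeMessage("pump-1", "temperature", 71.5),
    FakeMessage("pump-2", "temperature", 64.0),
    FakeMessage("pump-1", "pressure", 3.2),
]
is_pump_1 = asset_equals_sketch("pump-1")
is_temperature = input_equals_sketch(["temperature"])
selected = [m.payload for m in messages if is_pump_1(m) and is_temperature(m)]
```

Building the lookup set once in the factory keeps the per-message check to a single membership test.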

kelvin.application.filters.is_custom_action(msg)[source]

Check if the message is a Custom Action Message.

Return type:

TypeGuard[CustomAction]

Parameters:

msg (Message)

kelvin.application.filters.is_asset_data_quality_message(msg)[source]

Check if the message is an Asset Data Quality Message.

Return type:

TypeGuard[AssetDataQualityMessage]

Parameters:

msg (Message)

kelvin.application.filters.is_asset_data_stream_quality_message(msg)[source]

Check if the message is an Asset Data Stream Data Quality Message.

Return type:

TypeGuard[AssetDataStreamDataQualityMessage]

Parameters:

msg (Message)

kelvin.application.filters.is_data_quality_message(msg)[source]

Check if the message is a Data Quality Message.

Return type:

TypeGuard[AssetDataQualityMessage | AssetDataStreamDataQualityMessage]

Parameters:

msg (Message)

kelvin.application.filters.is_data_tag(msg)[source]

Check if the message is a Data Tag Message.

Return type:

TypeGuard[DataTag]

Parameters:

msg (Message)

Stream

Kelvin Stream interface for message communication.

This module provides the low-level stream interface for connecting to and communicating with the Kelvin platform via TCP sockets.

Main Components:

  • StreamInterface: Abstract base class defining the stream protocol.

  • KelvinStream: Concrete implementation for Kelvin platform communication.

  • KelvinStreamConfig: Configuration settings for the stream connection.

class kelvin.application.stream.StreamInterface[source]

Bases: ABC

Abstract interface for a connection to a Kelvin system.

This class defines the protocol for stream-based communication with the Kelvin platform. Implementations must provide connect, disconnect, read, and write operations.

abstractmethod async connect()[source]
Return type:

None

abstractmethod async disconnect()[source]

Disconnect from the Kelvin Stream.

Return type:

None

abstractmethod async read()[source]

Read the next message from the stream.

Return type:

Message

abstractmethod async write(msg)[source]

Write a message to the stream.

Return type:

bool

Parameters:

msg (Message)
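An abstract interface with connect, disconnect, read, and write makes it possible to swap the TCP stream for an in-memory one, for example in tests. The sketch below shows the idea with a locally defined abstract class so it runs without the SDK. StreamSketch mirrors the four operations listed above but is not StreamInterface itself, and LoopbackStream and roundtrip are hypothetical names.

```python
import asyncio
from abc import ABC, abstractmethod
from typing import Any, Optional


class StreamSketch(ABC):
    """Local stand-in exposing the same four operations as StreamInterface."""

    @abstractmethod
    async def connect(self) -> None: ...

    @abstractmethod
    async def disconnect(self) -> None: ...

    @abstractmethod
    async def read(self) -> Any: ...

    @abstractmethod
    async def write(self, msg: Any) -> bool: ...


class LoopbackStream(StreamSketch):
    """Echo every written message back to the reader; no network involved."""

    def __init__(self) -> None:
        self._queue: Optional[asyncio.Queue] = None  # None while disconnected

    async def connect(self) -> None:
        self._queue = asyncio.Queue()

    async def disconnect(self) -> None:
        self._queue = None

    async def read(self) -> Any:
        if self._queue is None:
            raise ConnectionError("stream is not connected")
        return await self._queue.get()

    async def write(self, msg: Any) -> bool:
        if self._queue is None:
            raise ConnectionError("stream is not connected")
        await self._queue.put(msg)
        return True


async def roundtrip() -> tuple:
    stream = LoopbackStream()
    await stream.connect()
    sent = await stream.write("hello")
    received = await stream.read()
    await stream.disconnect()
    return sent, received
```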

class kelvin.application.stream.KelvinStreamConfig(_case_sensitive=None, _nested_model_default_partial_update=None, _env_prefix=None, _env_prefix_target=None, _env_file=PosixPath('.'), _env_file_encoding=None, _env_ignore_empty=None, _env_nested_delimiter=None, _env_nested_max_split=None, _env_parse_none_str=None, _env_parse_enums=None, _cli_prog_name=None, _cli_parse_args=None, _cli_settings_source=None, _cli_parse_none_str=None, _cli_hide_none_type=None, _cli_avoid_json=None, _cli_enforce_required=None, _cli_use_class_docs_for_groups=None, _cli_exit_on_error=None, _cli_prefix=None, _cli_flag_prefix_char=None, _cli_implicit_flags=None, _cli_ignore_unknown_args=None, _cli_kebab_case=None, _cli_shortcuts=None, _secrets_dir=None, _build_sources=None, **values)[source]

Bases: BaseSettings

Configuration for the Kelvin Stream connection.

Parameters:
  • _case_sensitive (bool | None)

  • _nested_model_default_partial_update (bool | None)

  • _env_prefix (str | None)

  • _env_prefix_target (EnvPrefixTarget | None)

  • _env_file (DotenvType | None)

  • _env_file_encoding (str | None)

  • _env_ignore_empty (bool | None)

  • _env_nested_delimiter (str | None)

  • _env_nested_max_split (int | None)

  • _env_parse_none_str (str | None)

  • _env_parse_enums (bool | None)

  • _cli_prog_name (str | None)

  • _cli_parse_args (bool | list[str] | tuple[str, ...] | None)

  • _cli_settings_source (CliSettingsSource[Any] | None)

  • _cli_parse_none_str (str | None)

  • _cli_hide_none_type (bool | None)

  • _cli_avoid_json (bool | None)

  • _cli_enforce_required (bool | None)

  • _cli_use_class_docs_for_groups (bool | None)

  • _cli_exit_on_error (bool | None)

  • _cli_prefix (str | None)

  • _cli_flag_prefix_char (str | None)

  • _cli_implicit_flags (bool | Literal['dual', 'toggle'] | None)

  • _cli_ignore_unknown_args (bool | None)

  • _cli_kebab_case (bool | Literal['all', 'no_enums'] | None)

  • _cli_shortcuts (Mapping[str, str | list[str]] | None)

  • _secrets_dir (PathType | None)

  • _build_sources (tuple[tuple[PydanticBaseSettingsSource, ...], dict[str, Any]] | None)

  • ip (str)

  • port (int)

  • limit (int)

ip

The IP address of the Kelvin stream server. Default: “127.0.0.1”.

port

The port number for the stream connection. Default: 49167.

limit

Maximum buffer size in bytes for the stream reader. Default: 4MB.

Environment Variables:

  • KELVIN_STREAM_IP: Override the IP address.

  • KELVIN_STREAM_PORT: Override the port number.

  • KELVIN_STREAM_LIMIT: Override the buffer limit.
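The override behaviour can be pictured as "use the prefixed environment variable if present, otherwise the default". The sketch below models that with the standard library only. It is not KelvinStreamConfig, which is built on pydantic settings. StreamSettingsSketch and settings_from_env are hypothetical names, and reading the documented "4MB" default as 4 * 1024 * 1024 bytes is an assumption.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional

ENV_PREFIX = "KELVIN_STREAM_"
DEFAULT_IP = "127.0.0.1"
DEFAULT_PORT = 49167
DEFAULT_LIMIT = 4 * 1024 * 1024  # assumption: "4MB" is read as 4 MiB


@dataclass(frozen=True)
class StreamSettingsSketch:
    ip: str = DEFAULT_IP
    port: int = DEFAULT_PORT
    limit: int = DEFAULT_LIMIT


def settings_from_env(environ: Optional[Mapping[str, str]] = None) -> StreamSettingsSketch:
    """Build settings from prefixed variables, falling back to the defaults."""
    env = os.environ if environ is None else environ
    return StreamSettingsSketch(
        ip=env.get(ENV_PREFIX + "IP", DEFAULT_IP),
        # Environment values are strings, so numeric fields are converted.
        port=int(env.get(ENV_PREFIX + "PORT", DEFAULT_PORT)),
        limit=int(env.get(ENV_PREFIX + "LIMIT", DEFAULT_LIMIT)),
    )


defaults = settings_from_env({})
overridden = settings_from_env({"KELVIN_STREAM_PORT": "5000"})
```

Passing the environment as a mapping keeps the function easy to test without mutating os.environ.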

model_config: ClassVar[SettingsConfigDict] = {'arbitrary_types_allowed': True, 'case_sensitive': False, 'cli_avoid_json': False, 'cli_enforce_required': False, 'cli_exit_on_error': True, 'cli_flag_prefix_char': '-', 'cli_hide_none_type': False, 'cli_ignore_unknown_args': False, 'cli_implicit_flags': False, 'cli_kebab_case': False, 'cli_parse_args': None, 'cli_parse_none_str': None, 'cli_prefix': '', 'cli_prog_name': None, 'cli_shortcuts': None, 'cli_use_class_docs_for_groups': False, 'enable_decoding': True, 'env_file': None, 'env_file_encoding': None, 'env_ignore_empty': False, 'env_nested_delimiter': None, 'env_nested_max_split': None, 'env_parse_enums': None, 'env_parse_none_str': None, 'env_prefix': 'KELVIN_STREAM_', 'env_prefix_target': 'variable', 'extra': 'forbid', 'json_file': None, 'json_file_encoding': None, 'nested_model_default_partial_update': False, 'protected_namespaces': ('model_validate', 'model_dump', 'settings_customise_sources'), 'secrets_dir': None, 'toml_file': None, 'validate_default': True, 'yaml_config_section': None, 'yaml_file': None, 'yaml_file_encoding': None}

Configuration for the model, should be a dictionary conforming to pydantic.config.ConfigDict.

ip: str
port: int
limit: int
final class kelvin.application.stream.KelvinStream(config=None)[source]

Bases: StreamInterface

TCP stream implementation for Kelvin platform communication.

This class manages a persistent TCP connection to the Kelvin stream server, handling message serialization/deserialization and connection lifecycle.

Parameters:

config (Optional[KelvinStreamConfig])

_config

The stream configuration settings.

_reader

The async stream reader (None when disconnected).

_writer

The async stream writer (None when disconnected).

__init__(config=None)[source]

Initialize the Kelvin Stream.

Parameters:

config (Optional[KelvinStreamConfig]) – Stream configuration settings. Uses defaults if not provided.

Return type:

None

async connect()[source]

Connect to the Kelvin Stream server.

Raises:

ConnectionError – If the stream server is unreachable.

Return type:

None

async disconnect()[source]

Disconnect from the Kelvin Stream.

Return type:

None

async read()[source]

Read the next Kelvin Message.

Raises:

ConnectionError – When connection is unavailable.

Returns:

The message that was read.

Return type:

Message

async write(msg)[source]

Write a Message to the Kelvin Stream.

Parameters:

msg (Message) – Kelvin message to write

Raises:

ConnectionError – If the connection is lost.

Returns:

True if the message was sent with success.

Return type:

bool

Timer

Async timer with drift correction.

This module provides a Timer class for executing periodic tasks with automatic clock drift correction to maintain consistent intervals.

final class kelvin.application.timer.Timer(interval, name, max_drift_correction=0.1, clock=None)[source]

Bases: object

A repeating async timer that corrects clock drift.

The Timer automatically adjusts sleep intervals to compensate for execution time and system clock drift, ensuring consistent timing over extended periods.

Parameters:
  • interval (float)

  • name (str)

  • max_drift_correction (float)

  • clock (Optional[ClockInterface])

interval

The target interval between timer fires in seconds.

name

A descriptive name for the timer (used in logging).

max_drift

Maximum drift before logging a warning.

min_interval

Minimum allowed sleep interval.

max_interval

Maximum allowed sleep interval.

iteration

Number of times the timer has fired.

overlaps

Number of times execution exceeded the interval.

Example

>>> timer = Timer(interval=5.0, name="my-timer")
>>> async for _ in timer:
...     await do_periodic_work()
async __aiter__()[source]

An async iterator that yields the actual sleep interval each loop.

Return type:

AsyncIterator[float]
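One common way to keep a repeating timer from drifting is to sleep until the next point on a fixed grid (start + n * interval) rather than sleeping a constant interval after each run. The sketch below shows only that arithmetic, with times passed in explicitly so it is deterministic. GridSchedule is a hypothetical class, not the SDK's Timer, and it does not model max_drift_correction, min_interval, or max_interval.

```python
from dataclasses import dataclass


@dataclass
class GridSchedule:
    interval: float
    start: float
    iteration: int = 0
    overlaps: int = 0

    def next_sleep(self, now: float) -> float:
        """Return how long to sleep at time `now` so the next fire lands on the grid."""
        self.iteration += 1
        deadline = self.start + self.iteration * self.interval
        if deadline <= now:
            # The previous run overran its slot: record it and fire immediately.
            self.overlaps += 1
            return 0.0
        # Time already spent working is subtracted from the sleep,
        # so errors do not accumulate across iterations.
        return deadline - now


schedule = GridSchedule(interval=5.0, start=100.0)
sleeps = [schedule.next_sleep(now) for now in (100.25, 106.0, 117.0)]
```

In an async loop the returned value would be passed to asyncio.sleep, and now would come from a monotonic clock.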

Scheduler

Async scheduler for cron-like recurring tasks.

This module provides a Scheduler class for executing tasks on a cron-like schedule with timezone support, and utilities for converting human-readable schedule parameters into cron expressions.

kelvin.application.scheduler.parse_at(at)[source]

Parse and validate at time values.

Parameters:

at (Union[str, list[str]]) – A single “HH:MM” string or a list of them.

Return type:

list[tuple[int, int]]

Returns:

A sorted list of (hour, minute) tuples, deduplicated and validated.

Raises:

ValueError – If any value has an invalid format or is out of range, or if there are duplicate entries.
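A minimal sketch of this kind of validation follows. It is not the SDK's parse_at: parse_at_sketch is a hypothetical name, requiring exactly two digits for hours and minutes is an assumption, and duplicates are rejected in line with the Raises clause above.

```python
import re
from typing import Union

_HHMM = re.compile(r"(\d{2}):(\d{2})")


def parse_at_sketch(at: Union[str, list]) -> list:
    """Validate "HH:MM" strings and return sorted (hour, minute) tuples."""
    values = [at] if isinstance(at, str) else list(at)
    parsed = []
    for value in values:
        match = _HHMM.fullmatch(value)
        if match is None:
            raise ValueError(f"invalid time {value!r}, expected HH:MM")
        hour, minute = int(match.group(1)), int(match.group(2))
        if hour > 23 or minute > 59:
            raise ValueError(f"time {value!r} is out of range")
        if (hour, minute) in parsed:
            raise ValueError(f"duplicate time {value!r}")
        parsed.append((hour, minute))
    return sorted(parsed)


single = parse_at_sketch("19:00")
several = parse_at_sketch(["17:00", "09:00"])
```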

kelvin.application.scheduler.build_cron_expressions(*, every=None, at=None, cron=None)[source]

Convert schedule parameters into one or more cron expression strings.

When at contains times with different minutes (e.g. ["09:30", "17:45"]), multiple cron expressions are returned since a single expression cannot represent different minute values for different hours.

Parameters:
  • every (Optional[str]) – Human-readable recurrence value.

  • at (Union[str, list[str], None]) – Time(s) of day in "HH:MM" format.

  • cron (Optional[str]) – A 5-field cron expression.

Return type:

list[str]

Returns:

A list of valid cron expression strings.

Raises:

ValueError – On invalid or conflicting parameters.
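The need for multiple expressions follows from the cron format: the minute and hour fields are independent lists, so "30,45 9,17" would fire at four times, not two. The sketch below groups times by minute and emits one expression per group. cron_for_times is a hypothetical helper, not the SDK's function, and merging times that share a minute into a single expression is an assumption about the output shape.

```python
from collections import defaultdict


def cron_for_times(times: list, day_of_week: str = "*") -> list:
    """Group (hour, minute) pairs by minute and emit one 5-field cron line per group."""
    hours_by_minute = defaultdict(list)
    for hour, minute in sorted(times):
        hours_by_minute[minute].append(hour)
    return [
        # Fields: minute, hour(s), day of month, month, day of week.
        f"{minute} {','.join(map(str, hours))} * * {day_of_week}"
        for minute, hours in sorted(hours_by_minute.items())
    ]


same_minute = cron_for_times([(9, 0), (17, 0)])
different_minutes = cron_for_times([(9, 30), (17, 45)])
mondays = cron_for_times([(9, 0)], day_of_week="MON")
```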

final class kelvin.application.scheduler.Scheduler(cron_expressions, name, tz, start_time=None, interval=None, clock=None)[source]

Bases: object

An async iterator that yields at each scheduled fire time.

Computes the next fire time using one or more cron expressions with timezone support. Supports optional start_time for deterministic alignment and interval for Nth-occurrence scheduling.

On startup the scheduler uses start_time as the scheduling base when provided. If start_time is in the past, it fast-forwards through all past fire times (counting for interval alignment) without yielding, so a far-past start_time never triggers past executions. If start_time is in the future, scheduling is aligned from that future time and no fire time will be yielded before it.

Parameters:
  • cron_expressions (list[str])

  • name (str)

  • tz (ZoneInfo)

  • start_time (Optional[datetime])

  • interval (Optional[int])

  • clock (Optional[ClockInterface])

cron_expressions

The cron expressions defining the schedule.

tz

The timezone for interpreting the schedule.

name

A descriptive name for the scheduler (used in logging).

interval

Fire every Nth cron match (None means every match).

iteration

Number of times the scheduler has actually fired.

async __aiter__()[source]

Async iterator that yields the sleep duration before each fire.

Return type:

AsyncIterator[float]

Version

Window

kelvin.application.window.quantize_time(dt, step=None, *, mode='nearest', anchor=None)[source]

Quantize a datetime to a fixed step.

  • Accepts naive datetimes and treats them as UTC.

  • Uses integer microseconds to avoid floating-point rounding issues.

  • If step is None, return the input unchanged.

Parameters:
  • dt (datetime) – Timestamp to quantize. Naive values are interpreted as UTC.

  • step (timedelta or None) – Quantization step. When None, no quantization is applied.

  • mode ({"nearest", "floor", "ceil"}) – Rounding mode. Defaults to "nearest".

  • anchor (datetime or None) – Grid anchor. When None, use the Unix epoch at UTC.

Returns:

Quantized datetime with the same tzinfo semantics as input, normalized to UTC if naive.

Return type:

datetime
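Quantizing in integer microseconds means the offset from the anchor is floor-divided by the step, so no floating-point rounding is involved. The sketch below follows the description above but is not the SDK's implementation: quantize_sketch is a hypothetical name, and rounding exact ties upward in "nearest" mode is an assumption.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)
ONE_US = timedelta(microseconds=1)


def quantize_sketch(
    dt: datetime,
    step: Optional[timedelta] = None,
    *,
    mode: str = "nearest",
    anchor: Optional[datetime] = None,
) -> datetime:
    if step is None:
        return dt  # no quantization requested
    if mode not in ("nearest", "floor", "ceil"):
        raise ValueError(f"unknown mode {mode!r}")
    if dt.tzinfo is None:
        dt = dt.replace(tzinfo=timezone.utc)  # naive input is treated as UTC
    if anchor is None:
        anchor = EPOCH
    elif anchor.tzinfo is None:
        anchor = anchor.replace(tzinfo=timezone.utc)

    step_us = step // ONE_US  # timedelta // timedelta yields an int
    if step_us <= 0:
        raise ValueError("step must be positive")
    offset_us = (dt - anchor) // ONE_US
    floor_us = (offset_us // step_us) * step_us
    remainder = offset_us - floor_us

    if remainder == 0 or mode == "floor":
        result_us = floor_us
    elif mode == "ceil":
        result_us = floor_us + step_us
    else:  # nearest; assumption: exact ties round up
        result_us = floor_us + step_us if 2 * remainder >= step_us else floor_us
    return (anchor + timedelta(microseconds=result_us)).astimezone(dt.tzinfo)


sample = datetime(2024, 1, 1, 12, 0, 7, tzinfo=timezone.utc)
five_seconds = timedelta(seconds=5)
```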

kelvin.application.window.round_nearest_time(dt, round_to=None)[source]

Backwards-compatible wrapper that rounds to the nearest tick.

Return type:

datetime

class kelvin.application.window.BaseWindow(assets, inputs, queue, *, round_to=None, buffer_size=None, clock=None)[source]

Bases: object

Manage per-asset in-memory buffers and DataFrames for streaming data.

Design

  • Each asset has a deque of (timestamp, {stream: payload}). The deque can be bounded by buffer_size.

  • Timestamps are normalized to UTC and optionally quantized.

  • DataFrames are per asset, indexed by UTC timestamps, with columns equal to inputs.

  • Duplicate timestamps within a buffer are merged into a single row before appending.
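The merge of duplicate timestamps can be sketched with plain dictionaries instead of DataFrames. This is an illustration of the design point above, not the SDK's code: merge_duplicate_timestamps is a hypothetical helper, and letting the later payload win when the same stream appears twice for one timestamp is an assumption.

```python
from datetime import datetime, timezone
from typing import Any


def merge_duplicate_timestamps(buffer: list) -> list:
    """Merge (timestamp, {stream: payload}) entries that share a timestamp into one row."""
    merged: dict = {}
    for timestamp, values in buffer:
        # Entries with the same timestamp contribute columns to the same row.
        merged.setdefault(timestamp, {}).update(values)
    return sorted(merged.items(), key=lambda row: row[0])


t0 = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
t1 = datetime(2024, 1, 1, 12, 0, 5, tzinfo=timezone.utc)
rows = merge_duplicate_timestamps([
    (t1, {"pressure": 3.1}),
    (t0, {"temperature": 70.0}),
    (t0, {"pressure": 3.0}),
])
```

Quantizing timestamps with round_to before buffering makes this merge more likely to line up readings from different streams on the same row.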

get_df(asset_name)[source]

Return a copy of the DataFrame for the asset or an empty shell.

The returned DataFrame has a UTC DatetimeIndex and columns equal to inputs. When the asset has not produced data yet, return an empty DataFrame with the correct index and columns.

Return type:

DataFrame

Parameters:

asset_name (str)

class kelvin.application.window.BaseTimeWindow(assets, inputs, queue, window_size, hop_size, *, round_to=None, align_step=None, allowed_lateness=None, buffer_size=None, clock=None)[source]

Bases: BaseWindow

Time-based windowing of data streams.

Parameters

  • assets (list[str]) – Asset identifiers to include.

  • inputs (list[str]) – Data stream identifiers to include as DataFrame columns.

  • queue (asyncio.Queue[AssetDataMessage]) – Source of incoming messages.

  • window_size (timedelta) – Duration of each window.

  • hop_size (timedelta) – Time between the start of consecutive windows.

  • round_to (timedelta or None) – Optional timestamp quantization step.

  • align_step (timedelta or None) – Optional alignment step for initial window start. If None, no extra alignment.

  • allowed_lateness (timedelta or None) – If provided, accept events older than window_start by up to this amount.

  • buffer_size (int or None) – Optional bound for per-asset buffer deques.

Notes

  • Naive window_start values are interpreted as UTC.

  • Message timestamps that are naive are also treated as UTC.

async stream(window_start=None)[source]

Continuously emit windowed DataFrames per asset.

Return type:

AsyncGenerator[tuple[str, DataFrame], None]

Parameters:

window_start (datetime | None)

Behavior

  • Align the first emission to window_start + window_size.

  • Sleep until that first boundary, then drain and emit.

  • Continue emitting every hop_size using a Timer.

Parameters

window_start (datetime or None) – Start of the first window. When None, use current UTC time. Naive values are interpreted as UTC. The start is optionally aligned by align_step.
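The emission schedule described under Behavior can be sketched as a generator of window bounds. This is an illustration only: window_bounds is a hypothetical helper, and it assumes each emission covers the window_size immediately preceding the emission time, which the text above does not state explicitly. Alignment via align_step and late-event handling are not modelled.

```python
from datetime import datetime, timedelta, timezone
from itertools import islice
from typing import Iterator


def window_bounds(
    window_start: datetime, window_size: timedelta, hop_size: timedelta
) -> Iterator[tuple]:
    """Yield (start, end) per emitted window; `end` is the emission time."""
    end = window_start + window_size  # first emission
    while True:
        yield end - window_size, end
        end += hop_size  # later emissions follow every hop_size


noon = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
ten_min = timedelta(minutes=10)
five_min = timedelta(minutes=5)

# hop_size < window_size: consecutive windows overlap (hopping).
hopping = list(islice(window_bounds(noon, ten_min, five_min), 3))
# hop_size == window_size: windows are back to back (tumbling).
tumbling = list(islice(window_bounds(noon, ten_min, ten_min), 2))
```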

Parameters:
  • assets (list[str])

  • inputs (list[str])

  • queue (asyncio.Queue[AssetDataMessage])

  • window_size (timedelta)

  • hop_size (timedelta)

  • round_to (Optional[timedelta])

  • align_step (Optional[timedelta])

  • allowed_lateness (Optional[timedelta])

  • buffer_size (Optional[int])

  • clock (Optional['ClockInterface'])

class kelvin.application.window.TumblingWindow(assets, inputs, queue, window_size, *, round_to=None, align_step=None, allowed_lateness=None, buffer_size=None, clock=None)[source]

Bases: BaseTimeWindow

Non-overlapping time windows where hop_size equals window_size.

Parameters:
  • assets (list[str])

  • inputs (list[str])

  • queue (asyncio.Queue[AssetDataMessage])

  • window_size (timedelta)

  • round_to (Optional[timedelta])

  • align_step (Optional[timedelta])

  • allowed_lateness (Optional[timedelta])

  • buffer_size (Optional[int])

  • clock (Optional['ClockInterface'])

class kelvin.application.window.HoppingWindow(assets, inputs, queue, window_size, hop_size, *, round_to=None, align_step=None, allowed_lateness=None, buffer_size=None, clock=None)[source]

Bases: BaseTimeWindow

Overlapping time windows when hop_size is smaller than window_size.

Parameters:
  • assets (list[str])

  • inputs (list[str])

  • queue (asyncio.Queue[AssetDataMessage])

  • window_size (timedelta)

  • hop_size (timedelta)

  • round_to (Optional[timedelta])

  • align_step (Optional[timedelta])

  • allowed_lateness (Optional[timedelta])

  • buffer_size (Optional[int])

  • clock (Optional['ClockInterface'])

class kelvin.application.window.RollingWindow(assets, inputs, queue, count_size, *, slide=1, round_to=None, buffer_size=None, clock=None)[source]

Bases: BaseWindow

Rolling window based on a fixed count of messages per asset.

Notes

  • The internal per-asset deque is bounded to prevent unbounded growth.

  • The DataFrame yielded for an asset represents the last count_size items after merging duplicate timestamps inside that slice.

async stream()[source]

Continuously emit rolling windows as new messages arrive.

Return type:

AsyncGenerator[tuple[str, DataFrame], None]
