Application

The application module provides the core framework for building Kelvin applications.

Kelvin Application SDK.

This module provides the core components for building Kelvin applications that connect to and interact with the Kelvin platform.

Main Components:

  • KelvinApp: The main application class for connecting to Kelvin and processing messages.

  • KelvinStream: Low-level stream interface for Kelvin communication.

  • AssetInfo: Information about configured assets.

  • Datastream: Datastream configuration.

  • ResourceDatastream: Asset datastream configuration with access control.

Example

>>> from kelvin.application import KelvinApp, filters
>>> async with KelvinApp() as app:
...     async for msg in app.stream_filter(filters.is_asset_data_message):
...         print(msg)
class kelvin.application.KelvinApp(config=KelvinStreamConfig(ip='127.0.0.1', port=49167, limit=4194304), api=None)[source]

Bases: object

Kelvin App to connect and interface with the KelvinStream.

After connecting, the connection is handled automatically in the background.

Use filter or stream_filter to listen for specific messages. Use the on_* callbacks (such as on_connect and on_disconnect) to react to events like connect and disconnect. Use tasks to run background functions, which can be async or sync. Use timers to run functions at regular intervals.

The app can be used as an async context manager, which will automatically connect on enter and disconnect on exit.

Example usage:

    async with KelvinApp() as app:
        await app.publish(Number(resource=KRNAssetDataStream('my-asset', 'my-input'), payload=1.0))
        async for msg in app.stream_filter(filters.is_asset_data_message):
            print(msg)

Parameters:

api – API client for direct connection with the Kelvin API.

on_connect: Callable[[], Awaitable[None]] | None

Called when the connection is established.

on_disconnect: Callable[[], Awaitable[None]] | None

Called when the connection is closed.

on_message: Callable[[Message], Awaitable[None]] | None

Called on receipt of any message.

on_asset_input: Callable[[AssetDataMessage], Awaitable[None]] | None

Called when an asset data message is received.

on_control_change: Callable[[AssetDataMessage], Awaitable[None]] | None

Called when a control change message is received.

on_control_status: Callable[[ControlChangeStatus], Awaitable[None]] | None

Called when a control status is received.

on_custom_action: Callable[[CustomAction], Awaitable[None]] | None

Called when a custom action is received.

on_asset_change: Callable[[AssetInfo | None, AssetInfo | None], Awaitable[None]] | None

Called when an asset is added, removed, or modified. First arg is the new asset (None if removed); second is the previous asset (None if newly added).
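
The argument order can be illustrated with a small stand-alone sketch. The AssetInfo dataclass and the describe_change helper below are hypothetical stand-ins rather than SDK types; only the (new, previous) convention is taken from the description above.

```python
import asyncio
from dataclasses import dataclass
from typing import Optional


@dataclass
class AssetInfo:
    """Hypothetical stand-in for the SDK's AssetInfo (only `name` is modeled)."""
    name: str


def describe_change(new: Optional[AssetInfo], previous: Optional[AssetInfo]) -> str:
    """Classify a change using the (new, previous) argument order."""
    if new is None and previous is None:
        return "unknown"
    if previous is None:
        return "added"      # no previous asset: newly added
    if new is None:
        return "removed"    # no new asset: removed
    return "modified"       # both present: modified


async def on_asset_change(new: Optional[AssetInfo], previous: Optional[AssetInfo]) -> None:
    # Same shape as Callable[[AssetInfo | None, AssetInfo | None], Awaitable[None]].
    asset = new or previous
    print(f"{asset.name if asset else '?'}: {describe_change(new, previous)}")


asyncio.run(on_asset_change(AssetInfo("asset1"), None))  # prints "asset1: added"
```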

on_app_configuration: Callable[[dict], Awaitable[None]] | None

Called when the app configuration changes.

property is_connected: bool

Indicates whether the read loop is active, implying an established connection.

property assets: Dict[str, AssetInfo]

Get all assets configured for this application.

It returns a dictionary where each key is the asset name, and the value is an AssetInfo object describing that asset’s properties, parameters, and datastreams.

The assets dictionary is dynamically updated whenever the application receives updates to asset properties or parameters, ensuring it always reflects the latest configuration.

Returns:

A dictionary where keys are asset names (strings) and values are AssetInfo instances representing the current configuration and state of each asset.

Return type:

Dict[str,AssetInfo]

Example

    {
        "asset1": AssetInfo(
            name="asset1",
            properties={
                "tubing_length": 25.0,
                "area": 11.0,
            },
            parameters={
                "param-bool": False,
                "param-number": 7.5,
                "param-string": "hello",
            },
            datastreams={
                "output1": ResourceDatastream(
                    asset=KRNAsset("asset1"),
                    io_name="output1",
                    datastream=Datastream(
                        name="datastream1",
                        type=KMessageTypeData("float"),
                        unit="m",
                    ),
                    access="RO",
                    owned=True,
                    configuration={},
                )
            },
        )
    }

property app_configuration: dict

Get the application configuration.

Returns:

A mapping of configuration sections to their values, matching the structure in app.yaml.

Return type:

dict

Example

    {
        "foo": {
            "conf_string": "value1",
            "conf_number": 25,
            "conf_bool": False,
        }
    }

property config_received: Event

Event set when the initial configuration is received.

Use this asyncio.Event to wait until the application has loaded its initial app/asset parameters.

Returns:

Event that becomes set once the initial configuration arrives.

Return type:

asyncio.Event

Example

await app.config_received.wait()
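
The waiting pattern relies only on standard asyncio.Event semantics, which can be tried without the SDK. In the sketch below, the local config_received event and the load_configuration coroutine are hypothetical stand-ins for the property and for whatever delivers the initial configuration.

```python
import asyncio


async def load_configuration(config_received: asyncio.Event, store: dict) -> None:
    """Simulate the initial configuration arriving shortly after startup."""
    await asyncio.sleep(0.01)
    store["conf_number"] = 25
    config_received.set()  # wakes every coroutine blocked in wait()


async def main() -> dict:
    config_received = asyncio.Event()  # stand-in for app.config_received
    store: dict = {}
    loader = asyncio.create_task(load_configuration(config_received, store))
    await config_received.wait()  # returns only after set() has been called
    await loader
    return dict(store)


result = asyncio.run(main())
print(result)
```

Once the event is set, later calls to wait() return immediately, so the same event can be awaited from several places.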

property inputs: List[AppIO]

List all input metrics configured for the application.

Each AppIO has:
  • name (str): the metric identifier.

  • data_type (str): the data type of the input.

Returns:

Read-only list of configured input metrics.

Return type:

List[AppIO]

property outputs: List[AppIO]

List all output metrics configured for the application.

Each AppIO has:
  • name (str): the metric identifier.

  • data_type (str): the data type of the output.

Returns:

Read-only list of configured output metrics.

Return type:

List[AppIO]

property tasks: Dict[str, Callable[[], Awaitable]]

Retrieve registered asynchronous tasks.

Returns:

Dict mapping task names to callables that produce awaitables when called. Represents the tasks that are scheduled or pending.

Return type:

Dict[str, Callable[[], Awaitable]]

async __aenter__()[source]

Support async context: connect on enter.

Return type:

Self

async __aexit__(exc_type, exc_value, tb)[source]

Support async context: disconnect on exit.

Return type:

Optional[bool]

async connect()[source]

Establish the connection, retrying indefinitely until it succeeds. Starts the read loop and waits for the App Configuration before firing the on_connect callback and starting tasks and timers.

Return type:

None

async disconnect()[source]

Cancel read loop, stop all tasks, fire on_disconnect, and close stream.

Return type:

None

async publish(msg)[source]

Publish a message to KelvinStream.

Accepts either a Message instance or a MessageBuilder. If given a MessageBuilder, it is converted to a Message via to_message() before sending. Returns True on success, or False if the connection is unavailable.

Parameters:

msg (Union[Message, MessageBuilder]) –

  • A Message to send directly, or

  • A MessageBuilder which will be converted to Message.

Returns:

  • True if the message was sent successfully.

  • False if sending failed due to a ConnectionError.

Return type:

bool

Examples

    message = Number(resource=KRNAssetDataStream('my-asset', 'my-input'), payload=1.0)
    success = await client.publish(message)
    if success:
        print("Published message")
    else:
        print("Publish failed")

filter(func)[source]

Creates a filter for the received Kelvin Messages based on a filter function.

Parameters:

func (Callable[[Message], TypeGuard[TypeVar(T, bound= Message)]]) – Filter function. It receives a Message as its argument and returns a bool.

Returns:

An asyncio queue that receives the filtered messages.

Return type:

Queue[TypeVar(T, bound= Message)]
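
The idea of pairing a predicate with a queue can be sketched with the standard library alone. The Message dataclass and the MiniApp dispatcher below are hypothetical stand-ins and say nothing about how KelvinApp is implemented internally; they only show a filter function feeding an asyncio queue.

```python
import asyncio
from dataclasses import dataclass
from typing import Callable, List, Tuple


@dataclass
class Message:
    """Hypothetical stand-in for a Kelvin Message."""
    asset: str
    payload: float


class MiniApp:
    """Toy dispatcher: every registered filter gets its own queue."""

    def __init__(self) -> None:
        self._filters: List[Tuple[Callable[[Message], bool], asyncio.Queue]] = []

    def filter(self, func: Callable[[Message], bool]) -> asyncio.Queue:
        queue: asyncio.Queue = asyncio.Queue()
        self._filters.append((func, queue))
        return queue

    def dispatch(self, msg: Message) -> None:
        # Put the message on every queue whose predicate accepts it.
        for func, queue in self._filters:
            if func(msg):
                queue.put_nowait(msg)


async def main() -> List[float]:
    app = MiniApp()
    queue = app.filter(lambda m: m.asset == "asset1")
    for msg in (Message("asset1", 1.0), Message("asset2", 2.0), Message("asset1", 3.0)):
        app.dispatch(msg)
    received = []
    while not queue.empty():
        received.append((await queue.get()).payload)
    return received


payloads = asyncio.run(main())
print(payloads)  # [1.0, 3.0]
```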

stream_filter(func)[source]

Creates a stream for the received Kelvin Messages based on a filter function. See filter.

Parameters:

func (Callable[[Message], TypeGuard[TypeVar(T, bound= Message)]]) – Filter function. It receives a Message as its argument and returns a bool.

Returns:

Async Generator that can be async iterated to receive filtered messages.

Return type:

AsyncGenerator[Message, None]

Yields:

Iterator[AsyncGenerator[Message, None]] – Yields the filtered messages.

tumbling_window(window_size, assets=None, inputs=None, round_to=None)[source]

Creates fixed-size, non-overlapping windows.

Parameters:
  • window_size (timedelta) – Duration of each window.

  • assets (Optional[List[str]]) – Optional list of asset names to filter on.

  • inputs (Optional[List[str]]) – Optional list of input names (data streams) to include as columns in the window.

  • round_to (Optional[timedelta]) – Optional interval to which window boundaries are aligned.

Returns:

An instance configured with the given parameters.

Return type:

TumblingWindow
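
As a rough illustration of the tumbling concept (not of the TumblingWindow class itself), the sketch below groups timestamped values into fixed, non-overlapping buckets. The tumbling_windows helper is hypothetical, and aligning window starts to the Unix epoch is an assumption made for the example.

```python
from collections import defaultdict
from datetime import datetime, timedelta
from typing import Dict, List, Tuple


def tumbling_windows(
    records: List[Tuple[datetime, float]], window_size: timedelta
) -> Dict[datetime, List[float]]:
    """Group (timestamp, value) records into fixed, non-overlapping windows."""
    epoch = datetime(1970, 1, 1)
    windows: Dict[datetime, List[float]] = defaultdict(list)
    for ts, value in records:
        # Floor the timestamp to the start of the window that contains it.
        index = (ts - epoch) // window_size
        windows[epoch + index * window_size].append(value)
    return dict(windows)


start = datetime(2024, 1, 1, 12, 0, 0)
records = [(start + timedelta(seconds=s), float(s)) for s in (0, 4, 9, 10, 15, 21)]
result = tumbling_windows(records, timedelta(seconds=10))
for window_start, values in sorted(result.items()):
    print(window_start.time(), values)
```

Each record lands in exactly one window, which is the property that distinguishes tumbling windows from hopping ones.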

hopping_window(window_size, hop_size, assets=None, inputs=None, round_to=None)[source]

Creates a window with fixed size and overlap.

Parameters:
  • window_size (timedelta) – Duration of each window.

  • hop_size (timedelta) – Step between window starts (defines overlap).

  • assets (Optional[List[str]]) – Optional list of asset names to filter on.

  • inputs (Optional[List[str]]) – Optional list of input names (data streams) to include as columns in the window.

  • round_to (Optional[timedelta]) – Optional interval to which window boundaries are aligned.

Returns:

An instance configured with the given parameters.

Return type:

HoppingWindow
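
The relationship between window_size and hop_size can be shown with a stand-alone sketch. The hopping_windows helper below is hypothetical and only models the concept: a new window starts every hop_size, and windows overlap whenever hop_size is smaller than window_size.

```python
from datetime import datetime, timedelta
from typing import List, Tuple


def hopping_windows(
    records: List[Tuple[datetime, float]],
    start: datetime,
    window_size: timedelta,
    hop_size: timedelta,
) -> List[Tuple[datetime, List[float]]]:
    """Return (window_start, values) for windows that start every hop_size."""
    if not records:
        return []
    last = max(ts for ts, _ in records)
    result = []
    window_start = start
    while window_start <= last:
        window_end = window_start + window_size
        # Half-open interval [window_start, window_end).
        values = [v for ts, v in records if window_start <= ts < window_end]
        result.append((window_start, values))
        window_start += hop_size
    return result


t0 = datetime(2024, 1, 1)
records = [(t0 + timedelta(seconds=s), float(s)) for s in (0, 3, 6, 9)]
windows = hopping_windows(records, t0, timedelta(seconds=6), timedelta(seconds=3))
for window_start, values in windows:
    print(window_start.time(), values)
```

With a 6-second window and a 3-second hop, each value except the first appears in two consecutive windows.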

rolling_window(count_size, assets=None, inputs=None, round_to=None, slide=1)[source]

Creates a sliding count-based window over incoming data.

Parameters:
  • count_size (int) – Number of records per window.

  • assets (Optional[List[str]]) – Optional list of asset names to filter on.

  • inputs (Optional[List[str]]) – Optional list of input names (data streams) to include as columns in the window.

  • round_to (Optional[timedelta]) – Optional interval to which window boundaries are aligned.

  • slide (int) – Number of records to slide the window forward on each update.

Returns:

An instance configured with the given parameters.

Return type:

RollingWindow
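
The interaction between count_size and slide can be sketched as follows. The rolling_windows generator is a hypothetical helper, not the RollingWindow class, and it assumes a window is emitted only once count_size records are available.

```python
from collections import deque
from typing import Deque, Iterable, Iterator, List


def rolling_windows(
    values: Iterable[float], count_size: int, slide: int = 1
) -> Iterator[List[float]]:
    """Yield the last count_size values, advancing by `slide` records each time."""
    buffer: Deque[float] = deque(maxlen=count_size)
    for seen, value in enumerate(values, start=1):
        buffer.append(value)  # the deque drops the oldest value when full
        if seen >= count_size and (seen - count_size) % slide == 0:
            yield list(buffer)


every_record = list(rolling_windows([1, 2, 3, 4, 5, 6], count_size=3))
every_second = list(rolling_windows([1, 2, 3, 4, 5, 6], count_size=3, slide=2))
print(every_record)  # [[1, 2, 3], [2, 3, 4], [3, 4, 5], [4, 5, 6]]
print(every_second)  # [[1, 2, 3], [3, 4, 5]]
```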

stream(fn=None, *, assets=None, inputs=None)[source]

Register a per-message stream handler that is invoked for each incoming AssetDataMessage matching the optional assets/inputs filters.

Usage patterns:

    @app.stream()
    async def my_stream(msg: AssetDataMessage): ...

    @app.stream(inputs=["in1"])
    async def my_stream(msg: AssetDataMessage): ...

    @app.stream(assets=["asset1"])
    async def my_stream(msg: AssetDataMessage): ...

    @app.stream(assets=["asset1"], inputs=["in1", "in2"])
    async def my_stream(msg: AssetDataMessage): ...

    @app.stream(assets=["asset1"], inputs=["in1", "in2"])
    def my_stream(msg: AssetDataMessage): ...

    def my_stream(msg: AssetDataMessage): ...
    app.stream(my_stream, assets=["asset1"], inputs=["in1"])

The registered stream runs as an app task when the app connects.

Return type:

Union[Callable[[ParamSpec(P, bound= None)], Awaitable[TypeVar(R)]], Callable[[Union[Callable[[ParamSpec(P, bound= None)], TypeVar(R)], Callable[[ParamSpec(P, bound= None)], Awaitable[TypeVar(R)]]]], Callable[[ParamSpec(P, bound= None)], Awaitable[TypeVar(R)]]]]

task(fn=None, *, name=None)[source]

Register a function as a task, either sync or async.

This method acts as both a decorator and a decorator factory. It supports the following usage patterns:

    @app.task
    async def my_async_task(...): ...

    @app.task()
    def my_sync_task(...): ...

    @app.task(name="custom.task.name")
    def another_task(...): ...

    app.task(some_function)

Return type:

Union[Callable[[ParamSpec(P, bound= None)], Awaitable[TypeVar(R)]], Callable[[Union[Callable[[ParamSpec(P, bound= None)], TypeVar(R)], Callable[[ParamSpec(P, bound= None)], Awaitable[TypeVar(R)]]]], Callable[[ParamSpec(P, bound= None)], Awaitable[TypeVar(R)]]]]

Returns:

If fn is provided, returns the async-compatible task wrapper. If fn is None, returns a decorator that can be applied to a function.
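
The "decorator and decorator factory" behaviour is a general Python pattern that can be sketched without the SDK. The module-level task function and registry below are hypothetical, and running sync functions in a worker thread is an assumption of this sketch, not a statement about how KelvinApp wraps them.

```python
import asyncio
import functools
import inspect
from typing import Any, Awaitable, Callable, Dict, Optional

registry: Dict[str, Callable[..., Awaitable[Any]]] = {}


def task(fn: Optional[Callable[..., Any]] = None, *, name: Optional[str] = None):
    """Usable as @task, @task(), @task(name=...), or task(some_function)."""

    def register(func: Callable[..., Any]) -> Callable[..., Awaitable[Any]]:
        if inspect.iscoroutinefunction(func):
            wrapper = func
        else:
            @functools.wraps(func)
            async def wrapper(*args: Any, **kwargs: Any) -> Any:
                # Assumption: run sync code in a thread so the loop stays responsive.
                loop = asyncio.get_running_loop()
                return await loop.run_in_executor(
                    None, functools.partial(func, *args, **kwargs)
                )
        registry[name or func.__name__] = wrapper
        return wrapper

    if fn is not None:
        return register(fn)  # bare @task or task(some_function)
    return register          # @task() or @task(name=...)


@task
async def my_async_task() -> str:
    return "async"


@task(name="custom.task.name")
def another_task() -> str:
    return "sync"


async def main() -> list:
    return [await registry[key]() for key in sorted(registry)]


results = asyncio.run(main())
print(sorted(registry), results)
```

The `fn is not None` check is what lets one function serve both call styles: with a function argument it registers immediately, and without one it returns the inner decorator.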

handle_task_result(task)[source]
Return type:

None

Parameters:

task (Task)

timer(fn=None, *, interval, name=None)[source]
Overloads:
  • self, fn (TaskFunc[[], Any]), interval (float), name (Optional[str]) → AsyncFunc[[], Any]

  • self, interval (float), name (Optional[str]) → Callable[[TaskFunc[[], Any]], AsyncFunc[[], Any]]

Return type:

Callable[[], Awaitable[Any]] | Callable[[Callable[[], Any] | Callable[[], Awaitable[Any]]], Callable[[], Awaitable[Any]]]

Register a function to be called at a repeating interval.

Usage patterns:

    @app.timer(interval=5)
    def foo(): ...

    @app.timer(interval=5, name="my timer")
    async def foo(): ...

    def bar(): ...
    app.timer(bar, interval=10)
    app.timer(bar, interval=10, name="bar.timer")
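
A repeating interval can be approximated with a plain asyncio loop. The run_every helper below is hypothetical and is bounded by a repeats argument only so that the example terminates; a real timer would presumably keep running until the app disconnects.

```python
import asyncio
from typing import Callable, List


async def run_every(interval: float, fn: Callable[[], None], repeats: int) -> None:
    """Call fn, then sleep for `interval` seconds, `repeats` times in total."""
    for _ in range(repeats):
        fn()
        await asyncio.sleep(interval)


ticks: List[int] = []


def foo() -> None:
    ticks.append(len(ticks))


asyncio.run(run_every(0.01, foo, repeats=3))
print(ticks)  # [0, 1, 2]
```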

async run_forever()[source]

Connects to the service and then waits indefinitely until cancelled. On cancellation, disconnects cleanly before propagating the cancellation.

Return type:

None
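
The "disconnect, then propagate" behaviour follows the usual asyncio cancellation idiom. The sketch below uses a hypothetical stand-alone run_forever coroutine that records events in a list in place of a real connection.

```python
import asyncio
from typing import List

events: List[str] = []


async def run_forever() -> None:
    events.append("connected")
    try:
        await asyncio.Event().wait()   # never set: waits until cancelled
    except asyncio.CancelledError:
        events.append("disconnected")  # clean up first...
        raise                          # ...then propagate the cancellation


async def main() -> None:
    runner = asyncio.create_task(run_forever())
    await asyncio.sleep(0.01)          # let the task start and block
    runner.cancel()
    try:
        await runner
    except asyncio.CancelledError:
        events.append("cancelled")


asyncio.run(main())
print(events)  # ['connected', 'disconnected', 'cancelled']
```

Re-raising CancelledError matters: swallowing it would make the caller believe the task finished normally.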

run()[source]

Synchronous entry point:
  • Starts an asyncio event loop to run run_forever().

  • Blocks until run_forever() completes or is cancelled.

  • Allows Ctrl-C (KeyboardInterrupt) to stop cleanly.

Return type:

None

msg_is_control_change(msg)[source]
Return type:

TypeGuard[AssetDataMessage]

Parameters:

msg (Message)

class kelvin.application.KelvinStream(config=KelvinStreamConfig(ip='127.0.0.1', port=49167, limit=4194304))[source]

Bases: StreamInterface

TCP stream implementation for Kelvin platform communication.

This class manages a persistent TCP connection to the Kelvin stream server, handling message serialization/deserialization and connection lifecycle.

Parameters:

config (KelvinStreamConfig)

_config

The stream configuration settings.

_reader

The async stream reader (None when disconnected).

_writer

The async stream writer (None when disconnected).

__init__(config=KelvinStreamConfig(ip='127.0.0.1', port=49167, limit=4194304))[source]

Initialize the Kelvin Stream.

Parameters:

config (KelvinStreamConfig) – Stream configuration settings. Uses defaults if not provided.

Return type:

None

async connect()[source]

Connect to the Kelvin Stream server.

Raises:

ConnectionError – If the stream server is unreachable.

Return type:

None

async disconnect()[source]

Disconnect from the Kelvin Stream.

Return type:

None

async read()[source]

Read the next Kelvin Message.

Raises:

ConnectionError – When connection is unavailable.

Returns:

The Message that was read.

Return type:

Message

async write(msg)[source]

Write a Message to the Kelvin Stream.

Parameters:

msg (Message) – Kelvin message to write

Raises:

ConnectionError – If the connection is lost.

Returns:

True if the message was sent with success.

Return type:

bool

class kelvin.application.AssetInfo(*args, **kwargs)[source]

Bases: object

Parameters:
name: str
properties: Dict[str, Annotated[bool, Strict(strict=True)] | Annotated[int, Strict(strict=True)] | Annotated[float, Strict(strict=True)] | Annotated[str, Strict(strict=True)] | list[Annotated[bool, Strict(strict=True)]] | list[Annotated[int, Strict(strict=True)]] | list[Annotated[float, Strict(strict=True)]] | list[Annotated[str, Strict(strict=True)]]] = FieldInfo(annotation=NoneType, required=False, default_factory=dict)
parameters: Dict[str, Annotated[bool, Strict(strict=True)] | Annotated[int, Strict(strict=True)] | Annotated[float, Strict(strict=True)] | Annotated[str, Strict(strict=True)]] = FieldInfo(annotation=NoneType, required=False, default_factory=dict)
datastreams: Dict[str, ResourceDatastream] = FieldInfo(annotation=NoneType, required=False, default_factory=dict)
title: str | None = None
asset_type_name: str | None = None
asset_type_title: str | None = None
class kelvin.application.ResourceDatastream(*args, **kwargs)[source]

Bases: object

Parameters:
asset: KRNAsset
io_name: str
datastream: Datastream
configuration: Dict = FieldInfo(annotation=NoneType, required=False, default_factory=dict)
way: WayEnum = 'output'
owned: bool = False
access: Literal['RO', 'RW', 'WO'] = 'RO'
class kelvin.application.Datastream(*args, **kwargs)[source]

Bases: object

Parameters:
name: str
type: KMessageType
unit: str | None = None

Client

class kelvin.application.client.Datastream(*args, **kwargs)[source]

Bases: object

Parameters:
name: str
type: KMessageType
unit: str | None = None
class kelvin.application.client.AppIO(*args, **kwargs)[source]

Bases: Datastream

Parameters:
class kelvin.application.client.ResourceDatastream(*args, **kwargs)[source]

Bases: object

Parameters:
asset: KRNAsset
io_name: str
datastream: Datastream
configuration: Dict = FieldInfo(annotation=NoneType, required=False, default_factory=dict)
way: WayEnum = 'output'
owned: bool = False
access: Literal['RO', 'RW', 'WO'] = 'RO'
class kelvin.application.client.AssetInfo(*args, **kwargs)[source]

Bases: object

Parameters:
name: str
properties: Dict[str, Annotated[bool, Strict(strict=True)] | Annotated[int, Strict(strict=True)] | Annotated[float, Strict(strict=True)] | Annotated[str, Strict(strict=True)] | list[Annotated[bool, Strict(strict=True)]] | list[Annotated[int, Strict(strict=True)]] | list[Annotated[float, Strict(strict=True)]] | list[Annotated[str, Strict(strict=True)]]] = FieldInfo(annotation=NoneType, required=False, default_factory=dict)
parameters: Dict[str, Annotated[bool, Strict(strict=True)] | Annotated[int, Strict(strict=True)] | Annotated[float, Strict(strict=True)] | Annotated[str, Strict(strict=True)]] = FieldInfo(annotation=NoneType, required=False, default_factory=dict)
datastreams: Dict[str, ResourceDatastream] = FieldInfo(annotation=NoneType, required=False, default_factory=dict)
title: str | None = None
asset_type_name: str | None = None
asset_type_title: str | None = None
class kelvin.application.client.KelvinApp(config=KelvinStreamConfig(ip='127.0.0.1', port=49167, limit=4194304), api=None)[source]

Bases: object

Kelvin App to connect and interface with the KelvinStream.

This entry is documented identically to kelvin.application.KelvinApp above; refer to that entry for the attributes, properties, and methods.

API Client

exception kelvin.application.api_client.UnavailableApiClientError[source]

Bases: Exception

Raised when attempting to use an unavailable API client.

kelvin.application.api_client.initialize_api_client()[source]

Validate that the required environment variables are available to create the API client. Returns an UnavailableApiClient mock client if the requirements are not met.

Returns:

The instantiated API client, or an UnavailableApiClient if the requirements are not met.

Return type:

AsyncClient

Config

exception kelvin.application.config.ConfigurationError[source]

Bases: Exception

Filters

Message filters for Kelvin applications.

This module provides filter functions for routing and filtering Kelvin messages based on their type, resource, or content. Filters return TypeGuard predicates that can be used with KelvinApp.filter() or KelvinApp.stream_filter().

Example

>>> from kelvin.application import filters
>>> queue = app.filter(filters.is_asset_data_message)
>>> async for msg in app.stream_filter(filters.asset_equals("my-asset")):
...     print(msg)
kelvin.application.filters.is_asset_data_message(msg)[source]

Check if the message is an Asset Data Message.

An Asset Data Message has a KRNAssetDataStream resource and a data type.

Parameters:

msg (Message) – The message to check.

Return type:

TypeGuard[AssetDataMessage]

Returns:

True if the message is an Asset Data Message.

kelvin.application.filters.is_data_message(msg)[source]

Check if the message contains data (has a data type).

Parameters:

msg (Message) – The message to check.

Return type:

bool

Returns:

True if the message has a data type.

kelvin.application.filters.is_control_status_message(msg)[source]

Check if the message is a Control Change Status.

Return type:

TypeGuard[ControlChangeStatus]

Parameters:

msg (Message)

kelvin.application.filters.resource_equals(resource)[source]

Create a filter that matches messages with the specified resource(s).

Parameters:

resource (Union[KRN, List[KRN]]) – A single KRN or list of KRNs to match against.

Return type:

Callable[[Message], TypeGuard[Message]]

Returns:

A filter function that returns True if the message resource matches.

kelvin.application.filters.input_equals(data)[source]

Create a filter that matches Asset Data Messages with the specified datastream(s).

Parameters:

data (Union[str, List[str]]) – A single datastream name or list of datastream names to match.

Return type:

Callable[[Message], TypeGuard[AssetDataMessage]]

Returns:

A filter function that returns True if the message datastream matches.

kelvin.application.filters.asset_equals(asset)[source]

Create a filter that matches messages for the specified asset(s).

Parameters:

asset (Union[str, List[str]]) – A single asset name or list of asset names to match.

Return type:

Callable[[Message], TypeGuard[Message]]

Returns:

A filter function that returns True if the message asset matches.
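
Filters such as asset_equals are predicate factories: calling them returns the function that is later applied to each message. The sketch below reproduces that shape with a hypothetical Message dataclass and a local asset_equals; it is not the SDK's implementation and models the asset as a plain string.

```python
from dataclasses import dataclass
from typing import Callable, List, Union


@dataclass
class Message:
    """Hypothetical stand-in exposing only an asset name."""
    asset: str


def asset_equals(asset: Union[str, List[str]]) -> Callable[[Message], bool]:
    """Build a predicate that accepts one asset name or a list of names."""
    wanted = {asset} if isinstance(asset, str) else set(asset)

    def predicate(msg: Message) -> bool:
        return msg.asset in wanted

    return predicate


only_one = asset_equals("my-asset")
either = asset_equals(["my-asset", "other-asset"])
print(only_one(Message("my-asset")), only_one(Message("other-asset")))  # True False
print(either(Message("other-asset")))  # True
```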

kelvin.application.filters.is_custom_action(msg)[source]

Check if the message is a Custom Action Message.

Return type:

TypeGuard[CustomAction]

Parameters:

msg (Message)

kelvin.application.filters.is_asset_data_quality_message(msg)[source]

Check if the message is an Asset Data Quality Message.

Return type:

TypeGuard[AssetDataQualityMessage]

Parameters:

msg (Message)

kelvin.application.filters.is_asset_data_stream_quality_message(msg)[source]

Check if the message is an Asset Data Stream Data Quality Message.

Return type:

TypeGuard[AssetDataStreamDataQualityMessage]

Parameters:

msg (Message)

kelvin.application.filters.is_data_quality_message(msg)[source]

Check if the message is a Data Quality Message.

Return type:

TypeGuard[AssetDataQualityMessage | AssetDataStreamDataQualityMessage]

Parameters:

msg (Message)

Stream

Kelvin Stream interface for message communication.

This module provides the low-level stream interface for connecting to and communicating with the Kelvin platform via TCP sockets.

Main Components:

  • StreamInterface: Abstract base class defining the stream protocol.

  • KelvinStream: Concrete implementation for Kelvin platform communication.

  • KelvinStreamConfig: Configuration settings for the stream connection.

class kelvin.application.stream.StreamInterface[source]

Bases: ABC

Abstract interface for a connection to a Kelvin system.

This class defines the protocol for stream-based communication with the Kelvin platform. Implementations must provide connect, disconnect, read, and write operations.

abstractmethod async connect()[source]

Connect to the Kelvin Stream.

Return type:

None

abstractmethod async disconnect()[source]

Disconnect from the Kelvin Stream.

Return type:

None

abstractmethod async read()[source]

Read the next message from the stream.

Return type:

Message

abstractmethod async write(msg)[source]

Write a message to the stream.

Return type:

bool

Parameters:

msg (Message)
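To illustrate the four-method protocol described above, here is a minimal in-memory sketch. It deliberately does not import the SDK: StreamLike is a stand-in that mirrors the documented method names, and LoopbackStream is a hypothetical implementation that could serve as a test double. Raising ConnectionError when disconnected is an assumption modelled on the KelvinStream documentation, not a requirement stated for the interface.

```python
import asyncio
from abc import ABC, abstractmethod
from typing import Any, Optional


class StreamLike(ABC):
    """Stand-in mirroring the four documented StreamInterface methods."""

    @abstractmethod
    async def connect(self) -> None: ...

    @abstractmethod
    async def disconnect(self) -> None: ...

    @abstractmethod
    async def read(self) -> Any: ...

    @abstractmethod
    async def write(self, msg: Any) -> bool: ...


class LoopbackStream(StreamLike):
    """Hypothetical in-memory stream: every written message can be read back."""

    def __init__(self) -> None:
        # The queue only exists while "connected".
        self._queue: Optional[asyncio.Queue] = None

    async def connect(self) -> None:
        self._queue = asyncio.Queue()

    async def disconnect(self) -> None:
        self._queue = None

    async def read(self) -> Any:
        if self._queue is None:
            raise ConnectionError("stream is not connected")
        return await self._queue.get()

    async def write(self, msg: Any) -> bool:
        if self._queue is None:
            raise ConnectionError("stream is not connected")
        await self._queue.put(msg)
        return True
```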

class kelvin.application.stream.KelvinStreamConfig(_case_sensitive=None, _nested_model_default_partial_update=None, _env_prefix=None, _env_prefix_target=None, _env_file=PosixPath('.'), _env_file_encoding=None, _env_ignore_empty=None, _env_nested_delimiter=None, _env_nested_max_split=None, _env_parse_none_str=None, _env_parse_enums=None, _cli_prog_name=None, _cli_parse_args=None, _cli_settings_source=None, _cli_parse_none_str=None, _cli_hide_none_type=None, _cli_avoid_json=None, _cli_enforce_required=None, _cli_use_class_docs_for_groups=None, _cli_exit_on_error=None, _cli_prefix=None, _cli_flag_prefix_char=None, _cli_implicit_flags=None, _cli_ignore_unknown_args=None, _cli_kebab_case=None, _cli_shortcuts=None, _secrets_dir=None, _build_sources=None, **values)[source]

Bases: BaseSettings

Configuration for the Kelvin Stream connection.

Parameters:
  • _case_sensitive (bool | None)

  • _nested_model_default_partial_update (bool | None)

  • _env_prefix (str | None)

  • _env_prefix_target (EnvPrefixTarget | None)

  • _env_file (DotenvType | None)

  • _env_file_encoding (str | None)

  • _env_ignore_empty (bool | None)

  • _env_nested_delimiter (str | None)

  • _env_nested_max_split (int | None)

  • _env_parse_none_str (str | None)

  • _env_parse_enums (bool | None)

  • _cli_prog_name (str | None)

  • _cli_parse_args (bool | list[str] | tuple[str, ...] | None)

  • _cli_settings_source (CliSettingsSource[Any] | None)

  • _cli_parse_none_str (str | None)

  • _cli_hide_none_type (bool | None)

  • _cli_avoid_json (bool | None)

  • _cli_enforce_required (bool | None)

  • _cli_use_class_docs_for_groups (bool | None)

  • _cli_exit_on_error (bool | None)

  • _cli_prefix (str | None)

  • _cli_flag_prefix_char (str | None)

  • _cli_implicit_flags (bool | Literal['dual', 'toggle'] | None)

  • _cli_ignore_unknown_args (bool | None)

  • _cli_kebab_case (bool | Literal['all', 'no_enums'] | None)

  • _cli_shortcuts (Mapping[str, str | list[str]] | None)

  • _secrets_dir (PathType | None)

  • _build_sources (tuple[tuple[PydanticBaseSettingsSource, ...], dict[str, Any]] | None)

  • ip (str)

  • port (int)

  • limit (int)

ip

The IP address of the Kelvin stream server. Default: “127.0.0.1”.

port

The port number for the stream connection. Default: 49167.

limit

Maximum buffer size in bytes for the stream reader. Default: 4194304 (4 MiB).

Environment Variables:

  • KELVIN_STREAM_IP: Override the IP address.

  • KELVIN_STREAM_PORT: Override the port number.

  • KELVIN_STREAM_LIMIT: Override the buffer limit.
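The override behaviour of the KELVIN_STREAM_ prefixed variables can be sketched with the standard library alone. StreamSettings and settings_from_env below are hypothetical stand-ins, not the SDK's KelvinStreamConfig; they reuse the documented defaults and prefix. Unlike the real configuration, which sets case_sensitive to False, this sketch matches variable names exactly.

```python
from dataclasses import dataclass
from typing import Mapping

ENV_PREFIX = "KELVIN_STREAM_"  # same prefix as env_prefix in model_config


@dataclass(frozen=True)
class StreamSettings:
    """Hypothetical stand-in for KelvinStreamConfig with the documented defaults."""

    ip: str = "127.0.0.1"
    port: int = 49167
    limit: int = 4194304


def settings_from_env(env: Mapping[str, str]) -> StreamSettings:
    """Build settings, letting KELVIN_STREAM_* variables override the defaults."""
    defaults = StreamSettings()
    return StreamSettings(
        ip=env.get(ENV_PREFIX + "IP", defaults.ip),
        port=int(env.get(ENV_PREFIX + "PORT", defaults.port)),
        limit=int(env.get(ENV_PREFIX + "LIMIT", defaults.limit)),
    )
```

Passing os.environ as env would read the process environment; an explicit mapping is used here so the behaviour is easy to test.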

model_config: ClassVar[SettingsConfigDict] = {'arbitrary_types_allowed': True, 'case_sensitive': False, 'cli_avoid_json': False, 'cli_enforce_required': False, 'cli_exit_on_error': True, 'cli_flag_prefix_char': '-', 'cli_hide_none_type': False, 'cli_ignore_unknown_args': False, 'cli_implicit_flags': False, 'cli_kebab_case': False, 'cli_parse_args': None, 'cli_parse_none_str': None, 'cli_prefix': '', 'cli_prog_name': None, 'cli_shortcuts': None, 'cli_use_class_docs_for_groups': False, 'enable_decoding': True, 'env_file': None, 'env_file_encoding': None, 'env_ignore_empty': False, 'env_nested_delimiter': None, 'env_nested_max_split': None, 'env_parse_enums': None, 'env_parse_none_str': None, 'env_prefix': 'KELVIN_STREAM_', 'env_prefix_target': 'variable', 'extra': 'forbid', 'json_file': None, 'json_file_encoding': None, 'nested_model_default_partial_update': False, 'protected_namespaces': ('model_validate', 'model_dump', 'settings_customise_sources'), 'secrets_dir': None, 'toml_file': None, 'validate_default': True, 'yaml_config_section': None, 'yaml_file': None, 'yaml_file_encoding': None}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

ip: str
port: int
limit: int

class kelvin.application.stream.KelvinStream(config=KelvinStreamConfig(ip='127.0.0.1', port=49167, limit=4194304))[source]

Bases: StreamInterface

TCP stream implementation for Kelvin platform communication.

This class manages a persistent TCP connection to the Kelvin stream server, handling message serialization/deserialization and connection lifecycle.

Parameters:

config (KelvinStreamConfig)

_config

The stream configuration settings.

_reader

The async stream reader (None when disconnected).

_writer

The async stream writer (None when disconnected).

__init__(config=KelvinStreamConfig(ip='127.0.0.1', port=49167, limit=4194304))[source]

Initialize the Kelvin Stream.

Parameters:

config (KelvinStreamConfig) – Stream configuration settings. Uses defaults if not provided.

Return type:

None

async connect()[source]

Connect to the Kelvin Stream server.

Raises:

ConnectionError – If the stream server is unreachable.

Return type:

None

async disconnect()[source]

Disconnect from the Kelvin Stream.

Return type:

None

async read()[source]

Read the next Kelvin message.

Raises:

ConnectionError – When the connection is unavailable.

Returns:

The message that was read.

Return type:

Message

async write(msg)[source]

Write a message to the Kelvin Stream.

Parameters:

msg (Message) – The Kelvin message to write.

Raises:

ConnectionError – If the connection is lost.

Returns:

True if the message was sent successfully.

Return type:

bool
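Since connect() is documented to raise ConnectionError when the server is unreachable, code that uses the low-level stream directly may want to retry. The helper below is a sketch under that assumption. connect_with_retry is a hypothetical name, the doubling delay is an arbitrary policy chosen for illustration, and the function works with any object exposing an async connect() method.

```python
import asyncio


async def connect_with_retry(stream, attempts: int = 3, delay: float = 0.5) -> bool:
    """Call stream.connect() up to `attempts` times.

    The wait between attempts doubles after each failure. Returns True on
    success and False once every attempt has raised ConnectionError.
    """
    for attempt in range(attempts):
        try:
            await stream.connect()
            return True
        except ConnectionError:
            # Only wait if another attempt is still to come.
            if attempt < attempts - 1:
                await asyncio.sleep(delay)
                delay *= 2
    return False
```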

Timer

Async timer with drift correction.

This module provides a Timer class for executing periodic tasks with automatic clock drift correction to maintain consistent intervals.

class kelvin.application.timer.Timer(interval, name, max_drift_correction=0.1)[source]

Bases: object

A repeating async timer that corrects clock drift.

The Timer automatically adjusts sleep intervals to compensate for execution time and system clock drift, ensuring consistent timing over extended periods.

Parameters:
interval

The target interval between timer fires in seconds.

name

A descriptive name for the timer (used in logging).

max_drift

Maximum drift before logging a warning.

min_interval

Minimum allowed sleep interval.

max_interval

Maximum allowed sleep interval.

iteration

Number of times the timer has fired.

overlaps

Number of times execution exceeded the interval.

Example

>>> timer = Timer(interval=5.0, name="my-timer")
>>> async for _ in timer:
...     await do_periodic_work()

async __aiter__()[source]

An async iterator that yields the actual sleep interval each loop.

Return type:

AsyncIterator[float]
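One common way to keep a periodic loop from drifting is to schedule every tick against a fixed start time rather than sleeping a constant amount after each iteration. The sketch below illustrates that idea. It is not the Timer class's actual implementation, and next_sleep and drift_corrected_ticks are hypothetical names.

```python
import asyncio
import time
from typing import AsyncIterator


def next_sleep(start: float, iteration: int, interval: float, now: float) -> float:
    """Seconds to sleep so tick `iteration` lands on start + iteration * interval.

    Returns 0.0 when the deadline has already passed (the work overran).
    """
    deadline = start + iteration * interval
    return max(0.0, deadline - now)


async def drift_corrected_ticks(interval: float, count: int) -> AsyncIterator[float]:
    """Yield the sleep requested before each of `count` ticks."""
    start = time.monotonic()
    for iteration in range(1, count + 1):
        sleep_for = next_sleep(start, iteration, interval, time.monotonic())
        await asyncio.sleep(sleep_for)
        yield sleep_for
```

Because each deadline is derived from the original start time, time spent in the loop body shortens the next sleep instead of accumulating as drift.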

Version

Window

kelvin.application.window.quantize_time(dt, step=None, *, mode='nearest', anchor=None)[source]

Quantize a datetime to a fixed step.

  • Accepts naive datetimes and treats them as UTC.

  • Uses integer microseconds to avoid floating-point rounding issues.

  • If step is None, return the input unchanged.

Parameters

  • dt (datetime) – Timestamp to quantize. Naive values are interpreted as UTC.

  • step (timedelta or None) – Quantization step. When None, no quantization is applied.

  • mode ({“nearest”, “floor”, “ceil”}) – Rounding mode. Defaults to “nearest”.

  • anchor (datetime or None) – Grid anchor. When None, use the Unix epoch at UTC.

Returns

Quantized datetime with the same tzinfo semantics as input, normalized to UTC if naive.

Return type:

datetime
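The integer-microsecond approach described above can be sketched as follows. This is an independent illustration, not the library's code: quantize is a hypothetical name, returning the input untouched when step is None follows the bullet list above, and rounding ties upward in "nearest" mode is an assumption, since the documentation does not say how ties are resolved.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)
ONE_US = timedelta(microseconds=1)


def quantize(
    dt: datetime,
    step: Optional[timedelta] = None,
    mode: str = "nearest",
    anchor: Optional[datetime] = None,
) -> datetime:
    """Snap dt onto a grid of `step` measured from `anchor` (sketch only)."""
    if step is None:
        return dt  # no quantization requested
    if dt.tzinfo is None:
        dt = dt.replace(tzinfo=timezone.utc)  # naive values are treated as UTC
    if anchor is None:
        anchor = EPOCH
    elif anchor.tzinfo is None:
        anchor = anchor.replace(tzinfo=timezone.utc)

    step_us = step // ONE_US  # timedelta // timedelta gives an int
    if step_us <= 0:
        raise ValueError("step must be positive")
    offset_us = (dt - anchor) // ONE_US
    floor_us = (offset_us // step_us) * step_us

    if mode == "floor":
        snapped = floor_us
    elif mode == "ceil":
        snapped = floor_us if floor_us == offset_us else floor_us + step_us
    elif mode == "nearest":
        # Assumption: exact ties round up to the later grid point.
        round_up = 2 * (offset_us - floor_us) >= step_us
        snapped = floor_us + step_us if round_up else floor_us
    else:
        raise ValueError(f"unknown mode: {mode!r}")

    return (anchor + timedelta(microseconds=snapped)).astimezone(dt.tzinfo)
```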

kelvin.application.window.round_nearest_time(dt, round_to=None)[source]

Backwards-compatible wrapper that rounds to the nearest tick.

Return type:

datetime

class kelvin.application.window.BaseWindow(assets, inputs, queue, *, round_to=None, buffer_size=None)[source]

Bases: object

Manage per-asset in-memory buffers and DataFrames for streaming data.

Design

  • Each asset has a deque of (timestamp, {stream: payload}). The deque can be bounded by buffer_size.

  • Timestamps are normalized to UTC and optionally quantized.

  • DataFrames are per asset, indexed by UTC timestamps, with columns equal to inputs.

  • Duplicate timestamps within a buffer are merged into a single row before appending.
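The design points above can be illustrated with a small stand-alone sketch. AssetBuffers is a hypothetical class written for this example; it is not BaseWindow, and it returns plain Python rows instead of a DataFrame so that it has no third-party dependencies.

```python
from collections import defaultdict, deque
from datetime import datetime, timezone
from typing import Deque, Dict, List, Optional, Tuple

Row = Tuple[datetime, Dict[str, float]]


class AssetBuffers:
    """Hypothetical per-asset buffer store (not the SDK's BaseWindow)."""

    def __init__(self, buffer_size: Optional[int] = None) -> None:
        # One deque per asset; maxlen=None means unbounded.
        self._buffers: Dict[str, Deque[Row]] = defaultdict(
            lambda: deque(maxlen=buffer_size)
        )

    def append(self, asset: str, timestamp: datetime, stream: str, payload: float) -> None:
        """Store one (timestamp, {stream: payload}) entry, treating naive times as UTC."""
        if timestamp.tzinfo is None:
            timestamp = timestamp.replace(tzinfo=timezone.utc)
        self._buffers[asset].append((timestamp, {stream: payload}))

    def merged_rows(self, asset: str) -> List[Row]:
        """Merge entries sharing a timestamp into one row, sorted by time."""
        merged: Dict[datetime, Dict[str, float]] = {}
        for ts, values in self._buffers.get(asset, ()):
            merged.setdefault(ts, {}).update(values)
        return sorted(merged.items(), key=lambda item: item[0])
```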

get_df(asset_name)[source]

Return a copy of the DataFrame for the asset or an empty shell.

The returned DataFrame has a UTC DatetimeIndex and columns equal to inputs. When the asset has not produced data yet, return an empty DataFrame with the correct index and columns.

Return type:

DataFrame

Parameters:

asset_name (str)

class kelvin.application.window.BaseTimeWindow(assets, inputs, queue, window_size, hop_size, *, round_to=None, align_step=None, allowed_lateness=None, buffer_size=None)[source]

Bases: BaseWindow

Time-based windowing of data streams.

Parameters

  • assets (list[str]) – Asset identifiers to include.

  • inputs (list[str]) – Data stream identifiers to include as DataFrame columns.

  • queue (asyncio.Queue[AssetDataMessage]) – Source of incoming messages.

  • window_size (timedelta) – Duration of each window.

  • hop_size (timedelta) – Time between the start of consecutive windows.

  • round_to (timedelta or None) – Optional timestamp quantization step.

  • align_step (timedelta or None) – Optional alignment step for the initial window start. If None, no extra alignment is applied.

  • allowed_lateness (timedelta or None) – If provided, accept events older than window_start by up to this amount.

  • buffer_size (int or None) – Optional bound for per-asset buffer deques.

Notes

  • Naive window_start values are interpreted as UTC.

  • Message timestamps that are naive are also treated as UTC.

async stream(window_start=None)[source]

Continuously emit windowed DataFrames per asset.

Return type:

AsyncGenerator[Tuple[str, DataFrame], None]

Parameters:

window_start (datetime | None)

Behavior

  • Align the first emission to window_start + window_size.

  • Sleep until that first boundary, then drain and emit.

  • Continue emitting every hop_size using a Timer.

Parameters

window_start (datetime or None) – Start of the first window. When None, use current UTC time. Naive values are interpreted as UTC. The start is optionally aligned by align_step.

Parameters:
  • assets (List[str])

  • inputs (List[str])

  • queue (asyncio.Queue[AssetDataMessage])

  • window_size (timedelta)

  • hop_size (timedelta)

  • round_to (Optional[timedelta])

  • align_step (Optional[timedelta])

  • allowed_lateness (Optional[timedelta])

  • buffer_size (Optional[int])

class kelvin.application.window.TumblingWindow(assets, inputs, queue, window_size, *, round_to=None, align_step=None, allowed_lateness=None, buffer_size=None)[source]

Bases: BaseTimeWindow

Non-overlapping time windows where hop_size equals window_size.

Parameters:
  • assets (List[str])

  • inputs (List[str])

  • queue (asyncio.Queue[AssetDataMessage])

  • window_size (timedelta)

  • round_to (Optional[timedelta])

  • align_step (Optional[timedelta])

  • allowed_lateness (Optional[timedelta])

  • buffer_size (Optional[int])

class kelvin.application.window.HoppingWindow(assets, inputs, queue, window_size, hop_size, *, round_to=None, align_step=None, allowed_lateness=None, buffer_size=None)[source]

Bases: BaseTimeWindow

Overlapping time windows when hop_size is smaller than window_size.

Parameters:
  • assets (List[str])

  • inputs (List[str])

  • queue (asyncio.Queue[AssetDataMessage])

  • window_size (timedelta)

  • hop_size (timedelta)

  • round_to (Optional[timedelta])

  • align_step (Optional[timedelta])

  • allowed_lateness (Optional[timedelta])

  • buffer_size (Optional[int])
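The difference between tumbling and hopping windows comes down to how window_size and hop_size relate. The sketch below computes window boundaries only. window_bounds is a hypothetical helper, and it makes no claim about whether the SDK treats the boundaries as inclusive or exclusive.

```python
from datetime import datetime, timedelta
from typing import Iterator, Tuple


def window_bounds(
    start: datetime,
    window_size: timedelta,
    hop_size: timedelta,
    count: int,
) -> Iterator[Tuple[datetime, datetime]]:
    """Yield (window_start, window_end) for the first `count` windows.

    Each window starts hop_size after the previous one and spans window_size.
    """
    for i in range(count):
        window_start = start + i * hop_size
        yield window_start, window_start + window_size
```

With hop_size equal to window_size, each window starts where the previous one ended (tumbling). With a smaller hop_size, consecutive windows overlap (hopping).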

class kelvin.application.window.RollingWindow(assets, inputs, queue, count_size, *, slide=1, round_to=None, buffer_size=None)[source]

Bases: BaseWindow

Rolling window based on a fixed count of messages per asset.

Notes

  • The internal per-asset deque is bounded to prevent unbounded growth.

  • The DataFrame yielded for an asset represents the last count_size items after merging duplicate timestamps inside that slice.
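A count-based rolling window can be sketched with a bounded deque. rolling_windows is a hypothetical function for illustration. Emitting only once count_size items are available, and then after every slide new items, is an assumption about the semantics rather than documented behaviour.

```python
from collections import deque
from typing import Deque, Iterable, Iterator, List


def rolling_windows(
    items: Iterable[float], count_size: int, slide: int = 1
) -> Iterator[List[float]]:
    """Yield the last `count_size` items each time `slide` new items have arrived."""
    window: Deque[float] = deque(maxlen=count_size)  # bounded, so old items drop off
    since_emit = 0
    for item in items:
        window.append(item)
        since_emit += 1
        # Assumption: nothing is emitted until the window is full.
        if len(window) == count_size and since_emit >= slide:
            since_emit = 0
            yield list(window)
```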

async stream()[source]

Continuously emit rolling windows as new messages arrive.

Return type:

AsyncGenerator[Tuple[str, DataFrame], None]

Parameters: