Application¶
The application module provides the core framework for building Kelvin applications.
Kelvin Application SDK.
This module provides the core components for building Kelvin applications that connect to and interact with the Kelvin platform.
- Main Components:
KelvinApp: The main application class for connecting to Kelvin and processing messages. KelvinStream: Low-level stream interface for Kelvin communication. AssetInfo: Information about configured assets. Datastream: Datastream configuration. ResourceDatastream: Asset datastream configuration with access control.
Example
>>> from kelvin.application import KelvinApp
>>> async with KelvinApp() as app:
... async for msg in app.stream_filter(filters.is_asset_data_message):
... print(msg)
- class kelvin.application.KelvinApp(config=KelvinStreamConfig(ip='127.0.0.1', port=49167, limit=4194304), api=None)[source]¶
Bases:
objectKelvin App to connect and interface with the KelvinStream.
After connecting, the connection is handled automatically in the background.
Use filters or filter_stream to easily listen for specific messages. Use register_callback methods to register callbacks for events like connect and disconnect. Use tasks to run background functions that can be async or sync. Use timers to run functions at regular intervals.
The app can be used as an async context manager, which will automatically connect on enter and disconnect on exit.
- Example usage:
- async with KelvinApp() as app:
await app.publish(Number(resource=KRNAssetDataStream(‘my-asset’, ‘my-input’), payload=1.0)) async for msg in app.stream_filter(filters.is_asset_data_message):
print(msg)
- Parameters:
config (KelvinStreamConfig)
api (Optional[AsyncClient])
- api¶
API client for direct connection with Kelvin API
- on_asset_input: Callable[[AssetDataMessage], Awaitable[None]] | None¶
Called when an asset data message is received.
- on_control_change: Callable[[AssetDataMessage], Awaitable[None]] | None¶
Called when a control change message is received.
- on_control_status: Callable[[ControlChangeStatus], Awaitable[None]] | None¶
Called when a control status is received.
- on_custom_action: Callable[[CustomAction], Awaitable[None]] | None¶
Called when a custom action is received.
- on_asset_change: Callable[[AssetInfo | None, AssetInfo | None], Awaitable[None]] | None¶
Called when an asset is added, removed, or modified. First arg is the new asset (None if removed); second is the previous asset (None if newly added).
- on_app_configuration: Callable[[dict], Awaitable[None]] | None¶
Called when the app configuration changes.
- property is_connected: bool¶
Indicates whether the read loop is active, implying an established connection.
- property assets: Dict[str, AssetInfo]¶
Get all assets configured for this application.
It returns a dictionary where each key is the asset name, and the value is an AssetInfo object describing that asset’s properties, parameters, and datastreams.
The assets dictionary is dynamically updated whenever the application receives updates to asset properties or parameters, ensuring it always reflects the latest configuration.
- Returns:
- A dictionary where keys are asset names (strings) and values are AssetInfo instances
representing the current configuration and state of each asset.
- Return type:
Example
- {
- “asset1”: AssetInfo(
name=”asset1”, properties={
“tubing_length”: 25.0, “area”: 11.0
}, parameters={
“param-bool”: False, “param-number”: 7.5, “param-string”: “hello”
}, datastreams={
- “output1”: ResourceDatastream(
asset=KRNAsset(“asset1”), io_name=”output1”, datastream=Datastream(
name=”datastream1”, type=KMessageTypeData(“float”), unit=”m”
), access=”RO”, owned=True, configuration={}
)
}
)
}
- property app_configuration: dict¶
Get the application configuration.
- Returns:
A mapping of configuration sections to their values, matching the structure in app.yaml.
- Return type:
Example
- {
- “foo”: {
“conf_string”: “value1”, “conf_number”: 25, “conf_bool”: False,
}
}
- property config_received: Event¶
Event set when the initial configuration is received.
Use this asyncio.Event to wait until the application has loaded its initial app/asset parameters.
- Returns:
Event that becomes set once the initial configuration arrives.
- Return type:
Example
await app.config_received.wait()
- property inputs: List[AppIO]¶
List all input metrics configured for the application.
- Each AppIO has:
name (str): the metric identifier.
data_type (str): the data type of the input.
- Returns:
Read-only list of configured input metrics.
- Return type:
List[AppIO]
- property outputs: List[AppIO]¶
List all output metrics configured for the application.
- Each AppIO has:
name (str): the metric identifier.
data_type (str): the data type of the output.
- Returns:
Read-only list of configured output metrics.
- Return type:
List[AppIO]
- property tasks: Dict[str, Callable[[], Awaitable]]¶
Retrieve registered asynchronous tasks.
- Returns:
Dict of task names and callable that produce awaitables when called. Represents tasks scheduled or pending.
- Return type:
Dict[str, Callable[[], Awaitable]]
- async __aexit__(exc_type, exc_value, tb)[source]¶
Support async context: disconnect on exit.
- Return type:
- Parameters:
exc_type (Type[E] | None)
exc_value (E | None)
tb (TracebackType | None)
- async connect()[source]¶
Establish connection, retrying indefinitely until success. Starts the read loop and waits for App Configuration before firing on_connect callback and tasks/timers.
- Return type:
- async disconnect()[source]¶
Cancel read loop, stop all tasks, fire on_disconnect, and close stream.
- Return type:
- async publish(msg)[source]¶
Publish a message to KelvinStream.
Accepts either a Message instance or a MessageBuilder. If given a MessageBuilder, it is converted to a Message via to_message() before sending. Returns True on success, or False if the connection is unavailable.
- Parameters:
msg (
Union[Message,MessageBuilder]) –A Message to send directly, or
A MessageBuilder which will be converted to Message.
- Returns:
True if the message was sent successfully.
False if sending failed due to a ConnectionError.
- Return type:
Examples
message = Number(resource=KRNAssetDataStream(‘my-asset’, ‘my-input’), payload=1.0) success = await client.publish(message) if success:
print(“Published message”)
- else:
print(“Publish failed”)
- stream_filter(func)[source]¶
Creates a stream for the received Kelvin Messages based on a filter function. See filter.
- Parameters:
func (
Callable[[Message],TypeGuard[TypeVar(T, bound=Message)]]) – Filter function, it should receive a Message as argument and return bool.- Returns:
Async Generator that can be async iterated to receive filtered messages.
- Return type:
AsyncGenerator[Message, None]
- Yields:
Iterator[AsyncGenerator[Message, None]] – Yields the filtered messages.
- tumbling_window(window_size, assets=None, inputs=None, round_to=None)[source]¶
Creates a fixed, non-overlapping windowing.
- Parameters:
window_size (
timedelta) – Duration of each window.assets (
Optional[List[str]]) – Optional list of asset names to filter on.inputs (
Optional[List[str]]) – Optional list of input names (data streams) to include as columns in the window.round_to (
Optional[timedelta]) – Optional interval to which window boundaries are aligned.
- Returns:
An instance configured with the given parameters.
- Return type:
- hopping_window(window_size, hop_size, assets=None, inputs=None, round_to=None)[source]¶
Creates a window with fixed size and overlap.
- Parameters:
window_size (
timedelta) – Duration of each window.hop_size (
timedelta) – Step between window starts (defines overlap).assets (
Optional[List[str]]) – Optional list of asset names to filter on.inputs (
Optional[List[str]]) – Optional list of input names (data streams) to include as columns in the window.round_to (
Optional[timedelta]) – Optional interval to which window boundaries are aligned.
- Returns:
An instance configured with the given parameters.
- Return type:
- rolling_window(count_size, assets=None, inputs=None, round_to=None, slide=1)[source]¶
Creates a sliding count-based window over incoming data.
- Parameters:
count_size (
int) – Number of records per window.assets (
Optional[List[str]]) – Optional list of asset names to filter on.inputs (
Optional[List[str]]) – Optional list of input names (data streams) to include as columns in the window.round_to (
Optional[timedelta]) – Optional interval to which window boundaries are aligned.slide (
int) – Number of records to slide the window forward on each update.
- Returns:
An instance configured with the given parameters.
- Return type:
- stream(fn=None, *, assets=None, inputs=None)[source]¶
Register a per-message stream handler that is invoked for each incoming AssetDataMessage matching the optional assets/inputs filters.
Usage patterns:
@app.stream() async def my_stream(msg: AssetDataMessage): …
@app.stream(inputs=[“in1”]) async def my_stream(msg: AssetDataMessage): …
@app.stream(assets=[“asset1”]) async def my_stream(msg: AssetDataMessage): …
@app.stream(assets=[“asset1”], inputs=[“in1”, “in2”]) async def my_stream(msg: AssetDataMessage): …
@app.stream(assets=[“asset1”], inputs=[“in1”, “in2”]) def my_stream(msg: AssetDataMessage): …
def my_stream(msg: AssetDataMessage): … app.stream(my_stream, assets=[“asset1”], inputs=[“in1”])
The registered stream runs as an app task when the app connects.
- task(fn=None, *, name=None)[source]¶
Register a function as a task, either sync or async.
This method acts as both a decorator and a decorator factory. It supports the following usage patterns:
@app.task async def my_async_task(…): …
@app.task() def my_sync_task(…): …
@app.task(name=”custom.task.name”) def another_task(…): …
app.task(some_function)
- Parameters:
fn (
Union[Callable[[ParamSpec(P, bound=None)],TypeVar(R)],Callable[[ParamSpec(P, bound=None)],Awaitable[TypeVar(R)]],None]) – The function to register. Can be sync or async. If not provided, a decorator is returned.name (
Optional[str]) – Optional name to register the task under. If not provided, the fully-qualified function name is used.
- Return type:
Union[Callable[[ParamSpec(P, bound=None)],Awaitable[TypeVar(R)]],Callable[[Union[Callable[[ParamSpec(P, bound=None)],TypeVar(R)],Callable[[ParamSpec(P, bound=None)],Awaitable[TypeVar(R)]]]],Callable[[ParamSpec(P, bound=None)],Awaitable[TypeVar(R)]]]]- Returns:
If fn is provided, returns the async-compatible task wrapper. If fn is None, returns a decorator that can be applied to a function.
- timer(fn=None, *, interval, name=None)[source]¶
- Overloads:
self, fn (TaskFunc[[], Any]), interval (float), name (Optional[str]) → AsyncFunc[[], Any]
self, interval (float), name (Optional[str]) → Callable[[TaskFunc[[], Any]], AsyncFunc[[], Any]]
- Parameters:
- Return type:
Callable[[], Awaitable[Any]] | Callable[[Callable[[], Any] | Callable[[], Awaitable[Any]]], Callable[[], Awaitable[Any]]]
Register a function to be called at a repeating interval.
Usage patterns:
@app.timer(interval=5) def foo(): …
@app.timer(interval=5, name=”my timer”) async def foo(): …
def bar(): … app.timer(bar, interval=10) app.timer(bar, interval=10, name=”bar.timer”)
- async run_forever()[source]¶
Connects to the service and then waits indefinitely until cancelled. On cancellation, disconnects cleanly before propagating the cancellation.
- Return type:
- run()[source]¶
Synchronous entry point: - Starts an asyncio event loop to run run_forever(). - Blocks until run_forever() completes or is cancelled. - Allows Ctrl-C (KeyboardInterrupt) to stop cleanly.
- Return type:
- msg_is_control_change(msg)[source]¶
- Return type:
TypeGuard[AssetDataMessage]- Parameters:
msg (Message)
- class kelvin.application.KelvinStream(config=KelvinStreamConfig(ip='127.0.0.1', port=49167, limit=4194304))[source]¶
Bases:
StreamInterfaceTCP stream implementation for Kelvin platform communication.
This class manages a persistent TCP connection to the Kelvin stream server, handling message serialization/deserialization and connection lifecycle.
- Parameters:
config (KelvinStreamConfig)
- _config¶
The stream configuration settings.
- _reader¶
The async stream reader (None when disconnected).
- _writer¶
The async stream writer (None when disconnected).
- __init__(config=KelvinStreamConfig(ip='127.0.0.1', port=49167, limit=4194304))[source]¶
Initialize the Kelvin Stream.
- Parameters:
config (
KelvinStreamConfig) – Stream configuration settings. Uses defaults if not provided.- Return type:
None
- async connect()[source]¶
Connect to the Kelvin Stream server.
- Raises:
ConnectionError – If the stream server is unreachable.
- Return type:
- async read()[source]¶
Reads the next Kelvin Message
- Raises:
ConnectionError – When connection is unavailable.
- Returns:
the read Message
- Return type:
- async write(msg)[source]¶
Writes a Message to the Kelvin Stream
- Parameters:
msg (
Message) – Kelvin message to write- Raises:
ConnectionError – If the connection is lost.
- Returns:
True if the message was sent with success.
- Return type:
- class kelvin.application.AssetInfo(*args, **kwargs)[source]¶
Bases:
object- Parameters:
name (str)
properties (Dict[str, Annotated[bool, Strict(strict=True)] | Annotated[int, Strict(strict=True)] | Annotated[float, Strict(strict=True)] | Annotated[str, Strict(strict=True)] | list[Annotated[bool, Strict(strict=True)]] | list[Annotated[int, Strict(strict=True)]] | list[Annotated[float, Strict(strict=True)]] | list[Annotated[str, Strict(strict=True)]]])
parameters (Dict[str, Annotated[bool, Strict(strict=True)] | Annotated[int, Strict(strict=True)] | Annotated[float, Strict(strict=True)] | Annotated[str, Strict(strict=True)]])
datastreams (Dict[str, ResourceDatastream])
title (str | None)
asset_type_name (str | None)
asset_type_title (str | None)
- properties: Dict[str, Annotated[bool, Strict(strict=True)] | Annotated[int, Strict(strict=True)] | Annotated[float, Strict(strict=True)] | Annotated[str, Strict(strict=True)] | list[Annotated[bool, Strict(strict=True)]] | list[Annotated[int, Strict(strict=True)]] | list[Annotated[float, Strict(strict=True)]] | list[Annotated[str, Strict(strict=True)]]] = FieldInfo(annotation=NoneType, required=False, default_factory=dict)¶
- parameters: Dict[str, Annotated[bool, Strict(strict=True)] | Annotated[int, Strict(strict=True)] | Annotated[float, Strict(strict=True)] | Annotated[str, Strict(strict=True)]] = FieldInfo(annotation=NoneType, required=False, default_factory=dict)¶
- datastreams: Dict[str, ResourceDatastream] = FieldInfo(annotation=NoneType, required=False, default_factory=dict)¶
- class kelvin.application.ResourceDatastream(*args, **kwargs)[source]¶
Bases:
object- Parameters:
- asset: KRNAsset¶
- datastream: Datastream¶
- class kelvin.application.Datastream(*args, **kwargs)[source]¶
Bases:
object- Parameters:
name (str)
type (KMessageType)
unit (str | None)
- type: KMessageType¶
Client¶
- class kelvin.application.client.Datastream(*args, **kwargs)[source]¶
Bases:
object- Parameters:
name (str)
type (KMessageType)
unit (str | None)
- type: KMessageType¶
- class kelvin.application.client.AppIO(*args, **kwargs)[source]¶
Bases:
Datastream- Parameters:
name (str)
type (KMessageType)
unit (str | None)
- class kelvin.application.client.ResourceDatastream(*args, **kwargs)[source]¶
Bases:
object- Parameters:
- asset: KRNAsset¶
- datastream: Datastream¶
- class kelvin.application.client.AssetInfo(*args, **kwargs)[source]¶
Bases:
object- Parameters:
name (str)
properties (Dict[str, Annotated[bool, Strict(strict=True)] | Annotated[int, Strict(strict=True)] | Annotated[float, Strict(strict=True)] | Annotated[str, Strict(strict=True)] | list[Annotated[bool, Strict(strict=True)]] | list[Annotated[int, Strict(strict=True)]] | list[Annotated[float, Strict(strict=True)]] | list[Annotated[str, Strict(strict=True)]]])
parameters (Dict[str, Annotated[bool, Strict(strict=True)] | Annotated[int, Strict(strict=True)] | Annotated[float, Strict(strict=True)] | Annotated[str, Strict(strict=True)]])
datastreams (Dict[str, ResourceDatastream])
title (str | None)
asset_type_name (str | None)
asset_type_title (str | None)
- properties: Dict[str, Annotated[bool, Strict(strict=True)] | Annotated[int, Strict(strict=True)] | Annotated[float, Strict(strict=True)] | Annotated[str, Strict(strict=True)] | list[Annotated[bool, Strict(strict=True)]] | list[Annotated[int, Strict(strict=True)]] | list[Annotated[float, Strict(strict=True)]] | list[Annotated[str, Strict(strict=True)]]] = FieldInfo(annotation=NoneType, required=False, default_factory=dict)¶
- parameters: Dict[str, Annotated[bool, Strict(strict=True)] | Annotated[int, Strict(strict=True)] | Annotated[float, Strict(strict=True)] | Annotated[str, Strict(strict=True)]] = FieldInfo(annotation=NoneType, required=False, default_factory=dict)¶
- datastreams: Dict[str, ResourceDatastream] = FieldInfo(annotation=NoneType, required=False, default_factory=dict)¶
- class kelvin.application.client.KelvinApp(config=KelvinStreamConfig(ip='127.0.0.1', port=49167, limit=4194304), api=None)[source]¶
Bases:
objectKelvin App to connect and interface with the KelvinStream.
After connecting, the connection is handled automatically in the background.
Use filters or filter_stream to easily listen for specific messages. Use register_callback methods to register callbacks for events like connect and disconnect. Use tasks to run background functions that can be async or sync. Use timers to run functions at regular intervals.
The app can be used as an async context manager, which will automatically connect on enter and disconnect on exit.
- Example usage:
- async with KelvinApp() as app:
await app.publish(Number(resource=KRNAssetDataStream(‘my-asset’, ‘my-input’), payload=1.0)) async for msg in app.stream_filter(filters.is_asset_data_message):
print(msg)
- Parameters:
config (KelvinStreamConfig)
api (Optional[AsyncClient])
- api¶
API client for direct connection with Kelvin API
- on_asset_input: Callable[[AssetDataMessage], Awaitable[None]] | None¶
Called when an asset data message is received.
- on_control_change: Callable[[AssetDataMessage], Awaitable[None]] | None¶
Called when a control change message is received.
- on_control_status: Callable[[ControlChangeStatus], Awaitable[None]] | None¶
Called when a control status is received.
- on_custom_action: Callable[[CustomAction], Awaitable[None]] | None¶
Called when a custom action is received.
- on_asset_change: Callable[[AssetInfo | None, AssetInfo | None], Awaitable[None]] | None¶
Called when an asset is added, removed, or modified. First arg is the new asset (None if removed); second is the previous asset (None if newly added).
- on_app_configuration: Callable[[dict], Awaitable[None]] | None¶
Called when the app configuration changes.
- property is_connected: bool¶
Indicates whether the read loop is active, implying an established connection.
- property assets: Dict[str, AssetInfo]¶
Get all assets configured for this application.
It returns a dictionary where each key is the asset name, and the value is an AssetInfo object describing that asset’s properties, parameters, and datastreams.
The assets dictionary is dynamically updated whenever the application receives updates to asset properties or parameters, ensuring it always reflects the latest configuration.
- Returns:
- A dictionary where keys are asset names (strings) and values are AssetInfo instances
representing the current configuration and state of each asset.
- Return type:
Example
- {
- “asset1”: AssetInfo(
name=”asset1”, properties={
“tubing_length”: 25.0, “area”: 11.0
}, parameters={
“param-bool”: False, “param-number”: 7.5, “param-string”: “hello”
}, datastreams={
- “output1”: ResourceDatastream(
asset=KRNAsset(“asset1”), io_name=”output1”, datastream=Datastream(
name=”datastream1”, type=KMessageTypeData(“float”), unit=”m”
), access=”RO”, owned=True, configuration={}
)
}
)
}
- property app_configuration: dict¶
Get the application configuration.
- Returns:
A mapping of configuration sections to their values, matching the structure in app.yaml.
- Return type:
Example
- {
- “foo”: {
“conf_string”: “value1”, “conf_number”: 25, “conf_bool”: False,
}
}
- property config_received: Event¶
Event set when the initial configuration is received.
Use this asyncio.Event to wait until the application has loaded its initial app/asset parameters.
- Returns:
Event that becomes set once the initial configuration arrives.
- Return type:
Example
await app.config_received.wait()
- property inputs: List[AppIO]¶
List all input metrics configured for the application.
- Each AppIO has:
name (str): the metric identifier.
data_type (str): the data type of the input.
- Returns:
Read-only list of configured input metrics.
- Return type:
List[AppIO]
- property outputs: List[AppIO]¶
List all output metrics configured for the application.
- Each AppIO has:
name (str): the metric identifier.
data_type (str): the data type of the output.
- Returns:
Read-only list of configured output metrics.
- Return type:
List[AppIO]
- property tasks: Dict[str, Callable[[], Awaitable]]¶
Retrieve registered asynchronous tasks.
- Returns:
Dict of task names and callable that produce awaitables when called. Represents tasks scheduled or pending.
- Return type:
Dict[str, Callable[[], Awaitable]]
- async __aexit__(exc_type, exc_value, tb)[source]¶
Support async context: disconnect on exit.
- Return type:
- Parameters:
exc_type (Type[E] | None)
exc_value (E | None)
tb (TracebackType | None)
- async connect()[source]¶
Establish connection, retrying indefinitely until success. Starts the read loop and waits for App Configuration before firing on_connect callback and tasks/timers.
- Return type:
- async disconnect()[source]¶
Cancel read loop, stop all tasks, fire on_disconnect, and close stream.
- Return type:
- async publish(msg)[source]¶
Publish a message to KelvinStream.
Accepts either a Message instance or a MessageBuilder. If given a MessageBuilder, it is converted to a Message via to_message() before sending. Returns True on success, or False if the connection is unavailable.
- Parameters:
msg (
Union[Message,MessageBuilder]) –A Message to send directly, or
A MessageBuilder which will be converted to Message.
- Returns:
True if the message was sent successfully.
False if sending failed due to a ConnectionError.
- Return type:
Examples
message = Number(resource=KRNAssetDataStream(‘my-asset’, ‘my-input’), payload=1.0) success = await client.publish(message) if success:
print(“Published message”)
- else:
print(“Publish failed”)
- stream_filter(func)[source]¶
Creates a stream for the received Kelvin Messages based on a filter function. See filter.
- Parameters:
func (
Callable[[Message],TypeGuard[TypeVar(T, bound=Message)]]) – Filter function, it should receive a Message as argument and return bool.- Returns:
Async Generator that can be async iterated to receive filtered messages.
- Return type:
AsyncGenerator[Message, None]
- Yields:
Iterator[AsyncGenerator[Message, None]] – Yields the filtered messages.
- tumbling_window(window_size, assets=None, inputs=None, round_to=None)[source]¶
Creates a fixed, non-overlapping windowing.
- Parameters:
window_size (
timedelta) – Duration of each window.assets (
Optional[List[str]]) – Optional list of asset names to filter on.inputs (
Optional[List[str]]) – Optional list of input names (data streams) to include as columns in the window.round_to (
Optional[timedelta]) – Optional interval to which window boundaries are aligned.
- Returns:
An instance configured with the given parameters.
- Return type:
- hopping_window(window_size, hop_size, assets=None, inputs=None, round_to=None)[source]¶
Creates a window with fixed size and overlap.
- Parameters:
window_size (
timedelta) – Duration of each window.hop_size (
timedelta) – Step between window starts (defines overlap).assets (
Optional[List[str]]) – Optional list of asset names to filter on.inputs (
Optional[List[str]]) – Optional list of input names (data streams) to include as columns in the window.round_to (
Optional[timedelta]) – Optional interval to which window boundaries are aligned.
- Returns:
An instance configured with the given parameters.
- Return type:
- rolling_window(count_size, assets=None, inputs=None, round_to=None, slide=1)[source]¶
Creates a sliding count-based window over incoming data.
- Parameters:
count_size (
int) – Number of records per window.assets (
Optional[List[str]]) – Optional list of asset names to filter on.inputs (
Optional[List[str]]) – Optional list of input names (data streams) to include as columns in the window.round_to (
Optional[timedelta]) – Optional interval to which window boundaries are aligned.slide (
int) – Number of records to slide the window forward on each update.
- Returns:
An instance configured with the given parameters.
- Return type:
- stream(fn=None, *, assets=None, inputs=None)[source]¶
Register a per-message stream handler that is invoked for each incoming AssetDataMessage matching the optional assets/inputs filters.
Usage patterns:
@app.stream() async def my_stream(msg: AssetDataMessage): …
@app.stream(inputs=[“in1”]) async def my_stream(msg: AssetDataMessage): …
@app.stream(assets=[“asset1”]) async def my_stream(msg: AssetDataMessage): …
@app.stream(assets=[“asset1”], inputs=[“in1”, “in2”]) async def my_stream(msg: AssetDataMessage): …
@app.stream(assets=[“asset1”], inputs=[“in1”, “in2”]) def my_stream(msg: AssetDataMessage): …
def my_stream(msg: AssetDataMessage): … app.stream(my_stream, assets=[“asset1”], inputs=[“in1”])
The registered stream runs as an app task when the app connects.
- task(fn=None, *, name=None)[source]¶
Register a function as a task, either sync or async.
This method acts as both a decorator and a decorator factory. It supports the following usage patterns:
@app.task async def my_async_task(…): …
@app.task() def my_sync_task(…): …
@app.task(name=”custom.task.name”) def another_task(…): …
app.task(some_function)
- Parameters:
fn (
Union[Callable[[ParamSpec(P, bound=None)],TypeVar(R)],Callable[[ParamSpec(P, bound=None)],Awaitable[TypeVar(R)]],None]) – The function to register. Can be sync or async. If not provided, a decorator is returned.name (
Optional[str]) – Optional name to register the task under. If not provided, the fully-qualified function name is used.
- Return type:
Union[Callable[[ParamSpec(P, bound=None)],Awaitable[TypeVar(R)]],Callable[[Union[Callable[[ParamSpec(P, bound=None)],TypeVar(R)],Callable[[ParamSpec(P, bound=None)],Awaitable[TypeVar(R)]]]],Callable[[ParamSpec(P, bound=None)],Awaitable[TypeVar(R)]]]]- Returns:
If fn is provided, returns the async-compatible task wrapper. If fn is None, returns a decorator that can be applied to a function.
- timer(fn=None, *, interval, name=None)[source]¶
- Overloads:
self, fn (TaskFunc[[], Any]), interval (float), name (Optional[str]) → AsyncFunc[[], Any]
self, interval (float), name (Optional[str]) → Callable[[TaskFunc[[], Any]], AsyncFunc[[], Any]]
- Parameters:
- Return type:
Callable[[], Awaitable[Any]] | Callable[[Callable[[], Any] | Callable[[], Awaitable[Any]]], Callable[[], Awaitable[Any]]]
Register a function to be called at a repeating interval.
Usage patterns:
@app.timer(interval=5) def foo(): …
@app.timer(interval=5, name=”my timer”) async def foo(): …
def bar(): … app.timer(bar, interval=10) app.timer(bar, interval=10, name=”bar.timer”)
- async run_forever()[source]¶
Connects to the service and then waits indefinitely until cancelled. On cancellation, disconnects cleanly before propagating the cancellation.
- Return type:
- run()[source]¶
Synchronous entry point: - Starts an asyncio event loop to run run_forever(). - Blocks until run_forever() completes or is cancelled. - Allows Ctrl-C (KeyboardInterrupt) to stop cleanly.
- Return type:
- msg_is_control_change(msg)[source]¶
- Return type:
TypeGuard[AssetDataMessage]- Parameters:
msg (Message)
API Client¶
Bases:
ExceptionRaised when attempting to use an unavailable API client.
- kelvin.application.api_client.initialize_api_client()[source]¶
Validate required environment vars are available to create the API client Returns UnavailableApiClient mock client if requirements are not met.
- Returns:
The instantiated API client or an UnavailableApiClient if requirements are not met
- Return type:
AsyncClient
Config¶
Filters¶
Message filters for Kelvin applications.
This module provides filter functions for routing and filtering Kelvin messages based on their type, resource, or content. Filters return TypeGuard predicates that can be used with KelvinApp.filter() or KelvinApp.stream_filter().
Example
>>> from kelvin.application import filters
>>> queue = app.filter(filters.is_asset_data_message)
>>> async for msg in app.stream_filter(filters.asset_equals("my-asset")):
... print(msg)
- kelvin.application.filters.is_asset_data_message(msg)[source]¶
Check if the message is an Asset Data Message.
An Asset Data Message has a KRNAssetDataStream resource and a data type.
- Parameters:
msg (
Message) – The message to check.- Return type:
TypeGuard[AssetDataMessage]- Returns:
True if the message is an Asset Data Message.
- kelvin.application.filters.is_data_message(msg)[source]¶
Check if the message contains data (has a data type).
- kelvin.application.filters.is_control_status_message(msg)[source]¶
Check if the message is a Control Change Status.
- Return type:
TypeGuard[ControlChangeStatus]- Parameters:
msg (Message)
- kelvin.application.filters.resource_equals(resource)[source]¶
Create a filter that matches messages with the specified resource(s).
- kelvin.application.filters.input_equals(data)[source]¶
Create a filter that matches Asset Data Messages with the specified datastream(s).
- kelvin.application.filters.asset_equals(asset)[source]¶
Create a filter that matches messages for the specified asset(s).
- kelvin.application.filters.is_custom_action(msg)[source]¶
Check if the message is a Custom Action Message.
- Return type:
TypeGuard[CustomAction]- Parameters:
msg (Message)
- kelvin.application.filters.is_asset_data_quality_message(msg)[source]¶
Check if the message is an Asset Data Quality Message.
- Return type:
TypeGuard[AssetDataQualityMessage]- Parameters:
msg (Message)
- kelvin.application.filters.is_asset_data_stream_quality_message(msg)[source]¶
Check if the message is an Asset Data Stream Data Quality Message.
- Return type:
TypeGuard[AssetDataStreamDataQualityMessage]- Parameters:
msg (Message)
- kelvin.application.filters.is_data_quality_message(msg)[source]¶
Check if the message is a Data Quality Message.
- Return type:
TypeGuard[AssetDataQualityMessage|AssetDataStreamDataQualityMessage]- Parameters:
msg (Message)
Stream¶
Kelvin Stream interface for message communication.
This module provides the low-level stream interface for connecting to and communicating with the Kelvin platform via TCP sockets.
- Main Components:
StreamInterface: Abstract base class defining the stream protocol. KelvinStream: Concrete implementation for Kelvin platform communication. KelvinStreamConfig: Configuration settings for the stream connection.
- class kelvin.application.stream.StreamInterface[source]¶
Bases:
ABCAbstract interface for a connection to a Kelvin system.
This class defines the protocol for stream-based communication with the Kelvin platform. Implementations must provide connect, disconnect, read, and write operations.
- class kelvin.application.stream.KelvinStreamConfig(_case_sensitive=None, _nested_model_default_partial_update=None, _env_prefix=None, _env_prefix_target=None, _env_file=PosixPath('.'), _env_file_encoding=None, _env_ignore_empty=None, _env_nested_delimiter=None, _env_nested_max_split=None, _env_parse_none_str=None, _env_parse_enums=None, _cli_prog_name=None, _cli_parse_args=None, _cli_settings_source=None, _cli_parse_none_str=None, _cli_hide_none_type=None, _cli_avoid_json=None, _cli_enforce_required=None, _cli_use_class_docs_for_groups=None, _cli_exit_on_error=None, _cli_prefix=None, _cli_flag_prefix_char=None, _cli_implicit_flags=None, _cli_ignore_unknown_args=None, _cli_kebab_case=None, _cli_shortcuts=None, _secrets_dir=None, _build_sources=None, **values)[source]¶
Bases:
BaseSettingsConfiguration for the Kelvin Stream connection.
- Parameters:
_case_sensitive (bool | None)
_nested_model_default_partial_update (bool | None)
_env_prefix (str | None)
_env_prefix_target (EnvPrefixTarget | None)
_env_file (DotenvType | None)
_env_file_encoding (str | None)
_env_ignore_empty (bool | None)
_env_nested_delimiter (str | None)
_env_nested_max_split (int | None)
_env_parse_none_str (str | None)
_env_parse_enums (bool | None)
_cli_prog_name (str | None)
_cli_settings_source (CliSettingsSource[Any] | None)
_cli_parse_none_str (str | None)
_cli_hide_none_type (bool | None)
_cli_avoid_json (bool | None)
_cli_enforce_required (bool | None)
_cli_use_class_docs_for_groups (bool | None)
_cli_exit_on_error (bool | None)
_cli_prefix (str | None)
_cli_flag_prefix_char (str | None)
_cli_implicit_flags (bool | Literal['dual', 'toggle'] | None)
_cli_ignore_unknown_args (bool | None)
_cli_kebab_case (bool | Literal['all', 'no_enums'] | None)
_secrets_dir (PathType | None)
_build_sources (tuple[tuple[PydanticBaseSettingsSource, ...], dict[str, Any]] | None)
ip (str)
port (int)
limit (int)
- ip¶
The IP address of the Kelvin stream server. Default: “127.0.0.1”.
- port¶
The port number for the stream connection. Default: 49167.
- limit¶
Maximum buffer size in bytes for the stream reader. Default: 4MB.
- Environment Variables:
KELVIN_STREAM_IP: Override the IP address. KELVIN_STREAM_PORT: Override the port number. KELVIN_STREAM_LIMIT: Override the buffer limit.
- model_config: ClassVar[SettingsConfigDict] = {'arbitrary_types_allowed': True, 'case_sensitive': False, 'cli_avoid_json': False, 'cli_enforce_required': False, 'cli_exit_on_error': True, 'cli_flag_prefix_char': '-', 'cli_hide_none_type': False, 'cli_ignore_unknown_args': False, 'cli_implicit_flags': False, 'cli_kebab_case': False, 'cli_parse_args': None, 'cli_parse_none_str': None, 'cli_prefix': '', 'cli_prog_name': None, 'cli_shortcuts': None, 'cli_use_class_docs_for_groups': False, 'enable_decoding': True, 'env_file': None, 'env_file_encoding': None, 'env_ignore_empty': False, 'env_nested_delimiter': None, 'env_nested_max_split': None, 'env_parse_enums': None, 'env_parse_none_str': None, 'env_prefix': 'KELVIN_STREAM_', 'env_prefix_target': 'variable', 'extra': 'forbid', 'json_file': None, 'json_file_encoding': None, 'nested_model_default_partial_update': False, 'protected_namespaces': ('model_validate', 'model_dump', 'settings_customise_sources'), 'secrets_dir': None, 'toml_file': None, 'validate_default': True, 'yaml_config_section': None, 'yaml_file': None, 'yaml_file_encoding': None}¶
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- class kelvin.application.stream.KelvinStream(config=KelvinStreamConfig(ip='127.0.0.1', port=49167, limit=4194304))[source]¶
Bases:
StreamInterfaceTCP stream implementation for Kelvin platform communication.
This class manages a persistent TCP connection to the Kelvin stream server, handling message serialization/deserialization and connection lifecycle.
- Parameters:
config (KelvinStreamConfig)
- _config¶
The stream configuration settings.
- _reader¶
The async stream reader (None when disconnected).
- _writer¶
The async stream writer (None when disconnected).
- __init__(config=KelvinStreamConfig(ip='127.0.0.1', port=49167, limit=4194304))[source]¶
Initialize the Kelvin Stream.
- Parameters:
config (
KelvinStreamConfig) – Stream configuration settings. Uses defaults if not provided.- Return type:
None
- async connect()[source]¶
Connect to the Kelvin Stream server.
- Raises:
ConnectionError – If the stream server is unreachable.
- Return type:
- async read()[source]¶
Reads the next Kelvin Message
- Raises:
ConnectionError – When connection is unavailable.
- Returns:
the read Message
- Return type:
- async write(msg)[source]¶
Writes a Message to the Kelvin Stream
- Parameters:
msg (
Message) – Kelvin message to write- Raises:
ConnectionError – If the connection is lost.
- Returns:
True if the message was sent with success.
- Return type:
Timer¶
Async timer with drift correction.
This module provides a Timer class for executing periodic tasks with automatic clock drift correction to maintain consistent intervals.
- class kelvin.application.timer.Timer(interval, name, max_drift_correction=0.1)[source]¶
Bases:
objectA repeating async timer that corrects clock drift.
The Timer automatically adjusts sleep intervals to compensate for execution time and system clock drift, ensuring consistent timing over extended periods.
- interval¶
The target interval between timer fires in seconds.
- name¶
A descriptive name for the timer (used in logging).
- max_drift¶
Maximum drift before logging a warning.
- min_interval¶
Minimum allowed sleep interval.
- max_interval¶
Maximum allowed sleep interval.
- iteration¶
Number of times the timer has fired.
- overlaps¶
Number of times execution exceeded the interval.
Example
>>> timer = Timer(interval=5.0, name="my-timer") >>> async for _ in timer: ... await do_periodic_work()
Version¶
Window¶
- kelvin.application.window.quantize_time(dt, step=None, *, mode='nearest', anchor=None)[source]¶
Quantize a datetime to a fixed step.
Accepts naive datetimes and treats them as UTC.
Uses integer microseconds to avoid floating-point rounding issues.
If step is None, return the input unchanged.
Parameters¶
- dtdatetime
Timestamp to quantize. Naive values are interpreted as UTC.
- steptimedelta or None
Quantization step. When None, no quantization is applied.
- mode{“nearest”, “floor”, “ceil”}
Rounding mode. Defaults to “nearest”.
- anchordatetime or None
Grid anchor. When None, use the Unix epoch at UTC.
Returns¶
- datetime
Quantized datetime with the same tzinfo semantics as input, normalized to UTC if naive.
- rtype:
- kelvin.application.window.round_nearest_time(dt, round_to=None)[source]¶
Backwards-compatible wrapper that rounds to the nearest tick.
- class kelvin.application.window.BaseWindow(assets, inputs, queue, *, round_to=None, buffer_size=None)[source]¶
Bases:
objectManage per-asset in-memory buffers and DataFrames for streaming data.
Design¶
Each asset has a deque of (timestamp, {stream: payload}). The deque can be bounded by buffer_size.
Timestamps are normalized to UTC and optionally quantized.
DataFrames are per asset, indexed by UTC timestamps, with columns equal to inputs.
Duplicate timestamps within a buffer are merged into a single row before appending.
- Parameters:
assets (List[str])
inputs (List[str])
queue (asyncio.Queue[AssetDataMessage])
round_to (Optional[timedelta])
buffer_size (Optional[int])
- class kelvin.application.window.BaseTimeWindow(assets, inputs, queue, window_size, hop_size, *, round_to=None, align_step=None, allowed_lateness=None, buffer_size=None)[source]¶
Bases:
BaseWindowTime-based windowing of data streams.
Parameters¶
- assetslist[str]
Asset identifiers to include.
- inputslist[str]
Data stream identifiers to include as DataFrame columns.
- queueasyncio.Queue[AssetDataMessage]
Source of incoming messages.
- window_sizetimedelta
Duration of each window.
- hop_sizetimedelta
Time between the start of consecutive windows.
- round_totimedelta or None
Optional timestamp quantization step.
- align_steptimedelta or None
Optional alignment step for initial window start. If None, no extra alignment.
- allowed_latenesstimedelta or None
If provided, accept events older than window_start by up to this amount.
- buffer_sizeint or None
Optional bound for per-asset buffer deques.
Notes¶
Naive window_start values are interpreted as UTC.
Message timestamps that are naive are also treated as UTC.
- async stream(window_start=None)[source]¶
Continuously emit windowed DataFrames per asset.
Behavior¶
Align the first emission to window_start + window_size.
Sleep until that first boundary, then drain and emit.
Continue emitting every hop_size using a Timer.
Parameters¶
- window_startdatetime or None
Start of the first window. When None, use current UTC time. Naive values are interpreted as UTC. The start is optionally aligned by align_step.
- Parameters:
assets (List[str])
inputs (List[str])
queue (asyncio.Queue[AssetDataMessage])
window_size (timedelta)
hop_size (timedelta)
round_to (Optional[timedelta])
align_step (Optional[timedelta])
allowed_lateness (Optional[timedelta])
buffer_size (Optional[int])
- class kelvin.application.window.TumblingWindow(assets, inputs, queue, window_size, *, round_to=None, align_step=None, allowed_lateness=None, buffer_size=None)[source]¶
Bases:
BaseTimeWindowNon-overlapping time windows where hop_size equals window_size.
- Parameters:
assets (List[str])
inputs (List[str])
queue (asyncio.Queue[AssetDataMessage])
window_size (timedelta)
round_to (Optional[timedelta])
align_step (Optional[timedelta])
allowed_lateness (Optional[timedelta])
buffer_size (Optional[int])
- class kelvin.application.window.HoppingWindow(assets, inputs, queue, window_size, hop_size, *, round_to=None, align_step=None, allowed_lateness=None, buffer_size=None)[source]¶
Bases:
BaseTimeWindowOverlapping time windows when hop_size is smaller than window_size.
- Parameters:
assets (List[str])
inputs (List[str])
queue (asyncio.Queue[AssetDataMessage])
window_size (timedelta)
hop_size (timedelta)
round_to (Optional[timedelta])
align_step (Optional[timedelta])
allowed_lateness (Optional[timedelta])
buffer_size (Optional[int])
- class kelvin.application.window.RollingWindow(assets, inputs, queue, count_size, *, slide=1, round_to=None, buffer_size=None)[source]¶
Bases:
BaseWindowRolling window based on a fixed count of messages per asset.
Notes¶
The internal per-asset deque is bounded to prevent unbounded growth.
The DataFrame yielded for an asset represents the last count_size items after merging duplicate timestamps inside that slice.
- Parameters:
assets (List[str])
inputs (List[str])
queue (asyncio.Queue[AssetDataMessage])
count_size (int)
slide (int)
round_to (Optional[timedelta])
buffer_size (Optional[int])