Timeseries Data

Timeseries Data Messages

Timeseries data from Assets can be served to the Kelvin SmartApp™ in multiple ways, depending on the preferred data consumption method.

Overview

Data can be consumed primarily in two ways:

  1. Asynchronous Consumption (async): This approach pauses program execution until new data arrives, resuming only when data is available. To enhance efficiency and reduce unnecessary executions, filters can also be applied to limit the types of data that trigger events.

  2. Callback Consumption: In this method, a callback function allows the program to continue performing other tasks while waiting for new data, enabling non-blocking execution suitable for concurrent operations.

Each method provides flexible and efficient handling of Asset timeseries data, tailored to specific application requirements and performance needs.
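The difference between the two styles can be sketched with plain asyncio, independent of the Kelvin library. Everything here (READINGS, producer, run_async_style, run_callback_style) is a hypothetical stand-in for illustration, not part of the SDK:

```python
import asyncio
from typing import List

# Made-up readings standing in for Asset timeseries data.
READINGS = [71.5, 72.0, 73.2]


async def producer(queue: asyncio.Queue) -> None:
    """Simulated data source that delivers one reading at a time."""
    for reading in READINGS:
        await asyncio.sleep(0)  # yield control, as a real source would
        await queue.put(reading)


async def run_async_style() -> List[float]:
    """Async style: the consumer suspends at each await until data arrives."""
    queue: asyncio.Queue = asyncio.Queue()
    task = asyncio.create_task(producer(queue))
    received = [await queue.get() for _ in READINGS]
    await task
    return received


async def run_callback_style() -> List[float]:
    """Callback style: a callback handles data while the main flow keeps going."""
    received: List[float] = []

    async def on_reading(value: float) -> None:
        received.append(value)

    async def dispatcher() -> None:
        for reading in READINGS:
            await asyncio.sleep(0)
            await on_reading(reading)

    task = asyncio.create_task(dispatcher())
    while not task.done():
        await asyncio.sleep(0)  # the main flow is free to do other work here
    await task
    return received


if __name__ == "__main__":
    print(asyncio.run(run_async_style()))
    print(asyncio.run(run_callback_style()))
```

Both functions receive the same readings; the difference is only in where the waiting happens.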

To consume streaming data, the inputs section of app.yaml must be filled in.

Note

This means that Kelvin SmartApps™ can only consume the inputs specified there. For example:

app:
  type: kelvin
  kelvin:
    inputs:
      - data_type: number
        name: motor_temperature
      - data_type: string
        name: motor_state

inputs is an array (list) in which each entry is composed of two fields:

  • A unique name to identify the input. This name is used in the Python code to reference the input. It must contain only lowercase alphanumeric characters. The characters ., _ and - may be used to separate words instead of a space, but they cannot appear at the beginning or end of the name.
  • An expected data type, which can be: number, boolean or string.
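As a quick local check before deploying, the naming rule above can be approximated with a regular expression. This is a minimal sketch based on one reading of the rule; the helper is_valid_input_name is hypothetical, and the platform's own validation may be stricter (for example, about consecutive separators):

```python
import re

# Assumed reading of the rule: lowercase letters and digits, with '.', '_'
# and '-' allowed only between the first and last character.
_INPUT_NAME = re.compile(r"[a-z0-9](?:[a-z0-9._-]*[a-z0-9])?")


def is_valid_input_name(name: str) -> bool:
    """Return True if the whole name matches the assumed naming rule."""
    return _INPUT_NAME.fullmatch(name) is not None


if __name__ == "__main__":
    for candidate in ["motor_temperature", "motor_state", "_motor", "Motor"]:
        print(candidate, is_valid_input_name(candidate))
```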

Now that we've defined Kelvin SmartApps™ inputs, there are a few ways to consume them within the Kelvin Python library.

Streams deliver inputs as a Python async generator, with a filter function selecting which inputs are yielded.

Info

Different data filters are available within the filters class, such as input_equals(input: str), resource_equals(resource: KRN) and asset_equals(asset: str). The following example uses the most common use case, input_equals(input: str).

Filters to limit which inputs are monitored can be expressed as filters.input_equals(input: str).

import asyncio
from typing import AsyncGenerator # AsyncGenerator import

from kelvin.application import KelvinApp, filters # filters import
from kelvin.message import Number # Number (Input Type) import


async def main() -> None:
    app = KelvinApp()

    # Create a Filtered Stream with Temperature (Number) Input Messages
    motor_temperature_msg_stream: AsyncGenerator[Number, None] = app.stream_filter(filters.input_equals("motor_temperature"))

    await app.connect()

    # Wait & Read new Temperature Inputs
    async for motor_temperature_msg in motor_temperature_msg_stream:
        print("Received Motor Temperature: ", motor_temperature_msg)


if __name__ == "__main__":
    asyncio.run(main())
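To illustrate the idea of a stream driven by a filter function, the following generic sketch filters an async generator with a predicate. It does not show how the Kelvin library implements stream_filter; FakeMessage, name_equals and filtered_stream are hypothetical names used only for this illustration:

```python
import asyncio
from dataclasses import dataclass
from typing import AsyncGenerator, Callable, Iterable, List


@dataclass
class FakeMessage:
    """Hypothetical stand-in for an input message; not a Kelvin SDK type."""
    input_name: str
    payload: float


def name_equals(name: str) -> Callable[[FakeMessage], bool]:
    """Build a predicate that accepts only messages for one input name."""
    return lambda msg: msg.input_name == name


async def filtered_stream(
    messages: Iterable[FakeMessage],
    predicate: Callable[[FakeMessage], bool],
) -> AsyncGenerator[FakeMessage, None]:
    """Yield only the messages accepted by the predicate."""
    for msg in messages:
        await asyncio.sleep(0)  # simulate waiting for the next message
        if predicate(msg):
            yield msg


async def collect() -> List[float]:
    messages = [
        FakeMessage("motor_temperature", 71.5),
        FakeMessage("motor_speed", 1480.0),
        FakeMessage("motor_temperature", 72.0),
    ]
    stream = filtered_stream(messages, name_equals("motor_temperature"))
    return [msg.payload async for msg in stream]


if __name__ == "__main__":
    print(asyncio.run(collect()))
```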

Filters can also deliver a specific subset of the Kelvin inputs as a Python message queue (asyncio.Queue), again selected by a filter function.


import asyncio
from asyncio import Queue # Queue import

from kelvin.application import KelvinApp, filters # filters import
from kelvin.message import Number # Number (Input Type) import


async def main() -> None:
    app = KelvinApp()

    # Create a Filtered Queue with Temperature (Number) Input Messages
    motor_temperature_msg_queue: Queue[Number] = app.filter(filters.input_equals("motor_temperature"))

    await app.connect()

    while True:
        # Wait & Read new Temperature Inputs
        motor_temperature_msg = await motor_temperature_msg_queue.get()

        print("Received Motor Temperature: ", motor_temperature_msg)


if __name__ == "__main__":
    asyncio.run(main())
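Because the example above annotates the filtered queue as an asyncio Queue, a read can be bounded with a timeout so the loop is able to react when no data arrives. The sketch below uses only the standard library and a locally created queue; get_with_timeout is a hypothetical helper, and it assumes the queue returned by the library behaves like a regular asyncio.Queue:

```python
import asyncio
from typing import Any, Optional, Tuple


async def get_with_timeout(queue: asyncio.Queue, timeout: float) -> Optional[Any]:
    """Return the next item, or None if nothing arrives within `timeout` seconds."""
    try:
        return await asyncio.wait_for(queue.get(), timeout=timeout)
    except asyncio.TimeoutError:
        return None


async def demo() -> Tuple[Optional[Any], Optional[Any]]:
    queue: asyncio.Queue = asyncio.Queue()
    queue.put_nowait(71.5)  # one made-up reading
    first = await get_with_timeout(queue, 0.05)
    second = await get_with_timeout(queue, 0.05)  # queue is now empty
    return first, second


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

A None result can then be used to log a warning or run periodic housekeeping instead of waiting indefinitely.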

The callback on_asset_input can be used to read every input flowing into Kelvin SmartApps™:

import asyncio

from kelvin.application import KelvinApp
from kelvin.message.primitives import AssetDataMessage # AssetDataMessage import


# on_asset_input Callback
async def on_asset_input(msg: AssetDataMessage):
    print("Received Data Message: ", msg)

    # Get Asset and Value
    asset = msg.resource.asset
    value = msg.payload

async def main() -> None:
    app = KelvinApp()
    # Set on_asset_input Callback
    app.on_asset_input = on_asset_input

    await app.connect()

    # Required: keep the event loop alive so the callback keeps receiving inputs
    while True:
        await asyncio.sleep(1)


if __name__ == "__main__":
    asyncio.run(main())
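A callback typically does more than print. One common pattern is to keep the latest value per asset, using the asset and value extracted in the callback above. The sketch below simulates the deliveries with plain strings and floats; on_reading, replay and latest_by_asset are hypothetical names, and no Kelvin SDK types are involved:

```python
import asyncio
from typing import Dict

# Latest value seen for each asset (plain strings stand in for SDK types).
latest_by_asset: Dict[str, float] = {}


async def on_reading(asset: str, value: float) -> None:
    """Hypothetical callback body: remember the newest value per asset."""
    latest_by_asset[asset] = value


async def replay() -> Dict[str, float]:
    # Simulated deliveries; in the app these would arrive via on_asset_input.
    deliveries = [("motor-1", 71.5), ("motor-2", 64.0), ("motor-1", 73.2)]
    for asset, value in deliveries:
        await on_reading(asset, value)
    return dict(latest_by_asset)


if __name__ == "__main__":
    print(asyncio.run(replay()))
```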