Timeseries Data

Timeseries Data Messages

Note

Visit the concept overview for Data Streams to understand how time series data references are constructed using one Asset name and one Data Stream name as a pair.

Timeseries data from Assets can be served to the Kelvin SmartApp™ in multiple ways, depending on the preferred data consumption method.

Overview

Data can be consumed primarily in two ways:

  1. Asynchronous Consumption (async): This approach suspends the consuming coroutine until new data arrives and resumes it only when data is available. Filters can also be applied to limit which types of data trigger events, which avoids unnecessary executions.

  2. Callback Consumption: In this method, a callback function allows the program to continue performing other tasks while waiting for new data, enabling non-blocking execution suitable for concurrent operations.

Each method provides flexible and efficient handling of Asset timeseries data, tailored to specific application requirements and performance needs.
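
The difference between the two approaches can be sketched with the Python standard library alone. This is an illustrative sketch and not Kelvin SDK code: the queues and the names consume_by_awaiting, dispatch_to_callback and on_message are hypothetical stand-ins for whatever delivers data to the application.

```python
import asyncio
from typing import Awaitable, Callable, List, Tuple

# Hypothetical stand-in for an incoming data value; not a Kelvin SDK type.
Message = float


async def consume_by_awaiting(queue: asyncio.Queue, count: int) -> List[Message]:
    """Asynchronous consumption: suspend at each get() until data arrives."""
    received: List[Message] = []
    for _ in range(count):
        received.append(await queue.get())
    return received


async def dispatch_to_callback(
    queue: asyncio.Queue,
    callback: Callable[[Message], Awaitable[None]],
    count: int,
) -> None:
    """Callback consumption: a dispatcher calls the callback once per message."""
    for _ in range(count):
        await callback(await queue.get())


async def demo() -> Tuple[List[Message], List[Message]]:
    awaited_queue: asyncio.Queue = asyncio.Queue()
    callback_queue: asyncio.Queue = asyncio.Queue()
    seen_by_callback: List[Message] = []

    async def on_message(msg: Message) -> None:
        # The application code only describes what to do with each message.
        seen_by_callback.append(msg)

    for value in (20.5, 21.0, 21.5):
        awaited_queue.put_nowait(value)
        callback_queue.put_nowait(value)

    # Both consumers run concurrently on the same event loop.
    awaited, _ = await asyncio.gather(
        consume_by_awaiting(awaited_queue, 3),
        dispatch_to_callback(callback_queue, on_message, 3),
    )
    return awaited, seen_by_callback


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

In the first style the application drives the loop and decides when to wait. In the second, the application only supplies the handler and something else decides when to call it.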

To consume streaming data, the inputs section of app.yaml must be filled in.

Note

This means a Kelvin SmartApp™ can consume only the inputs specified there, for example:

app:
  type: kelvin
  kelvin:

    inputs:
        - data_type: number
          name: motor_temperature

        - data_type: string
          name: motor_state

inputs is an array (list) in which each entry is composed of two fields:

  • A unique name to identify the input. This name is used in the Python code to reference the input. It must contain only lowercase alphanumeric characters. The characters ., _ and - are allowed as word separators in place of a space, but cannot appear at the beginning or end of the name.
  • An expected data type, which can be: number, boolean or string.
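
The two rules above can be checked before deployment with a few lines of Python. This is a minimal sketch under stated assumptions: validate_input is a hypothetical helper, not part of the Kelvin tooling, and the regular expression is only one reading of the naming rule (it accepts consecutive separators, since the rule does not forbid them).

```python
import re
from typing import Dict, List

# Assumed reading of the naming rule: lowercase alphanumerics, with '.', '_'
# and '-' allowed anywhere except the first and last character.
INPUT_NAME_PATTERN = re.compile(r"[a-z0-9](?:[a-z0-9._-]*[a-z0-9])?")

# Data types listed in the text above.
ALLOWED_DATA_TYPES = {"number", "boolean", "string"}


def validate_input(entry: Dict[str, str]) -> List[str]:
    """Return a list of problems found in one inputs entry (empty if valid)."""
    problems: List[str] = []
    name = entry.get("name", "")
    data_type = entry.get("data_type", "")
    if not INPUT_NAME_PATTERN.fullmatch(name):
        problems.append(f"invalid name: {name!r}")
    if data_type not in ALLOWED_DATA_TYPES:
        problems.append(f"invalid data_type: {data_type!r}")
    return problems


if __name__ == "__main__":
    print(validate_input({"name": "motor_temperature", "data_type": "number"}))
    print(validate_input({"name": "motor-", "data_type": "float"}))
```

Each entry is passed as a plain dictionary, so the helper works on the inputs list however app.yaml is loaded.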

Now that we've defined Kelvin SmartApps™ inputs, there are a few ways to consume them within the Kelvin Python library.

Streams expose filtered inputs as a Python async generator, based on a filter function.

Info

Different data filters are available within the filters class, such as input_equals(input: str), resource_equals(resource: KRN) and asset_equals(asset: str). The following example uses the most common use case, input_equals(input: str).

Filters to limit which inputs are monitored can be expressed as filters.input_equals(input: str).

import asyncio
from typing import AsyncGenerator # AsyncGenerator import

from kelvin.application import KelvinApp, filters # filters import
from kelvin.message import Number # Number (Input Type) import


async def main() -> None:
    app = KelvinApp()

    # Create a Filtered Stream with Temperature (Number) Input Messages
    motor_temperature_msg_stream: AsyncGenerator[Number, None] = app.stream_filter(filters.input_equals("motor_temperature"))

    await app.connect()

    # Wait & Read new Temperature Inputs
    async for motor_temperature_msg in motor_temperature_msg_stream:
        print("Received Motor Temperature: ", motor_temperature_msg)


if __name__ == "__main__":
    asyncio.run(main())
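
To show what a filtered stream does conceptually, here is a standard-library sketch of an async generator that yields only the messages accepted by a filter function. It is not the SDK's implementation: FakeMessage, make_input_filter and filtered_stream are hypothetical names, and the None sentinel exists only so the demo can terminate.

```python
import asyncio
from dataclasses import dataclass
from typing import AsyncGenerator, Callable, List, Union


@dataclass
class FakeMessage:
    """Hypothetical stand-in for an input message; not a Kelvin SDK class."""
    input_name: str
    payload: Union[float, bool, str]


def make_input_filter(name: str) -> Callable[[FakeMessage], bool]:
    """Build a filter function that accepts messages for one input name."""
    return lambda msg: msg.input_name == name


async def filtered_stream(
    queue: asyncio.Queue, accept: Callable[[FakeMessage], bool]
) -> AsyncGenerator[FakeMessage, None]:
    """Yield only the messages for which the filter function returns True."""
    while True:
        msg = await queue.get()
        if msg is None:  # demo-only sentinel marking the end of the data
            return
        if accept(msg):
            yield msg


async def demo() -> List[Union[float, bool, str]]:
    queue: asyncio.Queue = asyncio.Queue()
    for item in (
        FakeMessage("motor_temperature", 71.5),
        FakeMessage("motor_state", "running"),
        FakeMessage("motor_temperature", 72.0),
        None,
    ):
        queue.put_nowait(item)

    values = []
    async for msg in filtered_stream(queue, make_input_filter("motor_temperature")):
        values.append(msg.payload)
    return values


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The consuming loop never sees the motor_state message, which is the behaviour the filter argument is meant to provide.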

Filters expose a specific subset of the Kelvin inputs as a Python asyncio Queue, based on a filter function.

import asyncio
from asyncio import Queue # Queue import

from kelvin.application import KelvinApp, filters # filters import
from kelvin.message import Number # Number (Input Type) import


async def main() -> None:
    app = KelvinApp()

    # Create a Filtered Queue with Temperature (Number) Input Messages
    motor_temperature_msg_queue: Queue[Number] = app.filter(filters.input_equals("motor_temperature"))

    await app.connect()

    while True:
        # Wait & Read new Temperature Inputs
        motor_temperature_msg = await motor_temperature_msg_queue.get()

        print("Received Motor Temperature: ", motor_temperature_msg)


if __name__ == "__main__":
    asyncio.run(main())
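
Because a bare await on queue.get() waits indefinitely, it can be useful to bound the wait so the application notices when no data is arriving. The sketch below uses only asyncio.wait_for from the standard library on a plain asyncio.Queue. The helper name get_or_none and the timeout value are assumptions made for illustration, and the same pattern should apply to any asyncio.Queue.

```python
import asyncio
from typing import Optional, Tuple


async def get_or_none(queue: asyncio.Queue, timeout_s: float) -> Optional[float]:
    """Wait up to timeout_s seconds for the next item; return None on timeout."""
    try:
        return await asyncio.wait_for(queue.get(), timeout=timeout_s)
    except asyncio.TimeoutError:
        return None


async def demo() -> Tuple[Optional[float], Optional[float]]:
    queue: asyncio.Queue = asyncio.Queue()
    queue.put_nowait(71.5)

    first = await get_or_none(queue, timeout_s=0.01)   # an item is available
    second = await get_or_none(queue, timeout_s=0.01)  # queue is now empty
    return first, second


if __name__ == "__main__":
    print(asyncio.run(demo()))
```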

The callback on_asset_input can be used to read every input flowing into Kelvin SmartApps™:

import asyncio

from kelvin.application import KelvinApp
from kelvin.message.primitives import AssetDataMessage # AssetDataMessage import


# on_asset_input Callback
async def on_asset_input(msg: AssetDataMessage):
    print("Received Data Message: ", msg)

    # Get Asset and Value
    asset = msg.resource.asset
    value = msg.payload

async def main() -> None:
    app = KelvinApp()
    # Set on_asset_input Callback
    app.on_asset_input = on_asset_input

    await app.connect()

    # [Required] Forces Kelvin SmartApps™ to run forever
    while True:
        await asyncio.sleep(1)


if __name__ == "__main__":
    asyncio.run(main())

Event Callbacks

Concepts Made Simple

A callback in Python is like a WhatsApp notification. Imagine you’re waiting for a message from a friend, but instead of constantly checking your phone, you let WhatsApp notify you when the message arrives. You only get alerted when the specific event—your friend’s message—actually happens.

In Python, a callback works similarly. Instead of constantly checking for updates, the program waits until a certain event occurs, and when it does, the callback function (like the WhatsApp notification) is triggered to perform a specific action.

Event Callbacks are functions that are triggered when certain events happen, such as when new time series data is received or a control change is initiated.

Video Tutorial

You can watch this Event Callback video tutorial, which shows you how to program and test it on your local machine with the Kelvin test data generator Python script.

Copy the code in the Video Tutorial

All the scripts shown in the video tutorial are reproduced in the sections that follow, where you can view and copy them.

Written Tutorial

When any new time series data is created for any Asset / Data Stream pair, a callback event is created.

Detailed Explanation of Parameters

Response Parameters

  • id: UUID('db18aaaf-9a70-4c3e-babb-b7571867871f')
    • Description: A unique, randomly generated UUID that serves as the key id for the time series data.

  • type: KMessageTypeData('data', 'pt=number')
    • Options: number, boolean, string
    • Description: The format of the data to expect.

  • trace_id: UUID('db18aaaf-9a70-4c3e-babb-b7571867871f')

  • timestamp: datetime.datetime(2024, 10, 28, 11, 51, 44, 601689, tzinfo=datetime.timezone(datetime.timedelta(seconds=25200), '+07'))
    • Description: The time at which the time series data was recorded, in the local computer's timezone.

  • resource: KRNAssetDataStream(asset='test-asset-1', data_stream='sa-rw-number')
    • Description: The Asset and Data Stream names associated with the time series data.

  • payload: The actual value, in the format declared by the type key.
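
The fields listed above can be read like ordinary attributes. The sketch below models them with standard-library dataclasses so it runs without the SDK. FakeResource, FakeDataMessage and summarize are hypothetical names, and only the field names and example values are taken from the list above. It also converts the local timestamp to UTC, which is often more convenient for logging.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Dict, Union
from uuid import UUID


@dataclass(frozen=True)
class FakeResource:
    """Hypothetical stand-in for the Asset / Data Stream pair."""
    asset: str
    data_stream: str


@dataclass(frozen=True)
class FakeDataMessage:
    """Hypothetical stand-in mirroring the documented message fields."""
    id: UUID
    timestamp: datetime
    resource: FakeResource
    payload: Union[float, bool, str]


def summarize(msg: FakeDataMessage) -> Dict[str, object]:
    """Extract the commonly used fields, with the timestamp normalized to UTC."""
    return {
        "asset": msg.resource.asset,
        "data_stream": msg.resource.data_stream,
        "utc_time": msg.timestamp.astimezone(timezone.utc).isoformat(),
        "value": msg.payload,
    }


# Example values copied from the parameter list above.
example = FakeDataMessage(
    id=UUID("db18aaaf-9a70-4c3e-babb-b7571867871f"),
    timestamp=datetime(
        2024, 10, 28, 11, 51, 44, 601689,
        tzinfo=timezone(timedelta(seconds=25200), "+07"),
    ),
    resource=FakeResource(asset="test-asset-1", data_stream="sa-rw-number"),
    payload=9.0,
)

if __name__ == "__main__":
    print(summarize(example))
```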

The full code is given here for the main.py script.

Detailed Code Explanations

The numbered markers in the code, such as (1), correspond to the numbered explanations listed after the script.

main.py
import asyncio # (1)!

from kelvin.application import KelvinApp # (2)!
from kelvin.message.primitives import AssetDataMessage # (3)!


async def on_asset_input(msg: AssetDataMessage):  # (4)!

    # Get Asset and Value
    asset = msg.resource.asset
    value = msg.payload

    print("Received Data Message: ", msg)
    print(f"Asset: {asset}, Value: {value}")

async def main() -> None:  # (5)!

    # Initialize the SmartApp
    app = KelvinApp()

    # Define the function to call when event callback is triggered
    app.on_asset_input = on_asset_input

    # Connect the SmartApp with the Kelvin Platform
    await app.connect()

    # Force the Application to run forever
    while True:
        await asyncio.sleep(1)


if __name__ == "__main__":
    asyncio.run(main())

  1. The asyncio module lets the program run asynchronously. Code that spends most of its time waiting, such as I/O-bound operations, can then run concurrently on a single thread, which makes efficient use of processor time.
  2. The KelvinApp class is used to initialize the Kelvin SmartApp™ and connect it to the Kelvin Platform.
  3. The AssetDataMessage class represents the structure and properties of asset data messages. The callback receives an instance of it named msg.
  4. The on_asset_input callback function is triggered automatically whenever new time series data for an Asset / Data Stream pair associated with the Kelvin SmartApp™ is received on the platform. It serves as an event handler that processes incoming data in real time, so that relevant actions or data updates occur immediately upon receipt.
  5. The main function initializes the Kelvin SmartApp™, connects it to the Kelvin Platform, and configures an event callback that executes a specific function whenever new data arrives from an Asset associated with the Kelvin SmartApp™ on the Kelvin Platform.
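
The first annotation, about asyncio letting waiting code overlap, can be seen in a small standard-library sketch. It is independent of the Kelvin SDK, and worker and demo are hypothetical names. Each worker yields control at every await, so the event loop interleaves the two workers on one thread.

```python
import asyncio
from typing import List


async def worker(name: str, steps: int, log: List[str]) -> None:
    """Record each step, then yield control back to the event loop."""
    for step in range(steps):
        log.append(f"{name}{step}")
        await asyncio.sleep(0)  # suspension point: other tasks may run here


async def demo() -> List[str]:
    log: List[str] = []
    # Both workers are scheduled on the same event loop and take turns.
    await asyncio.gather(worker("a", 2, log), worker("b", 2, log))
    return log


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The same mechanism explains why the while True loop in main.py must await asyncio.sleep: the await is where the event loop gets the chance to run the callback.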

When you run this Python script, the following output is shown in the terminal or logs:

Output from Program
Received Data Message:  id=UUID('db18aaaf-9a70-4c3e-babb-b7571867871f') type=KMessageTypeData('data', 'pt=number') trace_id=None source=None timestamp=datetime.datetime(2024, 10, 28, 11, 51, 44, 601689, tzinfo=datetime.timezone(datetime.timedelta(seconds=25200), '+07')) resource=KRNAssetDataStream(asset='test-asset-1', data_stream='sa-rw-number') payload=9.0

Asset: test-asset-1, Value: 9.0

Data Generator for Local SmartApp Testing

Easily test your SmartApp locally on your computer.

Comprehensive documentation is available for the Generator Tool. See the "Test a SmartApp" ⟹ "Generator" page to learn how to use this event callback script with the Test Generator.