Timeseries Data
Timeseries Data Messages
Timeseries data from Assets can be served to the Kelvin SmartApp™ in multiple ways, depending on the preferred data consumption method.
Data can be consumed primarily in two ways:
- Asynchronous Consumption (async): This approach pauses program execution until new data arrives, resuming only when data is available. To enhance efficiency and reduce unnecessary executions, filters can also be applied to limit the types of data that trigger events.
- Callback Consumption: In this method, a callback function allows the program to continue performing other tasks while waiting for new data, enabling non-blocking execution suitable for concurrent operations.
Each method provides flexible and efficient handling of Asset timeseries data, tailored to specific application requirements and performance needs.
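The difference between the two styles can be sketched with the standard asyncio library alone. This is a minimal illustration, not the Kelvin API: produce, consume_async and consume_with_callback are hypothetical helpers, and a plain asyncio.Queue stands in for the data source.

```python
import asyncio
from typing import Awaitable, Callable, List, Tuple


async def produce(queue: asyncio.Queue, values: List[float]) -> None:
    """Stand-in data source: pushes each value, then None as an end marker."""
    for value in values:
        await queue.put(value)
    await queue.put(None)


async def consume_async(queue: asyncio.Queue) -> List[float]:
    """Asynchronous style: this coroutine suspends on get() until data arrives."""
    received = []
    while (value := await queue.get()) is not None:
        received.append(value)
    return received


async def consume_with_callback(
    queue: asyncio.Queue, callback: Callable[[float], Awaitable[None]]
) -> None:
    """Callback style: a dispatcher invokes the callback once per value."""
    while (value := await queue.get()) is not None:
        await callback(value)


async def demo_styles() -> Tuple[List[float], List[float]]:
    # Asynchronous style: the consumer itself awaits each value.
    q1: asyncio.Queue = asyncio.Queue()
    _, awaited = await asyncio.gather(produce(q1, [20.5, 21.0]), consume_async(q1))

    # Callback style: the dispatcher runs as a background task, so the
    # caller stays free to do other work while values are delivered.
    seen: List[float] = []

    async def on_value(value: float) -> None:
        seen.append(value)

    q2: asyncio.Queue = asyncio.Queue()
    dispatcher = asyncio.create_task(consume_with_callback(q2, on_value))
    await produce(q2, [20.5, 21.0])
    await dispatcher
    return awaited, seen
```

In both cases the same values are received; what changes is whether the application code drives the loop itself or only supplies a function to be called.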
To consume streaming data, the inputs section must be filled in within the app.yaml.
Note
This means that Kelvin SmartApps™ can consume only the inputs specified there. For example:
app:
  type: kelvin
  kelvin:
    inputs:
      - data_type: number
        name: motor_temperature
      - data_type: string
        name: motor_state
inputs is an array (list) in which each entry is composed of two fields:
- A unique name to identify the input. This will be used in the Python code to reference the input. It must contain only lowercase alphanumeric characters. The characters ., _ and - are allowed to separate words instead of a space but cannot be at the beginning or end of the name.
- An expected data type, which can be: number, boolean or string.
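The naming and data type rules above can be checked before deployment with a small script. The following is a sketch only: validate_inputs is a hypothetical helper, not part of the Kelvin library, and it takes the inputs list as already-parsed Python dictionaries. The text does not say whether consecutive separators (such as a__b) are allowed, so this sketch assumes they are.

```python
import re
from typing import Dict, List

# Lowercase alphanumerics; ., _ and - may appear only between the first
# and last character (assumption: consecutive separators are tolerated).
NAME_PATTERN = re.compile(r"[a-z0-9](?:[a-z0-9._-]*[a-z0-9])?")
ALLOWED_DATA_TYPES = {"number", "boolean", "string"}


def validate_inputs(inputs: List[Dict[str, str]]) -> List[str]:
    """Return a list of human-readable problems; empty if all entries pass."""
    errors: List[str] = []
    seen = set()
    for entry in inputs:
        name = entry.get("name", "")
        data_type = entry.get("data_type", "")
        if not NAME_PATTERN.fullmatch(name):
            errors.append(f"invalid name: {name!r}")
        if name in seen:
            # Names must be unique within the inputs list.
            errors.append(f"duplicate name: {name!r}")
        seen.add(name)
        if data_type not in ALLOWED_DATA_TYPES:
            errors.append(f"invalid data_type for {name!r}: {data_type!r}")
    return errors
```

For the two inputs declared in the example app.yaml, validate_inputs returns an empty list, while a name such as _motor is reported because it starts with a separator.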
Now that we've defined Kelvin SmartApps™ inputs, there are a few ways to consume them within the Kelvin Python library.
Streams expose filtered inputs as a Python async generator, based upon a filter function.
Info
Different data filters are available within the filters class, such as input_equals(input: str), resource_equals(resource: KRN) and asset_equals(asset: str). The following example uses the most common and expected use case, input_equals(input: str).
Filters to limit which inputs are monitored can be expressed as filters.input_equals(input: str).
import asyncio
from typing import AsyncGenerator  # AsyncGenerator import

from kelvin.application import KelvinApp, filters  # filters import
from kelvin.message import Number  # Number (Input Type) import


async def main() -> None:
    app = KelvinApp()

    # Create a Filtered Stream with Temperature (Number) Input Messages
    motor_temperature_msg_stream: AsyncGenerator[Number, None] = app.stream_filter(filters.input_equals("motor_temperature"))

    await app.connect()

    # Wait & Read new Temperature Inputs
    async for motor_temperature_msg in motor_temperature_msg_stream:
        print("Received Motor Temperature: ", motor_temperature_msg)


if __name__ == "__main__":
    asyncio.run(main())
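To make the idea of a filter-driven async generator concrete, here is a self-contained sketch that runs without the Kelvin library. Everything in it is a stand-in: FakeMessage, input_name_is and filtered_stream are hypothetical names that only mimic the pattern of passing a filter function to a stream; they do not describe how the library is implemented.

```python
import asyncio
from dataclasses import dataclass
from typing import AsyncGenerator, Callable, Iterable, List


@dataclass
class FakeMessage:
    """Hypothetical stand-in for an input message (not a Kelvin type)."""
    input_name: str
    payload: float


def input_name_is(name: str) -> Callable[[FakeMessage], bool]:
    """Build a predicate that accepts only messages for the given input."""
    return lambda msg: msg.input_name == name


async def filtered_stream(
    source: Iterable[FakeMessage], predicate: Callable[[FakeMessage], bool]
) -> AsyncGenerator[FakeMessage, None]:
    """Yield only the messages accepted by the predicate."""
    for msg in source:
        await asyncio.sleep(0)  # hand control back to the event loop between messages
        if predicate(msg):
            yield msg


async def demo_stream() -> List[float]:
    source = [
        FakeMessage("motor_temperature", 71.5),
        FakeMessage("motor_state", 1.0),
        FakeMessage("motor_temperature", 72.0),
    ]
    stream = filtered_stream(source, input_name_is("motor_temperature"))
    # "async for" receives only the messages that passed the filter.
    return [msg.payload async for msg in stream]
```

The consumer loop never sees the motor_state message, which is the behaviour the filter argument is meant to provide.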
Filters deliver a specific subset of the Kelvin inputs through a Python message queue (asyncio.Queue), based upon a filter function.
import asyncio
from asyncio import Queue  # Queue import

from kelvin.application import KelvinApp, filters  # filters import
from kelvin.message import Number  # Number (Input Type) import


async def main() -> None:
    app = KelvinApp()

    # Create a Filtered Queue with Temperature (Number) Input Messages
    motor_temperature_msg_queue: Queue[Number] = app.filter(filters.input_equals("motor_temperature"))

    await app.connect()

    while True:
        # Wait & Read new Temperature Inputs
        motor_temperature_msg = await motor_temperature_msg_queue.get()
        print("Received Motor Temperature: ", motor_temperature_msg)


if __name__ == "__main__":
    asyncio.run(main())
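The loop above waits indefinitely on get(). If the application should notice when no data has arrived for a while, one option is to wrap the call in asyncio.wait_for from the standard library. The sketch below uses a plain asyncio.Queue as a stand-in for the filtered queue; get_with_timeout is a hypothetical helper and the timeout value is arbitrary.

```python
import asyncio
from typing import Any, Optional, Tuple


async def get_with_timeout(queue: asyncio.Queue, timeout: float) -> Optional[Any]:
    """Return the next queued item, or None if nothing arrives within the timeout."""
    try:
        return await asyncio.wait_for(queue.get(), timeout=timeout)
    except asyncio.TimeoutError:
        return None


async def demo_timeout() -> Tuple[Optional[Any], Optional[Any]]:
    queue: asyncio.Queue = asyncio.Queue()
    await queue.put(71.5)  # stand-in for one incoming temperature message
    first = await get_with_timeout(queue, timeout=0.05)   # item is already queued
    second = await get_with_timeout(queue, timeout=0.05)  # queue is empty, so this times out
    return first, second
```

Returning None on timeout keeps the calling loop simple; an application that treats missing data as an error could re-raise instead.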
The callback on_asset_input can be used to read every input flowing into Kelvin SmartApps™:
import asyncio

from kelvin.application import KelvinApp
from kelvin.message.primitives import AssetDataMessage  # AssetDataMessage import


# on_asset_input Callback
async def on_asset_input(msg: AssetDataMessage):
    print("Received Data Message: ", msg)

    # Get Asset and Value
    asset = msg.resource.asset
    value = msg.payload


async def main() -> None:
    app = KelvinApp()

    # Set on_asset_input Callback
    app.on_asset_input = on_asset_input

    await app.connect()

    # [Required] Forces Kelvin SmartApps™ to run forever
    while True:
        await asyncio.sleep(1)


if __name__ == "__main__":
    asyncio.run(main())
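A callback often needs to keep state between messages, for example the most recent value per Asset. The following sketch shows one way to do that with a small class. It relies only on the msg.resource.asset and msg.payload attributes used in the example above; LatestValues and fake_message are hypothetical names, and the messages are built with SimpleNamespace so the sketch runs without the Kelvin library.

```python
import asyncio
from types import SimpleNamespace
from typing import Any, Dict


class LatestValues:
    """Keeps the most recent payload seen for each asset."""

    def __init__(self) -> None:
        self.by_asset: Dict[str, Any] = {}

    async def on_asset_input(self, msg: Any) -> None:
        # Same attribute access as in the callback example above.
        self.by_asset[msg.resource.asset] = msg.payload


def fake_message(asset: str, payload: Any) -> SimpleNamespace:
    """Build a stand-in object exposing resource.asset and payload."""
    return SimpleNamespace(resource=SimpleNamespace(asset=asset), payload=payload)


async def demo_latest() -> Dict[str, Any]:
    latest = LatestValues()
    messages = (
        fake_message("pump-01", 70.0),
        fake_message("pump-02", 65.5),
        fake_message("pump-01", 71.5),  # overwrites the earlier pump-01 value
    )
    for msg in messages:
        await latest.on_asset_input(msg)
    return latest.by_asset
```

In a real application the bound method would presumably be assigned in the same way as the function in the example, that is app.on_asset_input = latest.on_asset_input, though this has not been verified against the library.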
