Handling data
Applications rely on data to produce value.
To receive data, applications need to actively subscribe to it.
This can be done by specifying the inputs and outputs settings in the app.yaml configuration file.
Specifying the data types
Before writing any code, specify which data types the application is going to use.
For more information on how to create and integrate data types, check the Creating and including data types section.
Let's consider the following use case: An application that receives a temperature and outputs its doubled value.
Naturally, this value should be a floating point value.
Because raw floating point types are provided by default, there is no need to declare them in the app.yaml.
app:
  kelvin:
    data_types: []
    language:
      python:
        entry_point: my_app.my_app:App
        requirements: requirements.txt
      type: python
  type: kelvin
info:
  description: my-app
  name: my-app
  title: my-app
  version: 1.0.0
spec_version: 2.0.0
Inputs and Outputs
Once the data types are specified, the input and output definitions come next.
The principle is simple. Declare each variable with its data type:
- declare temperature as a raw.float32 input.
- declare doubled_temperature as a raw.float32 output.
app:
  kelvin:
    data_types: [ ]
    inputs:
      - data_type: raw.float32
        name: temperature
        sources:
          - asset_names: [ emulation ]
            workload_names: [ source_app ]
    outputs:
      - data_type: raw.float32
        name: doubled_temperature
        targets:
          - asset_names: [ output_asset ]
    language:
      python:
        entry_point: my_app.my_app:App
        requirements: requirements.txt
      type: python
  type: kelvin
info:
  description: my-app
  name: my-app
  title: my-app
  version: 1.0.0
spec_version: 2.0.0
Code
Simple data consumption
To access the declared inputs and outputs, use the self.data variable.
For example, if the application expects a metric called temperature, it can be read through self.data.temperature.
The application code would look like this:
from kelvin.app import DataApplication
from kelvin.sdk.datatype import Float32


class App(DataApplication):
    """Application."""

    def init(self) -> None:
        """
        Initialisation method
        """
        # Custom initialisation logic

    def process(self):
        # Latest message received on the "temperature" input
        temperature = self.data.temperature
        if temperature:
            print(f"Received new temperature value: {temperature}")
            doubled_temperature = temperature.value * 2
            print(f"Emitting doubled temperature value: {doubled_temperature}")
            message = self.make_message(
                "raw.float32",
                "doubled_temperature",
                _asset_name="output_asset",  # asset declared in the output's targets
                value=doubled_temperature,
            )
            self.emit(message)
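The doubling logic can also be exercised without the Kelvin SDK by pulling it into a plain function. The sketch below is a minimal illustration under that assumption: FakeMessage and double_temperature are hypothetical names that are not part of the SDK, and the stand-in only mimics the .value attribute used above.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class FakeMessage:
    """Hypothetical stand-in for an SDK message; it only mimics `.value`."""

    value: float


def double_temperature(temperature: Optional[FakeMessage]) -> Optional[float]:
    """Return twice the temperature, or None when no message has arrived yet."""
    if temperature is None:
        return None
    return temperature.value * 2


if __name__ == "__main__":
    print(double_temperature(FakeMessage(value=21.5)))  # 43.0
    print(double_temperature(None))  # None
```

The sketch checks for None explicitly so that a reading of 0.0 is still doubled. Whether SDK message objects evaluate as false for zero values is not stated here, so this is a design choice of the sketch rather than a description of the SDK.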
Access message context and headers
Each message carries a header that contains context information such as:
- type - The type of the incoming block of data
- name - The name of the channel the data is coming from
- asset_name - The asset from where the data originates
- source - Endpoint details (node and workload) for the source
- target - Endpoint details (node and workload) for the target
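The fields listed above can be pictured as a small record. The following sketch is only a mental model, not the SDK's actual header class: Header and Endpoint are hypothetical names, and the node_name and workload_name attributes are assumptions derived from the "node and workload" description.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass(frozen=True)
class Endpoint:
    """Hypothetical endpoint record: the node and workload of a source or target."""

    node_name: Optional[str] = None
    workload_name: Optional[str] = None


@dataclass(frozen=True)
class Header:
    """Hypothetical stand-in that mirrors the header fields listed above."""

    type: str
    name: str
    asset_name: str
    source: Endpoint = field(default_factory=Endpoint)
    target: Endpoint = field(default_factory=Endpoint)


if __name__ == "__main__":
    header = Header(
        type="raw.float32",
        name="temperature",
        asset_name="emulation",
        source=Endpoint(workload_name="source_app"),
    )
    print(header.asset_name)  # emulation
    print(header.source.workload_name)  # source_app
```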
To access this header, use the message's .header property.
For example, self.data.temperature.header holds all the context information for the temperature metric.
from kelvin.app import DataApplication
from kelvin.sdk.datatype import Float32


class App(DataApplication):
    """Application."""

    def init(self) -> None:
        """
        Initialisation method
        """
        # Custom initialisation logic

    def process(self):
        temperature = self.data.temperature
        if temperature:
            # Read the header only once a message is known to be available
            asset_name = temperature.header.asset_name
            print(f"Received new temperature value from asset {asset_name}: {temperature}")
            doubled_temperature = temperature.value * 2
            print(f"Emitting doubled temperature value: {doubled_temperature}")
            message = self.make_message(
                name="doubled_temperature",
                value=doubled_temperature,
                _asset_name="output_asset",  # the asset can be changed here
            )
            self.emit(message)
Warning
To emit messages with a custom asset, that same asset must be declared in the asset_names of the corresponding output's targets in the app.yaml.
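One way to catch a mismatch early is to compare the asset used in the code against the output declaration. The sketch below is a hedged illustration rather than an SDK feature: it represents the outputs part of the app.yaml above as a plain Python dict, and declared_output_assets and is_declared_asset are hypothetical helper names.

```python
from typing import Any, Dict, List

# The outputs part of the app.yaml from this page, written as a plain dict.
APP_CONFIG: Dict[str, Any] = {
    "app": {
        "kelvin": {
            "outputs": [
                {
                    "data_type": "raw.float32",
                    "name": "doubled_temperature",
                    "targets": [{"asset_names": ["output_asset"]}],
                }
            ]
        }
    }
}


def declared_output_assets(config: Dict[str, Any], output_name: str) -> List[str]:
    """Collect every asset name declared in the targets of the given output."""
    outputs = config.get("app", {}).get("kelvin", {}).get("outputs", [])
    assets: List[str] = []
    for output in outputs:
        if output.get("name") != output_name:
            continue
        for target in output.get("targets", []):
            assets.extend(target.get("asset_names", []))
    return assets


def is_declared_asset(config: Dict[str, Any], output_name: str, asset_name: str) -> bool:
    """Return True when the asset is declared as a target of the output."""
    return asset_name in declared_output_assets(config, output_name)


if __name__ == "__main__":
    print(is_declared_asset(APP_CONFIG, "doubled_temperature", "output_asset"))  # True
    print(is_declared_asset(APP_CONFIG, "doubled_temperature", "emulation"))  # False
```

With the configuration shown on this page, output_asset passes the check while emulation does not, because emulation only appears as an input source.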
All that is left to do is to build the application and test it on the emulation system.