Inject and Extract Data
Overview
The Kelvin Platform can inject data into an application to test its logic. This is useful during development and at the various testing stages. This tutorial uses a weather application to demonstrate how to:
- Inject data into the app inputs. The app receives temperature and humidity values and, for each channel, emits the mean of the values received in the last 10 seconds (about the last 10 values when data arrives once per second).
- Set the interval scale to seconds, which speeds up application execution for testing purposes.
- Configure the logic to receive a buffer (pandas.DataFrame) of the previous 10 seconds of data.
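The app relies on a time-based buffer that keeps only the values received within the last 10 seconds and computes their mean. The sketch below models that idea in plain Python to make the behaviour concrete. WindowBuffer is a hypothetical class written for illustration. It is not the Kelvin storage implementation, and the rule for evicting old samples (drop anything at least `window` seconds older than the newest sample) is an assumption.

```python
from __future__ import annotations

from collections import deque
from statistics import fmean


class WindowBuffer:
    """Hypothetical time-based buffer holding (timestamp, value) pairs from the last `window` seconds."""

    def __init__(self, window: float = 10.0) -> None:
        self.window = window
        self._items: deque = deque()

    def add(self, timestamp: float, value: float) -> None:
        self._items.append((timestamp, value))
        # Evict samples that are `window` seconds or more older than the newest one.
        while self._items and self._items[0][0] <= timestamp - self.window:
            self._items.popleft()

    def values(self) -> list[float]:
        return [value for _, value in self._items]

    def mean(self) -> float:
        return fmean(self.values())


if __name__ == "__main__":
    # Feed the temperature column from data/data.csv, one sample per second.
    buf = WindowBuffer(window=10.0)
    for t, temperature in enumerate([14, 14, 16, 12, 13]):
        buf.add(float(t), float(temperature))
    print(buf.mean())  # 13.8
```

With one sample per second, the window holds at most 10 samples. This is why "the last 10 seconds" and "the last 10 values" describe the same buffer in this tutorial.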
Prerequisite:
- Complete the Getting Started Guide.
Source Code
The source code for this application is available on GitHub.
App Files
The following app contains the logic that achieves the goals described in the Overview section. Using a code editor on your machine (e.g. Visual Studio Code), open the weather application folder and then the file weather/weather.py, which contains the application code below.
```python
from kelvin.app import DataApplication


class App(DataApplication):
    """Application."""

    # Keep a buffer of the last 10 seconds of data for every raw.float32 input.
    TOPICS = {
        'raw.float32.#': {
            'target': '{name}',
            'storage_type': 'buffer',
            'storage_config': {'window': {'seconds': 10}, 'getter': 'value'},
        }
    }

    def init(self) -> None:
        """
        Initialisation method
        """
        # Custom initialisation logic

    def process(self) -> None:
        """Process data."""
        # self.logger.info("config", config=self.config)
        # self.logger.info("frame", frame=frame)

        # Log the buffered data for each input.
        temperature = self.data.get("temperature", None)
        humidity = self.data.get("humidity", None)
        self.logger.info("temperature", data=temperature)
        self.logger.info("humidity", data=humidity)

        # self.frame provides the buffered inputs as a pandas.DataFrame (one column per input).
        frame = self.frame
        if not frame.empty:
            temperature_mean = frame['temperature'].mean()
            humidity_mean = frame['humidity'].mean()

            # Create and emit one output message per mean.
            temperature_message = self.make_message(
                'raw.float32',
                'temperature_mean',
                value=temperature_mean,
                emit=True
            )
            self.logger.info("temperature_mean", message=temperature_message)

            humidity_message = self.make_message(
                'raw.float32',
                'humidity_mean',
                value=humidity_mean,
                emit=True
            )
            self.logger.info("humidity_mean", message=humidity_message)
```
The application configuration is defined in app.yaml:
```yaml
app:
  kelvin:
    language:
      python:
        entry_point: weather.weather:App
        requirements: requirements.txt
      type: python
    inputs:
      - data_type: raw.float32
        name: temperature
        sources:
          - asset_names: [ emulation ]
      - data_type: raw.float32
        name: humidity
        sources:
          - asset_names: [ emulation ]
    outputs:
      - data_type: raw.float32
        name: temperature_mean
        targets:
          - asset_names: [ emulation ]
      - data_type: raw.float32
        name: humidity_mean
        targets:
          - asset_names: [ emulation ]
  type: kelvin
info:
  description: weather
  name: weather
  title: weather
  version: 1.0.0
spec_version: 2.0.0
environment:
  node_name: node
  workload_name: workload
```
Inject data
Build and Emulate
First, build the application and start the emulation. The kelvin emulation start command is run with the --show-logs option, so you can follow the logs while the app is running.
Run the following commands in Terminal window 1:
```shell
kelvin app build
kelvin emulation start --verbose --show-logs
```
Emulation start output (Terminal window 1):
```text
[kelvin.sdk][2021-07-07 17:54:49][R] Emulation configuration loaded from: "/home/user/kics-app-examples/kelvin_apps/weather/app.yaml"
[kelvin.sdk][2021-07-07 17:54:50][I] Valid schema available locally. Using cached version (/home/user/.config/kelvin/schemas/2.0.0.json)
[kelvin.sdk][2021-07-07 17:54:53][R] Kelvin Emulation System is online
[kelvin.sdk][2021-07-07 17:54:53][I] Loading configuration and starting the application "weather:1.0.0"
[kelvin.sdk][2021-07-07 17:54:53][I] Valid schema available locally. Using cached version (/home/user/.config/kelvin/schemas/2.0.0.json)
[kelvin.sdk][2021-07-07 17:54:56][R] Overriding existing configuration with "/home/user/kics-app-examples/kelvin_apps/weather/app.yaml"
[kelvin.sdk][2021-07-07 17:54:56][R] Application successfully launched: "weather:1.0.0"
[ApplicationEngine.cpp: 26:I] - Loading configuration file: /opt/kelvin/app/app.yaml
[ Pipeline.cpp: 73:I] - Loading shared object: ../core/lib/Client.so (weather) - SUCCESS
[ Pipeline.cpp: 188:I] - Loading weather Refinery input modules:
[ Pipeline.cpp: 193:I] - Loading weather Refinery output modules:
[ Pipeline.cpp: 198:I] - Loading weather Gatekeeper input modules
[ Pipeline.cpp: 203:I] - Loading weather Gatekeeper output modules
[ StorageManager.cpp: 27:I] - Using eXtremeDB storage backend
[ExtremeDbBackend.cpp: 277:I] - [config] Commit policy: MCO_COMMIT_SYNC_FLUSH // Transaction log type: REDO_LOG
[ExtremeDbBackend.cpp: 318:I] - [config] Dev: 0 // Memory type: MCO_MEMORY_CONV // Assignment: MCO_MEMORY_ASSIGN_DATABASE // Size: 2097152 // Flags: MCO_FILE_OPEN_DEFAULT
[ExtremeDbBackend.cpp: 318:I] - [config] Dev: 1 // Memory type: MCO_MEMORY_CONV // Assignment: MCO_MEMORY_ASSIGN_CACHE // Size: 8388608 // Flags: MCO_FILE_OPEN_DEFAULT
[ExtremeDbBackend.cpp: 318:I] - [config] Dev: 2 // Memory type: MCO_MEMORY_FILE // Assignment: MCO_MEMORY_ASSIGN_PERSISTENT // Filename: ./storage/kelvindb_v3.dbs // Flags: MCO_FILE_OPEN_DEFAULT
[ExtremeDbBackend.cpp: 318:I] - [config] Dev: 3 // Memory type: MCO_MEMORY_FILE // Assignment: MCO_MEMORY_ASSIGN_LOG // Filename: ./storage/kelvindb_v3.log // Flags: MCO_FILE_OPEN_DEFAULT
[ExtremeDbBackend.cpp: 258:I] - eXtremeDB is running in PAID LICENSE MODE
[ExtremeDbBackend.cpp: 260:I] - eXtremeDB version: interim build eXtremeDB_8.1_1800, rev.25973
[ExtremeDbBackend.cpp: 30:I] - eXtremeDB started successfully.
[ UploadManager.cpp: 28:I] - The UploadManager's worker will not start because it's either disabled or has no address configured
[ Poller.cpp: 70:I] - Successful poller: 0.100000 secs (fd: 7)
[ Poller.cpp: 70:I] - Successful poller: 1.000000 secs (fd: 8)
[ RPCApplication.cpp: 72:I] - Weather App initialized.
```
Inject
Open a second terminal window and navigate to the app's folder. The inject command uses the file data/data.csv, which contains two columns, one for each input of the weather app.
| temperature | humidity |
|---|---|
| 14 | 40 |
| 14 | 40 |
| 16 | 42 |
| 12 | 50 |
| 13 | 55 |
| ... | ... |
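To prepare your own injection file, you can write one in the same layout with Python's csv module: a header row with the input names, followed by one row per sample. The sketch below uses the rows shown in the table. write_injection_csv is a hypothetical helper written for this example, and the target file name is up to you.

```python
from __future__ import annotations

import csv
from pathlib import Path

# Header: one column per app input; the names must match the inputs in app.yaml.
HEADER = ["temperature", "humidity"]
# The sample rows shown in the table above.
ROWS = [(14, 40), (14, 40), (16, 42), (12, 50), (13, 55)]


def write_injection_csv(path: Path, header: list[str], rows: list[tuple]) -> None:
    """Write a header row followed by one row per sample, creating parent folders if needed."""
    path.parent.mkdir(parents=True, exist_ok=True)
    with path.open("w", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(header)
        writer.writerows(rows)
```

For example, write_injection_csv(Path("data/custom.csv"), HEADER, ROWS) creates a file that could be passed to the inject command in place of data/data.csv. The name data/custom.csv is only an illustration, chosen so that the tutorial's own data file is not overwritten.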
Each column header is the name of an input. These names must match the inputs defined in app.yaml and the connection inputs.
```yaml
app:
  kelvin:
    inputs:
      - data_type: raw.float32
        name: temperature
        sources:
          - asset_names: [ emulation ]
      - data_type: raw.float32
        name: humidity
        sources:
          - asset_names: [ emulation ]
    ...
```
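Before injecting, a quick check can catch header typos. The sketch below reads only the first row of the CSV file and compares it with the expected input names. To stay within the standard library, the names are copied by hand from app.yaml rather than parsed from it (parsing YAML would need a third-party package such as PyYAML). check_header is a hypothetical helper, not part of the Kelvin SDK.

```python
from __future__ import annotations

import csv
from pathlib import Path

# Input names copied by hand from the inputs section of app.yaml.
APP_INPUTS = {"temperature", "humidity"}


def check_header(csv_path: Path, expected: set[str]) -> tuple[set[str], set[str]]:
    """Return (columns that are not app inputs, app inputs that have no column)."""
    with csv_path.open(newline="") as f:
        header = next(csv.reader(f))
    columns = {name.strip() for name in header}
    return columns - expected, expected - columns
```

For example, check_header(Path("data/data.csv"), APP_INPUTS) should return two empty sets when every column matches an input and every input has a column.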
Run the following command and check the terminal output:
```shell
ksdk emulation inject data/data.csv --period=1 --repeat --app-name weather:1.0.0 --workload-name workload --node-name node
```
Emulation inject output (Terminal window 2):
```text
...
[kelvin.sdk][2021-07-07 18:32:46][I] Building new image for "injector:0.0.0". Please wait..
[kelvin.sdk][2021-07-07 18:34:45][R] Image successfully built: "injector:0.0.0"
[kelvin.sdk][2021-07-07 18:34:46][R] Kelvin Emulation System is online
[kelvin.sdk][2021-07-07 18:34:46][I] Loading configuration and starting the application "injector"
[kelvin.sdk][2021-07-07 18:34:47][R] Application successfully launched: "injector"
[ApplicationEngine.cpp: 26:I] - Loading configuration file: /opt/kelvin/app/app.yaml
[ Pipeline.cpp: 73:I] - Loading shared object: ../core/lib/Client.so (injector) - SUCCESS
[ Pipeline.cpp: 188:I] - Loading injector Refinery input modules:
[ Pipeline.cpp: 193:I] - Loading injector Refinery output modules:
[ Pipeline.cpp: 198:I] - Loading injector Gatekeeper input modules
[ Pipeline.cpp: 203:I] - Loading injector Gatekeeper output modules
[ StorageManager.cpp: 27:I] - Using eXtremeDB storage backend
[ExtremeDbBackend.cpp: 277:I] - [config] Commit policy: MCO_COMMIT_SYNC_FLUSH // Transaction log type: REDO_LOG
[ExtremeDbBackend.cpp: 318:I] - [config] Dev: 0 // Memory type: MCO_MEMORY_CONV // Assignment: MCO_MEMORY_ASSIGN_DATABASE // Size: 2097152 // Flags: MCO_FILE_OPEN_DEFAULT
[ExtremeDbBackend.cpp: 318:I] - [config] Dev: 1 // Memory type: MCO_MEMORY_CONV // Assignment: MCO_MEMORY_ASSIGN_CACHE // Size: 8388608 // Flags: MCO_FILE_OPEN_DEFAULT
[ExtremeDbBackend.cpp: 318:I] - [config] Dev: 2 // Memory type: MCO_MEMORY_FILE // Assignment: MCO_MEMORY_ASSIGN_PERSISTENT // Filename: ./storage/kelvindb_v3.dbs // Flags: MCO_FILE_OPEN_DEFAULT
[ExtremeDbBackend.cpp: 318:I] - [config] Dev: 3 // Memory type: MCO_MEMORY_FILE // Assignment: MCO_MEMORY_ASSIGN_LOG // Filename: ./storage/kelvindb_v3.log // Flags: MCO_FILE_OPEN_DEFAULT
[ExtremeDbBackend.cpp: 258:I] - eXtremeDB is running in PAID LICENSE MODE
[ExtremeDbBackend.cpp: 260:I] - eXtremeDB version: interim build eXtremeDB_8.1_1800, rev.25973
[ExtremeDbBackend.cpp: 30:I] - eXtremeDB started successfully.
[ UploadManager.cpp: 28:I] - The UploadManager's worker will not start because it's either disabled or has no address configured
[ Poller.cpp: 70:I] - Successful poller: 0.100000 secs (fd: 7)
[ Poller.cpp: 70:I] - Successful poller: 1.000000 secs (fd: 8)
[ RPCApplication.cpp: 72:I] - injector initialize.
2021-07-07T17:34:50.226444Z [info ] Loading data [injector_app] asset_name=None data_type=None last_process_time=1625679290.2255256 path=/opt/kelvin/app/data/data.csv
2021-07-07T17:34:50.240552Z [info ] Emitting data [injector_app] asset_name=None last_process_time=1625679290.2255256 path=/opt/kelvin/app/data/data.csv
```
Now check the weather application logs. They show the buffered values received for the temperature and humidity inputs, followed by the computed means.
App logs (Terminal window 1):
```text
...
... temperature [app] asset_id=None data=DataBuffer(window=10000000000, count=None, data=[<Float32>(value=14.0)]) last_process_time=1611075408.755
... humidity [app] asset_id=None data=DataBuffer(window=10000000000, count=None, data=[<Float32>(value=14.0)]) last_process_time=1611075408.755
... temperature_mean [app] asset_id=None last_process_time=1611075408.755 message=<Float32>(value=14.0)
... humidity_mean [app] asset_id=None last_process_time=1611075408.755 message=<Float32>(value=40.0)
... temperature [app] asset_id=None data=DataBuffer(window=10000000000, count=None, data=[<Float32>(value=14.0), <Float32>(value=16.0)]) last_process_time=1611075409.759
... humidity [app] asset_id=None data=DataBuffer(window=10000000000, count=None, data=[<Float32>(value=14.0), <Float32>(value=16.0)]) last_process_time=1611075409.759
... temperature_mean [app] asset_id=None last_process_time=1611075409.759 message=<Float32>(value=15.0)
... humidity_mean [app] asset_id=None last_process_time=1611075409.759 message=<Float32>(value=41.0)
```
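In these logs the buffer window is printed as window=10000000000. That value appears to be the 10-second window from storage_config expressed in nanoseconds. The unit is inferred from the number itself; the tutorial does not state it. The short calculation below checks that reading.

```python
from datetime import timedelta

# storage_config in weather.py uses {'window': {'seconds': 10}}.
window = timedelta(seconds=10)

# timedelta // timedelta gives an exact integer number of microseconds; x1000 converts to nanoseconds.
window_ns = (window // timedelta(microseconds=1)) * 1000
print(window_ns)  # 10000000000
```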
Extract
The extract command retrieves data from the app's outputs. Its first argument is the app name and version, e.g. weather:1.0.0.
The data is saved as .parquet files in an output folder inside the directory given with the --output-dir option.
The extractor reads and saves the data from the outputs defined in app.yaml:
```yaml
app:
  kelvin:
    inputs:
      ...
    outputs:
      - data_type: raw.float32
        name: temperature_mean
        targets:
          - asset_names: [ emulation ]
      - data_type: raw.float32
        name: humidity_mean
        targets:
          - asset_names: [ emulation ]
    ...
```
Open a third terminal window, run the following command, and check the output:
```shell
kelvin emulation extract weather:1.0.0 --output-dir data
```
Emulation extract logs:
```text
[kelvin.sdk][2021-07-07 21:04:45][R]
Emulation configurations applied to container:
└── Volumes:
- "/opt/kelvin/app/data" connected to "/home/user/kics-app-examples/kelvin_apps/weather/data" (container -> host)
[kelvin.sdk][2021-07-07 21:04:46][R] Application successfully launched: "extractor"
[ApplicationEngine.cpp: 26:I] - Loading configuration file: /opt/kelvin/app/app.yaml
[ Pipeline.cpp: 73:I] - Loading shared object: ../core/lib/Client.so (extractor) - SUCCESS
[ Pipeline.cpp: 188:I] - Loading extractor Refinery input modules:
[ Pipeline.cpp: 193:I] - Loading extractor Refinery output modules:
[ Pipeline.cpp: 198:I] - Loading extractor Gatekeeper input modules
[ Pipeline.cpp: 203:I] - Loading extractor Gatekeeper output modules
[ StorageManager.cpp: 27:I] - Using eXtremeDB storage backend
[ExtremeDbBackend.cpp: 277:I] - [config] Commit policy: MCO_COMMIT_SYNC_FLUSH // Transaction log type: REDO_LOG
[ExtremeDbBackend.cpp: 318:I] - [config] Dev: 0 // Memory type: MCO_MEMORY_CONV // Assignment: MCO_MEMORY_ASSIGN_DATABASE // Size: 2097152 // Flags: MCO_FILE_OPEN_DEFAULT
[ExtremeDbBackend.cpp: 318:I] - [config] Dev: 1 // Memory type: MCO_MEMORY_CONV // Assignment: MCO_MEMORY_ASSIGN_CACHE // Size: 8388608 // Flags: MCO_FILE_OPEN_DEFAULT
[ExtremeDbBackend.cpp: 318:I] - [config] Dev: 2 // Memory type: MCO_MEMORY_FILE // Assignment: MCO_MEMORY_ASSIGN_PERSISTENT // Filename: ./storage/kelvindb_v3.dbs // Flags: MCO_FILE_OPEN_DEFAULT
[ExtremeDbBackend.cpp: 318:I] - [config] Dev: 3 // Memory type: MCO_MEMORY_FILE // Assignment: MCO_MEMORY_ASSIGN_LOG // Filename: ./storage/kelvindb_v3.log // Flags: MCO_FILE_OPEN_DEFAULT
[ExtremeDbBackend.cpp: 258:I] - eXtremeDB is running in PAID LICENSE MODE
[ExtremeDbBackend.cpp: 260:I] - eXtremeDB version: interim build eXtremeDB_8.1_1800, rev.25973
[ExtremeDbBackend.cpp: 30:I] - eXtremeDB started successfully.
[ UploadManager.cpp: 28:I] - The UploadManager's worker will not start because it's either disabled or has no address configured
[ Poller.cpp: 70:I] - Successful poller: 0.100000 secs (fd: 7)
[ Poller.cpp: 70:I] - Successful poller: 1.000000 secs (fd: 8)
[ RPCApplication.cpp: 72:I] - extractor initialize.
2021-07-07T20:04:47.893801Z [info ] Fields:\n - cutoff: timestamp[ns]\n - time: timestamp[ns]\n - temperature_mean.value: double\n - humidity_mean.value: double [extractor_app] asset_name=None last_process_time=0.0
```
Check the .parquet files in the data/output folder.
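To inspect the extracted files from Python, one option is pandas.read_parquet, which requires pandas and a Parquet engine such as pyarrow. The sketch below assumes the files sit under data/output with a .parquet extension. It also assumes they contain the fields listed in the extractor log (cutoff, time, temperature_mean.value, humidity_mean.value). find_parquet_files and load_outputs are hypothetical helpers. pandas is imported inside load_outputs so that listing the files works without it.

```python
from __future__ import annotations

from pathlib import Path


def find_parquet_files(output_dir: Path) -> list[Path]:
    """List the extracted .parquet files, including any in subfolders."""
    return sorted(output_dir.rglob("*.parquet"))


def load_outputs(output_dir: Path):
    """Read every extracted file into a single pandas DataFrame."""
    files = find_parquet_files(output_dir)
    if not files:
        raise FileNotFoundError(f"No .parquet files found under {output_dir}")
    # Imported here so that find_parquet_files works without pandas installed.
    import pandas as pd  # needs pandas plus a Parquet engine such as pyarrow

    return pd.concat([pd.read_parquet(f) for f in files], ignore_index=True)
```

For example, df = load_outputs(Path("data/output")) followed by df[["time", "temperature_mean.value", "humidity_mean.value"]] should show the extracted means over time. The selected column names are taken from the extractor log above.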