
Inject and Extract Data

Overview

The Kelvin Platform can inject data into an application to test its logic. This is helpful during development and at various testing stages. This tutorial uses a weather application to demonstrate the following:

  • Injecting data into the app inputs. The app receives temperature and humidity values and, for each channel, emits the mean of the values received in the last 10 seconds (the last 10 values when data arrives once per second).
  • Setting the interval scale to seconds, which accelerates application execution for testing purposes.
  • Configuring the logic to receive a buffer (pandas.DataFrame) holding the previous 10 seconds of data.
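The 10-second buffer can be pictured with a small, stdlib-only sketch. It assumes the buffer keeps every sample whose timestamp falls within the last 10 seconds and discards older ones. `TimeWindowBuffer` is a hypothetical name used for illustration and is not the Kelvin implementation.

```python
from collections import deque
from typing import Deque, Optional, Tuple


class TimeWindowBuffer:
    """Keep (timestamp, value) pairs from the last `window_s` seconds.

    Hypothetical helper for illustration only; not part of kelvin.app.
    """

    def __init__(self, window_s: float = 10.0) -> None:
        self.window_s = window_s
        self._items: Deque[Tuple[float, float]] = deque()

    def add(self, timestamp: float, value: float) -> None:
        # Append the new sample, then evict samples that fell out of the window.
        self._items.append((timestamp, value))
        cutoff = timestamp - self.window_s
        while self._items and self._items[0][0] <= cutoff:
            self._items.popleft()

    def mean(self) -> Optional[float]:
        # Like the app's "if not frame.empty" guard: no data means no mean.
        if not self._items:
            return None
        return sum(v for _, v in self._items) / len(self._items)


# One temperature sample per second, mimicking a 1-second injection period.
buf = TimeWindowBuffer(window_s=10.0)
for t, value in enumerate([14, 14, 16, 12, 13]):
    buf.add(float(t), float(value))
print(buf.mean())  # 13.8
```

With one sample per second, the window holds at most 10 samples, so the mean covers the last 10 values.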

Prerequisite:

  1. Complete the Getting Started Guide.

Source Code

The source code for this application is available on GitHub.

App Files

The following app files contain the logic that achieves the goals described in the Overview section above. Using a code editor on your machine (e.g. Visual Studio Code), open the weather application folder and then open weather/weather.py. This file contains the application code below.

from kelvin.app import DataApplication

class App(DataApplication):
    """Application."""

    # last 10 seconds buffer.
    TOPICS = {
        'raw.float32.#': {
            'target': '{name}',
            'storage_type': 'buffer',
            'storage_config': {'window': {'seconds': 10}, 'getter': 'value'},
        }
    }

    def init(self) -> None:
        """
        Initialisation method
        """
        # Custom initialisation logic

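    def process(self) -> None:
        """Process data."""
        # self.logger.info("config", config=self.config)

        # Buffered data for each input (None if the input has no data yet).
        temperature = self.data.get("temperature", None)
        humidity = self.data.get("humidity", None)

        self.logger.info("temperature", data=temperature)
        self.logger.info("humidity", data=humidity)

        # Last 10 seconds of data as a pandas.DataFrame, one column per input.
        frame = self.frame
        # self.logger.info("frame", frame=frame)

        if not frame.empty:
            # Mean of each channel over the buffered window.
            temperature_mean = frame['temperature'].mean()
            humidity_mean = frame['humidity'].mean()

            # Create and emit a raw.float32 message for each output in app.yaml.
            temperature_message = self.make_message(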
                'raw.float32',
                'temperature_mean',
                value=temperature_mean,
                emit=True
            )
            self.logger.info("temperature_mean", message=temperature_message)
            humidity_message = self.make_message(
                'raw.float32',
                'humidity_mean',
                value=humidity_mean,
                emit=True
            )
            self.logger.info("humidity_mean", message=humidity_message)

The app.yaml file in the application folder declares the entry point, inputs, and outputs:

app:
  kelvin:
    language:
      python:
        entry_point: weather.weather:App
        requirements: requirements.txt
      type: python
    inputs:
      - data_type: raw.float32
        name: temperature
        sources:
          - asset_names: [ emulation ]
      - data_type: raw.float32
        name: humidity
        sources:
          - asset_names: [ emulation ]
    outputs:
      - data_type: raw.float32
        name: temperature_mean
        targets:
          - asset_names: [ emulation ]
      - data_type: raw.float32
        name: humidity_mean
        targets:
          - asset_names: [ emulation ]
  type: kelvin
info:
  description: weather
  name: weather
  title: weather
  version: 1.0.0
spec_version: 2.0.0
environment:
  node_name: node
  workload_name: workload
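To check the arithmetic in process() outside the emulator, you can reproduce it with plain pandas. The sketch below assumes self.frame is a pandas.DataFrame with one column per input name, which is how process() indexes it. `compute_means` is a hypothetical helper and does not call kelvin.app.

```python
from typing import Dict, Optional

import pandas as pd


def compute_means(frame: pd.DataFrame) -> Optional[Dict[str, float]]:
    """Return the per-channel means the weather app would emit, or None.

    Assumes `frame` has "temperature" and "humidity" columns, like the
    buffer the app reads through self.frame.
    """
    if frame.empty:
        # The app skips emitting messages when the buffer is empty.
        return None
    return {
        "temperature_mean": float(frame["temperature"].mean()),
        "humidity_mean": float(frame["humidity"].mean()),
    }


# A 3-second buffer built from the first rows of data/data.csv.
index = pd.to_datetime([0, 1, 2], unit="s")
frame = pd.DataFrame(
    {"temperature": [14.0, 14.0, 16.0], "humidity": [40.0, 40.0, 42.0]},
    index=index,
)
print(compute_means(frame))
```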

Inject data

Build and Emulate

First, build the application and start the emulation. Note that kelvin emulation start is run with the --show-logs option, so you can follow the app's logs while it is running.

Run the following commands in Terminal window 1:

kelvin app build
kelvin emulation start --verbose --show-logs

Emulation start output: (Terminal window 1)

[kelvin.sdk][2021-07-07 17:54:49][R] Emulation configuration loaded from: "/home/user/kics-app-examples/kelvin_apps/weather/app.yaml"
[kelvin.sdk][2021-07-07 17:54:50][I] Valid schema available locally. Using cached version (/home/user/.config/kelvin/schemas/2.0.0.json)
[kelvin.sdk][2021-07-07 17:54:53][R] Kelvin Emulation System is online
[kelvin.sdk][2021-07-07 17:54:53][I] Loading configuration and starting the application "weather:1.0.0"
[kelvin.sdk][2021-07-07 17:54:53][I] Valid schema available locally. Using cached version (/home/user/.config/kelvin/schemas/2.0.0.json)
[kelvin.sdk][2021-07-07 17:54:56][R] Overriding existing configuration with "/home/user/kics-app-examples/kelvin_apps/weather/app.yaml"
[kelvin.sdk][2021-07-07 17:54:56][R] Application successfully launched: "weather:1.0.0"
[ApplicationEngine.cpp:  26:I] - Loading configuration file: /opt/kelvin/app/app.yaml
[        Pipeline.cpp:  73:I] - Loading shared object: ../core/lib/Client.so (weather) - SUCCESS
[        Pipeline.cpp: 188:I] - Loading weather Refinery input modules:
[        Pipeline.cpp: 193:I] - Loading weather Refinery output modules:
[        Pipeline.cpp: 198:I] - Loading weather Gatekeeper input modules
[        Pipeline.cpp: 203:I] - Loading weather Gatekeeper output modules
[  StorageManager.cpp:  27:I] - Using eXtremeDB storage backend
[ExtremeDbBackend.cpp: 277:I] - [config] Commit policy: MCO_COMMIT_SYNC_FLUSH // Transaction log type: REDO_LOG
[ExtremeDbBackend.cpp: 318:I] - [config] Dev: 0 // Memory type: MCO_MEMORY_CONV // Assignment: MCO_MEMORY_ASSIGN_DATABASE // Size: 2097152 // Flags: MCO_FILE_OPEN_DEFAULT
[ExtremeDbBackend.cpp: 318:I] - [config] Dev: 1 // Memory type: MCO_MEMORY_CONV // Assignment: MCO_MEMORY_ASSIGN_CACHE // Size: 8388608 // Flags: MCO_FILE_OPEN_DEFAULT
[ExtremeDbBackend.cpp: 318:I] - [config] Dev: 2 // Memory type: MCO_MEMORY_FILE // Assignment: MCO_MEMORY_ASSIGN_PERSISTENT // Filename: ./storage/kelvindb_v3.dbs // Flags: MCO_FILE_OPEN_DEFAULT
[ExtremeDbBackend.cpp: 318:I] - [config] Dev: 3 // Memory type: MCO_MEMORY_FILE // Assignment: MCO_MEMORY_ASSIGN_LOG // Filename: ./storage/kelvindb_v3.log // Flags: MCO_FILE_OPEN_DEFAULT
[ExtremeDbBackend.cpp: 258:I] - eXtremeDB is running in PAID LICENSE MODE
[ExtremeDbBackend.cpp: 260:I] - eXtremeDB version: interim build eXtremeDB_8.1_1800, rev.25973
[ExtremeDbBackend.cpp:  30:I] - eXtremeDB started successfully.
[   UploadManager.cpp:  28:I] - The UploadManager's worker will not start because it's either disabled or has no address configured
[          Poller.cpp:  70:I] - Successful poller: 0.100000 secs (fd: 7)
[          Poller.cpp:  70:I] - Successful poller: 1.000000 secs (fd: 8)
[  RPCApplication.cpp:  72:I] - Weather App initialized.

Inject

Open a second terminal window and navigate to the app's folder. The inject command will use the file data/data.csv, which contains two columns, one for each input of the weather app.

temperature humidity
14 40
14 40
16 42
12 50
13 55
...
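You can also build your own injection file. The sketch below writes a header row of input names followed by one row per sample. It assumes the injector expects comma-separated values with a single header row. `write_injection_csv` is a hypothetical helper.

```python
import csv
from pathlib import Path
from typing import Iterable, Sequence


def write_injection_csv(path: str, columns: Sequence[str],
                        rows: Iterable[Sequence[float]]) -> None:
    """Write a CSV whose header row holds the app input names.

    Assumes a comma-separated file with one header row; hypothetical helper.
    """
    Path(path).parent.mkdir(parents=True, exist_ok=True)
    with open(path, "w", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(columns)  # header: one column per app input
        writer.writerows(rows)    # one row per injected sample
```

For example, the following call writes a separate file and leaves the tutorial's data/data.csv untouched:

write_injection_csv("data/custom.csv", ["temperature", "humidity"], [(14, 40), (16, 42)])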

Each column header is an input name, which must match an input name defined in app.yaml and in the connection inputs.

app:
  kelvin:
    inputs:
      - data_type: raw.float32
        name: temperature
        sources:
          - asset_names: [ emulation ]
      - data_type: raw.float32
        name: humidity
        sources:
          - asset_names: [ emulation ]
...
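A mismatched header is easy to miss, so a quick pre-flight check can help before you inject. The sketch below reports CSV header names that have no matching app input. It assumes a comma-separated file, and it assumes you copy the input names by hand from the inputs section of app.yaml. `unmatched_columns` is a hypothetical helper.

```python
import csv
from typing import Iterable, List


def unmatched_columns(csv_path: str, input_names: Iterable[str]) -> List[str]:
    """Return CSV header names that do not match any app input name."""
    with open(csv_path, newline="") as f:
        header = next(csv.reader(f), [])  # first row only; [] for an empty file
    known = set(input_names)
    # Strip stray spaces so "temperature, humidity" still matches.
    return [name.strip() for name in header if name.strip() not in known]
```

If data/data.csv is comma-separated, unmatched_columns("data/data.csv", ["temperature", "humidity"]) should return an empty list. Any name it returns is a column the app has no input for.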

Run the following command and check the terminal output:

ksdk emulation inject data/data.csv --period=1 --repeat --app-name weather:1.0.0 --workload-name workload --node-name node
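The sketch below models one plausible reading of the flags, which is an assumption rather than documented behavior. Under this reading, --period is the number of seconds between injected rows, and --repeat restarts from the first row when the file ends. `injection_schedule` is hypothetical and is not the injector's code.

```python
import itertools
from typing import Iterator, Sequence, Tuple


def injection_schedule(rows: Sequence[dict], period: float,
                       repeat: bool) -> Iterator[Tuple[float, dict]]:
    """Yield (offset_seconds, row) pairs under the assumed flag semantics."""
    # Loop over the rows forever when repeating, otherwise stop at the end.
    source = itertools.cycle(rows) if repeat else iter(rows)
    for i, row in enumerate(source):
        yield i * period, row


rows = [{"temperature": 14, "humidity": 40}, {"temperature": 16, "humidity": 42}]
# Take only the first four emissions, since repeat=True never ends.
for offset, row in itertools.islice(injection_schedule(rows, period=1, repeat=True), 4):
    print(offset, row)
```

Under that reading, --period=1 delivers about one row per second, so the app's 10-second buffer holds roughly 10 rows at a time.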

Emulation inject output: (Terminal window 2)

...
[kelvin.sdk][2021-07-07 18:32:46][I] Building new image for "injector:0.0.0". Please wait..
[kelvin.sdk][2021-07-07 18:34:45][R] Image successfully built: "injector:0.0.0"
[kelvin.sdk][2021-07-07 18:34:46][R] Kelvin Emulation System is online
[kelvin.sdk][2021-07-07 18:34:46][I] Loading configuration and starting the application "injector"
[kelvin.sdk][2021-07-07 18:34:47][R] Application successfully launched: "injector"
[ApplicationEngine.cpp:  26:I] - Loading configuration file: /opt/kelvin/app/app.yaml
[        Pipeline.cpp:  73:I] - Loading shared object: ../core/lib/Client.so (injector) - SUCCESS
[        Pipeline.cpp: 188:I] - Loading injector Refinery input modules:
[        Pipeline.cpp: 193:I] - Loading injector Refinery output modules:
[        Pipeline.cpp: 198:I] - Loading injector Gatekeeper input modules
[        Pipeline.cpp: 203:I] - Loading injector Gatekeeper output modules
[  StorageManager.cpp:  27:I] - Using eXtremeDB storage backend
[ExtremeDbBackend.cpp: 277:I] - [config] Commit policy: MCO_COMMIT_SYNC_FLUSH // Transaction log type: REDO_LOG
[ExtremeDbBackend.cpp: 318:I] - [config] Dev: 0 // Memory type: MCO_MEMORY_CONV // Assignment: MCO_MEMORY_ASSIGN_DATABASE // Size: 2097152 // Flags: MCO_FILE_OPEN_DEFAULT
[ExtremeDbBackend.cpp: 318:I] - [config] Dev: 1 // Memory type: MCO_MEMORY_CONV // Assignment: MCO_MEMORY_ASSIGN_CACHE // Size: 8388608 // Flags: MCO_FILE_OPEN_DEFAULT
[ExtremeDbBackend.cpp: 318:I] - [config] Dev: 2 // Memory type: MCO_MEMORY_FILE // Assignment: MCO_MEMORY_ASSIGN_PERSISTENT // Filename: ./storage/kelvindb_v3.dbs // Flags: MCO_FILE_OPEN_DEFAULT
[ExtremeDbBackend.cpp: 318:I] - [config] Dev: 3 // Memory type: MCO_MEMORY_FILE // Assignment: MCO_MEMORY_ASSIGN_LOG // Filename: ./storage/kelvindb_v3.log // Flags: MCO_FILE_OPEN_DEFAULT
[ExtremeDbBackend.cpp: 258:I] - eXtremeDB is running in PAID LICENSE MODE
[ExtremeDbBackend.cpp: 260:I] - eXtremeDB version: interim build eXtremeDB_8.1_1800, rev.25973
[ExtremeDbBackend.cpp:  30:I] - eXtremeDB started successfully.
[   UploadManager.cpp:  28:I] - The UploadManager's worker will not start because it's either disabled or has no address configured
[          Poller.cpp:  70:I] - Successful poller: 0.100000 secs (fd: 7)
[          Poller.cpp:  70:I] - Successful poller: 1.000000 secs (fd: 8)
[  RPCApplication.cpp:  72:I] - injector initialize.
2021-07-07T17:34:50.226444Z [info     ] Loading data                   [injector_app] asset_name=None data_type=None last_process_time=1625679290.2255256 path=/opt/kelvin/app/data/data.csv
2021-07-07T17:34:50.240552Z [info     ] Emitting data                  [injector_app] asset_name=None last_process_time=1625679290.2255256 path=/opt/kelvin/app/data/data.csv

Check the weather application logs. They show the latest values received for the temperature and humidity inputs, followed by the computed means.

App logs: (Terminal window 1)

...
... temperature        [app] asset_id=None data=DataBuffer(window=10000000000, count=None, data=[<Float32>(value=14.0)]) last_process_time=1611075408.755
... humidity           [app] asset_id=None data=DataBuffer(window=10000000000, count=None, data=[<Float32>(value=14.0)]) last_process_time=1611075408.755
... temperature_mean   [app] asset_id=None last_process_time=1611075408.755 message=<Float32>(value=14.0)
... humidity_mean      [app] asset_id=None last_process_time=1611075408.755 message=<Float32>(value=40.0)
... temperature        [app] asset_id=None data=DataBuffer(window=10000000000, count=None, data=[<Float32>(value=14.0), <Float32>(value=16.0)]) last_process_time=1611075409.759
... humidity           [app] asset_id=None data=DataBuffer(window=10000000000, count=None, data=[<Float32>(value=14.0), <Float32>(value=16.0)]) last_process_time=1611075409.759
... temperature_mean   [app] asset_id=None last_process_time=1611075409.759 message=<Float32>(value=15.0)
... humidity_mean      [app] asset_id=None last_process_time=1611075409.759 message=<Float32>(value=41.0)
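If you save the emulation logs to a file, a small parser can extract the emitted means so you can compare them with values computed by hand. The regular expression below is written against the log lines shown above. It assumes that format and may need adjusting for other SDK versions. `parse_means` is a hypothetical helper.

```python
import re
from typing import Iterable, List, Tuple

# Matches lines such as:
# ... temperature_mean   [app] ... message=<Float32>(value=14.0)
MEAN_LINE = re.compile(
    r"(\w+_mean)\s+\[app\].*?message=<Float32>\(value=([-+0-9.eE]+)\)"
)


def parse_means(lines: Iterable[str]) -> List[Tuple[str, float]]:
    """Extract (output_name, value) pairs from the app's *_mean log lines."""
    results = []
    for line in lines:
        match = MEAN_LINE.search(line)
        if match:
            results.append((match.group(1), float(match.group(2))))
    return results


sample = [
    "... temperature        [app] asset_id=None data=DataBuffer(...) last_process_time=1611075408.755",
    "... temperature_mean   [app] asset_id=None last_process_time=1611075408.755 message=<Float32>(value=14.0)",
]
print(parse_means(sample))  # [('temperature_mean', 14.0)]
```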

Extract

The extract command retrieves data from the app's outputs. Its first argument is the app name and version, e.g. weather:1.0.0. The data is stored as .parquet files in an output folder inside the directory set with the --output-dir option. The extractor reads and saves the data from the outputs defined in app.yaml.

app:
  kelvin:
    inputs:
      ...
    outputs:
      - data_type: raw.float32
        name: temperature_mean
        targets:
          - asset_names: [ emulation ]
      - data_type: raw.float32
        name: humidity_mean
        targets:
          - asset_names: [ emulation ]
...

Open a third terminal window, run the following command, and check the output:

kelvin emulation extract weather:1.0.0 --output-dir data

Emulation extract logs:

[kelvin.sdk][2021-07-07 21:04:45][R] 
Emulation configurations applied to container:
    └── Volumes:
       - "/opt/kelvin/app/data" connected to "/home/user/kics-app-examples/kelvin_apps/weather/data" (container -> host)
[kelvin.sdk][2021-07-07 21:04:46][R] Application successfully launched: "extractor"
[ApplicationEngine.cpp:  26:I] - Loading configuration file: /opt/kelvin/app/app.yaml
[        Pipeline.cpp:  73:I] - Loading shared object: ../core/lib/Client.so (extractor) - SUCCESS
[        Pipeline.cpp: 188:I] - Loading extractor Refinery input modules:
[        Pipeline.cpp: 193:I] - Loading extractor Refinery output modules:
[        Pipeline.cpp: 198:I] - Loading extractor Gatekeeper input modules
[        Pipeline.cpp: 203:I] - Loading extractor Gatekeeper output modules
[  StorageManager.cpp:  27:I] - Using eXtremeDB storage backend
[ExtremeDbBackend.cpp: 277:I] - [config] Commit policy: MCO_COMMIT_SYNC_FLUSH // Transaction log type: REDO_LOG
[ExtremeDbBackend.cpp: 318:I] - [config] Dev: 0 // Memory type: MCO_MEMORY_CONV // Assignment: MCO_MEMORY_ASSIGN_DATABASE // Size: 2097152 // Flags: MCO_FILE_OPEN_DEFAULT
[ExtremeDbBackend.cpp: 318:I] - [config] Dev: 1 // Memory type: MCO_MEMORY_CONV // Assignment: MCO_MEMORY_ASSIGN_CACHE // Size: 8388608 // Flags: MCO_FILE_OPEN_DEFAULT
[ExtremeDbBackend.cpp: 318:I] - [config] Dev: 2 // Memory type: MCO_MEMORY_FILE // Assignment: MCO_MEMORY_ASSIGN_PERSISTENT // Filename: ./storage/kelvindb_v3.dbs // Flags: MCO_FILE_OPEN_DEFAULT
[ExtremeDbBackend.cpp: 318:I] - [config] Dev: 3 // Memory type: MCO_MEMORY_FILE // Assignment: MCO_MEMORY_ASSIGN_LOG // Filename: ./storage/kelvindb_v3.log // Flags: MCO_FILE_OPEN_DEFAULT
[ExtremeDbBackend.cpp: 258:I] - eXtremeDB is running in PAID LICENSE MODE
[ExtremeDbBackend.cpp: 260:I] - eXtremeDB version: interim build eXtremeDB_8.1_1800, rev.25973
[ExtremeDbBackend.cpp:  30:I] - eXtremeDB started successfully.
[   UploadManager.cpp:  28:I] - The UploadManager's worker will not start because it's either disabled or has no address configured
[          Poller.cpp:  70:I] - Successful poller: 0.100000 secs (fd: 7)
[          Poller.cpp:  70:I] - Successful poller: 1.000000 secs (fd: 8)
[  RPCApplication.cpp:  72:I] - extractor initialize.
2021-07-07T20:04:47.893801Z [info     ] Fields:\n  - cutoff: timestamp[ns]\n  - time: timestamp[ns]\n  - temperature_mean.value: double\n  - humidity_mean.value: double [extractor_app] asset_name=None last_process_time=0.0

Check the .parquet files in the data/output folder.
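To inspect the extracted data, you can load every .parquet file under data/output into one pandas DataFrame. This sketch assumes the files can sit anywhere below that folder. It also assumes a parquet engine such as pyarrow is installed, which pandas.read_parquet needs. The extractor log above lists the expected fields (cutoff, time, temperature_mean.value, humidity_mean.value). The helper names are hypothetical.

```python
from pathlib import Path
from typing import List

import pandas as pd


def find_parquet_files(output_dir: str = "data/output") -> List[Path]:
    """List every .parquet file under the extractor's output folder."""
    return sorted(Path(output_dir).rglob("*.parquet"))


def load_extracted(output_dir: str = "data/output") -> pd.DataFrame:
    """Concatenate all extracted .parquet files into one DataFrame."""
    files = find_parquet_files(output_dir)
    if not files:
        return pd.DataFrame()
    return pd.concat((pd.read_parquet(path) for path in files), ignore_index=True)


frame = load_extracted()
if frame.empty:
    print("No .parquet files found yet; run the extract command first.")
else:
    # Print the real column names rather than assuming them.
    print(list(frame.columns))
    print(frame.tail())
```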