Skip to content

Configuring your Kelvin App

Kelvin App Functionalities

Python Kelvin Apps can further be refined to provide better filtering and caching. By default, the kelvin.app.DataApplication is configured to cache the last value of every message received via DataApplication.on_data, partitioned on the message name:

from kelvin.app import DataApplication
from kelvin.app.utils import make_message

class App(DataApplication):
    """Minimal application that relies on the default DataApplication behaviour."""

>>> app = App()
>>> message_1 = make_message(
    "raw.text",
    name="hello",
    value="Guten Tag",
    time_of_validity=int(1 * 1e9),
)
>>> message_2 = make_message(
    "raw.text",
    name="goodbye",
    value="Sayonara",
    time_of_validity=int(2 * 1e9),
)
>>> app.on_data([message_1, message_2])

>>> app.data
MappingProxy({
    'goodbye': <kelvin.message.raw.text.Text at 0x7fc3153088f0>,
    'hello': <kelvin.message.raw.text.Text at 0x7fc315308c70>},
    defaults={})

>>> app.data.goodbye.value
'Sayonara'

>>> app.data.goodbye._.time_of_validity
2000000000

Routing and Caching

More complex routing and caching of data can be performed by specifying TOPICS, whose keys are patterns matched against the {type}.{name} of incoming messages (with # and * wildcards in the style of RabbitMQ routing patterns).

More-specific topics will be evaluated first (e.g. foo.# < foo.bar.#), and if more than one topic matches and routes to the same target name, the storage-type/-config for the first match will be used.

class TopicApp(DataApplication):
    """Example application routing incoming messages through ``TOPICS``.

    The ``#`` rule targets every message at its own name, while messages
    matching ``raw.*.*`` are additionally buffered under ``buffer.{name}``.
    """

    TOPICS = {
        # the default rule
        "#": {
            "target": "{name}",
        },
        "raw.*.*": {
            # any format can be used, referencing message fields
            "target": "buffer.{name}",
            "storage_type": "buffer",
            "storage_config": {
                "window": {"minutes": 5},
                "count": 100,
                "getter": "value",
            },
        },
    }

    # default values for the ``data`` section
    defaults = {"data": {"x": None}}

>>> app = TopicApp()
>>> app.make_message("raw.float64", name="float64", value=1.0, time_of_validity=int(1 * 1e9), store=True)
>>> app.make_message("raw.float64", name="float64", value=2.0, time_of_validity=int(2 * 1e9), store=True)
>>> app.make_message("raw.text", name="text", value="one", time_of_validity=int(1 * 1e9), store=True)
>>> app.make_message("raw.text", name="text", value="two", time_of_validity=int(2 * 1e9), store=True)
>>> app.make_message("kelvin.beam_pump", name="beam_pump", time_of_validity=int(1 * 1e9), store=True)
>>> app.make_message("kelvin.beam_pump", name="beam_pump", time_of_validity=int(2 * 1e9), store=True)

>>> app.data
MappingProxy(
    {
        'beam_pump': <kelvin.message.kelvin.beam_pump.BeamPump at 0x7f827654fe70>,
        'buffer': {
            'float64': DataBuffer([
                (1000000000, <kelvin.message.raw.float64.Float64 at 0x7f8273863c70>),
                (2000000000, <kelvin.message.raw.float64.Float64 at 0x7f8274964cb0>)]),
            'text': DataBuffer([
                (1000000000, <kelvin.message.raw.text.Text at 0x7f8273d080f0>),
                (2000000000, <kelvin.message.raw.text.Text at 0x7f8273d081f0>)])},
        'float64': <kelvin.message.raw.float64.Float64 at 0x7f8274964cb0>,
        'text': <kelvin.message.raw.text.Text at 0x7f8273d081f0>},
    defaults={'x': None})

>>> app.data.buffer.text.series()
1970-01-01 00:00:01+00:00    one
1970-01-01 00:00:02+00:00    two
dtype: object

# default data values
>>> "x" in app.data, "y" in app.data
(True, False)

>>> app.data.x is None
True

Observe that the beam_pump message is not buffered as it doesn't match the routing rule, but the raw.* messages do match the buffer topic (and the default topic).

The supported storage-types (storage_type) are:

  • None (or unspecified): only the latest value is kept; any previously stored value is discarded. Note: storage_config must not be provided.
  • "buffer": data is buffered according to storage_config (keyword-arguments of kelvin.app.data.DataBuffer):
      ◦ count: Count-window of data to retain
      ◦ window: Time-window of data to retain (specified in seconds or as timedelta)
      ◦ getter: Optional getter function or field to get value from resource value
      ◦ delta: Store deltas in value (via getter), keeping the timestamp of the leading edge of the change

Default configuration values can be declared under defaults["config"] and are exposed through app.config:

class ConfigApp(DataApplication):
    """Example application declaring default configuration values."""

    defaults = {
        "config": {
            "foo": 1,
            "bar": 2,
            "a": {"b": {"c": 3}},
        },
    }

>>> app = ConfigApp()
>>> app.config
MappingProxy(
    {},
    defaults={
        'a': {'b': {'c': 3}},
        'bar': 2,
        'foo': 1,
        'parameters': {}})

# access a value
>>> app.config.foo
1

# access via attributes or key
>>> app.config.a.b.c == app.config["a.b.c"] == 3
True

# update single value
>>> app.config.foo = 100
>>> app.config.foo
100

>>> app.config
MappingProxy(
    {'foo': 100},
    defaults={
        'a': {'b': {'c': 3}},
        'bar': 2,
        'foo': 1,
        'parameters': {}})

# merge configuration into defaults
>>> app.on_configuration_change({"foo": 1000, "a": {"b": {"c": 30}}})
>>> app.config.foo, app.config.a.b.c
(1000, 30)

# "flattened" update
>>> app.on_configuration_change({"a.b.c": 300})
>>> app.config.a.b.c
300

Commonly, an application will want to ensure that data is present before proceeding. Simple checks on data can be performed via CHECKS, which drive the BaseApplication.data_status property. Rate-limiting of outputs can be managed via LIMITS.

class CheckApp(DataApplication):
    """Example application combining ``TOPICS``, ``CHECKS`` and ``LIMITS``."""

    # buffer messages matching ``raw.*.bar`` under their own name
    TOPICS = {
        "raw.*.bar": {
            "target": "{name}",
            "storage_type": "buffer",
            "storage_config": {"window": 10},
        },
    }

    # per-input data checks
    CHECKS = {
        "foo": {
            "max_lag": 100.0,
        },
        "bar": {
            "min_count": 5,
            "max_gap": 1.0,
        },
    }

    # per-output rate limits
    LIMITS = {
        "x": {
            "frequency": {"seconds": 30.0},
        },
        "y": {
            "throttle": {"seconds": 15.0},
        },
        "z": {
            "frequency": {"minutes": 1.0},
            "throttle": {"seconds": 10.0},
        },
    }

>>> app = CheckApp(context)
>>> app.data_status
{"foo": DataStatus.MISSING, "bar": DataStatus.MISSING}

>>> app.make_message("raw.text", "foo", value="hola!", time_of_validity=0, store=True)
>>> app.data_status
{"bar": DataStatus.MISSING}

>>> context.set_process_time(100.0)  # for testing - time is usually managed by core
>>> app.data_status
{"bar": DataStatus.MISSING}

>>> context.set_process_time(100.000001)
>>> app.data_status
{"foo": DataStatus.STALE, "bar": DataStatus.MISSING}

>>> app.make_message("raw.text", "bar", value="hola!", time_of_validity=0, store=True)
>>> app.data_status
{"foo": DataStatus.STALE, "bar": DataStatus.LOW_COUNT}

>>> app.make_message("raw.text", "bar", value="hola!", time_of_validity=int(1 * 1e9), store=True)
>>> app.make_message("raw.text", "bar", value="hola!", time_of_validity=int(2 * 1e9), store=True)
>>> app.make_message("raw.text", "bar", value="hola!", time_of_validity=int(3 * 1e9), store=True)
>>> app.make_message("raw.text", "bar", value="hola!", time_of_validity=int(4 * 1e9), store=True)
>>> app.data_status
{"foo": DataStatus.STALE}

>>> app.make_message("raw.text", "bar", value="hola!", time_of_validity=int(5.1 * 1e9), store=True)
>>> app.data_status
{"foo": DataStatus.STALE, "bar": DataStatus.LOW_FREQ}

Configuring from the configuration file

Instead of hard-coding CHECKS, LIMITS and TOPICS, each can be specified dynamically via configuration at run-time under the kelvin.app section, e.g.

...
configuration:
  - name: kelvin.app
    data_model: kelvin.app
    values:
      - name: offset_timestamps
        value: true
      - name: topics
        value: 
          "raw.*.bar":
            target: "{name}"
            storage_type: "buffer"
            storage_config: 
              window: 10
      - name: checks
        value: 
          foo: 
            max_lag: 100.0
          bar: 
            min_count: 100.0
            max_gap: 100.0
      - name: limits
        value: 
          x: 
            frequency: 
              seconds: 30.0
          y: 
            throttle: 
              seconds: 15.0
          z: 
            frequency: 
              minutes: 1.0
            throttle: 
              seconds: 10.0

A generic example

from kelvin.app import DataApplication, ApplicationConfig


class AppConfig(ApplicationConfig):
    """Typed configuration for the application.

    Declare one annotated field per configuration value; the commented
    fields below are placeholders to be replaced with real ones.
    """

    ## Example configuration values (uncomment and adapt):
    # lower: float = float("-inf")
    # upper: float = float("inf")
    # boost: bool = True
    # offset: float = 1000.0


class App(DataApplication):
    """Application template.

    ``TOPICS``, ``CHECKS`` and ``LIMITS`` are left empty; the commented
    entries inside each show the expected shape of a rule.
    """

    # configuration is typed by AppConfig
    config: AppConfig

    TOPICS = {
        # "#.*": {
        #     "target": "history.{name}",
        #     "storage_type": "buffer",
        #     "storage_config": {"window": {"minutes": 10}, "getter": "value"},
        # },
    }
    CHECKS = {
        # "pressure": {
        #     "max_lag": {"seconds": 30},
        # },
    }
    LIMITS = {
        # "alert": {
        #     "frequency": {"minutes": 5},
        # },
    }

    def init(self) -> None:
        """Initialise the application; custom set-up logic goes here."""
        # Custom initialisation logic

    def process(self) -> None:
        """Process data."""

        # your code here!

        ## Example processing code:
        # # data_status only lists inputs that currently fail a check
        # data_status = self.data_status
        # if data_status:
        #     self.logger.warning("Missing data", data_status=data_status)
        #     return

        # pressure = self.data.pressure
        # if self.config.lower <= pressure.value <= self.config.upper:
        #     new_pressure = self.make_message(
        #         # message_type="raw.float32",  # optional if name is a known app output
        #         # time_of_validity=pressure.time_of_validity,  # optional, defaults to current time
        #         name="new_pressure",
        #         value=pressure.value,
        #     )
        #     if self.config.boost:
        #         new_pressure.value += self.config.offset
        #     self.emit(new_pressure)
        # else:
        #     self.make_message("my.alert", "pressure_alert", message="Pressure out of bounds", emit=True)
        #     history = self.data.history.pressure.series()
        #     self.logger.debug("pressure", n=len(history), mean=history.mean(), std=history.std())