Configuring your Kelvin App
Kelvin App Functionalities¶
Python Kelvin Apps can further be refined to provide better filtering and caching.
By default, the kelvin.app.DataApplication is configured to cache the last value of every message received via
DataApplication.on_data, partitioned on the message name:
from kelvin.app import DataApplication
from kelvin.app.utils import make_message
class App(DataApplication):
    """Minimal application relying on the default behaviour.

    With no ``TOPICS`` declared, the last value of every message received
    via ``on_data`` is cached, partitioned on the message name.
    """
>>> app = App()
>>> message_1 = make_message(
"raw.text",
name="hello",
value="Guten Tag",
time_of_validity=int(1 * 1e9),
)
>>> message_2 = make_message(
"raw.text",
name="goodbye",
value="Sayonara",
time_of_validity=int(2 * 1e9),
)
>>> app.on_data([message_1, message_2])
>>> app.data
MappingProxy({
'goodbye': <kelvin.message.raw.text.Text at 0x7fc3153088f0>,
'hello': <kelvin.message.raw.text.Text at 0x7fc315308c70>},
defaults={})
>>> app.data.goodbye.value
'Sayonara'
>>> app.data.goodbye._.time_of_validity
2000000000
Routing and Caching¶
More complex routing and caching of data can be performed by specifying
TOPICS, which matches patterns in the {type}.{name} of incoming messages
(with # and * wildcards in the style of RabbitMQ routing patterns).
More-specific topics will be evaluated first (e.g. foo.bar.# is evaluated
before foo.#), and if more than one topic matches and routes to the same
target name, the storage-type/-config for the first match will be used.
class TopicApp(DataApplication):
    """Example application routing incoming messages via ``TOPICS``."""

    # Routing rules: each key is a pattern matched against the
    # "{type}.{name}" of an incoming message (# and * are wildcards).
    TOPICS = {
        "#": {"target": "{name}"},  # the default rule
        "raw.*.*": {
            "target": "buffer.{name}",  # any format can be used, referencing message fields
            "storage_type": "buffer",
            "storage_config": {"window": {"minutes": 5}, "count": 100, "getter": "value"},
        },
    }

    # Default values, reported under ``defaults`` of ``app.data``.
    defaults = {
        "data": {"x": None},
    }
>>> app = TopicApp()
>>> app.make_message("raw.float64", name="float64", value=1.0, time_of_validity=int(1 * 1e9), store=True)
>>> app.make_message("raw.float64", name="float64", value=2.0, time_of_validity=int(2 * 1e9), store=True)
>>> app.make_message("raw.text", name="text", value="one", time_of_validity=int(1 * 1e9), store=True)
>>> app.make_message("raw.text", name="text", value="two", time_of_validity=int(2 * 1e9), store=True)
>>> app.make_message("kelvin.beam_pump", name="beam_pump", time_of_validity=int(1 * 1e9), store=True)
>>> app.make_message("kelvin.beam_pump", name="beam_pump", time_of_validity=int(2 * 1e9), store=True)
>>> app.data
MappingProxy(
{
'beam_pump': <kelvin.message.kelvin.beam_pump.BeamPump at 0x7f827654fe70>,
'buffer': {
'float64': DataBuffer([
(1000000000, <kelvin.message.raw.float64.Float64 at 0x7f8273863c70>),
(2000000000, <kelvin.message.raw.float64.Float64 at 0x7f8274964cb0>)]),
'text': DataBuffer([
(1000000000, <kelvin.message.raw.text.Text at 0x7f8273d080f0>),
(2000000000, <kelvin.message.raw.text.Text at 0x7f8273d081f0>)])},
'float64': <kelvin.message.raw.float64.Float64 at 0x7f8274964cb0>,
'text': <kelvin.message.raw.text.Text at 0x7f8273d081f0>},
defaults={'x': None})
>>> app.data.buffer.text.series()
1970-01-01 00:00:01+00:00 one
1970-01-01 00:00:02+00:00 two
dtype: object
# default data values
>>> "x" in app.data, "y" in app.data
(True, False)
>>> app.data.x is None
True
Observe that the beam_pump message is not buffered as it doesn't match the routing rule,
but the raw.* messages do match the buffer topic (and the default topic).
The supported storage-types (storage_type) are:
- None (or unspecified): value is discarded. Note: storage_config must not be provided.
- "buffer": data is buffered according to storage_config (keyword-arguments of kelvin.app.data.DataBuffer):
  - count: Count-window of data to retain
  - window: Time-window of data to retain (specified in seconds or as timedelta)
  - getter: Optional getter function or field to get value from resource value
  - delta: Store deltas in value (via getter), keeping the timestamp of the leading edge of the change
class ConfigApp(DataApplication):
    """Example application declaring default configuration values."""

    # Defaults for ``app.config``; nested values can be read either by
    # attribute (``config.a.b.c``) or by dotted key (``config["a.b.c"]``).
    defaults = {
        "config": {"foo": 1, "bar": 2, "a": {"b": {"c": 3}}},
    }
>>> app = ConfigApp()
>>> app.config
MappingProxy(
{},
defaults={
'a': {'b': {'c': 3}},
'bar': 2,
'foo': 1,
'parameters': {}})
# access a value
>>> app.config.foo
1
# access via attributes or key
>>> app.config.a.b.c == app.config["a.b.c"] == 3
True
# update single value
>>> app.config.foo = 100
>>> app.config.foo
100
>>> app.config
MappingProxy(
{'foo': 100},
defaults={
'a': {'b': {'c': 3}},
'bar': 2,
'foo': 1,
'parameters': {}})
# merge configuration into defaults
>>> app.on_configuration_change({"foo": 1000, "a": {"b": {"c": 30}}})
>>> app.config.foo, app.config.a.b.c
(1000, 30)
# "flattened" update
>>> app.on_configuration_change({"a.b.c": 300})
>>> app.config.a.b.c
300
Commonly, an application will want to ensure that data is present before
proceeding. Simple checks on data can be performed via CHECKS which
drive the BaseApplication.data_status property.
Rate-limiting of outputs can be managed via LIMITS.
class CheckApp(DataApplication):
    """Example application combining data checks and output rate limits."""

    # Buffer every "raw.*.bar" message under its own name.
    TOPICS = {
        "raw.*.bar": {
            "target": "{name}",
            "storage_type": "buffer",
            "storage_config": {"window": 10},
        }
    }

    # Per-name data checks; failures are reported by ``data_status``.
    CHECKS = {
        "foo": {"max_lag": 100.0},
        "bar": {"min_count": 5, "max_gap": 1.0},
    }

    # Per-output rate limits.
    LIMITS = {
        "x": {"frequency": {"seconds": 30.0}},
        "y": {"throttle": {"seconds": 15.0}},
        "z": {"frequency": {"minutes": 1.0}, "throttle": {"seconds": 10.0}},
    }
>>> app = CheckApp(context)
>>> app.data_status
{"foo": DataStatus.MISSING, "bar": DataStatus.MISSING}
>>> app.make_message("raw.text", "foo", value="hola!", time_of_validity=0, store=True)
>>> app.data_status
{"bar": DataStatus.MISSING}
>>> context.set_process_time(100.0) # for testing - time is usually managed by core
>>> app.data_status
{"bar": DataStatus.MISSING}
>>> context.set_process_time(100.000001)
>>> app.data_status
{"foo": DataStatus.STALE, "bar": DataStatus.MISSING}
>>> app.make_message("raw.text", "bar", value="hola!", time_of_validity=0, store=True)
>>> app.data_status
{"foo": DataStatus.STALE, "bar": DataStatus.LOW_COUNT}
>>> app.make_message("raw.text", "bar", value="hola!", time_of_validity=int(1 * 1e9), store=True)
>>> app.make_message("raw.text", "bar", value="hola!", time_of_validity=int(2 * 1e9), store=True)
>>> app.make_message("raw.text", "bar", value="hola!", time_of_validity=int(3 * 1e9), store=True)
>>> app.make_message("raw.text", "bar", value="hola!", time_of_validity=int(4 * 1e9), store=True)
>>> app.data_status
{"foo": DataStatus.STALE}
>>> app.make_message("raw.text", "bar", value="hola!", time_of_validity=int(5.1 * 1e9), store=True)
>>> app.data_status
{"foo": DataStatus.STALE, "bar": DataStatus.LOW_FREQ}
Configuring from the configuration file¶
Instead of hard-coding CHECKS, LIMITS and TOPICS, each can be
specified dynamically via configuration at run-time under the
kelvin.app section, e.g.
...
# Run-time equivalents of TOPICS, CHECKS and LIMITS, declared under the
# kelvin.app section of the application configuration.
configuration:
  - name: kelvin.app
    data_model: kelvin.app
    values:
      - name: offset_timestamps
        value: true
      # Same shape as the TOPICS class attribute.
      - name: topics
        value:
          "raw.*.bar":
            target: "{name}"
            storage_type: "buffer"
            storage_config:
              window: 10
      # Same shape as the CHECKS class attribute.
      - name: checks
        value:
          foo:
            max_lag: 100.0
          bar:
            min_count: 100.0
            max_gap: 100.0
      # Same shape as the LIMITS class attribute.
      - name: limits
        value:
          x:
            frequency:
              seconds: 30.0
          y:
            throttle:
              seconds: 15.9
          z:
            frequency:
              minutes: 1.0
            throttle:
              seconds: 10.0
A generic example¶
from kelvin.app import DataApplication, ApplicationConfig
class AppConfig(ApplicationConfig):
    """My Application Config."""

    # Declare configuration fields as annotated class attributes with
    # defaults; uncomment and adapt the examples below.
    ## Example configuration values:
    # lower: float = float("-inf")
    # upper: float = float("inf")
    # boost: bool = True
    # offset: float = 1000.0
class App(DataApplication):
    """My Application."""

    # Typed view of the application configuration.
    config: AppConfig

    # Routing/caching rules for incoming messages (pattern -> target/storage).
    TOPICS = {
        # "#.*": {
        #     "target": "history.{name}",
        #     "storage_type": "buffer",
        #     "storage_config": {"window": {"minutes": 10}, "getter": "value"},
        # }
    }

    # Data checks that drive ``self.data_status``.
    CHECKS = {
        # "pressure": {
        #     "max_lag": {"seconds": 30},
        # },
    }

    # Rate limits applied to outputs.
    LIMITS = {
        # "alert": {
        #     "frequency": {"minutes": 5},
        # },
    }

    def init(self) -> None:
        """
        Initialisation method
        """
        # Custom initialisation logic

    def process(self) -> None:
        """Process data."""
        # your code here!

        ## Example processing code:
        # data_status = self.data_status
        # if data_status:
        #     self.logger.warning("Missing data", data_status=data_status)
        #     return
        #
        # pressure = self.data.pressure
        # if self.config.lower <= pressure.value <= self.config.upper:
        #     new_pressure = self.make_message(
        #         # message_type="raw.float32",  # optional if name is a known app output
        #         # time_of_validity=pressure.time_of_validity,  # optional, defaults to current time
        #         name="new_pressure",
        #         value=pressure.value,
        #     )
        #     if self.config.boost:
        #         new_pressure.value += self.config.offset
        #     self.emit(new_pressure)
        # else:
        #     self.make_message("my.alert", "pressure_alert", message="Pressure out of bounds", emit=True)
        #
        # history = self.data.history.pressure.series()
        # self.logger.debug("pressure", n=len(history), mean=history.mean(), std=history.std())