Testing

The kelvin.testing module provides a deterministic, in-memory test harness for KelvinApp instances. Tests run without network dependencies, use a virtual clock for time control, and support flexible data injection through messages and data sources.

Quick Start

Add a pytest.ini at the project root, next to main.py, so that tests can import the app module:

[pytest]
pythonpath = .

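Write tests against the app singleton from your main.py. The @pytest.mark.asyncio marker used in these examples is provided by the pytest-asyncio plugin, which must be installed: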

import pytest
from main import app

from kelvin.krn import KRNAssetDataStream
from kelvin.message import Number
from kelvin.testing import KelvinAppTest, ManifestBuilder


def _build_manifest():
    return (
        ManifestBuilder()
        .add_input("temperature")
        .add_output("alert")
        .add_asset("pump-001")
    )


class TestMyApp:
    @pytest.mark.asyncio
    async def test_processes_input(self):
        harness = KelvinAppTest(app, manifest=_build_manifest().build())

        async with harness:
            await harness.publish(
                Number(
                    resource=KRNAssetDataStream("pump-001", "temperature"),
                    payload=42.0,
                )
            )
            await harness.run_until_idle()

        outputs = harness.outputs
        assert len(outputs) > 0

The pattern is always:

  1. Build a manifest describing assets and datastreams

  2. Create a KelvinAppTest wrapping your app

  3. Use async with to connect/disconnect

  4. Publish inputs and/or advance time

  5. Call run_until_idle() to let the app process everything

  6. Assert on harness.outputs

Project Layout

A typical test setup alongside your app:

my-app/
  main.py           # KelvinApp definition
  app.yaml           # app configuration
  pytest.ini         # [pytest] pythonpath = .
  tests/
    test_main.py     # tests
  data/
    sample.csv       # test data (optional)

ManifestBuilder

ManifestBuilder creates a RuntimeManifest that describes the test environment: which assets exist, which datastreams are available, and what configuration the app receives.

Assets

(
    ManifestBuilder()
    .add_asset("pump-001")
    .add_asset("pump-002", parameters={"temperature-limit": 50})
    .add_asset("pump-003", properties={"location": "building-a"})
)

Datastreams

Convenience methods for common datastream directions, plus a general-purpose method:

from kelvin.message.runtime_manifest import WayEnum

(
    ManifestBuilder()
    # Convenience methods
    .add_input("temperature")                          # input
    .add_input("temperature", "number")                # input with explicit type
    .add_output("alert", "string")                     # output
    .add_control_change_input("setpoint")              # input_cc
    .add_control_change_output("target-speed")         # output_cc

    # General-purpose (for bidirectional streams); add_input_cc_output() and
    # add_input_output_cc() are the equivalent convenience methods
    .add_datastream("output-number", "number", WayEnum.input_cc_output)
    .add_datastream("sensor", "number", WayEnum.input_output_cc)
)

Supported data types: "number", "string", "boolean", "object".

Custom Actions

(
    ManifestBuilder()
    .add_custom_action_input("example-in")
    .add_custom_action_output("example-out")
)

Configuration

(
    ManifestBuilder()
    .set_configuration({"min": 0, "max": 100, "random": True})
)

Loading from app.yaml

from pathlib import Path

manifest = ManifestBuilder.from_app_yaml(Path("app.yaml")).add_asset("pump-001").build()

Complete Example

def _build_manifest():
    return (
        ManifestBuilder()
        .add_input("input-temperature")
        .add_control_change_output("out-temperature-setpoint")
        .add_asset("pump-001", parameters={"temperature-limit": 50, "auto-accept": False})
    )

KelvinAppTest

Lifecycle

The harness wraps your app singleton and manages connection/disconnection:

harness = KelvinAppTest(app, manifest=_build_manifest().build())

# Option 1: async context manager (recommended)
async with harness:
    # app is connected here
    ...
# app is disconnected after the block

# Option 2: manual control
await harness.connect()
# ...
await harness.disconnect()

The harness clears all state on each connect(), so a single instance can be reused across tests. However, creating a fresh harness per test is the typical pattern.

Publishing Messages

Inject messages into the app as if they arrived from the platform:

from datetime import timedelta

from kelvin.krn import KRNAssetDataStream, KRNAsset
from kelvin.message import Number, CustomAction

# Single message
await harness.publish(
    Number(resource=KRNAssetDataStream("pump-001", "temperature"), payload=42.0)
)

# Batch publish
await harness.publish_batch([
    Number(resource=KRNAssetDataStream("pump-001", "temperature"), payload=42.0),
    Number(resource=KRNAssetDataStream("pump-001", "temperature"), payload=43.0),
])

# Custom actions
await harness.publish(
    CustomAction(
        resource=KRNAsset("pump-001"),
        type="example-in",
        title="Incoming Action",
        expiration_date=timedelta(seconds=30),
    )
)

Synchronization with run_until_idle

run_until_idle() is the key method for deterministic testing. It:

  1. Advances virtual time to fire pending sleeps/timers

  2. Waits for all input messages to be consumed

  3. Waits for all stream processing to complete

  4. Gives sync tasks (running in threads) a chance to finish

# Default timeout of 5 seconds (virtual time)
await harness.run_until_idle()

# For timers with longer intervals, set timeout past the interval
# e.g., a 10-second timer:
await harness.run_until_idle(timeout=11.0)

The timeout parameter caps how far virtual time may advance during the call and must be positive. To fire a timer, set it slightly above the timer interval, for example the interval plus 1 second.
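To see why the timeout has to exceed the timer interval, the following is a minimal standard-library sketch of a virtual clock. ToyVirtualClock, call_at, and advance are hypothetical names invented for this illustration; they are not the kelvin VirtualClock API, and the real harness may work differently internally.

```python
import heapq
from typing import Callable, List, Tuple


class ToyVirtualClock:
    """Illustrative stand-in only; NOT the kelvin VirtualClock."""

    def __init__(self) -> None:
        self.now = 0.0
        self._timers: List[Tuple[float, int, Callable[[], None]]] = []
        self._seq = 0  # tie-breaker so callbacks are never compared

    def call_at(self, deadline: float, callback: Callable[[], None]) -> None:
        """Schedule a callback at an absolute virtual time."""
        heapq.heappush(self._timers, (deadline, self._seq, callback))
        self._seq += 1

    def advance(self, seconds: float) -> None:
        """Move time forward, firing every timer whose deadline is reached."""
        if seconds < 0:
            raise ValueError("seconds must be >= 0")
        target = self.now + seconds
        while self._timers and self._timers[0][0] <= target:
            deadline, _, callback = heapq.heappop(self._timers)
            self.now = deadline
            callback()
        self.now = target


fired: List[float] = []
clock = ToyVirtualClock()
clock.call_at(10.0, lambda: fired.append(clock.now))

clock.advance(5.0)  # a 5-second budget leaves the 10-second timer pending
fired_after_5 = list(fired)

clock.advance(6.0)  # 11 seconds in total, so the timer fires at t=10
fired_after_11 = list(fired)
```

No wall-clock time passes in this sketch: time only moves when advance() is called, which is the property that makes timer tests deterministic.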

Reading Outputs

outputs = harness.outputs  # list of all Message objects the app published

Filter by message type to assert on specific outputs:

from kelvin.message.base_messages import ControlChangeMsg, RecommendationMsg, CustomActionMsg

recs = [m for m in harness.outputs if isinstance(m, RecommendationMsg)]
ccs = [m for m in harness.outputs if isinstance(m, ControlChangeMsg)]
actions = [m for m in harness.outputs if isinstance(m, CustomActionMsg)]
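If the same filter appears in many tests, it can be factored into a small helper. The sketch below is plain Python and does not depend on the SDK; outputs_of is a hypothetical helper name, and FakeRecommendation and FakeControlChange are stand-ins for the real message classes.

```python
from typing import Iterable, List, Type, TypeVar

T = TypeVar("T")


def outputs_of(outputs: Iterable[object], message_type: Type[T]) -> List[T]:
    """Return only the outputs that are instances of message_type."""
    return [m for m in outputs if isinstance(m, message_type)]


# Stand-in message classes, used here only so the sketch runs without the SDK.
class FakeRecommendation:
    pass


class FakeControlChange:
    pass


captured = [FakeRecommendation(), FakeControlChange(), FakeRecommendation()]
recs = outputs_of(captured, FakeRecommendation)
ccs = outputs_of(captured, FakeControlChange)
```

In a real test the first argument would be harness.outputs and the second one of the message classes imported from kelvin.message.base_messages.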

Time Control

For fine-grained time manipulation:

# Advance virtual clock by a specific amount
await harness.advance_time(5.0)

Testing Patterns

Stream Handlers

Test that stream handlers receive the correct messages based on their filters:

@pytest.mark.asyncio
async def test_receives_matching_asset(self, capsys):
    harness = KelvinAppTest(app, manifest=_build_manifest().build())

    async with harness:
        await harness.publish(
            Number(resource=KRNAssetDataStream("test-asset-1", "input-1"), payload=1.0)
        )
        await harness.run_until_idle()

    captured = capsys.readouterr().out
    assert "Received" in captured

@pytest.mark.asyncio
async def test_ignores_other_assets(self, capsys):
    harness = KelvinAppTest(app, manifest=_build_manifest().build())

    async with harness:
        await harness.publish(
            Number(resource=KRNAssetDataStream("test-asset-2", "input-1"), payload=1.0)
        )
        await harness.run_until_idle()

    captured = capsys.readouterr().out
    assert "Received" not in captured

Timers

Timers fire automatically when virtual time advances past their interval:

@pytest.mark.asyncio
async def test_timer_fires(self, capsys):
    harness = KelvinAppTest(app, manifest=ManifestBuilder().build())

    async with harness:
        # Timer interval is 5s - advance past the first firing
        await harness.run_until_idle(timeout=6.0)

    captured = capsys.readouterr().out
    assert "timer fired" in captured

Test that the app survives a failing timer:

@pytest.mark.asyncio
async def test_app_survives_failing_timer(self):
    harness = KelvinAppTest(app, manifest=ManifestBuilder().build())

    async with harness:
        await harness.run_until_idle(timeout=6.0)
        assert app.is_connected  # app still running

Windows (Rolling, Tumbling, Hopping)

Feed enough messages to fill the window:

@pytest.mark.asyncio
async def test_window_emits_after_count_size(self, capsys):
    harness = KelvinAppTest(app, manifest=_build_manifest().build())

    async with harness:
        # count_size=5, so publish 5 messages
        for i in range(1, 6):
            await harness.publish(
                Number(
                    resource=KRNAssetDataStream("pump-001", "motor-temperature"),
                    payload=float(i),
                )
            )
        await harness.run_until_idle()

    captured = capsys.readouterr().out
    assert "window received" in captured

@pytest.mark.asyncio
async def test_window_does_not_emit_below_count_size(self, capsys):
    harness = KelvinAppTest(app, manifest=_build_manifest().build())

    async with harness:
        for i in range(1, 4):  # below count_size
            await harness.publish(
                Number(
                    resource=KRNAssetDataStream("pump-001", "motor-temperature"),
                    payload=float(i),
                )
            )
        await harness.run_until_idle()

    captured = capsys.readouterr().out
    assert "window received" not in captured

Output Inspection

Assert on published messages by type, payload, and resource:

@pytest.mark.asyncio
async def test_publishes_correct_output_types(self):
    harness = KelvinAppTest(app, manifest=_build_manifest().build())

    async with harness:
        await harness.run_until_idle(timeout=11.0)

    outputs = harness.outputs
    payloads = [msg.payload for msg in outputs]

    assert any(isinstance(p, bool) for p in payloads)
    assert any(isinstance(p, (int, float)) for p in payloads)
    assert any(isinstance(p, str) for p in payloads)

@pytest.mark.asyncio
async def test_all_messages_target_correct_asset(self):
    harness = KelvinAppTest(app, manifest=_build_manifest().build())

    async with harness:
        await harness.run_until_idle()

    for msg in harness.outputs:
        assert msg.resource.asset == "test-asset-1"

Recommendations and Evidence

@pytest.mark.asyncio
async def test_publishes_recommendation_with_evidence(self):
    harness = KelvinAppTest(app, manifest=_build_manifest().build())

    async with harness:
        for temp in [55.0, 56.0, 57.0]:
            await harness.publish(
                Number(
                    resource=KRNAssetDataStream("pump-001", "input-temperature"),
                    payload=temp,
                )
            )
        await harness.run_until_idle()
        await harness.run_until_idle(timeout=6.0)  # fire the timer

    recs = [o for o in harness.outputs if isinstance(o, RecommendationMsg)]
    assert len(recs) == 1

    rec = recs[0]
    assert rec.payload.evidences is not None
    assert rec.payload.evidences[0].type == "line-chart"
    assert rec.payload.actions.control_changes is not None

Custom Actions

@pytest.mark.asyncio
async def test_receives_and_responds_to_custom_action(self):
    harness = KelvinAppTest(app, manifest=_build_manifest().build())

    async with harness:
        await harness.publish(
            CustomAction(
                resource=KRNAsset("test-asset-1"),
                type="example-in",
                title="Incoming Action",
                expiration_date=timedelta(seconds=30),
            )
        )
        await harness.run_until_idle()

    results = [o for o in harness.outputs if isinstance(o, CustomActionResultMsg)]
    assert len(results) == 1
    assert results[0].payload.success is True

Multiple Timer Firings

Chain run_until_idle() calls to fire the timer multiple times:

@pytest.mark.asyncio
async def test_cooldown_prevents_duplicates(self):
    harness = KelvinAppTest(app, manifest=_build_manifest().build())

    async with harness:
        # ... publish inputs ...
        await harness.run_until_idle()

        # Fire timer twice
        await harness.run_until_idle(timeout=6.0)
        await harness.run_until_idle(timeout=6.0)

    recs = [o for o in harness.outputs if isinstance(o, RecommendationMsg)]
    assert len(recs) == 1  # only one due to cooldown

Resetting Module State

If your app uses module-level state, reset it between tests:

import main as main_module

def _reset_state():
    main_module._counter = 0
    main_module._readings.clear()

class TestMyApp:
    @pytest.mark.asyncio
    async def test_something(self):
        _reset_state()
        harness = KelvinAppTest(app, manifest=_build_manifest().build())
        # ...
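When the module has many state variables, listing each reset by hand gets error-prone. One possible alternative is to snapshot the initial values once and restore them before each test. This is a sketch under assumptions: snapshot_state and restore_state are hypothetical helpers, and fake_main stands in for the real main module.

```python
import copy
import types
from typing import Any, Dict, Iterable


def snapshot_state(module: types.ModuleType, names: Iterable[str]) -> Dict[str, Any]:
    """Deep-copy the named module attributes so later mutation cannot leak in."""
    return {name: copy.deepcopy(getattr(module, name)) for name in names}


def restore_state(module: types.ModuleType, snapshot: Dict[str, Any]) -> None:
    """Rebind each attribute to a fresh copy of its snapshotted value."""
    for name, value in snapshot.items():
        setattr(module, name, copy.deepcopy(value))


# Hypothetical stand-in for `import main as main_module`.
fake_main = types.ModuleType("fake_main")
fake_main._counter = 0
fake_main._readings = []

initial = snapshot_state(fake_main, ["_counter", "_readings"])

# A test mutates the module-level state...
fake_main._counter += 3
fake_main._readings.append(42.0)

# ...and the next test starts from the snapshot again.
restore_state(fake_main, initial)
```

Note that restore_state rebinds the attributes instead of mutating them in place. Code that imported a list directly (from main import _readings) would keep the old object, in which case the in-place .clear() approach is the safer choice.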

Data Sources

Data sources automate input generation. Add them to the harness before connecting; add_source() raises RuntimeError if called after connect().

RandomSource

Generates random values for fuzz testing:

from datetime import timedelta

from kelvin.testing import RandomSource

source = (
    RandomSource(datastreams=["temperature", "pressure"], min_value=0, max_value=100)
    .with_asset("pump-001")
    .with_timing(timedelta(seconds=1))
    .with_count(10)
)

harness = KelvinAppTest(app, manifest=manifest)
harness.add_source(source)

async with harness:
    await harness.run_until_idle()
    outputs = harness.outputs

Options:

  • datastreams: list of datastream names (default: ["value"])

  • min_value / max_value: numeric range

  • seed: for reproducible results

  • count: number of messages (None = indefinite)

  • value_type: "number", "string", or "boolean"
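The seed option matters because a fuzz test that fails must be repeatable. The sketch below illustrates the general idea with Python's standard random module. It assumes, without confirmation from the SDK, that RandomSource seeds its generator in a comparable way; sample_values is a hypothetical helper.

```python
import random
from typing import List


def sample_values(seed: int, count: int, min_value: float, max_value: float) -> List[float]:
    """Draw `count` values from a private, seeded generator."""
    rng = random.Random(seed)  # private generator; global random state is untouched
    return [rng.uniform(min_value, max_value) for _ in range(count)]


first = sample_values(seed=42, count=10, min_value=0, max_value=100)
second = sample_values(seed=42, count=10, min_value=0, max_value=100)
other = sample_values(seed=7, count=10, min_value=0, max_value=100)
```

The same seed reproduces the same sequence, while a different seed gives a different one, so recording the seed of a failing run is enough to replay it.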

CSVSource

Replay messages from a CSV file:

from datetime import timedelta

from kelvin.testing import CSVSource

source = (
    CSVSource("data/sample.csv", playback=True, now_offset=True)
    .with_asset("pump-001")
    .with_timing(timedelta(seconds=1))
)

# Select specific columns
source = CSVSource("data.csv").with_columns("temperature", "pressure")

# Map column names to datastream names
source = CSVSource("data.csv").with_column_mapping({
    "temp_c": "temperature",
    "pressure_psi": "pressure",
})

Options:

  • playback: wait between rows based on timestamp differences

  • ignore_timestamps: use virtual clock time instead of data timestamps

  • now_offset: offset timestamps so first row starts at current virtual time

  • asset_column / timestamp_column: column name overrides
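The options above can be pictured with a small standard-library sketch. The column names timestamp and asset_name are the documented defaults of TabularSource; the ISO 8601 timestamp format, the sample values, and the arithmetic shown here are assumptions about how playback, now_offset, and column mapping behave, not the SDK's actual implementation.

```python
import csv
import io
from datetime import datetime

# Hypothetical file content using the default column names.
SAMPLE = """timestamp,asset_name,temp_c,pressure_psi
2024-01-01T00:00:00,pump-001,41.5,30.1
2024-01-01T00:00:02,pump-001,42.0,30.4
2024-01-01T00:00:05,pump-001,43.2,30.9
"""

rows = list(csv.DictReader(io.StringIO(SAMPLE)))
stamps = [datetime.fromisoformat(row["timestamp"]) for row in rows]

# playback: the wait before each row is the gap to the previous row's timestamp.
delays = [later - earlier for earlier, later in zip(stamps, stamps[1:])]

# now_offset: shift every timestamp so the first row lands on the current time.
virtual_now = datetime(2030, 1, 1)
shift = virtual_now - stamps[0]
shifted = [stamp + shift for stamp in stamps]

# Column mapping: rename CSV columns to datastream names.
mapping = {"temp_c": "temperature", "pressure_psi": "pressure"}
first_values = {mapping[k]: float(v) for k, v in rows[0].items() if k in mapping}
```

The relative spacing between rows is preserved by the shift, which is why now_offset and playback can be combined.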

DataFrameSource

Same as CSVSource but from a pandas DataFrame:

from kelvin.testing import DataFrameSource

source = DataFrameSource(df, playback=True, now_offset=True).with_asset("pump-001")

SyntheticSource

Generate values from mathematical wave patterns:

from datetime import timedelta
from kelvin.testing import SyntheticSource, SineWave, SquareWave, RampWave, NoiseWave, ConstantWave

# Sine wave
source = SyntheticSource(
    pattern=SineWave(amplitude=10, period=timedelta(minutes=1), offset=20),
    datastream="temperature",
    sample_rate=timedelta(seconds=1),
    duration=timedelta(minutes=5),
).with_asset("pump-001")

# Square wave (on/off signal)
source = SyntheticSource(
    pattern=SquareWave(amplitude=1, period=timedelta(seconds=10), duty_cycle=0.5),
    datastream="valve-state",
).with_asset("pump-001")

# Ramp (sawtooth)
source = SyntheticSource(
    pattern=RampWave(min_value=0, max_value=100, period=timedelta(minutes=1)),
    datastream="pressure",
).with_asset("pump-001")

# Noisy sine wave
source = SyntheticSource(
    pattern=NoiseWave(base=SineWave(amplitude=10), std_dev=0.5, seed=42),
    datastream="temperature",
).with_asset("pump-001")

# Constant value
source = SyntheticSource(
    pattern=ConstantWave(value=25.0),
    datastream="ambient-temp",
).with_asset("pump-001")
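To reason about what values a test should expect, it can help to write the waveforms out by hand. The formulas below are conventional definitions and only an assumption about what the SDK patterns compute. In particular, the low level of the square wave (0 here) and the phase at t=0 are guesses, and the function names are hypothetical.

```python
import math


def sine_value(t: float, amplitude: float, period: float, offset: float = 0.0) -> float:
    """Sine wave sampled at t seconds; period is in seconds."""
    return offset + amplitude * math.sin(2 * math.pi * t / period)


def square_value(t: float, amplitude: float, period: float, duty_cycle: float = 0.5) -> float:
    """High for the first duty_cycle fraction of each period, then 0 (assumed low level)."""
    phase = (t % period) / period
    return amplitude if phase < duty_cycle else 0.0


def ramp_value(t: float, min_value: float, max_value: float, period: float) -> float:
    """Sawtooth rising linearly from min_value to max_value over each period."""
    phase = (t % period) / period
    return min_value + (max_value - min_value) * phase


# Quarter-period samples of a sine with amplitude 10, period 60 s, offset 20.
samples = [round(sine_value(t, amplitude=10, period=60, offset=20), 3) for t in (0, 15, 30, 45)]
```

Before asserting on exact values in a real test, check the SDK's actual output for a few sample points, since these conventions may not match.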

Combining Sources

Add multiple sources to the same harness:

harness = KelvinAppTest(app, manifest=manifest)
harness.add_source(temperature_source)
harness.add_source(pressure_source)

async with harness:
    await harness.run_until_idle()

API Reference

Kelvin testing framework for KelvinApp unit testing.

This module provides a test harness that wraps KelvinApp with an in-memory InMemoryStream and integrates with VirtualClock for deterministic testing.

final class kelvin.testing.KelvinAppTest(app, manifest, clock=None)

Bases: object

Test harness for KelvinApp instances.

Wraps a KelvinApp with an in-memory InMemoryStream and VirtualClock for deterministic testing without network dependencies. Supports reuse after disconnect - state is cleared on each connect.

Attributes:
  • app – The underlying KelvinApp instance.

  • manifest – The RuntimeManifest injected on connect.

  • clock – The VirtualClock for time control.

  • is_connected – Whether the harness is currently connected.

  • inputs – Copy of all injected input messages.

  • outputs – Copy of all captured output messages.

Example

>>> async with KelvinAppTest(app, manifest) as harness:
...     await harness.publish(msg)
...     await harness.run_until_idle()
...     assert len(harness.outputs) == 1

__init__(app, manifest, clock=None)

Initialize the test harness.

Parameters:
  • app (KelvinApp) – The KelvinApp instance to test. Must be disconnected.

  • manifest (RuntimeManifest) – RuntimeManifest to inject when connecting.

  • clock (Optional[VirtualClock]) – Optional VirtualClock. If None, creates one with manual time control.

Raises:
  • RuntimeError – If the app is already connected or has a running task.

  • ValueError – If the app is missing required internal attributes.

Return type:

None

Example

>>> harness = KelvinAppTest(app, manifest)
>>> harness = KelvinAppTest(app, manifest, clock=VirtualClock())

__repr__()

Return a string representation for debugging.

Return type:

str

Returns:

String showing connection state and message counts.

add_source(source)

Add a data source for input generation.

Sources are started when connect() is called and stopped on disconnect(). Must be called before connect().

Parameters:

source (DataSource) – DataSource that generates messages to inject.

Return type:

Self

Returns:

Self for method chaining.

Raises:

RuntimeError – If called after connect().

Example

>>> harness.add_source(source1).add_source(source2)

async publish(msg)

Inject a message into the input queue and record it.

The message is added to the inputs list and injected into the in-memory stream for processing by the app.

Parameters:

msg (Union[Message, MessageBuilder[Any]]) – Message or MessageBuilder to inject as input.

Raises:

RuntimeError – If called before connect().

Return type:

None

Example

>>> await harness.publish(Number(resource=krn, payload=5.0))

async publish_batch(messages)

Inject multiple messages sequentially.

Parameters:

messages (Iterable[Union[Message, MessageBuilder[Any]]]) – Iterable of messages or message builders to inject.

Raises:

RuntimeError – If called before connect().

Return type:

None

Example

>>> await harness.publish_batch([msg1, msg2, msg3])

property app: KelvinApp

Get the underlying KelvinApp instance.

Returns:

The KelvinApp being tested.

property manifest: RuntimeManifest

Get the runtime manifest.

Returns:

The RuntimeManifest injected on connect.

property clock: VirtualClock

Get the virtual clock.

Returns:

The VirtualClock used for time control.

property is_connected: bool

Check if the test harness is connected.

Returns:

True if connected, False otherwise.

property inputs: list[Message]

Get a copy of all injected inputs.

Returns:

Copy of the inputs list. Modifications don’t affect internal state.

property outputs: list[Message]

Get a copy of all captured outputs.

Drains any pending outputs from the output queue before returning. Each access may return additional outputs if the app has published more.

Returns:

Copy of all captured output messages.
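The drain-then-copy behavior described for outputs can be pictured with a toy class. This is an illustration of the documented semantics only; ToyCapture and its internals are hypothetical and are not how KelvinAppTest is necessarily implemented.

```python
from collections import deque
from typing import Deque, List


class ToyCapture:
    """Stand-in showing a property that drains a queue and returns a copy."""

    def __init__(self) -> None:
        self._pending: Deque[str] = deque()  # published but not yet collected
        self._captured: List[str] = []       # everything collected so far

    def app_publishes(self, msg: str) -> None:
        self._pending.append(msg)

    @property
    def outputs(self) -> List[str]:
        # Drain pending messages first, then hand back a copy.
        while self._pending:
            self._captured.append(self._pending.popleft())
        return list(self._captured)


capture = ToyCapture()
capture.app_publishes("a")
first_read = capture.outputs

first_read.append("tampered")  # mutating the copy does not affect internal state
capture.app_publishes("b")
second_read = capture.outputs  # a later access also sees the newer message
```

Because each access returns a new list, it is usually clearest to read outputs once into a local variable after run_until_idle() and assert on that.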

async advance_time(seconds)

Advance virtual time and yield to the event loop.

Parameters:

seconds (float) – Number of seconds to advance. Must be >= 0.

Raises:

ValueError – If seconds is negative.

Return type:

None

Example

>>> await harness.advance_time(10.0)  # Advance 10 seconds

async run_until_idle(timeout=5.0)

Run until queues are drained and processing completes.

Advances virtual time to process pending sleeps, then waits for all queues to be fully processed using join().

Parameters:

timeout (float) – Maximum virtual time to advance in seconds. Must be positive. Default is 5.0 seconds.

Raises:

ValueError – If timeout is not positive.

Return type:

None

Example

>>> await harness.run_until_idle()
>>> await harness.run_until_idle(timeout=10.0)

async connect()

Connect the app and start data sources.

Clears inputs/outputs for a fresh start. Can be called again after disconnect() to reuse the harness.

Return type:

Self

Returns:

Self for method chaining.

Raises:

RuntimeError – If already connected.

Example

>>> await harness.connect()
>>> # ... run tests ...
>>> await harness.disconnect()

async disconnect()

Disconnect the app and stop data sources.

Safe to call multiple times or when not connected (no-op).

Example

>>> await harness.disconnect()

Return type:

None

async __aenter__()

Enter the async context and connect.

Return type:

Self

Returns:

Self for use in the context.

Example

>>> async with KelvinAppTest(app, manifest) as harness:
...     await harness.publish(msg)

async __aexit__(_exc_type=None, _exc_value=None, _traceback=None)

Exit the async context and disconnect.

Always disconnects, even if an exception occurred.

Return type:

None

class kelvin.testing.ManifestBuilder

Bases: object

Fluent builder for creating RuntimeManifest instances for testing.

Provides a chainable API for constructing RuntimeManifest objects with datastreams, assets, and configuration for use in testing scenarios.

Examples

>>> manifest = (ManifestBuilder()
...     .add_datastream("temperature", "number", WayEnum.input)
...     .add_datastream("alert", "boolean", WayEnum.output)
...     .add_asset("pump-001", properties={"location": "A1"})
...     .set_configuration({"threshold": 100})
...     .build())

__init__()

Initialize an empty manifest builder.

Return type:

None

__repr__()

Return a string representation of the builder state.

Return type:

str

classmethod from_app_yaml(path=None)

Load base configuration from app.yaml.

Supports both legacy format (inputs/outputs at top level) and modern format (spec_version 5.0.0 with data_streams section).

Parameters:

path (Optional[Path]) – Path to app.yaml file.

Return type:

ManifestBuilder

Returns:

ManifestBuilder populated from app.yaml.

Raises:
  • FileNotFoundError – If the specified path does not exist.

  • yaml.YAMLError – If the YAML file is malformed.

classmethod from_dict(config)

Create a ManifestBuilder from a dictionary.

Parameters:

config (dict[str, Any]) – Dictionary with manifest configuration.

Return type:

ManifestBuilder

Returns:

ManifestBuilder populated from the dictionary.

add_asset(name, properties=None, parameters=None)

Add an asset to the manifest.

Note: If an asset with the same name already exists, it will be overwritten.

Parameters:
  • name (str) – Asset name (cannot be None, empty, or whitespace-only).

  • properties (dict[str, Any] | None) – Optional asset properties.

  • parameters (dict[str, Any] | None) – Optional asset parameters.

Return type:

Self

Returns:

Self for chaining.

Raises:

ValueError – If name is None, empty, or whitespace-only.

add_datastream(name, data_type='number', way=WayEnum.input, unit=None, configuration=None)

Add a datastream definition to the manifest.

Note: If a datastream with the same name already exists, it will be overwritten.

Parameters:
  • name (str) – Datastream name (cannot be None, empty, or whitespace-only).

  • data_type (str) – Data type (number, string, boolean, object). Defaults to “number”.

  • way (WayEnum) – Input/output direction (input, output, input_cc, output_cc, input_cc_output, input_output_cc). Defaults to WayEnum.input.

  • unit (str | None) – Optional unit name (e.g., “celsius”, “meters”).

  • configuration (dict[str, Any] | None) – Optional per-datastream configuration dict.

Return type:

Self

Returns:

Self for chaining.

Raises:
  • ValueError – If name is None, empty, or whitespace-only.

  • ValueError – If data_type is not one of: boolean, number, object, string.

add_input(name, data_type='number', unit=None, configuration=None)

Add an input datastream.

Convenience method that calls add_datastream with WayEnum.input.

Parameters:
  • name (str) – Datastream name (cannot be None, empty, or whitespace-only).

  • data_type (str) – Data type (number, string, boolean, object).

  • unit (str | None) – Optional unit name.

  • configuration (dict[str, Any] | None) – Optional per-datastream configuration dict.

Return type:

Self

Returns:

Self for chaining.

Raises:
  • ValueError – If name is None, empty, or whitespace-only.

  • ValueError – If data_type is not one of: boolean, number, object, string.

add_output(name, data_type='number', unit=None, configuration=None)

Add an output datastream.

Convenience method that calls add_datastream with WayEnum.output.

Parameters:
  • name (str) – Datastream name (cannot be None, empty, or whitespace-only).

  • data_type (str) – Data type (number, string, boolean, object).

  • unit (str | None) – Optional unit name.

  • configuration (dict[str, Any] | None) – Optional per-datastream configuration dict.

Return type:

Self

Returns:

Self for chaining.

Raises:
  • ValueError – If name is None, empty, or whitespace-only.

  • ValueError – If data_type is not one of: boolean, number, object, string.

add_control_change_input(name, data_type='number', unit=None, configuration=None)

Add a control change input datastream.

Convenience method that calls add_datastream with WayEnum.input_cc.

Parameters:
  • name (str) – Datastream name (cannot be None, empty, or whitespace-only).

  • data_type (str) – Data type (number, string, boolean, object).

  • unit (str | None) – Optional unit name.

  • configuration (dict[str, Any] | None) – Optional per-datastream configuration dict.

Return type:

Self

Returns:

Self for chaining.

Raises:
  • ValueError – If name is None, empty, or whitespace-only.

  • ValueError – If data_type is not one of: boolean, number, object, string.

add_control_change_output(name, data_type='number', unit=None, configuration=None)

Add a control change output datastream.

Convenience method that calls add_datastream with WayEnum.output_cc.

Parameters:
  • name (str) – Datastream name (cannot be None, empty, or whitespace-only).

  • data_type (str) – Data type (number, string, boolean, object).

  • unit (str | None) – Optional unit name.

  • configuration (dict[str, Any] | None) – Optional per-datastream configuration dict.

Return type:

Self

Returns:

Self for chaining.

Raises:
  • ValueError – If name is None, empty, or whitespace-only.

  • ValueError – If data_type is not one of: boolean, number, object, string.

add_input_cc_output(name, data_type='number', unit=None, configuration=None)

Add a datastream that is both input control change and output.

Convenience method that calls add_datastream with WayEnum.input_cc_output. Use for owned datastreams with read-write access.

Parameters:
  • name (str) – Datastream name (cannot be None, empty, or whitespace-only).

  • data_type (str) – Data type (number, string, boolean, object).

  • unit (str | None) – Optional unit name.

  • configuration (dict[str, Any] | None) – Optional per-datastream configuration dict.

Return type:

Self

Returns:

Self for chaining.

Raises:
  • ValueError – If name is None, empty, or whitespace-only.

  • ValueError – If data_type is not one of: boolean, number, object, string.

add_input_output_cc(name, data_type='number', unit=None, configuration=None)

Add a datastream that is both input and output control change.

Convenience method that calls add_datastream with WayEnum.input_output_cc. Use for remote datastreams with read-write access.

Parameters:
  • name (str) – Datastream name (cannot be None, empty, or whitespace-only).

  • data_type (str) – Data type (number, string, boolean, object).

  • unit (str | None) – Optional unit name.

  • configuration (dict[str, Any] | None) – Optional per-datastream configuration dict.

Return type:

Self

Returns:

Self for chaining.

Raises:
  • ValueError – If name is None, empty, or whitespace-only.

  • ValueError – If data_type is not one of: boolean, number, object, string.

add_custom_action(action_type, way=CAWayEnum.output_ca)

Add a custom action to the manifest.

Note: If a custom action with the same type already exists, it will be overwritten.

Parameters:
  • action_type (str) – Custom action type identifier (cannot be None, empty, or whitespace-only).

  • way (CAWayEnum) – Direction of the custom action (input_ca or output_ca). Defaults to output_ca.

Return type:

Self

Returns:

Self for chaining.

Raises:

ValueError – If action_type is None, empty, or whitespace-only.

add_custom_action_input(action_type)

Add an input custom action.

Convenience method that calls add_custom_action with CAWayEnum.input_ca.

Parameters:

action_type (str) – Custom action type identifier (cannot be None, empty, or whitespace-only).

Return type:

Self

Returns:

Self for chaining.

Raises:

ValueError – If action_type is None, empty, or whitespace-only.

add_custom_action_output(action_type)

Add an output custom action.

Convenience method that calls add_custom_action with CAWayEnum.output_ca.

Parameters:

action_type (str) – Custom action type identifier (cannot be None, empty, or whitespace-only).

Return type:

Self

Returns:

Self for chaining.

Raises:

ValueError – If action_type is None, empty, or whitespace-only.

add_assets(assets)

Add multiple assets to the manifest.

Note: If an asset with the same name already exists, it will be overwritten.

Parameters:

assets (list[dict[str, Any]]) – List of dicts with ‘name’ (required), ‘properties’, ‘parameters’ keys.

Return type:

Self

Returns:

Self for chaining.

Raises:
  • KeyError – If any asset dict is missing the required ‘name’ key.

  • ValueError – If any asset name is None, empty, or whitespace-only.
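For a fleet of similar assets, the list of dicts can be generated rather than typed out. The sketch below only builds the input data, using the documented 'name' and 'parameters' keys; make_asset_entries is a hypothetical helper, and its validation merely mirrors the documented name rule instead of calling the SDK.

```python
from typing import Any, Dict, List, Optional


def make_asset_entries(
    names: List[str], parameters: Optional[Dict[str, Any]] = None
) -> List[Dict[str, Any]]:
    """Build one asset dict per name, each with its own copy of the parameters."""
    entries: List[Dict[str, Any]] = []
    for name in names:
        if not name or not name.strip():
            raise ValueError("asset name must not be empty or whitespace-only")
        # Copy the shared parameters so entries do not alias a single dict.
        entries.append({"name": name, "parameters": dict(parameters or {})})
    return entries


assets = make_asset_entries(
    [f"pump-{i:03d}" for i in range(1, 4)],
    {"temperature-limit": 50},
)
# With the SDK installed, this list would be passed to ManifestBuilder().add_assets(assets).
```

Generating the entries keeps asset names consistent between the manifest and the KRNAssetDataStream resources used when publishing test messages.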

add_datastreams(datastreams)

Add multiple datastreams to the manifest.

Note: If a datastream with the same name already exists, it will be overwritten.

Parameters:

datastreams (list[dict[str, Any]]) – List of dicts with ‘name’ (required), ‘data_type’, ‘way’, ‘unit’, ‘configuration’ keys.

Return type:

Self

Returns:

Self for chaining.

Raises:
  • KeyError – If any datastream dict is missing the required ‘name’ key.

  • ValueError – If any datastream name is None, empty, or whitespace-only.

  • ValueError – If ‘way’ is an invalid string or ‘data_type’ is invalid.

add_custom_actions(actions)

Add multiple custom actions to the manifest.

Note: If a custom action with the same type already exists, it will be overwritten.

Parameters:

actions (list[dict[str, Any]]) – List of dicts with ‘type’ (required) and ‘way’ (optional) keys. The ‘way’ can be a CAWayEnum or string (‘input-ca’ or ‘output-ca’).

Return type:

Self

Returns:

Self for chaining.

Raises:
  • KeyError – If any action dict is missing the required ‘type’ key.

  • ValueError – If any action type is None, empty, or whitespace-only.

  • ValueError – If ‘way’ is an invalid string.

set_configuration(config)

Set the app configuration.

Note: This replaces any existing configuration entirely.

Parameters:

config (dict[str, Any]) – Configuration dictionary.

Return type:

Self

Returns:

Self for chaining.

clear()

Clear all builder state to allow reuse.

Resets assets, datastreams, custom actions, and configuration to empty state. Useful for building multiple manifests with the same builder instance.

Return type:

Self

Returns:

Self for chaining.

build()

Build the RuntimeManifest.

Constructs the final RuntimeManifest from all configured assets, datastreams, custom actions, and configuration.

Return type:

RuntimeManifest

Returns:

RuntimeManifest instance ready for use in testing.

class kelvin.testing.DataSource

Bases: ABC

Abstract base class for test data sources.

Data sources generate messages that are fed into the test harness as simulated inputs. They can be time-based (CSV replay, synthetic waveforms) or event-based (random generation).

Usage:

    source = CSVSource("data.csv").with_asset("pump-001")
    test.add_source(source)

__init__()[source]

Initialize the data source.

Return type:

None

abstractmethod async generate(clock)[source]

Generate messages.

This is an async generator that yields messages at appropriate times. The implementation should use clock.sleep() to wait between messages for deterministic time-based generation.

Parameters:

clock (VirtualClock) – VirtualClock for time control.

Yields:

Message instances to inject as inputs.

Return type:

AsyncGenerator[Message, None]
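The generate() contract (an async generator that sleeps on the clock between messages) can be illustrated with a toy clock. ToyClock below is a hypothetical stand-in for VirtualClock whose sleep() simply advances a counter, and the generator yields plain tuples instead of Message objects; neither reflects the SDK's actual classes.

```python
import asyncio
from typing import AsyncGenerator


class ToyClock:
    """Hypothetical stand-in for VirtualClock: sleep() advances time instantly."""

    def __init__(self) -> None:
        self.now = 0.0

    async def sleep(self, seconds: float) -> None:
        self.now += seconds
        await asyncio.sleep(0)  # yield to the event loop without real waiting


async def generate(
    clock: ToyClock, values: list[float], interval: float
) -> AsyncGenerator[tuple[float, float], None]:
    """Yield (virtual_time, value) pairs, sleeping on the clock between them."""
    for value in values:
        yield clock.now, value
        await clock.sleep(interval)


async def collect() -> list[tuple[float, float]]:
    clock = ToyClock()
    return [item async for item in generate(clock, [1.0, 2.0, 3.0], interval=5.0)]


emitted = asyncio.run(collect())
# emitted == [(0.0, 1.0), (5.0, 2.0), (10.0, 3.0)]
```

Because the generator waits on the clock rather than on wall time, the emitted timestamps are the same on every run.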

with_asset(asset)[source]

Set the asset name for generated messages.

Parameters:

asset (str) – Asset name to use.

Return type:

Self

Returns:

Self for chaining.

with_timing(interval)[source]

Set the timing interval between messages.

Parameters:

interval (timedelta) – Time interval between messages.

Return type:

Self

Returns:

Self for chaining.

property asset: str | None

Get the configured asset name.

property interval: timedelta

Get the configured timing interval.

class kelvin.testing.TabularSource(playback=False, ignore_timestamps=False, now_offset=False, asset_column='asset_name', timestamp_column='timestamp')[source]

Bases: DataSource, ABC

Base class for tabular data sources (CSV, DataFrame).

This class provides shared functionality for data sources that work with tabular data (rows and columns), including:

  • Timestamp parsing and offset management

  • Column selection and mapping

  • Message creation from typed values

  • Asset resolution from columns or fallback

  • Unified generate() implementation

Subclasses must implement:

  • _iter_rows(): Yield rows as dictionaries

  • _get_columns(): Return list of column names

  • _has_timestamp_column(): Check if timestamp column exists

Parameters:
  • playback (bool)

  • ignore_timestamps (bool)

  • now_offset (bool)

  • asset_column (str)

  • timestamp_column (str)

__init__(playback=False, ignore_timestamps=False, now_offset=False, asset_column='asset_name', timestamp_column='timestamp')[source]

Initialize the tabular source.

Parameters:
  • playback (bool) – When True, wait between rows based on timestamp differences (real-time replay). Requires timestamps in data.

  • ignore_timestamps (bool) – When True, use virtual clock time instead of data timestamps.

  • now_offset (bool) – When True, offset all timestamps so the first row starts at the virtual clock’s current time.

  • asset_column (str) – Name of the column containing asset names.

  • timestamp_column (str) – Name of the timestamp column.

Return type:

None
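The effect described for now_offset and playback can be sketched with plain datetime arithmetic. The function shift_to_now is a hypothetical helper, not part of kelvin.testing; it only illustrates the documented idea of moving the first row to the clock's current time while preserving the gaps between rows.

```python
from datetime import datetime, timedelta, timezone


def shift_to_now(timestamps: list[datetime], now: datetime) -> list[datetime]:
    """Shift all timestamps so the first one equals `now`, keeping the gaps."""
    if not timestamps:
        return []
    offset = now - timestamps[0]
    return [ts + offset for ts in timestamps]


recorded = [
    datetime(2020, 1, 1, 12, 0, 0, tzinfo=timezone.utc),
    datetime(2020, 1, 1, 12, 0, 30, tzinfo=timezone.utc),
    datetime(2020, 1, 1, 12, 2, 0, tzinfo=timezone.utc),
]
virtual_now = datetime(2024, 6, 1, 0, 0, 0, tzinfo=timezone.utc)
shifted = shift_to_now(recorded, virtual_now)

# With playback enabled, the wait before each row would be the gap
# to the previous row: 30 seconds, then 90 seconds.
gaps = [later - earlier for earlier, later in zip(shifted, shifted[1:])]
```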

with_columns(*columns)[source]

Select specific columns as inputs.

Only the selected columns will be processed as datastreams. Can be called with varargs or a single list. Empty strings are filtered out.

Parameters:

columns (str | list[str]) – Column names to select.

Return type:

Self

Returns:

Self for chaining.

Examples

    source.with_columns("temperature", "pressure")
    source.with_columns(["temperature", "pressure"])

with_column_mapping(mapping)[source]

Map column names to datastream names.

Allows columns with arbitrary names to map to specific datastreams.

Parameters:

mapping (dict[str, str]) – Dictionary mapping column names to datastream names.

Return type:

Self

Returns:

Self for chaining.

Example

    source.with_column_mapping({
        "temp_c": "temperature",
        "pressure_psi": "pressure",
    })

async generate(clock)[source]

Generate messages from tabular data.

Iterates through rows and yields messages at appropriate timestamps, using the virtual clock for time control. Processes active columns and applies column mapping.

Parameters:

clock (VirtualClock) – VirtualClock for time control.

Yields:

Message instances from tabular data.

Return type:

AsyncGenerator[Message, None]
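How active columns and a column mapping might combine for a single row can be sketched as follows. The helper row_to_values is hypothetical, and it assumes that selection is expressed in terms of source column names and that the mapping is applied afterwards; the SDK may resolve these differently.

```python
from typing import Any, Optional

RESERVED = {"timestamp", "asset_name"}  # default reserved column names


def row_to_values(
    row: dict[str, Any],
    selected: Optional[list[str]] = None,
    mapping: Optional[dict[str, str]] = None,
) -> dict[str, Any]:
    """Return {datastream_name: value} for one row (hypothetical helper)."""
    mapping = mapping or {}
    # Without an explicit selection, every non-reserved column is active.
    columns = selected if selected else [c for c in row if c not in RESERVED]
    return {mapping.get(col, col): row[col] for col in columns if col in row}


row = {
    "timestamp": "2024-01-01T00:00:00Z",
    "asset_name": "pump-001",
    "temp_c": 21.5,
    "pressure_psi": 14.7,
    "notes": "ok",
}

all_values = row_to_values(row)
picked = row_to_values(
    row,
    selected=["temp_c", "pressure_psi"],
    mapping={"temp_c": "temperature", "pressure_psi": "pressure"},
)
```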

final class kelvin.testing.CSVSource(path, playback=False, ignore_timestamps=False, now_offset=False, asset_column='asset_name', timestamp_column='timestamp')[source]

Bases: TabularSource

Data source that replays messages from a CSV file.

The CSV file should have columns for timestamp, asset_name, and one or more datastream columns. All columns except reserved ones (timestamp, asset_name) are treated as datastream values, unless specific columns are selected.

Usage:

    source = (
        CSVSource("data.csv", playback=True, now_offset=True)
        .with_asset("pump-001")  # fallback if no asset_name column
        .with_timing(timedelta(seconds=1))
    )

    # Select specific columns
    source = CSVSource("data.csv").with_columns("temperature", "pressure")

    # Map columns to datastream names
    source = CSVSource("data.csv").with_column_mapping({
        "temp_c": "temperature",
        "pressure_psi": "pressure",
    })

Parameters:
  • path (Path | str)

  • playback (bool)

  • ignore_timestamps (bool)

  • now_offset (bool)

  • asset_column (str)

  • timestamp_column (str)

__init__(path, playback=False, ignore_timestamps=False, now_offset=False, asset_column='asset_name', timestamp_column='timestamp')[source]

Initialize the CSV source.

Parameters:
  • path (Path | str) – Path to the CSV file.

  • playback (bool) – When True, wait between rows based on timestamp differences (real-time replay). Requires timestamps in CSV.

  • ignore_timestamps (bool) – When True, use virtual clock time instead of CSV timestamps.

  • now_offset (bool) – When True, offset all timestamps so the first row starts at the virtual clock’s current time.

  • asset_column (str) – Name of the column containing asset names.

  • timestamp_column (str) – Name of the timestamp column.

Return type:

None
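A CSV layout matching the description above (a timestamp column, an asset_name column, and one column per datastream) can be read with the standard library to see which cells would become datastream values. This is only a sketch of the file shape using csv.DictReader; the sample data is invented for illustration, and CSVSource's own parsing and typing rules are not shown here.

```python
import csv
import io

# Invented sample data in the documented layout.
CSV_TEXT = """timestamp,asset_name,temperature,pressure
2024-01-01T00:00:00+00:00,pump-001,21.5,14.7
2024-01-01T00:00:01+00:00,pump-002,22.0,14.9
"""

reader = csv.DictReader(io.StringIO(CSV_TEXT))
records = []
for row in reader:
    for column, raw in row.items():
        if column in ("timestamp", "asset_name"):
            continue  # reserved columns are not datastream values
        # One (asset, datastream, value) record per remaining cell.
        records.append((row["asset_name"], column, float(raw)))
```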

final class kelvin.testing.DataFrameSource(df, playback=False, ignore_timestamps=False, now_offset=False, asset_column='asset_name', timestamp_column='timestamp')[source]

Bases: TabularSource

Data source that replays messages from a pandas DataFrame.

The DataFrame should have columns for timestamp, asset_name, and one or more datastream columns. All columns except reserved ones (timestamp, asset_name) are treated as datastream values, unless specific columns are selected.

Usage:

    source = (
        DataFrameSource(df, playback=True, now_offset=True)
        .with_asset("pump-001")  # fallback if no asset_name column
        .with_timing(timedelta(seconds=1))
    )

    # Select specific columns
    source = DataFrameSource(df).with_columns("temperature", "pressure")

    # Map columns to datastream names
    source = DataFrameSource(df).with_column_mapping({
        "temp_c": "temperature",
        "pressure_psi": "pressure",
    })

Parameters:
  • df (pd.DataFrame)

  • playback (bool)

  • ignore_timestamps (bool)

  • now_offset (bool)

  • asset_column (str)

  • timestamp_column (str)

__init__(df, playback=False, ignore_timestamps=False, now_offset=False, asset_column='asset_name', timestamp_column='timestamp')[source]

Initialize the DataFrame source.

Parameters:
  • df (DataFrame) – Pandas DataFrame containing the data.

  • playback (bool) – When True, wait between rows based on timestamp differences (real-time replay). Requires timestamps in DataFrame.

  • ignore_timestamps (bool) – When True, use virtual clock time instead of DataFrame timestamps.

  • now_offset (bool) – When True, offset all timestamps so the first row starts at the virtual clock’s current time.

  • asset_column (str) – Name of the column containing asset names.

  • timestamp_column (str) – Name of the timestamp column.

Return type:

None

final class kelvin.testing.RandomSource(datastreams=None, min_value=0.0, max_value=100.0, seed=None, count=None, value_type='number')[source]

Bases: DataSource

Data source that generates random values for fuzz testing.

Generates messages with random numeric values within specified bounds, optionally across multiple datastreams.

Usage:

    source = RandomSource(
        datastreams=["temperature", "pressure"],
        min_value=0,
        max_value=100,
    ).with_asset("sensor-001")

Parameters:
  • datastreams (Optional[list[str]])

  • min_value (float)

  • max_value (float)

  • seed (Optional[int])

  • count (Optional[int])

  • value_type (str)

__init__(datastreams=None, min_value=0.0, max_value=100.0, seed=None, count=None, value_type='number')[source]

Initialize the random source.

Parameters:
  • datastreams (Optional[list[str]]) – List of datastream names. If None, uses [“value”].

  • min_value (float) – Minimum random value (for numeric types).

  • max_value (float) – Maximum random value (for numeric types).

  • seed (Optional[int]) – Optional random seed for reproducibility.

  • count (Optional[int]) – Optional number of messages to generate. If None, generates indefinitely.

  • value_type (str) – Type of values to generate (“number”, “string”, “boolean”).

Return type:

None

async generate(clock)[source]

Generate random messages.

Parameters:

clock (VirtualClock) – VirtualClock for time control.

Yields:

Messages with random values.

Return type:

AsyncGenerator[Message, None]
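The role of the seed parameter can be illustrated with the standard random module. The function random_values is a hypothetical sketch; in particular, drawing from a uniform distribution is an assumption, since the distribution used by RandomSource is not stated here.

```python
import random
from typing import Optional


def random_values(
    count: int,
    min_value: float = 0.0,
    max_value: float = 100.0,
    seed: Optional[int] = None,
) -> list[float]:
    """Draw `count` uniform values from a private, optionally seeded generator."""
    rng = random.Random(seed)  # private generator; global random state is untouched
    return [rng.uniform(min_value, max_value) for _ in range(count)]


first = random_values(5, seed=42)
second = random_values(5, seed=42)  # same seed, same sequence
other = random_values(5, seed=43)   # different seed, different sequence
```

Fixing the seed in a fuzz test makes a failing run reproducible, which is the usual reason to pass one.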

with_datastreams(datastreams)[source]

Set the datastreams to generate for.

Parameters:

datastreams (list[str]) – List of datastream names.

Return type:

RandomSource

Returns:

Self for chaining.

with_range(min_value, max_value)[source]

Set the value range for numeric generation.

Parameters:
  • min_value (float) – Minimum value.

  • max_value (float) – Maximum value.

Return type:

RandomSource

Returns:

Self for chaining.

with_count(count)[source]

Set the number of messages to generate.

Parameters:

count (int) – Number of messages.

Return type:

RandomSource

Returns:

Self for chaining.

final class kelvin.testing.SyntheticSource(pattern, datastream, sample_rate=None, duration=None)[source]

Bases: DataSource

Data source that generates synthetic values from wave patterns.

Generates messages at regular intervals with values computed from a wave pattern function.

Usage:

    source = SyntheticSource(
        pattern=SineWave(amplitude=10, period=timedelta(minutes=1)),
        datastream="temperature",
    ).with_asset("sensor-001")

Parameters:
  • pattern (WavePattern)

  • datastream (str)

  • sample_rate (Optional[timedelta])

  • duration (Optional[timedelta])

__init__(pattern, datastream, sample_rate=None, duration=None)[source]

Initialize the synthetic source.

Parameters:
  • pattern (WavePattern) – Wave pattern for value generation.

  • datastream (str) – Name of the datastream for messages.

  • sample_rate (Optional[timedelta]) – Time between samples.

  • duration (Optional[timedelta]) – Optional total duration. If None, generates indefinitely.

Return type:

None

async generate(clock)[source]

Generate messages from the wave pattern.

Parameters:

clock (VirtualClock) – VirtualClock for time control.

Yields:

Number messages with values from the pattern.

Return type:

AsyncGenerator[Message, None]
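Sampling a pattern at a fixed interval for a bounded duration can be sketched as a plain generator. The function sample is hypothetical, a callable stands in for WavePattern.value_at, and whether the real source emits a sample exactly at the end of the duration is an assumption (this sketch excludes it).

```python
from datetime import timedelta
from typing import Callable, Iterator


def sample(
    pattern: Callable[[float], float],
    sample_rate: timedelta,
    duration: timedelta,
) -> Iterator[tuple[float, float]]:
    """Yield (elapsed_seconds, value) at fixed intervals until duration is reached."""
    step = sample_rate.total_seconds()
    total = duration.total_seconds()
    n = 0
    while n * step < total:
        t = n * step
        yield t, pattern(t)
        n += 1


# A simple linear pattern sampled once per second for four seconds.
points = list(sample(lambda t: 2.0 * t, timedelta(seconds=1), timedelta(seconds=4)))
# points == [(0.0, 0.0), (1.0, 2.0), (2.0, 4.0), (3.0, 6.0)]
```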

final class kelvin.testing.SineWave(amplitude=1.0, period=None, offset=0.0, phase=0.0)[source]

Bases: WavePattern

Sine wave pattern.

Generates values following a sinusoidal function:

    value = offset + amplitude * sin(2*pi*t/period + phase)

+amp |    *****
     |  **     **
  0  |**         ***         **
     |              **     **
-amp |                *****
     +-----------------------------> t
      0    T/4   T/2  3T/4   T
Parameters:
  • amplitude (float)

  • period (Optional[timedelta])

  • offset (float)

  • phase (float)

__init__(amplitude=1.0, period=None, offset=0.0, phase=0.0)[source]

Initialize the sine wave.

Parameters:
  • amplitude (float) – Wave amplitude (peak deviation from offset).

  • period (Optional[timedelta]) – Time for one complete cycle.

  • offset (float) – DC offset (center value).

  • phase (float) – Phase offset in radians.

Return type:

None

value_at(t)[source]

Get the sine wave value at time t.

Return type:

float

Parameters:

t (float)
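The documented sine formula can be checked numerically with a small standalone function. sine_value is a hypothetical helper; it assumes t is measured in seconds, and its 60-second default period is chosen for the example only (the class default is None, and the fallback it uses is not stated here).

```python
import math
from datetime import timedelta


def sine_value(
    t: float,
    amplitude: float = 1.0,
    period: timedelta = timedelta(seconds=60),
    offset: float = 0.0,
    phase: float = 0.0,
) -> float:
    """Evaluate offset + amplitude * sin(2*pi*t/period + phase), with t in seconds."""
    angle = 2 * math.pi * t / period.total_seconds() + phase
    return offset + amplitude * math.sin(angle)


quarter = sine_value(15.0, amplitude=10.0, offset=20.0)  # peak at T/4
half = sine_value(30.0, amplitude=10.0, offset=20.0)     # back at the offset at T/2
trough = sine_value(45.0, amplitude=10.0, offset=20.0)   # minimum at 3T/4
```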

final class kelvin.testing.SquareWave(amplitude=1.0, period=None, offset=0.0, duty_cycle=0.5)[source]

Bases: WavePattern

Square wave pattern.

Generates values that alternate between high and low:

  • High part of the period (the first half at the default duty_cycle of 0.5): offset + amplitude

  • Low part of the period: offset - amplitude

+amp  ┌─────────┐           ┌─────────┐
      │         │           │         │
-amp  ┘         └───────────┘         └───
                                           t -->
      |← duty →|
      |←──── period ────→|
Parameters:
  • amplitude (float)

  • period (Optional[timedelta])

  • offset (float)

  • duty_cycle (float)

__init__(amplitude=1.0, period=None, offset=0.0, duty_cycle=0.5)[source]

Initialize the square wave.

Parameters:
  • amplitude (float) – Wave amplitude.

  • period (Optional[timedelta]) – Time for one complete cycle.

  • offset (float) – DC offset.

  • duty_cycle (float) – Fraction of period spent at high value (0-1).

Return type:

None

value_at(t)[source]

Get the square wave value at time t.

Return type:

float

Parameters:

t (float)
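The interaction between period and duty_cycle can be sketched as below. square_value is a hypothetical helper that takes the period in seconds; treating the boundary position as low (position < duty_cycle is high) is an assumption about edge behaviour.

```python
def square_value(
    t: float,
    period: float,
    amplitude: float = 1.0,
    offset: float = 0.0,
    duty_cycle: float = 0.5,
) -> float:
    """Return the high level for the first duty_cycle fraction of each period."""
    position = (t % period) / period  # position within the cycle, in [0, 1)
    return offset + amplitude if position < duty_cycle else offset - amplitude


high = square_value(2.0, period=10.0)                     # first half: high
low = square_value(7.0, period=10.0)                      # second half: low
narrow = square_value(3.0, period=10.0, duty_cycle=0.25)  # past 25%: low
```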

final class kelvin.testing.RampWave(min_value=0.0, max_value=1.0, period=None)[source]

Bases: WavePattern

Ramp (sawtooth) wave pattern.

Generates values that linearly increase from min to max, then reset to min.

max |    /|    /|    /|
    |   / |   / |   / |
    |  /  |  /  |  /  |
min | /   | /   | /   |
    +-------------------------> t
     |period|
Parameters:
  • min_value (float)

  • max_value (float)

  • period (Optional[timedelta])

__init__(min_value=0.0, max_value=1.0, period=None)[source]

Initialize the ramp wave.

Parameters:
  • min_value (float) – Minimum value (start of ramp).

  • max_value (float) – Maximum value (end of ramp).

  • period (Optional[timedelta]) – Time for one complete cycle.

Return type:

None

value_at(t)[source]

Get the ramp wave value at time t.

Return type:

float

Parameters:

t (float)
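A sawtooth that rises linearly and resets at each period boundary can be written in a few lines. ramp_value is a hypothetical helper with the period given in seconds; it is a sketch of the described shape, not the RampWave source.

```python
def ramp_value(
    t: float,
    period: float,
    min_value: float = 0.0,
    max_value: float = 1.0,
) -> float:
    """Rise linearly from min_value towards max_value, then reset each period."""
    fraction = (t % period) / period  # fraction of the current cycle elapsed
    return min_value + (max_value - min_value) * fraction


midway = ramp_value(5.0, period=10.0, min_value=0.0, max_value=100.0)
after_reset = ramp_value(10.0, period=10.0, min_value=0.0, max_value=100.0)
second_cycle = ramp_value(12.5, period=10.0, min_value=20.0, max_value=40.0)
```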

final class kelvin.testing.NoiseWave(base, std_dev=1.0, seed=None)[source]

Bases: WavePattern

Noise wave that adds gaussian noise to another pattern.

Wraps a base pattern and adds random noise to its values.

     |    *
     |  *   *  *      *
base |*───────*──*────────
     |    *        *
     |         *
     +-------------------------> t
Parameters:
  • base (WavePattern)

  • std_dev (float)

  • seed (Optional[int])

__init__(base, std_dev=1.0, seed=None)[source]

Initialize the noise wave.

Parameters:
  • base (WavePattern) – Base wave pattern to add noise to.

  • std_dev (float) – Standard deviation of gaussian noise.

  • seed (Optional[int]) – Optional random seed for reproducibility.

Return type:

None

value_at(t)[source]

Get the base wave value plus noise at time t.

Return type:

float

Parameters:

t (float)
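Wrapping a base pattern with seeded gaussian noise can be sketched with random.Random. ToyNoise is a hypothetical stand-in that accepts any callable as its base; it is not the NoiseWave implementation.

```python
import random
from typing import Callable, Optional


class ToyNoise:
    """Hypothetical stand-in: base value plus seeded gaussian noise."""

    def __init__(
        self,
        base: Callable[[float], float],
        std_dev: float = 1.0,
        seed: Optional[int] = None,
    ) -> None:
        self._base = base
        self._std_dev = std_dev
        self._rng = random.Random(seed)  # private generator for reproducibility

    def value_at(self, t: float) -> float:
        return self._base(t) + self._rng.gauss(0.0, self._std_dev)


a = ToyNoise(lambda t: 50.0, std_dev=2.0, seed=7)
b = ToyNoise(lambda t: 50.0, std_dev=2.0, seed=7)
series_a = [a.value_at(float(t)) for t in range(100)]
series_b = [b.value_at(float(t)) for t in range(100)]  # identical to series_a
```

In this sketch the noise depends on call order rather than on t, so calling value_at twice with the same t gives two different values.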

final class kelvin.testing.ConstantWave(value=0.0)[source]

Bases: WavePattern

Constant value pattern.

Always returns the same value, useful for baseline testing.

     |
value|──────────────────────
     |
     +-------------------------> t
Parameters:

value (float)

__init__(value=0.0)[source]

Initialize the constant wave.

Parameters:

value (float) – Constant value to return.

Return type:

None

value_at(t)[source]

Get the constant value.

Return type:

float

Parameters:

t (float)

KelvinAppTest

KelvinAppTest harness for testing KelvinApp instances.

This module provides a test harness that wraps a KelvinApp with an in-memory stream and virtual clock for deterministic testing without network dependencies.

Example

>>> app = KelvinApp()
>>> manifest = ManifestBuilder().add_asset("test").build()
>>> async with KelvinAppTest(app, manifest) as harness:
...     await harness.publish(Number(resource=..., payload=5.0))
...     await harness.run_until_idle()
...     assert len(harness.outputs) == 1

final class kelvin.testing.app_test.KelvinAppTest(app, manifest, clock=None)[source]

Bases: object

Test harness for KelvinApp instances.

Wraps a KelvinApp with an InMemoryStream and a VirtualClock for deterministic testing without network dependencies. The harness can be reused after disconnect: state is cleared on each connect.

Attributes:
app

The underlying KelvinApp instance.

manifest

The RuntimeManifest injected on connect.

clock

The VirtualClock for time control.

is_connected

Whether the harness is currently connected.

inputs

Copy of all injected input messages.

outputs

Copy of all captured output messages.

Example

>>> async with KelvinAppTest(app, manifest) as harness:
...     await harness.publish(msg)
...     await harness.run_until_idle()
...     assert len(harness.outputs) == 1

__init__(app, manifest, clock=None)[source]

Initialize the test harness.

Parameters:
  • app (KelvinApp) – The KelvinApp instance to test. Must be disconnected.

  • manifest (RuntimeManifest) – RuntimeManifest to inject when connecting.

  • clock (Optional[VirtualClock]) – Optional VirtualClock. If None, creates one with manual time control.

Raises:
  • RuntimeError – If the app is already connected or has a running task.

  • ValueError – If the app is missing required internal attributes.

Return type:

None

Example

>>> harness = KelvinAppTest(app, manifest)
>>> harness = KelvinAppTest(app, manifest, clock=VirtualClock())

__repr__()[source]

Return a string representation for debugging.

Return type:

str

Returns:

String showing connection state and message counts.

add_source(source)[source]

Add a data source for input generation.

Sources are started when connect() is called and stopped on disconnect(). Must be called before connect().

Parameters:

source (DataSource) – DataSource that generates messages to inject.

Return type:

Self

Returns:

Self for method chaining.

Raises:

RuntimeError – If called after connect().

Example

>>> harness.add_source(source1).add_source(source2)

async publish(msg)[source]

Inject a message into the input queue and record it.

The message is added to the inputs list and injected into the in-memory stream for processing by the app.

Parameters:

msg (Union[Message, MessageBuilder[Any]]) – Message or MessageBuilder to inject as input.

Raises:

RuntimeError – If called before connect().

Return type:

None

Example

>>> await harness.publish(Number(resource=krn, payload=5.0))

async publish_batch(messages)[source]

Inject multiple messages sequentially.

Parameters:

messages (Iterable[Union[Message, MessageBuilder[Any]]]) – Iterable of messages or message builders to inject.

Raises:

RuntimeError – If called before connect().

Return type:

None

Example

>>> await harness.publish_batch([msg1, msg2, msg3])

property app: KelvinApp

Get the underlying KelvinApp instance.

Returns:

The KelvinApp being tested.

property manifest: RuntimeManifest

Get the runtime manifest.

Returns:

The RuntimeManifest injected on connect.

property clock: VirtualClock

Get the virtual clock.

Returns:

The VirtualClock used for time control.

property is_connected: bool

Check if the test harness is connected.

Returns:

True if connected, False otherwise.

property inputs: list[Message]

Get a copy of all injected inputs.

Returns:

Copy of the inputs list. Modifications don’t affect internal state.

property outputs: list[Message]

Get a copy of all captured outputs.

Drains any pending outputs from the output queue before returning. Each access may return additional outputs if the app has published more.

Returns:

Copy of all captured output messages.
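The documented behaviour of outputs (drain pending items on each access, return a copy) can be sketched with a standard queue. ToyHarness and its emit() method are hypothetical stand-ins used only to show the pattern; they are not KelvinAppTest internals.

```python
import queue


class ToyHarness:
    """Hypothetical stand-in showing a draining, copy-returning property."""

    def __init__(self) -> None:
        self._queue: "queue.Queue[str]" = queue.Queue()
        self._captured: list[str] = []

    def emit(self, msg: str) -> None:
        self._queue.put(msg)

    @property
    def outputs(self) -> list[str]:
        # Move everything currently queued into the captured list.
        while True:
            try:
                self._captured.append(self._queue.get_nowait())
            except queue.Empty:
                break
        return list(self._captured)  # copy: callers cannot mutate internal state


harness = ToyHarness()
harness.emit("a")
first = harness.outputs
first.append("tampered")    # only changes the caller's copy
harness.emit("b")
second = harness.outputs    # includes the newly drained "b"
```

A practical consequence is that a test should read outputs after run_until_idle() rather than caching an earlier copy.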

async advance_time(seconds)[source]

Advance virtual time and yield to the event loop.

Parameters:

seconds (float) – Number of seconds to advance. Must be >= 0.

Raises:

ValueError – If seconds is negative.

Return type:

None

Example

>>> await harness.advance_time(10.0)  # Advance 10 seconds
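One way a virtual clock can wake sleeping tasks when time is advanced is sketched below. ToyVirtualClock, its heap of pending sleeps, and the advance() method are assumptions made for illustration; the real VirtualClock may be structured quite differently.

```python
import asyncio
import heapq
import itertools


class ToyVirtualClock:
    """Hypothetical clock: sleepers wait on futures released by advance()."""

    def __init__(self) -> None:
        self.now = 0.0
        self._sleepers: list = []        # heap of (wake_time, id, future)
        self._ids = itertools.count()    # tie-breaker so futures are never compared

    async def sleep(self, seconds: float) -> None:
        future = asyncio.get_running_loop().create_future()
        heapq.heappush(self._sleepers, (self.now + seconds, next(self._ids), future))
        await future

    async def advance(self, seconds: float) -> None:
        if seconds < 0:
            raise ValueError("seconds must be >= 0")
        target = self.now + seconds
        while self._sleepers and self._sleepers[0][0] <= target:
            wake_at, _, future = heapq.heappop(self._sleepers)
            self.now = wake_at
            future.set_result(None)
            await asyncio.sleep(0)  # let the woken task run
        self.now = target
        await asyncio.sleep(0)


async def demo() -> list:
    clock = ToyVirtualClock()
    seen: list = []

    async def ticker() -> None:
        for _ in range(3):
            await clock.sleep(10.0)
            seen.append(clock.now)

    task = asyncio.create_task(ticker())
    await asyncio.sleep(0)      # let the ticker register its first sleep
    await clock.advance(25.0)   # wakes the sleeps due at t=10 and t=20
    task.cancel()
    return seen


ticks = asyncio.run(demo())
# ticks == [10.0, 20.0]
```

No wall-clock time passes in this sketch: advancing by 25 virtual seconds releases two of the three 10-second sleeps immediately.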
async run_until_idle(timeout=5.0)[source]

Run until queues are drained and processing completes.

Advances virtual time to process pending sleeps, then waits for all queues to be fully processed using join().

Parameters:

timeout (float) – Maximum virtual time to advance in seconds. Must be positive. Default is 5.0 seconds.

Raises:

ValueError – If timeout is not positive.

Return type:

None

Example

>>> await harness.run_until_idle()
>>> await harness.run_until_idle(timeout=10.0)

async connect()[source]

Connect the app and start data sources.

Clears inputs/outputs for a fresh start. Can be called again after disconnect() to reuse the harness.

Return type:

Self

Returns:

Self for method chaining.

Raises:

RuntimeError – If already connected.

Example

>>> await harness.connect()
>>> # ... run tests ...
>>> await harness.disconnect()

async disconnect()[source]

Disconnect the app and stop data sources.

Safe to call multiple times or when not connected (no-op).

Return type:

None

Example

>>> await harness.disconnect()

async __aenter__()[source]

Enter the async context and connect.

Return type:

Self

Returns:

Self for use in the context.

Example

>>> async with KelvinAppTest(app, manifest) as harness:
...     await harness.publish(msg)

async __aexit__(_exc_type=None, _exc_value=None, _traceback=None)[source]

Exit the async context and disconnect.

Always disconnects, even if an exception occurred.

Return type:

None

ManifestBuilder

ManifestBuilder for creating test RuntimeManifests.

class kelvin.testing.manifest.ManifestBuilder[source]

Bases: object

Fluent builder for creating RuntimeManifest instances for testing.

Provides a chainable API for constructing RuntimeManifest objects with datastreams, assets, and configuration for use in testing scenarios.

Examples

>>> manifest = (ManifestBuilder()
...     .add_datastream("temperature", "number", WayEnum.input)
...     .add_datastream("alert", "boolean", WayEnum.output)
...     .add_asset("pump-001", properties={"location": "A1"})
...     .set_configuration({"threshold": 100})
...     .build())

__init__()[source]

Initialize an empty manifest builder.

Return type:

None

__repr__()[source]

Return a string representation of the builder state.

Return type:

str

classmethod from_app_yaml(path=None)[source]

Load base configuration from app.yaml.

Supports both legacy format (inputs/outputs at top level) and modern format (spec_version 5.0.0 with data_streams section).

Parameters:

path (Optional[Path]) – Path to app.yaml file.

Return type:

ManifestBuilder

Returns:

ManifestBuilder populated from app.yaml.

Raises:
  • FileNotFoundError – If the specified path does not exist.

  • yaml.YAMLError – If the YAML file is malformed.

classmethod from_dict(config)[source]

Create a ManifestBuilder from a dictionary.

Parameters:

config (dict[str, Any]) – Dictionary with manifest configuration.

Return type:

ManifestBuilder

Returns:

ManifestBuilder populated from the dictionary.

add_asset(name, properties=None, parameters=None)[source]

Add an asset to the manifest.

Note: If an asset with the same name already exists, it will be overwritten.

Parameters:
  • name (str) – Asset name (cannot be None, empty, or whitespace-only).

  • properties (dict[str, Any] | None) – Optional asset properties.

  • parameters (dict[str, Any] | None) – Optional asset parameters.

Return type:

Self

Returns:

Self for chaining.

Raises:

ValueError – If name is None, empty, or whitespace-only.

add_datastream(name, data_type='number', way=WayEnum.input, unit=None, configuration=None)[source]

Add a datastream definition to the manifest.

Note: If a datastream with the same name already exists, it will be overwritten.

Parameters:
  • name (str) – Datastream name (cannot be None, empty, or whitespace-only).

  • data_type (str) – Data type (number, string, boolean, object). Defaults to “number”.

  • way (WayEnum) – Input/output direction (input, output, input_cc, output_cc, input_cc_output, input_output_cc). Defaults to WayEnum.input.

  • unit (str | None) – Optional unit name (e.g., “celsius”, “meters”).

  • configuration (dict[str, Any] | None) – Optional per-datastream configuration dict.

Return type:

Self

Returns:

Self for chaining.

Raises:
  • ValueError – If name is None, empty, or whitespace-only.

  • ValueError – If data_type is not one of: boolean, number, object, string.

add_input(name, data_type='number', unit=None, configuration=None)[source]

Add an input datastream.

Convenience method that calls add_datastream with WayEnum.input.

Parameters:
  • name (str) – Datastream name (cannot be None, empty, or whitespace-only).

  • data_type (str) – Data type (number, string, boolean, object).

  • unit (str | None) – Optional unit name.

  • configuration (dict[str, Any] | None) – Optional per-datastream configuration dict.

Return type:

Self

Returns:

Self for chaining.

Raises:
  • ValueError – If name is None, empty, or whitespace-only.

  • ValueError – If data_type is not one of: boolean, number, object, string.

add_output(name, data_type='number', unit=None, configuration=None)[source]

Add an output datastream.

Convenience method that calls add_datastream with WayEnum.output.

Parameters:
  • name (str) – Datastream name (cannot be None, empty, or whitespace-only).

  • data_type (str) – Data type (number, string, boolean, object).

  • unit (str | None) – Optional unit name.

  • configuration (dict[str, Any] | None) – Optional per-datastream configuration dict.

Return type:

Self

Returns:

Self for chaining.

Raises:
  • ValueError – If name is None, empty, or whitespace-only.

  • ValueError – If data_type is not one of: boolean, number, object, string.

add_control_change_input(name, data_type='number', unit=None, configuration=None)[source]

Add a control change input datastream.

Convenience method that calls add_datastream with WayEnum.input_cc.

Parameters:
  • name (str) – Datastream name (cannot be None, empty, or whitespace-only).

  • data_type (str) – Data type (number, string, boolean, object).

  • unit (str | None) – Optional unit name.

  • configuration (dict[str, Any] | None) – Optional per-datastream configuration dict.

Return type:

Self

Returns:

Self for chaining.

Raises:
  • ValueError – If name is None, empty, or whitespace-only.

  • ValueError – If data_type is not one of: boolean, number, object, string.

add_control_change_output(name, data_type='number', unit=None, configuration=None)[source]

Add a control change output datastream.

Convenience method that calls add_datastream with WayEnum.output_cc.

Parameters:
  • name (str) – Datastream name (cannot be None, empty, or whitespace-only).

  • data_type (str) – Data type (number, string, boolean, object).

  • unit (str | None) – Optional unit name.

  • configuration (dict[str, Any] | None) – Optional per-datastream configuration dict.

Return type:

Self

Returns:

Self for chaining.

Raises:
  • ValueError – If name is None, empty, or whitespace-only.

  • ValueError – If data_type is not one of: boolean, number, object, string.

add_input_cc_output(name, data_type='number', unit=None, configuration=None)[source]

Add a datastream that is both input control change and output.

Convenience method that calls add_datastream with WayEnum.input_cc_output. Use for owned datastreams with read-write access.

Parameters:
  • name (str) – Datastream name (cannot be None, empty, or whitespace-only).

  • data_type (str) – Data type (number, string, boolean, object).

  • unit (str | None) – Optional unit name.

  • configuration (dict[str, Any] | None) – Optional per-datastream configuration dict.

Return type:

Self

Returns:

Self for chaining.

Raises:
  • ValueError – If name is None, empty, or whitespace-only.

  • ValueError – If data_type is not one of: boolean, number, object, string.

add_input_output_cc(name, data_type='number', unit=None, configuration=None)[source]

Add a datastream that is both input and output control change.

Convenience method that calls add_datastream with WayEnum.input_output_cc. Use for remote datastreams with read-write access.

Parameters:
  • name (str) – Datastream name (cannot be None, empty, or whitespace-only).

  • data_type (str) – Data type (number, string, boolean, object).

  • unit (str | None) – Optional unit name.

  • configuration (dict[str, Any] | None) – Optional per-datastream configuration dict.

Return type:

Self

Returns:

Self for chaining.

Raises:
  • ValueError – If name is None, empty, or whitespace-only.

  • ValueError – If data_type is not one of: boolean, number, object, string.

add_custom_action(action_type, way=CAWayEnum.output_ca)[source]

Add a custom action to the manifest.

Note: If a custom action with the same type already exists, it will be overwritten.

Parameters:
  • action_type (str) – Custom action type identifier (cannot be None, empty, or whitespace-only).

  • way (CAWayEnum) – Direction of the custom action (input_ca or output_ca). Defaults to output_ca.

Return type:

Self

Returns:

Self for chaining.

Raises:

ValueError – If action_type is None, empty, or whitespace-only.

add_custom_action_input(action_type)[source]

Add an input custom action.

Convenience method that calls add_custom_action with CAWayEnum.input_ca.

Parameters:

action_type (str) – Custom action type identifier (cannot be None, empty, or whitespace-only).

Return type:

Self

Returns:

Self for chaining.

Raises:

ValueError – If action_type is None, empty, or whitespace-only.

add_custom_action_output(action_type)[source]

Add an output custom action.

Convenience method that calls add_custom_action with CAWayEnum.output_ca.

Parameters:

action_type (str) – Custom action type identifier (cannot be None, empty, or whitespace-only).

Return type:

Self

Returns:

Self for chaining.

Raises:

ValueError – If action_type is None, empty, or whitespace-only.

add_assets(assets)[source]

Add multiple assets to the manifest.

Note: If an asset with the same name already exists, it will be overwritten.

Parameters:

assets (list[dict[str, Any]]) – List of dicts with ‘name’ (required), ‘properties’, ‘parameters’ keys.

Return type:

Self

Returns:

Self for chaining.

Raises:
  • KeyError – If any asset dict is missing the required ‘name’ key.

  • ValueError – If any asset name is None, empty, or whitespace-only.

add_datastreams(datastreams)[source]

Add multiple datastreams to the manifest.

Note: If a datastream with the same name already exists, it will be overwritten.

Parameters:

datastreams (list[dict[str, Any]]) – List of dicts with ‘name’ (required), ‘data_type’, ‘way’, ‘unit’, ‘configuration’ keys.

Return type:

Self

Returns:

Self for chaining.

Raises:
  • KeyError – If any datastream dict is missing the required ‘name’ key.

  • ValueError – If any datastream name is None, empty, or whitespace-only.

  • ValueError – If ‘way’ is an invalid string or ‘data_type’ is invalid.

add_custom_actions(actions)[source]

Add multiple custom actions to the manifest.

Note: If a custom action with the same type already exists, it will be overwritten.

Parameters:

actions (list[dict[str, Any]]) – List of dicts with ‘type’ (required) and ‘way’ (optional) keys. The ‘way’ can be a CAWayEnum or string (‘input-ca’ or ‘output-ca’).

Return type:

Self

Returns:

Self for chaining.

Raises:
  • KeyError – If any action dict is missing the required ‘type’ key.

  • ValueError – If any action type is None, empty, or whitespace-only.

  • ValueError – If ‘way’ is an invalid string.
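Accepting 'way' either as an enum member or as one of the documented strings can be sketched with a small Enum. ToyCAWay is a stand-in for CAWayEnum whose member values are assumed to equal the documented strings, and normalize_action is a hypothetical helper; the output default mirrors the add_custom_action signature.

```python
from enum import Enum
from typing import Any, Union


class ToyCAWay(Enum):
    """Stand-in for CAWayEnum; member values follow the documented strings."""

    input_ca = "input-ca"
    output_ca = "output-ca"


def normalize_action(action: dict[str, Any]) -> tuple[str, ToyCAWay]:
    """Validate one action dict and return (type, way)."""
    action_type = action["type"]  # raises KeyError if the required key is missing
    if action_type is None or not str(action_type).strip():
        raise ValueError("action type must be non-empty")
    way: Union[ToyCAWay, str] = action.get("way", ToyCAWay.output_ca)
    if isinstance(way, str):
        way = ToyCAWay(way)  # raises ValueError for an unknown string
    return action_type, way


default_way = normalize_action({"type": "restart"})
from_string = normalize_action({"type": "restart", "way": "input-ca"})
```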

set_configuration(config)[source]

Set the app configuration.

Note: This replaces any existing configuration entirely.

Parameters:

config (dict[str, Any]) – Configuration dictionary.

Return type:

Self

Returns:

Self for chaining.

clear()[source]

Clear all builder state to allow reuse.

Resets assets, datastreams, custom actions, and configuration to empty state. Useful for building multiple manifests with the same builder instance.

Return type:

Self

Returns:

Self for chaining.

build()[source]

Build the RuntimeManifest.

Constructs the final RuntimeManifest from all configured assets, datastreams, custom actions, and configuration.

Return type:

RuntimeManifest

Returns:

RuntimeManifest instance ready for use in testing.

InMemoryStream

InMemoryStream implementation for in-memory message passing.

final class kelvin.testing.in_memory_stream.InMemoryStream(manifest)[source]

Bases: StreamInterface

In-memory stream implementation for testing.

Replaces KelvinStream with queues for deterministic testing. Messages can be injected via inject() and outputs collected via drain_outputs().

Parameters:

manifest (RuntimeManifest)

__init__(manifest)[source]

Initialize the test stream.

Parameters:

manifest (RuntimeManifest) – RuntimeManifest to inject on connect.

Return type:

None

async connect()[source]

Connect and inject the manifest as the first message.

Raises:

ConnectionError – If the stream is already connected.

Return type:

None

async disconnect()[source]

Disconnect the stream.

Return type:

None

async read()[source]

Read the next message from the input queue.

Blocks until a message is available. Calls task_done() immediately after dequeuing so that wait_for_inputs() can track dequeue progress. Note that this signals the message has been dequeued, not that the handler has finished processing it.

Raises:

ConnectionError – If the stream is not connected.

Return type:

Message

async write(msg)[source]

Write a message to the output queue.

Parameters:

msg (Message) – Message to write.

Return type:

bool

Returns:

True on success.

Raises:

ConnectionError – If the stream is not connected.

async inject(msg)[source]

Inject a message into the input queue (for test harness).

Parameters:

msg (Message) – Message to inject as input.

Raises:

ConnectionError – If the stream is not connected.

Return type:

None

drain_outputs()[source]

Collect all outputs without blocking (for test harness).

Return type:

list[Message]

Returns:

List of all messages in the output queue.

property is_connected: bool

Check if the stream is connected.

input_queue_size()[source]

Get the number of messages waiting in the input queue.

Return type:

int

async wait_for_inputs()[source]

Wait for all input messages to be dequeued.

Blocks until every message in the input queue has been read (task_done called). This signals that messages have been dequeued, not that handlers have finished processing them. Use KelvinAppTest.run_until_idle() for full synchronization including handler completion.

Return type:

None
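The difference between "dequeued" and "handled" can be shown with a plain asyncio.Queue. This is a standalone sketch of the general task_done()/join() behaviour, assuming the stream's input queue behaves like asyncio.Queue; the `reader` coroutine and the event labels are hypothetical.

```python
import asyncio


async def main() -> list[str]:
    events: list[str] = []
    queue: asyncio.Queue = asyncio.Queue()

    async def reader() -> None:
        msg = await queue.get()
        queue.task_done()  # marks the message as dequeued straight away
        events.append(f"dequeued {msg}")
        await asyncio.sleep(0.01)  # simulated handler work
        events.append(f"handled {msg}")

    task = asyncio.create_task(reader())
    await queue.put("m1")
    await queue.join()  # returns once task_done() has been called
    events.append("join returned")
    await task  # wait for the handler itself to finish
    return events


events = asyncio.run(main())
```

The join() call returns before the simulated handler finishes, which is why a test that needs handler completion should wait on something stronger than the queue, such as KelvinAppTest.run_until_idle().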

reset()[source]

Clear all queues for harness reuse.

Return type:

None

Data Sources

Data sources for test input generation.

class kelvin.testing.sources.DataSource[source]

Bases: ABC

Abstract base class for test data sources.

Data sources generate messages that are fed into the test harness as simulated inputs. They can be time-based (CSV replay, synthetic waveforms) or event-based (random generation).

Usage:

    source = CSVSource("data.csv").with_asset("pump-001")
    test.add_source(source)

__init__()[source]

Initialize the data source.

Return type:

None

abstractmethod async generate(clock)[source]

Generate messages.

This is an async generator that yields messages at appropriate times. The implementation should use clock.sleep() to wait between messages for deterministic time-based generation.

Parameters:

clock (VirtualClock) – VirtualClock for time control.

Yields:

Message instances to inject as inputs.

Return type:

AsyncGenerator[Message, None]
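The shape of a generate() implementation can be sketched with a stand-in clock. `FakeClock` is hypothetical and only imitates the idea of a virtual clock (sleeping advances time instantly); the real VirtualClock API, including the argument type of sleep(), is an assumption here, and tuples stand in for Message objects.

```python
import asyncio
from collections.abc import AsyncGenerator
from datetime import datetime, timedelta, timezone


class FakeClock:
    """Hypothetical stand-in for VirtualClock: sleeping just advances time."""

    def __init__(self, start: datetime) -> None:
        self.now = start

    async def sleep(self, seconds: float) -> None:
        self.now += timedelta(seconds=seconds)


async def generate(clock: FakeClock) -> AsyncGenerator[tuple[datetime, float], None]:
    # Yield three (timestamp, value) pairs one virtual second apart.
    for value in (1.0, 2.0, 3.0):
        yield clock.now, value
        await clock.sleep(1.0)


async def collect() -> list[tuple[datetime, float]]:
    clock = FakeClock(datetime(2024, 1, 1, tzinfo=timezone.utc))
    return [item async for item in generate(clock)]


samples = asyncio.run(collect())
```

Because all waiting goes through the clock rather than wall time, the sequence of timestamps is identical on every run.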

with_asset(asset)[source]

Set the asset name for generated messages.

Parameters:

asset (str) – Asset name to use.

Return type:

Self

Returns:

Self for chaining.

with_timing(interval)[source]

Set the timing interval between messages.

Parameters:

interval (timedelta) – Time interval between messages.

Return type:

Self

Returns:

Self for chaining.

property asset: str | None

Get the configured asset name.

property interval: timedelta

Get the configured timing interval.

class kelvin.testing.sources.TabularSource(playback=False, ignore_timestamps=False, now_offset=False, asset_column='asset_name', timestamp_column='timestamp')[source]

Bases: DataSource, ABC

Base class for tabular data sources (CSV, DataFrame).

This class provides shared functionality for data sources that work with tabular data (rows and columns), including:
  • Timestamp parsing and offset management

  • Column selection and mapping

  • Message creation from typed values

  • Asset resolution from columns or fallback

  • Unified generate() implementation

Subclasses must implement:
  • _iter_rows(): Yield rows as dictionaries

  • _get_columns(): Return list of column names

  • _has_timestamp_column(): Check if timestamp column exists

Parameters:
  • playback (bool)

  • ignore_timestamps (bool)

  • now_offset (bool)

  • asset_column (str)

  • timestamp_column (str)

__init__(playback=False, ignore_timestamps=False, now_offset=False, asset_column='asset_name', timestamp_column='timestamp')[source]

Initialize the tabular source.

Parameters:
  • playback (bool) – When True, wait between rows based on timestamp differences (real-time replay). Requires timestamps in data.

  • ignore_timestamps (bool) – When True, use virtual clock time instead of data timestamps.

  • now_offset (bool) – When True, offset all timestamps so the first row starts at the virtual clock’s current time.

  • asset_column (str) – Name of the column containing asset names.

  • timestamp_column (str) – Name of the timestamp column.

Return type:

None
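The timestamp handling behind now_offset and playback comes down to simple datetime arithmetic. The sketch below is an illustration under assumptions, not the library's code: `shift_to_now` and `playback_delays` are hypothetical helpers, and the virtual clock's current time is represented by a fixed datetime.

```python
from datetime import datetime, timedelta, timezone


def shift_to_now(timestamps: list[datetime], now: datetime) -> list[datetime]:
    """Shift every timestamp so the first one equals `now` (now_offset idea)."""
    offset = now - timestamps[0]
    return [ts + offset for ts in timestamps]


def playback_delays(timestamps: list[datetime]) -> list[timedelta]:
    """Gaps to wait between consecutive rows (playback idea)."""
    return [later - earlier for earlier, later in zip(timestamps, timestamps[1:])]


recorded = [
    datetime(2023, 5, 1, 12, 0, 0, tzinfo=timezone.utc),
    datetime(2023, 5, 1, 12, 0, 5, tzinfo=timezone.utc),
    datetime(2023, 5, 1, 12, 0, 15, tzinfo=timezone.utc),
]
virtual_now = datetime(2024, 1, 1, tzinfo=timezone.utc)

shifted = shift_to_now(recorded, virtual_now)
delays = playback_delays(recorded)
```

Shifting preserves the spacing between rows, so the playback gaps are the same whether they are computed before or after the offset is applied.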

with_columns(*columns)[source]

Select specific columns as inputs.

Only the selected columns will be processed as datastreams. Can be called with varargs or a single list. Empty strings are filtered out.

Parameters:

columns (str | list[str]) – Column names to select.

Return type:

Self

Returns:

Self for chaining.

Examples

    source.with_columns("temperature", "pressure")
    source.with_columns(["temperature", "pressure"])

with_column_mapping(mapping)[source]

Map column names to datastream names.

Allows columns with arbitrary names to map to specific datastreams.

Parameters:

mapping (dict[str, str]) – Dictionary mapping column names to datastream names.

Return type:

Self

Returns:

Self for chaining.

Example

    source.with_column_mapping({
        "temp_c": "temperature",
        "pressure_psi": "pressure",
    })
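How column selection and column mapping might combine on a single row can be sketched with plain dicts. `row_to_values` is a hypothetical helper; the order in which the real source applies selection and mapping is an assumption (selection by original column name first, then renaming).

```python
from typing import Optional


def row_to_values(
    row: dict[str, float],
    columns: Optional[list[str]] = None,
    mapping: Optional[dict[str, str]] = None,
) -> dict[str, float]:
    """Hypothetical helper: keep the selected columns, then rename them."""
    selected = [c for c in (columns or list(row)) if c]  # empty names dropped
    renames = mapping or {}
    return {renames.get(col, col): row[col] for col in selected if col in row}


row = {"temp_c": 21.5, "pressure_psi": 30.1, "flow": 4.2}
mapped = row_to_values(
    row,
    columns=["temp_c", "pressure_psi", ""],
    mapping={"temp_c": "temperature", "pressure_psi": "pressure"},
)
```

With no selection and no mapping the helper returns every column under its original name.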

async generate(clock)[source]

Generate messages from tabular data.

Iterates through rows and yields messages at appropriate timestamps, using the virtual clock for time control. Processes active columns and applies column mapping.

Parameters:

clock (VirtualClock) – VirtualClock for time control.

Yields:

Message instances from tabular data.

Return type:

AsyncGenerator[Message, None]

final class kelvin.testing.sources.CSVSource(path, playback=False, ignore_timestamps=False, now_offset=False, asset_column='asset_name', timestamp_column='timestamp')[source]

Bases: TabularSource

Data source that replays messages from a CSV file.

The CSV file should have columns for timestamp, asset_name, and one or more datastream columns. All columns except reserved ones (timestamp, asset_name) are treated as datastream values, unless specific columns are selected.

Usage:

    source = (
        CSVSource("data.csv", playback=True, now_offset=True)
        .with_asset("pump-001")  # fallback if no asset_name column
        .with_timing(timedelta(seconds=1))
    )

    # Select specific columns
    source = CSVSource("data.csv").with_columns("temperature", "pressure")

    # Map columns to datastream names
    source = CSVSource("data.csv").with_column_mapping({
        "temp_c": "temperature",
        "pressure_psi": "pressure",
    })

Parameters:
  • path (Path | str)

  • playback (bool)

  • ignore_timestamps (bool)

  • now_offset (bool)

  • asset_column (str)

  • timestamp_column (str)

__init__(path, playback=False, ignore_timestamps=False, now_offset=False, asset_column='asset_name', timestamp_column='timestamp')[source]

Initialize the CSV source.

Parameters:
  • path (Path | str) – Path to the CSV file.

  • playback (bool) – When True, wait between rows based on timestamp differences (real-time replay). Requires timestamps in CSV.

  • ignore_timestamps (bool) – When True, use virtual clock time instead of CSV timestamps.

  • now_offset (bool) – When True, offset all timestamps so the first row starts at the virtual clock’s current time.

  • asset_column (str) – Name of the column containing asset names.

  • timestamp_column (str) – Name of the timestamp column.

Return type:

None
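The expected CSV layout (a timestamp column, an asset_name column, and one column per datastream) can be illustrated with the standard csv module. This is a standalone sketch of the row-to-reading expansion, not CSVSource itself: `iter_readings`, `RESERVED`, and the tuple output are hypothetical, and all datastream cells are assumed to be numeric.

```python
import csv
import io

RESERVED = {"timestamp", "asset_name"}

CSV_TEXT = """timestamp,asset_name,temperature,pressure
2024-01-01T00:00:00Z,pump-001,21.5,30.1
2024-01-01T00:00:01Z,pump-001,21.7,30.0
"""


def iter_readings(text: str, fallback_asset: str = "unknown"):
    """Yield (timestamp, asset, datastream, value) for each non-reserved cell."""
    for row in csv.DictReader(io.StringIO(text)):
        asset = row.get("asset_name") or fallback_asset
        for column, raw in row.items():
            if column not in RESERVED:
                yield row["timestamp"], asset, column, float(raw)


readings = list(iter_readings(CSV_TEXT))
```

Each data row expands into one reading per datastream column, so two rows with two datastream columns produce four readings.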

final class kelvin.testing.sources.DataFrameSource(df, playback=False, ignore_timestamps=False, now_offset=False, asset_column='asset_name', timestamp_column='timestamp')[source]

Bases: TabularSource

Data source that replays messages from a pandas DataFrame.

The DataFrame should have columns for timestamp, asset_name, and one or more datastream columns. All columns except reserved ones (timestamp, asset_name) are treated as datastream values, unless specific columns are selected.

Usage:

    source = (
        DataFrameSource(df, playback=True, now_offset=True)
        .with_asset("pump-001")  # fallback if no asset_name column
        .with_timing(timedelta(seconds=1))
    )

    # Select specific columns
    source = DataFrameSource(df).with_columns("temperature", "pressure")

    # Map columns to datastream names
    source = DataFrameSource(df).with_column_mapping({
        "temp_c": "temperature",
        "pressure_psi": "pressure",
    })

Parameters:
  • df (pd.DataFrame)

  • playback (bool)

  • ignore_timestamps (bool)

  • now_offset (bool)

  • asset_column (str)

  • timestamp_column (str)

__init__(df, playback=False, ignore_timestamps=False, now_offset=False, asset_column='asset_name', timestamp_column='timestamp')[source]

Initialize the DataFrame source.

Parameters:
  • df (DataFrame) – Pandas DataFrame containing the data.

  • playback (bool) – When True, wait between rows based on timestamp differences (real-time replay). Requires timestamps in DataFrame.

  • ignore_timestamps (bool) – When True, use virtual clock time instead of DataFrame timestamps.

  • now_offset (bool) – When True, offset all timestamps so the first row starts at the virtual clock’s current time.

  • asset_column (str) – Name of the column containing asset names.

  • timestamp_column (str) – Name of the timestamp column.

Return type:

None

final class kelvin.testing.sources.RandomSource(datastreams=None, min_value=0.0, max_value=100.0, seed=None, count=None, value_type='number')[source]

Bases: DataSource

Data source that generates random values for fuzz testing.

Generates messages with random values (numeric by default, bounded by min_value and max_value), optionally across multiple datastreams.

Usage:

    source = RandomSource(
        datastreams=["temperature", "pressure"],
        min_value=0,
        max_value=100,
    ).with_asset("sensor-001")

Parameters:
  • datastreams (Optional[list[str]])

  • min_value (float)

  • max_value (float)

  • seed (Optional[int])

  • count (Optional[int])

  • value_type (str)

__init__(datastreams=None, min_value=0.0, max_value=100.0, seed=None, count=None, value_type='number')[source]

Initialize the random source.

Parameters:
  • datastreams (Optional[list[str]]) – List of datastream names. If None, uses ["value"].

  • min_value (float) – Minimum random value (for numeric types).

  • max_value (float) – Maximum random value (for numeric types).

  • seed (Optional[int]) – Optional random seed for reproducibility.

  • count (Optional[int]) – Optional number of messages to generate. If None, generates indefinitely.

  • value_type (str) – Type of values to generate ("number", "string", or "boolean").

Return type:

None
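Why a seed makes fuzz tests reproducible can be shown with the standard random module. This sketch assumes a uniform distribution between min_value and max_value, which the reference does not state; `random_values` is a hypothetical helper, not part of the SDK.

```python
import random


def random_values(count: int, min_value: float, max_value: float, seed: int) -> list[float]:
    """Draw `count` uniform values from a privately seeded generator."""
    rng = random.Random(seed)  # private RNG, global random state untouched
    return [rng.uniform(min_value, max_value) for _ in range(count)]


run_a = random_values(5, 0.0, 100.0, seed=42)
run_b = random_values(5, 0.0, 100.0, seed=42)
run_c = random_values(5, 0.0, 100.0, seed=7)
```

Two runs with the same seed produce the same sequence, so a failing fuzz case can be replayed by recording the seed.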

async generate(clock)[source]

Generate random messages.

Parameters:

clock (VirtualClock) – VirtualClock for time control.

Yields:

Messages with random values.

Return type:

AsyncGenerator[Message, None]

with_datastreams(datastreams)[source]

Set the datastreams to generate for.

Parameters:

datastreams (list[str]) – List of datastream names.

Return type:

RandomSource

Returns:

Self for chaining.

with_range(min_value, max_value)[source]

Set the value range for numeric generation.

Parameters:
  • min_value (float) – Minimum value.

  • max_value (float) – Maximum value.

Return type:

RandomSource

Returns:

Self for chaining.

with_count(count)[source]

Set the number of messages to generate.

Parameters:

count (int) – Number of messages.

Return type:

RandomSource

Returns:

Self for chaining.

final class kelvin.testing.sources.SyntheticSource(pattern, datastream, sample_rate=None, duration=None)[source]

Bases: DataSource

Data source that generates synthetic values from wave patterns.

Generates messages at regular intervals with values computed from a wave pattern function.

Usage:

    source = SyntheticSource(
        pattern=SineWave(amplitude=10, period=timedelta(minutes=1)),
        datastream="temperature",
    ).with_asset("sensor-001")

Parameters:
  • pattern (WavePattern)

  • datastream (str)

  • sample_rate (Optional[timedelta])

  • duration (Optional[timedelta])

__init__(pattern, datastream, sample_rate=None, duration=None)[source]

Initialize the synthetic source.

Parameters:
  • pattern (WavePattern) – Wave pattern for value generation.

  • datastream (str) – Name of the datastream for messages.

  • sample_rate (Optional[timedelta]) – Time between samples.

  • duration (Optional[timedelta]) – Optional total duration. If None, generates indefinitely.

Return type:

None
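Sampling a pattern at a fixed spacing for a fixed duration can be sketched as follows. `sample_pattern` and `sine` are hypothetical helpers, and the choice to exclude the sample at the very end of the duration is an assumption, not documented behaviour.

```python
import math
from datetime import timedelta


def sample_pattern(value_at, sample_rate: timedelta, duration: timedelta):
    """Return (elapsed_seconds, value) pairs at a fixed spacing."""
    step = sample_rate.total_seconds()
    count = int(duration.total_seconds() // step)
    return [(i * step, value_at(i * step)) for i in range(count)]


def sine(t: float) -> float:
    # 10-unit amplitude and a 60-second period, as in the usage example above
    return 10.0 * math.sin(2 * math.pi * t / 60.0)


points = sample_pattern(sine, timedelta(seconds=15), timedelta(minutes=1))
```

With a 15-second spacing over one minute this yields four samples, at 0, 15, 30, and 45 seconds.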

async generate(clock)[source]

Generate messages from the wave pattern.

Parameters:

clock (VirtualClock) – VirtualClock for time control.

Yields:

Number messages with values from the pattern.

Return type:

AsyncGenerator[Message, None]

Base class for data sources.

class kelvin.testing.sources.base.DataSource[source]

Bases: ABC

Abstract base class for test data sources.

This is the same class documented above as kelvin.testing.sources.DataSource; see that entry for generate(), with_asset(), with_timing(), and the asset and interval properties.

CSV data source for replaying recorded data.

final class kelvin.testing.sources.csv_source.CSVSource(path, playback=False, ignore_timestamps=False, now_offset=False, asset_column='asset_name', timestamp_column='timestamp')[source]

Bases: TabularSource

Data source that replays messages from a CSV file.

This is the same class documented above as kelvin.testing.sources.CSVSource; see that entry for usage and constructor parameters.

DataFrame data source for replaying recorded data.

final class kelvin.testing.sources.dataframe_source.DataFrameSource(df, playback=False, ignore_timestamps=False, now_offset=False, asset_column='asset_name', timestamp_column='timestamp')[source]

Bases: TabularSource

Data source that replays messages from a pandas DataFrame.

This is the same class documented above as kelvin.testing.sources.DataFrameSource; see that entry for usage and constructor parameters.

Random data source for fuzz testing.

final class kelvin.testing.sources.random_source.RandomSource(datastreams=None, min_value=0.0, max_value=100.0, seed=None, count=None, value_type='number')[source]

Bases: DataSource

Data source that generates random values for fuzz testing.

This is the same class documented above as kelvin.testing.sources.RandomSource; see that entry for usage, constructor parameters, generate(), with_datastreams(), with_range(), and with_count().

Synthetic data source with wave pattern generation.

class kelvin.testing.sources.synthetic.WavePattern[source]

Bases: ABC

Abstract base class for wave patterns.

Wave patterns define mathematical functions that generate values based on time. They can be combined and modified to create complex test signals.

abstractmethod value_at(t)[source]

Get the value at time t.

Parameters:

t (float) – Time in seconds.

Return type:

float

Returns:

Value at time t.
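The value_at(t) contract makes it straightforward to define and combine patterns. The classes below are hypothetical and do not use the SDK: `PatternSketch` mirrors the abstract interface, and `Sum` and `Slope` are illustrative combinators that this reference does not list.

```python
from abc import ABC, abstractmethod


class PatternSketch(ABC):
    """Hypothetical mirror of the WavePattern interface."""

    @abstractmethod
    def value_at(self, t: float) -> float:
        """Return the signal value t seconds after the start."""


class Constant(PatternSketch):
    def __init__(self, value: float) -> None:
        self.value = value

    def value_at(self, t: float) -> float:
        return self.value


class Slope(PatternSketch):
    def __init__(self, per_second: float) -> None:
        self.per_second = per_second

    def value_at(self, t: float) -> float:
        return self.per_second * t


class Sum(PatternSketch):
    """Combine patterns by adding their values at the same instant."""

    def __init__(self, *parts: PatternSketch) -> None:
        self.parts = parts

    def value_at(self, t: float) -> float:
        return sum(p.value_at(t) for p in self.parts)


drifting = Sum(Constant(20.0), Slope(0.5))
```

Here `drifting` models a baseline of 20 that rises by 0.5 per second, a simple way to exercise threshold logic with a slowly drifting signal.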

final class kelvin.testing.sources.synthetic.SineWave(amplitude=1.0, period=None, offset=0.0, phase=0.0)[source]

Bases: WavePattern

Sine wave pattern.

Generates values following a sinusoidal function: value = offset + amplitude * sin(2*pi*t/period + phase)

+amp |    *****
     |  **     **
  0  |**         ***         **
     |              **     **
-amp |                *****
     +-----------------------------> t
      0    T/4   T/2  3T/4   T
Parameters:
  • amplitude (float)

  • period (Optional[timedelta])

  • offset (float)

  • phase (float)

__init__(amplitude=1.0, period=None, offset=0.0, phase=0.0)[source]

Initialize the sine wave.

Parameters:
  • amplitude (float) – Wave amplitude (peak deviation from offset).

  • period (Optional[timedelta]) – Time for one complete cycle.

  • offset (float) – DC offset (center value).

  • phase (float) – Phase offset in radians.

Return type:

None

value_at(t)[source]

Get the sine wave value at time t.

Return type:

float

Parameters:

t (float)
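The sine formula given above can be evaluated directly. This standalone sketch takes the period in seconds through a hypothetical `period_s` argument; how the library treats period=None is not covered here.

```python
import math


def sine_value(t, amplitude=1.0, period_s=60.0, offset=0.0, phase=0.0):
    """offset + amplitude * sin(2*pi*t/period + phase), with period in seconds."""
    return offset + amplitude * math.sin(2 * math.pi * t / period_s + phase)


quarter = sine_value(15.0, amplitude=10.0, period_s=60.0, offset=50.0)
three_quarters = sine_value(45.0, amplitude=10.0, period_s=60.0, offset=50.0)
```

At a quarter of the period the value reaches offset + amplitude, and at three quarters it reaches offset - amplitude, matching the diagram.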

final class kelvin.testing.sources.synthetic.SquareWave(amplitude=1.0, period=None, offset=0.0, duty_cycle=0.5)[source]

Bases: WavePattern

Square wave pattern.

Generates values that alternate between high and low:
  • High portion (the first duty_cycle fraction of each period; the first half with the default of 0.5): offset + amplitude

  • Low portion (the remainder of the period): offset - amplitude

+amp  ┌─────────┐           ┌─────────┐
      │         │           │         │
-amp  ┘         └───────────┘         └───
                                           t -->
      |← duty →|
      |←──── period ────→|
Parameters:
  • amplitude (float)

  • period (Optional[timedelta])

  • offset (float)

  • duty_cycle (float)

__init__(amplitude=1.0, period=None, offset=0.0, duty_cycle=0.5)[source]

Initialize the square wave.

Parameters:
  • amplitude (float) – Wave amplitude.

  • period (Optional[timedelta]) – Time for one complete cycle.

  • offset (float) – DC offset.

  • duty_cycle (float) – Fraction of period spent at high value (0-1).

Return type:

None

value_at(t)[source]

Get the square wave value at time t.

Return type:

float

Parameters:

t (float)
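The effect of duty_cycle can be sketched with a small function. `square_value` is a hypothetical helper with the period given in seconds; treating the exact boundary instant as low is an assumption of this sketch.

```python
def square_value(t, amplitude=1.0, period_s=10.0, offset=0.0, duty_cycle=0.5):
    """High for the first duty_cycle fraction of each period, low afterwards."""
    position = (t % period_s) / period_s  # 0.0 <= position < 1.0
    if position < duty_cycle:
        return offset + amplitude
    return offset - amplitude


values = [square_value(t, amplitude=2.0, period_s=10.0, duty_cycle=0.3) for t in range(10)]
```

With a duty cycle of 0.3 and a 10-second period, the first three one-second samples are high and the remaining seven are low.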

final class kelvin.testing.sources.synthetic.RampWave(min_value=0.0, max_value=1.0, period=None)[source]

Bases: WavePattern

Ramp (sawtooth) wave pattern.

Generates values that linearly increase from min to max, then reset to min.

max |    /|    /|    /|
    |   / |   / |   / |
    |  /  |  /  |  /  |
min | /   | /   | /   |
    +-------------------------> t
     |period|
Parameters:
  • min_value (float)

  • max_value (float)

  • period (Optional[timedelta])

__init__(min_value=0.0, max_value=1.0, period=None)[source]

Initialize the ramp wave.

Parameters:
  • min_value (float) – Minimum value (start of ramp).

  • max_value (float) – Maximum value (end of ramp).

  • period (Optional[timedelta]) – Time for one complete cycle.

Return type:

None

value_at(t)[source]

Get the ramp wave value at time t.

Return type:

float

Parameters:

t (float)
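The sawtooth shape follows from taking the position within the current period. `ramp_value` below is a hypothetical helper with the period in seconds, written only to illustrate the linear rise and reset.

```python
def ramp_value(t, min_value=0.0, max_value=1.0, period_s=10.0):
    """Rise linearly from min_value towards max_value, then restart each period."""
    position = (t % period_s) / period_s
    return min_value + (max_value - min_value) * position


ramp = [ramp_value(t, min_value=0.0, max_value=100.0, period_s=4.0) for t in (0, 1, 2, 3, 4)]
```

The value climbs in equal steps across the period and drops back to min_value when the next period starts.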

final class kelvin.testing.sources.synthetic.NoiseWave(base, std_dev=1.0, seed=None)[source]

Bases: WavePattern

Noise wave that adds gaussian noise to another pattern.

Wraps a base pattern and adds random noise to its values.

     |    *
     |  *   *  *      *
base |*───────*──*────────
     |    *        *
     |         *
     +-------------------------> t

Parameters:
  • base (WavePattern)

  • std_dev (float)

  • seed (Optional[int])

__init__(base, std_dev=1.0, seed=None)[source]

Initialize the noise wave.

Parameters:
  • base (WavePattern) – Base wave pattern to add noise to.

  • std_dev (float) – Standard deviation of gaussian noise.

  • seed (Optional[int]) – Optional random seed for reproducibility.

Return type:

None

value_at(t)[source]

Get the base wave value plus noise at time t.

Return type:

float

Parameters:

t (float)
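Wrapping a base signal with seeded gaussian noise can be sketched with random.Random. `NoisySketch` is a hypothetical class that accepts any callable as its base; it illustrates the idea of reproducible noise and is not the SDK's NoiseWave.

```python
import random


class NoisySketch:
    """Hypothetical wrapper: base value plus seeded gaussian noise."""

    def __init__(self, base, std_dev=1.0, seed=None):
        self._base = base  # any callable t -> float
        self._std_dev = std_dev
        self._rng = random.Random(seed)

    def value_at(self, t):
        return self._base(t) + self._rng.gauss(0.0, self._std_dev)


noisy_a = NoisySketch(lambda t: 20.0, std_dev=0.5, seed=123)
noisy_b = NoisySketch(lambda t: 20.0, std_dev=0.5, seed=123)
series_a = [noisy_a.value_at(t) for t in range(5)]
series_b = [noisy_b.value_at(t) for t in range(5)]
```

Two wrappers built with the same seed yield the same noisy series, which keeps assertions on noisy inputs stable across test runs.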

final class kelvin.testing.sources.synthetic.ConstantWave(value=0.0)[source]

Bases: WavePattern

Constant value pattern.

Always returns the same value, useful for baseline testing.

     |
value|──────────────────────
     |
     +-------------------------> t
Parameters:

value (float)

__init__(value=0.0)[source]

Initialize the constant wave.

Parameters:

value (float) – Constant value to return.

Return type:

None

value_at(t)[source]

Get the constant value.

Return type:

float

Parameters:

t (float)

final class kelvin.testing.sources.synthetic.SyntheticSource(pattern, datastream, sample_rate=None, duration=None)[source]

Bases: DataSource

Data source that generates synthetic values from wave patterns.

This is the same class documented above as kelvin.testing.sources.SyntheticSource; see that entry for usage, constructor parameters, and generate().