Testing¶
The kelvin.testing module provides a deterministic, in-memory test harness
for KelvinApp instances. Tests run without network dependencies, use a
virtual clock for time control, and support flexible data injection through
messages and data sources.
Quick Start¶
Add a pytest.ini alongside your tests:
[pytest]
pythonpath = .
Write tests against the app singleton from your main.py:
import pytest
from main import app
from kelvin.krn import KRNAssetDataStream
from kelvin.message import Number
from kelvin.testing import KelvinAppTest, ManifestBuilder
def _build_manifest():
return (
ManifestBuilder()
.add_input("temperature")
.add_output("alert")
.add_asset("pump-001")
)
class TestMyApp:
@pytest.mark.asyncio
async def test_processes_input(self):
harness = KelvinAppTest(app, manifest=_build_manifest().build())
async with harness:
await harness.publish(
Number(
resource=KRNAssetDataStream("pump-001", "temperature"),
payload=42.0,
)
)
await harness.run_until_idle()
outputs = harness.outputs
assert len(outputs) > 0
The pattern is always:
Build a manifest describing assets and datastreams
Create a
KelvinAppTestwrapping your appUse
async withto connect/disconnectPublish inputs and/or advance time
Call
run_until_idle()to let the app process everythingAssert on
harness.outputs
Project Layout¶
A typical test setup alongside your app:
my-app/
main.py # KelvinApp definition
app.yaml # app configuration
pytest.ini # [pytest] pythonpath = .
tests/
test_main.py # tests
data/
sample.csv # test data (optional)
ManifestBuilder¶
ManifestBuilder creates a RuntimeManifest that describes the test
environment: which assets exist, which datastreams are available, and what
configuration the app receives.
Assets¶
ManifestBuilder()
.add_asset("pump-001")
.add_asset("pump-002", parameters={"temperature-limit": 50})
.add_asset("pump-003", properties={"location": "building-a"})
Datastreams¶
Convenience methods for common datastream directions, plus a general-purpose method:
from kelvin.message.runtime_manifest import WayEnum
ManifestBuilder()
# Convenience methods
.add_input("temperature") # input
.add_input("temperature", "number") # input with explicit type
.add_output("alert", "string") # output
.add_control_change_input("setpoint") # input_cc
.add_control_change_output("target-speed") # output_cc
# General-purpose (for bidirectional streams)
.add_datastream("output-number", "number", WayEnum.input_cc_output)
.add_datastream("sensor", "number", WayEnum.input_output_cc)
Supported data types: "number", "string", "boolean", "object".
Custom Actions¶
ManifestBuilder()
.add_custom_action_input("example-in")
.add_custom_action_output("example-out")
Configuration¶
ManifestBuilder()
.set_configuration({"min": 0, "max": 100, "random": True})
Loading from app.yaml¶
from pathlib import Path
manifest = ManifestBuilder.from_app_yaml(Path("app.yaml")).add_asset("pump-001").build()
Complete Example¶
def _build_manifest():
return (
ManifestBuilder()
.add_input("input-temperature")
.add_control_change_output("out-temperature-setpoint")
.add_asset("pump-001", parameters={"temperature-limit": 50, "auto-accept": False})
)
KelvinAppTest¶
Lifecycle¶
The harness wraps your app singleton and manages connection/disconnection:
harness = KelvinAppTest(app, manifest=_build_manifest().build())
# Option 1: async context manager (recommended)
async with harness:
# app is connected here
...
# app is disconnected after the block
# Option 2: manual control
await harness.connect()
# ...
await harness.disconnect()
The harness clears all state on each connect(), so a single instance can
be reused across tests. However, creating a fresh harness per test is the
typical pattern.
Publishing Messages¶
Inject messages into the app as if they arrived from the platform:
from kelvin.krn import KRNAssetDataStream, KRNAsset
from kelvin.message import Number, CustomAction
# Single message
await harness.publish(
Number(resource=KRNAssetDataStream("pump-001", "temperature"), payload=42.0)
)
# Batch publish
await harness.publish_batch([
Number(resource=KRNAssetDataStream("pump-001", "temperature"), payload=42.0),
Number(resource=KRNAssetDataStream("pump-001", "temperature"), payload=43.0),
])
# Custom actions
await harness.publish(
CustomAction(
resource=KRNAsset("pump-001"),
type="example-in",
title="Incoming Action",
expiration_date=timedelta(seconds=30),
)
)
Synchronization with run_until_idle¶
run_until_idle() is the key method for deterministic testing. It:
Advances virtual time to fire pending sleeps/timers
Waits for all input messages to be consumed
Waits for all stream processing to complete
Gives sync tasks (running in threads) a chance to finish
# Default timeout of 5 seconds (virtual time)
await harness.run_until_idle()
# For timers with longer intervals, set timeout past the interval
# e.g., a 10-second timer:
await harness.run_until_idle(timeout=11.0)
The timeout parameter controls how far virtual time can advance. Set it to
at least the timer interval + 1 second.
Reading Outputs¶
outputs = harness.outputs # list of all Message objects the app published
Filter by message type to assert on specific outputs:
from kelvin.message.base_messages import ControlChangeMsg, RecommendationMsg, CustomActionMsg
recs = [m for m in harness.outputs if isinstance(m, RecommendationMsg)]
ccs = [m for m in harness.outputs if isinstance(m, ControlChangeMsg)]
actions = [m for m in harness.outputs if isinstance(m, CustomActionMsg)]
Time Control¶
For fine-grained time manipulation:
# Advance virtual clock by a specific amount
await harness.advance_time(5.0)
Testing Patterns¶
Stream Handlers¶
Test that stream handlers receive the correct messages based on their filters:
@pytest.mark.asyncio
async def test_receives_matching_asset(self, capsys):
harness = KelvinAppTest(app, manifest=_build_manifest().build())
async with harness:
await harness.publish(
Number(resource=KRNAssetDataStream("test-asset-1", "input-1"), payload=1.0)
)
await harness.run_until_idle()
captured = capsys.readouterr().out
assert "Received" in captured
@pytest.mark.asyncio
async def test_ignores_other_assets(self, capsys):
harness = KelvinAppTest(app, manifest=_build_manifest().build())
async with harness:
await harness.publish(
Number(resource=KRNAssetDataStream("test-asset-2", "input-1"), payload=1.0)
)
await harness.run_until_idle()
captured = capsys.readouterr().out
assert "Received" not in captured
Timers¶
Timers fire automatically when virtual time advances past their interval:
@pytest.mark.asyncio
async def test_timer_fires(self, capsys):
harness = KelvinAppTest(app, manifest=ManifestBuilder().build())
async with harness:
# Timer interval is 5s - advance past the first firing
await harness.run_until_idle(timeout=6.0)
captured = capsys.readouterr().out
assert "timer fired" in captured
Test that the app survives a failing timer:
@pytest.mark.asyncio
async def test_app_survives_failing_timer(self):
harness = KelvinAppTest(app, manifest=ManifestBuilder().build())
async with harness:
await harness.run_until_idle(timeout=6.0)
assert app.is_connected # app still running
Windows (Rolling, Tumbling, Hopping)¶
Feed enough messages to fill the window:
@pytest.mark.asyncio
async def test_window_emits_after_count_size(self, capsys):
harness = KelvinAppTest(app, manifest=_build_manifest().build())
async with harness:
# count_size=5, so publish 5 messages
for i in range(1, 6):
await harness.publish(
Number(
resource=KRNAssetDataStream("pump-001", "motor-temperature"),
payload=float(i),
)
)
await harness.run_until_idle()
captured = capsys.readouterr().out
assert "window received" in captured
@pytest.mark.asyncio
async def test_window_does_not_emit_below_count_size(self, capsys):
harness = KelvinAppTest(app, manifest=_build_manifest().build())
async with harness:
for i in range(1, 4): # below count_size
await harness.publish(
Number(
resource=KRNAssetDataStream("pump-001", "motor-temperature"),
payload=float(i),
)
)
await harness.run_until_idle()
captured = capsys.readouterr().out
assert "window received" not in captured
Output Inspection¶
Assert on published messages by type, payload, and resource:
@pytest.mark.asyncio
async def test_publishes_correct_output_types(self):
harness = KelvinAppTest(app, manifest=_build_manifest().build())
async with harness:
await harness.run_until_idle(timeout=11.0)
outputs = harness.outputs
payloads = [msg.payload for msg in outputs]
assert any(isinstance(p, bool) for p in payloads)
assert any(isinstance(p, (int, float)) for p in payloads)
assert any(isinstance(p, str) for p in payloads)
@pytest.mark.asyncio
async def test_all_messages_target_correct_asset(self):
harness = KelvinAppTest(app, manifest=_build_manifest().build())
async with harness:
await harness.run_until_idle()
for msg in harness.outputs:
assert msg.resource.asset == "test-asset-1"
Recommendations and Evidence¶
@pytest.mark.asyncio
async def test_publishes_recommendation_with_evidence(self):
harness = KelvinAppTest(app, manifest=_build_manifest().build())
async with harness:
for temp in [55.0, 56.0, 57.0]:
await harness.publish(
Number(
resource=KRNAssetDataStream("pump-001", "input-temperature"),
payload=temp,
)
)
await harness.run_until_idle()
await harness.run_until_idle(timeout=6.0) # fire the timer
recs = [o for o in harness.outputs if isinstance(o, RecommendationMsg)]
assert len(recs) == 1
rec = recs[0]
assert rec.payload.evidences is not None
assert rec.payload.evidences[0].type == "line-chart"
assert rec.payload.actions.control_changes is not None
Custom Actions¶
@pytest.mark.asyncio
async def test_receives_and_responds_to_custom_action(self):
harness = KelvinAppTest(app, manifest=_build_manifest().build())
async with harness:
await harness.publish(
CustomAction(
resource=KRNAsset("test-asset-1"),
type="example-in",
title="Incoming Action",
expiration_date=timedelta(seconds=30),
)
)
await harness.run_until_idle()
results = [o for o in harness.outputs if isinstance(o, CustomActionResultMsg)]
assert len(results) == 1
assert results[0].payload.success is True
Multiple Timer Firings¶
Chain run_until_idle() calls to fire the timer multiple times:
@pytest.mark.asyncio
async def test_cooldown_prevents_duplicates(self):
harness = KelvinAppTest(app, manifest=_build_manifest().build())
async with harness:
# ... publish inputs ...
await harness.run_until_idle()
# Fire timer twice
await harness.run_until_idle(timeout=6.0)
await harness.run_until_idle(timeout=6.0)
recs = [o for o in harness.outputs if isinstance(o, RecommendationMsg)]
assert len(recs) == 1 # only one due to cooldown
Resetting Module State¶
If your app uses module-level state, reset it between tests:
import main as main_module
def _reset_state():
main_module._counter = 0
main_module._readings.clear()
class TestMyApp:
@pytest.mark.asyncio
async def test_something(self):
_reset_state()
harness = KelvinAppTest(app, manifest=_build_manifest().build())
# ...
Data Sources¶
Data sources automate input generation. Add them to the harness before connecting.
RandomSource¶
Generates random values for fuzz testing:
from kelvin.testing import RandomSource
source = (
RandomSource(datastreams=["temperature", "pressure"], min_value=0, max_value=100)
.with_asset("pump-001")
.with_timing(timedelta(seconds=1))
.with_count(10)
)
harness = KelvinAppTest(app, manifest=manifest)
harness.add_source(source)
async with harness:
await harness.run_until_idle()
outputs = harness.outputs
Options:
datastreams: list of datastream names (default:["value"])min_value/max_value: numeric rangeseed: for reproducible resultscount: number of messages (None= indefinite)value_type:"number","string", or"boolean"
CSVSource¶
Replay messages from a CSV file:
from kelvin.testing import CSVSource
source = (
CSVSource("data/sample.csv", playback=True, now_offset=True)
.with_asset("pump-001")
.with_timing(timedelta(seconds=1))
)
# Select specific columns
source = CSVSource("data.csv").with_columns("temperature", "pressure")
# Map column names to datastream names
source = CSVSource("data.csv").with_column_mapping({
"temp_c": "temperature",
"pressure_psi": "pressure",
})
Options:
playback: wait between rows based on timestamp differencesignore_timestamps: use virtual clock time instead of data timestampsnow_offset: offset timestamps so first row starts at current virtual timeasset_column/timestamp_column: column name overrides
DataFrameSource¶
Same as CSVSource but from a pandas DataFrame:
from kelvin.testing import DataFrameSource
source = DataFrameSource(df, playback=True, now_offset=True).with_asset("pump-001")
SyntheticSource¶
Generate values from mathematical wave patterns:
from datetime import timedelta
from kelvin.testing import SyntheticSource, SineWave, SquareWave, RampWave, NoiseWave, ConstantWave
# Sine wave
source = SyntheticSource(
pattern=SineWave(amplitude=10, period=timedelta(minutes=1), offset=20),
datastream="temperature",
sample_rate=timedelta(seconds=1),
duration=timedelta(minutes=5),
).with_asset("pump-001")
# Square wave (on/off signal)
source = SyntheticSource(
pattern=SquareWave(amplitude=1, period=timedelta(seconds=10), duty_cycle=0.5),
datastream="valve-state",
).with_asset("pump-001")
# Ramp (sawtooth)
source = SyntheticSource(
pattern=RampWave(min_value=0, max_value=100, period=timedelta(minutes=1)),
datastream="pressure",
).with_asset("pump-001")
# Noisy sine wave
source = SyntheticSource(
pattern=NoiseWave(base=SineWave(amplitude=10), std_dev=0.5, seed=42),
datastream="temperature",
).with_asset("pump-001")
# Constant value
source = SyntheticSource(
pattern=ConstantWave(value=25.0),
datastream="ambient-temp",
).with_asset("pump-001")
Combining Sources¶
Add multiple sources to the same harness:
harness = KelvinAppTest(app, manifest=manifest)
harness.add_source(temperature_source)
harness.add_source(pressure_source)
async with harness:
await harness.run_until_idle()
API Reference¶
Kelvin testing framework for KelvinApp unit testing.
This module provides a test harness that wraps KelvinApp with an in-memory InMemoryStream and integrates with VirtualClock for deterministic testing.
- final class kelvin.testing.KelvinAppTest(app, manifest, clock=None)[source]¶
Bases:
objectTest harness for KelvinApp instances.
Wraps a KelvinApp with an in-memory InMemoryStream and VirtualClock for deterministic testing without network dependencies. Supports reuse after disconnect - state is cleared on each connect.
- Parameters:
app (KelvinApp)
manifest (RuntimeManifest)
clock (Optional[VirtualClock])
- app¶
The underlying KelvinApp instance.
- manifest¶
The RuntimeManifest injected on connect.
- clock¶
The VirtualClock for time control.
- is_connected¶
Whether the harness is currently connected.
- inputs¶
Copy of all injected input messages.
- outputs¶
Copy of all captured output messages.
Example
>>> async with KelvinAppTest(app, manifest) as harness: ... await harness.publish(msg) ... await harness.run_until_idle() ... assert len(harness.outputs) == 1
- __init__(app, manifest, clock=None)[source]¶
Initialize the test harness.
- Parameters:
app (
KelvinApp) – The KelvinApp instance to test. Must be disconnected.manifest (
RuntimeManifest) – RuntimeManifest to inject when connecting.clock (
Optional[VirtualClock]) – Optional VirtualClock. If None, creates one with manual time control.
- Raises:
RuntimeError – If the app is already connected or has a running task.
ValueError – If the app is missing required internal attributes.
- Return type:
None
Example
>>> harness = KelvinAppTest(app, manifest) >>> harness = KelvinAppTest(app, manifest, clock=VirtualClock())
- __repr__()[source]¶
Return a string representation for debugging.
- Return type:
- Returns:
String showing connection state and message counts.
- add_source(source)[source]¶
Add a data source for input generation.
Sources are started when connect() is called and stopped on disconnect(). Must be called before connect().
- Parameters:
source (
DataSource) – DataSource that generates messages to inject.- Return type:
Self- Returns:
Self for method chaining.
- Raises:
RuntimeError – If called after connect().
Example
>>> harness.add_source(source1).add_source(source2)
- async publish(msg)[source]¶
Inject a message into the input queue and record it.
The message is added to the inputs list and injected into the in-memory stream for processing by the app.
- Parameters:
msg (
Union[Message,MessageBuilder[Any]]) – Message or MessageBuilder to inject as input.- Raises:
RuntimeError – If called before connect().
- Return type:
Example
>>> await harness.publish(Number(resource=krn, payload=5.0))
- async publish_batch(messages)[source]¶
Inject multiple messages sequentially.
- Parameters:
messages (
Iterable[Union[Message,MessageBuilder[Any]]]) – Iterable of messages or message builders to inject.- Raises:
RuntimeError – If called before connect().
- Return type:
Example
>>> await harness.publish_batch([msg1, msg2, msg3])
- property app: KelvinApp¶
Get the underlying KelvinApp instance.
- Returns:
The KelvinApp being tested.
- property manifest: RuntimeManifest¶
Get the runtime manifest.
- Returns:
The RuntimeManifest injected on connect.
- property clock: VirtualClock¶
Get the virtual clock.
- Returns:
The VirtualClock used for time control.
- property is_connected: bool¶
Check if the test harness is connected.
- Returns:
True if connected, False otherwise.
- property inputs: list[Message]¶
Get a copy of all injected inputs.
- Returns:
Copy of the inputs list. Modifications don’t affect internal state.
- property outputs: list[Message]¶
Get a copy of all captured outputs.
Drains any pending outputs from the output queue before returning. Each access may return additional outputs if the app has published more.
- Returns:
Copy of all captured output messages.
- async advance_time(seconds)[source]¶
Advance virtual time and yield to the event loop.
- Parameters:
seconds (
float) – Number of seconds to advance. Must be >= 0.- Raises:
ValueError – If seconds is negative.
- Return type:
Example
>>> await harness.advance_time(10.0) # Advance 10 seconds
- async run_until_idle(timeout=5.0)[source]¶
Run until queues are drained and processing completes.
Advances virtual time to process pending sleeps, then waits for all queues to be fully processed using join().
- Parameters:
timeout (
float) – Maximum virtual time to advance in seconds. Must be positive. Default is 5.0 seconds.- Raises:
ValueError – If timeout is not positive.
- Return type:
Example
>>> await harness.run_until_idle() >>> await harness.run_until_idle(timeout=10.0)
- async connect()[source]¶
Connect the app and start data sources.
Clears inputs/outputs for a fresh start. Can be called again after disconnect() to reuse the harness.
- Return type:
Self- Returns:
Self for method chaining.
- Raises:
RuntimeError – If already connected.
Example
>>> await harness.connect() >>> # ... run tests ... >>> await harness.disconnect()
- async disconnect()[source]¶
Disconnect the app and stop data sources.
Safe to call multiple times or when not connected (no-op).
Example
>>> await harness.disconnect()
- Return type:
- async __aenter__()[source]¶
Enter the async context and connect.
- Return type:
Self- Returns:
Self for use in the context.
Example
>>> async with KelvinAppTest(app, manifest) as harness: ... await harness.publish(msg)
- async __aexit__(_exc_type=None, _exc_value=None, _traceback=None)[source]¶
Exit the async context and disconnect.
Always disconnects, even if an exception occurred.
- Return type:
- Parameters:
_exc_type (type[BaseException] | None)
_exc_value (BaseException | None)
_traceback (TracebackType | None)
- class kelvin.testing.ManifestBuilder[source]¶
Bases:
objectFluent builder for creating RuntimeManifest instances for testing.
Provides a chainable API for constructing RuntimeManifest objects with datastreams, assets, and configuration for use in testing scenarios.
Examples
>>> manifest = (ManifestBuilder() ... .add_datastream("temperature", "number", WayEnum.input) ... .add_datastream("alert", "boolean", WayEnum.output) ... .add_asset("pump-001", properties={"location": "A1"}) ... .set_configuration({"threshold": 100}) ... .build())
- classmethod from_app_yaml(path=None)[source]¶
Load base configuration from app.yaml.
Supports both legacy format (inputs/outputs at top level) and modern format (spec_version 5.0.0 with data_streams section).
- Parameters:
- Return type:
- Returns:
ManifestBuilder populated from app.yaml.
- Raises:
FileNotFoundError – If the specified path does not exist.
yaml.YAMLError – If the YAML file is malformed.
- classmethod from_dict(config)[source]¶
Create a ManifestBuilder from a dictionary.
- add_asset(name, properties=None, parameters=None)[source]¶
Add an asset to the manifest.
Note: If an asset with the same name already exists, it will be overwritten.
- Parameters:
- Return type:
Self- Returns:
Self for chaining.
- Raises:
ValueError – If name is None, empty, or whitespace-only.
- add_datastream(name, data_type='number', way=WayEnum.input, unit=None, configuration=None)[source]¶
Add a datastream definition to the manifest.
Note: If a datastream with the same name already exists, it will be overwritten.
- Parameters:
name (
str) – Datastream name (cannot be None, empty, or whitespace-only).data_type (
str) – Data type (number, string, boolean, object). Defaults to “number”.way (
WayEnum) – Input/output direction (input, output, input_cc, output_cc, input_cc_output, input_output_cc). Defaults to WayEnum.input.unit (
str|None) – Optional unit name (e.g., “celsius”, “meters”).configuration (
dict[str,Any] |None) – Optional per-datastream configuration dict.
- Return type:
Self- Returns:
Self for chaining.
- Raises:
ValueError – If name is None, empty, or whitespace-only.
ValueError – If data_type is not one of: boolean, number, object, string.
- add_input(name, data_type='number', unit=None, configuration=None)[source]¶
Add an input datastream.
Convenience method that calls add_datastream with WayEnum.input.
- Parameters:
- Return type:
Self- Returns:
Self for chaining.
- Raises:
ValueError – If name is None, empty, or whitespace-only.
ValueError – If data_type is not one of: boolean, number, object, string.
- add_output(name, data_type='number', unit=None, configuration=None)[source]¶
Add an output datastream.
Convenience method that calls add_datastream with WayEnum.output.
- Parameters:
- Return type:
Self- Returns:
Self for chaining.
- Raises:
ValueError – If name is None, empty, or whitespace-only.
ValueError – If data_type is not one of: boolean, number, object, string.
- add_control_change_input(name, data_type='number', unit=None, configuration=None)[source]¶
Add a control change input datastream.
Convenience method that calls add_datastream with WayEnum.input_cc.
- Parameters:
- Return type:
Self- Returns:
Self for chaining.
- Raises:
ValueError – If name is None, empty, or whitespace-only.
ValueError – If data_type is not one of: boolean, number, object, string.
- add_control_change_output(name, data_type='number', unit=None, configuration=None)[source]¶
Add a control change output datastream.
Convenience method that calls add_datastream with WayEnum.output_cc.
- Parameters:
- Return type:
Self- Returns:
Self for chaining.
- Raises:
ValueError – If name is None, empty, or whitespace-only.
ValueError – If data_type is not one of: boolean, number, object, string.
- add_input_cc_output(name, data_type='number', unit=None, configuration=None)[source]¶
Add a datastream that is both input control change and output.
Convenience method that calls add_datastream with WayEnum.input_cc_output. Use for owned datastreams with read-write access.
- Parameters:
- Return type:
Self- Returns:
Self for chaining.
- Raises:
ValueError – If name is None, empty, or whitespace-only.
ValueError – If data_type is not one of: boolean, number, object, string.
- add_input_output_cc(name, data_type='number', unit=None, configuration=None)[source]¶
Add a datastream that is both input and output control change.
Convenience method that calls add_datastream with WayEnum.input_output_cc. Use for remote datastreams with read-write access.
- Parameters:
- Return type:
Self- Returns:
Self for chaining.
- Raises:
ValueError – If name is None, empty, or whitespace-only.
ValueError – If data_type is not one of: boolean, number, object, string.
- add_custom_action(action_type, way=CAWayEnum.output_ca)[source]¶
Add a custom action to the manifest.
Note: If a custom action with the same type already exists, it will be overwritten.
- Parameters:
action_type (
str) – Custom action type identifier (cannot be None, empty, or whitespace-only).way (
CAWayEnum) – Direction of the custom action (input_ca or output_ca). Defaults to output_ca.
- Return type:
Self- Returns:
Self for chaining.
- Raises:
ValueError – If action_type is None, empty, or whitespace-only.
- add_custom_action_input(action_type)[source]¶
Add an input custom action.
Convenience method that calls add_custom_action with CAWayEnum.input_ca.
- Parameters:
action_type (
str) – Custom action type identifier (cannot be None, empty, or whitespace-only).- Return type:
Self- Returns:
Self for chaining.
- Raises:
ValueError – If action_type is None, empty, or whitespace-only.
- add_custom_action_output(action_type)[source]¶
Add an output custom action.
Convenience method that calls add_custom_action with CAWayEnum.output_ca.
- Parameters:
action_type (
str) – Custom action type identifier (cannot be None, empty, or whitespace-only).- Return type:
Self- Returns:
Self for chaining.
- Raises:
ValueError – If action_type is None, empty, or whitespace-only.
- add_assets(assets)[source]¶
Add multiple assets to the manifest.
Note: If an asset with the same name already exists, it will be overwritten.
- Parameters:
assets (
list[dict[str,Any]]) – List of dicts with ‘name’ (required), ‘properties’, ‘parameters’ keys.- Return type:
Self- Returns:
Self for chaining.
- Raises:
KeyError – If any asset dict is missing the required ‘name’ key.
ValueError – If any asset name is None, empty, or whitespace-only.
- add_datastreams(datastreams)[source]¶
Add multiple datastreams to the manifest.
Note: If a datastream with the same name already exists, it will be overwritten.
- Parameters:
datastreams (
list[dict[str,Any]]) – List of dicts with ‘name’ (required), ‘data_type’, ‘way’, ‘unit’, ‘configuration’ keys.- Return type:
Self- Returns:
Self for chaining.
- Raises:
KeyError – If any datastream dict is missing the required ‘name’ key.
ValueError – If any datastream name is None, empty, or whitespace-only.
ValueError – If ‘way’ is an invalid string or ‘data_type’ is invalid.
- add_custom_actions(actions)[source]¶
Add multiple custom actions to the manifest.
Note: If a custom action with the same type already exists, it will be overwritten.
- Parameters:
actions (
list[dict[str,Any]]) – List of dicts with ‘type’ (required) and ‘way’ (optional) keys. The ‘way’ can be a CAWayEnum or string (‘input-ca’ or ‘output-ca’).- Return type:
Self- Returns:
Self for chaining.
- Raises:
KeyError – If any action dict is missing the required ‘type’ key.
ValueError – If any action type is None, empty, or whitespace-only.
ValueError – If ‘way’ is an invalid string.
- set_configuration(config)[source]¶
Set the app configuration.
Note: This replaces any existing configuration entirely.
- clear()[source]¶
Clear all builder state to allow reuse.
Resets assets, datastreams, custom actions, and configuration to empty state. Useful for building multiple manifests with the same builder instance.
- Return type:
Self- Returns:
Self for chaining.
- class kelvin.testing.DataSource[source]¶
Bases:
ABCAbstract base class for test data sources.
Data sources generate messages that are fed into the test harness as simulated inputs. They can be time-based (CSV replay, synthetic waveforms) or event-based (random generation).
- Usage:
source = CSVSource(“data.csv”).with_asset(“pump-001”) test.add_source(source)
- abstractmethod async generate(clock)[source]¶
Generate messages.
This is an async generator that yields messages at appropriate times. The implementation should use clock.sleep() to wait between messages for deterministic time-based generation.
- Parameters:
clock (
VirtualClock) – VirtualClock for time control.- Yields:
Message instances to inject as inputs.
- Return type:
AsyncGenerator[Message, None]
- with_asset(asset)[source]¶
Set the asset name for generated messages.
- Parameters:
asset (
str) – Asset name to use.- Return type:
Self- Returns:
Self for chaining.
- class kelvin.testing.TabularSource(playback=False, ignore_timestamps=False, now_offset=False, asset_column='asset_name', timestamp_column='timestamp')[source]¶
Bases:
DataSource,ABCBase class for tabular data sources (CSV, DataFrame).
This class provides shared functionality for data sources that work with tabular data (rows and columns), including: - Timestamp parsing and offset management - Column selection and mapping - Message creation from typed values - Asset resolution from columns or fallback - Unified generate() implementation
Subclasses must implement: - _iter_rows(): Yield rows as dictionaries - _get_columns(): Return list of column names - _has_timestamp_column(): Check if timestamp column exists
- Parameters:
- __init__(playback=False, ignore_timestamps=False, now_offset=False, asset_column='asset_name', timestamp_column='timestamp')[source]¶
Initialize the tabular source.
- Parameters:
playback (
bool) – When True, wait between rows based on timestamp differences (real-time replay). Requires timestamps in data.ignore_timestamps (
bool) – When True, use virtual clock time instead of data timestamps.now_offset (
bool) – When True, offset all timestamps so the first row starts at the virtual clock’s current time.asset_column (
str) – Name of the column containing asset names.timestamp_column (
str) – Name of the timestamp column.
- Return type:
None
- with_columns(*columns)[source]¶
Select specific columns as inputs.
Only the selected columns will be processed as datastreams. Can be called with varargs or a single list. Empty strings are filtered out.
- Parameters:
- Return type:
Self- Returns:
Self for chaining.
Examples
source.with_columns(“temperature”, “pressure”) source.with_columns([“temperature”, “pressure”])
- with_column_mapping(mapping)[source]¶
Map column names to datastream names.
Allows columns with arbitrary names to map to specific datastreams.
- Parameters:
mapping (
dict[str,str]) – Dictionary mapping column names to datastream names.- Return type:
Self- Returns:
Self for chaining.
Example
- source.with_column_mapping({
“temp_c”: “temperature”, “pressure_psi”: “pressure”,
})
- async generate(clock)[source]¶
Generate messages from tabular data.
Iterates through rows and yields messages at appropriate timestamps, using the virtual clock for time control. Processes active columns and applies column mapping.
- Parameters:
clock (
VirtualClock) – VirtualClock for time control.- Yields:
Message instances from tabular data.
- Return type:
AsyncGenerator[Message, None]
- final class kelvin.testing.CSVSource(path, playback=False, ignore_timestamps=False, now_offset=False, asset_column='asset_name', timestamp_column='timestamp')[source]¶
Bases:
TabularSourceData source that replays messages from a CSV file.
The CSV file should have columns for timestamp, asset_name, and one or more datastream columns. All columns except reserved ones (timestamp, asset_name) are treated as datastream values, unless specific columns are selected.
- Usage:
- source = CSVSource(“data.csv”, playback=True, now_offset=True)
.with_asset(“pump-001”) # fallback if no asset_name column .with_timing(timedelta(seconds=1))
# Select specific columns source = CSVSource(“data.csv”).with_columns(“temperature”, “pressure”)
# Map columns to datastream names source = CSVSource(“data.csv”).with_column_mapping({
“temp_c”: “temperature”, “pressure_psi”: “pressure”,
})
- Parameters:
- __init__(path, playback=False, ignore_timestamps=False, now_offset=False, asset_column='asset_name', timestamp_column='timestamp')[source]¶
Initialize the CSV source.
- Parameters:
playback (
bool) – When True, wait between rows based on timestamp differences (real-time replay). Requires timestamps in CSV.ignore_timestamps (
bool) – When True, use virtual clock time instead of CSV timestamps.now_offset (
bool) – When True, offset all timestamps so the first row starts at the virtual clock’s current time.asset_column (
str) – Name of the column containing asset names.timestamp_column (
str) – Name of the timestamp column.
- Return type:
None
- final class kelvin.testing.DataFrameSource(df, playback=False, ignore_timestamps=False, now_offset=False, asset_column='asset_name', timestamp_column='timestamp')[source]¶
Bases:
TabularSourceData source that replays messages from a pandas DataFrame.
The DataFrame should have columns for timestamp, asset_name, and one or more datastream columns. All columns except reserved ones (timestamp, asset_name) are treated as datastream values, unless specific columns are selected.
- Usage:
- source = DataFrameSource(df, playback=True, now_offset=True)
.with_asset(“pump-001”) # fallback if no asset_name column .with_timing(timedelta(seconds=1))
# Select specific columns source = DataFrameSource(df).with_columns(“temperature”, “pressure”)
# Map columns to datastream names source = DataFrameSource(df).with_column_mapping({
“temp_c”: “temperature”, “pressure_psi”: “pressure”,
})
- Parameters:
- __init__(df, playback=False, ignore_timestamps=False, now_offset=False, asset_column='asset_name', timestamp_column='timestamp')[source]¶
Initialize the DataFrame source.
- Parameters:
df (
DataFrame) – Pandas DataFrame containing the data.playback (
bool) – When True, wait between rows based on timestamp differences (real-time replay). Requires timestamps in DataFrame.ignore_timestamps (
bool) – When True, use virtual clock time instead of DataFrame timestamps.now_offset (
bool) – When True, offset all timestamps so the first row starts at the virtual clock’s current time.asset_column (
str) – Name of the column containing asset names.timestamp_column (
str) – Name of the timestamp column.
- Return type:
None
- final class kelvin.testing.RandomSource(datastreams=None, min_value=0.0, max_value=100.0, seed=None, count=None, value_type='number')[source]¶
Bases:
DataSourceData source that generates random values for fuzz testing.
Generates messages with random numeric values within specified bounds, optionally across multiple datastreams.
- Usage:
- source = RandomSource(
datastreams=[“temperature”, “pressure”], min_value=0, max_value=100,
).with_asset(“sensor-001”)
- Parameters:
- __init__(datastreams=None, min_value=0.0, max_value=100.0, seed=None, count=None, value_type='number')[source]¶
Initialize the random source.
- Parameters:
datastreams (
Optional[list[str]]) – List of datastream names. If None, uses [“value”].min_value (
float) – Minimum random value (for numeric types).max_value (
float) – Maximum random value (for numeric types).seed (
Optional[int]) – Optional random seed for reproducibility.count (
Optional[int]) – Optional number of messages to generate. If None, generates indefinitely.value_type (
str) – Type of values to generate (“number”, “string”, “boolean”).
- Return type:
None
- async generate(clock)[source]¶
Generate random messages.
- Parameters:
clock (
VirtualClock) – VirtualClock for time control.- Yields:
Messages with random values.
- Return type:
AsyncGenerator[Message, None]
- with_datastreams(datastreams)[source]¶
Set the datastreams to generate for.
- Parameters:
- Return type:
- Returns:
Self for chaining.
- with_range(min_value, max_value)[source]¶
Set the value range for numeric generation.
- Parameters:
- Return type:
- Returns:
Self for chaining.
- final class kelvin.testing.SyntheticSource(pattern, datastream, sample_rate=None, duration=None)[source]¶
Bases:
DataSourceData source that generates synthetic values from wave patterns.
Generates messages at regular intervals with values computed from a wave pattern function.
- Usage:
- source = SyntheticSource(
pattern=SineWave(amplitude=10, period=timedelta(minutes=1)), datastream=”temperature”,
).with_asset(“sensor-001”)
- Parameters:
pattern (WavePattern)
datastream (str)
sample_rate (Optional[timedelta])
duration (Optional[timedelta])
- __init__(pattern, datastream, sample_rate=None, duration=None)[source]¶
Initialize the synthetic source.
- async generate(clock)[source]¶
Generate messages from the wave pattern.
- Parameters:
clock (
VirtualClock) – VirtualClock for time control.- Yields:
Number messages with values from the pattern.
- Return type:
AsyncGenerator[Message, None]
- final class kelvin.testing.SineWave(amplitude=1.0, period=None, offset=0.0, phase=0.0)[source]¶
Bases:
WavePatternSine wave pattern.
Generates values following a sinusoidal function: value = offset + amplitude * sin(2*pi*t/period + phase)
+amp | ***** | ** ** 0 |** *** ** | ** ** -amp | ***** +-----------------------------> t 0 T/4 T/2 3T/4 T
- final class kelvin.testing.SquareWave(amplitude=1.0, period=None, offset=0.0, duty_cycle=0.5)[source]¶
Bases:
WavePatternSquare wave pattern.
Generates values that alternate between high and low: - First half of period: offset + amplitude - Second half of period: offset - amplitude
+amp ┌─────────┐ ┌─────────┐ │ │ │ │ -amp ┘ └───────────┘ └─── t --> |← duty →| |←──── period ────→|
- final class kelvin.testing.RampWave(min_value=0.0, max_value=1.0, period=None)[source]¶
Bases:
WavePatternRamp (sawtooth) wave pattern.
Generates values that linearly increase from min to max, then reset to min.
max | /| /| /| | / | / | / | | / | / | / | min | / | / | / | +-------------------------> t |period|
- final class kelvin.testing.NoiseWave(base, std_dev=1.0, seed=None)[source]¶
Bases:
WavePatternNoise wave that adds gaussian noise to another pattern.
Wraps a base pattern and adds random noise to its values.
| * | * * * * base |*───────*──*──────── | * * | * +-------------------------> t- Parameters:
base (WavePattern)
std_dev (float)
seed (Optional[int])
- final class kelvin.testing.ConstantWave(value=0.0)[source]¶
Bases:
WavePatternConstant value pattern.
Always returns the same value, useful for baseline testing.
| value|────────────────────── | +-------------------------> t- Parameters:
value (float)
KelvinAppTest¶
KelvinAppTest harness for testing KelvinApp instances.
This module provides a test harness that wraps a KelvinApp with an in-memory stream and virtual clock for deterministic testing without network dependencies.
Example
>>> app = KelvinApp()
>>> manifest = ManifestBuilder().add_asset("test").build()
>>> async with KelvinAppTest(app, manifest) as harness:
... await harness.publish(Number(resource=..., payload=5.0))
... await harness.run_until_idle()
... assert len(harness.outputs) == 1
- final class kelvin.testing.app_test.KelvinAppTest(app, manifest, clock=None)[source]¶
Bases:
objectTest harness for KelvinApp instances.
Wraps a KelvinApp with an in-memory InMemoryStream and VirtualClock for deterministic testing without network dependencies. Supports reuse after disconnect - state is cleared on each connect.
- Parameters:
app (KelvinApp)
manifest (RuntimeManifest)
clock (Optional[VirtualClock])
- app¶
The underlying KelvinApp instance.
- manifest¶
The RuntimeManifest injected on connect.
- clock¶
The VirtualClock for time control.
- is_connected¶
Whether the harness is currently connected.
- inputs¶
Copy of all injected input messages.
- outputs¶
Copy of all captured output messages.
Example
>>> async with KelvinAppTest(app, manifest) as harness: ... await harness.publish(msg) ... await harness.run_until_idle() ... assert len(harness.outputs) == 1
- __init__(app, manifest, clock=None)[source]¶
Initialize the test harness.
- Parameters:
app (
KelvinApp) – The KelvinApp instance to test. Must be disconnected.manifest (
RuntimeManifest) – RuntimeManifest to inject when connecting.clock (
Optional[VirtualClock]) – Optional VirtualClock. If None, creates one with manual time control.
- Raises:
RuntimeError – If the app is already connected or has a running task.
ValueError – If the app is missing required internal attributes.
- Return type:
None
Example
>>> harness = KelvinAppTest(app, manifest) >>> harness = KelvinAppTest(app, manifest, clock=VirtualClock())
- __repr__()[source]¶
Return a string representation for debugging.
- Return type:
- Returns:
String showing connection state and message counts.
- add_source(source)[source]¶
Add a data source for input generation.
Sources are started when connect() is called and stopped on disconnect(). Must be called before connect().
- Parameters:
source (
DataSource) – DataSource that generates messages to inject.- Return type:
Self- Returns:
Self for method chaining.
- Raises:
RuntimeError – If called after connect().
Example
>>> harness.add_source(source1).add_source(source2)
- async publish(msg)[source]¶
Inject a message into the input queue and record it.
The message is added to the inputs list and injected into the in-memory stream for processing by the app.
- Parameters:
msg (
Union[Message,MessageBuilder[Any]]) – Message or MessageBuilder to inject as input.- Raises:
RuntimeError – If called before connect().
- Return type:
Example
>>> await harness.publish(Number(resource=krn, payload=5.0))
- async publish_batch(messages)[source]¶
Inject multiple messages sequentially.
- Parameters:
messages (
Iterable[Union[Message,MessageBuilder[Any]]]) – Iterable of messages or message builders to inject.- Raises:
RuntimeError – If called before connect().
- Return type:
Example
>>> await harness.publish_batch([msg1, msg2, msg3])
- property app: KelvinApp¶
Get the underlying KelvinApp instance.
- Returns:
The KelvinApp being tested.
- property manifest: RuntimeManifest¶
Get the runtime manifest.
- Returns:
The RuntimeManifest injected on connect.
- property clock: VirtualClock¶
Get the virtual clock.
- Returns:
The VirtualClock used for time control.
- property is_connected: bool¶
Check if the test harness is connected.
- Returns:
True if connected, False otherwise.
- property inputs: list[Message]¶
Get a copy of all injected inputs.
- Returns:
Copy of the inputs list. Modifications don’t affect internal state.
- property outputs: list[Message]¶
Get a copy of all captured outputs.
Drains any pending outputs from the output queue before returning. Each access may return additional outputs if the app has published more.
- Returns:
Copy of all captured output messages.
- async advance_time(seconds)[source]¶
Advance virtual time and yield to the event loop.
- Parameters:
seconds (
float) – Number of seconds to advance. Must be >= 0.- Raises:
ValueError – If seconds is negative.
- Return type:
Example
>>> await harness.advance_time(10.0) # Advance 10 seconds
- async run_until_idle(timeout=5.0)[source]¶
Run until queues are drained and processing completes.
Advances virtual time to process pending sleeps, then waits for all queues to be fully processed using join().
- Parameters:
timeout (
float) – Maximum virtual time to advance in seconds. Must be positive. Default is 5.0 seconds.- Raises:
ValueError – If timeout is not positive.
- Return type:
Example
>>> await harness.run_until_idle() >>> await harness.run_until_idle(timeout=10.0)
- async connect()[source]¶
Connect the app and start data sources.
Clears inputs/outputs for a fresh start. Can be called again after disconnect() to reuse the harness.
- Return type:
Self- Returns:
Self for method chaining.
- Raises:
RuntimeError – If already connected.
Example
>>> await harness.connect() >>> # ... run tests ... >>> await harness.disconnect()
- async disconnect()[source]¶
Disconnect the app and stop data sources.
Safe to call multiple times or when not connected (no-op).
Example
>>> await harness.disconnect()
- Return type:
- async __aenter__()[source]¶
Enter the async context and connect.
- Return type:
Self- Returns:
Self for use in the context.
Example
>>> async with KelvinAppTest(app, manifest) as harness: ... await harness.publish(msg)
- async __aexit__(_exc_type=None, _exc_value=None, _traceback=None)[source]¶
Exit the async context and disconnect.
Always disconnects, even if an exception occurred.
- Return type:
- Parameters:
_exc_type (type[BaseException] | None)
_exc_value (BaseException | None)
_traceback (TracebackType | None)
ManifestBuilder¶
ManifestBuilder for creating test RuntimeManifests.
- class kelvin.testing.manifest.ManifestBuilder[source]¶
Bases:
objectFluent builder for creating RuntimeManifest instances for testing.
Provides a chainable API for constructing RuntimeManifest objects with datastreams, assets, and configuration for use in testing scenarios.
Examples
>>> manifest = (ManifestBuilder() ... .add_datastream("temperature", "number", WayEnum.input) ... .add_datastream("alert", "boolean", WayEnum.output) ... .add_asset("pump-001", properties={"location": "A1"}) ... .set_configuration({"threshold": 100}) ... .build())
- classmethod from_app_yaml(path=None)[source]¶
Load base configuration from app.yaml.
Supports both legacy format (inputs/outputs at top level) and modern format (spec_version 5.0.0 with data_streams section).
- Parameters:
- Return type:
- Returns:
ManifestBuilder populated from app.yaml.
- Raises:
FileNotFoundError – If the specified path does not exist.
yaml.YAMLError – If the YAML file is malformed.
- classmethod from_dict(config)[source]¶
Create a ManifestBuilder from a dictionary.
- add_asset(name, properties=None, parameters=None)[source]¶
Add an asset to the manifest.
Note: If an asset with the same name already exists, it will be overwritten.
- Parameters:
- Return type:
Self- Returns:
Self for chaining.
- Raises:
ValueError – If name is None, empty, or whitespace-only.
- add_datastream(name, data_type='number', way=WayEnum.input, unit=None, configuration=None)[source]¶
Add a datastream definition to the manifest.
Note: If a datastream with the same name already exists, it will be overwritten.
- Parameters:
name (
str) – Datastream name (cannot be None, empty, or whitespace-only).data_type (
str) – Data type (number, string, boolean, object). Defaults to “number”.way (
WayEnum) – Input/output direction (input, output, input_cc, output_cc, input_cc_output, input_output_cc). Defaults to WayEnum.input.unit (
str|None) – Optional unit name (e.g., “celsius”, “meters”).configuration (
dict[str,Any] |None) – Optional per-datastream configuration dict.
- Return type:
Self- Returns:
Self for chaining.
- Raises:
ValueError – If name is None, empty, or whitespace-only.
ValueError – If data_type is not one of: boolean, number, object, string.
- add_input(name, data_type='number', unit=None, configuration=None)[source]¶
Add an input datastream.
Convenience method that calls add_datastream with WayEnum.input.
- Parameters:
- Return type:
Self- Returns:
Self for chaining.
- Raises:
ValueError – If name is None, empty, or whitespace-only.
ValueError – If data_type is not one of: boolean, number, object, string.
- add_output(name, data_type='number', unit=None, configuration=None)[source]¶
Add an output datastream.
Convenience method that calls add_datastream with WayEnum.output.
- Parameters:
- Return type:
Self- Returns:
Self for chaining.
- Raises:
ValueError – If name is None, empty, or whitespace-only.
ValueError – If data_type is not one of: boolean, number, object, string.
- add_control_change_input(name, data_type='number', unit=None, configuration=None)[source]¶
Add a control change input datastream.
Convenience method that calls add_datastream with WayEnum.input_cc.
- Parameters:
- Return type:
Self- Returns:
Self for chaining.
- Raises:
ValueError – If name is None, empty, or whitespace-only.
ValueError – If data_type is not one of: boolean, number, object, string.
- add_control_change_output(name, data_type='number', unit=None, configuration=None)[source]¶
Add a control change output datastream.
Convenience method that calls add_datastream with WayEnum.output_cc.
- Parameters:
- Return type:
Self- Returns:
Self for chaining.
- Raises:
ValueError – If name is None, empty, or whitespace-only.
ValueError – If data_type is not one of: boolean, number, object, string.
- add_input_cc_output(name, data_type='number', unit=None, configuration=None)[source]¶
Add a datastream that is both input control change and output.
Convenience method that calls add_datastream with WayEnum.input_cc_output. Use for owned datastreams with read-write access.
- Parameters:
- Return type:
Self- Returns:
Self for chaining.
- Raises:
ValueError – If name is None, empty, or whitespace-only.
ValueError – If data_type is not one of: boolean, number, object, string.
- add_input_output_cc(name, data_type='number', unit=None, configuration=None)[source]¶
Add a datastream that is both input and output control change.
Convenience method that calls add_datastream with WayEnum.input_output_cc. Use for remote datastreams with read-write access.
- Parameters:
- Return type:
Self- Returns:
Self for chaining.
- Raises:
ValueError – If name is None, empty, or whitespace-only.
ValueError – If data_type is not one of: boolean, number, object, string.
- add_custom_action(action_type, way=CAWayEnum.output_ca)[source]¶
Add a custom action to the manifest.
Note: If a custom action with the same type already exists, it will be overwritten.
- Parameters:
action_type (
str) – Custom action type identifier (cannot be None, empty, or whitespace-only).way (
CAWayEnum) – Direction of the custom action (input_ca or output_ca). Defaults to output_ca.
- Return type:
Self- Returns:
Self for chaining.
- Raises:
ValueError – If action_type is None, empty, or whitespace-only.
- add_custom_action_input(action_type)[source]¶
Add an input custom action.
Convenience method that calls add_custom_action with CAWayEnum.input_ca.
- Parameters:
action_type (
str) – Custom action type identifier (cannot be None, empty, or whitespace-only).- Return type:
Self- Returns:
Self for chaining.
- Raises:
ValueError – If action_type is None, empty, or whitespace-only.
- add_custom_action_output(action_type)[source]¶
Add an output custom action.
Convenience method that calls add_custom_action with CAWayEnum.output_ca.
- Parameters:
action_type (
str) – Custom action type identifier (cannot be None, empty, or whitespace-only).- Return type:
Self- Returns:
Self for chaining.
- Raises:
ValueError – If action_type is None, empty, or whitespace-only.
- add_assets(assets)[source]¶
Add multiple assets to the manifest.
Note: If an asset with the same name already exists, it will be overwritten.
- Parameters:
assets (
list[dict[str,Any]]) – List of dicts with ‘name’ (required), ‘properties’, ‘parameters’ keys.- Return type:
Self- Returns:
Self for chaining.
- Raises:
KeyError – If any asset dict is missing the required ‘name’ key.
ValueError – If any asset name is None, empty, or whitespace-only.
- add_datastreams(datastreams)[source]¶
Add multiple datastreams to the manifest.
Note: If a datastream with the same name already exists, it will be overwritten.
- Parameters:
datastreams (
list[dict[str,Any]]) – List of dicts with ‘name’ (required), ‘data_type’, ‘way’, ‘unit’, ‘configuration’ keys.- Return type:
Self- Returns:
Self for chaining.
- Raises:
KeyError – If any datastream dict is missing the required ‘name’ key.
ValueError – If any datastream name is None, empty, or whitespace-only.
ValueError – If ‘way’ is an invalid string or ‘data_type’ is invalid.
- add_custom_actions(actions)[source]¶
Add multiple custom actions to the manifest.
Note: If a custom action with the same type already exists, it will be overwritten.
- Parameters:
actions (
list[dict[str,Any]]) – List of dicts with ‘type’ (required) and ‘way’ (optional) keys. The ‘way’ can be a CAWayEnum or string (‘input-ca’ or ‘output-ca’).- Return type:
Self- Returns:
Self for chaining.
- Raises:
KeyError – If any action dict is missing the required ‘type’ key.
ValueError – If any action type is None, empty, or whitespace-only.
ValueError – If ‘way’ is an invalid string.
- set_configuration(config)[source]¶
Set the app configuration.
Note: This replaces any existing configuration entirely.
- clear()[source]¶
Clear all builder state to allow reuse.
Resets assets, datastreams, custom actions, and configuration to empty state. Useful for building multiple manifests with the same builder instance.
- Return type:
Self- Returns:
Self for chaining.
InMemoryStream¶
InMemoryStream implementation for in-memory message passing.
- final class kelvin.testing.in_memory_stream.InMemoryStream(manifest)[source]¶
Bases:
StreamInterfaceIn-memory stream implementation for testing.
Replaces KelvinStream with queues for deterministic testing. Messages can be injected via inject() and outputs collected via drain_outputs().
- Parameters:
manifest (RuntimeManifest)
- __init__(manifest)[source]¶
Initialize the test stream.
- Parameters:
manifest (
RuntimeManifest) – RuntimeManifest to inject on connect.- Return type:
None
- async connect()[source]¶
Connect and inject the manifest as the first message.
- Raises:
ConnectionError – If the stream is already connected.
- Return type:
- async read()[source]¶
Read the next message from the input queue.
Blocks until a message is available. Calls
task_done()immediately after dequeuing so thatwait_for_inputs()can track dequeue progress. Note that this signals the message has been dequeued, not that the handler has finished processing it.- Raises:
ConnectionError – If the stream is not connected.
- Return type:
- async write(msg)[source]¶
Write a message to the output queue.
- Parameters:
msg (
Message) – Message to write.- Return type:
- Returns:
True on success.
- Raises:
ConnectionError – If the stream is not connected.
- async inject(msg)[source]¶
Inject a message into the input queue (for test harness).
- Parameters:
msg (
Message) – Message to inject as input.- Raises:
ConnectionError – If the stream is not connected.
- Return type:
- async wait_for_inputs()[source]¶
Wait for all input messages to be dequeued.
Blocks until every message in the input queue has been read (
task_donecalled). This signals that messages have been dequeued, not that handlers have finished processing them. UseKelvinAppTest.run_until_idle()for full synchronization including handler completion.- Return type:
Data Sources¶
Data sources for test input generation.
- class kelvin.testing.sources.DataSource[source]¶
Bases:
ABCAbstract base class for test data sources.
Data sources generate messages that are fed into the test harness as simulated inputs. They can be time-based (CSV replay, synthetic waveforms) or event-based (random generation).
- Usage:
source = CSVSource(“data.csv”).with_asset(“pump-001”) test.add_source(source)
- abstractmethod async generate(clock)[source]¶
Generate messages.
This is an async generator that yields messages at appropriate times. The implementation should use clock.sleep() to wait between messages for deterministic time-based generation.
- Parameters:
clock (
VirtualClock) – VirtualClock for time control.- Yields:
Message instances to inject as inputs.
- Return type:
AsyncGenerator[Message, None]
- with_asset(asset)[source]¶
Set the asset name for generated messages.
- Parameters:
asset (
str) – Asset name to use.- Return type:
Self- Returns:
Self for chaining.
- class kelvin.testing.sources.TabularSource(playback=False, ignore_timestamps=False, now_offset=False, asset_column='asset_name', timestamp_column='timestamp')[source]¶
Bases:
DataSource,ABCBase class for tabular data sources (CSV, DataFrame).
This class provides shared functionality for data sources that work with tabular data (rows and columns), including: - Timestamp parsing and offset management - Column selection and mapping - Message creation from typed values - Asset resolution from columns or fallback - Unified generate() implementation
Subclasses must implement: - _iter_rows(): Yield rows as dictionaries - _get_columns(): Return list of column names - _has_timestamp_column(): Check if timestamp column exists
- Parameters:
- __init__(playback=False, ignore_timestamps=False, now_offset=False, asset_column='asset_name', timestamp_column='timestamp')[source]¶
Initialize the tabular source.
- Parameters:
playback (
bool) – When True, wait between rows based on timestamp differences (real-time replay). Requires timestamps in data.ignore_timestamps (
bool) – When True, use virtual clock time instead of data timestamps.now_offset (
bool) – When True, offset all timestamps so the first row starts at the virtual clock’s current time.asset_column (
str) – Name of the column containing asset names.timestamp_column (
str) – Name of the timestamp column.
- Return type:
None
- with_columns(*columns)[source]¶
Select specific columns as inputs.
Only the selected columns will be processed as datastreams. Can be called with varargs or a single list. Empty strings are filtered out.
- Parameters:
- Return type:
Self- Returns:
Self for chaining.
Examples
source.with_columns(“temperature”, “pressure”) source.with_columns([“temperature”, “pressure”])
- with_column_mapping(mapping)[source]¶
Map column names to datastream names.
Allows columns with arbitrary names to map to specific datastreams.
- Parameters:
mapping (
dict[str,str]) – Dictionary mapping column names to datastream names.- Return type:
Self- Returns:
Self for chaining.
Example
- source.with_column_mapping({
“temp_c”: “temperature”, “pressure_psi”: “pressure”,
})
- async generate(clock)[source]¶
Generate messages from tabular data.
Iterates through rows and yields messages at appropriate timestamps, using the virtual clock for time control. Processes active columns and applies column mapping.
- Parameters:
clock (
VirtualClock) – VirtualClock for time control.- Yields:
Message instances from tabular data.
- Return type:
AsyncGenerator[Message, None]
- final class kelvin.testing.sources.CSVSource(path, playback=False, ignore_timestamps=False, now_offset=False, asset_column='asset_name', timestamp_column='timestamp')[source]¶
Bases:
TabularSourceData source that replays messages from a CSV file.
The CSV file should have columns for timestamp, asset_name, and one or more datastream columns. All columns except reserved ones (timestamp, asset_name) are treated as datastream values, unless specific columns are selected.
- Usage:
- source = CSVSource(“data.csv”, playback=True, now_offset=True)
.with_asset(“pump-001”) # fallback if no asset_name column .with_timing(timedelta(seconds=1))
# Select specific columns source = CSVSource(“data.csv”).with_columns(“temperature”, “pressure”)
# Map columns to datastream names source = CSVSource(“data.csv”).with_column_mapping({
“temp_c”: “temperature”, “pressure_psi”: “pressure”,
})
- Parameters:
- __init__(path, playback=False, ignore_timestamps=False, now_offset=False, asset_column='asset_name', timestamp_column='timestamp')[source]¶
Initialize the CSV source.
- Parameters:
playback (
bool) – When True, wait between rows based on timestamp differences (real-time replay). Requires timestamps in CSV.ignore_timestamps (
bool) – When True, use virtual clock time instead of CSV timestamps.now_offset (
bool) – When True, offset all timestamps so the first row starts at the virtual clock’s current time.asset_column (
str) – Name of the column containing asset names.timestamp_column (
str) – Name of the timestamp column.
- Return type:
None
- final class kelvin.testing.sources.DataFrameSource(df, playback=False, ignore_timestamps=False, now_offset=False, asset_column='asset_name', timestamp_column='timestamp')[source]¶
Bases:
TabularSourceData source that replays messages from a pandas DataFrame.
The DataFrame should have columns for timestamp, asset_name, and one or more datastream columns. All columns except reserved ones (timestamp, asset_name) are treated as datastream values, unless specific columns are selected.
- Usage:
- source = DataFrameSource(df, playback=True, now_offset=True)
.with_asset(“pump-001”) # fallback if no asset_name column .with_timing(timedelta(seconds=1))
# Select specific columns source = DataFrameSource(df).with_columns(“temperature”, “pressure”)
# Map columns to datastream names source = DataFrameSource(df).with_column_mapping({
“temp_c”: “temperature”, “pressure_psi”: “pressure”,
})
- Parameters:
- __init__(df, playback=False, ignore_timestamps=False, now_offset=False, asset_column='asset_name', timestamp_column='timestamp')[source]¶
Initialize the DataFrame source.
- Parameters:
df (
DataFrame) – Pandas DataFrame containing the data.playback (
bool) – When True, wait between rows based on timestamp differences (real-time replay). Requires timestamps in DataFrame.ignore_timestamps (
bool) – When True, use virtual clock time instead of DataFrame timestamps.now_offset (
bool) – When True, offset all timestamps so the first row starts at the virtual clock’s current time.asset_column (
str) – Name of the column containing asset names.timestamp_column (
str) – Name of the timestamp column.
- Return type:
None
- final class kelvin.testing.sources.RandomSource(datastreams=None, min_value=0.0, max_value=100.0, seed=None, count=None, value_type='number')[source]¶
Bases:
DataSourceData source that generates random values for fuzz testing.
Generates messages with random numeric values within specified bounds, optionally across multiple datastreams.
- Usage:
- source = RandomSource(
datastreams=[“temperature”, “pressure”], min_value=0, max_value=100,
).with_asset(“sensor-001”)
- Parameters:
- __init__(datastreams=None, min_value=0.0, max_value=100.0, seed=None, count=None, value_type='number')[source]¶
Initialize the random source.
- Parameters:
datastreams (
Optional[list[str]]) – List of datastream names. If None, uses [“value”].min_value (
float) – Minimum random value (for numeric types).max_value (
float) – Maximum random value (for numeric types).seed (
Optional[int]) – Optional random seed for reproducibility.count (
Optional[int]) – Optional number of messages to generate. If None, generates indefinitely.value_type (
str) – Type of values to generate (“number”, “string”, “boolean”).
- Return type:
None
- async generate(clock)[source]¶
Generate random messages.
- Parameters:
clock (
VirtualClock) – VirtualClock for time control.- Yields:
Messages with random values.
- Return type:
AsyncGenerator[Message, None]
- with_datastreams(datastreams)[source]¶
Set the datastreams to generate for.
- Parameters:
- Return type:
- Returns:
Self for chaining.
- with_range(min_value, max_value)[source]¶
Set the value range for numeric generation.
- Parameters:
- Return type:
- Returns:
Self for chaining.
- final class kelvin.testing.sources.SyntheticSource(pattern, datastream, sample_rate=None, duration=None)[source]¶
Bases:
DataSourceData source that generates synthetic values from wave patterns.
Generates messages at regular intervals with values computed from a wave pattern function.
- Usage:
- source = SyntheticSource(
pattern=SineWave(amplitude=10, period=timedelta(minutes=1)), datastream=”temperature”,
).with_asset(“sensor-001”)
- Parameters:
pattern (WavePattern)
datastream (str)
sample_rate (Optional[timedelta])
duration (Optional[timedelta])
- __init__(pattern, datastream, sample_rate=None, duration=None)[source]¶
Initialize the synthetic source.
- async generate(clock)[source]¶
Generate messages from the wave pattern.
- Parameters:
clock (
VirtualClock) – VirtualClock for time control.- Yields:
Number messages with values from the pattern.
- Return type:
AsyncGenerator[Message, None]
Base class for data sources.
- class kelvin.testing.sources.base.DataSource[source]¶
Bases:
ABCAbstract base class for test data sources.
Data sources generate messages that are fed into the test harness as simulated inputs. They can be time-based (CSV replay, synthetic waveforms) or event-based (random generation).
- Usage:
source = CSVSource(“data.csv”).with_asset(“pump-001”) test.add_source(source)
- abstractmethod async generate(clock)[source]¶
Generate messages.
This is an async generator that yields messages at appropriate times. The implementation should use clock.sleep() to wait between messages for deterministic time-based generation.
- Parameters:
clock (
VirtualClock) – VirtualClock for time control.- Yields:
Message instances to inject as inputs.
- Return type:
AsyncGenerator[Message, None]
- with_asset(asset)[source]¶
Set the asset name for generated messages.
- Parameters:
asset (
str) – Asset name to use.- Return type:
Self- Returns:
Self for chaining.
CSV data source for replaying recorded data.
- final class kelvin.testing.sources.csv_source.CSVSource(path, playback=False, ignore_timestamps=False, now_offset=False, asset_column='asset_name', timestamp_column='timestamp')[source]¶
Bases:
TabularSourceData source that replays messages from a CSV file.
The CSV file should have columns for timestamp, asset_name, and one or more datastream columns. All columns except reserved ones (timestamp, asset_name) are treated as datastream values, unless specific columns are selected.
- Usage:
- source = CSVSource(“data.csv”, playback=True, now_offset=True)
.with_asset(“pump-001”) # fallback if no asset_name column .with_timing(timedelta(seconds=1))
# Select specific columns source = CSVSource(“data.csv”).with_columns(“temperature”, “pressure”)
# Map columns to datastream names source = CSVSource(“data.csv”).with_column_mapping({
“temp_c”: “temperature”, “pressure_psi”: “pressure”,
})
- Parameters:
- __init__(path, playback=False, ignore_timestamps=False, now_offset=False, asset_column='asset_name', timestamp_column='timestamp')[source]¶
Initialize the CSV source.
- Parameters:
playback (
bool) – When True, wait between rows based on timestamp differences (real-time replay). Requires timestamps in CSV.ignore_timestamps (
bool) – When True, use virtual clock time instead of CSV timestamps.now_offset (
bool) – When True, offset all timestamps so the first row starts at the virtual clock’s current time.asset_column (
str) – Name of the column containing asset names.timestamp_column (
str) – Name of the timestamp column.
- Return type:
None
DataFrame data source for replaying recorded data.
- final class kelvin.testing.sources.dataframe_source.DataFrameSource(df, playback=False, ignore_timestamps=False, now_offset=False, asset_column='asset_name', timestamp_column='timestamp')[source]¶
Bases:
TabularSourceData source that replays messages from a pandas DataFrame.
The DataFrame should have columns for timestamp, asset_name, and one or more datastream columns. All columns except reserved ones (timestamp, asset_name) are treated as datastream values, unless specific columns are selected.
- Usage:
- source = DataFrameSource(df, playback=True, now_offset=True)
.with_asset(“pump-001”) # fallback if no asset_name column .with_timing(timedelta(seconds=1))
# Select specific columns source = DataFrameSource(df).with_columns(“temperature”, “pressure”)
# Map columns to datastream names source = DataFrameSource(df).with_column_mapping({
“temp_c”: “temperature”, “pressure_psi”: “pressure”,
})
- Parameters:
- __init__(df, playback=False, ignore_timestamps=False, now_offset=False, asset_column='asset_name', timestamp_column='timestamp')[source]¶
Initialize the DataFrame source.
- Parameters:
df (
DataFrame) – Pandas DataFrame containing the data.playback (
bool) – When True, wait between rows based on timestamp differences (real-time replay). Requires timestamps in DataFrame.ignore_timestamps (
bool) – When True, use virtual clock time instead of DataFrame timestamps.now_offset (
bool) – When True, offset all timestamps so the first row starts at the virtual clock’s current time.asset_column (
str) – Name of the column containing asset names.timestamp_column (
str) – Name of the timestamp column.
- Return type:
None
Random data source for fuzz testing.
- final class kelvin.testing.sources.random_source.RandomSource(datastreams=None, min_value=0.0, max_value=100.0, seed=None, count=None, value_type='number')[source]¶
Bases:
DataSourceData source that generates random values for fuzz testing.
Generates messages with random numeric values within specified bounds, optionally across multiple datastreams.
- Usage:
- source = RandomSource(
datastreams=[“temperature”, “pressure”], min_value=0, max_value=100,
).with_asset(“sensor-001”)
- Parameters:
- __init__(datastreams=None, min_value=0.0, max_value=100.0, seed=None, count=None, value_type='number')[source]¶
Initialize the random source.
- Parameters:
datastreams (
Optional[list[str]]) – List of datastream names. If None, uses [“value”].min_value (
float) – Minimum random value (for numeric types).max_value (
float) – Maximum random value (for numeric types).seed (
Optional[int]) – Optional random seed for reproducibility.count (
Optional[int]) – Optional number of messages to generate. If None, generates indefinitely.value_type (
str) – Type of values to generate (“number”, “string”, “boolean”).
- Return type:
None
- async generate(clock)[source]¶
Generate random messages.
- Parameters:
clock (
VirtualClock) – VirtualClock for time control.- Yields:
Messages with random values.
- Return type:
AsyncGenerator[Message, None]
- with_datastreams(datastreams)[source]¶
Set the datastreams to generate for.
- Parameters:
- Return type:
- Returns:
Self for chaining.
- with_range(min_value, max_value)[source]¶
Set the value range for numeric generation.
- Parameters:
- Return type:
- Returns:
Self for chaining.
Synthetic data source with wave pattern generation.
- class kelvin.testing.sources.synthetic.WavePattern[source]¶
Bases:
ABCAbstract base class for wave patterns.
Wave patterns define mathematical functions that generate values based on time. They can be combined and modified to create complex test signals.
- final class kelvin.testing.sources.synthetic.SineWave(amplitude=1.0, period=None, offset=0.0, phase=0.0)[source]¶
Bases:
WavePatternSine wave pattern.
Generates values following a sinusoidal function: value = offset + amplitude * sin(2*pi*t/period + phase)
+amp | ***** | ** ** 0 |** *** ** | ** ** -amp | ***** +-----------------------------> t 0 T/4 T/2 3T/4 T
- final class kelvin.testing.sources.synthetic.SquareWave(amplitude=1.0, period=None, offset=0.0, duty_cycle=0.5)[source]¶
Bases:
WavePatternSquare wave pattern.
Generates values that alternate between high and low: - First half of period: offset + amplitude - Second half of period: offset - amplitude
+amp ┌─────────┐ ┌─────────┐ │ │ │ │ -amp ┘ └───────────┘ └─── t --> |← duty →| |←──── period ────→|
- final class kelvin.testing.sources.synthetic.RampWave(min_value=0.0, max_value=1.0, period=None)[source]¶
Bases:
WavePatternRamp (sawtooth) wave pattern.
Generates values that linearly increase from min to max, then reset to min.
max | /| /| /| | / | / | / | | / | / | / | min | / | / | / | +-------------------------> t |period|
- final class kelvin.testing.sources.synthetic.NoiseWave(base, std_dev=1.0, seed=None)[source]¶
Bases:
WavePatternNoise wave that adds gaussian noise to another pattern.
Wraps a base pattern and adds random noise to its values.
| * | * * * * base |*───────*──*──────── | * * | * +-------------------------> t- Parameters:
base (WavePattern)
std_dev (float)
seed (Optional[int])
- final class kelvin.testing.sources.synthetic.ConstantWave(value=0.0)[source]¶
Bases:
WavePatternConstant value pattern.
Always returns the same value, useful for baseline testing.
| value|────────────────────── | +-------------------------> t- Parameters:
value (float)
- final class kelvin.testing.sources.synthetic.SyntheticSource(pattern, datastream, sample_rate=None, duration=None)[source]¶
Bases:
DataSourceData source that generates synthetic values from wave patterns.
Generates messages at regular intervals with values computed from a wave pattern function.
- Usage:
- source = SyntheticSource(
pattern=SineWave(amplitude=10, period=timedelta(minutes=1)), datastream=”temperature”,
).with_asset(“sensor-001”)
- Parameters:
pattern (WavePattern)
datastream (str)
sample_rate (Optional[timedelta])
duration (Optional[timedelta])
- __init__(pattern, datastream, sample_rate=None, duration=None)[source]¶
Initialize the synthetic source.
- async generate(clock)[source]¶
Generate messages from the wave pattern.
- Parameters:
clock (
VirtualClock) – VirtualClock for time control.- Yields:
Number messages with values from the pattern.
- Return type:
AsyncGenerator[Message, None]