Testing ======= The ``kelvin.testing`` module provides a deterministic, in-memory test harness for ``KelvinApp`` instances. Tests run without network dependencies, use a virtual clock for time control, and support flexible data injection through messages and data sources. Quick Start ----------- Add a ``pytest.ini`` alongside your tests: .. code-block:: ini [pytest] pythonpath = . Write tests against the app singleton from your ``main.py``: .. code-block:: python import pytest from main import app from kelvin.krn import KRNAssetDataStream from kelvin.message import Number from kelvin.testing import KelvinAppTest, ManifestBuilder def _build_manifest(): return ( ManifestBuilder() .add_input("temperature") .add_output("alert") .add_asset("pump-001") ) class TestMyApp: @pytest.mark.asyncio async def test_processes_input(self): harness = KelvinAppTest(app, manifest=_build_manifest().build()) async with harness: await harness.publish( Number( resource=KRNAssetDataStream("pump-001", "temperature"), payload=42.0, ) ) await harness.run_until_idle() outputs = harness.outputs assert len(outputs) > 0 The pattern is always: 1. Build a manifest describing assets and datastreams 2. Create a ``KelvinAppTest`` wrapping your app 3. Use ``async with`` to connect/disconnect 4. Publish inputs and/or advance time 5. Call ``run_until_idle()`` to let the app process everything 6. Assert on ``harness.outputs`` Project Layout -------------- A typical test setup alongside your app: .. code-block:: text my-app/ main.py # KelvinApp definition app.yaml # app configuration pytest.ini # [pytest] pythonpath = . tests/ test_main.py # tests data/ sample.csv # test data (optional) ManifestBuilder --------------- ``ManifestBuilder`` creates a ``RuntimeManifest`` that describes the test environment: which assets exist, which datastreams are available, and what configuration the app receives. Assets ^^^^^^ .. code-block:: python ManifestBuilder() .add_asset("pump-001") .add_asset("pump-002", parameters={"temperature-limit": 50}) .add_asset("pump-003", properties={"location": "building-a"}) Datastreams ^^^^^^^^^^^ Convenience methods for common datastream directions, plus a general-purpose method: .. code-block:: python from kelvin.message.runtime_manifest import WayEnum ManifestBuilder() # Convenience methods .add_input("temperature") # input .add_input("temperature", "number") # input with explicit type .add_output("alert", "string") # output .add_control_change_input("setpoint") # input_cc .add_control_change_output("target-speed") # output_cc # General-purpose (for bidirectional streams) .add_datastream("output-number", "number", WayEnum.input_cc_output) .add_datastream("sensor", "number", WayEnum.input_output_cc) Supported data types: ``"number"``, ``"string"``, ``"boolean"``, ``"object"``. Custom Actions ^^^^^^^^^^^^^^ .. code-block:: python ManifestBuilder() .add_custom_action_input("example-in") .add_custom_action_output("example-out") Configuration ^^^^^^^^^^^^^ .. code-block:: python ManifestBuilder() .set_configuration({"min": 0, "max": 100, "random": True}) Loading from app.yaml ^^^^^^^^^^^^^^^^^^^^^ .. code-block:: python from pathlib import Path manifest = ManifestBuilder.from_app_yaml(Path("app.yaml")).add_asset("pump-001").build() Complete Example ^^^^^^^^^^^^^^^^ .. code-block:: python def _build_manifest(): return ( ManifestBuilder() .add_input("input-temperature") .add_control_change_output("out-temperature-setpoint") .add_asset("pump-001", parameters={"temperature-limit": 50, "auto-accept": False}) ) KelvinAppTest ------------- Lifecycle ^^^^^^^^^ The harness wraps your app singleton and manages connection/disconnection: .. code-block:: python harness = KelvinAppTest(app, manifest=_build_manifest().build()) # Option 1: async context manager (recommended) async with harness: # app is connected here ... # app is disconnected after the block # Option 2: manual control await harness.connect() # ... await harness.disconnect() The harness clears all state on each ``connect()``, so a single instance can be reused across tests. However, creating a fresh harness per test is the typical pattern. Publishing Messages ^^^^^^^^^^^^^^^^^^^ Inject messages into the app as if they arrived from the platform: .. code-block:: python from kelvin.krn import KRNAssetDataStream, KRNAsset from kelvin.message import Number, CustomAction # Single message await harness.publish( Number(resource=KRNAssetDataStream("pump-001", "temperature"), payload=42.0) ) # Batch publish await harness.publish_batch([ Number(resource=KRNAssetDataStream("pump-001", "temperature"), payload=42.0), Number(resource=KRNAssetDataStream("pump-001", "temperature"), payload=43.0), ]) # Custom actions await harness.publish( CustomAction( resource=KRNAsset("pump-001"), type="example-in", title="Incoming Action", expiration_date=timedelta(seconds=30), ) ) Synchronization with ``run_until_idle`` ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ ``run_until_idle()`` is the key method for deterministic testing. It: 1. Advances virtual time to fire pending sleeps/timers 2. Waits for all input messages to be consumed 3. Waits for all stream processing to complete 4. Gives sync tasks (running in threads) a chance to finish .. code-block:: python # Default timeout of 5 seconds (virtual time) await harness.run_until_idle() # For timers with longer intervals, set timeout past the interval # e.g., a 10-second timer: await harness.run_until_idle(timeout=11.0) The ``timeout`` parameter controls how far virtual time can advance. Set it to at least the timer interval + 1 second. Reading Outputs ^^^^^^^^^^^^^^^ .. code-block:: python outputs = harness.outputs # list of all Message objects the app published Filter by message type to assert on specific outputs: .. code-block:: python from kelvin.message.base_messages import ControlChangeMsg, RecommendationMsg, CustomActionMsg recs = [m for m in harness.outputs if isinstance(m, RecommendationMsg)] ccs = [m for m in harness.outputs if isinstance(m, ControlChangeMsg)] actions = [m for m in harness.outputs if isinstance(m, CustomActionMsg)] Time Control ^^^^^^^^^^^^ For fine-grained time manipulation: .. code-block:: python # Advance virtual clock by a specific amount await harness.advance_time(5.0) Testing Patterns ---------------- Stream Handlers ^^^^^^^^^^^^^^^ Test that stream handlers receive the correct messages based on their filters: .. code-block:: python @pytest.mark.asyncio async def test_receives_matching_asset(self, capsys): harness = KelvinAppTest(app, manifest=_build_manifest().build()) async with harness: await harness.publish( Number(resource=KRNAssetDataStream("test-asset-1", "input-1"), payload=1.0) ) await harness.run_until_idle() captured = capsys.readouterr().out assert "Received" in captured @pytest.mark.asyncio async def test_ignores_other_assets(self, capsys): harness = KelvinAppTest(app, manifest=_build_manifest().build()) async with harness: await harness.publish( Number(resource=KRNAssetDataStream("test-asset-2", "input-1"), payload=1.0) ) await harness.run_until_idle() captured = capsys.readouterr().out assert "Received" not in captured Timers ^^^^^^ Timers fire automatically when virtual time advances past their interval: .. code-block:: python @pytest.mark.asyncio async def test_timer_fires(self, capsys): harness = KelvinAppTest(app, manifest=ManifestBuilder().build()) async with harness: # Timer interval is 5s - advance past the first firing await harness.run_until_idle(timeout=6.0) captured = capsys.readouterr().out assert "timer fired" in captured Test that the app survives a failing timer: .. code-block:: python @pytest.mark.asyncio async def test_app_survives_failing_timer(self): harness = KelvinAppTest(app, manifest=ManifestBuilder().build()) async with harness: await harness.run_until_idle(timeout=6.0) assert app.is_connected # app still running Windows (Rolling, Tumbling, Hopping) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ Feed enough messages to fill the window: .. code-block:: python @pytest.mark.asyncio async def test_window_emits_after_count_size(self, capsys): harness = KelvinAppTest(app, manifest=_build_manifest().build()) async with harness: # count_size=5, so publish 5 messages for i in range(1, 6): await harness.publish( Number( resource=KRNAssetDataStream("pump-001", "motor-temperature"), payload=float(i), ) ) await harness.run_until_idle() captured = capsys.readouterr().out assert "window received" in captured @pytest.mark.asyncio async def test_window_does_not_emit_below_count_size(self, capsys): harness = KelvinAppTest(app, manifest=_build_manifest().build()) async with harness: for i in range(1, 4): # below count_size await harness.publish( Number( resource=KRNAssetDataStream("pump-001", "motor-temperature"), payload=float(i), ) ) await harness.run_until_idle() captured = capsys.readouterr().out assert "window received" not in captured Output Inspection ^^^^^^^^^^^^^^^^^ Assert on published messages by type, payload, and resource: .. code-block:: python @pytest.mark.asyncio async def test_publishes_correct_output_types(self): harness = KelvinAppTest(app, manifest=_build_manifest().build()) async with harness: await harness.run_until_idle(timeout=11.0) outputs = harness.outputs payloads = [msg.payload for msg in outputs] assert any(isinstance(p, bool) for p in payloads) assert any(isinstance(p, (int, float)) for p in payloads) assert any(isinstance(p, str) for p in payloads) @pytest.mark.asyncio async def test_all_messages_target_correct_asset(self): harness = KelvinAppTest(app, manifest=_build_manifest().build()) async with harness: await harness.run_until_idle() for msg in harness.outputs: assert msg.resource.asset == "test-asset-1" Recommendations and Evidence ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ .. code-block:: python @pytest.mark.asyncio async def test_publishes_recommendation_with_evidence(self): harness = KelvinAppTest(app, manifest=_build_manifest().build()) async with harness: for temp in [55.0, 56.0, 57.0]: await harness.publish( Number( resource=KRNAssetDataStream("pump-001", "input-temperature"), payload=temp, ) ) await harness.run_until_idle() await harness.run_until_idle(timeout=6.0) # fire the timer recs = [o for o in harness.outputs if isinstance(o, RecommendationMsg)] assert len(recs) == 1 rec = recs[0] assert rec.payload.evidences is not None assert rec.payload.evidences[0].type == "line-chart" assert rec.payload.actions.control_changes is not None Custom Actions ^^^^^^^^^^^^^^ .. code-block:: python @pytest.mark.asyncio async def test_receives_and_responds_to_custom_action(self): harness = KelvinAppTest(app, manifest=_build_manifest().build()) async with harness: await harness.publish( CustomAction( resource=KRNAsset("test-asset-1"), type="example-in", title="Incoming Action", expiration_date=timedelta(seconds=30), ) ) await harness.run_until_idle() results = [o for o in harness.outputs if isinstance(o, CustomActionResultMsg)] assert len(results) == 1 assert results[0].payload.success is True Multiple Timer Firings ^^^^^^^^^^^^^^^^^^^^^^ Chain ``run_until_idle()`` calls to fire the timer multiple times: .. code-block:: python @pytest.mark.asyncio async def test_cooldown_prevents_duplicates(self): harness = KelvinAppTest(app, manifest=_build_manifest().build()) async with harness: # ... publish inputs ... await harness.run_until_idle() # Fire timer twice await harness.run_until_idle(timeout=6.0) await harness.run_until_idle(timeout=6.0) recs = [o for o in harness.outputs if isinstance(o, RecommendationMsg)] assert len(recs) == 1 # only one due to cooldown Resetting Module State ^^^^^^^^^^^^^^^^^^^^^^ If your app uses module-level state, reset it between tests: .. code-block:: python import main as main_module def _reset_state(): main_module._counter = 0 main_module._readings.clear() class TestMyApp: @pytest.mark.asyncio async def test_something(self): _reset_state() harness = KelvinAppTest(app, manifest=_build_manifest().build()) # ... Data Sources ------------ Data sources automate input generation. Add them to the harness before connecting. RandomSource ^^^^^^^^^^^^ Generates random values for fuzz testing: .. code-block:: python from kelvin.testing import RandomSource source = ( RandomSource(datastreams=["temperature", "pressure"], min_value=0, max_value=100) .with_asset("pump-001") .with_timing(timedelta(seconds=1)) .with_count(10) ) harness = KelvinAppTest(app, manifest=manifest) harness.add_source(source) async with harness: await harness.run_until_idle() outputs = harness.outputs Options: - ``datastreams``: list of datastream names (default: ``["value"]``) - ``min_value`` / ``max_value``: numeric range - ``seed``: for reproducible results - ``count``: number of messages (``None`` = indefinite) - ``value_type``: ``"number"``, ``"string"``, or ``"boolean"`` CSVSource ^^^^^^^^^ Replay messages from a CSV file: .. code-block:: python from kelvin.testing import CSVSource source = ( CSVSource("data/sample.csv", playback=True, now_offset=True) .with_asset("pump-001") .with_timing(timedelta(seconds=1)) ) # Select specific columns source = CSVSource("data.csv").with_columns("temperature", "pressure") # Map column names to datastream names source = CSVSource("data.csv").with_column_mapping({ "temp_c": "temperature", "pressure_psi": "pressure", }) Options: - ``playback``: wait between rows based on timestamp differences - ``ignore_timestamps``: use virtual clock time instead of data timestamps - ``now_offset``: offset timestamps so first row starts at current virtual time - ``asset_column`` / ``timestamp_column``: column name overrides DataFrameSource ^^^^^^^^^^^^^^^ Same as ``CSVSource`` but from a pandas DataFrame: .. code-block:: python from kelvin.testing import DataFrameSource source = DataFrameSource(df, playback=True, now_offset=True).with_asset("pump-001") SyntheticSource ^^^^^^^^^^^^^^^ Generate values from mathematical wave patterns: .. code-block:: python from datetime import timedelta from kelvin.testing import SyntheticSource, SineWave, SquareWave, RampWave, NoiseWave, ConstantWave # Sine wave source = SyntheticSource( pattern=SineWave(amplitude=10, period=timedelta(minutes=1), offset=20), datastream="temperature", sample_rate=timedelta(seconds=1), duration=timedelta(minutes=5), ).with_asset("pump-001") # Square wave (on/off signal) source = SyntheticSource( pattern=SquareWave(amplitude=1, period=timedelta(seconds=10), duty_cycle=0.5), datastream="valve-state", ).with_asset("pump-001") # Ramp (sawtooth) source = SyntheticSource( pattern=RampWave(min_value=0, max_value=100, period=timedelta(minutes=1)), datastream="pressure", ).with_asset("pump-001") # Noisy sine wave source = SyntheticSource( pattern=NoiseWave(base=SineWave(amplitude=10), std_dev=0.5, seed=42), datastream="temperature", ).with_asset("pump-001") # Constant value source = SyntheticSource( pattern=ConstantWave(value=25.0), datastream="ambient-temp", ).with_asset("pump-001") Combining Sources ^^^^^^^^^^^^^^^^^ Add multiple sources to the same harness: .. code-block:: python harness = KelvinAppTest(app, manifest=manifest) harness.add_source(temperature_source) harness.add_source(pressure_source) async with harness: await harness.run_until_idle() API Reference ------------- .. automodule:: kelvin.testing :members: :undoc-members: :show-inheritance: KelvinAppTest ^^^^^^^^^^^^^ .. automodule:: kelvin.testing.app_test :members: :undoc-members: :show-inheritance: ManifestBuilder ^^^^^^^^^^^^^^^ .. automodule:: kelvin.testing.manifest :members: :undoc-members: :show-inheritance: InMemoryStream ^^^^^^^^^^^^^^ .. automodule:: kelvin.testing.in_memory_stream :members: :undoc-members: :show-inheritance: Data Sources ^^^^^^^^^^^^ .. automodule:: kelvin.testing.sources :members: :undoc-members: :show-inheritance: .. automodule:: kelvin.testing.sources.base :members: :undoc-members: :show-inheritance: .. automodule:: kelvin.testing.sources.csv_source :members: :undoc-members: :show-inheritance: .. automodule:: kelvin.testing.sources.dataframe_source :members: :undoc-members: :show-inheritance: .. automodule:: kelvin.testing.sources.random_source :members: :undoc-members: :show-inheritance: .. automodule:: kelvin.testing.sources.synthetic :members: :undoc-members: :show-inheritance: