"""KelvinAppTest harness for testing KelvinApp instances.
This module provides a test harness that wraps a KelvinApp with an in-memory
stream and virtual clock for deterministic testing without network dependencies.
Example:
>>> app = KelvinApp()
>>> manifest = ManifestBuilder().add_asset("test").build()
>>> async with KelvinAppTest(app, manifest) as harness:
... await harness.publish(Number(resource=..., payload=5.0))
... await harness.run_until_idle()
... assert len(harness.outputs) == 1
"""
from __future__ import annotations
import asyncio
from collections.abc import Iterable
from types import TracebackType
from typing import TYPE_CHECKING, Any, Optional, Union, cast, final
import structlog
from typing_extensions import Self, override
from kelvin.application.clock import VirtualClock
from kelvin.message import Message
from kelvin.message.msg_builders import MessageBuilder
from kelvin.message.runtime_manifest import RuntimeManifest
from kelvin.testing.in_memory_stream import InMemoryStream
if TYPE_CHECKING:
from kelvin.application.client import KelvinApp
from kelvin.testing.sources.base import DataSource
# Module-level structlog logger, named after this module.
logger = structlog.get_logger(__name__)
# Default timeout for waiting on message handlers (real-time seconds).
# Passed to asyncio.wait_for() in KelvinAppTest.run_until_idle() for both the
# input-queue wait and the processing wait; it is independent of the
# virtual-time `timeout` argument of that method.
_HANDLER_TIMEOUT: float = 5.0
# ====================================================
# KelvinAppTest Harness
# ====================================================
[docs]
@final
class KelvinAppTest:
    """Test harness for KelvinApp instances.

    While connected, the app's connection is replaced by an InMemoryStream
    and its clock by a VirtualClock, so tests run deterministically and
    without network dependencies. The app's original connection and clock
    are put back on disconnect. The harness can be reused after a
    disconnect: recorded inputs/outputs are cleared and a fresh stream is
    created on each connect.

    Attributes:
        app: The underlying KelvinApp instance.
        manifest: The RuntimeManifest injected on connect.
        clock: The VirtualClock for time control.
        is_connected: Whether the harness is currently connected.
        inputs: Copy of all injected input messages.
        outputs: Copy of all captured output messages. Reading it first
            drains any outputs still pending in the stream.

    Example:
        >>> async with KelvinAppTest(app, manifest) as harness:
        ...     await harness.publish(msg)
        ...     await harness.run_until_idle()
        ...     assert len(harness.outputs) == 1
    """
# ------------------------------------------------
# Initialization
# ------------------------------------------------
[docs]
def __init__(
self,
app: KelvinApp,
manifest: RuntimeManifest,
clock: Optional[VirtualClock] = None,
) -> None:
"""Initialize the test harness.
Parameters:
app: The KelvinApp instance to test. Must be disconnected.
manifest: RuntimeManifest to inject when connecting.
clock: Optional VirtualClock. If None, creates one with
manual time control.
Raises:
RuntimeError: If the app is already connected or has a running task.
ValueError: If the app is missing required internal attributes.
Example:
>>> harness = KelvinAppTest(app, manifest)
>>> harness = KelvinAppTest(app, manifest, clock=VirtualClock())
"""
self._validate_app(app)
self._app = app
self._original_connection = app.connection
self._original_clock = app.clock
self._connected = False
self._clock = clock or VirtualClock()
self._manifest = manifest
self._connection = InMemoryStream(self._manifest)
self._sources: list[DataSource] = []
self._inputs: list[Message] = []
self._outputs: list[Message] = []
self._source_tasks: list[asyncio.Task[None]] = []
[docs]
@override
def __repr__(self) -> str:
"""Return a string representation for debugging.
Returns:
String showing connection state and message counts.
"""
return (
f"KelvinAppTest(connected={self._connected}, "
f"inputs={len(self._inputs)}, outputs={len(self._outputs)}, "
f"pending_inputs={self._connection.input_queue_size()})"
)
def _validate_app(self, app: KelvinApp) -> None:
"""Validate the app is properly initialized for testing.
Parameters:
app: The KelvinApp instance to validate.
Raises:
RuntimeError: If the app is connected or has a running read loop.
ValueError: If the app is missing required internal attributes.
"""
# Edge case: app already connected
if app.is_connected:
raise RuntimeError("cannot initialize: KelvinAppTest requires a disconnected app")
# Verify required internal attributes exist
required_attrs = ("_connection", "_clock", "_read_loop_task", "_filters")
for attr in required_attrs:
if not hasattr(app, attr):
raise ValueError(f"app missing required attribute: {attr}")
# Edge case: app has a running task (partially initialized)
if app._read_loop_task is not None and not app._read_loop_task.done(): # pyright: ignore[reportPrivateUsage]
raise RuntimeError("cannot initialize: KelvinAppTest requires a fully stopped app")
def _restore_app_state(self) -> None:
"""Restore the app's original connection and clock."""
self._app._connection = self._original_connection # pyright: ignore[reportPrivateUsage]
self._app._clock = self._original_clock # pyright: ignore[reportPrivateUsage]
def _replace_app_state(self) -> None:
"""Replace the app's connection and clock with test versions."""
self._app._connection = self._connection # pyright: ignore[reportPrivateUsage]
self._app._clock = self._clock # pyright: ignore[reportPrivateUsage]
# ------------------------------------------------
# Input Injection
# ------------------------------------------------
[docs]
def add_source(self, source: DataSource) -> Self:
"""Add a data source for input generation.
Sources are started when connect() is called and stopped on disconnect().
Must be called before connect().
Parameters:
source: DataSource that generates messages to inject.
Returns:
Self for method chaining.
Raises:
RuntimeError: If called after connect().
Example:
>>> harness.add_source(source1).add_source(source2)
"""
if self._connected:
raise RuntimeError("cannot add sources: KelvinAppTest is already connected")
self._sources.append(source)
return self
[docs]
async def publish(self, msg: Union[Message, MessageBuilder[Any]]) -> None:
"""Inject a message into the input queue and record it.
The message is added to the inputs list and injected into the
in-memory stream for processing by the app.
Parameters:
msg: Message or MessageBuilder to inject as input.
Raises:
RuntimeError: If called before connect().
Example:
>>> await harness.publish(Number(resource=krn, payload=5.0))
"""
# Edge case: not connected
if not self._connected:
raise RuntimeError("cannot publish: KelvinAppTest is not connected")
# Handle MessageBuilder by converting to Message
if isinstance(msg, MessageBuilder):
m = cast(Message, msg.to_message())
else:
m = msg
self._inputs.append(m)
await self._connection.inject(m)
[docs]
async def publish_batch(self, messages: Iterable[Union[Message, MessageBuilder[Any]]]) -> None:
"""Inject multiple messages sequentially.
Parameters:
messages: Iterable of messages or message builders to inject.
Raises:
RuntimeError: If called before connect().
Example:
>>> await harness.publish_batch([msg1, msg2, msg3])
"""
# Edge case: empty iterable is valid, just does nothing
for msg in messages:
await self.publish(msg)
# ------------------------------------------------
# State Inspection
# ------------------------------------------------
@property
def app(self) -> KelvinApp:
"""Get the underlying KelvinApp instance.
Returns:
The KelvinApp being tested.
"""
return self._app
@property
def manifest(self) -> RuntimeManifest:
"""Get the runtime manifest.
Returns:
The RuntimeManifest injected on connect.
"""
return self._manifest
@property
def clock(self) -> VirtualClock:
"""Get the virtual clock.
Returns:
The VirtualClock used for time control.
"""
return self._clock
@property
def is_connected(self) -> bool:
"""Check if the test harness is connected.
Returns:
True if connected, False otherwise.
"""
return self._connected
@property
def inputs(self) -> list[Message]:
"""Get a copy of all injected inputs.
Returns:
Copy of the inputs list. Modifications don't affect internal state.
"""
return self._inputs.copy()
@property
def outputs(self) -> list[Message]:
"""Get a copy of all captured outputs.
Drains any pending outputs from the output queue before returning.
Each access may return additional outputs if the app has published more.
Returns:
Copy of all captured output messages.
"""
# Drain pending outputs from queue into our list
self._outputs.extend(self._connection.drain_outputs())
return self._outputs.copy()
# ------------------------------------------------
# Time Control
# ------------------------------------------------
[docs]
async def advance_time(self, seconds: float) -> None:
"""Advance virtual time and yield to the event loop.
Parameters:
seconds: Number of seconds to advance. Must be >= 0.
Raises:
ValueError: If seconds is negative.
Example:
>>> await harness.advance_time(10.0) # Advance 10 seconds
"""
# Edge case: negative time
if seconds < 0:
raise ValueError(f"seconds must be >= 0, got {seconds}")
self._clock.advance(seconds)
await asyncio.sleep(0) # Yield to let tasks process
[docs]
async def run_until_idle(self, timeout: float = 5.0) -> None:
    """Run until queues are drained and processing completes.

    Works in four phases:

    1. Step the virtual clock from one pending event to the next, never
       past ``now + timeout`` (virtual seconds).
    2. Wait for the stream's inputs to be consumed.
    3. Wait for the app's processing to complete.
    4. Briefly wait for the app's tasks.

    Phases 2 and 3 are each limited by the module-level ``_HANDLER_TIMEOUT``
    (not by ``timeout``). If either limit is hit, a warning is logged and
    the method carries on; no exception is raised for it.

    Parameters:
        timeout: Maximum virtual time to advance in seconds.
            Must be positive. Default is 5.0 seconds.

    Raises:
        ValueError: If timeout is not positive.

    Example:
        >>> await harness.run_until_idle()
        >>> await harness.run_until_idle(timeout=10.0)
    """
    if timeout <= 0:
        raise ValueError(f"timeout must be > 0, got {timeout}")
    # Deadline is expressed on the virtual clock, not wall-clock time.
    deadline = self._clock.perf_counter() + timeout
    # Phase 1: Advance time to process all pending sleeps within deadline
    while self._clock.perf_counter() < deadline:
        await asyncio.sleep(0)  # Yield to let handlers run
        next_event = self._clock.next_event_time()
        # Stop when nothing is scheduled, or the next event lies beyond
        # the deadline (the clock is never advanced past it here).
        if next_event is None or next_event > deadline:
            break
        delta = next_event - self._clock.perf_counter()
        # Only move forward; an event that is already due needs no advance
        # and is given another yield on the next iteration.
        if delta > 0:
            self._clock.advance(delta)
    # Phase 2: Wait for input queue to be fully consumed
    try:
        await asyncio.wait_for(
            self._connection.wait_for_inputs(),
            timeout=_HANDLER_TIMEOUT,
        )
    except asyncio.TimeoutError:
        # Best effort: report what is still queued and keep going.
        logger.warning(
            "timed out waiting for inputs",
            pending_inputs=self._connection.input_queue_size(),
        )
    # Phase 3: Wait for all stream processing to complete
    try:
        await asyncio.wait_for(
            self._app.wait_for_processing(),
            timeout=_HANDLER_TIMEOUT,
        )
    except asyncio.TimeoutError:
        logger.warning("timed out waiting for processing")
    # Phase 4: Give one-shot tasks (including sync tasks running in
    # threads via asyncio.to_thread) a chance to finish before the
    # harness disconnects and cancels everything. A short timeout
    # avoids blocking on long-running/continuous tasks.
    await self._app.wait_for_tasks(timeout=0.01)
    # Log if there are still pending sleeps (created during processing)
    pending_sleeps = self._clock.pending_sleeps()
    if pending_sleeps > 0:
        logger.debug(
            "pending sleeps remain after run_until_idle",
            pending_sleeps=pending_sleeps,
        )
# ------------------------------------------------
# Source Management
# ------------------------------------------------
async def _feed_source(self, source: DataSource) -> None:
"""Feed messages from a source into the input queue.
Parameters:
source: DataSource to read messages from.
Raises:
Exception: Propagates any exception from the source after logging.
"""
try:
async for msg in source.generate(self._clock):
await self._connection.inject(msg)
self._inputs.append(msg)
except asyncio.CancelledError:
# Normal cancellation during stop, propagate without logging
raise
except Exception:
logger.exception("data source failed", source=type(source).__name__)
raise
async def _start_sources(self) -> None:
"""Start all data sources as background tasks."""
for source in self._sources:
task = asyncio.create_task(self._feed_source(source))
self._source_tasks.append(task)
async def _stop_sources(self) -> None:
"""Stop all data source tasks gracefully.
Cancels running tasks and awaits their completion. Logs a warning
if any tasks failed with exceptions (excluding cancellation).
"""
errors: list[BaseException] = []
for task in self._source_tasks:
if not task.done():
_ = task.cancel()
try:
await task
except asyncio.CancelledError:
# Normal cancellation, not an error
pass
except BaseException as e:
# Re-raise system-level exceptions immediately
if isinstance(e, (KeyboardInterrupt, SystemExit)):
raise
errors.append(e)
self._source_tasks.clear()
if errors:
logger.warning("source tasks failed during stop", error_count=len(errors))
# ------------------------------------------------
# Connection Lifecycle
# ------------------------------------------------
[docs]
async def connect(self) -> Self:
"""Connect the app and start data sources.
Clears inputs/outputs for a fresh start. Can be called again after
disconnect() to reuse the harness.
Returns:
Self for method chaining.
Raises:
RuntimeError: If already connected.
Example:
>>> await harness.connect()
>>> # ... run tests ...
>>> await harness.disconnect()
"""
# Edge case: already connected
if self._connected:
raise RuntimeError("cannot connect: KelvinAppTest is already connected")
# Reset state for clean reuse
self._inputs.clear()
self._outputs.clear()
# Recreate the InMemoryStream inside the running event loop so that
# asyncio.Queue instances are bound to the current loop (required
# for Python 3.9 where Queue.__init__ captures the running loop).
self._connection = InMemoryStream(self._manifest)
# Replace app's connection and clock with test versions
self._replace_app_state()
try:
await self._app.connect()
await self._start_sources()
except Exception:
# Cleanup on failure
await self._stop_sources()
await self._app.disconnect()
self._restore_app_state()
raise
self._connected = True
logger.debug("harness connected", sources=len(self._sources))
return self
[docs]
async def disconnect(self) -> None:
"""Disconnect the app and stop data sources.
Safe to call multiple times or when not connected (no-op).
Example:
>>> await harness.disconnect()
"""
# Edge case: not connected - no-op
if not self._connected:
return
try:
await self._stop_sources()
await self._app.disconnect()
finally:
self._restore_app_state()
self._connected = False
logger.debug("harness disconnected")
# ------------------------------------------------
# Context Manager
# ------------------------------------------------
[docs]
async def __aenter__(self) -> Self:
"""Enter the async context and connect.
Returns:
Self for use in the context.
Example:
>>> async with KelvinAppTest(app, manifest) as harness:
... await harness.publish(msg)
"""
return await self.connect()
[docs]
async def __aexit__(
self,
_exc_type: Optional[type[BaseException]] = None,
_exc_value: Optional[BaseException] = None,
_traceback: Optional[TracebackType] = None,
) -> None:
"""Exit the async context and disconnect.
Always disconnects, even if an exception occurred.
"""
await self.disconnect()