# Source code for kelvin.testing.app_test

"""KelvinAppTest harness for testing KelvinApp instances.

This module provides a test harness that wraps a KelvinApp with an in-memory
stream and virtual clock for deterministic testing without network dependencies.

Example:
    >>> app = KelvinApp()
    >>> manifest = ManifestBuilder().add_asset("test").build()
    >>> async with KelvinAppTest(app, manifest) as harness:
    ...     await harness.publish(Number(resource=..., payload=5.0))
    ...     await harness.run_until_idle()
    ...     assert len(harness.outputs) == 1
"""

from __future__ import annotations

import asyncio
from collections.abc import Iterable
from types import TracebackType
from typing import TYPE_CHECKING, Any, Optional, Union, cast, final

import structlog
from typing_extensions import Self, override

from kelvin.application.clock import VirtualClock
from kelvin.message import Message
from kelvin.message.msg_builders import MessageBuilder
from kelvin.message.runtime_manifest import RuntimeManifest
from kelvin.testing.in_memory_stream import InMemoryStream

if TYPE_CHECKING:
    from kelvin.application.client import KelvinApp
    from kelvin.testing.sources.base import DataSource

# Module-level structured logger, named after this module.
logger = structlog.get_logger(__name__)

# Real-time (wall-clock) timeout, in seconds, applied to each asyncio.wait_for
# call in KelvinAppTest.run_until_idle (waiting for the input queue to drain
# and for stream processing to complete). This is independent of virtual time.
_HANDLER_TIMEOUT: float = 5.0


# ====================================================
# KelvinAppTest Harness
# ====================================================


@final
class KelvinAppTest:
    """Test harness for KelvinApp instances.

    Wraps a KelvinApp with an in-memory InMemoryStream and VirtualClock for
    deterministic testing without network dependencies. Supports reuse after
    disconnect - state is cleared on each connect.

    Attributes:
        app: The underlying KelvinApp instance.
        manifest: The RuntimeManifest injected on connect.
        clock: The VirtualClock for time control.
        is_connected: Whether the harness is currently connected.
        inputs: Copy of all injected input messages.
        outputs: Copy of all captured output messages.

    Example:
        >>> async with KelvinAppTest(app, manifest) as harness:
        ...     await harness.publish(msg)
        ...     await harness.run_until_idle()
        ...     assert len(harness.outputs) == 1
    """

    # ------------------------------------------------
    # Initialization
    # ------------------------------------------------

    def __init__(
        self,
        app: KelvinApp,
        manifest: RuntimeManifest,
        clock: Optional[VirtualClock] = None,
    ) -> None:
        """Initialize the test harness.

        The app's current connection and clock are remembered so they can be
        put back when the harness disconnects.

        Parameters:
            app: The KelvinApp instance to test. Must be disconnected.
            manifest: RuntimeManifest to inject when connecting.
            clock: Optional VirtualClock. If None, a new VirtualClock() is
                created.

        Raises:
            RuntimeError: If the app is already connected or has a running task.
            ValueError: If the app is missing required internal attributes.

        Example:
            >>> harness = KelvinAppTest(app, manifest)
            >>> harness = KelvinAppTest(app, manifest, clock=VirtualClock())
        """
        self._validate_app(app)
        self._app = app
        self._original_connection = app.connection
        self._original_clock = app.clock
        self._connected = False
        # Explicit None check: a caller-supplied clock must never be replaced
        # just because it happens to evaluate as falsy.
        self._clock = clock if clock is not None else VirtualClock()
        self._manifest = manifest
        self._connection = InMemoryStream(self._manifest)
        self._sources: list[DataSource] = []
        self._inputs: list[Message] = []
        self._outputs: list[Message] = []
        self._source_tasks: list[asyncio.Task[None]] = []

    @override
    def __repr__(self) -> str:
        """Return a string representation for debugging.

        Returns:
            String showing connection state and message counts.
        """
        return (
            f"KelvinAppTest(connected={self._connected}, "
            f"inputs={len(self._inputs)}, outputs={len(self._outputs)}, "
            f"pending_inputs={self._connection.input_queue_size()})"
        )

    def _validate_app(self, app: KelvinApp) -> None:
        """Validate the app is properly initialized for testing.

        Parameters:
            app: The KelvinApp instance to validate.

        Raises:
            RuntimeError: If the app is connected or has a running read loop.
            ValueError: If the app is missing required internal attributes.
        """
        # Edge case: app already connected
        if app.is_connected:
            raise RuntimeError("cannot initialize: KelvinAppTest requires a disconnected app")

        # Verify required internal attributes exist
        required_attrs = ("_connection", "_clock", "_read_loop_task", "_filters")
        for attr in required_attrs:
            if not hasattr(app, attr):
                raise ValueError(f"app missing required attribute: {attr}")

        # Edge case: app has a running task (partially initialized)
        if app._read_loop_task is not None and not app._read_loop_task.done():  # pyright: ignore[reportPrivateUsage]
            raise RuntimeError("cannot initialize: KelvinAppTest requires a fully stopped app")

    def _restore_app_state(self) -> None:
        """Restore the app's original connection and clock."""
        self._app._connection = self._original_connection  # pyright: ignore[reportPrivateUsage]
        self._app._clock = self._original_clock  # pyright: ignore[reportPrivateUsage]

    def _replace_app_state(self) -> None:
        """Replace the app's connection and clock with test versions."""
        self._app._connection = self._connection  # pyright: ignore[reportPrivateUsage]
        self._app._clock = self._clock  # pyright: ignore[reportPrivateUsage]

    # ------------------------------------------------
    # Input Injection
    # ------------------------------------------------

    def add_source(self, source: DataSource) -> Self:
        """Add a data source for input generation.

        Sources are started when connect() is called and stopped on
        disconnect(). Must be called before connect().

        Parameters:
            source: DataSource that generates messages to inject.

        Returns:
            Self for method chaining.

        Raises:
            RuntimeError: If called after connect().

        Example:
            >>> harness.add_source(source1).add_source(source2)
        """
        if self._connected:
            raise RuntimeError("cannot add sources: KelvinAppTest is already connected")
        self._sources.append(source)
        return self

    async def publish(self, msg: Union[Message, MessageBuilder[Any]]) -> None:
        """Inject a message into the input queue and record it.

        The message is injected into the in-memory stream for processing by
        the app and, once injection succeeded, added to the inputs list.

        Parameters:
            msg: Message or MessageBuilder to inject as input.

        Raises:
            RuntimeError: If called before connect().

        Example:
            >>> await harness.publish(Number(resource=krn, payload=5.0))
        """
        # Edge case: not connected
        if not self._connected:
            raise RuntimeError("cannot publish: KelvinAppTest is not connected")

        # Handle MessageBuilder by converting to Message
        if isinstance(msg, MessageBuilder):
            m = cast(Message, msg.to_message())
        else:
            m = msg

        # Inject first, record second (same order as _feed_source) so that
        # ``inputs`` never lists a message whose injection raised.
        await self._connection.inject(m)
        self._inputs.append(m)

    async def publish_batch(self, messages: Iterable[Union[Message, MessageBuilder[Any]]]) -> None:
        """Inject multiple messages sequentially.

        Each message is forwarded to publish() in iteration order.

        Parameters:
            messages: Iterable of messages or message builders to inject.

        Raises:
            RuntimeError: If called before connect() with at least one
                message. An empty iterable never reaches publish() and
                therefore does not raise.

        Example:
            >>> await harness.publish_batch([msg1, msg2, msg3])
        """
        # Edge case: empty iterable is valid, just does nothing
        for msg in messages:
            await self.publish(msg)

    # ------------------------------------------------
    # State Inspection
    # ------------------------------------------------

    @property
    def app(self) -> KelvinApp:
        """Get the underlying KelvinApp instance.

        Returns:
            The KelvinApp being tested.
        """
        return self._app

    @property
    def manifest(self) -> RuntimeManifest:
        """Get the runtime manifest.

        Returns:
            The RuntimeManifest injected on connect.
        """
        return self._manifest

    @property
    def clock(self) -> VirtualClock:
        """Get the virtual clock.

        Returns:
            The VirtualClock used for time control.
        """
        return self._clock

    @property
    def is_connected(self) -> bool:
        """Check if the test harness is connected.

        Returns:
            True if connected, False otherwise.
        """
        return self._connected

    @property
    def inputs(self) -> list[Message]:
        """Get a copy of all injected inputs.

        Returns:
            Copy of the inputs list. Modifications don't affect internal state.
        """
        return self._inputs.copy()

    @property
    def outputs(self) -> list[Message]:
        """Get a copy of all captured outputs.

        Drains any pending outputs from the output queue before returning.
        Each access may return additional outputs if the app has published more.

        Returns:
            Copy of all captured output messages.
        """
        # Drain pending outputs from queue into our list
        self._outputs.extend(self._connection.drain_outputs())
        return self._outputs.copy()

    # ------------------------------------------------
    # Time Control
    # ------------------------------------------------

    async def advance_time(self, seconds: float) -> None:
        """Advance virtual time and yield to the event loop.

        Parameters:
            seconds: Number of seconds to advance. Must be >= 0.

        Raises:
            ValueError: If seconds is negative.

        Example:
            >>> await harness.advance_time(10.0)  # Advance 10 seconds
        """
        # Edge case: negative time
        if seconds < 0:
            raise ValueError(f"seconds must be >= 0, got {seconds}")

        self._clock.advance(seconds)
        await asyncio.sleep(0)  # Yield to let tasks process

    async def run_until_idle(self, timeout: float = 5.0) -> None:
        """Run until queues are drained and processing completes.

        Advances virtual time to process pending sleeps, then waits for the
        input queue to be consumed and for the app's stream processing to
        finish. The two waits are each bounded by the real-time
        _HANDLER_TIMEOUT; hitting that limit is logged as a warning, not
        raised.

        Parameters:
            timeout: Maximum virtual time to advance in seconds. Must be
                positive. Default is 5.0 seconds.

        Raises:
            ValueError: If timeout is not positive.

        Example:
            >>> await harness.run_until_idle()
            >>> await harness.run_until_idle(timeout=10.0)
        """
        if timeout <= 0:
            raise ValueError(f"timeout must be > 0, got {timeout}")

        deadline = self._clock.perf_counter() + timeout

        # Phase 1: Advance time to process all pending sleeps within deadline
        # NOTE(review): if next_event_time() can keep returning a time that is
        # not after perf_counter(), this loop only yields and never advances;
        # confirm VirtualClock clears due events without an advance() call.
        while self._clock.perf_counter() < deadline:
            await asyncio.sleep(0)  # Yield to let handlers run

            next_event = self._clock.next_event_time()
            if next_event is None or next_event > deadline:
                break

            delta = next_event - self._clock.perf_counter()
            if delta > 0:
                self._clock.advance(delta)

        # Phase 2: Wait for input queue to be fully consumed
        try:
            await asyncio.wait_for(
                self._connection.wait_for_inputs(),
                timeout=_HANDLER_TIMEOUT,
            )
        except asyncio.TimeoutError:
            logger.warning(
                "timed out waiting for inputs",
                pending_inputs=self._connection.input_queue_size(),
            )

        # Phase 3: Wait for all stream processing to complete
        try:
            await asyncio.wait_for(
                self._app.wait_for_processing(),
                timeout=_HANDLER_TIMEOUT,
            )
        except asyncio.TimeoutError:
            logger.warning("timed out waiting for processing")

        # Phase 4: Give one-shot tasks (including sync tasks running in
        # threads via asyncio.to_thread) a chance to finish before the
        # harness disconnects and cancels everything. A short timeout
        # avoids blocking on long-running/continuous tasks.
        await self._app.wait_for_tasks(timeout=0.01)

        # Log if there are still pending sleeps (created during processing)
        pending_sleeps = self._clock.pending_sleeps()
        if pending_sleeps > 0:
            logger.debug(
                "pending sleeps remain after run_until_idle",
                pending_sleeps=pending_sleeps,
            )

    # ------------------------------------------------
    # Source Management
    # ------------------------------------------------

    async def _feed_source(self, source: DataSource) -> None:
        """Feed messages from a source into the input queue.

        Every message yielded by ``source.generate(clock)`` is injected into
        the in-memory stream and then recorded in the inputs list.

        Parameters:
            source: DataSource to read messages from.

        Raises:
            Exception: Propagates any exception from the source after logging.
        """
        try:
            async for msg in source.generate(self._clock):
                await self._connection.inject(msg)
                self._inputs.append(msg)
        except asyncio.CancelledError:
            # Normal cancellation during stop, propagate without logging
            raise
        except Exception:
            logger.exception("data source failed", source=type(source).__name__)
            raise

    async def _start_sources(self) -> None:
        """Start all data sources as background tasks."""
        for source in self._sources:
            task = asyncio.create_task(self._feed_source(source))
            self._source_tasks.append(task)

    async def _stop_sources(self) -> None:
        """Stop all data source tasks gracefully.

        Cancels running tasks and awaits their completion. Logs a warning if
        any tasks failed with exceptions (excluding cancellation).

        Raises:
            KeyboardInterrupt, SystemExit: Re-raised immediately if surfaced
                while awaiting a task.
        """
        errors: list[BaseException] = []

        for task in self._source_tasks:
            if not task.done():
                _ = task.cancel()
            try:
                await task
            except asyncio.CancelledError:
                # Normal cancellation, not an error
                pass
            except BaseException as e:
                # Re-raise system-level exceptions immediately
                if isinstance(e, (KeyboardInterrupt, SystemExit)):
                    raise
                errors.append(e)

        self._source_tasks.clear()

        if errors:
            logger.warning("source tasks failed during stop", error_count=len(errors))

    # ------------------------------------------------
    # Connection Lifecycle
    # ------------------------------------------------

    async def connect(self) -> Self:
        """Connect the app and start data sources.

        Clears inputs/outputs for a fresh start. Can be called again after
        disconnect() to reuse the harness.

        If connecting fails (or is cancelled), sources are stopped, the app is
        disconnected, the app's original connection and clock are always
        restored, and the original error is re-raised.

        Returns:
            Self for method chaining.

        Raises:
            RuntimeError: If already connected.

        Example:
            >>> await harness.connect()
            >>> # ... run tests ...
            >>> await harness.disconnect()
        """
        # Edge case: already connected
        if self._connected:
            raise RuntimeError("cannot connect: KelvinAppTest is already connected")

        # Reset state for clean reuse
        self._inputs.clear()
        self._outputs.clear()

        # Recreate the InMemoryStream inside the running event loop so that
        # asyncio.Queue instances are bound to the current loop (required
        # for Python 3.9 where Queue.__init__ captures the running loop).
        self._connection = InMemoryStream(self._manifest)

        # Replace app's connection and clock with test versions
        self._replace_app_state()

        try:
            await self._app.connect()
            await self._start_sources()
        except BaseException:
            # BaseException (not just Exception) so that task cancellation,
            # which raises asyncio.CancelledError, also triggers cleanup.
            try:
                await self._stop_sources()
                await self._app.disconnect()
            except Exception:
                # A failing cleanup must not mask the original connect error.
                logger.exception("cleanup failed after connect error")
            finally:
                # Always hand the app back with its original connection/clock.
                self._restore_app_state()
            raise

        self._connected = True
        logger.debug("harness connected", sources=len(self._sources))
        return self

    async def disconnect(self) -> None:
        """Disconnect the app and stop data sources.

        Safe to call multiple times or when not connected (no-op). The app's
        original connection and clock are restored even if stopping sources
        or disconnecting the app raises.

        Example:
            >>> await harness.disconnect()
        """
        # Edge case: not connected - no-op
        if not self._connected:
            return

        try:
            await self._stop_sources()
            await self._app.disconnect()
        finally:
            self._restore_app_state()
            self._connected = False
            logger.debug("harness disconnected")

    # ------------------------------------------------
    # Context Manager
    # ------------------------------------------------

    async def __aenter__(self) -> Self:
        """Enter the async context and connect.

        Returns:
            Self for use in the context.

        Example:
            >>> async with KelvinAppTest(app, manifest) as harness:
            ...     await harness.publish(msg)
        """
        return await self.connect()

    async def __aexit__(
        self,
        _exc_type: Optional[type[BaseException]] = None,
        _exc_value: Optional[BaseException] = None,
        _traceback: Optional[TracebackType] = None,
    ) -> None:
        """Exit the async context and disconnect.

        Always disconnects, even if an exception occurred.
        """
        await self.disconnect()