# Source code for kelvin.testing.in_memory_stream

"""InMemoryStream implementation for in-memory message passing."""

from __future__ import annotations

import asyncio
from typing import TYPE_CHECKING, final

from typing_extensions import override

from kelvin.application.stream import StreamInterface
from kelvin.message import Message

if TYPE_CHECKING:
    from kelvin.message.runtime_manifest import RuntimeManifest


@final
class InMemoryStream(StreamInterface):
    """Queue-backed stream used in place of KelvinStream during tests.

    Two ``asyncio.Queue`` instances hold the traffic: one for messages the
    application will read, one for messages the application has written.
    A test harness feeds the former with :meth:`inject` and empties the
    latter with :meth:`drain_outputs`.
    """

    def __init__(self, manifest: RuntimeManifest) -> None:
        """Create a disconnected stream with empty queues.

        Parameters:
            manifest: RuntimeManifest placed on the input queue by
                :meth:`connect`.
        """
        self._manifest = manifest
        self._connected = False
        self._input_queue: asyncio.Queue[Message] = asyncio.Queue()
        self._output_queue: asyncio.Queue[Message] = asyncio.Queue()

    def _require_connection(self) -> None:
        """Raise ``ConnectionError`` unless the stream is connected."""
        if not self._connected:
            raise ConnectionError("not connected to stream")

    @override
    async def connect(self) -> None:
        """Mark the stream connected and enqueue the manifest as input.

        Raises:
            ConnectionError: If the stream is already connected.
        """
        if self._connected:
            raise ConnectionError("already connected to stream")

        # Flip the flag first, then queue the manifest so it is the first
        # thing enqueued by this connection.
        self._connected = True
        await self._input_queue.put(self._manifest)

    @override
    async def disconnect(self) -> None:
        """Mark the stream as disconnected; queued messages are kept."""
        self._connected = False

    @override
    async def read(self) -> Message:
        """Return the next input message, waiting until one is available.

        ``task_done()`` is called as soon as the message leaves the queue,
        which is what lets :meth:`wait_for_inputs` observe dequeue progress.
        It therefore means "dequeued", not "handler finished".

        Raises:
            ConnectionError: If the stream is not connected.
        """
        self._require_connection()
        queue = self._input_queue
        message = await queue.get()
        queue.task_done()
        return message

    @override
    async def write(self, msg: Message) -> bool:
        """Append a message to the output queue.

        Parameters:
            msg: Message to write.

        Returns:
            Always True once the message has been queued.

        Raises:
            ConnectionError: If the stream is not connected.
        """
        self._require_connection()
        await self._output_queue.put(msg)
        return True

    async def inject(self, msg: Message) -> None:
        """Push a message onto the input queue (test-harness entry point).

        Parameters:
            msg: Message to make available to :meth:`read`.

        Raises:
            ConnectionError: If the stream is not connected.
        """
        self._require_connection()
        await self._input_queue.put(msg)

    def drain_outputs(self) -> list[Message]:
        """Remove and return everything currently in the output queue.

        Never blocks (test-harness entry point).

        Returns:
            The queued output messages in FIFO order; empty if none.
        """
        queue = self._output_queue
        # No await happens between reading the size and the gets, so the
        # size cannot change underneath this loop.
        return [queue.get_nowait() for _ in range(queue.qsize())]

    @property
    def is_connected(self) -> bool:
        """Whether the stream is currently connected."""
        return self._connected

    def input_queue_size(self) -> int:
        """Return how many messages are waiting in the input queue."""
        return self._input_queue.qsize()

    async def wait_for_inputs(self) -> None:
        """Block until every input message has been dequeued.

        Completes once ``task_done`` has been called for each message put
        on the input queue. That indicates the messages were *dequeued*,
        not that their handlers have finished. Use
        :meth:`KelvinAppTest.run_until_idle` for full synchronization
        including handler completion.
        """
        await self._input_queue.join()

    def reset(self) -> None:
        """Discard all queued input and output messages for harness reuse.

        The connection flag is left as it is.
        """
        pending_inputs = self._input_queue
        while True:
            try:
                pending_inputs.get_nowait()
            except asyncio.QueueEmpty:
                break
            # Keep the unfinished-task counter balanced so a later join()
            # does not wait on messages that were thrown away here.
            pending_inputs.task_done()

        pending_outputs = self._output_queue
        while True:
            try:
                pending_outputs.get_nowait()
            except asyncio.QueueEmpty:
                break