Source code for kelvin.testing.in_memory_stream
"""InMemoryStream implementation for in-memory message passing."""
from __future__ import annotations
import asyncio
from typing import TYPE_CHECKING, final
from typing_extensions import override
from kelvin.application.stream import StreamInterface
from kelvin.message import Message
if TYPE_CHECKING:
from kelvin.message.runtime_manifest import RuntimeManifest
[docs]
@final
class InMemoryStream(StreamInterface):
    """In-memory stream implementation for testing.

    Replaces KelvinStream with queues for deterministic testing.
    Messages can be injected via :meth:`inject` and outputs collected via
    :meth:`drain_outputs`.
    """

    def __init__(self, manifest: RuntimeManifest) -> None:
        """Initialize the test stream.

        Parameters:
            manifest: RuntimeManifest to inject on connect.
        """
        # Messages the code under test will receive through read().
        self._input_queue: asyncio.Queue[Message] = asyncio.Queue()
        # Messages the code under test has emitted through write().
        self._output_queue: asyncio.Queue[Message] = asyncio.Queue()
        self._manifest = manifest
        self._connected = False

    @staticmethod
    def _drain(
        queue: asyncio.Queue[Message], *, mark_done: bool
    ) -> list[Message]:
        """Remove and return every message currently held in ``queue``.

        Parameters:
            queue: Queue to empty.
            mark_done: When True, call ``task_done()`` once per removed
                message so the queue's unfinished-task count is decremented
                in step with the removals.

        Returns:
            The removed messages, oldest first.
        """
        drained: list[Message] = []
        # This helper never awaits, so no other coroutine can run between
        # the empty() check and get_nowait(); a QueueEmpty handler would be
        # unreachable here.
        while not queue.empty():
            drained.append(queue.get_nowait())
            if mark_done:
                queue.task_done()
        return drained

    def _ensure_connected(self) -> None:
        """Raise ``ConnectionError`` unless the stream is connected."""
        if not self._connected:
            raise ConnectionError("not connected to stream")

    @override
    async def connect(self) -> None:
        """Connect and enqueue the manifest on the input queue.

        On a fresh stream this makes the manifest the first message
        returned by :meth:`read`.

        Raises:
            ConnectionError: If the stream is already connected.
        """
        if self._connected:
            raise ConnectionError("already connected to stream")
        self._connected = True
        # Inject manifest as first message
        await self._input_queue.put(self._manifest)

    @override
    async def disconnect(self) -> None:
        """Disconnect the stream.

        Only the connection flag is cleared; queued messages are kept.
        """
        self._connected = False

    @override
    async def read(self) -> Message:
        """Read the next message from the input queue.

        Blocks until a message is available. ``task_done()`` is called
        immediately after dequeuing so that ``wait_for_inputs`` (defined
        elsewhere) can track dequeue progress. Note that this signals the
        message has been *dequeued*, not that the handler has finished
        processing it.

        Returns:
            The next queued input message.

        Raises:
            ConnectionError: If the stream is not connected.
        """
        self._ensure_connected()
        msg = await self._input_queue.get()
        self._input_queue.task_done()
        return msg

    @override
    async def write(self, msg: Message) -> bool:
        """Write a message to the output queue.

        Parameters:
            msg: Message to write.

        Returns:
            True on success.

        Raises:
            ConnectionError: If the stream is not connected.
        """
        self._ensure_connected()
        await self._output_queue.put(msg)
        return True

    async def inject(self, msg: Message) -> None:
        """Inject a message into the input queue (for test harness).

        Parameters:
            msg: Message to inject as input.

        Raises:
            ConnectionError: If the stream is not connected.
        """
        self._ensure_connected()
        await self._input_queue.put(msg)

    def drain_outputs(self) -> list[Message]:
        """Collect all outputs without blocking (for test harness).

        Returns:
            List of all messages in the output queue, oldest first. The
            queue is empty afterwards.
        """
        return self._drain(self._output_queue, mark_done=False)

    @property
    def is_connected(self) -> bool:
        """Check if the stream is connected."""
        return self._connected

    def reset(self) -> None:
        """Clear all queues for harness reuse.

        Pending messages in both queues are discarded. The connection
        flag is not modified.
        """
        # Discarded inputs are marked done, as read() does for consumed
        # ones, so the input queue's unfinished-task count stays balanced.
        self._drain(self._input_queue, mark_done=True)
        self._drain(self._output_queue, mark_done=False)