Source code for kelvin.application.stream

"""Kelvin Stream interface for message communication.

This module provides the low-level stream interface for connecting to and
communicating with the Kelvin platform via TCP sockets.

Main Components:
    StreamInterface: Abstract base class defining the stream protocol.
    KelvinStream: Concrete implementation for Kelvin platform communication.
    KelvinStreamConfig: Configuration settings for the stream connection.
"""

from __future__ import annotations

import asyncio
from abc import ABC, abstractmethod
from asyncio import StreamReader, StreamWriter
from typing import Optional

from pydantic_settings import BaseSettings

from kelvin.message import Message


class StreamInterface(ABC):
    """Contract for a message-oriented connection to a Kelvin system.

    Concrete streams subclass this interface and supply the four
    coroutines declared below: ``connect``, ``disconnect``, ``read`` and
    ``write``. The class cannot be instantiated until all of them are
    overridden.
    """

    @abstractmethod
    async def connect(self) -> None:
        """Open the connection to the Kelvin Stream."""
        raise NotImplementedError

    @abstractmethod
    async def disconnect(self) -> None:
        """Close the connection to the Kelvin Stream."""
        raise NotImplementedError

    @abstractmethod
    async def read(self) -> Message:
        """Return the next message received from the stream."""
        raise NotImplementedError

    @abstractmethod
    async def write(self, msg: Message) -> bool:
        """Send ``msg`` over the stream."""
        raise NotImplementedError
class KelvinStreamConfig(BaseSettings):
    """Connection settings for the Kelvin Stream.

    Attributes:
        ip: IP address of the Kelvin stream server. Default: "127.0.0.1".
        port: Port number of the stream connection. Default: 49167.
        limit: Maximum buffer size, in bytes, for the stream reader.
            Default: 4MB.

    Environment Variables:
        KELVIN_STREAM_IP: Override the IP address.
        KELVIN_STREAM_PORT: Override the port number.
        KELVIN_STREAM_LIMIT: Override the buffer limit.
    """

    # Settings are looked up in the environment under this prefix.
    model_config = {"env_prefix": "KELVIN_STREAM_"}

    ip: str = "127.0.0.1"
    port: int = 49167
    limit: int = 4 * 1024 * 1024  # 4 MiB, i.e. 2**22 bytes
class KelvinStream(StreamInterface):
    """TCP stream implementation for Kelvin platform communication.

    Manages a TCP connection to the Kelvin stream server. Messages are
    exchanged as newline-delimited payloads: ``write`` appends ``b"\\n"``
    to the encoded message and ``read`` consumes one line at a time.

    Attributes:
        _config: The stream configuration settings.
        _reader: The async stream reader (None when disconnected).
        _writer: The async stream writer (None when disconnected).
    """

    _reader: Optional[StreamReader]
    _writer: Optional[StreamWriter]

    def __init__(self, config: Optional[KelvinStreamConfig] = None) -> None:
        """Initialize the Kelvin Stream.

        Args:
            config: Stream configuration settings. When omitted, a fresh
                ``KelvinStreamConfig`` is built at call time.
        """
        # Build the default here rather than in the signature: a default
        # argument is evaluated once, when the function is defined, which
        # would freeze the settings at import time and share one instance
        # between every stream.
        self._config = config if config is not None else KelvinStreamConfig()
        self._writer = None
        self._reader = None

    async def connect(self) -> None:
        """Connect to the Kelvin Stream server.

        Opens a connection to the configured ``ip``/``port`` using the
        configured reader buffer ``limit``.

        Raises:
            OSError: If the connection cannot be established. This includes
                ``ConnectionError`` subclasses such as
                ``ConnectionRefusedError``.
        """
        self._reader, self._writer = await asyncio.open_connection(
            self._config.ip,
            self._config.port,
            limit=self._config.limit,
        )

    async def disconnect(self) -> None:
        """Disconnect from the Kelvin Stream.

        Does nothing when the stream is not connected. Afterwards the
        stream reports as disconnected, so ``read`` and ``write`` raise
        ``ConnectionError`` until ``connect`` is called again.
        """
        writer = self._writer
        if writer is None:
            return

        # Clear the references before closing so the stream is marked as
        # disconnected even if waiting for the close raises.
        self._writer = None
        self._reader = None

        writer.close()
        await writer.wait_closed()

    async def read(self) -> Message:
        """Read the next Kelvin Message from the stream.

        Raises:
            ConnectionError: When the stream is not connected, or when the
                peer closed the connection (an empty read).

        Returns:
            Message: The message parsed from the next line of the stream.
        """
        reader = self._reader
        if reader is None:
            raise ConnectionError("Not connected to stream.")

        data = await reader.readline()
        # readline() returns empty bytes only when EOF is reached.
        if not data:
            raise ConnectionError("Connection lost.")

        return Message.model_validate_json(data)

    async def write(self, msg: Message) -> bool:
        """Write a Message to the Kelvin Stream.

        Args:
            msg: Kelvin message to write.

        Raises:
            ConnectionError: When the stream is not connected, or if the
                connection is lost while sending.

        Returns:
            bool: True if the message was sent with success.
        """
        writer = self._writer
        if writer is None:
            raise ConnectionError("Not connected to stream.")

        # One message per line: the newline is the frame delimiter that
        # read() relies on.
        writer.write(msg.encode() + b"\n")
        await writer.drain()
        return True