Source code for kelvin.application.stream
"""Kelvin Stream interface for message communication.
This module provides the low-level stream interface for connecting to and
communicating with the Kelvin platform via TCP sockets.
Main Components:
StreamInterface: Abstract base class defining the stream protocol.
KelvinStream: Concrete implementation for Kelvin platform communication.
KelvinStreamConfig: Configuration settings for the stream connection.
"""
from __future__ import annotations
import asyncio
from abc import ABC, abstractmethod
from asyncio import StreamReader, StreamWriter
from typing import Optional
from pydantic_settings import BaseSettings
from kelvin.message import Message
[docs]
class StreamInterface(ABC):
"""Abstract interface for a connection to a Kelvin system.
This class defines the protocol for stream-based communication with
the Kelvin platform. Implementations must provide connect, disconnect,
read, and write operations.
"""
[docs]
@abstractmethod
async def connect(self) -> None:
raise NotImplementedError
[docs]
@abstractmethod
async def disconnect(self) -> None:
"""Disconnect from the Kelvin Stream."""
raise NotImplementedError
[docs]
@abstractmethod
async def read(self) -> Message:
"""Read the next message from the stream."""
raise NotImplementedError
[docs]
@abstractmethod
async def write(self, msg: Message) -> bool:
"""Write a message to the stream."""
raise NotImplementedError
[docs]
class KelvinStreamConfig(BaseSettings):
"""Configuration for the Kelvin Stream connection.
Attributes:
ip: The IP address of the Kelvin stream server. Default: "127.0.0.1".
port: The port number for the stream connection. Default: 49167.
limit: Maximum buffer size in bytes for the stream reader. Default: 4MB.
Environment Variables:
KELVIN_STREAM_IP: Override the IP address.
KELVIN_STREAM_PORT: Override the port number.
KELVIN_STREAM_LIMIT: Override the buffer limit.
"""
model_config = {"env_prefix": "KELVIN_STREAM_"}
ip: str = "127.0.0.1"
port: int = 49167
limit: int = 2**22
[docs]
class KelvinStream(StreamInterface):
"""TCP stream implementation for Kelvin platform communication.
This class manages a persistent TCP connection to the Kelvin stream server,
handling message serialization/deserialization and connection lifecycle.
Attributes:
_config: The stream configuration settings.
_reader: The async stream reader (None when disconnected).
_writer: The async stream writer (None when disconnected).
"""
_reader: Optional[StreamReader]
_writer: Optional[StreamWriter]
[docs]
def __init__(self, config: KelvinStreamConfig = KelvinStreamConfig()) -> None:
"""Initialize the Kelvin Stream.
Args:
config: Stream configuration settings. Uses defaults if not provided.
"""
self._config = config
self._writer = None
self._reader = None
[docs]
async def connect(self) -> None:
"""Connect to the Kelvin Stream server.
Raises:
ConnectionError: If the stream server is unreachable.
"""
self._reader, self._writer = await asyncio.open_connection(
self._config.ip, self._config.port, limit=self._config.limit
)
[docs]
async def disconnect(self) -> None:
"""Disconnects from Kelvin Stream"""
if self._writer:
self._writer.close()
await self._writer.wait_closed()
[docs]
async def read(self) -> Message:
"""Reads the next Kelvin Message
Raises:
ConnectionError: When connection is unavailable.
Returns:
Message: the read Message
"""
if not self._reader:
raise ConnectionError("Not connected to stream.")
data = await self._reader.readline()
if not len(data):
raise ConnectionError("Connection lost.")
return Message.model_validate_json(data)
[docs]
async def write(self, msg: Message) -> bool:
"""Writes a Message to the Kelvin Stream
Args:
msg (Message): Kelvin message to write
Raises:
ConnectionError: If the connection is lost.
Returns:
bool: True if the message was sent with success.
"""
if not self._writer:
raise ConnectionError("Not connected to stream.")
self._writer.write(msg.encode() + b"\n")
await self._writer.drain()
return True