Source code for kelvin.testing.sources.base

"""Base class for data sources."""

from __future__ import annotations

from abc import ABC, abstractmethod
from collections.abc import AsyncGenerator
from datetime import timedelta
from typing import TYPE_CHECKING, Optional

from typing_extensions import Self

if TYPE_CHECKING:
    from kelvin.application.clock import VirtualClock
    from kelvin.message import Message


[docs] class DataSource(ABC): """Abstract base class for test data sources. Data sources generate messages that are fed into the test harness as simulated inputs. They can be time-based (CSV replay, synthetic waveforms) or event-based (random generation). Usage: source = CSVSource("data.csv").with_asset("pump-001") test.add_source(source) """
[docs] def __init__(self) -> None: """Initialize the data source.""" self._asset: Optional[str] = None self._interval: timedelta = timedelta(seconds=1)
[docs] @abstractmethod async def generate(self, clock: VirtualClock) -> AsyncGenerator[Message, None]: """Generate messages. This is an async generator that yields messages at appropriate times. The implementation should use clock.sleep() to wait between messages for deterministic time-based generation. Parameters: clock: VirtualClock for time control. Yields: Message instances to inject as inputs. """ yield # type: ignore[return-value, misc] # pragma: no cover
[docs] def with_asset(self, asset: str) -> Self: """Set the asset name for generated messages. Parameters: asset: Asset name to use. Returns: Self for chaining. """ self._asset = asset return self
[docs] def with_timing(self, interval: timedelta) -> Self: """Set the timing interval between messages. Parameters: interval: Time interval between messages. Returns: Self for chaining. """ self._interval = interval return self
@property def asset(self) -> Optional[str]: """Get the configured asset name.""" return self._asset @property def interval(self) -> timedelta: """Get the configured timing interval.""" return self._interval