# Source code for kelvin.testing.sources.base
"""Base class for data sources."""
from __future__ import annotations
from abc import ABC, abstractmethod
from collections.abc import AsyncGenerator
from datetime import timedelta
from typing import TYPE_CHECKING, Optional
from typing_extensions import Self
if TYPE_CHECKING:
from kelvin.application.clock import VirtualClock
from kelvin.message import Message
class DataSource(ABC):
    """Abstract base class for test data sources.

    Data sources generate messages that are fed into the test harness
    as simulated inputs. They can be time-based (CSV replay, synthetic
    waveforms) or event-based (random generation).

    The base class only stores configuration (asset name and timing
    interval) and exposes it through read-only properties; concrete
    subclasses implement :meth:`generate`.

    Usage:
        source = CSVSource("data.csv").with_asset("pump-001")
        test.add_source(source)
    """

    def __init__(self) -> None:
        """Initialize the data source with its default configuration."""
        # No asset is configured until with_asset() is called.
        self._asset: Optional[str] = None
        # Default spacing between messages: one second.
        self._interval: timedelta = timedelta(seconds=1)

    @abstractmethod
    async def generate(self, clock: VirtualClock) -> AsyncGenerator[Message, None]:
        """Generate messages.

        This is an async generator that yields messages at appropriate times.
        The implementation should use clock.sleep() to wait between messages
        for deterministic time-based generation.

        Parameters:
            clock: VirtualClock for time control.

        Yields:
            Message instances to inject as inputs.
        """
        # The bare ``yield`` exists only so that this abstract placeholder is
        # itself an async generator function; subclasses must override it.
        yield  # type: ignore[return-value, misc]  # pragma: no cover

    def with_asset(self, asset: str) -> Self:
        """Set the asset name for generated messages.

        Parameters:
            asset: Asset name to use.

        Returns:
            Self for chaining.
        """
        self._asset = asset
        return self

    def with_timing(self, interval: timedelta) -> Self:
        """Set the timing interval between messages.

        The value is stored as given (no validation is performed here) and
        is available afterwards through the ``interval`` property.

        Parameters:
            interval: Time interval between messages.

        Returns:
            Self for chaining.
        """
        self._interval = interval
        return self

    @property
    def asset(self) -> Optional[str]:
        """Get the configured asset name (``None`` if none was set)."""
        return self._asset

    @property
    def interval(self) -> timedelta:
        """Get the configured timing interval (one second by default)."""
        return self._interval