Source code for kelvin.testing.sources.synthetic

"""Synthetic data source with wave pattern generation."""

from __future__ import annotations

import math
import random
from abc import ABC, abstractmethod
from collections.abc import AsyncGenerator
from datetime import timedelta
from typing import TYPE_CHECKING, Optional, final

from typing_extensions import override

from kelvin.krn import KRNAssetDataStream
from kelvin.message import Message, Number
from kelvin.testing.sources.base import DataSource

if TYPE_CHECKING:
    from kelvin.application.clock import VirtualClock


[docs] class WavePattern(ABC): """Abstract base class for wave patterns. Wave patterns define mathematical functions that generate values based on time. They can be combined and modified to create complex test signals. """
[docs] @abstractmethod def value_at(self, t: float) -> float: """Get the value at time t. Parameters: t: Time in seconds. Returns: Value at time t. """ pass
[docs] @final class SineWave(WavePattern): """Sine wave pattern. Generates values following a sinusoidal function: value = offset + amplitude * sin(2*pi*t/period + phase) :: +amp | ***** | ** ** 0 |** *** ** | ** ** -amp | ***** +-----------------------------> t 0 T/4 T/2 3T/4 T """
[docs] def __init__( self, amplitude: float = 1.0, period: Optional[timedelta] = None, offset: float = 0.0, phase: float = 0.0, ) -> None: """Initialize the sine wave. Parameters: amplitude: Wave amplitude (peak deviation from offset). period: Time for one complete cycle. offset: DC offset (center value). phase: Phase offset in radians. """ if period is None: period = timedelta(seconds=60) self.amplitude = amplitude self.period = period.total_seconds() self.offset = offset self.phase = phase
[docs] @override def value_at(self, t: float) -> float: """Get the sine wave value at time t.""" return self.offset + self.amplitude * math.sin(2 * math.pi * t / self.period + self.phase)
[docs] @final class SquareWave(WavePattern): """Square wave pattern. Generates values that alternate between high and low: - First half of period: offset + amplitude - Second half of period: offset - amplitude .. code-block:: text +amp ┌─────────┐ ┌─────────┐ │ │ │ │ -amp ┘ └───────────┘ └─── t --> |← duty →| |←──── period ────→| """
[docs] def __init__( self, amplitude: float = 1.0, period: Optional[timedelta] = None, offset: float = 0.0, duty_cycle: float = 0.5, ) -> None: """Initialize the square wave. Parameters: amplitude: Wave amplitude. period: Time for one complete cycle. offset: DC offset. duty_cycle: Fraction of period spent at high value (0-1). """ if period is None: period = timedelta(seconds=60) self.amplitude = amplitude self.period = period.total_seconds() self.offset = offset self.duty_cycle = duty_cycle
[docs] @override def value_at(self, t: float) -> float: """Get the square wave value at time t.""" phase = (t % self.period) / self.period if phase < self.duty_cycle: return self.offset + self.amplitude return self.offset - self.amplitude
[docs] @final class RampWave(WavePattern): """Ramp (sawtooth) wave pattern. Generates values that linearly increase from min to max, then reset to min. :: max | /| /| /| | / | / | / | | / | / | / | min | / | / | / | +-------------------------> t |period| """
[docs] def __init__( self, min_value: float = 0.0, max_value: float = 1.0, period: Optional[timedelta] = None, ) -> None: """Initialize the ramp wave. Parameters: min_value: Minimum value (start of ramp). max_value: Maximum value (end of ramp). period: Time for one complete cycle. """ if period is None: period = timedelta(seconds=60) self.min_value = min_value self.max_value = max_value self.period = period.total_seconds()
[docs] @override def value_at(self, t: float) -> float: """Get the ramp wave value at time t.""" phase = (t % self.period) / self.period return self.min_value + (self.max_value - self.min_value) * phase
[docs] @final class NoiseWave(WavePattern): """Noise wave that adds gaussian noise to another pattern. Wraps a base pattern and adds random noise to its values. .. code-block:: text | * | * * * * base |*───────*──*──────── | * * | * +-------------------------> t """
[docs] def __init__( self, base: WavePattern, std_dev: float = 1.0, seed: Optional[int] = None, ) -> None: """Initialize the noise wave. Parameters: base: Base wave pattern to add noise to. std_dev: Standard deviation of gaussian noise. seed: Optional random seed for reproducibility. """ self.base = base self.std_dev = std_dev self._rng = random.Random(seed)
[docs] @override def value_at(self, t: float) -> float: """Get the base wave value plus noise at time t.""" base_value = self.base.value_at(t) noise = self._rng.gauss(0, self.std_dev) return base_value + noise
[docs] @final class ConstantWave(WavePattern): """Constant value pattern. Always returns the same value, useful for baseline testing. .. code-block:: text | value|────────────────────── | +-------------------------> t """
[docs] def __init__(self, value: float = 0.0) -> None: """Initialize the constant wave. Parameters: value: Constant value to return. """ self.value = value
[docs] @override def value_at(self, t: float) -> float: """Get the constant value.""" return self.value
[docs] @final class SyntheticSource(DataSource): """Data source that generates synthetic values from wave patterns. Generates messages at regular intervals with values computed from a wave pattern function. Usage: source = SyntheticSource( pattern=SineWave(amplitude=10, period=timedelta(minutes=1)), datastream="temperature", ).with_asset("sensor-001") """
[docs] def __init__( self, pattern: WavePattern, datastream: str, sample_rate: Optional[timedelta] = None, duration: Optional[timedelta] = None, ) -> None: """Initialize the synthetic source. Parameters: pattern: Wave pattern for value generation. datastream: Name of the datastream for messages. sample_rate: Time between samples. duration: Optional total duration. If None, generates indefinitely. """ if sample_rate is None: sample_rate = timedelta(seconds=1) super().__init__() self._pattern = pattern self._datastream = datastream self._sample_rate = sample_rate self._duration = duration
[docs] @override async def generate(self, clock: VirtualClock) -> AsyncGenerator[Message, None]: """Generate messages from the wave pattern. Parameters: clock: VirtualClock for time control. Yields: Number messages with values from the pattern. """ if self._asset is None: return resource = KRNAssetDataStream(self._asset, self._datastream) start_time = clock.perf_counter() sample_interval = self._sample_rate.total_seconds() while True: # Check duration limit elapsed = clock.perf_counter() - start_time if self._duration is not None and elapsed >= self._duration.total_seconds(): break # Generate value from pattern value = self._pattern.value_at(elapsed) yield Number( resource=resource, payload=value, timestamp=clock.now(), ) # Wait for next sample await clock.sleep(sample_interval)