"""Synthetic data source with wave pattern generation."""
from __future__ import annotations
import math
import random
from abc import ABC, abstractmethod
from collections.abc import AsyncGenerator
from datetime import timedelta
from typing import TYPE_CHECKING, Optional, final
from typing_extensions import override
from kelvin.krn import KRNAssetDataStream
from kelvin.message import Message, Number
from kelvin.testing.sources.base import DataSource
if TYPE_CHECKING:
from kelvin.application.clock import VirtualClock
[docs]
class WavePattern(ABC):
    """Common interface for time-driven signal shapes.

    A wave pattern is a function of time: given ``t`` it produces one
    numeric sample. Concrete subclasses supply the function by
    implementing :meth:`value_at`; a pattern may also wrap another
    pattern to modify its output.
    """

    @abstractmethod
    def value_at(self, t: float) -> float:
        """Return the pattern's sample at time ``t``.

        Parameters:
            t: Time in seconds.

        Returns:
            Value at time t.
        """
        ...
[docs]
@final
class SineWave(WavePattern):
    """Sine wave pattern.

    Generates values following a sinusoidal function:
    value = offset + amplitude * sin(2*pi*t/period + phase)

    ::

        +amp |     *****
             |   **     **
           0 |**           **           **
             |               **       **
        -amp |                 *****
             +-----------------------------> t
             0     T/4    T/2    3T/4    T
    """

    def __init__(
        self,
        amplitude: float = 1.0,
        period: Optional[timedelta] = None,
        offset: float = 0.0,
        phase: float = 0.0,
    ) -> None:
        """Initialize the sine wave.

        Parameters:
            amplitude: Wave amplitude (peak deviation from offset).
            period: Time for one complete cycle. Defaults to 60 seconds
                when omitted. Must not be zero-length.
            offset: DC offset (center value).
            phase: Phase offset in radians.

        Raises:
            ValueError: If ``period`` is zero-length.
        """
        if period is None:
            period = timedelta(seconds=60)
        period_seconds = period.total_seconds()
        # value_at() divides by the period; reject zero here so the mistake
        # surfaces at construction instead of as a ZeroDivisionError later.
        if period_seconds == 0:
            raise ValueError("period must be non-zero")
        self.amplitude = amplitude
        # Stored as float seconds, not as the original timedelta.
        self.period = period_seconds
        self.offset = offset
        self.phase = phase

    @override
    def value_at(self, t: float) -> float:
        """Get the sine wave value at time t.

        Parameters:
            t: Time in seconds.

        Returns:
            ``offset + amplitude * sin(2*pi*t/period + phase)``.
        """
        angle = 2 * math.pi * t / self.period + self.phase
        return self.offset + self.amplitude * math.sin(angle)
[docs]
@final
class SquareWave(WavePattern):
    """Square wave pattern.

    Generates values that alternate between high and low:

    - While the cycle fraction is below ``duty_cycle``: offset + amplitude
    - For the rest of the cycle: offset - amplitude

    With the default ``duty_cycle`` of 0.5 this is the first and second
    half of each period.

    .. code-block:: text

        +amp ┌─────────┐           ┌─────────┐
             │         │           │         │
        -amp ┘         └───────────┘         └───
             t -->
             |← duty  →|
             |←────── period ─────→|
    """

    def __init__(
        self,
        amplitude: float = 1.0,
        period: Optional[timedelta] = None,
        offset: float = 0.0,
        duty_cycle: float = 0.5,
    ) -> None:
        """Initialize the square wave.

        Parameters:
            amplitude: Wave amplitude.
            period: Time for one complete cycle. Defaults to 60 seconds
                when omitted. Must not be zero-length.
            offset: DC offset.
            duty_cycle: Fraction of period spent at high value (0-1).

        Raises:
            ValueError: If ``period`` is zero-length.
        """
        if period is None:
            period = timedelta(seconds=60)
        period_seconds = period.total_seconds()
        # value_at() takes ``t % period``; reject zero here so the mistake
        # surfaces at construction instead of as a ZeroDivisionError later.
        if period_seconds == 0:
            raise ValueError("period must be non-zero")
        self.amplitude = amplitude
        # Stored as float seconds, not as the original timedelta.
        self.period = period_seconds
        self.offset = offset
        self.duty_cycle = duty_cycle

    @override
    def value_at(self, t: float) -> float:
        """Get the square wave value at time t.

        Parameters:
            t: Time in seconds.

        Returns:
            ``offset + amplitude`` during the high part of the cycle,
            ``offset - amplitude`` otherwise.
        """
        # Fraction of the current cycle that has elapsed.
        phase = (t % self.period) / self.period
        if phase < self.duty_cycle:
            return self.offset + self.amplitude
        return self.offset - self.amplitude
[docs]
@final
class RampWave(WavePattern):
    """Ramp (sawtooth) wave pattern.

    Generates values that linearly increase from min towards max,
    then reset to min at the start of the next period.

    ::

        max |    /|    /|    /|
            |   / |   / |   / |
            |  /  |  /  |  /  |
        min | /   | /   | /   |
            +-------------------------> t
              |period|
    """

    def __init__(
        self,
        min_value: float = 0.0,
        max_value: float = 1.0,
        period: Optional[timedelta] = None,
    ) -> None:
        """Initialize the ramp wave.

        Parameters:
            min_value: Minimum value (start of ramp).
            max_value: Maximum value (end of ramp).
            period: Time for one complete cycle. Defaults to 60 seconds
                when omitted. Must not be zero-length.

        Raises:
            ValueError: If ``period`` is zero-length.
        """
        if period is None:
            period = timedelta(seconds=60)
        period_seconds = period.total_seconds()
        # value_at() takes ``t % period``; reject zero here so the mistake
        # surfaces at construction instead of as a ZeroDivisionError later.
        if period_seconds == 0:
            raise ValueError("period must be non-zero")
        self.min_value = min_value
        self.max_value = max_value
        # Stored as float seconds, not as the original timedelta.
        self.period = period_seconds

    @override
    def value_at(self, t: float) -> float:
        """Get the ramp wave value at time t.

        Parameters:
            t: Time in seconds.

        Returns:
            ``min_value`` plus the elapsed fraction of the current cycle
            times the ``max_value - min_value`` span.
        """
        # Fraction of the current cycle that has elapsed.
        phase = (t % self.period) / self.period
        return self.min_value + (self.max_value - self.min_value) * phase
[docs]
@final
class NoiseWave(WavePattern):
    """Decorates another pattern with additive gaussian noise.

    Each sample is the wrapped pattern's value plus a zero-mean random
    perturbation.

    .. code-block:: text

             |        *
             |  *  *      *   *
        base |*───────*──*────────
             |     *        *
             |  *
             +-------------------------> t
    """

    def __init__(
        self,
        base: WavePattern,
        std_dev: float = 1.0,
        seed: Optional[int] = None,
    ) -> None:
        """Set up the noisy wrapper.

        Parameters:
            base: Pattern whose values receive the noise.
            std_dev: Standard deviation of the gaussian noise.
            seed: Optional seed for the private random generator, for
                reproducible noise sequences.
        """
        # A dedicated generator keeps this wave independent of the global
        # ``random`` module state.
        self._rng = random.Random(seed)
        self.std_dev = std_dev
        self.base = base

    @override
    def value_at(self, t: float) -> float:
        """Return the wrapped pattern's value at ``t`` plus one noise draw.

        Every call consumes a fresh sample from the generator, so repeated
        calls with the same ``t`` generally return different values.
        """
        # The base pattern is evaluated first, then the noise is drawn.
        return self.base.value_at(t) + self._rng.gauss(0, self.std_dev)
[docs]
@final
class ConstantWave(WavePattern):
    """Flat-line pattern.

    Produces one fixed value regardless of time; handy as a baseline
    signal in tests.

    .. code-block:: text

             |
        value|──────────────────────
             |
             +-------------------------> t
    """

    def __init__(self, value: float = 0.0) -> None:
        """Store the level this pattern will emit.

        Parameters:
            value: The fixed value returned for every time.
        """
        self.value = value

    @override
    def value_at(self, t: float) -> float:
        """Return the stored value; ``t`` is ignored."""
        return self.value
[docs]
@final
class SyntheticSource(DataSource):
    """Data source that generates synthetic values from wave patterns.

    Generates messages at regular intervals with values computed
    from a wave pattern function.

    Usage:
        source = SyntheticSource(
            pattern=SineWave(amplitude=10, period=timedelta(minutes=1)),
            datastream="temperature",
        ).with_asset("sensor-001")
    """

    def __init__(
        self,
        pattern: WavePattern,
        datastream: str,
        sample_rate: Optional[timedelta] = None,
        duration: Optional[timedelta] = None,
    ) -> None:
        """Initialize the synthetic source.

        Parameters:
            pattern: Wave pattern for value generation.
            datastream: Name of the datastream for messages.
            sample_rate: Time between samples. Defaults to 1 second when
                omitted. Must not be negative.
            duration: Optional total duration. If None, generates indefinitely.

        Raises:
            ValueError: If ``sample_rate`` is negative.
        """
        if sample_rate is None:
            sample_rate = timedelta(seconds=1)
        # The interval is handed to ``clock.sleep``; a negative delay is
        # never meaningful, so fail at construction time.
        if sample_rate < timedelta(0):
            raise ValueError("sample_rate must not be negative")
        super().__init__()
        self._pattern = pattern
        self._datastream = datastream
        self._sample_rate = sample_rate
        self._duration = duration

    @override
    async def generate(self, clock: VirtualClock) -> AsyncGenerator[Message, None]:
        """Generate messages from the wave pattern.

        Yields nothing when no asset has been set on this source.

        Parameters:
            clock: VirtualClock for time control.

        Yields:
            Number messages with values from the pattern, evaluated at the
            time elapsed (per ``clock.perf_counter()``) since generation
            started and timestamped with ``clock.now()``.
        """
        # NOTE(review): ``_asset`` is not assigned in this class; presumably
        # the DataSource base sets it via ``with_asset()`` - confirm.
        if self._asset is None:
            return

        resource = KRNAssetDataStream(self._asset, self._datastream)
        start_time = clock.perf_counter()
        sample_interval = self._sample_rate.total_seconds()
        # Loop-invariant: convert the optional duration limit once.
        max_elapsed = None if self._duration is None else self._duration.total_seconds()

        while True:
            # Stop once the configured duration (if any) has been reached.
            elapsed = clock.perf_counter() - start_time
            if max_elapsed is not None and elapsed >= max_elapsed:
                break

            # Sample the pattern at the elapsed time and emit it.
            value = self._pattern.value_at(elapsed)
            yield Number(
                resource=resource,
                payload=value,
                timestamp=clock.now(),
            )

            # Wait for the next sample.
            await clock.sleep(sample_interval)