"""Random data source for fuzz testing."""
from __future__ import annotations
import random
from collections.abc import AsyncGenerator
from typing import TYPE_CHECKING, Optional, Union, final
from typing_extensions import override
from kelvin.krn import KRNAssetDataStream
from kelvin.message import Boolean, Message, Number, String
from kelvin.testing.sources.base import DataSource
if TYPE_CHECKING:
    # Needed only for annotations; guarded so it is skipped at runtime.
    from kelvin.application.clock import VirtualClock

# Payload kinds accepted by RandomSource's ``value_type`` argument.
_VALID_VALUE_TYPES = frozenset({"number", "boolean", "string"})
@final
class RandomSource(DataSource):
    """Data source that generates random values for fuzz testing.

    Each generated message targets one datastream picked at random from the
    configured list and carries a random payload whose kind is selected by
    ``value_type``: a float drawn between ``min_value`` and ``max_value``,
    a boolean, or a 5-20 character string of lowercase letters and digits.

    All randomness comes from a private ``random.Random`` instance seeded
    with ``seed``.

    Usage:
        source = RandomSource(
            datastreams=["temperature", "pressure"],
            min_value=0,
            max_value=100,
        ).with_asset("sensor-001")
    """

    def __init__(
        self,
        datastreams: Optional[list[str]] = None,
        min_value: float = 0.0,
        max_value: float = 100.0,
        seed: Optional[int] = None,
        count: Optional[int] = None,
        value_type: str = "number",
    ) -> None:
        """Initialize the random source.

        Parameters:
            datastreams: List of datastream names. If None or empty, uses ["value"].
            min_value: Minimum random value (for numeric types).
            max_value: Maximum random value (for numeric types).
            seed: Optional random seed for reproducibility.
            count: Optional number of messages to generate. If None, generates indefinitely.
            value_type: Type of values to generate ("number", "string", "boolean").

        Raises:
            ValueError: If ``value_type`` is not one of the supported types.
        """
        super().__init__()
        if value_type not in _VALID_VALUE_TYPES:
            raise ValueError(
                f"unsupported value_type {value_type!r}, must be one of: {', '.join(sorted(_VALID_VALUE_TYPES))}"
            )
        self._datastreams = self._normalize_datastreams(datastreams)
        self._min_value = min_value
        self._max_value = max_value
        self._seed = seed
        self._count = count
        self._value_type = value_type
        self._rng = random.Random(seed)

    @staticmethod
    def _normalize_datastreams(datastreams: Optional[list[str]]) -> list[str]:
        """Return a private, non-empty copy of the datastream names.

        None and an empty list both fall back to ["value"], because
        ``generate`` picks from this list with ``random.Random.choice``,
        which raises IndexError on an empty sequence. Copying keeps later
        mutation of the caller's list from affecting this source.

        Parameters:
            datastreams: Caller-supplied datastream names, or None.

        Returns:
            A new list holding at least one datastream name.
        """
        return list(datastreams) if datastreams else ["value"]

    @override
    async def generate(self, clock: VirtualClock) -> AsyncGenerator[Message, None]:
        """Generate random messages.

        Yields nothing when no asset has been configured. Otherwise yields
        until ``count`` messages have been produced, or indefinitely when
        ``count`` is None, sleeping on ``clock`` after every sample.

        Parameters:
            clock: VirtualClock for time control.

        Yields:
            Messages with random values.
        """
        # NOTE(review): _asset and _interval are not set in this class;
        # presumably they come from DataSource -- confirm.
        if self._asset is None:
            return

        generated = 0
        sample_interval = self._interval.total_seconds()

        while self._count is None or generated < self._count:
            # Pick a random datastream for this sample.
            datastream = self._rng.choice(self._datastreams)
            resource = KRNAssetDataStream(self._asset, datastream)

            msg = self._generate_message(resource, clock)
            # Test against None explicitly so the check does not depend on
            # how Message defines truthiness.
            if msg is not None:
                yield msg
                generated += 1

            # Wait for the next sample. This also runs after the final
            # message, before the loop condition is re-checked.
            await clock.sleep(sample_interval)

    def _generate_message(self, resource: KRNAssetDataStream, clock: VirtualClock) -> Optional[Message]:
        """Generate a random message.

        Parameters:
            resource: Target resource.
            clock: Clock for timestamp.

        Returns:
            A Number, Boolean or String message according to the configured
            value type, or None if the value type is not recognised.
        """
        timestamp = clock.now()

        if self._value_type == "number":
            number = self._rng.uniform(self._min_value, self._max_value)
            return Number(resource=resource, payload=number, timestamp=timestamp)

        if self._value_type == "boolean":
            flag = self._rng.choice([True, False])
            return Boolean(resource=resource, payload=flag, timestamp=timestamp)

        if self._value_type == "string":
            # Draw the length first, then one character at a time, so the
            # order of RNG calls (and thus seeded output) stays fixed.
            length = self._rng.randint(5, 20)
            chars = "abcdefghijklmnopqrstuvwxyz0123456789"
            text = "".join(self._rng.choice(chars) for _ in range(length))
            return String(resource=resource, payload=text, timestamp=timestamp)

        # Not expected in practice: __init__ validates value_type.
        return None

    def with_datastreams(self, datastreams: list[str]) -> "RandomSource":
        """Set the datastreams to generate for.

        Parameters:
            datastreams: List of datastream names. An empty list falls back
                to ["value"], matching the constructor.

        Returns:
            Self for chaining.
        """
        self._datastreams = self._normalize_datastreams(datastreams)
        return self

    def with_range(self, min_value: float, max_value: float) -> "RandomSource":
        """Set the value range for numeric generation.

        Parameters:
            min_value: Minimum value.
            max_value: Maximum value.

        Returns:
            Self for chaining.
        """
        self._min_value = min_value
        self._max_value = max_value
        return self

    def with_count(self, count: int) -> "RandomSource":
        """Set the number of messages to generate.

        Parameters:
            count: Number of messages.

        Returns:
            Self for chaining.
        """
        self._count = count
        return self