# Source code for kelvin.testing.sources.random_source

"""Random data source for fuzz testing."""

from __future__ import annotations

import random
from collections.abc import AsyncGenerator
from typing import TYPE_CHECKING, Optional, Union, final

from typing_extensions import override

from kelvin.krn import KRNAssetDataStream
from kelvin.message import Boolean, Message, Number, String
from kelvin.testing.sources.base import DataSource

if TYPE_CHECKING:
    from kelvin.application.clock import VirtualClock

_VALID_VALUE_TYPES = frozenset({"number", "boolean", "string"})


@final
class RandomSource(DataSource):
    """Data source that generates random values for fuzz testing.

    Generates messages with random values of the configured type (numbers
    within the specified bounds, booleans, or short lowercase alphanumeric
    strings), optionally across multiple datastreams.

    Usage:
        source = RandomSource(
            datastreams=["temperature", "pressure"],
            min_value=0,
            max_value=100,
        ).with_asset("sensor-001")
    """

    def __init__(
        self,
        datastreams: Optional[list[str]] = None,
        min_value: float = 0.0,
        max_value: float = 100.0,
        seed: Optional[int] = None,
        count: Optional[int] = None,
        value_type: str = "number",
    ) -> None:
        """Initialize the random source.

        Parameters:
            datastreams: List of datastream names. If None or empty, uses ["value"].
            min_value: Minimum random value (for numeric types).
            max_value: Maximum random value (for numeric types).
            seed: Optional random seed for reproducibility.
            count: Optional number of messages to generate. If None, generates indefinitely.
            value_type: Type of values to generate ("number", "string", "boolean").

        Raises:
            ValueError: If ``value_type`` is not one of the supported types.
        """
        super().__init__()
        self._datastreams = datastreams or ["value"]
        self._min_value = min_value
        self._max_value = max_value
        self._seed = seed
        self._count = count
        if value_type not in _VALID_VALUE_TYPES:
            raise ValueError(
                f"unsupported value_type {value_type!r}, must be one of: {', '.join(sorted(_VALID_VALUE_TYPES))}"
            )
        self._value_type = value_type
        # A private generator keeps seeded runs reproducible and independent
        # of the global ``random`` module state.
        self._rng = random.Random(seed)

    @override
    async def generate(self, clock: VirtualClock) -> AsyncGenerator[Message, None]:
        """Generate random messages.

        Each iteration picks one of the configured datastreams at random,
        yields a message for it, then sleeps on ``clock`` for one sample
        interval. Nothing is yielded when no asset has been configured.

        Parameters:
            clock: VirtualClock for time control.

        Yields:
            Messages with random values.
        """
        # NOTE(review): ``_asset`` and ``_interval`` are not set in this
        # class; presumably they come from DataSource -- confirm.
        if self._asset is None:
            return

        generated = 0
        sample_interval = self._interval.total_seconds()

        while self._count is None or generated < self._count:
            # Pick a random datastream
            datastream = self._rng.choice(self._datastreams)
            resource = KRNAssetDataStream(self._asset, datastream)

            # Generate message based on type
            msg = self._generate_message(resource, clock)
            # Compare against the None sentinel explicitly instead of relying
            # on the truthiness of a Message object.
            if msg is not None:
                yield msg
                generated += 1

            # Wait for next sample
            await clock.sleep(sample_interval)

    def _generate_message(self, resource: KRNAssetDataStream, clock: VirtualClock) -> Optional[Message]:
        """Generate a random message.

        The order and kind of calls on the internal generator are part of the
        seeded output, so they must not be changed casually.

        Parameters:
            resource: Target resource.
            clock: Clock for timestamp.

        Returns:
            Random message of the configured value type, timestamped with
            ``clock.now()``, or None if the stored value type is not
            recognised (the constructor rejects such values).
        """
        timestamp = clock.now()
        kind = self._value_type

        if kind == "number":
            number = self._rng.uniform(self._min_value, self._max_value)
            return Number(resource=resource, payload=number, timestamp=timestamp)

        if kind == "boolean":
            flag = self._rng.choice([True, False])
            return Boolean(resource=resource, payload=flag, timestamp=timestamp)

        if kind == "string":
            # Generate a random string of 5 to 20 characters
            length = self._rng.randint(5, 20)
            chars = "abcdefghijklmnopqrstuvwxyz0123456789"
            text = "".join(self._rng.choice(chars) for _ in range(length))
            return String(resource=resource, payload=text, timestamp=timestamp)

        return None

    def with_datastreams(self, datastreams: list[str]) -> "RandomSource":
        """Set the datastreams to generate for.

        An empty list falls back to ["value"], matching the constructor;
        otherwise generation would fail when choosing from an empty sequence.

        Parameters:
            datastreams: List of datastream names.

        Returns:
            Self for chaining.
        """
        self._datastreams = datastreams or ["value"]
        return self

    def with_range(self, min_value: float, max_value: float) -> "RandomSource":
        """Set the value range for numeric generation.

        Parameters:
            min_value: Minimum value.
            max_value: Maximum value.

        Returns:
            Self for chaining.
        """
        self._min_value = min_value
        self._max_value = max_value
        return self

    def with_count(self, count: int) -> "RandomSource":
        """Set the number of messages to generate.

        Parameters:
            count: Number of messages.

        Returns:
            Self for chaining.
        """
        self._count = count
        return self