# Source code for kelvin.testing.sources.dataframe_source

"""DataFrame data source for replaying recorded data."""

from __future__ import annotations

from collections.abc import Hashable, Iterator
from typing import Any, final

try:
    import pandas as pd
except ImportError as e:
    raise ImportError(
        "missing requirements to use this feature, install with `pip install 'kelvin-python-sdk[testing]'`"
    ) from e
from typing_extensions import override

from kelvin.testing.sources.tabular import TabularSource


@final
class DataFrameSource(TabularSource):
    """Data source that replays messages from a pandas DataFrame.

    The DataFrame should have columns for timestamp, asset_name, and one or
    more datastream columns. All columns except reserved ones (timestamp,
    asset_name) are treated as datastream values, unless specific columns
    are selected.

    Usage:
        source = (
            DataFrameSource(df, playback=True, now_offset=True)
            .with_asset("pump-001")  # fallback if no asset_name column
            .with_timing(timedelta(seconds=1))
        )

        # Select specific columns
        source = DataFrameSource(df).with_columns("temperature", "pressure")

        # Map columns to datastream names
        source = DataFrameSource(df).with_column_mapping({
            "temp_c": "temperature",
            "pressure_psi": "pressure",
        })
    """

    def __init__(
        self,
        df: pd.DataFrame,
        playback: bool = False,
        ignore_timestamps: bool = False,
        now_offset: bool = False,
        asset_column: str = "asset_name",
        timestamp_column: str = "timestamp",
    ) -> None:
        """Initialize the DataFrame source.

        Parameters:
            df: Pandas DataFrame containing the data.
            playback: When True, wait between rows based on timestamp
                differences (real-time replay). Requires timestamps in
                DataFrame.
            ignore_timestamps: When True, use virtual clock time instead of
                DataFrame timestamps.
            now_offset: When True, offset all timestamps so the first row
                starts at the virtual clock's current time.
            asset_column: Name of the column containing asset names.
            timestamp_column: Name of the timestamp column.
        """
        super().__init__(
            playback=playback,
            ignore_timestamps=ignore_timestamps,
            now_offset=now_offset,
            asset_column=asset_column,
            timestamp_column=timestamp_column,
        )
        self._df = df

        # Validate structure. Kept after the ``self._df`` assignment because
        # the hooks below read it (presumably the base-class validation calls
        # them -- confirm against TabularSource).
        self._validate_playback()

    @override
    def _get_columns(self) -> list[str]:
        """Get DataFrame column names.

        Returns:
            List of column names from DataFrame.
        """
        return list(self._df.columns)

    @override
    def _has_timestamp_column(self) -> bool:
        """Check if DataFrame has timestamp column.

        Returns:
            True if timestamp column exists in DataFrame.
        """
        return self._timestamp_col in self._df.columns

    @override
    def _is_empty(self) -> bool:
        """Check if DataFrame is empty.

        Returns:
            True if DataFrame has no rows.
        """
        return self._df.empty

    def _sanitize_value(self, value: Any) -> Any:
        """Convert pandas types to Python natives.

        Converts:
        - pd.Timestamp -> datetime
        - pd.NA/NaN/NaT/None -> None
        - Other values (including list-like cells) pass through unchanged

        Parameters:
            value: Value to sanitize.

        Returns:
            Python native value.
        """
        # For list-like cells ``pd.isna`` returns an element-wise boolean
        # array; truth-testing that raises ValueError (or collapses a
        # one-element cell such as ``[nan]`` to None). Only scalars can be
        # "missing", so everything else is returned as-is.
        if not pd.api.types.is_scalar(value):
            return value
        if pd.isna(value):  # pyright: ignore[reportUnknownMemberType]
            return None
        if isinstance(value, pd.Timestamp):
            return value.to_pydatetime()
        return value

    def _sanitize_row(self, row: dict[Hashable, Any]) -> dict[str, Any]:
        """Sanitize all values in a row.

        Keys are converted with ``str()`` so non-string column labels still
        yield string keys.

        Parameters:
            row: Row dictionary with potentially pandas types.

        Returns:
            Row dictionary with Python native types.
        """
        return {str(k): self._sanitize_value(v) for k, v in row.items()}

    @override
    def _iter_rows(self) -> Iterator[dict[str, Any]]:
        """Iterate DataFrame rows with pandas types converted to Python natives.

        Uses to_dict('records') for better performance compared to
        iterrows(). This is typically 10-100x faster for large DataFrames.
        Note that to_dict('records') builds the full list of records before
        the first row is yielded.

        Yields:
            Row dictionaries mapping column names to Python native values.
        """
        for record in self._df.to_dict("records"):  # pyright: ignore[reportUnknownMemberType, reportUnknownVariableType]
            yield self._sanitize_row(record)  # pyright: ignore[reportUnknownArgumentType]