# Source code for kelvin.testing.sources.dataframe_source
"""DataFrame data source for replaying recorded data."""
from __future__ import annotations
from collections.abc import Hashable, Iterator
from typing import Any, final
try:
import pandas as pd
except ImportError as e:
raise ImportError(
"missing requirements to use this feature, install with `pip install 'kelvin-python-sdk[testing]'`"
) from e
from typing_extensions import override
from kelvin.testing.sources.tabular import TabularSource
[docs]
@final
class DataFrameSource(TabularSource):
    """Data source that replays messages from a pandas DataFrame.

    The DataFrame should have columns for timestamp, asset_name, and one or
    more datastream columns. All columns except reserved ones (timestamp,
    asset_name) are treated as datastream values, unless specific columns are
    selected.

    Usage:
        source = (
            DataFrameSource(df, playback=True, now_offset=True)
            .with_asset("pump-001")  # fallback if no asset_name column
            .with_timing(timedelta(seconds=1))
        )

        # Select specific columns
        source = DataFrameSource(df).with_columns("temperature", "pressure")

        # Map columns to datastream names
        source = DataFrameSource(df).with_column_mapping({
            "temp_c": "temperature",
            "pressure_psi": "pressure",
        })
    """

    def __init__(
        self,
        df: pd.DataFrame,
        playback: bool = False,
        ignore_timestamps: bool = False,
        now_offset: bool = False,
        asset_column: str = "asset_name",
        timestamp_column: str = "timestamp",
    ) -> None:
        """Initialize the DataFrame source.

        Parameters:
            df: Pandas DataFrame containing the data.
            playback: When True, wait between rows based on timestamp
                differences (real-time replay). Requires timestamps in
                DataFrame.
            ignore_timestamps: When True, use virtual clock time instead of
                DataFrame timestamps.
            now_offset: When True, offset all timestamps so the first row
                starts at the virtual clock's current time.
            asset_column: Name of the column containing asset names.
            timestamp_column: Name of the timestamp column.
        """
        super().__init__(
            playback=playback,
            ignore_timestamps=ignore_timestamps,
            now_offset=now_offset,
            asset_column=asset_column,
            timestamp_column=timestamp_column,
        )
        # The frame is stored by reference; no copy is taken.
        self._df = df
        # Validate structure. `_validate_playback` is inherited (it is not
        # defined in this class). It is called only after `_df` is assigned;
        # presumably it consults the frame through the hooks below -- confirm
        # against TabularSource before reordering.
        self._validate_playback()

    @override
    def _get_columns(self) -> list[str]:
        """Get DataFrame column names.

        Returns:
            List of column names from DataFrame.
        """
        return list(self._df.columns)

    @override
    def _has_timestamp_column(self) -> bool:
        """Check if DataFrame has timestamp column.

        Returns:
            True if timestamp column exists in DataFrame.
        """
        # `_timestamp_col` is not assigned in this class; presumably set by
        # TabularSource.__init__ from `timestamp_column` -- confirm there.
        return self._timestamp_col in self._df.columns

    @override
    def _is_empty(self) -> bool:
        """Check if DataFrame is empty.

        Returns:
            True if `DataFrame.empty` reports the frame as empty.
        """
        return self._df.empty

    def _sanitize_value(self, value: Any) -> Any:
        """Convert pandas types to Python natives.

        Converts:
            - missing scalars (None, NaN, pd.NA, pd.NaT) -> None
            - pd.Timestamp -> datetime
            - Other values, including list-like cell values, pass through
              unchanged

        Parameters:
            value: Value to sanitize.

        Returns:
            Python native value.
        """
        # pd.isna() is element-wise for list-likes and returns a boolean
        # array, whose truth value is ambiguous (ValueError) or misleading
        # (a one-element list). Only scalars can be "missing" here, so the
        # is_scalar() guard keeps non-scalars away from the truth test.
        if pd.api.types.is_scalar(value) and pd.isna(value):  # pyright: ignore[reportUnknownMemberType]
            return None
        if isinstance(value, pd.Timestamp):
            return value.to_pydatetime()
        return value

    def _sanitize_row(self, row: dict[Hashable, Any]) -> dict[str, Any]:
        """Sanitize all values in a row.

        Keys are converted with `str()` so the result is always keyed by
        strings, whatever label type the DataFrame uses.

        Parameters:
            row: Row dictionary with potentially pandas types.

        Returns:
            Row dictionary with string keys and Python native values.
        """
        return {str(k): self._sanitize_value(v) for k, v in row.items()}

    @override
    def _iter_rows(self) -> Iterator[dict[str, Any]]:
        """Iterate DataFrame rows with pandas types converted to Python natives.

        Uses to_dict('records') for better performance compared to iterrows().
        This is typically 10-100x faster for large DataFrames. Note that
        to_dict('records') builds the full list of row dictionaries before
        the first row is yielded.

        Yields:
            Row dictionaries mapping column names to Python native values.
        """
        for record in self._df.to_dict("records"):  # pyright: ignore[reportUnknownMemberType, reportUnknownVariableType]
            yield self._sanitize_row(record)  # pyright: ignore[reportUnknownArgumentType]