"""Base class for tabular data sources (CSV, DataFrame)."""
from __future__ import annotations

import logging
import math
from abc import ABC, abstractmethod
from collections.abc import AsyncGenerator, Iterator
from datetime import datetime, timedelta
from typing import TYPE_CHECKING, Any

import arrow
from typing_extensions import Self, override

from kelvin.krn import KRNAssetDataStream
from kelvin.message import Boolean, Message, Number, String
from kelvin.testing.sources.base import DataSource

if TYPE_CHECKING:
    from kelvin.application.clock import VirtualClock
# Module-level logger; used to warn about unmatched column selections and
# rows skipped during generation.
logger = logging.getLogger(__name__)
class TabularSource(DataSource, ABC):
    """Base class for tabular data sources (CSV, DataFrame).

    This class provides shared functionality for data sources that work with
    tabular data (rows and columns), including:

    - Timestamp parsing and offset management
    - Column selection and mapping
    - Message creation from typed values
    - Asset resolution from columns or fallback
    - Unified generate() implementation

    Subclasses must implement:

    - _iter_rows(): Yield rows as dictionaries
    - _get_columns(): Return list of column names
    - _has_timestamp_column(): Check if timestamp column exists
    - _is_empty(): Check if the data source has no rows
    """

    def __init__(
        self,
        playback: bool = False,
        ignore_timestamps: bool = False,
        now_offset: bool = False,
        asset_column: str = "asset_name",
        timestamp_column: str = "timestamp",
    ) -> None:
        """Initialize the tabular source.

        Parameters:
            playback: When True, wait between rows based on timestamp differences
                (real-time replay). Requires timestamps in data and
                ``ignore_timestamps=False``; ``generate()`` raises ``ValueError``
                otherwise.
            ignore_timestamps: When True, use virtual clock time instead of data
                timestamps.
            now_offset: When True, offset all timestamps so the first row starts
                at the virtual clock's current time.
            asset_column: Name of the column containing asset names.
            timestamp_column: Name of the timestamp column.
        """
        super().__init__()
        self._playback: bool = playback
        self._ignore_timestamps: bool = ignore_timestamps
        self._now_offset: bool = now_offset
        self._asset_col: str = asset_column
        self._timestamp_col: str = timestamp_column
        # Column selection and mapping; None means "not configured".
        self._columns: list[str] | None = None
        self._column_mapping: dict[str, str] | None = None

    # --- Column Configuration ---

    def with_columns(self, *columns: str | list[str] | tuple[str, ...]) -> Self:
        """Select specific columns as inputs.

        Only the selected columns will be processed as datastreams.
        Can be called with varargs or a single list (or tuple). Names are
        stripped of surrounding whitespace and empty names are filtered out.

        Parameters:
            columns: Column names to select.

        Returns:
            Self for chaining.

        Examples:
            source.with_columns("temperature", "pressure")
            source.with_columns(["temperature", "pressure"])
        """
        if len(columns) == 1 and isinstance(columns[0], (list, tuple)):
            raw_names = columns[0]
        else:
            raw_names = columns
        # str() guards against non-string entries, which previously crashed
        # on .strip() when passed inside a list.
        stripped = (str(name).strip() for name in raw_names)
        self._columns = [name for name in stripped if name]
        return self

    def with_column_mapping(self, mapping: dict[str, str]) -> Self:
        """Map column names to datastream names.

        Allows columns with arbitrary names to map to specific datastreams.
        The mapping is copied, so later changes to the caller's dict do not
        affect this source.

        Parameters:
            mapping: Dictionary mapping column names to datastream names.

        Returns:
            Self for chaining.

        Example:
            source.with_column_mapping({
                "temp_c": "temperature",
                "pressure_psi": "pressure",
            })
        """
        self._column_mapping = dict(mapping)
        return self

    # --- Abstract Methods for Subclasses ---

    @abstractmethod
    def _iter_rows(self) -> Iterator[dict[str, Any]]:
        """Yield rows as dictionaries.

        Each row should be a dictionary mapping column names to values.

        Yields:
            Row dictionaries.
        """
        ...

    @abstractmethod
    def _get_columns(self) -> list[str]:
        """Get available column names.

        Returns:
            List of column names.
        """
        ...

    @abstractmethod
    def _has_timestamp_column(self) -> bool:
        """Check if timestamp column exists.

        Returns:
            True if timestamp column exists.
        """
        ...

    @abstractmethod
    def _is_empty(self) -> bool:
        """Check if data source is empty.

        Returns:
            True if data source has no rows.
        """
        ...

    # --- Shared Implementation ---

    def _use_timestamps(self) -> bool:
        """Check if timestamps should be used from the data.

        Returns:
            True if the timestamp column exists and timestamps are not ignored.
        """
        return self._has_timestamp_column() and not self._ignore_timestamps

    def _validate_playback(self) -> None:
        """Validate playback mode requirements.

        Called at the start of generate().

        Raises:
            ValueError: If playback mode is enabled but data timestamps cannot
                be used (missing column or ``ignore_timestamps=True``).
        """
        if self._playback and not self._use_timestamps():
            raise ValueError(f"data must have '{self._timestamp_col}' column to use playback mode")

    def _parse_timestamp(self, value: Any, offset: timedelta | None = None) -> datetime | None:
        """Parse a timestamp value using arrow for flexible format support.

        Handles multiple formats:

        - datetime objects (returned as-is, plus the offset)
        - Numeric (epoch seconds as int, float, or numeric string)
        - String (ISO format or other arrow-supported formats)

        Parameters:
            value: Timestamp value to parse.
            offset: Timedelta offset to apply to the parsed timestamp.

        Returns:
            Parsed datetime or None if parsing fails.
        """
        if value is None:
            return None
        actual_offset = offset if offset is not None else timedelta(0)

        if isinstance(value, datetime):
            return value + actual_offset

        # Epoch seconds. Out-of-range values can surface as OverflowError or
        # OSError from the underlying datetime conversion; treat them like
        # any other non-epoch value and fall through to string parsing.
        try:
            return arrow.get(float(value)).datetime + actual_offset
        except (ValueError, TypeError, OverflowError, OSError):
            pass

        # Best-effort string parsing: anything arrow rejects yields None so
        # the caller can skip the row.
        try:
            return arrow.get(str(value)).datetime + actual_offset
        except Exception:
            return None

    def _create_message(self, asset: str, datastream: str, value: Any, timestamp: datetime) -> Message | None:
        """Create a typed message from a value.

        Automatically detects the value type and creates the appropriate
        message type (Boolean, Number, or String).

        Parameters:
            asset: Asset name.
            datastream: Datastream name.
            value: Value to convert.
            timestamp: Message timestamp.

        Returns:
            Message instance or None if value is empty/invalid (None, blank
            string, or NaN).
        """
        if value is None:
            return None
        resource = KRNAssetDataStream(asset, datastream)

        # Native booleans must be checked before numbers: bool is an int subclass.
        if isinstance(value, bool):
            return Boolean(resource=resource, payload=value, timestamp=timestamp)

        # Textual booleans and blank strings.
        if isinstance(value, str):
            text = value.strip()
            if not text:
                return None
            lowered = text.lower()
            if lowered in ("true", "false"):
                return Boolean(resource=resource, payload=lowered == "true", timestamp=timestamp)

        # Numbers. OverflowError covers ints too large for a float; such
        # values fall back to String like any other non-numeric value.
        try:
            payload = float(value)
        except (ValueError, TypeError, OverflowError):
            pass
        else:
            # NaN carries no usable reading (it is commonly used to mark a
            # missing numeric cell); treat it like an empty value instead of
            # emitting a NaN number.
            if math.isnan(payload):
                return None
            return Number(resource=resource, payload=payload, timestamp=timestamp)

        # Anything else is sent as its stripped string form.
        str_value = str(value).strip()
        if not str_value:
            return None
        return String(resource=resource, payload=str_value, timestamp=timestamp)

    def _get_asset(self, row: dict[str, Any]) -> str | None:
        """Get asset name from row or fallback.

        Priority:

        1. Data column (asset_column) if it exists and has a non-blank,
           non-NaN value
        2. Fallback to the .with_asset() value (``self._asset``)

        Parameters:
            row: Row data dictionary.

        Returns:
            Asset name or None if not available.
        """
        asset_value = row.get(self._asset_col)
        # A NaN cell would otherwise be stringified to the asset name "nan".
        if asset_value is None or (isinstance(asset_value, float) and math.isnan(asset_value)):
            return self._asset
        asset_str = str(asset_value).strip()
        return asset_str or self._asset

    def _get_active_columns(self) -> list[str]:
        """Get columns to process as datastreams.

        Returns selected columns if configured, otherwise returns all
        non-reserved columns. Logs a warning if selected columns are not found.
        Duplicate selections are collapsed, keeping the first occurrence.

        Returns:
            List of column names to process.
        """
        reserved_columns = {self._timestamp_col, self._asset_col}
        all_columns = self._get_columns()

        if self._columns is None:
            return [c for c in all_columns if c not in reserved_columns]

        available_columns = set(all_columns)
        missing = set(self._columns) - available_columns - reserved_columns
        if missing:
            logger.warning("Configured columns not found in data: %s", sorted(missing))
        # dict.fromkeys de-duplicates while preserving the selection order, so
        # a column listed twice is not emitted twice per row.
        selected = dict.fromkeys(self._columns)
        return [c for c in selected if c not in reserved_columns and c in available_columns]

    def _get_datastream_name(self, column: str) -> str:
        """Get datastream name for a column.

        Applies column mapping if configured.

        Parameters:
            column: Column name.

        Returns:
            Datastream name (mapped or original).
        """
        if self._column_mapping is None:
            return column
        return self._column_mapping.get(column, column)

    @override
    async def generate(self, clock: VirtualClock) -> AsyncGenerator[Message, None]:
        """Generate messages from tabular data.

        Iterates through rows and yields messages at appropriate timestamps,
        using the virtual clock for time control. Processes active columns
        and applies column mapping. Rows without an asset or with an
        unparsable timestamp are skipped with a warning.

        Parameters:
            clock: VirtualClock for time control.

        Yields:
            Message instances from tabular data.

        Raises:
            ValueError: If playback mode is enabled but data timestamps cannot
                be used.
        """
        if self._is_empty():
            return
        # Playback derives the waits between rows from data timestamps; without
        # them the replay would have no meaningful spacing, so fail fast.
        self._validate_playback()

        active_columns = self._get_active_columns()
        use_timestamps = self._use_timestamps()
        ts_offset = timedelta(0)
        last_timestamp: datetime | None = None
        is_first_row = True

        for row_num, row in enumerate(self._iter_rows(), start=1):
            asset = self._get_asset(row)
            if not asset:
                logger.warning("Skipping row %d: no asset name available", row_num)
                continue

            timestamp: datetime
            if use_timestamps:
                ts_value = row.get(self._timestamp_col)
                parsed_ts = self._parse_timestamp(ts_value, ts_offset)
                if parsed_ts is None:
                    logger.warning("Skipping row %d: invalid timestamp '%s'", row_num, ts_value)
                    continue
                # The first emitted row anchors the offset so it lands on the
                # clock's current time; later rows keep their relative spacing.
                if is_first_row and self._now_offset:
                    ts_offset = clock.now().astimezone() - parsed_ts.astimezone()
                    parsed_ts = parsed_ts + ts_offset
                timestamp = parsed_ts
            else:
                timestamp = clock.now()

            # Playback mode: wait for the gap since the previous emitted row.
            # astimezone() normalizes naive/aware values before subtracting;
            # backwards jumps produce no wait.
            if self._playback and last_timestamp is not None:
                wait_time = (timestamp.astimezone() - last_timestamp.astimezone()).total_seconds()
                if wait_time > 0:
                    await clock.sleep(wait_time)
            last_timestamp = timestamp
            is_first_row = False

            for column in active_columns:
                msg = self._create_message(asset, self._get_datastream_name(column), row.get(column), timestamp)
                if msg is not None:
                    yield msg

            # Non-playback mode: pace rows by the interval configured on the
            # source (self._interval, presumably provided by DataSource).
            if not self._playback:
                await clock.sleep(self._interval.total_seconds())