Source code for kelvin.testing.sources.tabular

"""Base class for tabular data sources (CSV, DataFrame)."""

from __future__ import annotations

import logging
import math
from abc import ABC, abstractmethod
from collections.abc import AsyncGenerator, Iterator
from datetime import datetime, timedelta
from typing import TYPE_CHECKING, Any

import arrow
from typing_extensions import Self, override

from kelvin.krn import KRNAssetDataStream
from kelvin.message import Boolean, Message, Number, String
from kelvin.testing.sources.base import DataSource

if TYPE_CHECKING:
    from kelvin.application.clock import VirtualClock

# Module-level logger, named after this module so records can be filtered per source.
logger: logging.Logger = logging.getLogger(__name__)


class TabularSource(DataSource, ABC):
    """Base class for tabular data sources (CSV, DataFrame).

    This class provides shared functionality for data sources that work with
    tabular data (rows and columns), including:

    - Timestamp parsing and offset management
    - Column selection and mapping
    - Message creation from typed values
    - Asset resolution from columns or fallback
    - Unified generate() implementation

    Subclasses must implement:

    - _iter_rows(): Yield rows as dictionaries
    - _get_columns(): Return list of column names
    - _has_timestamp_column(): Check if timestamp column exists
    - _is_empty(): Check if the data source has no rows
    """

    def __init__(
        self,
        playback: bool = False,
        ignore_timestamps: bool = False,
        now_offset: bool = False,
        asset_column: str = "asset_name",
        timestamp_column: str = "timestamp",
    ) -> None:
        """Initialize the tabular source.

        Parameters:
            playback: When True, wait between rows based on timestamp differences
                (real-time replay). Requires timestamps in data.
            ignore_timestamps: When True, use virtual clock time instead of data timestamps.
            now_offset: When True, offset all timestamps so the first row starts at the
                virtual clock's current time.
            asset_column: Name of the column containing asset names.
            timestamp_column: Name of the timestamp column.
        """
        super().__init__()
        self._playback: bool = playback
        self._ignore_timestamps: bool = ignore_timestamps
        self._now_offset: bool = now_offset
        self._asset_col: str = asset_column
        self._timestamp_col: str = timestamp_column

        # Column selection and mapping (None means "not configured").
        self._columns: list[str] | None = None
        self._column_mapping: dict[str, str] | None = None

    # --- Column Configuration ---

    def with_columns(self, *columns: str | list[str]) -> Self:
        """Select specific columns as inputs.

        Only the selected columns will be processed as datastreams. Can be called
        with varargs or a single list. Every name is converted with ``str()`` and
        stripped; empty/whitespace-only names are filtered out.

        Parameters:
            columns: Column names to select.

        Returns:
            Self for chaining.

        Examples:
            source.with_columns("temperature", "pressure")
            source.with_columns(["temperature", "pressure"])
        """
        col_list: list[Any]
        if len(columns) == 1 and isinstance(columns[0], list):
            col_list = columns[0]
        else:
            col_list = list(columns)

        # Coerce every entry to str (both call forms) so non-string list entries
        # don't crash on .strip(), then drop empty/whitespace-only names.
        stripped = (str(c).strip() for c in col_list)
        self._columns = [c for c in stripped if c]
        return self

    def with_column_mapping(self, mapping: dict[str, str]) -> Self:
        """Map column names to datastream names.

        Allows columns with arbitrary names to map to specific datastreams. The
        mapping is copied, so later changes to the caller's dict have no effect.

        Parameters:
            mapping: Dictionary mapping column names to datastream names.

        Returns:
            Self for chaining.

        Example:
            source.with_column_mapping({
                "temp_c": "temperature",
                "pressure_psi": "pressure",
            })
        """
        self._column_mapping = dict(mapping)
        return self

    # --- Abstract Methods for Subclasses ---

    @abstractmethod
    def _iter_rows(self) -> Iterator[dict[str, Any]]:
        """Yield rows as dictionaries.

        Each row should be a dictionary mapping column names to values.

        Yields:
            Row dictionaries.
        """
        ...

    @abstractmethod
    def _get_columns(self) -> list[str]:
        """Get available column names.

        Returns:
            List of column names.
        """
        ...

    @abstractmethod
    def _has_timestamp_column(self) -> bool:
        """Check if timestamp column exists.

        Returns:
            True if timestamp column exists.
        """
        ...

    @abstractmethod
    def _is_empty(self) -> bool:
        """Check if data source is empty.

        Returns:
            True if data source has no rows.
        """
        ...

    # --- Shared Implementation ---

    @staticmethod
    def _is_missing(value: Any) -> bool:
        """Return True if a cell value should be treated as missing.

        ``None`` and float NaN count as missing. DataFrame-backed sources
        commonly produce NaN for empty cells; treating it as missing keeps them
        consistent with CSV sources, whose empty strings are skipped.

        Parameters:
            value: Raw cell value.

        Returns:
            True if the value is None or a float NaN.
        """
        return value is None or (isinstance(value, float) and math.isnan(value))

    def _use_timestamps(self) -> bool:
        """Check if timestamps should be used from the data.

        Returns:
            True if timestamps should be parsed from data.
        """
        return self._has_timestamp_column() and not self._ignore_timestamps

    def _validate_playback(self) -> None:
        """Validate playback mode requirements.

        Raises:
            ValueError: If playback mode requires timestamps but none available.
        """
        if self._playback and not self._use_timestamps():
            raise ValueError(f"data must have '{self._timestamp_col}' column to use playback mode")

    def _parse_timestamp(self, value: Any, offset: timedelta | None = None) -> datetime | None:
        """Parse a timestamp value using arrow for flexible format support.

        Handles multiple formats:

        - Numeric (epoch seconds as int or float)
        - String (ISO format or other arrow-supported formats)
        - datetime objects (returned as-is plus the offset; tzinfo is not altered)

        Parameters:
            value: Timestamp value to parse.
            offset: Timedelta offset to apply to the parsed timestamp.

        Returns:
            Parsed datetime or None if the value is missing or parsing fails.
        """
        if self._is_missing(value):
            return None

        actual_offset = offset if offset is not None else timedelta(0)

        # Handle datetime
        if isinstance(value, datetime):
            return value + actual_offset

        # Try as numeric (epoch seconds). OverflowError is included because an
        # epoch value outside the platform's time_t range makes
        # datetime.fromtimestamp raise it; such rows should be skipped, not crash.
        try:
            timestamp = float(value)
            return arrow.get(timestamp).datetime + actual_offset
        except (ValueError, TypeError, OverflowError):
            pass

        # Try as string (ISO/arrow-supported format); any failure means "unparseable".
        try:
            return arrow.get(str(value)).datetime + actual_offset
        except Exception:
            return None

    def _create_message(self, asset: str, datastream: str, value: Any, timestamp: datetime) -> Message | None:
        """Create a typed message from a value.

        Detects the value type and creates the matching message type:
        Boolean for native bools and "true"/"false" strings (case-insensitive),
        Number for anything ``float()`` accepts, otherwise String.

        Parameters:
            asset: Asset name.
            datastream: Datastream name.
            value: Value to convert.
            timestamp: Message timestamp.

        Returns:
            Message instance, or None if the value is missing (None/NaN) or an
            empty/whitespace-only string.
        """
        if self._is_missing(value):
            return None

        resource = KRNAssetDataStream(asset, datastream)

        # Boolean (native); checked before Number because bool is an int subclass.
        if isinstance(value, bool):
            return Boolean(resource=resource, payload=value, timestamp=timestamp)

        # Check string boolean values
        if isinstance(value, str):
            value_stripped = value.strip()
            if not value_stripped:
                return None
            value_lower = value_stripped.lower()
            if value_lower in ("true", "false"):
                return Boolean(resource=resource, payload=value_lower == "true", timestamp=timestamp)

        # Number
        try:
            payload = float(value)
            return Number(resource=resource, payload=payload, timestamp=timestamp)
        except (ValueError, TypeError):
            pass

        # String
        str_value = str(value).strip()
        if not str_value:
            return None
        return String(resource=resource, payload=str_value, timestamp=timestamp)

    def _get_asset(self, row: dict[str, Any]) -> str | None:
        """Get asset name from row or fallback.

        Priority:

        1. Data column (asset_column) if it exists and has a non-missing,
           non-blank value
        2. Fallback to .with_asset() value

        Parameters:
            row: Row data dictionary.

        Returns:
            Asset name or None if not available.
        """
        # An absent column yields None here, which is handled like a missing cell.
        asset_value = row.get(self._asset_col)
        if not self._is_missing(asset_value):
            asset_str = str(asset_value).strip()
            if asset_str:
                return asset_str

        # Fallback to configured asset
        return self._asset

    def _get_active_columns(self) -> list[str]:
        """Get columns to process as datastreams.

        Returns selected columns if configured, otherwise returns all non-reserved
        columns (reserved = timestamp and asset columns). Logs a warning if
        selected columns are not found.

        Returns:
            List of column names to process.
        """
        reserved_columns = {self._timestamp_col, self._asset_col}
        all_columns = self._get_columns()
        available_columns = set(all_columns)

        if self._columns is not None:
            # Warn about missing columns
            missing = set(self._columns) - available_columns - reserved_columns
            if missing:
                logger.warning("Configured columns not found in data: %s", sorted(missing))

            # Use selected columns (filter out reserved and missing), keeping selection order
            return [c for c in self._columns if c not in reserved_columns and c in available_columns]

        # Use all non-reserved columns
        return [c for c in all_columns if c not in reserved_columns]

    def _get_datastream_name(self, column: str) -> str:
        """Get datastream name for a column.

        Applies column mapping if configured.

        Parameters:
            column: Column name.

        Returns:
            Datastream name (mapped or original).
        """
        if self._column_mapping is not None:
            return self._column_mapping.get(column, column)
        return column

    @override
    async def generate(self, clock: VirtualClock) -> AsyncGenerator[Message, None]:
        """Generate messages from tabular data.

        Iterates through rows and yields messages at appropriate timestamps,
        using the virtual clock for time control. Processes active columns and
        applies column mapping. Rows without an asset or with an unparseable
        timestamp are skipped with a warning.

        Parameters:
            clock: VirtualClock for time control.

        Yields:
            Message instances from tabular data.

        Raises:
            ValueError: If playback mode is enabled but timestamps from the data
                are not available (missing column or ignore_timestamps=True).
        """
        if self._is_empty():
            return

        # Fail fast: without data timestamps, playback would compute zero waits
        # and (since the interval sleep is skipped in playback) emit every row at once.
        self._validate_playback()

        active_columns = self._get_active_columns()
        use_timestamps = self._use_timestamps()

        ts_offset = timedelta(0)
        last_timestamp: datetime | None = None
        is_first_row = True

        for row_num, row in enumerate(self._iter_rows(), start=1):
            # Get asset for this row
            asset = self._get_asset(row)
            if not asset:
                logger.warning("Skipping row %d: no asset name available", row_num)
                continue

            # Parse timestamp
            timestamp: datetime
            if use_timestamps:
                ts_value = row.get(self._timestamp_col)
                parsed_ts = self._parse_timestamp(ts_value, ts_offset)
                if parsed_ts is None:
                    logger.warning("Skipping row %d: invalid timestamp '%s'", row_num, ts_value)
                    continue

                # Handle now_offset for first row: the offset computed here is
                # applied to every later row via _parse_timestamp.
                if is_first_row and self._now_offset:
                    ts_offset = clock.now().astimezone() - parsed_ts.astimezone()
                    parsed_ts = parsed_ts + ts_offset

                timestamp = parsed_ts
            else:
                timestamp = clock.now()

            # Playback mode: wait based on timestamp differences (never for
            # out-of-order rows, whose difference is not positive).
            if self._playback and last_timestamp is not None:
                wait_time = (timestamp.astimezone() - last_timestamp.astimezone()).total_seconds()
                if wait_time > 0:
                    await clock.sleep(wait_time)

            last_timestamp = timestamp
            is_first_row = False

            # Yield messages for active columns
            for column in active_columns:
                value = row.get(column)
                datastream = self._get_datastream_name(column)
                msg = self._create_message(asset, datastream, value, timestamp)
                if msg is not None:
                    yield msg

            # Non-playback mode: use configured interval
            if not self._playback:
                await clock.sleep(self._interval.total_seconds())