Source code for kelvin.testing.sources.csv_source

"""CSV data source for replaying recorded data."""

from __future__ import annotations

import csv
import sys
from collections.abc import Iterator
from pathlib import Path
from typing import Any, final

from typing_extensions import override

from kelvin.testing.sources.tabular import TabularSource


@final
class CSVSource(TabularSource):
    """Data source that replays messages from a CSV file.

    The CSV file should have columns for timestamp, asset_name, and one or
    more datastream columns. All columns except reserved ones (timestamp,
    asset_name) are treated as datastream values, unless specific columns
    are selected.

    The file is decoded as UTF-8. A leading byte-order mark (as written by
    some spreadsheet tools) is stripped, so the first header name is matched
    correctly.

    Usage:
        source = (
            CSVSource("data.csv", playback=True, now_offset=True)
            .with_asset("pump-001")  # fallback if no asset_name column
            .with_timing(timedelta(seconds=1))
        )

        # Select specific columns
        source = CSVSource("data.csv").with_columns("temperature", "pressure")

        # Map columns to datastream names
        source = CSVSource("data.csv").with_column_mapping({
            "temp_c": "temperature",
            "pressure_psi": "pressure",
        })
    """

    # "utf-8-sig" decodes plain UTF-8 unchanged but also drops a leading BOM,
    # which would otherwise become part of the first header name.
    _ENCODING = "utf-8-sig"

    def __init__(
        self,
        path: Path | str,
        playback: bool = False,
        ignore_timestamps: bool = False,
        now_offset: bool = False,
        asset_column: str = "asset_name",
        timestamp_column: str = "timestamp",
    ) -> None:
        """Initialize the CSV source.

        Parameters:
            path: Path to the CSV file.
            playback: When True, wait between rows based on timestamp
                differences (real-time replay). Requires timestamps in CSV.
            ignore_timestamps: When True, use virtual clock time instead of
                CSV timestamps.
            now_offset: When True, offset all timestamps so the first row
                starts at the virtual clock's current time.
            asset_column: Name of the column containing asset names.
            timestamp_column: Name of the timestamp column.
        """
        super().__init__(
            playback=playback,
            ignore_timestamps=ignore_timestamps,
            now_offset=now_offset,
            asset_column=asset_column,
            timestamp_column=timestamp_column,
        )
        self._raise_field_size_limit()
        self._path = Path(path)

        # Column info is read lazily from the header and cached.
        self._cached_columns: list[str] | None = None
        self._cached_has_timestamp: bool | None = None

        # Validate structure up front when the file already exists.
        if self._path.exists():
            self._read_columns()
            self._validate_playback()

    @staticmethod
    def _raise_field_size_limit() -> None:
        """Raise the csv module's per-field size limit as far as the platform allows.

        ``sys.maxsize`` does not fit in a C ``long`` on platforms where
        ``long`` is 32 bits (e.g. Windows), and there ``csv.field_size_limit``
        raises ``OverflowError``. In that case the limit is reduced until the
        call succeeds.

        Note: the limit is global to the ``csv`` module, not per reader.
        """
        limit = sys.maxsize
        while True:
            try:
                _ = csv.field_size_limit(limit)
            except OverflowError:
                limit //= 10
            else:
                return

    def _read_columns(self) -> None:
        """Read and cache column names from the CSV header.

        Also caches whether the configured timestamp column is present. An
        empty file yields an empty column list. Does nothing if the columns
        are already cached.
        """
        if self._cached_columns is not None:
            return
        with open(self._path, newline="", encoding=self._ENCODING) as f:
            reader = csv.reader(f)
            empty: list[str] = []
            headers = next(reader, empty)
        self._cached_columns = headers
        self._cached_has_timestamp = self._timestamp_col in headers

    @override
    def _get_columns(self) -> list[str]:
        """Get CSV column names.

        Returns:
            A copy of the column names from the CSV header, or an empty list
            if the file does not exist.
        """
        if self._cached_columns is None:
            if not self._path.exists():
                return []
            self._read_columns()
        # Return a copy so callers cannot mutate the cached header.
        return list(self._cached_columns or [])

    @override
    def _has_timestamp_column(self) -> bool:
        """Check if CSV has timestamp column.

        Returns:
            True if the timestamp column exists in the CSV header; False if it
            does not or the file does not exist.
        """
        if self._cached_has_timestamp is None:
            if not self._path.exists():
                return False
            self._read_columns()
        return self._cached_has_timestamp or False

    @override
    def _is_empty(self) -> bool:
        """Check whether the CSV file has no data rows.

        Blank lines are ignored, matching ``csv.DictReader`` (used by
        ``_iter_rows``), which skips them. A file that holds only a header
        and blank lines is therefore empty.

        Returns:
            True if the CSV doesn't exist or has no non-blank data rows.
        """
        if not self._path.exists():
            return True
        with open(self._path, newline="", encoding=self._ENCODING) as f:
            reader = csv.reader(f)
            _ = next(reader, None)  # skip header
            # csv.reader yields [] for a blank line; any() stops at the first real row.
            return not any(reader)

    @override
    def _iter_rows(self) -> Iterator[dict[str, Any]]:
        """Iterate CSV data rows as dictionaries keyed by header name.

        Values are the raw strings read from the file, and blank lines are
        skipped. With ``csv.DictReader`` defaults, a row with fewer fields
        than the header maps the missing columns to None. Extra fields are
        collected in a list under the key None.

        Yields:
            Row dictionaries mapping column names to string values. Nothing
            is yielded if the file does not exist.
        """
        if not self._path.exists():
            return
        with open(self._path, newline="", encoding=self._ENCODING) as f:
            yield from csv.DictReader(f)