Source code for kelvin.testing.sources.csv_source
"""CSV data source for replaying recorded data."""
from __future__ import annotations
import csv
import sys
from collections.abc import Iterator
from pathlib import Path
from typing import Any, final
from typing_extensions import override
from kelvin.testing.sources.tabular import TabularSource
[docs]
@final
class CSVSource(TabularSource):
    """Data source that replays messages from a CSV file.

    The CSV file should have columns for timestamp, asset_name, and one or more
    datastream columns. All columns except reserved ones (timestamp, asset_name)
    are treated as datastream values, unless specific columns are selected.

    Files are decoded as UTF-8. A leading byte-order mark (BOM), if present, is
    discarded so it does not become part of the first header name.

    Usage:
        source = (
            CSVSource("data.csv", playback=True, now_offset=True)
            .with_asset("pump-001")  # fallback if no asset_name column
            .with_timing(timedelta(seconds=1))
        )

        # Select specific columns
        source = CSVSource("data.csv").with_columns("temperature", "pressure")

        # Map columns to datastream names
        source = CSVSource("data.csv").with_column_mapping({
            "temp_c": "temperature",
            "pressure_psi": "pressure",
        })
    """

    # "utf-8-sig" decodes BOM-less UTF-8 exactly like "utf-8", but additionally
    # skips a leading BOM; plain "utf-8" would keep it as "\ufeff" on the first
    # column name and break header lookups (e.g. the timestamp column).
    _ENCODING = "utf-8-sig"

    def __init__(
        self,
        path: Path | str,
        playback: bool = False,
        ignore_timestamps: bool = False,
        now_offset: bool = False,
        asset_column: str = "asset_name",
        timestamp_column: str = "timestamp",
    ) -> None:
        """Initialize the CSV source.

        If the file already exists, its header is read and cached immediately
        and the inherited playback validation is run; otherwise both are
        deferred until the file is first accessed.

        Parameters:
            path: Path to the CSV file.
            playback: When True, wait between rows based on timestamp differences
                (real-time replay). Requires timestamps in CSV.
            ignore_timestamps: When True, use virtual clock time instead of CSV
                timestamps.
            now_offset: When True, offset all timestamps so the first row starts
                at the virtual clock's current time.
            asset_column: Name of the column containing asset names.
            timestamp_column: Name of the timestamp column.
        """
        super().__init__(
            playback=playback,
            ignore_timestamps=ignore_timestamps,
            now_offset=now_offset,
            asset_column=asset_column,
            timestamp_column=timestamp_column,
        )
        self._raise_field_size_limit()
        self._path = Path(path)
        # Header cache; None means "not read yet".
        self._cached_columns: list[str] | None = None
        self._cached_has_timestamp: bool | None = None
        # Validate structure if file exists
        if self._path.exists():
            self._read_columns()
            self._validate_playback()

    @staticmethod
    def _raise_field_size_limit() -> None:
        """Raise the csv module's per-field size limit as high as the platform allows.

        Note that ``csv.field_size_limit`` is process-global, so this affects
        every csv reader in the process. The limit is stored in a C ``long``;
        where that type is 32-bit (e.g. Windows) ``sys.maxsize`` does not fit
        and ``OverflowError`` is raised. Halving ``2**k - 1`` gives
        ``2**(k-1) - 1``, so the loop settles on the largest accepted value of
        that form (``2**31 - 1`` on such platforms, ``sys.maxsize`` elsewhere).
        """
        limit = sys.maxsize
        while True:
            try:
                _ = csv.field_size_limit(limit)
            except OverflowError:
                limit //= 2
            else:
                return

    def _read_columns(self) -> None:
        """Read the CSV header once and cache it.

        Caches the header's column names and whether the configured timestamp
        column is among them. Does nothing if the header is already cached. An
        empty file produces an empty column list.
        """
        if self._cached_columns is not None:
            return
        with open(self._path, newline="", encoding=self._ENCODING) as f:
            reader = csv.reader(f)
            no_header: list[str] = []
            headers = next(reader, no_header)
        self._cached_columns = headers
        self._cached_has_timestamp = self._timestamp_col in headers

    @override
    def _get_columns(self) -> list[str]:
        """Get CSV column names.

        Returns:
            A copy of the column names from the CSV header, or an empty list if
            the file does not exist or has no header.
        """
        if self._cached_columns is None:
            if not self._path.exists():
                return []
            self._read_columns()
        # Return a copy so callers cannot mutate the cached header.
        return list(self._cached_columns or [])

    @override
    def _has_timestamp_column(self) -> bool:
        """Check if CSV has timestamp column.

        Returns:
            True if the configured timestamp column exists in the CSV header;
            False if it does not or if the file does not exist.
        """
        if self._cached_has_timestamp is None:
            if not self._path.exists():
                return False
            self._read_columns()
        return self._cached_has_timestamp or False

    @override
    def _is_empty(self) -> bool:
        """Check if CSV file is empty or doesn't exist.

        Blank lines are not counted as data rows, matching what
        ``_iter_rows`` (via ``csv.DictReader``) actually yields.

        Returns:
            True if the CSV doesn't exist or has no non-blank data rows after
            the header.
        """
        if not self._path.exists():
            return True
        with open(self._path, newline="", encoding=self._ENCODING) as f:
            reader = csv.reader(f)
            _ = next(reader, None)  # skip header
            # csv.reader yields [] for a blank line; DictReader skips exactly
            # those, so any() (false for []) mirrors its notion of a data row.
            return not any(reader)

    @override
    def _iter_rows(self) -> Iterator[dict[str, Any]]:
        """Iterate CSV rows as dictionaries.

        The file stays open while the generator is suspended and is closed when
        it is exhausted or garbage-collected. Blank lines are skipped (standard
        ``csv.DictReader`` behavior).

        Yields:
            Row dictionaries mapping column names to string values.
        """
        if not self._path.exists():
            return
        with open(self._path, newline="", encoding=self._ENCODING) as f:
            yield from csv.DictReader(f)