Source code for kelvin.publisher.csv_publisher

from __future__ import annotations

import asyncio
import csv
import sys
from collections.abc import AsyncGenerator
from datetime import datetime, timedelta
from typing import Optional, Union, cast

import arrow
import structlog

from kelvin.krn import KRNAssetDataStream
from kelvin.publisher.server import DataGenerator, MessageData, PublisherError

local_logger = cast(structlog.stdlib.BoundLogger, structlog.get_logger())


class CSVPublisher(DataGenerator):
    CSV_ASSET_KEYS = ["asset", "asset name", "asset_name"]

    def __init__(
        self,
        csv_file_path: str,
        publish_interval: Optional[float] = None,
        playback: bool = False,
        ignore_timestamps: bool = False,
        now_offset: bool = False,
        logger: structlog.stdlib.BoundLogger = local_logger,
    ):
        csv.field_size_limit(sys.maxsize)
        self.csv_file_path = csv_file_path
        self.publish_rate = publish_interval
        self.playback = playback
        self.ignore_timestamps = ignore_timestamps
        self.now_offset = now_offset

        # Read only the header row here; the data rows are streamed later in run().
        with open(self.csv_file_path, encoding="utf-8-sig") as csv_file:
            csv_reader = csv.reader(csv_file)
            self.headers = next(csv_reader)

        # Match the asset column case-insensitively against the accepted aliases.
        _header_map = {header.lower(): header for header in self.headers}
        self.asset_key = next(
            (_header_map[header] for header in self.CSV_ASSET_KEYS if header in _header_map),
            None,
        )

        self.csv_has_timestamp = "timestamp" in self.headers
        self.use_csv_timestamps = self.csv_has_timestamp and not self.ignore_timestamps
        if self.playback and not self.use_csv_timestamps:
            raise PublisherError("csv must have a timestamp column to use playback timing")

        self.logger = logger

    def parse_timestamp(self, ts_str: str, offset: timedelta) -> Optional[datetime]:
        # Try epoch seconds first; if the cell is not numeric, fall back to
        # letting arrow parse the raw string (e.g. ISO 8601).
        timestamp: Union[str, float] = ts_str
        try:
            timestamp = float(ts_str)
        except ValueError:
            pass

        try:
            return arrow.get(timestamp).datetime + offset
        except Exception as e:
            self.logger.error("csv: error parsing timestamp.", timestamp=ts_str, error=e)
            return None

    def pop_asset_key(self, row_dict: dict[str, str]) -> str:
        return row_dict.pop(self.asset_key, "") if self.asset_key else ""

    async def run(self) -> AsyncGenerator[MessageData, None]:
        with open(self.csv_file_path, encoding="utf-8-sig") as csv_file:
            csv_reader = csv.reader(csv_file)
            _ = next(csv_reader)  # skip the header row; headers were read in __init__

            last_timestamp = datetime.max
            ts_offset = timedelta(0)
            timestamp = datetime.now()
            wait_time = 0.0
            first_row = True

            for row in csv_reader:
                row_dict = dict(zip(self.headers, row))
                asset = self.pop_asset_key(row_dict)
                row_ts_str = row_dict.pop("timestamp", "")

                if self.use_csv_timestamps:
                    parsed_ts = self.parse_timestamp(row_ts_str, ts_offset)
                    if not parsed_ts:
                        if first_row:
                            raise PublisherError(f"csv: invalid timestamp in first row. timestamp={row_ts_str}")
                        self.logger.warning("csv: skipping row", row=row_dict)
                        continue
                    if first_row:
                        first_row = False
                        if self.now_offset:
                            # Shift all csv timestamps so the first row lands at "now".
                            ts_offset = timestamp.astimezone() - parsed_ts.astimezone()
                            parsed_ts = parsed_ts + ts_offset
                else:
                    parsed_ts = datetime.now()
                timestamp = parsed_ts

                if self.playback:
                    # Reproduce the csv's cadence: sleep for the gap between rows.
                    wait_time = max((timestamp.astimezone() - last_timestamp.astimezone()).total_seconds(), 0.0)
                    last_timestamp = timestamp
                    await asyncio.sleep(wait_time)

                for r, v in row_dict.items():
                    if not v:
                        continue
                    yield MessageData(resource=KRNAssetDataStream(asset, r), value=v, timestamp=timestamp)

                if self.publish_rate:
                    await asyncio.sleep(self.publish_rate)

            if self.playback and wait_time > 0:
                # Wait the same interval as the last row, before any replay.
                await asyncio.sleep(wait_time)

        self.logger.info("CSV ingestion is complete")
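
The float-then-string fallback in parse_timestamp means a timestamp cell may hold either epoch seconds or any format arrow can parse, such as ISO 8601. A quick illustration of that behavior, independent of this module:

import arrow

# Epoch seconds given as a float...
print(arrow.get(1700000000.0).datetime)            # 2023-11-14 22:13:20+00:00
# ...and an ISO 8601 string naming the same instant:
print(arrow.get("2023-11-14T22:13:20Z").datetime)  # 2023-11-14 22:13:20+00:00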
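
For context, a minimal sketch of how this generator might be driven on its own. The file name data.csv, its columns, and the standalone runner are illustrative assumptions, not part of this module (inside the SDK, the publisher server is the usual consumer of DataGenerator implementations):

# Usage sketch. Assumes a local "data.csv" shaped like:
#
#   asset,timestamp,temperature
#   test-asset,2023-11-14T22:13:20Z,21.5
#   test-asset,2023-11-14T22:13:25Z,21.7
#
import asyncio

from kelvin.publisher.csv_publisher import CSVPublisher


async def main() -> None:
    publisher = CSVPublisher(csv_file_path="data.csv", playback=True)
    async for message in publisher.run():
        # Each MessageData pairs a KRNAssetDataStream resource with the raw
        # cell value and the resolved timestamp.
        print(message.resource, message.value, message.timestamp)


asyncio.run(main())

With playback=True the generator honors the five-second gap between the two rows above; passing publish_interval instead emits rows at a fixed rate regardless of their timestamps.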