# kelvin/publisher/csv_publisher.py
from __future__ import annotations

import asyncio
import csv
import sys
from collections.abc import AsyncGenerator
from datetime import datetime, timedelta
from typing import Optional, Union, cast

import arrow
import structlog

from kelvin.krn import KRNAssetDataStream
from kelvin.publisher.server import DataGenerator, MessageData, PublisherError

local_logger = cast(structlog.stdlib.BoundLogger, structlog.get_logger())
class CSVPublisher(DataGenerator):
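    """Data generator that replays the rows of a CSV file as ``MessageData``.

    Each row yields one message per non-empty column; the asset is taken from
    the first header matching ``CSV_ASSET_KEYS`` and the timestamp from an
    optional ``timestamp`` column.
    """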
    CSV_ASSET_KEYS = ["asset", "asset name", "asset_name"]

    def __init__(
        self,
        csv_file_path: str,
        publish_interval: Optional[float] = None,
        playback: bool = False,
        ignore_timestamps: bool = False,
        now_offset: bool = False,
        logger: structlog.stdlib.BoundLogger = local_logger,
    ):
        csv.field_size_limit(sys.maxsize)
        self.csv_file_path = csv_file_path
        self.publish_rate = publish_interval
        self.playback = playback
        self.ignore_timestamps = ignore_timestamps
        self.now_offset = now_offset

        # Read only the header row up front; the data rows are streamed in run().
        with open(self.csv_file_path, encoding="utf-8-sig") as csv_file:
            csv_reader = csv.reader(csv_file)
            self.headers = next(csv_reader)

        # Match the asset column case-insensitively against the accepted names.
        _header_map = {header.lower(): header for header in self.headers}
        self.asset_key = next((_header_map[header] for header in self.CSV_ASSET_KEYS if header in _header_map), None)

        self.csv_has_timestamp = "timestamp" in self.headers
        self.use_csv_timestamps = self.csv_has_timestamp and not self.ignore_timestamps
        if self.playback and not self.use_csv_timestamps:
            raise PublisherError("csv must have a timestamp column to use playback")

        self.logger = logger
    def parse_timestamp(self, ts_str: str, offset: timedelta) -> Optional[datetime]:
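        """Parse a CSV timestamp (epoch seconds or any format arrow accepts) and apply ``offset``.

        Returns ``None`` if the value cannot be parsed.
        """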
        timestamp: Union[str, float] = ts_str
        try:
            # Numeric strings are treated as Unix epoch seconds.
            timestamp = float(ts_str)
        except ValueError:
            pass

        try:
            return arrow.get(timestamp).datetime + offset
        except Exception as e:
            self.logger.error("csv: error parsing timestamp.", timestamp=ts_str, error=e)
            return None
    def pop_asset_key(self, row_dict: dict[str, str]) -> str:
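        """Remove and return the asset name from a row, or "" if no asset column exists."""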
        return row_dict.pop(self.asset_key, "") if self.asset_key else ""
    async def run(self) -> AsyncGenerator[MessageData, None]:
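        """Stream the CSV rows as ``MessageData``, honoring playback timing and publish rate."""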
        with open(self.csv_file_path, encoding="utf-8-sig") as csv_file:
            csv_reader = csv.reader(csv_file)
            _ = next(csv_reader)  # skip headers as they were already fetched in __init__

            last_timestamp = datetime.max
            ts_offset = timedelta(0)
            timestamp = datetime.now()
            wait_time = 0.0
            first_row = True
            for row in csv_reader:
                row_dict = dict(zip(self.headers, row))
                asset = self.pop_asset_key(row_dict)
                row_ts_str = row_dict.pop("timestamp", "")

                if self.use_csv_timestamps:
                    parsed_ts = self.parse_timestamp(row_ts_str, ts_offset)
                    if not parsed_ts:
                        if first_row:
                            raise PublisherError(f"csv: invalid timestamp in first row. timestamp={row_ts_str}")
                        self.logger.warning("csv: skipping row", row=row_dict)
                        continue
                    if first_row:
                        first_row = False
                        if self.now_offset:
                            # Shift all timestamps so the first row lands at "now".
                            ts_offset = timestamp.astimezone() - parsed_ts.astimezone()
                            parsed_ts = parsed_ts + ts_offset
                else:
                    parsed_ts = datetime.now()
                timestamp = parsed_ts

                if self.playback:
                    # wait time between rows
                    wait_time = max((timestamp.astimezone() - last_timestamp.astimezone()).total_seconds(), 0.0)
                    last_timestamp = timestamp
                    await asyncio.sleep(wait_time)

                # Emit one message per non-empty column in the row.
                for r, v in row_dict.items():
                    if not v:
                        continue
                    yield MessageData(resource=KRNAssetDataStream(asset, r), value=v, timestamp=timestamp)

                if self.publish_rate:
                    await asyncio.sleep(self.publish_rate)

        if self.playback and wait_time > 0:
            # wait same time as last row, before replay
            await asyncio.sleep(wait_time)

        self.logger.info("CSV ingestion is complete")
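
# Illustrative usage sketch (an assumption, not part of the original module):
# replay a hypothetical "data.csv" with "asset" and "timestamp" columns and
# print each generated message.
if __name__ == "__main__":

    async def _demo() -> None:
        publisher = CSVPublisher("data.csv", playback=True, now_offset=True)
        async for message in publisher.run():
            print(message.resource, message.value, message.timestamp)

    asyncio.run(_demo())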