# Source code for kelvin.publisher.csv_publisher
from __future__ import annotations
import asyncio
import csv
import sys
from datetime import datetime, timedelta
from typing import AsyncGenerator, Optional
import arrow
from kelvin.krn import KRNAssetDataStream
from kelvin.publisher.server import DataGenerator, MessageData, PublisherError
class CSVPublisher(DataGenerator):
    """Replays the rows of a CSV file as ``MessageData`` items.

    Each row yields one message per non-empty column, addressed by the
    ``asset_name`` column (resource) and the column header (data stream).
    Timing is controlled by three knobs:

    - ``publish_interval``: fixed sleep between rows.
    - ``playback``: sleep the real time difference between consecutive
      csv timestamps (requires a ``timestamp`` column).
    - ``now_offset``: shift all csv timestamps so the first row maps to
      "now".
    """

    # Name of the CSV column that carries the target asset.
    CSV_ASSET_KEY = "asset_name"

    def __init__(
        self,
        csv_file_path: str,
        publish_interval: Optional[float] = None,
        playback: bool = False,
        ignore_timestamps: bool = False,
        now_offset: bool = False,
    ):
        """Validate the CSV header and store the publishing configuration.

        Args:
            csv_file_path: path to the CSV file to replay.
            publish_interval: optional fixed sleep (seconds) between rows.
            playback: replay rows spaced by their csv timestamp deltas.
            ignore_timestamps: stamp rows with ``datetime.now()`` instead of
                the csv ``timestamp`` column.
            now_offset: offset csv timestamps so the first row is "now".

        Raises:
            PublisherError: if the file is empty, or if ``playback`` is
                requested without a usable ``timestamp`` column.
        """
        # Rows can carry very large cells; lift the csv module's field cap.
        csv.field_size_limit(sys.maxsize)
        self.csv_file_path = csv_file_path
        self.publish_rate = publish_interval
        self.playback = playback
        self.ignore_timestamps = ignore_timestamps
        self.now_offset = now_offset

        # Peek at the header only; close the handle instead of leaking it
        # (the original kept the file open for the object's lifetime).
        with open(self.csv_file_path) as csv_file:
            csv_reader = csv.reader(csv_file)
            try:
                headers = next(csv_reader)
            except StopIteration:
                raise PublisherError(f"csv file is empty: {csv_file_path}") from None

        self.csv_has_timestamp = "timestamp" in headers
        self.use_csv_timestamps = self.csv_has_timestamp and not self.ignore_timestamps
        if self.playback and not self.use_csv_timestamps:
            raise PublisherError("csv must have timestamp column to use publish-interval csv timestamps")

    def parse_timestamp(self, ts_str: str, offset: timedelta = timedelta(0)) -> Optional[datetime]:
        """Parse *ts_str* as epoch seconds or an ISO-8601 date and add *offset*.

        Returns None (after printing a diagnostic) when the string cannot be
        parsed at all.
        """
        # Fast path: numeric epoch timestamp.
        try:
            timestamp = float(ts_str)
            return arrow.get(timestamp).datetime + offset
        except ValueError:
            pass
        # Fallback: let arrow parse a date/time string.
        try:
            return arrow.get(ts_str).datetime + offset
        except Exception as e:
            print(f"csv: error parsing timestamp. timestamp={ts_str}", e)
            return None

    async def run(self) -> AsyncGenerator[MessageData, None]:
        """Yield the CSV contents as ``MessageData``, honoring the timing knobs.

        The first data row is handled separately because it establishes
        ``ts_offset`` when ``now_offset`` is enabled; subsequent rows reuse
        that offset. Rows with unparseable timestamps are skipped.
        """
        # Context manager fixes the leaked file handle of the original.
        with open(self.csv_file_path) as csv_file:
            csv_reader = csv.reader(csv_file)
            headers = next(csv_reader)

            last_timestamp = datetime.max
            ts_offset = timedelta(0)
            # Fix: wait_time was unbound after the loop when the csv had a
            # single data row and playback was enabled (NameError at the
            # trailing sleep below).
            wait_time = 0.0

            # --- first data row: may define the now-offset ---
            try:
                row = next(csv_reader)
            except StopIteration:
                # No data rows: complete gracefully instead of letting
                # StopIteration surface as a RuntimeError (PEP 479/525).
                print("\nCSV ingestion is complete")
                return
            row_dict = dict(zip(headers, row))
            timestamp = datetime.now()
            row_ts_str = row_dict.pop("timestamp", "")
            if self.use_csv_timestamps:
                row_ts = self.parse_timestamp(row_ts_str)
                if row_ts is None:
                    raise PublisherError(f"csv: invalid timestamp in first row. timestamp={row_ts_str}")
                if self.now_offset:
                    # Shift every csv timestamp so the first row maps to "now".
                    ts_offset = timestamp.astimezone() - row_ts.astimezone()
                timestamp = row_ts + ts_offset

            asset = row_dict.pop(self.CSV_ASSET_KEY, "")
            for resource, value in row_dict.items():
                if not value:
                    continue  # skip empty cells
                yield MessageData(resource=KRNAssetDataStream(asset, resource), value=value, timestamp=timestamp)
            last_timestamp = timestamp
            if self.publish_rate:
                await asyncio.sleep(self.publish_rate)

            # --- remaining rows ---
            for row in csv_reader:
                row_dict = dict(zip(headers, row))
                asset = row_dict.pop(self.CSV_ASSET_KEY, "")
                row_ts_str = row_dict.pop("timestamp", "")
                parsed_ts = self.parse_timestamp(row_ts_str, ts_offset) if self.use_csv_timestamps else datetime.now()
                if parsed_ts is None:
                    print("csv: skipping row", row_dict)
                    continue
                timestamp = parsed_ts

                if self.playback:
                    # wait time between rows
                    wait_time = max((timestamp.astimezone() - last_timestamp.astimezone()).total_seconds(), 0)
                    last_timestamp = timestamp
                    await asyncio.sleep(wait_time)

                for resource, value in row_dict.items():
                    if not value:
                        continue
                    yield MessageData(resource=KRNAssetDataStream(asset, resource), value=value, timestamp=timestamp)
                if self.publish_rate:
                    await asyncio.sleep(self.publish_rate)

            if self.playback and wait_time > 0:
                # wait same time as last row, before replay
                await asyncio.sleep(wait_time)
        print("\nCSV ingestion is complete")