from __future__ import annotations
import asyncio
import csv
from abc import ABC, abstractmethod
from asyncio import Queue, StreamReader, StreamWriter
from datetime import datetime
from pathlib import Path
from typing import Any, AsyncGenerator, Callable, Dict, List, Optional, Tuple, Type, Union
import yaml
from pydantic import ValidationError
from pydantic.dataclasses import dataclass
from kelvin.application.config import ConfigurationError
from kelvin.application.stream import KelvinStreamConfig
from kelvin.config.appyaml import (
AppBridge,
AppKelvin,
AppYaml,
AssetsEntry,
Metric,
ParameterDefinition,
)
from kelvin.config.common import AppTypes
from kelvin.config.manifest import IOWay
from kelvin.config.parser import AppConfigObj
from kelvin.config.smart_app import IOConfig, SmartAppConfig, SmartAppParams
from kelvin.krn import KRN, KRNAsset, KRNAssetDataStream, KRNAssetParameter, KRNWorkloadAppVersion
from kelvin.message import (
KMessageType,
KMessageTypeControl,
KMessageTypeData,
KMessageTypeDataTag,
KMessageTypeParameter,
KMessageTypeRecommendation,
Message,
)
from kelvin.message.msg_builders import MessageBuilder
from kelvin.message.msg_type import PrimitiveTypes
from kelvin.message.runtime_manifest import (
ManifestDatastream,
Resource,
ResourceDatastream,
RuntimeManifest,
RuntimeManifestPayload,
)
def flatten_dict(d: Dict, parent_key: str = "", sep: str = ".") -> Dict:
    """Collapse a nested mapping into a single-level dict.

    Keys of nested dicts are joined to their ancestors' keys with ``sep``.
    Nested dicts that are empty contribute no entry at all.

    Args:
        d: mapping to flatten.
        parent_key: prefix prepended (followed by ``sep``) to every key; an
            empty prefix leaves top-level keys untouched.
        sep: separator placed between the path components.

    Returns:
        A new dict whose values are the non-dict leaves of ``d``.
    """
    flat: Dict = {}
    for key, value in d.items():
        path = parent_key + sep + key if parent_key else key
        if isinstance(value, Dict):
            # Recurse with the accumulated path as the new prefix.
            flat.update(flatten_dict(value, path, sep=sep))
        else:
            flat[path] = value
    return flat
def parse_assets_csv(csv_file_path: str) -> List[AssetsEntry]:
    """Load asset entries from a CSV file that starts with a header row.

    The ``Name ID`` column supplies each asset's name. Every other column,
    except ``Display Name`` and ``Asset Type Name ID``, is stored as an asset
    property keyed by the lower-cased column header.

    Args:
        csv_file_path: path of the CSV file to read.

    Returns:
        One ``AssetsEntry`` per data row, in file order.

    Raises:
        KeyError: if the file has no ``Name ID`` column.
        OSError: if the file cannot be opened.
    """
    non_properties = {"Name ID", "Display Name", "Asset Type Name ID"}
    # newline="" is what the csv module asks for, so that newlines embedded in
    # quoted fields are interpreted by the parser and not by the file object.
    with open(csv_file_path, newline="") as f:
        return [
            AssetsEntry(
                name=row["Name ID"],
                properties={
                    column.lower(): cell
                    for column, cell in row.items()
                    # DictReader stores surplus cells of an over-long row under
                    # the key None, which has no .lower(); skip that bucket.
                    if column is not None and column not in non_properties
                },
            )
            for row in csv.DictReader(f)
        ]
def string_to_strict_type(value: Any, data_type: Type) -> Union[bool, float, str, dict]:
    """Convert ``value`` to ``data_type`` when a conversion rule exists.

    Values that already are instances of ``data_type`` are returned as they
    are. For ``bool`` the textual form of the value is compared,
    case-insensitively, against ``"true"`` and ``"1"``. For ``float`` the
    value goes through ``float()``. Any other target type leaves the value
    unchanged.

    Raises:
        ValueError: if ``data_type`` is ``float`` and the value is not numeric.
    """
    needs_conversion = not isinstance(value, data_type)
    if needs_conversion:
        if data_type is bool:
            text = str(value).lower()
            return text == "true" or text == "1"
        if data_type is float:
            return float(value)
    return value
def msg_type_param_dict(msg_type_on_config: str) -> Dict:
    """Build the keyword arguments of a message type from a configured data type.

    A name that is a member of ``PrimitiveTypes`` is used as the primitive
    itself; any other name is treated as the ICD of an ``object`` primitive.
    """
    is_primitive = msg_type_on_config in PrimitiveTypes.__members__
    if not is_primitive:
        return {"primitive": "object", "icd": msg_type_on_config}
    return {"primitive": msg_type_on_config}
def inject_test_config(manifest: RuntimeManifest, config_file: Path = Path("config.yaml")) -> RuntimeManifest:
    """Overwrite the manifest's configuration with the contents of a local YAML file.

    This is a best-effort helper: a missing file, an unreadable file, invalid
    YAML, or a document that is empty or not a mapping all leave the manifest
    untouched. Failures are reported on stdout instead of being raised.

    Args:
        manifest: runtime manifest to update in place.
        config_file: YAML file holding the configuration to inject.

    Returns:
        The same ``manifest`` object that was passed in.
    """
    if not config_file.exists():
        return manifest
    try:
        with open(config_file, encoding="utf-8") as f:
            config_content = yaml.safe_load(f)
        if config_content and isinstance(config_content, dict):
            print("Injecting test configuration from", config_file)
            manifest.payload.configuration = config_content
    except Exception as e:
        # Deliberately broad so a bad test file never stops the publisher,
        # but say why the file was ignored instead of failing silently.
        print(f"Ignoring test configuration from {config_file}:", e)
    return manifest
class KelvinPublisherConfig(KelvinStreamConfig):
    """Stream settings used by the test publisher.

    Adds an ``ip`` field to the inherited stream configuration and declares
    the ``KELVIN_PUBLISHER_`` environment prefix in the model configuration.
    """

    model_config = dict(env_prefix="KELVIN_PUBLISHER_")

    # Address the publisher listens on; the default covers all IPv4 interfaces.
    ip: str = "0.0.0.0"
class PublisherError(Exception):
    """Error type reserved for failures of the test publisher."""
class PublishServer:
    """TCP test server that feeds a single connected application.

    When a client connects, the server sends the runtime manifest built from
    the application configuration as the first line, then streams the
    messages produced by a ``DataGenerator``. Every line received from the
    client is parsed as a ``Message`` and handed to ``on_message``. Only one
    client is served at a time; additional connections are closed immediately.
    """

    # Poll interval of the write loop, so it notices a stopped server quickly.
    CYCLE_TIMEOUT_S = 0.25
    NODE = "test_node"
    WORKLOAD = "test_workload"

    app_config: AppConfigObj
    allowed_assets: Optional[List[AssetsEntry]] = None
    # Parameter overrides keyed by (asset name, parameter name); the asset name
    # "" holds the fallback used for every asset. Created per instance.
    asset_params: Dict[Tuple[str, str], Union[bool, float, str]]
    on_message: Callable[[Message], None]
    write_queue: Queue[Message]

    def __init__(self, conf: AppConfigObj, generator: DataGenerator, replay: bool = False) -> None:
        """Prepare the server for the given application.

        Args:
            conf: parsed application configuration.
            generator: source of the data published to the application.
            replay: when True the generator is restarted each time it ends.
        """
        self.app_config = conf
        # Per-instance store: a class-level dict would leak overrides from one
        # server instance into every other one.
        self.asset_params = {}

        kelvin = self._kelvin_section()
        bridge = self._bridge_section()
        if kelvin is not None:
            if kelvin.assets:
                self.allowed_assets = kelvin.assets
        elif bridge is not None:
            if bridge.metrics_map:
                # dict.fromkeys de-duplicates while keeping first-seen order, so
                # the asset order is the same on every run (a set is not).
                unique_asset_names = dict.fromkeys(metric.asset_name for metric in bridge.metrics_map)
                self.allowed_assets = [AssetsEntry(name=name) for name in unique_asset_names]

        self.writer = None
        self.on_message = log_message
        self.write_queue = Queue()
        self.config = KelvinPublisherConfig()
        self.running = False
        self.generator = generator
        # replay re-runs generator if it returns
        self.replay = replay

    # ------------------------------------------------------------------ helpers

    def _kelvin_section(self) -> Optional[AppKelvin]:
        """Return the ``kelvin`` section of a legacy kelvin app config, else None."""
        config = self.app_config.config
        if isinstance(config, AppYaml) and self.app_config.type == AppTypes.kelvin_app:
            return config.app.kelvin
        return None

    def _bridge_section(self) -> Optional[AppBridge]:
        """Return the ``bridge`` section of a legacy bridge app config, else None."""
        config = self.app_config.config
        if isinstance(config, AppYaml) and self.app_config.type == AppTypes.bridge:
            return config.app.bridge
        return None

    def _param_override(self, asset_name: str, param_name: str) -> Union[bool, float, str, None]:
        """Return the override set for an asset parameter, falling back to the "" asset.

        Compares against None rather than truthiness so that overrides of
        ``False``, ``0`` and ``""`` are honoured.
        """
        value = self.asset_params.get((asset_name, param_name))
        if value is None:
            value = self.asset_params.get(("", param_name))
        return value

    @staticmethod
    def _coerce_param(payload: Any, data_type: Any) -> Any:
        """Convert a parameter value to the parameter's declared data type.

        Raises:
            TypeError, ValueError: if a ``number`` value cannot become a float.
        """
        if data_type == "number":
            return float(payload)
        if data_type == "string":
            return str(payload)
        if data_type == "boolean":
            if isinstance(payload, bool):
                return payload
            if isinstance(payload, (int, float)):
                # update_param stores "1" as 1.0; treat numbers like the text
                # rule below, where only "1" counts as true.
                return payload == 1
            return str(payload).lower() in ["true", "1"]
        return payload

    @staticmethod
    def _io_ownership(way: Any) -> Tuple[bool, str]:
        """Map an IO direction to the ``(owned, access)`` pair of its datastream."""
        if way == IOWay.output:
            return True, "RO"
        if way == IOWay.input_cc:
            return True, "WO"
        if way == IOWay.input_cc_output:
            return True, "RW"
        if way == IOWay.input:
            return False, "RO"
        if way == IOWay.output_cc:
            return False, "WO"
        if way == IOWay.input_output_cc:
            return False, "RW"
        return False, "RO"

    def _build_manifest(self, resources: List, configuration: Any, datastreams: List) -> RuntimeManifest:
        """Wrap a payload in a manifest addressed to the test node and workload."""
        return RuntimeManifest(
            resource=KRNWorkloadAppVersion(
                node=self.NODE,
                workload=self.WORKLOAD,
                app=self.app_config.name,
                version=self.app_config.version,
            ),
            payload=RuntimeManifestPayload(
                resources=resources, configuration=configuration, datastreams=datastreams
            ),
        )

    def _resolve_message_target(self, name: str) -> Optional[Tuple[KMessageType, Type[KRN]]]:
        """Find which application input or parameter ``name`` refers to.

        Returns:
            ``(message type, resource builder)`` for the first declaration with
            that name, or None when the application declares no such resource.
        """

        def find(candidates: Any) -> Any:
            return next((c for c in candidates if c.name == name), None)

        def data_target(declared_type: str) -> Tuple[KMessageType, Type[KRN]]:
            return KMessageTypeData(**msg_type_param_dict(declared_type)), KRNAssetDataStream

        def parameter_target(declared_type: str) -> Tuple[KMessageType, Type[KRN]]:
            return KMessageTypeParameter(**msg_type_param_dict(declared_type)), KRNAssetParameter

        kelvin = self._kelvin_section()
        if kelvin is not None:
            found = find(kelvin.inputs)
            if found is not None:
                return data_target(found.data_type)
            found = find(kelvin.parameters)
            if found is not None:
                return parameter_target(found.data_type)
            return None

        bridge = self._bridge_section()
        if bridge is not None:
            found = find(bridge.metrics_map)
            return data_target(found.data_type) if found is not None else None

        config = self.app_config.config
        if isinstance(config, SmartAppConfig):
            found = find(config.data_streams.inputs)
            if found is None:
                found = find(config.control_changes.inputs)
            if found is not None:
                return data_target(found.data_type)
            found = find(config.parameters)
            if found is not None:
                # This lookup reads the type name from ``data_type.value``,
                # unlike the inputs above which use ``data_type`` directly.
                return parameter_target(found.data_type.value)
        return None

    # --------------------------------------------------------------- parameters

    def update_param(self, asset: str, param: str, value: Union[bool, float, str]) -> None:
        """Sets an asset parameter.

        Empty asset ("") to change app default

        Text spelling ``true``/``false`` (any case, surrounding blanks ignored)
        is stored as a bool and text made only of decimal digits is stored as
        a float; every other value is stored unchanged.

        Args:
            asset (str): asset name (empty ("") for fallback)
            param (str): param name
            value (Union[bool, float, str]): param value
        """
        value_lower = str(value).strip().lower()
        if value_lower in ["true", "false"]:
            value = value_lower == "true"
        elif value_lower.isdecimal():
            # isdecimal(), unlike isnumeric(), rejects characters such as "²"
            # or "½" that float() cannot parse.
            value = float(value_lower)
        self.asset_params[(asset, param)] = value

    # ---------------------------------------------------------------- manifests

    def bridge_app_yaml_to_runtime(self, bridge: AppBridge) -> RuntimeManifest:
        """Build the runtime manifest of a bridge app from its metrics map.

        Each distinct asset becomes a resource owning one datastream per
        metric, and each distinct metric name becomes a manifest datastream.
        """
        resources_by_asset: Dict[str, Resource] = {}
        datastreams_by_name: Dict[str, ManifestDatastream] = {}
        for metric in bridge.metrics_map:
            resource = resources_by_asset.get(metric.asset_name)
            if resource is None:
                resource = Resource(resource=KRNAsset(metric.asset_name))
                resources_by_asset[metric.asset_name] = resource
            resource.datastreams[metric.name] = ResourceDatastream(
                map_to=metric.name, access=metric.access, owned=True, configuration=metric.configuration
            )
            if metric.name not in datastreams_by_name:
                datastreams_by_name[metric.name] = ManifestDatastream(
                    name=metric.name, primitive_type_name=metric.data_type
                )
        return self._build_manifest(
            resources=list(resources_by_asset.values()),
            configuration=bridge.configuration,
            datastreams=list(datastreams_by_name.values()),
        )

    def kelvin_app_yaml_to_runtime(
        self, kelvin: AppKelvin, allowed_assets: List[AssetsEntry] | None
    ) -> RuntimeManifest:
        """Build the runtime manifest of a legacy kelvin app.

        Parameter values are resolved per asset in this order: override for
        the asset, override for the "" asset, value configured for the asset
        in ``kelvin.assets``, and finally the parameter default. Parameters
        with no value, or with a value that cannot be converted to the
        declared type, are left out.

        Args:
            kelvin: the ``kelvin`` section of the app configuration.
            allowed_assets: assets to expose; None means ``kelvin.assets``.

        Raises:
            ConfigurationError: if an output redeclares a datastream with a
                different data type.
        """
        if allowed_assets is None:
            allowed_assets = kelvin.assets

        manif_ds_map: Dict[str, ManifestDatastream] = {}
        resource_ds_map: Dict[str, ResourceDatastream] = {}
        for app_input in kelvin.inputs:
            ds_name = app_input.name
            owned = app_input.control_change
            access = "WO" if owned else "RO"
            resource_ds_map[ds_name] = ResourceDatastream(map_to=ds_name, access=access, owned=owned)
            manif_ds_map[ds_name] = ManifestDatastream(name=ds_name, primitive_type_name=app_input.data_type)
        for output in kelvin.outputs:
            ds_name = output.name
            owned = not output.control_change
            access = "RO" if owned else "WO"
            resource_ds = resource_ds_map.setdefault(
                ds_name, ResourceDatastream(map_to=ds_name, access=access, owned=owned)
            )
            if resource_ds.access != access:
                # Declared both as input and output with different directions.
                resource_ds.access = "RW"
            manif_ds = manif_ds_map.setdefault(
                ds_name, ManifestDatastream(name=ds_name, primitive_type_name=output.data_type)
            )
            if manif_ds.primitive_type_name != output.data_type:
                raise ConfigurationError(f"data type mismatch for output {ds_name}")

        # Parameters configured per asset, indexed by asset name. The first
        # entry wins when a name is repeated.
        configured_params: Dict[str, Any] = {}
        for entry in kelvin.assets or []:
            configured_params.setdefault(entry.name, entry.parameters)

        resources: List[Resource] = []
        for asset in allowed_assets or []:
            asset_config = configured_params.get(asset.name) or {}
            asset_params = {}
            for param in kelvin.parameters:
                payload = self._param_override(asset.name, param.name)
                if payload is None:
                    # NOTE(review): configured entries are read as
                    # {"value": ...} mappings; anything else is used as-is.
                    configured = asset_config.get(param.name)
                    payload = configured.get("value") if isinstance(configured, dict) else configured
                if payload is None and param.default:
                    payload = param.default.get("value", None)
                if payload is None:
                    # asset has no parameter and parameter doesn't have default value
                    continue
                try:
                    asset_params[param.name] = self._coerce_param(payload, param.data_type)
                except (TypeError, ValueError) as e:
                    print(
                        "error parsing parameter: invalid value."
                        f" asset={asset.name}, param={param.name}, value={payload!r}",
                        e,
                    )
            resources.append(
                Resource(
                    resource=KRNAsset(asset.name),
                    parameters=asset_params,
                    properties=asset.properties,
                    datastreams=resource_ds_map,
                )
            )
        return self._build_manifest(
            resources=resources,
            configuration=kelvin.configuration,
            datastreams=list(manif_ds_map.values()),
        )

    def runtime_from_app_manifest(self) -> RuntimeManifest:
        """Build the runtime manifest from the generic application manifest.

        Parameter values are resolved per asset in this order: override for
        the asset, override for the "" asset, the asset's own parameter, and
        finally the parameter default.
        """
        app_manf = self.app_config.to_app_manifest(read_schemas=False)
        configuration = {}
        if app_manf.defaults and app_manf.defaults.app:
            configuration = app_manf.defaults.app.configuration

        datastreams = [
            ManifestDatastream(name=io.name, primitive_type_name=io.data_type, unit_name=io.unit)
            for io in app_manf.io
        ]

        resources = []
        for asset in self.allowed_assets or []:
            resource_datastreams = {}
            for io in app_manf.io:
                owned, access = self._io_ownership(io.way)
                resource_datastreams[io.name] = ResourceDatastream(map_to=io.name, access=access, owned=owned)

            resource_parameters = {}
            for param in app_manf.parameters:
                payload = self._param_override(asset.name, param.name)
                if payload is None:
                    payload = asset.parameters.get(param.name)  # asset parameter defined in configuration
                if payload is None:
                    payload = param.default  # app defaults
                if payload is None:
                    # asset has no parameter and parameter doesn't have default value
                    continue
                try:
                    resource_parameters[param.name] = self._coerce_param(payload, param.data_type)
                except (TypeError, ValueError) as e:
                    print(
                        "error parsing parameter: invalid value."
                        f" asset={asset.name}, param={param.name}, value={payload!r}",
                        e,
                    )
            resources.append(
                Resource(
                    resource=KRNAsset(asset.name),
                    parameters=resource_parameters,
                    properties=asset.properties,
                    datastreams=resource_datastreams,
                )
            )
        return self._build_manifest(resources=resources, configuration=configuration, datastreams=datastreams)

    def build_config_message(self) -> RuntimeManifest:
        """Build the manifest sent to a client, including any local test configuration."""
        bridge = self._bridge_section()
        kelvin = self._kelvin_section()
        if bridge is not None:
            manifest = self.bridge_app_yaml_to_runtime(bridge)
        elif kelvin is not None:
            manifest = self.kelvin_app_yaml_to_runtime(kelvin, self.allowed_assets)
        else:
            manifest = self.runtime_from_app_manifest()
        return inject_test_config(manifest)

    # --------------------------------------------------------------- networking

    async def start_server(self) -> None:
        """Listen on the configured address and serve clients until cancelled."""
        server = await asyncio.start_server(self.new_client, self.config.ip, self.config.port, limit=self.config.limit)
        print(f"Publisher started. Listening on {self.config.ip}:{self.config.port}")
        async with server:
            await server.serve_forever()

    async def new_client(self, reader: StreamReader, writer: StreamWriter) -> None:
        """Serve one client connection from start to finish.

        The manifest is written first, then the generator, read and write
        loops run until either the read or the write loop ends. Whatever
        happens, the server is marked idle again, the helper tasks are
        cancelled and awaited, and the connection is closed.
        """
        if self.running is True:
            # Only one client at a time.
            writer.close()
            return

        print("Connected")
        self.running = True
        tasks: List[asyncio.Task] = []
        try:
            try:
                config_msg = self.build_config_message()
            except ConfigurationError as e:
                print("Configuration error:", e)
                return
            # Buffered before any task is created, so the manifest is always
            # the first line the client receives.
            writer.write(config_msg.encode() + b"\n")

            gen_task = asyncio.create_task(self.handle_generator(self.generator))
            connection_tasks = {
                asyncio.create_task(self.handle_read(reader)),
                asyncio.create_task(self.handle_write(writer, self.write_queue)),
            }
            tasks = [gen_task, *connection_tasks]

            try:
                await writer.drain()
            except ConnectionError:
                # The read loop will see the closed connection and finish.
                pass
            await asyncio.wait(connection_tasks, return_when=asyncio.FIRST_COMPLETED)
        finally:
            # Always release the "busy" flag; otherwise one failed connection
            # would make the server reject every later client.
            self.running = False
            for task in tasks:
                if not task.done():
                    task.cancel()
            if tasks:
                results = await asyncio.gather(*tasks, return_exceptions=True)
                for result in results:
                    # Cancellation is not an Exception subclass, so only real
                    # failures are reported here.
                    if isinstance(result, Exception):
                        print("error in publisher task:", repr(result))
            writer.close()
            print("Disconnected")

    async def handle_read(self, reader: StreamReader) -> None:
        """Read newline-delimited messages until the client closes the connection."""
        while self.running:
            data = await reader.readline()
            if not data:
                # Empty read means end of stream.
                break
            try:
                msg = Message.model_validate_json(data)
                self.on_message(msg)
            except Exception as e:
                # A bad line or a failing callback must not end the connection.
                print("error parsing message", e)

    async def handle_write(self, writer: StreamWriter, queue: Queue[Message]) -> None:
        """Send queued messages to the client, one per line."""
        while self.running and not writer.is_closing():
            try:
                # Bounded wait so the loop condition is re-checked regularly.
                msg = await asyncio.wait_for(queue.get(), timeout=self.CYCLE_TIMEOUT_S)
            except asyncio.TimeoutError:
                continue
            writer.write(msg.encode() + b"\n")
            try:
                await writer.drain()
            except ConnectionError:
                pass

    async def handle_generator(self, generator: DataGenerator) -> None:
        """Publish everything the generator yields, restarting it when ``replay`` is set."""
        while True:
            async for data in generator.run():
                if isinstance(data, MessageData):
                    await self.publish_data(data)
                elif isinstance(data, Message):
                    await self.publish_unsafe(data)
                elif isinstance(data, MessageBuilder):
                    await self.publish_unsafe(data.to_message())
            if not self.replay:
                break
            # Give other tasks a turn, so replaying a generator that yields
            # nothing cannot monopolise the event loop.
            await asyncio.sleep(0)

    # --------------------------------------------------------------- publishing

    async def publish_unsafe(self, msg: Message) -> None:
        """Publish the message as is, do not validate it against the app configuration

        Args:
            msg (Message): message to publish
        """
        await self.write_queue.put(msg)

    async def publish_data(self, data: MessageData) -> bool:
        """Validate ``data`` against the app configuration and queue it.

        When ``data`` names no asset, one message is queued for every allowed
        asset. A value that is invalid for the resource is reported and
        skipped for that asset.

        Returns:
            False when the asset is not allowed, there is no asset to publish
            to, or the resource is not an input or parameter of the app; True
            otherwise.
        """
        allowed_asset_names = (
            [asset.name for asset in self.allowed_assets] if self.allowed_assets is not None else []
        )
        target_asset = data.resource.asset
        if target_asset and target_asset not in allowed_asset_names:
            print(f"error publishing: asset not allowed to app. asset={data.resource.asset}")
            return False

        # if data.asset is empty publish to all allowed_assets (if set)
        assets = [target_asset] if target_asset else allowed_asset_names
        if not assets:
            print("error publishing to empty asset: no allowed assets set")
            return False

        target = self._resolve_message_target(data.resource.data_stream)
        if target is None:
            # invalid resource for this app
            print(f"error publishing: invalid resource to app. resource={data.resource!s}")
            return False
        msg_type, msg_resource_builder = target

        for asset in assets:
            try:
                msg = Message(
                    type=msg_type,
                    timestamp=data.timestamp or datetime.now().astimezone(),
                    resource=msg_resource_builder(asset, data.resource.data_stream),
                )
                # The payload type of the freshly built message decides how
                # the raw value is converted.
                msg.payload = string_to_strict_type(data.value, type(msg.payload))
                await self.write_queue.put(msg)
            except (ValidationError, ValueError) as e:
                print(
                    (
                        "error publishing value: invalid value for resource."
                        f" resource={data.resource!s}, value={data.value}"
                    ),
                    e,
                )
        return True
def log_message(msg: Message) -> None:
    """Print a received message, prefixed with a label for its message type.

    Message types other than data, control change, recommendation and data
    tag are printed without a label.
    """
    # Checked in order; the first matching type supplies the label.
    labelled_types = (
        (KMessageTypeData, "Data "),
        (KMessageTypeControl, "Control Change "),
        (KMessageTypeRecommendation, "Recommendation "),
        (KMessageTypeDataTag, "Data Tag "),
    )
    msg_log = next((label for kind, label in labelled_types if isinstance(msg.type, kind)), "")
    print(f"\nReceived {msg_log}Message:\n", repr(msg))
@dataclass
class MessageData:
    """A raw value to publish to an asset datastream.

    The value is validated against the application configuration before it
    is turned into a message.
    """

    # Target of the value; an empty asset means "every allowed asset".
    resource: KRNAssetDataStream
    # Message timestamp; the current local time is used when it is unset.
    timestamp: Optional[datetime]
    # Value to publish, converted to the resource's payload type on publish.
    value: Any
@dataclass
class AppIO:
    """Name, data type and asset of one application input/output."""

    # Name of the input/output.
    name: str
    # Name of its data type.
    data_type: str
    # Name of the asset it belongs to.
    asset: str
class DataGenerator(ABC):
    """Abstract source of the items a publisher sends to the application."""

    @abstractmethod
    async def run(self) -> AsyncGenerator[Union[MessageData, Message, MessageBuilder], None]:
        """Yield the items to publish; subclasses must override this method.

        Yields:
            ``MessageData`` values, ready-made ``Message`` objects, or
            ``MessageBuilder`` instances.
        """
        if False:
            # Never executed: the unreachable ``yield`` only makes this an
            # async generator function, which is the trick mypy needs.
            yield