Source code for kelvin.publisher.simulator

from __future__ import annotations

import asyncio
import random
from typing import AsyncGenerator, Dict, List, Union

from kelvin.application.config import ConfigurationError
from kelvin.config.appyaml import (
    AppYaml,
    AssetsEntry,
)
from kelvin.config.common import AppTypes
from kelvin.config.exporter import ExporterConfig
from kelvin.config.external import ExternalConfig
from kelvin.config.importer import ImporterConfig
from kelvin.config.smart_app import SmartAppConfig
from kelvin.krn import KRNAssetDataStream
from kelvin.publisher.server import AppIO, DataGenerator, MessageData


[docs] class Simulator(DataGenerator): app_yaml: str app_config: ExporterConfig | ImporterConfig | SmartAppConfig | ExternalConfig | AppYaml rand_min: float rand_max: float random: bool current_value: float assets: List[AssetsEntry] params_override: Dict[str, Union[bool, float, str]] def __init__( self, app_config: ExporterConfig | ImporterConfig | SmartAppConfig | ExternalConfig | AppYaml, period: float, rand_min: float = 0, rand_max: float = 100, random: bool = True, assets_extra: List[AssetsEntry] = [], parameters_override: List[str] = [], ): self.app_config = app_config self.period = period self.rand_min = rand_min self.rand_max = rand_max self.random = random self.current_value = self.rand_min - 1 self.params_override: Dict[str, Union[bool, float, str]] = {} for override in parameters_override: param, value = override.split("=", 1) self.params_override[param] = value if len(assets_extra) > 0: self.assets = assets_extra
[docs] def generate_random_value(self, data_type: str) -> Union[bool, float, str, dict]: if data_type == "boolean": return random.choice([True, False]) if self.random: number = round(random.random() * (self.rand_max - self.rand_min) + self.rand_min, 2) else: if self.current_value >= self.rand_max: self.current_value = self.rand_min else: self.current_value += 1 number = self.current_value if data_type == "number": return number if data_type == "string": return f"str_{number}" # object or other icd return {"key": number}
[docs] async def run(self) -> AsyncGenerator[MessageData, None]: app_inputs: List[AppIO] = [] if ( isinstance(self.app_config, AppYaml) and self.app_config.app.type == AppTypes.kelvin_app and self.app_config.app.kelvin is not None ): for asset in self.assets: for app_input in self.app_config.app.kelvin.inputs: app_inputs.append(AppIO(name=app_input.name, data_type=app_input.data_type, asset=asset.name)) elif ( isinstance(self.app_config, AppYaml) and self.app_config.app.type == AppTypes.bridge and self.app_config.app.bridge is not None ): app_inputs = [ AppIO(name=metric.name, data_type=metric.data_type, asset=metric.asset_name) for metric in self.app_config.app.bridge.metrics_map if metric.access == "RW" ] elif isinstance(self.app_config, SmartAppConfig): for asset in self.assets: for inpt in self.app_config.data_streams.inputs: app_inputs.append(AppIO(name=inpt.name, data_type=inpt.data_type, asset=asset.name)) for cc in self.app_config.control_changes.inputs: app_inputs.append(AppIO(name=cc.name, data_type=cc.data_type, asset=asset.name)) else: raise ConfigurationError("invalid app type") while True: for i in app_inputs: yield MessageData( resource=KRNAssetDataStream(i.asset, i.name), value=self.generate_random_value(i.data_type), timestamp=None, ) await asyncio.sleep(self.period)