Publisher

The publisher module (Kelvin Publisher) provides utilities for testing Kelvin applications: it simulates message streams, generates test data, and publishes messages without connecting to a live Kelvin platform.

Main Components:

  • MessageData: Data structure for message payloads.

  • DataGenerator: Abstract base class for implementing custom data generators.

class kelvin.publisher.MessageData(*args, **kwargs)[source]

Bases: object

Parameters:
  • resource (KRNAssetDataStream)

  • timestamp (datetime | None)

  • value (Any)

resource: KRNAssetDataStream
timestamp: datetime | None
value: Any

class kelvin.publisher.DataGenerator[source]

Bases: ABC

abstractmethod async run()[source]
Return type:

AsyncGenerator[Union[MessageData, Message, MessageBuilder], None]
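
A custom generator subclasses DataGenerator and implements run() as an async generator. The sketch below is a minimal illustration, not the SDK's own code. To stay runnable without the Kelvin SDK installed, it defines local stand-ins for MessageData and DataGenerator that mirror the fields and method listed above; the real resource field is a KRNAssetDataStream rather than a plain string. CounterGenerator and collect are hypothetical names introduced for this example.

```python
import asyncio
from abc import ABC, abstractmethod
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Any, AsyncGenerator, List, Optional


# Local stand-ins so the sketch runs without the Kelvin SDK installed.
# In a real application, import MessageData and DataGenerator from kelvin.publisher.
@dataclass
class MessageData:
    resource: str  # assumption: the real field is a KRNAssetDataStream
    timestamp: Optional[datetime]
    value: Any


class DataGenerator(ABC):
    @abstractmethod
    def run(self) -> AsyncGenerator[MessageData, None]:
        """Yield the messages to publish."""


class CounterGenerator(DataGenerator):
    """Hypothetical generator that emits `count` increasing values."""

    def __init__(self, resource: str, count: int, interval_s: float = 0.0) -> None:
        self.resource = resource
        self.count = count
        self.interval_s = interval_s

    async def run(self) -> AsyncGenerator[MessageData, None]:
        for i in range(self.count):
            yield MessageData(
                resource=self.resource,
                timestamp=datetime.now(timezone.utc),
                value=float(i),
            )
            # Pause between messages to simulate a publishing interval.
            await asyncio.sleep(self.interval_s)


async def collect(generator: DataGenerator) -> List[MessageData]:
    """Drain a generator into a list (stands in for the publishing loop)."""
    return [item async for item in generator.run()]


messages = asyncio.run(collect(CounterGenerator("pump-1/temperature", count=3)))
print([m.value for m in messages])
```

Because run() is an async generator, the consumer decides the pace: each message is produced only when the loop asks for the next item.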

CSV Publisher

class kelvin.publisher.csv_publisher.CSVPublisher(csv_file_path, publish_interval=None, playback=False, ignore_timestamps=False, now_offset=False, logger=<BoundLoggerLazyProxy(logger=None, wrapper_class=None, processors=None, context_class=None, initial_values={}, logger_factory_args=())>)[source]

Bases: DataGenerator

Parameters:
  • csv_file_path (str)

  • publish_interval (Optional[float])

  • playback (bool)

  • ignore_timestamps (bool)

  • now_offset (bool)

  • logger (structlog.stdlib.BoundLogger)

CSV_ASSET_KEYS = ['asset', 'asset name', 'asset_name']

parse_timestamp(ts_str, offset)[source]
Return type:

Optional[datetime]

pop_asset_key(row_dict)[source]
Return type:

str

Parameters:

row_dict (dict[str, str])
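
The reference does not describe how pop_asset_key behaves, so the following is only one plausible reading of its signature together with CSV_ASSET_KEYS: find the column that names the asset, remove it from the row, and return its value. The case-insensitive match and the empty-string result for rows without an asset column are assumptions, and the sample column names are hypothetical.

```python
from typing import Dict

CSV_ASSET_KEYS = ["asset", "asset name", "asset_name"]


def pop_asset_key(row_dict: Dict[str, str]) -> str:
    """Remove the asset column from a CSV row and return its value.

    Assumptions: column names are matched case-insensitively, and a row
    without an asset column yields an empty string.
    """
    for key in list(row_dict):
        if key.strip().lower() in CSV_ASSET_KEYS:
            return row_dict.pop(key)
    return ""


row = {"timestamp": "2024-01-01T00:00:00Z", "Asset Name": "pump-1", "temperature": "21.5"}
asset = pop_asset_key(row)
print(asset, row)
```

After the call, the row holds only the timestamp and data columns, which is convenient if each remaining column is then treated as a data stream for that asset.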

async run()[source]
Return type:

AsyncGenerator[MessageData, None]

Main

kelvin.publisher.main.coro(f)[source]

Decorator to allow async click commands.

Return type:

Any

Parameters:

f (Callable)
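
A decorator of this kind usually wraps a coroutine function in a synchronous function that runs it on an event loop, which is what a click command callback needs. The sketch below shows that common pattern with the standard library only; it is an assumption that the actual coro follows the same approach, and add is a hypothetical function used for illustration.

```python
import asyncio
import functools
from typing import Any, Callable


def coro(f: Callable) -> Any:
    """Wrap a coroutine function so it can be called synchronously."""

    @functools.wraps(f)
    def wrapper(*args: Any, **kwargs: Any) -> Any:
        # Run the coroutine to completion on a fresh event loop.
        return asyncio.run(f(*args, **kwargs))

    return wrapper


@coro
async def add(a: int, b: int) -> int:
    await asyncio.sleep(0)  # placeholder for real asynchronous work
    return a + b


result = add(2, 3)
print(result)
```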

class kelvin.publisher.main.AssetParameterOverride(asset, param, value)[source]

Bases: object

asset: str
param: str
value: str

class kelvin.publisher.main.ClickUnion(types)[source]

Bases: ParamType

name: str = '"csv"|float'

The descriptive name of this type.

convert(value, param, ctx)[source]

Convert the value to the correct type. This is not called if the value is None (the missing value).

This must accept string values from the command line, as well as values that are already the correct type. It may also convert other compatible types.

The param and ctx arguments may be None in certain situations, such as when converting prompt input.

If the value cannot be converted, call fail() with a descriptive message.

Parameters:
  • value – The value to convert.

  • param – The parameter that is using this type to convert its value. May be None.

  • ctx – The current context that arrived at this value. May be None.

kelvin.publisher.main.load_class(entry_point)[source]

Load a class from a module or file path, given in the format ‘module:ClassName’.

Return type:

Type

Parameters:

entry_point (str)
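
Resolving a ‘module:ClassName’ entry point can be sketched with importlib as follows. This illustrates the documented format and is not the module's actual code; treating any reference that ends in .py as a file path is an assumption made for this example.

```python
import importlib
import importlib.util
from pathlib import Path
from typing import Type


def load_class(entry_point: str) -> Type:
    """Resolve 'module:ClassName' or 'path/to/file.py:ClassName' to a class."""
    # Split on the last colon so Windows drive letters stay in the path part.
    module_ref, sep, class_name = entry_point.rpartition(":")
    if not sep or not module_ref or not class_name:
        raise ValueError(f"expected 'module:ClassName', got {entry_point!r}")

    if module_ref.endswith(".py"):
        # Assumption: a reference ending in .py is a file path, not a module name.
        path = Path(module_ref)
        spec = importlib.util.spec_from_file_location(path.stem, path)
        if spec is None or spec.loader is None:
            raise ImportError(f"cannot load module from {path}")
        module = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(module)
    else:
        module = importlib.import_module(module_ref)

    return getattr(module, class_name)


cls = load_class("collections:OrderedDict")
print(cls.__name__)
```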

kelvin.publisher.main.main()[source]
Return type:

None

Server

kelvin.publisher.server.flatten_dict(d, parent_key='', sep='.')[source]
Return type:

Dict
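
The signature suggests the usual recursive flattening of nested dictionaries into a single level whose keys are joined by sep. A minimal sketch under that assumption (the sample keys are hypothetical, and the real function may treat lists or empty dictionaries differently):

```python
from typing import Any, Dict


def flatten_dict(d: Dict[str, Any], parent_key: str = "", sep: str = ".") -> Dict[str, Any]:
    """Collapse nested dictionaries into one level, joining keys with `sep`."""
    items: Dict[str, Any] = {}
    for key, value in d.items():
        new_key = f"{parent_key}{sep}{key}" if parent_key else str(key)
        if isinstance(value, dict):
            # Recurse, carrying the accumulated key as the new prefix.
            items.update(flatten_dict(value, new_key, sep=sep))
        else:
            items[new_key] = value
    return items


flat = flatten_dict({"limits": {"temperature": {"max": 90}}, "enabled": True})
print(flat)
```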

kelvin.publisher.server.parse_assets_csv(csv_file_path)[source]
Return type:

List[AssetsEntry]

Parameters:

csv_file_path (str)

kelvin.publisher.server.string_to_strict_type(value, data_type)[source]
Return type:

Union[bool, float, str, dict]
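
The return type indicates that a string is converted to a boolean, number, string, or dictionary depending on data_type. The sketch below shows one way such a conversion could work. The type names "boolean", "number", "object", and "string", the accepted boolean spellings, and the use of JSON for objects are all assumptions, not documented behavior.

```python
import json
from typing import Union


def string_to_strict_type(value: str, data_type: str) -> Union[bool, float, str, dict]:
    """Convert a string to the Python type implied by `data_type`.

    Assumption: the type names below are illustrative; the names accepted
    by the real function may differ.
    """
    if data_type == "boolean":
        lowered = value.strip().lower()
        if lowered in ("true", "1"):
            return True
        if lowered in ("false", "0"):
            return False
        raise ValueError(f"not a boolean: {value!r}")
    if data_type == "number":
        return float(value)
    if data_type == "object":
        parsed = json.loads(value)
        if not isinstance(parsed, dict):
            raise ValueError(f"not a JSON object: {value!r}")
        return parsed
    return value  # treat everything else as a plain string


print(string_to_strict_type("true", "boolean"))
print(string_to_strict_type("3.5", "number"))
print(string_to_strict_type('{"a": 1}', "object"))
```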

kelvin.publisher.server.msg_type_param_dict(msg_type_on_config)[source]

Parses the different arguments of KMessageTypePrimitive when the type is an object with an icd.

Return type:

Dict

Parameters:

msg_type_on_config (str)

kelvin.publisher.server.inject_test_config(manifest, config_file=PosixPath('config.yaml'))[source]
Return type:

RuntimeManifest

class kelvin.publisher.server.KelvinPublisherConfig(_case_sensitive=None, _nested_model_default_partial_update=None, _env_prefix=None, _env_prefix_target=None, _env_file=PosixPath('.'), _env_file_encoding=None, _env_ignore_empty=None, _env_nested_delimiter=None, _env_nested_max_split=None, _env_parse_none_str=None, _env_parse_enums=None, _cli_prog_name=None, _cli_parse_args=None, _cli_settings_source=None, _cli_parse_none_str=None, _cli_hide_none_type=None, _cli_avoid_json=None, _cli_enforce_required=None, _cli_use_class_docs_for_groups=None, _cli_exit_on_error=None, _cli_prefix=None, _cli_flag_prefix_char=None, _cli_implicit_flags=None, _cli_ignore_unknown_args=None, _cli_kebab_case=None, _cli_shortcuts=None, _secrets_dir=None, _build_sources=None, **values)[source]

Bases: KelvinStreamConfig

Parameters:
  • _case_sensitive (bool | None)

  • _nested_model_default_partial_update (bool | None)

  • _env_prefix (str | None)

  • _env_prefix_target (EnvPrefixTarget | None)

  • _env_file (DotenvType | None)

  • _env_file_encoding (str | None)

  • _env_ignore_empty (bool | None)

  • _env_nested_delimiter (str | None)

  • _env_nested_max_split (int | None)

  • _env_parse_none_str (str | None)

  • _env_parse_enums (bool | None)

  • _cli_prog_name (str | None)

  • _cli_parse_args (bool | list[str] | tuple[str, ...] | None)

  • _cli_settings_source (CliSettingsSource[Any] | None)

  • _cli_parse_none_str (str | None)

  • _cli_hide_none_type (bool | None)

  • _cli_avoid_json (bool | None)

  • _cli_enforce_required (bool | None)

  • _cli_use_class_docs_for_groups (bool | None)

  • _cli_exit_on_error (bool | None)

  • _cli_prefix (str | None)

  • _cli_flag_prefix_char (str | None)

  • _cli_implicit_flags (bool | Literal['dual', 'toggle'] | None)

  • _cli_ignore_unknown_args (bool | None)

  • _cli_kebab_case (bool | Literal['all', 'no_enums'] | None)

  • _cli_shortcuts (Mapping[str, str | list[str]] | None)

  • _secrets_dir (PathType | None)

  • _build_sources (tuple[tuple[PydanticBaseSettingsSource, ...], dict[str, Any]] | None)

  • ip (str)

  • port (int)

  • limit (int)

model_config: ClassVar[SettingsConfigDict] = {'arbitrary_types_allowed': True, 'case_sensitive': False, 'cli_avoid_json': False, 'cli_enforce_required': False, 'cli_exit_on_error': True, 'cli_flag_prefix_char': '-', 'cli_hide_none_type': False, 'cli_ignore_unknown_args': False, 'cli_implicit_flags': False, 'cli_kebab_case': False, 'cli_parse_args': None, 'cli_parse_none_str': None, 'cli_prefix': '', 'cli_prog_name': None, 'cli_shortcuts': None, 'cli_use_class_docs_for_groups': False, 'enable_decoding': True, 'env_file': None, 'env_file_encoding': None, 'env_ignore_empty': False, 'env_nested_delimiter': None, 'env_nested_max_split': None, 'env_parse_enums': None, 'env_parse_none_str': None, 'env_prefix': 'KELVIN_PUBLISHER_', 'env_prefix_target': 'variable', 'extra': 'forbid', 'json_file': None, 'json_file_encoding': None, 'nested_model_default_partial_update': False, 'protected_namespaces': ('model_validate', 'model_dump', 'settings_customise_sources'), 'secrets_dir': None, 'toml_file': None, 'validate_default': True, 'yaml_config_section': None, 'yaml_file': None, 'yaml_file_encoding': None}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

ip: str

exception kelvin.publisher.server.PublisherError[source]

Bases: Exception

class kelvin.publisher.server.PublishServer(conf, generator, replay=False)[source]

Bases: object

CYCLE_TIMEOUT_S = 0.25
NODE = 'test_node'
WORKLOAD = 'test_workload'
asset_params: Dict[Tuple[str, str], bool | float | str] = {}
app_config: AppConfigObj
allowed_assets: List[AssetsEntry] | None = None
on_message: Callable[[Message], None]
write_queue: Queue[Message]

update_param(asset, param, value)[source]

Sets an asset parameter. Pass an empty asset name ("") to change the app default.

Parameters:
  • asset (str) – asset name (empty (“”) for fallback)

  • param (str) – param name

  • value (Union[bool, float, str]) – param value

Return type:

None
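
The asset_params mapping is keyed by (asset, param) tuples, with the empty asset name reserved for the app-wide default. The sketch below illustrates that keying scheme with a hypothetical ParamStore class. The resolve method and its lookup order (asset-specific value first, then the default) are assumptions added for illustration and are not part of the documented API.

```python
from typing import Dict, Optional, Tuple, Union

ParamValue = Union[bool, float, str]


class ParamStore:
    """Hypothetical stand-in for the asset_params mapping on PublishServer."""

    def __init__(self) -> None:
        self.asset_params: Dict[Tuple[str, str], ParamValue] = {}

    def update_param(self, asset: str, param: str, value: ParamValue) -> None:
        # An empty asset name stores the app-wide default for `param`.
        self.asset_params[(asset, param)] = value

    def resolve(self, asset: str, param: str) -> Optional[ParamValue]:
        # Assumed lookup order: asset-specific value first, then the default.
        if (asset, param) in self.asset_params:
            return self.asset_params[(asset, param)]
        return self.asset_params.get(("", param))


store = ParamStore()
store.update_param("", "threshold", 50.0)        # app default
store.update_param("pump-1", "threshold", 75.0)  # override for one asset
print(store.resolve("pump-1", "threshold"), store.resolve("pump-2", "threshold"))
```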

add_extra_assets(assets_extra)[source]
Return type:

None

Parameters:

assets_extra (List[AssetsEntry])

bridge_app_yaml_to_runtime(bridge)[source]
Return type:

RuntimeManifest

Parameters:

bridge (AppBridge)

kelvin_app_yaml_to_runtime(kelvin, allowed_assets)[source]
Return type:

RuntimeManifest

runtime_from_app_manifest()[source]
Return type:

RuntimeManifest

build_config_message()[source]
Return type:

RuntimeManifest

async start_server()[source]
Return type:

None

async new_client(reader, writer)[source]
Return type:

None

Parameters:
  • reader (StreamReader)

  • writer (StreamWriter)

async handle_read(reader)[source]
Return type:

None

Parameters:

reader (StreamReader)

async handle_write(writer, queue)[source]
Return type:

None

Parameters:
  • writer (StreamWriter)

  • queue (Queue[Message])

async handle_generator(generator)[source]
Return type:

None

Parameters:

generator (DataGenerator)

async publish_unsafe(msg)[source]

Publish the message as is, without validating it against the app configuration.

Parameters:

msg (Message) – message to publish

Return type:

None

async publish_data(data)[source]
Return type:

bool

Parameters:

data (MessageData)

kelvin.publisher.server.log_message(msg)[source]
Return type:

None

Parameters:

msg (Message)

class kelvin.publisher.server.MessageData(*args, **kwargs)[source]

Bases: object

Parameters:
  • resource (KRNAssetDataStream)

  • timestamp (datetime | None)

  • value (Any)

resource: KRNAssetDataStream
timestamp: datetime | None
value: Any

class kelvin.publisher.server.AppIO(*args, **kwargs)[source]

Bases: object

name: str
data_type: str
asset: str

class kelvin.publisher.server.DataGenerator[source]

Bases: ABC

abstractmethod async run()[source]
Return type:

AsyncGenerator[Union[MessageData, Message, MessageBuilder], None]

Simulator

class kelvin.publisher.simulator.Simulator(app_config, period, rand_min=0, rand_max=100, random=True, assets_extra=[], parameters_override=[])[source]

Bases: DataGenerator

app_yaml: str
app_config: ExporterConfig | ImporterConfig | SmartAppConfig | ExternalConfig | AppYaml
rand_min: float
rand_max: float
random: bool
current_value: float
params_override: Dict[str, bool | float | str]
assets: List[AssetsEntry]

generate_random_value(data_type)[source]
Return type:

Union[bool, float, str, dict]

Parameters:

data_type (str)

async run()[source]
Return type:

AsyncGenerator[MessageData, None]