Publisher¶
The publisher module provides data publishing capabilities for Kelvin applications.
Kelvin Publisher for testing and simulating message streams.
This module provides utilities for testing Kelvin applications by simulating message streams, generating test data, and publishing messages without connecting to a live Kelvin platform.
- Main Components:
MessageData: Data structure for message payloads. DataGenerator: Abstract base class for implementing custom data generators.
- class kelvin.publisher.MessageData(*args, **kwargs)[source]¶
Bases:
object- resource: KRNAssetDataStream¶
- class kelvin.publisher.DataGenerator[source]¶
Bases:
ABC- abstractmethod async run()[source]¶
- Return type:
AsyncGenerator[Union[MessageData,Message,MessageBuilder],None]
CSV Publisher¶
- class kelvin.publisher.csv_publisher.CSVPublisher(csv_file_path, publish_interval=None, playback=False, ignore_timestamps=False, now_offset=False, logger=<BoundLoggerLazyProxy(logger=None, wrapper_class=None, processors=None, context_class=None, initial_values={}, logger_factory_args=())>)[source]¶
Bases:
DataGenerator- Parameters:
- CSV_ASSET_KEYS = ['asset', 'asset name', 'asset_name']¶
Main¶
- class kelvin.publisher.main.ClickUnion(types)[source]¶
Bases:
ParamType- convert(value, param, ctx)[source]¶
Convert the value to the correct type. This is not called if the value is
None(the missing value).This must accept string values from the command line, as well as values that are already the correct type. It may also convert other compatible types.
The
paramandctxarguments may beNonein certain situations, such as when converting prompt input.If the value cannot be converted, call
fail()with a descriptive message.- Parameters:
value – The value to convert.
param – The parameter that is using this type to convert its value. May be
None.ctx – The current context that arrived at this value. May be
None.
Server¶
- kelvin.publisher.server.parse_assets_csv(csv_file_path)[source]¶
- Return type:
- Parameters:
csv_file_path (str)
- kelvin.publisher.server.msg_type_param_dict(msg_type_on_config)[source]¶
To parse different arguments of KMessageTypePrimitive if type is a object with icd
- kelvin.publisher.server.inject_test_config(manifest, config_file=PosixPath('config.yaml'))[source]¶
- Return type:
- Parameters:
manifest (RuntimeManifest)
config_file (Path)
- class kelvin.publisher.server.KelvinPublisherConfig(_case_sensitive=None, _nested_model_default_partial_update=None, _env_prefix=None, _env_prefix_target=None, _env_file=PosixPath('.'), _env_file_encoding=None, _env_ignore_empty=None, _env_nested_delimiter=None, _env_nested_max_split=None, _env_parse_none_str=None, _env_parse_enums=None, _cli_prog_name=None, _cli_parse_args=None, _cli_settings_source=None, _cli_parse_none_str=None, _cli_hide_none_type=None, _cli_avoid_json=None, _cli_enforce_required=None, _cli_use_class_docs_for_groups=None, _cli_exit_on_error=None, _cli_prefix=None, _cli_flag_prefix_char=None, _cli_implicit_flags=None, _cli_ignore_unknown_args=None, _cli_kebab_case=None, _cli_shortcuts=None, _secrets_dir=None, _build_sources=None, **values)[source]¶
Bases:
KelvinStreamConfig- Parameters:
_case_sensitive (bool | None)
_nested_model_default_partial_update (bool | None)
_env_prefix (str | None)
_env_prefix_target (EnvPrefixTarget | None)
_env_file (DotenvType | None)
_env_file_encoding (str | None)
_env_ignore_empty (bool | None)
_env_nested_delimiter (str | None)
_env_nested_max_split (int | None)
_env_parse_none_str (str | None)
_env_parse_enums (bool | None)
_cli_prog_name (str | None)
_cli_settings_source (CliSettingsSource[Any] | None)
_cli_parse_none_str (str | None)
_cli_hide_none_type (bool | None)
_cli_avoid_json (bool | None)
_cli_enforce_required (bool | None)
_cli_use_class_docs_for_groups (bool | None)
_cli_exit_on_error (bool | None)
_cli_prefix (str | None)
_cli_flag_prefix_char (str | None)
_cli_implicit_flags (bool | Literal['dual', 'toggle'] | None)
_cli_ignore_unknown_args (bool | None)
_cli_kebab_case (bool | Literal['all', 'no_enums'] | None)
_secrets_dir (PathType | None)
_build_sources (tuple[tuple[PydanticBaseSettingsSource, ...], dict[str, Any]] | None)
ip (str)
port (int)
limit (int)
- model_config: ClassVar[SettingsConfigDict] = {'arbitrary_types_allowed': True, 'case_sensitive': False, 'cli_avoid_json': False, 'cli_enforce_required': False, 'cli_exit_on_error': True, 'cli_flag_prefix_char': '-', 'cli_hide_none_type': False, 'cli_ignore_unknown_args': False, 'cli_implicit_flags': False, 'cli_kebab_case': False, 'cli_parse_args': None, 'cli_parse_none_str': None, 'cli_prefix': '', 'cli_prog_name': None, 'cli_shortcuts': None, 'cli_use_class_docs_for_groups': False, 'enable_decoding': True, 'env_file': None, 'env_file_encoding': None, 'env_ignore_empty': False, 'env_nested_delimiter': None, 'env_nested_max_split': None, 'env_parse_enums': None, 'env_parse_none_str': None, 'env_prefix': 'KELVIN_PUBLISHER_', 'env_prefix_target': 'variable', 'extra': 'forbid', 'json_file': None, 'json_file_encoding': None, 'nested_model_default_partial_update': False, 'protected_namespaces': ('model_validate', 'model_dump', 'settings_customise_sources'), 'secrets_dir': None, 'toml_file': None, 'validate_default': True, 'yaml_config_section': None, 'yaml_file': None, 'yaml_file_encoding': None}¶
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- class kelvin.publisher.server.PublishServer(conf, generator, replay=False)[source]¶
Bases:
object- Parameters:
conf (AppConfigObj)
generator (DataGenerator)
replay (bool)
- CYCLE_TIMEOUT_S = 0.25¶
- NODE = 'test_node'¶
- WORKLOAD = 'test_workload'¶
- app_config: AppConfigObj¶
- allowed_assets: List[AssetsEntry] | None = None¶
- update_param(asset, param, value)[source]¶
Sets an asset parameter. Empty asset (“”) to change app default
- add_extra_assets(assets_extra)[source]¶
- Return type:
- Parameters:
assets_extra (List[AssetsEntry])
- kelvin_app_yaml_to_runtime(kelvin, allowed_assets)[source]¶
- Return type:
- Parameters:
kelvin (AppKelvin)
allowed_assets (List[AssetsEntry] | None)
- async new_client(reader, writer)[source]¶
- Return type:
- Parameters:
reader (StreamReader)
writer (StreamWriter)
- async handle_generator(generator)[source]¶
- Return type:
- Parameters:
generator (DataGenerator)
- async publish_unsafe(msg)[source]¶
Publish the message as is, do not validate it against the app configuration
- async publish_data(data)[source]¶
- Return type:
- Parameters:
data (MessageData)
- class kelvin.publisher.server.MessageData(*args, **kwargs)[source]¶
Bases:
object- resource: KRNAssetDataStream¶
- class kelvin.publisher.server.DataGenerator[source]¶
Bases:
ABC- abstractmethod async run()[source]¶
- Return type:
AsyncGenerator[Union[MessageData,Message,MessageBuilder],None]
Simulator¶
- class kelvin.publisher.simulator.Simulator(app_config, period, rand_min=0, rand_max=100, random=True, assets_extra=[], parameters_override=[])[source]¶
Bases:
DataGenerator- Parameters:
app_config (ExporterConfig | ImporterConfig | SmartAppConfig | ExternalConfig | AppYaml)
period (float)
rand_min (float)
rand_max (float)
random (bool)
assets_extra (List[AssetsEntry])
parameters_override (List[str])
- app_config: ExporterConfig | ImporterConfig | SmartAppConfig | ExternalConfig | AppYaml¶
- assets: List[AssetsEntry]¶