Source code for kelvin.krn.krn

from __future__ import annotations

from collections.abc import Generator
from typing import Any, Callable, Optional

from pydantic import GetCoreSchemaHandler
from pydantic_core import core_schema
from typing_extensions import Self


[docs] class KRN: """Kelvin Resource Name representation""" _KRN_TYPES: dict[str, type[KRN]] = {} _NS_ID: Optional[str] = None ns_id: str ns_string: str def __init_subclass__(cls) -> None: if cls._NS_ID: KRN._KRN_TYPES[cls._NS_ID] = cls def __init__(self, ns_id: str, ns_string: str) -> None: self.ns_id = ns_id self.ns_string = ns_string @classmethod def __get_validators__(cls) -> Generator[Callable[..., Any], None, None]: yield cls.validate @classmethod def __get_pydantic_core_schema__(cls, source: Any, handler: GetCoreSchemaHandler) -> core_schema.CoreSchema: return core_schema.no_info_after_validator_function( cls.validate, handler(Any), serialization=core_schema.plain_serializer_function_ser_schema(cls.encode), )
[docs] @classmethod def validate(cls, v: Any) -> KRN: if isinstance(v, str): return cls.from_string(v) if isinstance(v, KRN): return v raise TypeError("Invalid type for KRN. KRN or string required.")
[docs] @classmethod def from_krn(cls, ns_id: str, ns_string: str) -> Self: return cls(ns_id, ns_string)
[docs] @classmethod def from_string(cls, v: str) -> Self: try: krn, ns_id, ns_string = v.split(":", 2) except ValueError: raise ValueError("expected format 'krn:<nid>:<nss>'") from None if krn != "krn": raise ValueError("expected start by 'krn'") T = KRN._KRN_TYPES.get(ns_id, KRN) return T.from_krn(ns_id, ns_string) # type: ignore
def __eq__(self, other: Any) -> bool: return self.ns_id == other.ns_id and self.ns_string == other.ns_string def __hash__(self) -> int: return hash((self.ns_id, self.ns_string)) def __str__(self) -> str: return f"krn:{self.ns_id}:{self.ns_string}" def __repr__(self) -> str: return f"{self.__class__.__name__}({str(self)})"
[docs] def encode(self) -> str: return str(self)
[docs] class KRNAsset(KRN): _NS_ID: str = "asset" """Kelvin Resource Name Asset Metric""" asset: str def __init__(self, asset: str) -> None: super().__init__(self._NS_ID, asset) self.asset = asset def __repr__(self) -> str: return f"{self.__class__.__name__}(asset='{self.asset}')"
[docs] @classmethod def from_krn(cls, ns_id: str, ns_string: str) -> Self: if ns_id != cls._NS_ID: raise ValueError(f"Error parsing {cls.__name__}. Expected {cls._NS_ID}, got {ns_id}") return cls(ns_string)
[docs] class KRNAssetMetric(KRN): _NS_ID: str = "am" """*Deprecated* Kelvin Resource Name Asset Metric""" asset: str metric: str def __init__(self, asset: str, metric: str) -> None: super().__init__(KRNAssetDataStream._NS_ID, asset + "/" + metric) self.asset = asset self.metric = metric def __repr__(self) -> str: return f"{self.__class__.__name__}(asset='{self.asset}', metric='{self.metric}')"
[docs] @classmethod def from_krn(cls, ns_id: str, ns_string: str) -> Self: if ns_id != cls._NS_ID: raise ValueError(f"Error parsing {cls.__name__}. Expected {cls._NS_ID}, got {ns_id}") try: asset, metric = ns_string.split("/", 1) except ValueError: raise ValueError("expected format 'krn:am:<asset>/<metric>'") from None return cls(asset, metric)
[docs] class KRNAssetDataStream(KRN): _NS_ID: str = "ad" """Kelvin Resource Name Asset Data""" asset: str data_stream: str def __init__(self, asset: str, data_stream: str) -> None: super().__init__(self._NS_ID, asset + "/" + data_stream) self.asset = asset self.data_stream = data_stream def __repr__(self) -> str: return f"{self.__class__.__name__}(asset='{self.asset}', data_stream='{self.data_stream}')"
[docs] @classmethod def from_krn(cls, ns_id: str, ns_string: str) -> Self: if ns_id != cls._NS_ID: raise ValueError(f"Error parsing {cls.__name__}. Expected {cls._NS_ID}, got {ns_id}") try: asset, data_stream = ns_string.split("/", 1) except ValueError: raise ValueError("expected format 'krn:ad:<asset>/<data_stream>'") from None return cls(asset, data_stream)
@property def data(self) -> str: return self.data_stream
[docs] class KRNAssetParameter(KRN): _NS_ID: str = "ap" """Kelvin Resource Name Asset Parameter""" asset: str parameter: str def __init__(self, asset: str, parameter: str) -> None: super().__init__(self._NS_ID, asset + "/" + parameter) self.asset = asset self.parameter = parameter def __repr__(self) -> str: return f"{self.__class__.__name__}(asset='{self.asset}', parameter='{self.parameter}')"
[docs] @classmethod def from_krn(cls, ns_id: str, ns_string: str) -> Self: if ns_id != cls._NS_ID: raise ValueError(f"Error parsing {cls.__name__}. Expected {cls._NS_ID}, got {ns_id}") try: asset, parameter = ns_string.split("/", 1) except ValueError: raise ValueError("expected format 'krn:ap:<asset>/<parameter>'") from None return cls(asset, parameter)
[docs] class KRNParameter(KRN): _NS_ID: str = "param" """Kelvin Resource Name Parameter""" parameter: str def __init__(self, parameter: str) -> None: super().__init__(self._NS_ID, parameter) self.parameter = parameter def __repr__(self) -> str: return f"{self.__class__.__name__}(parameter='{self.parameter}')"
[docs] @classmethod def from_krn(cls, ns_id: str, ns_string: str) -> Self: if ns_id != cls._NS_ID: raise ValueError(f"Error parsing {cls.__name__}. Expected {cls._NS_ID}, got {ns_id}") return cls(ns_string)
[docs] class KRNWorkload(KRN): _NS_ID: str = "wl" """Kelvin Resource Name Workload""" node: str workload: str def __init__(self, node: str, workload: str) -> None: super().__init__(self._NS_ID, node + "/" + workload) self.node = node self.workload = workload @property def node_name(self) -> str: "Backwards compatibility" return self.node @property def workload_name(self) -> str: "Backwards compatibility" return self.workload def __repr__(self) -> str: return f"{self.__class__.__name__}(node='{self.node}', workload='{self.workload}')"
[docs] @classmethod def from_krn(cls, ns_id: str, ns_string: str) -> Self: if ns_id != cls._NS_ID: raise ValueError(f"Error parsing {cls.__name__}. Expected {cls._NS_ID}, got {ns_id}") try: node, workload = ns_string.split("/", 1) except ValueError: raise ValueError("expected format 'krn:wl:<node>/<workload>'") from None return cls(node, workload)
[docs] class KRNWorkloadAppVersion(KRN): _NS_ID: str = "wlappv" """Kelvin Resource Name Workload App""" node: str workload: str app: str version: str def __init__(self, node: str, workload: str, app: str, version: str) -> None: super().__init__(self._NS_ID, node + "/" + workload + ":" + app + "/" + version) self.node = node self.workload = workload self.app = app self.version = version @property def node_name(self) -> str: "Backwards compatibility" return self.node @property def workload_name(self) -> str: "Backwards compatibility" return self.workload def __repr__(self) -> str: return ( f"{self.__class__.__name__}(node='{self.node}', workload='{self.workload}', " f"app='{self.app}', version='{self.version}')" )
[docs] @classmethod def from_krn(cls, ns_id: str, ns_string: str) -> Self: if ns_id != cls._NS_ID: raise ValueError(f"Error parsing {cls.__name__}. Expected {cls._NS_ID}, got {ns_id}") try: node_workload, app_version = ns_string.split(":", 1) node, workload = node_workload.split("/", 1) app, version = app_version.split("/", 1) except ValueError: raise ValueError("expected format 'krn:wl:<node>/<workload>:<app>/<version>'") from None return cls(node, workload, app, version)
[docs] class KRNRecommendation(KRN): _NS_ID: str = "recommendation" """Kelvin Resource Name Recommendation""" recommendation_id: str def __init__(self, recommendation_id: str) -> None: super().__init__(self._NS_ID, recommendation_id) self.recommendation_id = recommendation_id def __repr__(self) -> str: return f"{self.__class__.__name__}(recommendation_id='{self.recommendation_id}')"
[docs] @classmethod def from_krn(cls, ns_id: str, ns_string: str) -> Self: if ns_id != cls._NS_ID: raise ValueError(f"Error parsing {cls.__name__}. Expected {cls._NS_ID}, got {ns_id}") return cls(ns_string)
[docs] class KRNAppVersion(KRN): _NS_ID: str = "appversion" """Kelvin Resource Name App Version""" app: str version: str def __init__(self, app: str, version: str) -> None: super().__init__(self._NS_ID, app + "/" + version) self.app = app self.version = version def __repr__(self) -> str: return f"{self.__class__.__name__}(app='{self.app}', version='{self.version}')"
[docs] @classmethod def from_krn(cls, ns_id: str, ns_string: str) -> Self: if ns_id != cls._NS_ID: raise ValueError(f"Error parsing {cls.__name__}. Expected {cls._NS_ID}, got {ns_id}") try: app, version = ns_string.split("/", 1) except ValueError: raise ValueError(f"expected format 'krn:{cls._NS_ID}:<app>/<version>'") from None return cls(app, version)
[docs] class KRNApp(KRN): _NS_ID: str = "app" """Kelvin Resource Name Application""" app: str def __init__(self, app: str) -> None: super().__init__(self._NS_ID, app) self.app = app def __repr__(self) -> str: return f"{self.__class__.__name__}(app='{self.app}')"
[docs] @classmethod def from_krn(cls, ns_id: str, ns_string: str) -> Self: if ns_id != cls._NS_ID: raise ValueError(f"Error parsing {cls.__name__}. Expected {cls._NS_ID}, got {ns_id}") return cls(ns_string)
[docs] class KRNDatastream(KRN): _NS_ID: str = "datastream" """Kelvin Resource Name Datastream""" datastream: str def __init__(self, datastream: str) -> None: super().__init__(self._NS_ID, datastream) self.datastream = datastream def __repr__(self) -> str: return f"{self.__class__.__name__}(datastream='{self.datastream}')"
[docs] @classmethod def from_krn(cls, ns_id: str, ns_string: str) -> Self: if ns_id != cls._NS_ID: raise ValueError(f"Error parsing {cls.__name__}. Expected {cls._NS_ID}, got {ns_id}") return cls(ns_string)
[docs] class KRNUser(KRN): _NS_ID: str = "user" """Kelvin Resource Name User""" user: str def __init__(self, user: str) -> None: super().__init__(self._NS_ID, user) self.user = user def __repr__(self) -> str: return f"{self.__class__.__name__}(user='{self.user}')"
[docs] @classmethod def from_krn(cls, ns_id: str, ns_string: str) -> Self: if ns_id != cls._NS_ID: raise ValueError(f"Error parsing {cls.__name__}. Expected {cls._NS_ID}, got {ns_id}") return cls(ns_string)
[docs] class KRNServiceAccount(KRN): _NS_ID: str = "srv-acc" """Kelvin Resource Name Service Account""" service_account: str def __init__(self, service_account: str) -> None: super().__init__(self._NS_ID, service_account) self.service_account = service_account def __repr__(self) -> str: return f"{self.__class__.__name__}(service_account='{self.service_account}')"
[docs] @classmethod def from_krn(cls, ns_id: str, ns_string: str) -> Self: if ns_id != cls._NS_ID: raise ValueError(f"Error parsing {cls.__name__}. Expected {cls._NS_ID}, got {ns_id}") return cls(ns_string)
[docs] class KRNJob(KRN): _NS_ID: str = "job" """Kelvin Resource Name Job""" job: str job_run_id: str def __init__(self, job: str, job_run_id: str) -> None: super().__init__(self._NS_ID, job + "/" + job_run_id) self.job = job self.job_run_id = job_run_id def __repr__(self) -> str: return f"{self.__class__.__name__}(job='{self.job}', job_run_id='{self.job_run_id}')"
[docs] @classmethod def from_krn(cls, ns_id: str, ns_string: str) -> Self: if ns_id != cls._NS_ID: raise ValueError(f"Error parsing {cls.__name__}. Expected {cls._NS_ID}, got {ns_id}") try: job, job_run_id = ns_string.split("/", 1) except ValueError: raise ValueError(f"expected format 'krn:{cls._NS_ID}:<job>/<job_run_id>'") from None return cls(job, job_run_id)
[docs] class KRNSchedule(KRN): _NS_ID: str = "schedule" """Kelvin Resource Name Schedule""" schedule: str def __init__(self, schedule: str) -> None: super().__init__(self._NS_ID, schedule) self.schedule = schedule def __repr__(self) -> str: return f"{self.__class__.__name__}(schedule='{self.schedule}')"
[docs] @classmethod def from_krn(cls, ns_id: str, ns_string: str) -> Self: if ns_id != cls._NS_ID: raise ValueError(f"Error parsing {cls.__name__}. Expected {cls._NS_ID}, got {ns_id}") return cls(ns_string)
[docs] class KRNAssetDataQuality(KRN): _NS_ID: str = "dqasset" """Kelvin Resource Name Asset DataQuality eg: krn:dqasset:score:pcp_01 """ asset: str data_quality: str def __init__(self, asset: str, data_quality: str) -> None: super().__init__(self._NS_ID, data_quality + ":" + asset) self.asset = asset self.data_quality = data_quality def __repr__(self) -> str: return f"{self.__class__.__name__}(asset='{self.asset}', data_quality='{self.data_quality}')"
[docs] @classmethod def from_krn(cls, ns_id: str, ns_string: str) -> Self: if ns_id != cls._NS_ID: raise ValueError(f"Error parsing {cls.__name__}. Expected {cls._NS_ID}, got {ns_id}") try: data_quality, asset = ns_string.split(":", 1) except ValueError: raise ValueError("expected format 'krn:dqasset:<data_quality>:<asset>'") from None return cls(asset, data_quality)
[docs] class KRNAssetDataStreamDataQuality(KRN): _NS_ID: str = "dqad" """Kelvin Resource Name Asset DataStream DataQuality eg: krn:dqad:timestamp_anomaly:pcp_01/gas_flow """ asset: str data_stream: str data_quality: str def __init__(self, asset: str, data_stream: str, data_quality: str) -> None: super().__init__(self._NS_ID, data_quality + ":" + asset + "/" + data_stream) self.asset = asset self.data_stream = data_stream self.data_quality = data_quality def __repr__(self) -> str: return ( f"{self.__class__.__name__}(" f"asset='{self.asset}', " f"data_stream='{self.data_stream}', " f"data_quality='{self.data_quality}'" f")" )
[docs] @classmethod def from_krn(cls, ns_id: str, ns_string: str) -> Self: if ns_id != cls._NS_ID: raise ValueError(f"Error parsing {cls.__name__}. Expected {cls._NS_ID}, got {ns_id}") try: data_quality, ad = ns_string.split(":", 1) asset, data_stream = ad.split("/", 1) except ValueError: raise ValueError("expected format 'krn:dqad:<data_quality>:<asset>/<data_stream>'") from None return cls(asset, data_stream, data_quality)