Source code for kelvin.api.client.async_api.orchestration

# Code generated by builder. DO NOT EDIT.
"""
Kelvin API Client.
"""

from __future__ import annotations

from collections.abc import AsyncIterator, Mapping, Sequence
from typing import Optional, Union, overload

from typing_extensions import Literal

from kelvin.api.base.api_service_model import AsyncApiServiceModel
from kelvin.api.base.data_model import KList
from kelvin.api.base.error import ResponseError
from kelvin.api.base.http_client.base_client import AsyncBaseClient, Response

from ..model import requests, response, responses


[docs] class Orchestration(AsyncApiServiceModel): @overload @classmethod async def create_orchestration_clusters( cls, data: Optional[Union[requests.OrchestrationClustersCreate, Mapping[str, object]]] = None, *, _dry_run: Literal[True], _get_response: bool = False, _client: Optional[AsyncBaseClient] = None, **kwargs: object, ) -> dict[str, object]: ... @overload @classmethod async def create_orchestration_clusters( cls, data: Optional[Union[requests.OrchestrationClustersCreate, Mapping[str, object]]] = None, _dry_run: Literal[False] = False, *, _get_response: Literal[True], _client: Optional[AsyncBaseClient] = None, **kwargs: object, ) -> Response: ... @overload @classmethod async def create_orchestration_clusters( cls, data: Optional[Union[requests.OrchestrationClustersCreate, Mapping[str, object]]] = None, _dry_run: Literal[False] = False, _get_response: Literal[False] = False, _client: Optional[AsyncBaseClient] = None, **kwargs: object, ) -> responses.OrchestrationClustersCreate: ...
[docs] @classmethod async def create_orchestration_clusters( cls, data: Optional[Union[requests.OrchestrationClustersCreate, Mapping[str, object]]] = None, _dry_run: bool = False, _get_response: bool = False, _client: Optional[AsyncBaseClient] = None, **kwargs: object, ) -> Union[responses.OrchestrationClustersCreate, dict[str, object], Response]: """Create a new Cluster. This only creates the Cloud registration, the actual installation still needs to be manually performed on the server/kubernetes cluster. The provision script to run locally on the server for `type` registered as `k3s` or on an existing kubernetes cluster for `type` registered as `kubernetes` will be in the key `provision_script ` in the 201 response body. **Permission Required:** `kelvin.permission.cluster.create`. ``createOrchestrationClusters``: ``POST`` ``/api/v4/orchestration/clusters/create`` Args: data: requests.OrchestrationClustersCreate, optional **kwargs: Extra parameters for requests.OrchestrationClustersCreate - create_orchestration_clusters: dict """ result_types = { "201": responses.OrchestrationClustersCreate, "400": response.Error, "401": response.Error, "404": response.Error, "409": response.Error, "500": response.Error, } _request = cls._prepare_request( method="POST", path="/api/v4/orchestration/clusters/create", values={}, params={}, files={}, headers={}, data=data, body_type=requests.OrchestrationClustersCreate, array_body=False, **kwargs, ) if _dry_run: return _request.to_dict() _response = await cls._make_request( client=_client, request=_request, stream=False, ) if _get_response: return _response cls._raise_api_error( response=_response, result_types=result_types, error_type=response.Error, ) return cls._process_response( response=_response, result_types=result_types, result_type=responses.OrchestrationClustersCreate, )
@overload @classmethod async def list_orchestration_clusters( cls, names: Optional[Sequence[str]] = None, search: Optional[Sequence[str]] = None, type: Optional[Sequence[str]] = None, ready: Optional[bool] = None, status: Optional[Sequence[str]] = None, pagination_type: Optional[Literal["limits", "cursor", "stream"]] = None, page_size: Optional[int] = 10000, page: Optional[int] = None, next: Optional[str] = None, previous: Optional[str] = None, direction: Optional[Literal["asc", "desc"]] = None, nulls: Optional[Literal["first", "last"]] = None, sort_by: Optional[Sequence[str]] = None, fetch: bool = True, *, _dry_run: Literal[True], _get_response: bool = False, _client: Optional[AsyncBaseClient] = None, ) -> dict[str, object]: ... @overload @classmethod async def list_orchestration_clusters( cls, names: Optional[Sequence[str]] = None, search: Optional[Sequence[str]] = None, type: Optional[Sequence[str]] = None, ready: Optional[bool] = None, status: Optional[Sequence[str]] = None, pagination_type: Optional[Literal["limits", "cursor", "stream"]] = None, page_size: Optional[int] = 10000, page: Optional[int] = None, next: Optional[str] = None, previous: Optional[str] = None, direction: Optional[Literal["asc", "desc"]] = None, nulls: Optional[Literal["first", "last"]] = None, sort_by: Optional[Sequence[str]] = None, fetch: bool = True, _dry_run: Literal[False] = False, *, _get_response: Literal[True], _client: Optional[AsyncBaseClient] = None, ) -> Response: ... @overload @classmethod async def list_orchestration_clusters( cls, names: Optional[Sequence[str]] = None, search: Optional[Sequence[str]] = None, type: Optional[Sequence[str]] = None, ready: Optional[bool] = None, status: Optional[Sequence[str]] = None, pagination_type: Optional[Literal["limits", "cursor", "stream"]] = None, page_size: Optional[int] = 10000, page: Optional[int] = None, next: Optional[str] = None, previous: Optional[str] = None, direction: Optional[Literal["asc", "desc"]] = None, nulls: Optional[Literal["first", "last"]] = None, sort_by: Optional[Sequence[str]] = None, *, fetch: Literal[False], _dry_run: Literal[False] = False, _get_response: Literal[False] = False, _client: Optional[AsyncBaseClient] = None, ) -> Union[ responses.OrchestrationClustersListPaginatedResponseCursor, responses.OrchestrationClustersListPaginatedResponseLimits, ]: ... @overload @classmethod async def list_orchestration_clusters( cls, names: Optional[Sequence[str]] = None, search: Optional[Sequence[str]] = None, type: Optional[Sequence[str]] = None, ready: Optional[bool] = None, status: Optional[Sequence[str]] = None, pagination_type: Optional[Literal["limits", "cursor", "stream"]] = None, page_size: Optional[int] = 10000, page: Optional[int] = None, next: Optional[str] = None, previous: Optional[str] = None, direction: Optional[Literal["asc", "desc"]] = None, nulls: Optional[Literal["first", "last"]] = None, sort_by: Optional[Sequence[str]] = None, fetch: Literal[True] = True, _dry_run: Literal[False] = False, _get_response: Literal[False] = False, _client: Optional[AsyncBaseClient] = None, ) -> KList[responses.OrchestrationClustersCreateItem]: ...
[docs] @classmethod async def list_orchestration_clusters( cls, names: Optional[Sequence[str]] = None, search: Optional[Sequence[str]] = None, type: Optional[Sequence[str]] = None, ready: Optional[bool] = None, status: Optional[Sequence[str]] = None, pagination_type: Optional[Literal["limits", "cursor", "stream"]] = None, page_size: Optional[int] = 10000, page: Optional[int] = None, next: Optional[str] = None, previous: Optional[str] = None, direction: Optional[Literal["asc", "desc"]] = None, nulls: Optional[Literal["first", "last"]] = None, sort_by: Optional[Sequence[str]] = None, fetch: bool = True, _dry_run: bool = False, _get_response: bool = False, _client: Optional[AsyncBaseClient] = None, ) -> Union[ Union[ KList[responses.OrchestrationClustersCreateItem], responses.OrchestrationClustersListPaginatedResponseCursor, responses.OrchestrationClustersListPaginatedResponseLimits, ], dict[str, object], Response, ]: """Returns a list of Cluster objects. The list can be optionally filtered and sorted on the server before being returned. **Permission Required:** `kelvin.permission.cluster.read`. ``listOrchestrationClusters``: ``GET`` ``/api/v4/orchestration/clusters/list`` Args: names : :obj:`Sequence[str]` A filter on the list based on the Cluster key `name`. The filter is on the full name only. All strings in the array are treated as `OR`. Can only contain lowercase alphanumeric characters and `.`, `_` or `-` characters. search : :obj:`Sequence[str]` Search and filter on the list based on the Cluster keys `title` (Display Name) or `name`. The search is case insensitive and will find partial matches as well. type : :obj:`Sequence[str]` A filter on the list based on the Cluster key `type`. The filter is on the full name only. All strings in the array are treated as `OR`. Options are `k3s` and `kubernetes` ready : :obj:`bool` A filter on the list based on the Cluster key `ready`. Options are `true` and `false` status : :obj:`Sequence[str]` A filter on the list based on the Cluster key `status`. The filter is on the full name only. All strings in the array are treated as `OR`. Options are `pending_provision`, `pending`, `online`, `unreachable` and `requires_attention` pagination_type : :obj:`Literal['limits', 'cursor', 'stream']` Method of pagination to use for return results where `total_items` is greater than `page_size`. `cursor` and `limits` will return one `page` of results, `stream` will return all results. ('limits', 'cursor', 'stream') page_size : :obj:`int` Number of objects to be returned in each page. Page size can range between 1 and 10000 objects. page : :obj:`int` An integer for the wanted page of results. Used only with `pagination_type` set as `limits`. next : :obj:`str` An alphanumeric string bookmark to indicate where to start for the next page. Used only with `pagination_type` set as `cursor`. previous : :obj:`str` An alphanumeric string bookmark to indicate where to end for the previous page. Used only with `pagination_type` set as `cursor`. direction : :obj:`Literal['asc', 'desc']` Sorting order according to the `sort_by` parameter. ('asc', 'desc') nulls : :obj:`Literal['first', 'last']` Null ordering according to the `sort_by` parameter. Defaults to `first` for ascending order and `last` for descending order. ('first', 'last') sort_by : :obj:`Sequence[str]` Sort the results by one of the Cluster parameters. Only one parameter can be selected. Options: `name`, `title`, `ready`, `type`, `status`, `last_seen`, `created`, `updated`, `kelvin_version`, `container_version`. """ result_types = { "200": responses.OrchestrationClustersListPaginatedResponseCursor, "400": response.Error, "401": response.Error, "500": response.Error, } # override pagination_type # stream type is only supported for raw responses if not _get_response and pagination_type == "stream": pagination_type = "cursor" _request = cls._prepare_request( method="GET", path="/api/v4/orchestration/clusters/list", values={}, params={ "names": names, "search": search, "type": type, "ready": ready, "status": status, "pagination_type": pagination_type, "page_size": page_size, "page": page, "next": next, "previous": previous, "direction": direction, "nulls": nulls, "sort_by": sort_by, }, files={}, headers={}, data=None, body_type=None, array_body=False, ) if _dry_run: return _request.to_dict() _response = await cls._make_request( client=_client, request=_request, stream=False, ) if _get_response: return _response cls._raise_api_error( response=_response, result_types=result_types, error_type=response.Error, ) if pagination_type == "limits": result = cls._process_response( response=_response, result_types=result_types, result_type=responses.OrchestrationClustersListPaginatedResponseLimits, ) else: # default pagination_type is cursor result = cls._process_response( response=_response, result_types=result_types, result_type=responses.OrchestrationClustersListPaginatedResponseCursor, ) if fetch: return await cls._fetch_pages( client=_client, path=_request.path, api_response=result, method="GET", params=_request.params, data=_request.data, result_types=result_types, error_type=response.Error, response_type=KList[responses.OrchestrationClustersCreateItem], ) return result
@overload @classmethod async def download_orchestration_cluster_provision_binary( cls, arch: Optional[Literal["amd64", "arm64", "armv7"]] = None, type: Optional[Literal["kubernetes", "docker"]] = None, *, _dry_run: Literal[True], _get_response: bool = False, _client: Optional[AsyncBaseClient] = None, ) -> dict[str, object]: ... @overload @classmethod async def download_orchestration_cluster_provision_binary( cls, arch: Optional[Literal["amd64", "arm64", "armv7"]] = None, type: Optional[Literal["kubernetes", "docker"]] = None, _dry_run: Literal[False] = False, *, _get_response: Literal[True], _client: Optional[AsyncBaseClient] = None, ) -> Response: ... @overload @classmethod async def download_orchestration_cluster_provision_binary( cls, arch: Optional[Literal["amd64", "arm64", "armv7"]] = None, type: Optional[Literal["kubernetes", "docker"]] = None, _dry_run: Literal[False] = False, _get_response: Literal[False] = False, _client: Optional[AsyncBaseClient] = None, ) -> AsyncIterator[bytes]: ...
[docs] @classmethod async def download_orchestration_cluster_provision_binary( cls, arch: Optional[Literal["amd64", "arm64", "armv7"]] = None, type: Optional[Literal["kubernetes", "docker"]] = None, _dry_run: bool = False, _get_response: bool = False, _client: Optional[AsyncBaseClient] = None, ) -> Union[AsyncIterator[bytes], dict[str, object], Response]: """Download Cluster Provision Binary **Permission Required:** `kelvin.permission.cluster.read`. ``downloadOrchestrationClusterProvisionBinary``: ``GET`` ``/api/v4/orchestration/clusters/provision/bin/download`` Args: arch : :obj:`Literal['amd64', 'arm64', 'armv7']` Architecture of the binary to download. Options are `amd64`, `arm64`, `armv7` (default: `amd64`) ('amd64', 'arm64', 'armv7') type : :obj:`Literal['kubernetes', 'docker']` Type of the binary to download. Options are `kubernetes`, `docker` (default: `kubernetes`) ('kubernetes', 'docker') """ result_types = {"200": bytes, "400": response.Error, "401": response.Error, "500": response.Error} _request = cls._prepare_request( method="GET", path="/api/v4/orchestration/clusters/provision/bin/download", values={}, params={"arch": arch, "type": type}, files={}, headers={}, data=None, body_type=None, array_body=False, ) if _dry_run: return _request.to_dict() _response = await cls._make_request( client=_client, request=_request, stream=True, ) if _get_response: return _response cls._raise_api_error( response=_response, result_types=result_types, error_type=response.Error, ) result = await cls._process_response_stream_bytes(response=_response) if result is None: raise ResponseError( f"Unexpected empty value or invalid content-type response", _response, ) return result
@overload @classmethod async def delete_orchestration_clusters( cls, cluster_name: str, *, _dry_run: Literal[True], _get_response: bool = False, _client: Optional[AsyncBaseClient] = None, ) -> dict[str, object]: ... @overload @classmethod async def delete_orchestration_clusters( cls, cluster_name: str, _dry_run: Literal[False] = False, *, _get_response: Literal[True], _client: Optional[AsyncBaseClient] = None, ) -> Response: ... @overload @classmethod async def delete_orchestration_clusters( cls, cluster_name: str, _dry_run: Literal[False] = False, _get_response: Literal[False] = False, _client: Optional[AsyncBaseClient] = None, ) -> None: ...
[docs] @classmethod async def delete_orchestration_clusters( cls, cluster_name: str, _dry_run: bool = False, _get_response: bool = False, _client: Optional[AsyncBaseClient] = None, ) -> Optional[Union[dict[str, object], Response]]: """Permanently delete an existing Cluster and its Nodes. This will also delete Workloads, Bridges and Services residing on the Cluster. This cannot be undone once the API request has been submitted. **Permission Required:** `kelvin.permission.cluster.delete`. ``deleteOrchestrationClusters``: ``POST`` ``/api/v4/orchestration/clusters/{cluster_name}/delete`` Args: cluster_name : :obj:`str`, optional Cluster key `name` to delete. The string can only contain lowercase alphanumeric characters and `.`, `_` or `-` characters. """ result_types = { "200": None, "400": response.Error, "401": response.Error, "404": response.Error, "500": response.Error, } _request = cls._prepare_request( method="POST", path="/api/v4/orchestration/clusters/{cluster_name}/delete", values={"cluster_name": cluster_name}, params={}, files={}, headers={}, data=None, body_type=None, array_body=False, ) if _dry_run: return _request.to_dict() _response = await cls._make_request( client=_client, request=_request, stream=False, ) if _get_response: return _response cls._raise_api_error( response=_response, result_types=result_types, error_type=response.Error, )
@overload @classmethod async def apply_orchestration_clusters_edge_apps_version( cls, cluster_name: str, *, _dry_run: Literal[True], _get_response: bool = False, _client: Optional[AsyncBaseClient] = None, ) -> dict[str, object]: ... @overload @classmethod async def apply_orchestration_clusters_edge_apps_version( cls, cluster_name: str, _dry_run: Literal[False] = False, *, _get_response: Literal[True], _client: Optional[AsyncBaseClient] = None, ) -> Response: ... @overload @classmethod async def apply_orchestration_clusters_edge_apps_version( cls, cluster_name: str, _dry_run: Literal[False] = False, _get_response: Literal[False] = False, _client: Optional[AsyncBaseClient] = None, ) -> str: ...
[docs] @classmethod async def apply_orchestration_clusters_edge_apps_version( cls, cluster_name: str, _dry_run: bool = False, _get_response: bool = False, _client: Optional[AsyncBaseClient] = None, ) -> Union[str, dict[str, object], Response]: """Initiates available cluster upgrades; requires cluster to be online and ready. **Permission Required:** `kelvin.permission.cluster.update`. ``applyOrchestrationClustersEdgeAppsVersion``: ``GET`` ``/api/v4/orchestration/clusters/{cluster_name}/edge-apps/version/apply`` Args: cluster_name : :obj:`str`, optional Cluster key `name` to initiate upgrades. The string can only contain lowercase alphanumeric characters and `.`, `_` or `-` characters. """ result_types = { "200": str, "400": response.Error, "401": response.Error, "404": response.Error, "406": response.Error, "500": response.Error, } _request = cls._prepare_request( method="GET", path="/api/v4/orchestration/clusters/{cluster_name}/edge-apps/version/apply", values={"cluster_name": cluster_name}, params={}, files={}, headers={}, data=None, body_type=None, array_body=False, ) if _dry_run: return _request.to_dict() _response = await cls._make_request( client=_client, request=_request, stream=False, ) if _get_response: return _response cls._raise_api_error( response=_response, result_types=result_types, error_type=response.Error, ) result = cls._process_str_response(response=_response) if result is None: raise ResponseError( f"Unexpected empty response", _response, ) return result
@overload @classmethod async def update_orchestration_clusters_edge_apps_version_force( cls, cluster_name: str, persist: Optional[bool] = None, *, _dry_run: Literal[True], _get_response: bool = False, _client: Optional[AsyncBaseClient] = None, ) -> dict[str, object]: ... @overload @classmethod async def update_orchestration_clusters_edge_apps_version_force( cls, cluster_name: str, persist: Optional[bool] = None, _dry_run: Literal[False] = False, *, _get_response: Literal[True], _client: Optional[AsyncBaseClient] = None, ) -> Response: ... @overload @classmethod async def update_orchestration_clusters_edge_apps_version_force( cls, cluster_name: str, persist: Optional[bool] = None, _dry_run: Literal[False] = False, _get_response: Literal[False] = False, _client: Optional[AsyncBaseClient] = None, ) -> str: ...
[docs] @classmethod async def update_orchestration_clusters_edge_apps_version_force( cls, cluster_name: str, persist: Optional[bool] = None, _dry_run: bool = False, _get_response: bool = False, _client: Optional[AsyncBaseClient] = None, ) -> Union[str, dict[str, object], Response]: """Force available updates to Cluster even if update setting is disabled. Optionally keep a force update action on standby if the Cluster is offline. This ensures the update is applied once the Cluster returns online. **Permission Required:** `kelvin.permission.cluster.update`. ``updateOrchestrationClustersEdgeAppsVersionForce``: ``POST`` ``/api/v4/orchestration/clusters/{cluster_name}/edge-apps/version/force-update`` Args: cluster_name : :obj:`str`, optional Cluster key `name` to initiate upgrades. The string can only contain lowercase alphanumeric characters and `.`, `_` or `-` characters. persist : :obj:`bool` Optional setting to wait if the Cluster is currently unreachable and force the update when the Cluster is next online. """ result_types = { "200": str, "202": str, "400": response.Error, "401": response.Error, "404": response.Error, "500": response.Error, } _request = cls._prepare_request( method="POST", path="/api/v4/orchestration/clusters/{cluster_name}/edge-apps/version/force-update", values={"cluster_name": cluster_name}, params={"persist": persist}, files={}, headers={}, data=None, body_type=None, array_body=False, ) if _dry_run: return _request.to_dict() _response = await cls._make_request( client=_client, request=_request, stream=False, ) if _get_response: return _response cls._raise_api_error( response=_response, result_types=result_types, error_type=response.Error, ) result = cls._process_str_response(response=_response) if result is None: raise ResponseError( f"Unexpected empty response", _response, ) return result
@overload @classmethod async def get_orchestration_clusters( cls, cluster_name: str, *, _dry_run: Literal[True], _get_response: bool = False, _client: Optional[AsyncBaseClient] = None, ) -> dict[str, object]: ... @overload @classmethod async def get_orchestration_clusters( cls, cluster_name: str, _dry_run: Literal[False] = False, *, _get_response: Literal[True], _client: Optional[AsyncBaseClient] = None, ) -> Response: ... @overload @classmethod async def get_orchestration_clusters( cls, cluster_name: str, _dry_run: Literal[False] = False, _get_response: Literal[False] = False, _client: Optional[AsyncBaseClient] = None, ) -> responses.OrchestrationClustersGet: ...
[docs] @classmethod async def get_orchestration_clusters( cls, cluster_name: str, _dry_run: bool = False, _get_response: bool = False, _client: Optional[AsyncBaseClient] = None, ) -> Union[responses.OrchestrationClustersGet, dict[str, object], Response]: """Retrieve the parameters and status of a Cluster. **Permission Required:** `kelvin.permission.cluster.read`. ``getOrchestrationClusters``: ``GET`` ``/api/v4/orchestration/clusters/{cluster_name}/get`` Args: cluster_name : :obj:`str`, optional Cluster key `name` to get. The string can only contain lowercase alphanumeric characters and `.`, `_` or `-` characters. """ result_types = { "200": responses.OrchestrationClustersGet, "400": response.Error, "401": response.Error, "404": response.Error, "500": response.Error, } _request = cls._prepare_request( method="GET", path="/api/v4/orchestration/clusters/{cluster_name}/get", values={"cluster_name": cluster_name}, params={}, files={}, headers={}, data=None, body_type=None, array_body=False, ) if _dry_run: return _request.to_dict() _response = await cls._make_request( client=_client, request=_request, stream=False, ) if _get_response: return _response cls._raise_api_error( response=_response, result_types=result_types, error_type=response.Error, ) return cls._process_response( response=_response, result_types=result_types, result_type=responses.OrchestrationClustersGet, )
@overload @classmethod async def get_orchestration_clusters_manifests( cls, cluster_name: str, version: str, *, _dry_run: Literal[True], _get_response: bool = False, _client: Optional[AsyncBaseClient] = None, ) -> dict[str, object]: ... @overload @classmethod async def get_orchestration_clusters_manifests( cls, cluster_name: str, version: str, _dry_run: Literal[False] = False, *, _get_response: Literal[True], _client: Optional[AsyncBaseClient] = None, ) -> Response: ... @overload @classmethod async def get_orchestration_clusters_manifests( cls, cluster_name: str, version: str, _dry_run: Literal[False] = False, _get_response: Literal[False] = False, _client: Optional[AsyncBaseClient] = None, ) -> responses.OrchestrationClustersManifestsGet: ...
[docs] @classmethod async def get_orchestration_clusters_manifests( cls, cluster_name: str, version: str, _dry_run: bool = False, _get_response: bool = False, _client: Optional[AsyncBaseClient] = None, ) -> Optional[Union[responses.OrchestrationClustersManifestsGet, dict[str, object], Response]]: """Get Cluster Manifests **Permission Required:** `kelvin.permission.cluster.read`. ``getOrchestrationClustersManifests``: ``GET`` ``/api/v4/orchestration/clusters/{cluster_name}/manifests/get`` Args: cluster_name : :obj:`str`, optional Cluster key `name` to retrieve provision yaml file. The string can only contain lowercase alphanumeric characters and `.`, `_` or `-` characters. version : :obj:`str`, optional Current version of the key `kelvin_version` in `version` object of the Cluster parameters. """ result_types = { "200": responses.OrchestrationClustersManifestsGet, "204": None, "400": response.Error, "401": response.Error, "404": response.Error, "500": response.Error, } _request = cls._prepare_request( method="GET", path="/api/v4/orchestration/clusters/{cluster_name}/manifests/get", values={"cluster_name": cluster_name}, params={"version": version}, files={}, headers={}, data=None, body_type=None, array_body=False, ) if _dry_run: return _request.to_dict() _response = await cls._make_request( client=_client, request=_request, stream=False, ) if _get_response: return _response cls._raise_api_error( response=_response, result_types=result_types, error_type=response.Error, ) return cls._process_response( response=_response, result_types=result_types, result_type=responses.OrchestrationClustersManifestsGet, )
@overload @classmethod async def list_orchestration_clusters_node( cls, cluster_name: str, search: Optional[Sequence[str]] = None, status: Optional[Sequence[str]] = None, pagination_type: Optional[Literal["limits", "cursor", "stream"]] = None, page_size: Optional[int] = 10000, page: Optional[int] = None, next: Optional[str] = None, previous: Optional[str] = None, direction: Optional[Literal["asc", "desc"]] = None, nulls: Optional[Literal["first", "last"]] = None, sort_by: Optional[Sequence[str]] = None, fetch: bool = True, *, _dry_run: Literal[True], _get_response: bool = False, _client: Optional[AsyncBaseClient] = None, ) -> dict[str, object]: ... @overload @classmethod async def list_orchestration_clusters_node( cls, cluster_name: str, search: Optional[Sequence[str]] = None, status: Optional[Sequence[str]] = None, pagination_type: Optional[Literal["limits", "cursor", "stream"]] = None, page_size: Optional[int] = 10000, page: Optional[int] = None, next: Optional[str] = None, previous: Optional[str] = None, direction: Optional[Literal["asc", "desc"]] = None, nulls: Optional[Literal["first", "last"]] = None, sort_by: Optional[Sequence[str]] = None, fetch: bool = True, _dry_run: Literal[False] = False, *, _get_response: Literal[True], _client: Optional[AsyncBaseClient] = None, ) -> Response: ... @overload @classmethod async def list_orchestration_clusters_node( cls, cluster_name: str, search: Optional[Sequence[str]] = None, status: Optional[Sequence[str]] = None, pagination_type: Optional[Literal["limits", "cursor", "stream"]] = None, page_size: Optional[int] = 10000, page: Optional[int] = None, next: Optional[str] = None, previous: Optional[str] = None, direction: Optional[Literal["asc", "desc"]] = None, nulls: Optional[Literal["first", "last"]] = None, sort_by: Optional[Sequence[str]] = None, *, fetch: Literal[False], _dry_run: Literal[False] = False, _get_response: Literal[False] = False, _client: Optional[AsyncBaseClient] = None, ) -> Union[ responses.OrchestrationClustersNodeListPaginatedResponseCursor, responses.OrchestrationClustersNodeListPaginatedResponseLimits, ]: ... @overload @classmethod async def list_orchestration_clusters_node( cls, cluster_name: str, search: Optional[Sequence[str]] = None, status: Optional[Sequence[str]] = None, pagination_type: Optional[Literal["limits", "cursor", "stream"]] = None, page_size: Optional[int] = 10000, page: Optional[int] = None, next: Optional[str] = None, previous: Optional[str] = None, direction: Optional[Literal["asc", "desc"]] = None, nulls: Optional[Literal["first", "last"]] = None, sort_by: Optional[Sequence[str]] = None, fetch: Literal[True] = True, _dry_run: Literal[False] = False, _get_response: Literal[False] = False, _client: Optional[AsyncBaseClient] = None, ) -> KList[responses.OrchestrationClustersNodesGetItem]: ...
[docs] @classmethod async def list_orchestration_clusters_node( cls, cluster_name: str, search: Optional[Sequence[str]] = None, status: Optional[Sequence[str]] = None, pagination_type: Optional[Literal["limits", "cursor", "stream"]] = None, page_size: Optional[int] = 10000, page: Optional[int] = None, next: Optional[str] = None, previous: Optional[str] = None, direction: Optional[Literal["asc", "desc"]] = None, nulls: Optional[Literal["first", "last"]] = None, sort_by: Optional[Sequence[str]] = None, fetch: bool = True, _dry_run: bool = False, _get_response: bool = False, _client: Optional[AsyncBaseClient] = None, ) -> Union[ Union[ KList[responses.OrchestrationClustersNodesGetItem], responses.OrchestrationClustersNodeListPaginatedResponseCursor, responses.OrchestrationClustersNodeListPaginatedResponseLimits, ], dict[str, object], Response, ]: """Returns a list of Node objects in a Cluster. The list can be optionally filtered and sorted on the server before being returned. **Permission Required:** `kelvin.permission.cluster.read`. ``listOrchestrationClustersNode``: ``GET`` ``/api/v4/orchestration/clusters/{cluster_name}/nodes/list`` Args: cluster_name : :obj:`str`, optional Cluster key `name` containing the Nodes to list. The filter is on the full name only. Can only contain lowercase alphanumeric characters and `.`, `_` or `-` characters. search : :obj:`Sequence[str]` Search and filter on the list based on the Node `name`. The search is case insensitive and will find partial matches as well. status : :obj:`Sequence[str]` A filter on the list based on the Node key `status`. The filter is on the full name only. All strings in the array are treated as `OR`. Options are `online`, `unreachable` and `not_ready` pagination_type : :obj:`Literal['limits', 'cursor', 'stream']` Method of pagination to use for return results where `total_items` is greater than `page_size`. `cursor` and `limits` will return one `page` of results, `stream` will return all results. ('limits', 'cursor', 'stream') page_size : :obj:`int` Number of objects to be returned in each page. Page size can range between 1 and 10000 objects. page : :obj:`int` An integer for the wanted page of results. Used only with `pagination_type` set as `limits`. next : :obj:`str` An alphanumeric string bookmark to indicate where to start for the next page. Used only with `pagination_type` set as `cursor`. previous : :obj:`str` An alphanumeric string bookmark to indicate where to end for the previous page. Used only with `pagination_type` set as `cursor`. direction : :obj:`Literal['asc', 'desc']` Sorting order according to the `sort_by` parameter. ('asc', 'desc') nulls : :obj:`Literal['first', 'last']` Null ordering according to the `sort_by` parameter. Defaults to `first` for ascending order and `last` for descending order. ('first', 'last') sort_by : :obj:`Sequence[str]` Sort the results by one of the Cluster parameters. Only one parameter can be selected. Options: `id`, `name`, `status`, `last_seen`, `created` and `updated`. """ result_types = { "200": responses.OrchestrationClustersNodeListPaginatedResponseCursor, "400": response.Error, "401": response.Error, "500": response.Error, } # override pagination_type # stream type is only supported for raw responses if not _get_response and pagination_type == "stream": pagination_type = "cursor" _request = cls._prepare_request( method="GET", path="/api/v4/orchestration/clusters/{cluster_name}/nodes/list", values={"cluster_name": cluster_name}, params={ "search": search, "status": status, "pagination_type": pagination_type, "page_size": page_size, "page": page, "next": next, "previous": previous, "direction": direction, "nulls": nulls, "sort_by": sort_by, }, files={}, headers={}, data=None, body_type=None, array_body=False, ) if _dry_run: return _request.to_dict() _response = await cls._make_request( client=_client, request=_request, stream=False, ) if _get_response: return _response cls._raise_api_error( response=_response, result_types=result_types, error_type=response.Error, ) if pagination_type == "limits": result = cls._process_response( response=_response, result_types=result_types, result_type=responses.OrchestrationClustersNodeListPaginatedResponseLimits, ) else: # default pagination_type is cursor result = cls._process_response( response=_response, result_types=result_types, result_type=responses.OrchestrationClustersNodeListPaginatedResponseCursor, ) if fetch: return await cls._fetch_pages( client=_client, path=_request.path, api_response=result, method="GET", params=_request.params, data=_request.data, result_types=result_types, error_type=response.Error, response_type=KList[responses.OrchestrationClustersNodesGetItem], ) return result
@overload @classmethod async def get_orchestration_clusters_nodes( cls, cluster_name: str, node_name: str, *, _dry_run: Literal[True], _get_response: bool = False, _client: Optional[AsyncBaseClient] = None, ) -> dict[str, object]: ... @overload @classmethod async def get_orchestration_clusters_nodes( cls, cluster_name: str, node_name: str, _dry_run: Literal[False] = False, *, _get_response: Literal[True], _client: Optional[AsyncBaseClient] = None, ) -> Response: ... @overload @classmethod async def get_orchestration_clusters_nodes( cls, cluster_name: str, node_name: str, _dry_run: Literal[False] = False, _get_response: Literal[False] = False, _client: Optional[AsyncBaseClient] = None, ) -> responses.OrchestrationClustersNodesGet: ...
[docs] @classmethod async def get_orchestration_clusters_nodes( cls, cluster_name: str, node_name: str, _dry_run: bool = False, _get_response: bool = False, _client: Optional[AsyncBaseClient] = None, ) -> Union[responses.OrchestrationClustersNodesGet, dict[str, object], Response]: """Retrieve the parameters and status of a specific Node on a Cluster. **Permission Required:** `kelvin.permission.cluster.read`. ``getOrchestrationClustersNodes``: ``GET`` ``/api/v4/orchestration/clusters/{cluster_name}/nodes/{node_name}/get`` Args: cluster_name : :obj:`str`, optional Cluster key `name` to look for Node. The string can only contain lowercase alphanumeric characters and `.`, `_` or `-` characters. node_name : :obj:`str`, optional Node key `name` to get. The string can only contain lowercase alphanumeric characters and `.`, `_` or `-` characters. """ result_types = { "200": responses.OrchestrationClustersNodesGet, "400": response.Error, "401": response.Error, "404": response.Error, "500": response.Error, } _request = cls._prepare_request( method="GET", path="/api/v4/orchestration/clusters/{cluster_name}/nodes/{node_name}/get", values={"cluster_name": cluster_name, "node_name": node_name}, params={}, files={}, headers={}, data=None, body_type=None, array_body=False, ) if _dry_run: return _request.to_dict() _response = await cls._make_request( client=_client, request=_request, stream=False, ) if _get_response: return _response cls._raise_api_error( response=_response, result_types=result_types, error_type=response.Error, ) return cls._process_response( response=_response, result_types=result_types, result_type=responses.OrchestrationClustersNodesGet, )
@overload @classmethod async def get_orchestration_clusters_provision( cls, cluster_name: str, *, _dry_run: Literal[True], _get_response: bool = False, _client: Optional[AsyncBaseClient] = None, ) -> dict[str, object]: ... @overload @classmethod async def get_orchestration_clusters_provision( cls, cluster_name: str, _dry_run: Literal[False] = False, *, _get_response: Literal[True], _client: Optional[AsyncBaseClient] = None, ) -> Response: ... @overload @classmethod async def get_orchestration_clusters_provision( cls, cluster_name: str, _dry_run: Literal[False] = False, _get_response: Literal[False] = False, _client: Optional[AsyncBaseClient] = None, ) -> str: ...
[docs] @classmethod async def get_orchestration_clusters_provision( cls, cluster_name: str, _dry_run: bool = False, _get_response: bool = False, _client: Optional[AsyncBaseClient] = None, ) -> Union[str, dict[str, object], Response]: """Get Clusters Provision YAML Specifications **Permission Required:** `kelvin.permission.cluster.read`. ``getOrchestrationClustersProvision``: ``GET`` ``/api/v4/orchestration/clusters/{cluster_name}/provision/get`` Args: cluster_name : :obj:`str`, optional Cluster key `name` to retrieve provision yaml file. The string can only contain lowercase alphanumeric characters and `.`, `_` or `-` characters. """ result_types = { "200": str, "400": response.Error, "401": response.Error, "404": response.Error, "500": response.Error, } _request = cls._prepare_request( method="GET", path="/api/v4/orchestration/clusters/{cluster_name}/provision/get", values={"cluster_name": cluster_name}, params={}, files={}, headers={}, data=None, body_type=None, array_body=False, ) if _dry_run: return _request.to_dict() _response = await cls._make_request( client=_client, request=_request, stream=False, ) if _get_response: return _response cls._raise_api_error( response=_response, result_types=result_types, error_type=response.Error, ) result = cls._process_str_response(response=_response) if result is None: raise ResponseError( f"Unexpected empty response", _response, ) return result
@overload @classmethod async def list_orchestration_clusters_service( cls, cluster_name: str, search: Optional[Sequence[str]] = None, workload_name: Optional[Sequence[str]] = None, service_type: Optional[Sequence[str]] = None, pagination_type: Optional[Literal["limits", "cursor", "stream"]] = None, page_size: Optional[int] = 10000, page: Optional[int] = None, next: Optional[str] = None, previous: Optional[str] = None, direction: Optional[Literal["asc", "desc"]] = None, nulls: Optional[Literal["first", "last"]] = None, sort_by: Optional[Sequence[str]] = None, fetch: bool = True, *, _dry_run: Literal[True], _get_response: bool = False, _client: Optional[AsyncBaseClient] = None, ) -> dict[str, object]: ... @overload @classmethod async def list_orchestration_clusters_service( cls, cluster_name: str, search: Optional[Sequence[str]] = None, workload_name: Optional[Sequence[str]] = None, service_type: Optional[Sequence[str]] = None, pagination_type: Optional[Literal["limits", "cursor", "stream"]] = None, page_size: Optional[int] = 10000, page: Optional[int] = None, next: Optional[str] = None, previous: Optional[str] = None, direction: Optional[Literal["asc", "desc"]] = None, nulls: Optional[Literal["first", "last"]] = None, sort_by: Optional[Sequence[str]] = None, fetch: bool = True, _dry_run: Literal[False] = False, *, _get_response: Literal[True], _client: Optional[AsyncBaseClient] = None, ) -> Response: ... @overload @classmethod async def list_orchestration_clusters_service( cls, cluster_name: str, search: Optional[Sequence[str]] = None, workload_name: Optional[Sequence[str]] = None, service_type: Optional[Sequence[str]] = None, pagination_type: Optional[Literal["limits", "cursor", "stream"]] = None, page_size: Optional[int] = 10000, page: Optional[int] = None, next: Optional[str] = None, previous: Optional[str] = None, direction: Optional[Literal["asc", "desc"]] = None, nulls: Optional[Literal["first", "last"]] = None, sort_by: Optional[Sequence[str]] = None, *, fetch: Literal[False], _dry_run: Literal[False] = False, _get_response: Literal[False] = False, _client: Optional[AsyncBaseClient] = None, ) -> Union[ responses.OrchestrationClustersServiceListPaginatedResponseCursor, responses.OrchestrationClustersServiceListPaginatedResponseLimits, ]: ... @overload @classmethod async def list_orchestration_clusters_service( cls, cluster_name: str, search: Optional[Sequence[str]] = None, workload_name: Optional[Sequence[str]] = None, service_type: Optional[Sequence[str]] = None, pagination_type: Optional[Literal["limits", "cursor", "stream"]] = None, page_size: Optional[int] = 10000, page: Optional[int] = None, next: Optional[str] = None, previous: Optional[str] = None, direction: Optional[Literal["asc", "desc"]] = None, nulls: Optional[Literal["first", "last"]] = None, sort_by: Optional[Sequence[str]] = None, fetch: Literal[True] = True, _dry_run: Literal[False] = False, _get_response: Literal[False] = False, _client: Optional[AsyncBaseClient] = None, ) -> KList[responses.ServiceItem]: ...
[docs] @classmethod async def list_orchestration_clusters_service( cls, cluster_name: str, search: Optional[Sequence[str]] = None, workload_name: Optional[Sequence[str]] = None, service_type: Optional[Sequence[str]] = None, pagination_type: Optional[Literal["limits", "cursor", "stream"]] = None, page_size: Optional[int] = 10000, page: Optional[int] = None, next: Optional[str] = None, previous: Optional[str] = None, direction: Optional[Literal["asc", "desc"]] = None, nulls: Optional[Literal["first", "last"]] = None, sort_by: Optional[Sequence[str]] = None, fetch: bool = True, _dry_run: bool = False, _get_response: bool = False, _client: Optional[AsyncBaseClient] = None, ) -> Union[ Union[ KList[responses.ServiceItem], responses.OrchestrationClustersServiceListPaginatedResponseCursor, responses.OrchestrationClustersServiceListPaginatedResponseLimits, ], dict[str, object], Response, ]: """Returns a list of Service objects in a Cluster. The list can be optionally filtered and sorted on the server before being returned. **Permission Required:** `kelvin.permission.cluster.read`. ``listOrchestrationClustersService``: ``GET`` ``/api/v4/orchestration/clusters/{cluster_name}/services/list`` Args: cluster_name : :obj:`str`, optional Cluster key `name` containing the Services to list. The filter is on the full name only. Can only contain lowercase alphanumeric characters and `.`, `_` or `-` characters. search : :obj:`Sequence[str]` Search and filter on the list based on any of the Service keys `name`, `workload_name`, `network_interface`, `address`, and `service_type`. The search is case insensitive and will find partial matches as well. All strings in the array are treated as `OR`. workload_name : :obj:`Sequence[str]` A filter on the list based on the Workload key `name`. The filter is on the full name only. All strings in the array are treated as `OR`. service_type : :obj:`Sequence[str]` A filter on the list based on the Service Type key `service_type`. The filter is on the full name only. All strings in the array are treated as `OR`. Options are `cluster_ip`, `node_port` and `host_port`. pagination_type : :obj:`Literal['limits', 'cursor', 'stream']` Method of pagination to use for return results where `total_items` is greater than `page_size`. `cursor` and `limits` will return one `page` of results, `stream` will return all results. ('limits', 'cursor', 'stream') page_size : :obj:`int` Number of objects to be returned in each page. Page size can range between 1 and 10000 objects. page : :obj:`int` An integer for the wanted page of results. Used only with `pagination_type` set as `limits`. next : :obj:`str` An alphanumeric string bookmark to indicate where to start for the next page. Used only with `pagination_type` set as `cursor`. previous : :obj:`str` An alphanumeric string bookmark to indicate where to end for the previous page. Used only with `pagination_type` set as `cursor`. direction : :obj:`Literal['asc', 'desc']` Sorting order according to the `sort_by` parameter. ('asc', 'desc') nulls : :obj:`Literal['first', 'last']` Null ordering according to the `sort_by` parameter. Defaults to `first` for ascending order and `last` for descending order. ('first', 'last') sort_by : :obj:`Sequence[str]` Sort the results by one of the Cluster parameters. Only one parameter can be selected. Options: `name`, `workload_name`, `network_interface`, `service_type`, `address`, `created` and `updated`. """ result_types = { "200": responses.OrchestrationClustersServiceListPaginatedResponseCursor, "400": response.Error, "401": response.Error, "500": response.Error, } # override pagination_type # stream type is only supported for raw responses if not _get_response and pagination_type == "stream": pagination_type = "cursor" _request = cls._prepare_request( method="GET", path="/api/v4/orchestration/clusters/{cluster_name}/services/list", values={"cluster_name": cluster_name}, params={ "search": search, "workload_name": workload_name, "service_type": service_type, "pagination_type": pagination_type, "page_size": page_size, "page": page, "next": next, "previous": previous, "direction": direction, "nulls": nulls, "sort_by": sort_by, }, files={}, headers={}, data=None, body_type=None, array_body=False, ) if _dry_run: return _request.to_dict() _response = await cls._make_request( client=_client, request=_request, stream=False, ) if _get_response: return _response cls._raise_api_error( response=_response, result_types=result_types, error_type=response.Error, ) if pagination_type == "limits": result = cls._process_response( response=_response, result_types=result_types, result_type=responses.OrchestrationClustersServiceListPaginatedResponseLimits, ) else: # default pagination_type is cursor result = cls._process_response( response=_response, result_types=result_types, result_type=responses.OrchestrationClustersServiceListPaginatedResponseCursor, ) if fetch: return await cls._fetch_pages( client=_client, path=_request.path, api_response=result, method="GET", params=_request.params, data=_request.data, result_types=result_types, error_type=response.Error, response_type=KList[responses.ServiceItem], ) return result
@overload @classmethod async def update_orchestration_clusters( cls, cluster_name: str, data: Optional[Union[requests.OrchestrationClustersUpdate, Mapping[str, object]]] = None, *, _dry_run: Literal[True], _get_response: bool = False, _client: Optional[AsyncBaseClient] = None, **kwargs: object, ) -> dict[str, object]: ... @overload @classmethod async def update_orchestration_clusters( cls, cluster_name: str, data: Optional[Union[requests.OrchestrationClustersUpdate, Mapping[str, object]]] = None, _dry_run: Literal[False] = False, *, _get_response: Literal[True], _client: Optional[AsyncBaseClient] = None, **kwargs: object, ) -> Response: ... @overload @classmethod async def update_orchestration_clusters( cls, cluster_name: str, data: Optional[Union[requests.OrchestrationClustersUpdate, Mapping[str, object]]] = None, _dry_run: Literal[False] = False, _get_response: Literal[False] = False, _client: Optional[AsyncBaseClient] = None, **kwargs: object, ) -> responses.OrchestrationClustersUpdate: ...
[docs] @classmethod async def update_orchestration_clusters( cls, cluster_name: str, data: Optional[Union[requests.OrchestrationClustersUpdate, Mapping[str, object]]] = None, _dry_run: bool = False, _get_response: bool = False, _client: Optional[AsyncBaseClient] = None, **kwargs: object, ) -> Union[responses.OrchestrationClustersUpdate, dict[str, object], Response]: """Update the configuration settings of the Cluster. Only key/values provided in the request body will be updated, all other settings will remain unchanged. **Permission Required:** `kelvin.permission.cluster.update`. ``updateOrchestrationClusters``: ``POST`` ``/api/v4/orchestration/clusters/{cluster_name}/update`` Args: cluster_name : :obj:`str`, optional Cluster key `name` target for sending parameter updates. The string can only contain lowercase alphanumeric characters and `.`, `_` or `-` characters. data: requests.OrchestrationClustersUpdate, optional **kwargs: Extra parameters for requests.OrchestrationClustersUpdate - update_orchestration_clusters: dict """ result_types = { "200": responses.OrchestrationClustersUpdate, "400": response.Error, "401": response.Error, "404": response.Error, "409": response.Error, "500": response.Error, } _request = cls._prepare_request( method="POST", path="/api/v4/orchestration/clusters/{cluster_name}/update", values={"cluster_name": cluster_name}, params={}, files={}, headers={}, data=data, body_type=requests.OrchestrationClustersUpdate, array_body=False, **kwargs, ) if _dry_run: return _request.to_dict() _response = await cls._make_request( client=_client, request=_request, stream=False, ) if _get_response: return _response cls._raise_api_error( response=_response, result_types=result_types, error_type=response.Error, ) return cls._process_response( response=_response, result_types=result_types, result_type=responses.OrchestrationClustersUpdate, )