# Code generated by builder. DO NOT EDIT.
"""
Kelvin API Client.
"""
from __future__ import annotations
from collections.abc import AsyncIterator, Mapping, Sequence
from typing import Optional, Union, overload
from typing_extensions import Literal
from kelvin.api.base.api_service_model import AsyncApiServiceModel
from kelvin.api.base.data_model import KList
from kelvin.api.base.error import ResponseError
from kelvin.api.base.http_client.base_client import AsyncBaseClient, Response
from ..model import requests, response, responses
[docs]
class Orchestration(AsyncApiServiceModel):
@overload
@classmethod
async def create_orchestration_clusters(
cls,
data: Optional[Union[requests.OrchestrationClustersCreate, Mapping[str, object]]] = None,
*,
_dry_run: Literal[True],
_get_response: bool = False,
_client: Optional[AsyncBaseClient] = None,
**kwargs: object,
) -> dict[str, object]: ...
@overload
@classmethod
async def create_orchestration_clusters(
cls,
data: Optional[Union[requests.OrchestrationClustersCreate, Mapping[str, object]]] = None,
_dry_run: Literal[False] = False,
*,
_get_response: Literal[True],
_client: Optional[AsyncBaseClient] = None,
**kwargs: object,
) -> Response: ...
@overload
@classmethod
async def create_orchestration_clusters(
cls,
data: Optional[Union[requests.OrchestrationClustersCreate, Mapping[str, object]]] = None,
_dry_run: Literal[False] = False,
_get_response: Literal[False] = False,
_client: Optional[AsyncBaseClient] = None,
**kwargs: object,
) -> responses.OrchestrationClustersCreate: ...
[docs]
@classmethod
async def create_orchestration_clusters(
cls,
data: Optional[Union[requests.OrchestrationClustersCreate, Mapping[str, object]]] = None,
_dry_run: bool = False,
_get_response: bool = False,
_client: Optional[AsyncBaseClient] = None,
**kwargs: object,
) -> Union[responses.OrchestrationClustersCreate, dict[str, object], Response]:
"""Create a new Cluster. This only creates the Cloud registration, the actual installation still needs to be manually performed on the server/kubernetes cluster. The provision script to run locally on the server for `type` registered as `k3s` or on an existing kubernetes cluster for `type` registered as `kubernetes` will be in the key `provision_script ` in the 201 response body.
**Permission Required:** `kelvin.permission.cluster.create`.
``createOrchestrationClusters``: ``POST`` ``/api/v4/orchestration/clusters/create``
Args:
data: requests.OrchestrationClustersCreate, optional
**kwargs:
Extra parameters for requests.OrchestrationClustersCreate
- create_orchestration_clusters: dict
"""
result_types = {
"201": responses.OrchestrationClustersCreate,
"400": response.Error,
"401": response.Error,
"404": response.Error,
"409": response.Error,
"500": response.Error,
}
_request = cls._prepare_request(
method="POST",
path="/api/v4/orchestration/clusters/create",
values={},
params={},
files={},
headers={},
data=data,
body_type=requests.OrchestrationClustersCreate,
array_body=False,
**kwargs,
)
if _dry_run:
return _request.to_dict()
_response = await cls._make_request(
client=_client,
request=_request,
stream=False,
)
if _get_response:
return _response
cls._raise_api_error(
response=_response,
result_types=result_types,
error_type=response.Error,
)
return cls._process_response(
response=_response,
result_types=result_types,
result_type=responses.OrchestrationClustersCreate,
)
@overload
@classmethod
async def list_orchestration_clusters(
cls,
names: Optional[Sequence[str]] = None,
search: Optional[Sequence[str]] = None,
type: Optional[Sequence[str]] = None,
ready: Optional[bool] = None,
status: Optional[Sequence[str]] = None,
pagination_type: Optional[Literal["limits", "cursor", "stream"]] = None,
page_size: Optional[int] = 10000,
page: Optional[int] = None,
next: Optional[str] = None,
previous: Optional[str] = None,
direction: Optional[Literal["asc", "desc"]] = None,
nulls: Optional[Literal["first", "last"]] = None,
sort_by: Optional[Sequence[str]] = None,
fetch: bool = True,
*,
_dry_run: Literal[True],
_get_response: bool = False,
_client: Optional[AsyncBaseClient] = None,
) -> dict[str, object]: ...
@overload
@classmethod
async def list_orchestration_clusters(
cls,
names: Optional[Sequence[str]] = None,
search: Optional[Sequence[str]] = None,
type: Optional[Sequence[str]] = None,
ready: Optional[bool] = None,
status: Optional[Sequence[str]] = None,
pagination_type: Optional[Literal["limits", "cursor", "stream"]] = None,
page_size: Optional[int] = 10000,
page: Optional[int] = None,
next: Optional[str] = None,
previous: Optional[str] = None,
direction: Optional[Literal["asc", "desc"]] = None,
nulls: Optional[Literal["first", "last"]] = None,
sort_by: Optional[Sequence[str]] = None,
fetch: bool = True,
_dry_run: Literal[False] = False,
*,
_get_response: Literal[True],
_client: Optional[AsyncBaseClient] = None,
) -> Response: ...
@overload
@classmethod
async def list_orchestration_clusters(
cls,
names: Optional[Sequence[str]] = None,
search: Optional[Sequence[str]] = None,
type: Optional[Sequence[str]] = None,
ready: Optional[bool] = None,
status: Optional[Sequence[str]] = None,
pagination_type: Optional[Literal["limits", "cursor", "stream"]] = None,
page_size: Optional[int] = 10000,
page: Optional[int] = None,
next: Optional[str] = None,
previous: Optional[str] = None,
direction: Optional[Literal["asc", "desc"]] = None,
nulls: Optional[Literal["first", "last"]] = None,
sort_by: Optional[Sequence[str]] = None,
*,
fetch: Literal[False],
_dry_run: Literal[False] = False,
_get_response: Literal[False] = False,
_client: Optional[AsyncBaseClient] = None,
) -> Union[
responses.OrchestrationClustersListPaginatedResponseCursor,
responses.OrchestrationClustersListPaginatedResponseLimits,
]: ...
@overload
@classmethod
async def list_orchestration_clusters(
cls,
names: Optional[Sequence[str]] = None,
search: Optional[Sequence[str]] = None,
type: Optional[Sequence[str]] = None,
ready: Optional[bool] = None,
status: Optional[Sequence[str]] = None,
pagination_type: Optional[Literal["limits", "cursor", "stream"]] = None,
page_size: Optional[int] = 10000,
page: Optional[int] = None,
next: Optional[str] = None,
previous: Optional[str] = None,
direction: Optional[Literal["asc", "desc"]] = None,
nulls: Optional[Literal["first", "last"]] = None,
sort_by: Optional[Sequence[str]] = None,
fetch: Literal[True] = True,
_dry_run: Literal[False] = False,
_get_response: Literal[False] = False,
_client: Optional[AsyncBaseClient] = None,
) -> KList[responses.OrchestrationClustersCreateItem]: ...
[docs]
@classmethod
async def list_orchestration_clusters(
cls,
names: Optional[Sequence[str]] = None,
search: Optional[Sequence[str]] = None,
type: Optional[Sequence[str]] = None,
ready: Optional[bool] = None,
status: Optional[Sequence[str]] = None,
pagination_type: Optional[Literal["limits", "cursor", "stream"]] = None,
page_size: Optional[int] = 10000,
page: Optional[int] = None,
next: Optional[str] = None,
previous: Optional[str] = None,
direction: Optional[Literal["asc", "desc"]] = None,
nulls: Optional[Literal["first", "last"]] = None,
sort_by: Optional[Sequence[str]] = None,
fetch: bool = True,
_dry_run: bool = False,
_get_response: bool = False,
_client: Optional[AsyncBaseClient] = None,
) -> Union[
Union[
KList[responses.OrchestrationClustersCreateItem],
responses.OrchestrationClustersListPaginatedResponseCursor,
responses.OrchestrationClustersListPaginatedResponseLimits,
],
dict[str, object],
Response,
]:
"""Returns a list of Cluster objects. The list can be optionally filtered and sorted on the server before being returned.
**Permission Required:** `kelvin.permission.cluster.read`.
``listOrchestrationClusters``: ``GET`` ``/api/v4/orchestration/clusters/list``
Args:
names : :obj:`Sequence[str]`
A filter on the list based on the Cluster key `name`. The filter is on
the full name only. All strings in the array are treated as `OR`. Can
only contain lowercase alphanumeric characters and `.`, `_` or `-`
characters.
search : :obj:`Sequence[str]`
Search and filter on the list based on the Cluster keys `title`
(Display Name) or `name`. The search is case insensitive and will find
partial matches as well.
type : :obj:`Sequence[str]`
A filter on the list based on the Cluster key `type`. The filter is on
the full name only. All strings in the array are treated as `OR`.
Options are `k3s` and `kubernetes`
ready : :obj:`bool`
A filter on the list based on the Cluster key `ready`. Options are
`true` and `false`
status : :obj:`Sequence[str]`
A filter on the list based on the Cluster key `status`. The filter is
on the full name only. All strings in the array are treated as `OR`.
Options are `pending_provision`, `pending`, `online`, `unreachable`
and `requires_attention`
pagination_type : :obj:`Literal['limits', 'cursor', 'stream']`
Method of pagination to use for return results where `total_items` is
greater than `page_size`. `cursor` and `limits` will return one `page`
of results, `stream` will return all results. ('limits', 'cursor',
'stream')
page_size : :obj:`int`
Number of objects to be returned in each page. Page size can range
between 1 and 10000 objects.
page : :obj:`int`
An integer for the wanted page of results. Used only with
`pagination_type` set as `limits`.
next : :obj:`str`
An alphanumeric string bookmark to indicate where to start for the
next page. Used only with `pagination_type` set as `cursor`.
previous : :obj:`str`
An alphanumeric string bookmark to indicate where to end for the
previous page. Used only with `pagination_type` set as `cursor`.
direction : :obj:`Literal['asc', 'desc']`
Sorting order according to the `sort_by` parameter. ('asc', 'desc')
nulls : :obj:`Literal['first', 'last']`
Null ordering according to the `sort_by` parameter. Defaults to
`first` for ascending order and `last` for descending order. ('first',
'last')
sort_by : :obj:`Sequence[str]`
Sort the results by one of the Cluster parameters. Only one parameter
can be selected. Options: `name`, `title`, `ready`, `type`, `status`,
`last_seen`, `created`, `updated`, `kelvin_version`,
`container_version`.
"""
result_types = {
"200": responses.OrchestrationClustersListPaginatedResponseCursor,
"400": response.Error,
"401": response.Error,
"500": response.Error,
}
# override pagination_type
# stream type is only supported for raw responses
if not _get_response and pagination_type == "stream":
pagination_type = "cursor"
_request = cls._prepare_request(
method="GET",
path="/api/v4/orchestration/clusters/list",
values={},
params={
"names": names,
"search": search,
"type": type,
"ready": ready,
"status": status,
"pagination_type": pagination_type,
"page_size": page_size,
"page": page,
"next": next,
"previous": previous,
"direction": direction,
"nulls": nulls,
"sort_by": sort_by,
},
files={},
headers={},
data=None,
body_type=None,
array_body=False,
)
if _dry_run:
return _request.to_dict()
_response = await cls._make_request(
client=_client,
request=_request,
stream=False,
)
if _get_response:
return _response
cls._raise_api_error(
response=_response,
result_types=result_types,
error_type=response.Error,
)
if pagination_type == "limits":
result = cls._process_response(
response=_response,
result_types=result_types,
result_type=responses.OrchestrationClustersListPaginatedResponseLimits,
)
else: # default pagination_type is cursor
result = cls._process_response(
response=_response,
result_types=result_types,
result_type=responses.OrchestrationClustersListPaginatedResponseCursor,
)
if fetch:
return await cls._fetch_pages(
client=_client,
path=_request.path,
api_response=result,
method="GET",
params=_request.params,
data=_request.data,
result_types=result_types,
error_type=response.Error,
response_type=KList[responses.OrchestrationClustersCreateItem],
)
return result
@overload
@classmethod
async def download_orchestration_cluster_provision_binary(
cls,
arch: Optional[Literal["amd64", "arm64", "armv7"]] = None,
type: Optional[Literal["kubernetes", "docker"]] = None,
*,
_dry_run: Literal[True],
_get_response: bool = False,
_client: Optional[AsyncBaseClient] = None,
) -> dict[str, object]: ...
@overload
@classmethod
async def download_orchestration_cluster_provision_binary(
cls,
arch: Optional[Literal["amd64", "arm64", "armv7"]] = None,
type: Optional[Literal["kubernetes", "docker"]] = None,
_dry_run: Literal[False] = False,
*,
_get_response: Literal[True],
_client: Optional[AsyncBaseClient] = None,
) -> Response: ...
@overload
@classmethod
async def download_orchestration_cluster_provision_binary(
cls,
arch: Optional[Literal["amd64", "arm64", "armv7"]] = None,
type: Optional[Literal["kubernetes", "docker"]] = None,
_dry_run: Literal[False] = False,
_get_response: Literal[False] = False,
_client: Optional[AsyncBaseClient] = None,
) -> AsyncIterator[bytes]: ...
[docs]
@classmethod
async def download_orchestration_cluster_provision_binary(
cls,
arch: Optional[Literal["amd64", "arm64", "armv7"]] = None,
type: Optional[Literal["kubernetes", "docker"]] = None,
_dry_run: bool = False,
_get_response: bool = False,
_client: Optional[AsyncBaseClient] = None,
) -> Union[AsyncIterator[bytes], dict[str, object], Response]:
"""Download Cluster Provision Binary
**Permission Required:** `kelvin.permission.cluster.read`.
``downloadOrchestrationClusterProvisionBinary``: ``GET`` ``/api/v4/orchestration/clusters/provision/bin/download``
Args:
arch : :obj:`Literal['amd64', 'arm64', 'armv7']`
Architecture of the binary to download. Options are `amd64`, `arm64`,
`armv7` (default: `amd64`) ('amd64', 'arm64', 'armv7')
type : :obj:`Literal['kubernetes', 'docker']`
Type of the binary to download. Options are `kubernetes`, `docker`
(default: `kubernetes`) ('kubernetes', 'docker')
"""
result_types = {"200": bytes, "400": response.Error, "401": response.Error, "500": response.Error}
_request = cls._prepare_request(
method="GET",
path="/api/v4/orchestration/clusters/provision/bin/download",
values={},
params={"arch": arch, "type": type},
files={},
headers={},
data=None,
body_type=None,
array_body=False,
)
if _dry_run:
return _request.to_dict()
_response = await cls._make_request(
client=_client,
request=_request,
stream=True,
)
if _get_response:
return _response
cls._raise_api_error(
response=_response,
result_types=result_types,
error_type=response.Error,
)
result = await cls._process_response_stream_bytes(response=_response)
if result is None:
raise ResponseError(
f"Unexpected empty value or invalid content-type response",
_response,
)
return result
@overload
@classmethod
async def delete_orchestration_clusters(
cls,
cluster_name: str,
*,
_dry_run: Literal[True],
_get_response: bool = False,
_client: Optional[AsyncBaseClient] = None,
) -> dict[str, object]: ...
@overload
@classmethod
async def delete_orchestration_clusters(
cls,
cluster_name: str,
_dry_run: Literal[False] = False,
*,
_get_response: Literal[True],
_client: Optional[AsyncBaseClient] = None,
) -> Response: ...
@overload
@classmethod
async def delete_orchestration_clusters(
cls,
cluster_name: str,
_dry_run: Literal[False] = False,
_get_response: Literal[False] = False,
_client: Optional[AsyncBaseClient] = None,
) -> None: ...
[docs]
@classmethod
async def delete_orchestration_clusters(
cls,
cluster_name: str,
_dry_run: bool = False,
_get_response: bool = False,
_client: Optional[AsyncBaseClient] = None,
) -> Optional[Union[dict[str, object], Response]]:
"""Permanently delete an existing Cluster and its Nodes. This will also delete Workloads, Bridges and Services residing on the Cluster. This cannot be undone once the API request has been submitted.
**Permission Required:** `kelvin.permission.cluster.delete`.
``deleteOrchestrationClusters``: ``POST`` ``/api/v4/orchestration/clusters/{cluster_name}/delete``
Args:
cluster_name : :obj:`str`, optional
Cluster key `name` to delete. The string can only contain lowercase
alphanumeric characters and `.`, `_` or `-` characters.
"""
result_types = {
"200": None,
"400": response.Error,
"401": response.Error,
"404": response.Error,
"500": response.Error,
}
_request = cls._prepare_request(
method="POST",
path="/api/v4/orchestration/clusters/{cluster_name}/delete",
values={"cluster_name": cluster_name},
params={},
files={},
headers={},
data=None,
body_type=None,
array_body=False,
)
if _dry_run:
return _request.to_dict()
_response = await cls._make_request(
client=_client,
request=_request,
stream=False,
)
if _get_response:
return _response
cls._raise_api_error(
response=_response,
result_types=result_types,
error_type=response.Error,
)
@overload
@classmethod
async def apply_orchestration_clusters_edge_apps_version(
cls,
cluster_name: str,
*,
_dry_run: Literal[True],
_get_response: bool = False,
_client: Optional[AsyncBaseClient] = None,
) -> dict[str, object]: ...
@overload
@classmethod
async def apply_orchestration_clusters_edge_apps_version(
cls,
cluster_name: str,
_dry_run: Literal[False] = False,
*,
_get_response: Literal[True],
_client: Optional[AsyncBaseClient] = None,
) -> Response: ...
@overload
@classmethod
async def apply_orchestration_clusters_edge_apps_version(
cls,
cluster_name: str,
_dry_run: Literal[False] = False,
_get_response: Literal[False] = False,
_client: Optional[AsyncBaseClient] = None,
) -> str: ...
[docs]
@classmethod
async def apply_orchestration_clusters_edge_apps_version(
cls,
cluster_name: str,
_dry_run: bool = False,
_get_response: bool = False,
_client: Optional[AsyncBaseClient] = None,
) -> Union[str, dict[str, object], Response]:
"""Initiates available cluster upgrades; requires cluster to be online and ready.
**Permission Required:** `kelvin.permission.cluster.update`.
``applyOrchestrationClustersEdgeAppsVersion``: ``GET`` ``/api/v4/orchestration/clusters/{cluster_name}/edge-apps/version/apply``
Args:
cluster_name : :obj:`str`, optional
Cluster key `name` to initiate upgrades. The string can only contain
lowercase alphanumeric characters and `.`, `_` or `-` characters.
"""
result_types = {
"200": str,
"400": response.Error,
"401": response.Error,
"404": response.Error,
"406": response.Error,
"500": response.Error,
}
_request = cls._prepare_request(
method="GET",
path="/api/v4/orchestration/clusters/{cluster_name}/edge-apps/version/apply",
values={"cluster_name": cluster_name},
params={},
files={},
headers={},
data=None,
body_type=None,
array_body=False,
)
if _dry_run:
return _request.to_dict()
_response = await cls._make_request(
client=_client,
request=_request,
stream=False,
)
if _get_response:
return _response
cls._raise_api_error(
response=_response,
result_types=result_types,
error_type=response.Error,
)
result = cls._process_str_response(response=_response)
if result is None:
raise ResponseError(
f"Unexpected empty response",
_response,
)
return result
@overload
@classmethod
async def update_orchestration_clusters_edge_apps_version_force(
cls,
cluster_name: str,
persist: Optional[bool] = None,
*,
_dry_run: Literal[True],
_get_response: bool = False,
_client: Optional[AsyncBaseClient] = None,
) -> dict[str, object]: ...
@overload
@classmethod
async def update_orchestration_clusters_edge_apps_version_force(
cls,
cluster_name: str,
persist: Optional[bool] = None,
_dry_run: Literal[False] = False,
*,
_get_response: Literal[True],
_client: Optional[AsyncBaseClient] = None,
) -> Response: ...
@overload
@classmethod
async def update_orchestration_clusters_edge_apps_version_force(
cls,
cluster_name: str,
persist: Optional[bool] = None,
_dry_run: Literal[False] = False,
_get_response: Literal[False] = False,
_client: Optional[AsyncBaseClient] = None,
) -> str: ...
[docs]
@classmethod
async def update_orchestration_clusters_edge_apps_version_force(
cls,
cluster_name: str,
persist: Optional[bool] = None,
_dry_run: bool = False,
_get_response: bool = False,
_client: Optional[AsyncBaseClient] = None,
) -> Union[str, dict[str, object], Response]:
"""Force available updates to Cluster even if update setting is disabled. Optionally keep a force update action on standby if the Cluster is offline. This ensures the update is applied once the Cluster returns online.
**Permission Required:** `kelvin.permission.cluster.update`.
``updateOrchestrationClustersEdgeAppsVersionForce``: ``POST`` ``/api/v4/orchestration/clusters/{cluster_name}/edge-apps/version/force-update``
Args:
cluster_name : :obj:`str`, optional
Cluster key `name` to initiate upgrades. The string can only contain
lowercase alphanumeric characters and `.`, `_` or `-` characters.
persist : :obj:`bool`
Optional setting to wait if the Cluster is currently unreachable and
force the update when the Cluster is next online.
"""
result_types = {
"200": str,
"202": str,
"400": response.Error,
"401": response.Error,
"404": response.Error,
"500": response.Error,
}
_request = cls._prepare_request(
method="POST",
path="/api/v4/orchestration/clusters/{cluster_name}/edge-apps/version/force-update",
values={"cluster_name": cluster_name},
params={"persist": persist},
files={},
headers={},
data=None,
body_type=None,
array_body=False,
)
if _dry_run:
return _request.to_dict()
_response = await cls._make_request(
client=_client,
request=_request,
stream=False,
)
if _get_response:
return _response
cls._raise_api_error(
response=_response,
result_types=result_types,
error_type=response.Error,
)
result = cls._process_str_response(response=_response)
if result is None:
raise ResponseError(
f"Unexpected empty response",
_response,
)
return result
@overload
@classmethod
async def get_orchestration_clusters(
cls,
cluster_name: str,
*,
_dry_run: Literal[True],
_get_response: bool = False,
_client: Optional[AsyncBaseClient] = None,
) -> dict[str, object]: ...
@overload
@classmethod
async def get_orchestration_clusters(
cls,
cluster_name: str,
_dry_run: Literal[False] = False,
*,
_get_response: Literal[True],
_client: Optional[AsyncBaseClient] = None,
) -> Response: ...
@overload
@classmethod
async def get_orchestration_clusters(
cls,
cluster_name: str,
_dry_run: Literal[False] = False,
_get_response: Literal[False] = False,
_client: Optional[AsyncBaseClient] = None,
) -> responses.OrchestrationClustersGet: ...
[docs]
@classmethod
async def get_orchestration_clusters(
cls,
cluster_name: str,
_dry_run: bool = False,
_get_response: bool = False,
_client: Optional[AsyncBaseClient] = None,
) -> Union[responses.OrchestrationClustersGet, dict[str, object], Response]:
"""Retrieve the parameters and status of a Cluster.
**Permission Required:** `kelvin.permission.cluster.read`.
``getOrchestrationClusters``: ``GET`` ``/api/v4/orchestration/clusters/{cluster_name}/get``
Args:
cluster_name : :obj:`str`, optional
Cluster key `name` to get. The string can only contain lowercase
alphanumeric characters and `.`, `_` or `-` characters.
"""
result_types = {
"200": responses.OrchestrationClustersGet,
"400": response.Error,
"401": response.Error,
"404": response.Error,
"500": response.Error,
}
_request = cls._prepare_request(
method="GET",
path="/api/v4/orchestration/clusters/{cluster_name}/get",
values={"cluster_name": cluster_name},
params={},
files={},
headers={},
data=None,
body_type=None,
array_body=False,
)
if _dry_run:
return _request.to_dict()
_response = await cls._make_request(
client=_client,
request=_request,
stream=False,
)
if _get_response:
return _response
cls._raise_api_error(
response=_response,
result_types=result_types,
error_type=response.Error,
)
return cls._process_response(
response=_response,
result_types=result_types,
result_type=responses.OrchestrationClustersGet,
)
@overload
@classmethod
async def get_orchestration_clusters_manifests(
cls,
cluster_name: str,
version: str,
*,
_dry_run: Literal[True],
_get_response: bool = False,
_client: Optional[AsyncBaseClient] = None,
) -> dict[str, object]: ...
@overload
@classmethod
async def get_orchestration_clusters_manifests(
cls,
cluster_name: str,
version: str,
_dry_run: Literal[False] = False,
*,
_get_response: Literal[True],
_client: Optional[AsyncBaseClient] = None,
) -> Response: ...
@overload
@classmethod
async def get_orchestration_clusters_manifests(
cls,
cluster_name: str,
version: str,
_dry_run: Literal[False] = False,
_get_response: Literal[False] = False,
_client: Optional[AsyncBaseClient] = None,
) -> responses.OrchestrationClustersManifestsGet: ...
[docs]
@classmethod
async def get_orchestration_clusters_manifests(
cls,
cluster_name: str,
version: str,
_dry_run: bool = False,
_get_response: bool = False,
_client: Optional[AsyncBaseClient] = None,
) -> Optional[Union[responses.OrchestrationClustersManifestsGet, dict[str, object], Response]]:
"""Get Cluster Manifests
**Permission Required:** `kelvin.permission.cluster.read`.
``getOrchestrationClustersManifests``: ``GET`` ``/api/v4/orchestration/clusters/{cluster_name}/manifests/get``
Args:
cluster_name : :obj:`str`, optional
Cluster key `name` to retrieve provision yaml file. The string can
only contain lowercase alphanumeric characters and `.`, `_` or `-`
characters.
version : :obj:`str`, optional
Current version of the key `kelvin_version` in `version` object of the
Cluster parameters.
"""
result_types = {
"200": responses.OrchestrationClustersManifestsGet,
"204": None,
"400": response.Error,
"401": response.Error,
"404": response.Error,
"500": response.Error,
}
_request = cls._prepare_request(
method="GET",
path="/api/v4/orchestration/clusters/{cluster_name}/manifests/get",
values={"cluster_name": cluster_name},
params={"version": version},
files={},
headers={},
data=None,
body_type=None,
array_body=False,
)
if _dry_run:
return _request.to_dict()
_response = await cls._make_request(
client=_client,
request=_request,
stream=False,
)
if _get_response:
return _response
cls._raise_api_error(
response=_response,
result_types=result_types,
error_type=response.Error,
)
return cls._process_response(
response=_response,
result_types=result_types,
result_type=responses.OrchestrationClustersManifestsGet,
)
@overload
@classmethod
async def list_orchestration_clusters_node(
cls,
cluster_name: str,
search: Optional[Sequence[str]] = None,
status: Optional[Sequence[str]] = None,
pagination_type: Optional[Literal["limits", "cursor", "stream"]] = None,
page_size: Optional[int] = 10000,
page: Optional[int] = None,
next: Optional[str] = None,
previous: Optional[str] = None,
direction: Optional[Literal["asc", "desc"]] = None,
nulls: Optional[Literal["first", "last"]] = None,
sort_by: Optional[Sequence[str]] = None,
fetch: bool = True,
*,
_dry_run: Literal[True],
_get_response: bool = False,
_client: Optional[AsyncBaseClient] = None,
) -> dict[str, object]: ...
@overload
@classmethod
async def list_orchestration_clusters_node(
cls,
cluster_name: str,
search: Optional[Sequence[str]] = None,
status: Optional[Sequence[str]] = None,
pagination_type: Optional[Literal["limits", "cursor", "stream"]] = None,
page_size: Optional[int] = 10000,
page: Optional[int] = None,
next: Optional[str] = None,
previous: Optional[str] = None,
direction: Optional[Literal["asc", "desc"]] = None,
nulls: Optional[Literal["first", "last"]] = None,
sort_by: Optional[Sequence[str]] = None,
fetch: bool = True,
_dry_run: Literal[False] = False,
*,
_get_response: Literal[True],
_client: Optional[AsyncBaseClient] = None,
) -> Response: ...
@overload
@classmethod
async def list_orchestration_clusters_node(
cls,
cluster_name: str,
search: Optional[Sequence[str]] = None,
status: Optional[Sequence[str]] = None,
pagination_type: Optional[Literal["limits", "cursor", "stream"]] = None,
page_size: Optional[int] = 10000,
page: Optional[int] = None,
next: Optional[str] = None,
previous: Optional[str] = None,
direction: Optional[Literal["asc", "desc"]] = None,
nulls: Optional[Literal["first", "last"]] = None,
sort_by: Optional[Sequence[str]] = None,
*,
fetch: Literal[False],
_dry_run: Literal[False] = False,
_get_response: Literal[False] = False,
_client: Optional[AsyncBaseClient] = None,
) -> Union[
responses.OrchestrationClustersNodeListPaginatedResponseCursor,
responses.OrchestrationClustersNodeListPaginatedResponseLimits,
]: ...
@overload
@classmethod
async def list_orchestration_clusters_node(
cls,
cluster_name: str,
search: Optional[Sequence[str]] = None,
status: Optional[Sequence[str]] = None,
pagination_type: Optional[Literal["limits", "cursor", "stream"]] = None,
page_size: Optional[int] = 10000,
page: Optional[int] = None,
next: Optional[str] = None,
previous: Optional[str] = None,
direction: Optional[Literal["asc", "desc"]] = None,
nulls: Optional[Literal["first", "last"]] = None,
sort_by: Optional[Sequence[str]] = None,
fetch: Literal[True] = True,
_dry_run: Literal[False] = False,
_get_response: Literal[False] = False,
_client: Optional[AsyncBaseClient] = None,
) -> KList[responses.OrchestrationClustersNodesGetItem]: ...
[docs]
@classmethod
async def list_orchestration_clusters_node(
cls,
cluster_name: str,
search: Optional[Sequence[str]] = None,
status: Optional[Sequence[str]] = None,
pagination_type: Optional[Literal["limits", "cursor", "stream"]] = None,
page_size: Optional[int] = 10000,
page: Optional[int] = None,
next: Optional[str] = None,
previous: Optional[str] = None,
direction: Optional[Literal["asc", "desc"]] = None,
nulls: Optional[Literal["first", "last"]] = None,
sort_by: Optional[Sequence[str]] = None,
fetch: bool = True,
_dry_run: bool = False,
_get_response: bool = False,
_client: Optional[AsyncBaseClient] = None,
) -> Union[
Union[
KList[responses.OrchestrationClustersNodesGetItem],
responses.OrchestrationClustersNodeListPaginatedResponseCursor,
responses.OrchestrationClustersNodeListPaginatedResponseLimits,
],
dict[str, object],
Response,
]:
"""Returns a list of Node objects in a Cluster. The list can be optionally filtered and sorted on the server before being returned.
**Permission Required:** `kelvin.permission.cluster.read`.
``listOrchestrationClustersNode``: ``GET`` ``/api/v4/orchestration/clusters/{cluster_name}/nodes/list``
Args:
cluster_name : :obj:`str`, optional
Cluster key `name` containing the Nodes to list. The filter is on the
full name only. Can only contain lowercase alphanumeric characters and
`.`, `_` or `-` characters.
search : :obj:`Sequence[str]`
Search and filter on the list based on the Node `name`. The search is
case insensitive and will find partial matches as well.
status : :obj:`Sequence[str]`
A filter on the list based on the Node key `status`. The filter is on
the full name only. All strings in the array are treated as `OR`.
Options are `online`, `unreachable` and `not_ready`
pagination_type : :obj:`Literal['limits', 'cursor', 'stream']`
Method of pagination to use for return results where `total_items` is
greater than `page_size`. `cursor` and `limits` will return one `page`
of results, `stream` will return all results. ('limits', 'cursor',
'stream')
page_size : :obj:`int`
Number of objects to be returned in each page. Page size can range
between 1 and 10000 objects.
page : :obj:`int`
An integer for the wanted page of results. Used only with
`pagination_type` set as `limits`.
next : :obj:`str`
An alphanumeric string bookmark to indicate where to start for the
next page. Used only with `pagination_type` set as `cursor`.
previous : :obj:`str`
An alphanumeric string bookmark to indicate where to end for the
previous page. Used only with `pagination_type` set as `cursor`.
direction : :obj:`Literal['asc', 'desc']`
Sorting order according to the `sort_by` parameter. ('asc', 'desc')
nulls : :obj:`Literal['first', 'last']`
Null ordering according to the `sort_by` parameter. Defaults to
`first` for ascending order and `last` for descending order. ('first',
'last')
sort_by : :obj:`Sequence[str]`
Sort the results by one of the Cluster parameters. Only one parameter
can be selected. Options: `id`, `name`, `status`, `last_seen`,
`created` and `updated`.
"""
result_types = {
"200": responses.OrchestrationClustersNodeListPaginatedResponseCursor,
"400": response.Error,
"401": response.Error,
"500": response.Error,
}
# override pagination_type
# stream type is only supported for raw responses
if not _get_response and pagination_type == "stream":
pagination_type = "cursor"
_request = cls._prepare_request(
method="GET",
path="/api/v4/orchestration/clusters/{cluster_name}/nodes/list",
values={"cluster_name": cluster_name},
params={
"search": search,
"status": status,
"pagination_type": pagination_type,
"page_size": page_size,
"page": page,
"next": next,
"previous": previous,
"direction": direction,
"nulls": nulls,
"sort_by": sort_by,
},
files={},
headers={},
data=None,
body_type=None,
array_body=False,
)
if _dry_run:
return _request.to_dict()
_response = await cls._make_request(
client=_client,
request=_request,
stream=False,
)
if _get_response:
return _response
cls._raise_api_error(
response=_response,
result_types=result_types,
error_type=response.Error,
)
if pagination_type == "limits":
result = cls._process_response(
response=_response,
result_types=result_types,
result_type=responses.OrchestrationClustersNodeListPaginatedResponseLimits,
)
else: # default pagination_type is cursor
result = cls._process_response(
response=_response,
result_types=result_types,
result_type=responses.OrchestrationClustersNodeListPaginatedResponseCursor,
)
if fetch:
return await cls._fetch_pages(
client=_client,
path=_request.path,
api_response=result,
method="GET",
params=_request.params,
data=_request.data,
result_types=result_types,
error_type=response.Error,
response_type=KList[responses.OrchestrationClustersNodesGetItem],
)
return result
@overload
@classmethod
async def get_orchestration_clusters_nodes(
cls,
cluster_name: str,
node_name: str,
*,
_dry_run: Literal[True],
_get_response: bool = False,
_client: Optional[AsyncBaseClient] = None,
) -> dict[str, object]: ...
@overload
@classmethod
async def get_orchestration_clusters_nodes(
cls,
cluster_name: str,
node_name: str,
_dry_run: Literal[False] = False,
*,
_get_response: Literal[True],
_client: Optional[AsyncBaseClient] = None,
) -> Response: ...
@overload
@classmethod
async def get_orchestration_clusters_nodes(
cls,
cluster_name: str,
node_name: str,
_dry_run: Literal[False] = False,
_get_response: Literal[False] = False,
_client: Optional[AsyncBaseClient] = None,
) -> responses.OrchestrationClustersNodesGet: ...
[docs]
@classmethod
async def get_orchestration_clusters_nodes(
cls,
cluster_name: str,
node_name: str,
_dry_run: bool = False,
_get_response: bool = False,
_client: Optional[AsyncBaseClient] = None,
) -> Union[responses.OrchestrationClustersNodesGet, dict[str, object], Response]:
"""Retrieve the parameters and status of a specific Node on a Cluster.
**Permission Required:** `kelvin.permission.cluster.read`.
``getOrchestrationClustersNodes``: ``GET`` ``/api/v4/orchestration/clusters/{cluster_name}/nodes/{node_name}/get``
Args:
cluster_name : :obj:`str`, optional
Cluster key `name` to look for Node. The string can only contain
lowercase alphanumeric characters and `.`, `_` or `-` characters.
node_name : :obj:`str`, optional
Node key `name` to get. The string can only contain lowercase
alphanumeric characters and `.`, `_` or `-` characters.
"""
result_types = {
"200": responses.OrchestrationClustersNodesGet,
"400": response.Error,
"401": response.Error,
"404": response.Error,
"500": response.Error,
}
_request = cls._prepare_request(
method="GET",
path="/api/v4/orchestration/clusters/{cluster_name}/nodes/{node_name}/get",
values={"cluster_name": cluster_name, "node_name": node_name},
params={},
files={},
headers={},
data=None,
body_type=None,
array_body=False,
)
if _dry_run:
return _request.to_dict()
_response = await cls._make_request(
client=_client,
request=_request,
stream=False,
)
if _get_response:
return _response
cls._raise_api_error(
response=_response,
result_types=result_types,
error_type=response.Error,
)
return cls._process_response(
response=_response,
result_types=result_types,
result_type=responses.OrchestrationClustersNodesGet,
)
@overload
@classmethod
async def get_orchestration_clusters_provision(
cls,
cluster_name: str,
*,
_dry_run: Literal[True],
_get_response: bool = False,
_client: Optional[AsyncBaseClient] = None,
) -> dict[str, object]: ...
@overload
@classmethod
async def get_orchestration_clusters_provision(
cls,
cluster_name: str,
_dry_run: Literal[False] = False,
*,
_get_response: Literal[True],
_client: Optional[AsyncBaseClient] = None,
) -> Response: ...
@overload
@classmethod
async def get_orchestration_clusters_provision(
cls,
cluster_name: str,
_dry_run: Literal[False] = False,
_get_response: Literal[False] = False,
_client: Optional[AsyncBaseClient] = None,
) -> str: ...
[docs]
@classmethod
async def get_orchestration_clusters_provision(
cls,
cluster_name: str,
_dry_run: bool = False,
_get_response: bool = False,
_client: Optional[AsyncBaseClient] = None,
) -> Union[str, dict[str, object], Response]:
"""Get Clusters Provision YAML Specifications
**Permission Required:** `kelvin.permission.cluster.read`.
``getOrchestrationClustersProvision``: ``GET`` ``/api/v4/orchestration/clusters/{cluster_name}/provision/get``
Args:
cluster_name : :obj:`str`, optional
Cluster key `name` to retrieve provision yaml file. The string can
only contain lowercase alphanumeric characters and `.`, `_` or `-`
characters.
"""
result_types = {
"200": str,
"400": response.Error,
"401": response.Error,
"404": response.Error,
"500": response.Error,
}
_request = cls._prepare_request(
method="GET",
path="/api/v4/orchestration/clusters/{cluster_name}/provision/get",
values={"cluster_name": cluster_name},
params={},
files={},
headers={},
data=None,
body_type=None,
array_body=False,
)
if _dry_run:
return _request.to_dict()
_response = await cls._make_request(
client=_client,
request=_request,
stream=False,
)
if _get_response:
return _response
cls._raise_api_error(
response=_response,
result_types=result_types,
error_type=response.Error,
)
result = cls._process_str_response(response=_response)
if result is None:
raise ResponseError(
f"Unexpected empty response",
_response,
)
return result
@overload
@classmethod
async def list_orchestration_clusters_service(
cls,
cluster_name: str,
search: Optional[Sequence[str]] = None,
workload_name: Optional[Sequence[str]] = None,
service_type: Optional[Sequence[str]] = None,
pagination_type: Optional[Literal["limits", "cursor", "stream"]] = None,
page_size: Optional[int] = 10000,
page: Optional[int] = None,
next: Optional[str] = None,
previous: Optional[str] = None,
direction: Optional[Literal["asc", "desc"]] = None,
nulls: Optional[Literal["first", "last"]] = None,
sort_by: Optional[Sequence[str]] = None,
fetch: bool = True,
*,
_dry_run: Literal[True],
_get_response: bool = False,
_client: Optional[AsyncBaseClient] = None,
) -> dict[str, object]: ...
@overload
@classmethod
async def list_orchestration_clusters_service(
cls,
cluster_name: str,
search: Optional[Sequence[str]] = None,
workload_name: Optional[Sequence[str]] = None,
service_type: Optional[Sequence[str]] = None,
pagination_type: Optional[Literal["limits", "cursor", "stream"]] = None,
page_size: Optional[int] = 10000,
page: Optional[int] = None,
next: Optional[str] = None,
previous: Optional[str] = None,
direction: Optional[Literal["asc", "desc"]] = None,
nulls: Optional[Literal["first", "last"]] = None,
sort_by: Optional[Sequence[str]] = None,
fetch: bool = True,
_dry_run: Literal[False] = False,
*,
_get_response: Literal[True],
_client: Optional[AsyncBaseClient] = None,
) -> Response: ...
@overload
@classmethod
async def list_orchestration_clusters_service(
cls,
cluster_name: str,
search: Optional[Sequence[str]] = None,
workload_name: Optional[Sequence[str]] = None,
service_type: Optional[Sequence[str]] = None,
pagination_type: Optional[Literal["limits", "cursor", "stream"]] = None,
page_size: Optional[int] = 10000,
page: Optional[int] = None,
next: Optional[str] = None,
previous: Optional[str] = None,
direction: Optional[Literal["asc", "desc"]] = None,
nulls: Optional[Literal["first", "last"]] = None,
sort_by: Optional[Sequence[str]] = None,
*,
fetch: Literal[False],
_dry_run: Literal[False] = False,
_get_response: Literal[False] = False,
_client: Optional[AsyncBaseClient] = None,
) -> Union[
responses.OrchestrationClustersServiceListPaginatedResponseCursor,
responses.OrchestrationClustersServiceListPaginatedResponseLimits,
]: ...
@overload
@classmethod
async def list_orchestration_clusters_service(
cls,
cluster_name: str,
search: Optional[Sequence[str]] = None,
workload_name: Optional[Sequence[str]] = None,
service_type: Optional[Sequence[str]] = None,
pagination_type: Optional[Literal["limits", "cursor", "stream"]] = None,
page_size: Optional[int] = 10000,
page: Optional[int] = None,
next: Optional[str] = None,
previous: Optional[str] = None,
direction: Optional[Literal["asc", "desc"]] = None,
nulls: Optional[Literal["first", "last"]] = None,
sort_by: Optional[Sequence[str]] = None,
fetch: Literal[True] = True,
_dry_run: Literal[False] = False,
_get_response: Literal[False] = False,
_client: Optional[AsyncBaseClient] = None,
) -> KList[responses.ServiceItem]: ...
[docs]
@classmethod
async def list_orchestration_clusters_service(
cls,
cluster_name: str,
search: Optional[Sequence[str]] = None,
workload_name: Optional[Sequence[str]] = None,
service_type: Optional[Sequence[str]] = None,
pagination_type: Optional[Literal["limits", "cursor", "stream"]] = None,
page_size: Optional[int] = 10000,
page: Optional[int] = None,
next: Optional[str] = None,
previous: Optional[str] = None,
direction: Optional[Literal["asc", "desc"]] = None,
nulls: Optional[Literal["first", "last"]] = None,
sort_by: Optional[Sequence[str]] = None,
fetch: bool = True,
_dry_run: bool = False,
_get_response: bool = False,
_client: Optional[AsyncBaseClient] = None,
) -> Union[
Union[
KList[responses.ServiceItem],
responses.OrchestrationClustersServiceListPaginatedResponseCursor,
responses.OrchestrationClustersServiceListPaginatedResponseLimits,
],
dict[str, object],
Response,
]:
"""Returns a list of Service objects in a Cluster. The list can be optionally filtered and sorted on the server before being returned.
**Permission Required:** `kelvin.permission.cluster.read`.
``listOrchestrationClustersService``: ``GET`` ``/api/v4/orchestration/clusters/{cluster_name}/services/list``
Args:
cluster_name : :obj:`str`, optional
Cluster key `name` containing the Services to list. The filter is on
the full name only. Can only contain lowercase alphanumeric characters
and `.`, `_` or `-` characters.
search : :obj:`Sequence[str]`
Search and filter on the list based on any of the Service keys `name`,
`workload_name`, `network_interface`, `address`, and `service_type`.
The search is case insensitive and will find partial matches as well.
All strings in the array are treated as `OR`.
workload_name : :obj:`Sequence[str]`
A filter on the list based on the Workload key `name`. The filter is
on the full name only. All strings in the array are treated as `OR`.
service_type : :obj:`Sequence[str]`
A filter on the list based on the Service Type key `service_type`. The
filter is on the full name only. All strings in the array are treated
as `OR`. Options are `cluster_ip`, `node_port` and `host_port`.
pagination_type : :obj:`Literal['limits', 'cursor', 'stream']`
Method of pagination to use for return results where `total_items` is
greater than `page_size`. `cursor` and `limits` will return one `page`
of results, `stream` will return all results. ('limits', 'cursor',
'stream')
page_size : :obj:`int`
Number of objects to be returned in each page. Page size can range
between 1 and 10000 objects.
page : :obj:`int`
An integer for the wanted page of results. Used only with
`pagination_type` set as `limits`.
next : :obj:`str`
An alphanumeric string bookmark to indicate where to start for the
next page. Used only with `pagination_type` set as `cursor`.
previous : :obj:`str`
An alphanumeric string bookmark to indicate where to end for the
previous page. Used only with `pagination_type` set as `cursor`.
direction : :obj:`Literal['asc', 'desc']`
Sorting order according to the `sort_by` parameter. ('asc', 'desc')
nulls : :obj:`Literal['first', 'last']`
Null ordering according to the `sort_by` parameter. Defaults to
`first` for ascending order and `last` for descending order. ('first',
'last')
sort_by : :obj:`Sequence[str]`
Sort the results by one of the Cluster parameters. Only one parameter
can be selected. Options: `name`, `workload_name`,
`network_interface`, `service_type`, `address`, `created` and
`updated`.
"""
result_types = {
"200": responses.OrchestrationClustersServiceListPaginatedResponseCursor,
"400": response.Error,
"401": response.Error,
"500": response.Error,
}
# override pagination_type
# stream type is only supported for raw responses
if not _get_response and pagination_type == "stream":
pagination_type = "cursor"
_request = cls._prepare_request(
method="GET",
path="/api/v4/orchestration/clusters/{cluster_name}/services/list",
values={"cluster_name": cluster_name},
params={
"search": search,
"workload_name": workload_name,
"service_type": service_type,
"pagination_type": pagination_type,
"page_size": page_size,
"page": page,
"next": next,
"previous": previous,
"direction": direction,
"nulls": nulls,
"sort_by": sort_by,
},
files={},
headers={},
data=None,
body_type=None,
array_body=False,
)
if _dry_run:
return _request.to_dict()
_response = await cls._make_request(
client=_client,
request=_request,
stream=False,
)
if _get_response:
return _response
cls._raise_api_error(
response=_response,
result_types=result_types,
error_type=response.Error,
)
if pagination_type == "limits":
result = cls._process_response(
response=_response,
result_types=result_types,
result_type=responses.OrchestrationClustersServiceListPaginatedResponseLimits,
)
else: # default pagination_type is cursor
result = cls._process_response(
response=_response,
result_types=result_types,
result_type=responses.OrchestrationClustersServiceListPaginatedResponseCursor,
)
if fetch:
return await cls._fetch_pages(
client=_client,
path=_request.path,
api_response=result,
method="GET",
params=_request.params,
data=_request.data,
result_types=result_types,
error_type=response.Error,
response_type=KList[responses.ServiceItem],
)
return result
@overload
@classmethod
async def update_orchestration_clusters(
cls,
cluster_name: str,
data: Optional[Union[requests.OrchestrationClustersUpdate, Mapping[str, object]]] = None,
*,
_dry_run: Literal[True],
_get_response: bool = False,
_client: Optional[AsyncBaseClient] = None,
**kwargs: object,
) -> dict[str, object]: ...
@overload
@classmethod
async def update_orchestration_clusters(
cls,
cluster_name: str,
data: Optional[Union[requests.OrchestrationClustersUpdate, Mapping[str, object]]] = None,
_dry_run: Literal[False] = False,
*,
_get_response: Literal[True],
_client: Optional[AsyncBaseClient] = None,
**kwargs: object,
) -> Response: ...
@overload
@classmethod
async def update_orchestration_clusters(
cls,
cluster_name: str,
data: Optional[Union[requests.OrchestrationClustersUpdate, Mapping[str, object]]] = None,
_dry_run: Literal[False] = False,
_get_response: Literal[False] = False,
_client: Optional[AsyncBaseClient] = None,
**kwargs: object,
) -> responses.OrchestrationClustersUpdate: ...
[docs]
@classmethod
async def update_orchestration_clusters(
cls,
cluster_name: str,
data: Optional[Union[requests.OrchestrationClustersUpdate, Mapping[str, object]]] = None,
_dry_run: bool = False,
_get_response: bool = False,
_client: Optional[AsyncBaseClient] = None,
**kwargs: object,
) -> Union[responses.OrchestrationClustersUpdate, dict[str, object], Response]:
"""Update the configuration settings of the Cluster. Only key/values provided in the request body will be updated, all other settings will remain unchanged.
**Permission Required:** `kelvin.permission.cluster.update`.
``updateOrchestrationClusters``: ``POST`` ``/api/v4/orchestration/clusters/{cluster_name}/update``
Args:
cluster_name : :obj:`str`, optional
Cluster key `name` target for sending parameter updates. The string
can only contain lowercase alphanumeric characters and `.`, `_` or `-`
characters.
data: requests.OrchestrationClustersUpdate, optional
**kwargs:
Extra parameters for requests.OrchestrationClustersUpdate
- update_orchestration_clusters: dict
"""
result_types = {
"200": responses.OrchestrationClustersUpdate,
"400": response.Error,
"401": response.Error,
"404": response.Error,
"409": response.Error,
"500": response.Error,
}
_request = cls._prepare_request(
method="POST",
path="/api/v4/orchestration/clusters/{cluster_name}/update",
values={"cluster_name": cluster_name},
params={},
files={},
headers={},
data=data,
body_type=requests.OrchestrationClustersUpdate,
array_body=False,
**kwargs,
)
if _dry_run:
return _request.to_dict()
_response = await cls._make_request(
client=_client,
request=_request,
stream=False,
)
if _get_response:
return _response
cls._raise_api_error(
response=_response,
result_types=result_types,
error_type=response.Error,
)
return cls._process_response(
response=_response,
result_types=result_types,
result_type=responses.OrchestrationClustersUpdate,
)