# Code generated by builder. DO NOT EDIT.
"""
Kelvin API Client.
"""
from __future__ import annotations
from collections.abc import Mapping, Sequence
from datetime import datetime
from typing import Optional, Union, overload
from typing_extensions import Literal
from kelvin.api.base.api_service_model import ApiServiceModel
from kelvin.api.base.data_model import KList
from kelvin.api.base.http_client.base_client import Response, SyncBaseClient
from ..model import requests, response, responses
[docs]
class DeprecatedWorkload(ApiServiceModel):
@overload
@classmethod
def apply_legacy_workload(
cls,
data: Optional[Union[requests.LegacyWorkloadApply, Mapping[str, object]]] = None,
*,
_dry_run: Literal[True],
_get_response: bool = False,
_client: Optional[SyncBaseClient] = None,
**kwargs: object,
) -> dict[str, object]: ...
@overload
@classmethod
def apply_legacy_workload(
cls,
data: Optional[Union[requests.LegacyWorkloadApply, Mapping[str, object]]] = None,
_dry_run: Literal[False] = False,
*,
_get_response: Literal[True],
_client: Optional[SyncBaseClient] = None,
**kwargs: object,
) -> Response: ...
@overload
@classmethod
def apply_legacy_workload(
cls,
data: Optional[Union[requests.LegacyWorkloadApply, Mapping[str, object]]] = None,
_dry_run: Literal[False] = False,
_get_response: Literal[False] = False,
_client: Optional[SyncBaseClient] = None,
**kwargs: object,
) -> None: ...
[docs]
@classmethod
def apply_legacy_workload(
cls,
data: Optional[Union[requests.LegacyWorkloadApply, Mapping[str, object]]] = None,
_dry_run: bool = False,
_get_response: bool = False,
_client: Optional[SyncBaseClient] = None,
**kwargs: object,
) -> Optional[Union[dict[str, object], Response]]:
"""Initiate final deploy action for the downloaded Workloads. Only valid for Workloads that were previously deployed with the keys `staged` set to true and `instantly_apply` set to false.
**Permission Required:** `kelvin.permission.workload.update`.
``applyLegacyWorkload``: ``POST`` ``/api/v4/workloads/apply``
Args:
data: requests.LegacyWorkloadApply, optional
**kwargs:
Extra parameters for requests.LegacyWorkloadApply
- apply_legacy_workload: dict
"""
result_types = {
"200": None,
"400": response.Error,
"401": response.Error,
"404": response.Error,
"406": response.Error,
"500": response.Error,
"501": None,
}
_request = cls._prepare_request(
method="POST",
path="/api/v4/workloads/apply",
values={},
params={},
files={},
headers={},
data=data,
body_type=requests.LegacyWorkloadApply,
array_body=False,
**kwargs,
)
if _dry_run:
return _request.to_dict()
_response = cls._make_request(
client=_client,
request=_request,
stream=False,
)
if _get_response:
return _response
cls._raise_api_error(
response=_response,
result_types=result_types,
error_type=response.Error,
)
@overload
@classmethod
def deploy_legacy_workload(
cls,
data: Optional[Union[requests.LegacyWorkloadDeploy, Mapping[str, object]]] = None,
*,
_dry_run: Literal[True],
_get_response: bool = False,
_client: Optional[SyncBaseClient] = None,
**kwargs: object,
) -> dict[str, object]: ...
@overload
@classmethod
def deploy_legacy_workload(
cls,
data: Optional[Union[requests.LegacyWorkloadDeploy, Mapping[str, object]]] = None,
_dry_run: Literal[False] = False,
*,
_get_response: Literal[True],
_client: Optional[SyncBaseClient] = None,
**kwargs: object,
) -> Response: ...
@overload
@classmethod
def deploy_legacy_workload(
cls,
data: Optional[Union[requests.LegacyWorkloadDeploy, Mapping[str, object]]] = None,
_dry_run: Literal[False] = False,
_get_response: Literal[False] = False,
_client: Optional[SyncBaseClient] = None,
**kwargs: object,
) -> responses.LegacyWorkloadDeploy: ...
[docs]
@classmethod
def deploy_legacy_workload(
cls,
data: Optional[Union[requests.LegacyWorkloadDeploy, Mapping[str, object]]] = None,
_dry_run: bool = False,
_get_response: bool = False,
_client: Optional[SyncBaseClient] = None,
**kwargs: object,
) -> Optional[Union[responses.LegacyWorkloadDeploy, dict[str, object], Response]]:
"""Deploy an App from the App Registry as a Workload to a Cluster/Node.
**Permission Required:** `kelvin.permission.workload.update`.
``deployLegacyWorkload``: ``POST`` ``/api/v4/workloads/deploy``
Args:
data: requests.LegacyWorkloadDeploy, optional
**kwargs:
Extra parameters for requests.LegacyWorkloadDeploy
- deploy_legacy_workload: dict
"""
result_types = {
"201": responses.LegacyWorkloadDeploy,
"400": response.Error,
"401": response.Error,
"404": response.Error,
"409": response.Error,
"500": response.Error,
"501": None,
}
_request = cls._prepare_request(
method="POST",
path="/api/v4/workloads/deploy",
values={},
params={},
files={},
headers={},
data=data,
body_type=requests.LegacyWorkloadDeploy,
array_body=False,
**kwargs,
)
if _dry_run:
return _request.to_dict()
_response = cls._make_request(
client=_client,
request=_request,
stream=False,
)
if _get_response:
return _response
cls._raise_api_error(
response=_response,
result_types=result_types,
error_type=response.Error,
)
return cls._process_response(
response=_response,
result_types=result_types,
result_type=responses.LegacyWorkloadDeploy,
)
@overload
@classmethod
def list_legacy_workloads(
cls,
search: Optional[Sequence[str]] = None,
app_name: Optional[Sequence[str]] = None,
app_version: Optional[Sequence[str]] = None,
acp_name: Optional[Sequence[str]] = None,
cluster_name: Optional[Sequence[str]] = None,
node_name: Optional[Sequence[str]] = None,
workload_name: Optional[Sequence[str]] = None,
enabled: Optional[bool] = None,
asset_name: Optional[str] = None,
staged: Optional[bool] = None,
download_statuses: Optional[Sequence[str]] = None,
pagination_type: Optional[Literal["limits", "cursor", "stream"]] = None,
page_size: Optional[int] = 10000,
page: Optional[int] = None,
next: Optional[str] = None,
previous: Optional[str] = None,
direction: Optional[Literal["asc", "desc"]] = None,
sort_by: Optional[Sequence[str]] = None,
fetch: bool = True,
*,
_dry_run: Literal[True],
_get_response: bool = False,
_client: Optional[SyncBaseClient] = None,
) -> dict[str, object]: ...
@overload
@classmethod
def list_legacy_workloads(
cls,
search: Optional[Sequence[str]] = None,
app_name: Optional[Sequence[str]] = None,
app_version: Optional[Sequence[str]] = None,
acp_name: Optional[Sequence[str]] = None,
cluster_name: Optional[Sequence[str]] = None,
node_name: Optional[Sequence[str]] = None,
workload_name: Optional[Sequence[str]] = None,
enabled: Optional[bool] = None,
asset_name: Optional[str] = None,
staged: Optional[bool] = None,
download_statuses: Optional[Sequence[str]] = None,
pagination_type: Optional[Literal["limits", "cursor", "stream"]] = None,
page_size: Optional[int] = 10000,
page: Optional[int] = None,
next: Optional[str] = None,
previous: Optional[str] = None,
direction: Optional[Literal["asc", "desc"]] = None,
sort_by: Optional[Sequence[str]] = None,
fetch: bool = True,
_dry_run: Literal[False] = False,
*,
_get_response: Literal[True],
_client: Optional[SyncBaseClient] = None,
) -> Response: ...
@overload
@classmethod
def list_legacy_workloads(
cls,
search: Optional[Sequence[str]] = None,
app_name: Optional[Sequence[str]] = None,
app_version: Optional[Sequence[str]] = None,
acp_name: Optional[Sequence[str]] = None,
cluster_name: Optional[Sequence[str]] = None,
node_name: Optional[Sequence[str]] = None,
workload_name: Optional[Sequence[str]] = None,
enabled: Optional[bool] = None,
asset_name: Optional[str] = None,
staged: Optional[bool] = None,
download_statuses: Optional[Sequence[str]] = None,
pagination_type: Optional[Literal["limits", "cursor", "stream"]] = None,
page_size: Optional[int] = 10000,
page: Optional[int] = None,
next: Optional[str] = None,
previous: Optional[str] = None,
direction: Optional[Literal["asc", "desc"]] = None,
sort_by: Optional[Sequence[str]] = None,
*,
fetch: Literal[False],
_dry_run: Literal[False] = False,
_get_response: Literal[False] = False,
_client: Optional[SyncBaseClient] = None,
) -> Union[
responses.LegacyWorkloadsListPaginatedResponseCursor, responses.LegacyWorkloadsListPaginatedResponseLimits
]: ...
@overload
@classmethod
def list_legacy_workloads(
cls,
search: Optional[Sequence[str]] = None,
app_name: Optional[Sequence[str]] = None,
app_version: Optional[Sequence[str]] = None,
acp_name: Optional[Sequence[str]] = None,
cluster_name: Optional[Sequence[str]] = None,
node_name: Optional[Sequence[str]] = None,
workload_name: Optional[Sequence[str]] = None,
enabled: Optional[bool] = None,
asset_name: Optional[str] = None,
staged: Optional[bool] = None,
download_statuses: Optional[Sequence[str]] = None,
pagination_type: Optional[Literal["limits", "cursor", "stream"]] = None,
page_size: Optional[int] = 10000,
page: Optional[int] = None,
next: Optional[str] = None,
previous: Optional[str] = None,
direction: Optional[Literal["asc", "desc"]] = None,
sort_by: Optional[Sequence[str]] = None,
fetch: Literal[True] = True,
_dry_run: Literal[False] = False,
_get_response: Literal[False] = False,
_client: Optional[SyncBaseClient] = None,
) -> KList[responses.LegacyWorkloadItem]: ...
[docs]
@classmethod
def list_legacy_workloads(
cls,
search: Optional[Sequence[str]] = None,
app_name: Optional[Sequence[str]] = None,
app_version: Optional[Sequence[str]] = None,
acp_name: Optional[Sequence[str]] = None,
cluster_name: Optional[Sequence[str]] = None,
node_name: Optional[Sequence[str]] = None,
workload_name: Optional[Sequence[str]] = None,
enabled: Optional[bool] = None,
asset_name: Optional[str] = None,
staged: Optional[bool] = None,
download_statuses: Optional[Sequence[str]] = None,
pagination_type: Optional[Literal["limits", "cursor", "stream"]] = None,
page_size: Optional[int] = 10000,
page: Optional[int] = None,
next: Optional[str] = None,
previous: Optional[str] = None,
direction: Optional[Literal["asc", "desc"]] = None,
sort_by: Optional[Sequence[str]] = None,
fetch: bool = True,
_dry_run: bool = False,
_get_response: bool = False,
_client: Optional[SyncBaseClient] = None,
) -> Union[
Union[
KList[responses.LegacyWorkloadItem],
responses.LegacyWorkloadsListPaginatedResponseCursor,
responses.LegacyWorkloadsListPaginatedResponseLimits,
],
dict[str, object],
Response,
]:
"""Returns a list of Workload objects. The list can be optionally filtered and sorted on the server before being returned.
**Permission Required:** `kelvin.permission.workload.read`.
``listLegacyWorkloads``: ``GET`` ``/api/v4/workloads/list``
Args:
search : :obj:`Sequence[str]`
Search and filter on the list based on the keys `name`, `title`
(Display Name), `app_name`, `cluster_name` and `node_name`. All
strings in the array are treated as `OR`. The search is case
insensitive and will find partial matches as well.
app_name : :obj:`Sequence[str]`
A filter on the list based on the key `app_name`. The filter is on the
full name only. The string can only contain lowercase alphanumeric
characters and `.`, `_` or `-` characters.
app_version : :obj:`Sequence[str]`
A filter on the list based on the key `app_version`. The filter is on
the full value only. The string can only contain lowercase
alphanumeric characters and `.`, `_` or `-` characters.
acp_name : :obj:`Sequence[str]`
[`Deprecated`] A filter on the list based on the key `acp_name`. The
filter is on the full name only. The string can only contain lowercase
alphanumeric characters and `.`, `_` or `-` characters.
cluster_name : :obj:`Sequence[str]`
A filter on the list based on the key `cluster_name`. The filter is on
the full name only. The string can only contain lowercase alphanumeric
characters and `.`, `_` or `-` characters. If set, it will override
acp_name
node_name : :obj:`Sequence[str]`
A filter on the list based on the key `node_name`. The filter is on
the full name only. The string can only contain lowercase alphanumeric
characters and `.`, `_` or `-` characters.
workload_name : :obj:`Sequence[str]`
A filter on the list based on the key `name`. The filter is on the
full name only. The string can only contain lowercase alphanumeric
characters and `.`, `_` or `-` characters.
enabled : :obj:`bool`
A filter on the list based on the key `status` (start/stop function in
Kelvin UI) of the Workloads.
asset_name : :obj:`str`
A filter on the list based on Asset `name` associated with the
Workload. The filter is on the full name only. The string can only
contain lowercase alphanumeric characters and `.`, `_` or `-`
characters.
staged : :obj:`bool`
A filter on the key staged. Using true will only show staged
workloads, false will show all workloads.
download_statuses : :obj:`Sequence[str]`
pagination_type : :obj:`Literal['limits', 'cursor', 'stream']`
Method of pagination to use for return results where `total_items` is
greater than `page_size`. `cursor` and `limits` will return one `page`
of results, `stream` will return all results. ('limits', 'cursor',
'stream')
page_size : :obj:`int`
Number of objects to be returned in each page. Page size can range
between 1 and 10000 objects.
page : :obj:`int`
An integer for the wanted page of results. Used only with
`pagination_type` set as `limits`.
next : :obj:`str`
An alphanumeric string bookmark to indicate where to start for the
next page. Used only with `pagination_type` set as `cursor`.
previous : :obj:`str`
An alphanumeric string bookmark to indicate where to end for the
previous page. Used only with `pagination_type` set as `cursor`.
direction : :obj:`Literal['asc', 'desc']`
Sorting order according to the `sort_by` parameter. ('asc', 'desc')
sort_by : :obj:`Sequence[str]`
"""
result_types = {
"200": responses.LegacyWorkloadsListPaginatedResponseCursor,
"400": response.Error,
"401": response.Error,
"500": response.Error,
}
# override pagination_type
# stream type is only supported for raw responses
if not _get_response and pagination_type == "stream":
pagination_type = "cursor"
_request = cls._prepare_request(
method="GET",
path="/api/v4/workloads/list",
values={},
params={
"search": search,
"app_name": app_name,
"app_version": app_version,
"acp_name": acp_name,
"cluster_name": cluster_name,
"node_name": node_name,
"workload_name": workload_name,
"enabled": enabled,
"asset_name": asset_name,
"staged": staged,
"download_statuses": download_statuses,
"pagination_type": pagination_type,
"page_size": page_size,
"page": page,
"next": next,
"previous": previous,
"direction": direction,
"sort_by": sort_by,
},
files={},
headers={},
data=None,
body_type=None,
array_body=False,
)
if _dry_run:
return _request.to_dict()
_response = cls._make_request(
client=_client,
request=_request,
stream=False,
)
if _get_response:
return _response
cls._raise_api_error(
response=_response,
result_types=result_types,
error_type=response.Error,
)
if pagination_type == "limits":
result = cls._process_response(
response=_response,
result_types=result_types,
result_type=responses.LegacyWorkloadsListPaginatedResponseLimits,
)
else: # default pagination_type is cursor
result = cls._process_response(
response=_response,
result_types=result_types,
result_type=responses.LegacyWorkloadsListPaginatedResponseCursor,
)
if fetch:
return cls._fetch_pages(
client=_client,
path=_request.path,
api_response=result,
method="GET",
data=_request.data,
result_types=result_types,
error_type=response.Error,
response_type=KList[responses.LegacyWorkloadItem],
)
return result
@overload
@classmethod
def get_legacy_workload_configuration(
cls,
workload_name: str,
*,
_dry_run: Literal[True],
_get_response: bool = False,
_client: Optional[SyncBaseClient] = None,
) -> dict[str, object]: ...
@overload
@classmethod
def get_legacy_workload_configuration(
cls,
workload_name: str,
_dry_run: Literal[False] = False,
*,
_get_response: Literal[True],
_client: Optional[SyncBaseClient] = None,
) -> Response: ...
@overload
@classmethod
def get_legacy_workload_configuration(
cls,
workload_name: str,
_dry_run: Literal[False] = False,
_get_response: Literal[False] = False,
_client: Optional[SyncBaseClient] = None,
) -> responses.LegacyWorkloadConfigurationGet: ...
[docs]
@classmethod
def get_legacy_workload_configuration(
cls,
workload_name: str,
_dry_run: bool = False,
_get_response: bool = False,
_client: Optional[SyncBaseClient] = None,
) -> Optional[Union[responses.LegacyWorkloadConfigurationGet, dict[str, object], Response]]:
"""Retrieve the configuration of a Workload.
**Permission Required:** `kelvin.permission.workload.read`.
``getLegacyWorkloadConfiguration``: ``GET`` ``/api/v4/workloads/{workload_name}/configurations/get``
Args:
workload_name : :obj:`str`, optional
Unique identifier `name` of the Workload.
"""
result_types = {
"200": responses.LegacyWorkloadConfigurationGet,
"400": response.Error,
"401": response.Error,
"404": response.Error,
"409": response.Error,
"500": response.Error,
"501": None,
}
_request = cls._prepare_request(
method="GET",
path="/api/v4/workloads/{workload_name}/configurations/get",
values={"workload_name": workload_name},
params={},
files={},
headers={},
data=None,
body_type=None,
array_body=False,
)
if _dry_run:
return _request.to_dict()
_response = cls._make_request(
client=_client,
request=_request,
stream=False,
)
if _get_response:
return _response
cls._raise_api_error(
response=_response,
result_types=result_types,
error_type=response.Error,
)
return cls._process_response(
response=_response,
result_types=result_types,
result_type=responses.LegacyWorkloadConfigurationGet,
)
@overload
@classmethod
def update_legacy_workload_configuration(
cls,
workload_name: str,
data: Optional[Union[requests.LegacyWorkloadConfigurationUpdate, Mapping[str, object]]] = None,
*,
_dry_run: Literal[True],
_get_response: bool = False,
_client: Optional[SyncBaseClient] = None,
**kwargs: object,
) -> dict[str, object]: ...
@overload
@classmethod
def update_legacy_workload_configuration(
cls,
workload_name: str,
data: Optional[Union[requests.LegacyWorkloadConfigurationUpdate, Mapping[str, object]]] = None,
_dry_run: Literal[False] = False,
*,
_get_response: Literal[True],
_client: Optional[SyncBaseClient] = None,
**kwargs: object,
) -> Response: ...
@overload
@classmethod
def update_legacy_workload_configuration(
cls,
workload_name: str,
data: Optional[Union[requests.LegacyWorkloadConfigurationUpdate, Mapping[str, object]]] = None,
_dry_run: Literal[False] = False,
_get_response: Literal[False] = False,
_client: Optional[SyncBaseClient] = None,
**kwargs: object,
) -> responses.LegacyWorkloadConfigurationUpdate: ...
[docs]
@classmethod
def update_legacy_workload_configuration(
cls,
workload_name: str,
data: Optional[Union[requests.LegacyWorkloadConfigurationUpdate, Mapping[str, object]]] = None,
_dry_run: bool = False,
_get_response: bool = False,
_client: Optional[SyncBaseClient] = None,
**kwargs: object,
) -> Optional[Union[responses.LegacyWorkloadConfigurationUpdate, dict[str, object], Response]]:
"""Update the configuration of a Workload.
**Permission Required:** `kelvin.permission.workload.update`
**`WARNING!!`: Sending an empty configuration object will remove all configurations**.
``updateLegacyWorkloadConfiguration``: ``POST`` ``/api/v4/workloads/{workload_name}/configurations/update``
Args:
workload_name : :obj:`str`, optional
Unique identifier `name` of the Workload.
data: requests.LegacyWorkloadConfigurationUpdate, optional
**kwargs:
Extra parameters for requests.LegacyWorkloadConfigurationUpdate
- update_legacy_workload_configuration: dict
"""
result_types = {
"200": responses.LegacyWorkloadConfigurationUpdate,
"400": response.Error,
"401": response.Error,
"404": response.Error,
"409": response.Error,
"500": response.Error,
"501": None,
}
_request = cls._prepare_request(
method="POST",
path="/api/v4/workloads/{workload_name}/configurations/update",
values={"workload_name": workload_name},
params={},
files={},
headers={},
data=data,
body_type=requests.LegacyWorkloadConfigurationUpdate,
array_body=False,
**kwargs,
)
if _dry_run:
return _request.to_dict()
_response = cls._make_request(
client=_client,
request=_request,
stream=False,
)
if _get_response:
return _response
cls._raise_api_error(
response=_response,
result_types=result_types,
error_type=response.Error,
)
return cls._process_response(
response=_response,
result_types=result_types,
result_type=responses.LegacyWorkloadConfigurationUpdate,
)
@overload
@classmethod
def download_legacy_workload(
cls,
workload_name: str,
address: Optional[bool] = None,
*,
_dry_run: Literal[True],
_get_response: bool = False,
_client: Optional[SyncBaseClient] = None,
) -> dict[str, object]: ...
@overload
@classmethod
def download_legacy_workload(
cls,
workload_name: str,
address: Optional[bool] = None,
_dry_run: Literal[False] = False,
*,
_get_response: Literal[True],
_client: Optional[SyncBaseClient] = None,
) -> Response: ...
@overload
@classmethod
def download_legacy_workload(
cls,
workload_name: str,
address: Optional[bool] = None,
_dry_run: Literal[False] = False,
_get_response: Literal[False] = False,
_client: Optional[SyncBaseClient] = None,
) -> str: ...
[docs]
@classmethod
def download_legacy_workload(
cls,
workload_name: str,
address: Optional[bool] = None,
_dry_run: bool = False,
_get_response: bool = False,
_client: Optional[SyncBaseClient] = None,
) -> Optional[Union[str, dict[str, object], Response]]:
"""Download the Workload package file for offline installation on the Edge System. The system automatically generates the file for download if the package is not already available.
**Permission Required:** `kelvin.permission.workload.read`.
``downloadLegacyWorkload``: ``GET`` ``/api/v4/workloads/{workload_name}/download``
Args:
workload_name : :obj:`str`, optional
Unique identifier `name` of the Workload.
address : :obj:`bool`
If true, the endpoint will return a direct URL to the workload package
file.
"""
result_types = {
"200": str,
"202": None,
"400": response.Error,
"401": response.Error,
"404": response.Error,
"409": response.Error,
"412": response.Error,
"500": response.Error,
}
_request = cls._prepare_request(
method="GET",
path="/api/v4/workloads/{workload_name}/download",
values={"workload_name": workload_name},
params={"address": address},
files={},
headers={},
data=None,
body_type=None,
array_body=False,
)
if _dry_run:
return _request.to_dict()
_response = cls._make_request(
client=_client,
request=_request,
stream=False,
)
if _get_response:
return _response
cls._raise_api_error(
response=_response,
result_types=result_types,
error_type=response.Error,
)
return cls._process_str_response(response=_response)
@overload
@classmethod
def get_legacy_workload(
cls,
workload_name: str,
*,
_dry_run: Literal[True],
_get_response: bool = False,
_client: Optional[SyncBaseClient] = None,
) -> dict[str, object]: ...
@overload
@classmethod
def get_legacy_workload(
cls,
workload_name: str,
_dry_run: Literal[False] = False,
*,
_get_response: Literal[True],
_client: Optional[SyncBaseClient] = None,
) -> Response: ...
@overload
@classmethod
def get_legacy_workload(
cls,
workload_name: str,
_dry_run: Literal[False] = False,
_get_response: Literal[False] = False,
_client: Optional[SyncBaseClient] = None,
) -> responses.LegacyWorkloadGet: ...
[docs]
@classmethod
def get_legacy_workload(
cls,
workload_name: str,
_dry_run: bool = False,
_get_response: bool = False,
_client: Optional[SyncBaseClient] = None,
) -> Union[responses.LegacyWorkloadGet, dict[str, object], Response]:
"""Retrieve the parameters of a Workload.
**Permission Required:** `kelvin.permission.workload.read`.
``getLegacyWorkload``: ``GET`` ``/api/v4/workloads/{workload_name}/get``
Args:
workload_name : :obj:`str`, optional
Unique identifier `name` of the Workload.
"""
result_types = {
"200": responses.LegacyWorkloadGet,
"400": response.Error,
"401": response.Error,
"404": response.Error,
"500": response.Error,
}
_request = cls._prepare_request(
method="GET",
path="/api/v4/workloads/{workload_name}/get",
values={"workload_name": workload_name},
params={},
files={},
headers={},
data=None,
body_type=None,
array_body=False,
)
if _dry_run:
return _request.to_dict()
_response = cls._make_request(
client=_client,
request=_request,
stream=False,
)
if _get_response:
return _response
cls._raise_api_error(
response=_response,
result_types=result_types,
error_type=response.Error,
)
return cls._process_response(
response=_response,
result_types=result_types,
result_type=responses.LegacyWorkloadGet,
)
@overload
@classmethod
def get_legacy_workload_logs(
cls,
workload_name: str,
tail_lines: Optional[int] = None,
since_time: Optional[datetime] = None,
*,
_dry_run: Literal[True],
_get_response: bool = False,
_client: Optional[SyncBaseClient] = None,
) -> dict[str, object]: ...
@overload
@classmethod
def get_legacy_workload_logs(
cls,
workload_name: str,
tail_lines: Optional[int] = None,
since_time: Optional[datetime] = None,
_dry_run: Literal[False] = False,
*,
_get_response: Literal[True],
_client: Optional[SyncBaseClient] = None,
) -> Response: ...
@overload
@classmethod
def get_legacy_workload_logs(
cls,
workload_name: str,
tail_lines: Optional[int] = None,
since_time: Optional[datetime] = None,
_dry_run: Literal[False] = False,
_get_response: Literal[False] = False,
_client: Optional[SyncBaseClient] = None,
) -> responses.LegacyWorkloadLogsGet: ...
[docs]
@classmethod
def get_legacy_workload_logs(
cls,
workload_name: str,
tail_lines: Optional[int] = None,
since_time: Optional[datetime] = None,
_dry_run: bool = False,
_get_response: bool = False,
_client: Optional[SyncBaseClient] = None,
) -> Union[responses.LegacyWorkloadLogsGet, dict[str, object], Response]:
"""Get Workload Logs
**Permission Required:** `kelvin.permission.workload.read`.
``getLegacyWorkloadLogs``: ``GET`` ``/api/v4/workloads/{workload_name}/logs/get``
Args:
workload_name : :obj:`str`, optional
Unique identifier `name` of the Workload.
tail_lines : :obj:`int`
Specify the number of the most recent log lines to retrieve, counting
backwards from the latest entry.
since_time : :obj:`datetime`
UTC time of the starting point for log retrieval, formatted in RFC
3339.
"""
result_types = {
"200": responses.LegacyWorkloadLogsGet,
"400": response.Error,
"401": response.Error,
"404": response.Error,
"409": response.Error,
"500": response.Error,
"501": str,
}
_request = cls._prepare_request(
method="GET",
path="/api/v4/workloads/{workload_name}/logs/get",
values={"workload_name": workload_name},
params={"tail_lines": tail_lines, "since_time": since_time},
files={},
headers={},
data=None,
body_type=None,
array_body=False,
)
if _dry_run:
return _request.to_dict()
_response = cls._make_request(
client=_client,
request=_request,
stream=False,
)
if _get_response:
return _response
cls._raise_api_error(
response=_response,
result_types=result_types,
error_type=response.Error,
)
return cls._process_response(
response=_response,
result_types=result_types,
result_type=responses.LegacyWorkloadLogsGet,
)
@overload
@classmethod
def start_workload(
cls,
workload_name: str,
*,
_dry_run: Literal[True],
_get_response: bool = False,
_client: Optional[SyncBaseClient] = None,
) -> dict[str, object]: ...
@overload
@classmethod
def start_workload(
cls,
workload_name: str,
_dry_run: Literal[False] = False,
*,
_get_response: Literal[True],
_client: Optional[SyncBaseClient] = None,
) -> Response: ...
@overload
@classmethod
def start_workload(
cls,
workload_name: str,
_dry_run: Literal[False] = False,
_get_response: Literal[False] = False,
_client: Optional[SyncBaseClient] = None,
) -> None: ...
[docs]
@classmethod
def start_workload(
cls,
workload_name: str,
_dry_run: bool = False,
_get_response: bool = False,
_client: Optional[SyncBaseClient] = None,
) -> Optional[Union[dict[str, object], Response]]:
"""Start running the Workload.
**Permission Required:** `kelvin.permission.workload.update`.
``startWorkload``: ``GET`` ``/api/v4/workloads/{workload_name}/start``
Args:
workload_name : :obj:`str`, optional
Unique identifier `name` of the Workload.
"""
result_types = {
"200": None,
"400": response.Error,
"401": response.Error,
"404": response.Error,
"409": response.Error,
"500": response.Error,
"501": None,
}
_request = cls._prepare_request(
method="GET",
path="/api/v4/workloads/{workload_name}/start",
values={"workload_name": workload_name},
params={},
files={},
headers={},
data=None,
body_type=None,
array_body=False,
)
if _dry_run:
return _request.to_dict()
_response = cls._make_request(
client=_client,
request=_request,
stream=False,
)
if _get_response:
return _response
cls._raise_api_error(
response=_response,
result_types=result_types,
error_type=response.Error,
)
@overload
@classmethod
def stop_workload(
cls,
workload_name: str,
*,
_dry_run: Literal[True],
_get_response: bool = False,
_client: Optional[SyncBaseClient] = None,
) -> dict[str, object]: ...
@overload
@classmethod
def stop_workload(
cls,
workload_name: str,
_dry_run: Literal[False] = False,
*,
_get_response: Literal[True],
_client: Optional[SyncBaseClient] = None,
) -> Response: ...
@overload
@classmethod
def stop_workload(
cls,
workload_name: str,
_dry_run: Literal[False] = False,
_get_response: Literal[False] = False,
_client: Optional[SyncBaseClient] = None,
) -> None: ...
[docs]
@classmethod
def stop_workload(
cls,
workload_name: str,
_dry_run: bool = False,
_get_response: bool = False,
_client: Optional[SyncBaseClient] = None,
) -> Optional[Union[dict[str, object], Response]]:
"""Stop running the Workload.
**Permission Required:** `kelvin.permission.workload.update`.
``stopWorkload``: ``GET`` ``/api/v4/workloads/{workload_name}/stop``
Args:
workload_name : :obj:`str`, optional
Unique identifier `name` of the Workload.
"""
result_types = {
"200": None,
"400": response.Error,
"401": response.Error,
"404": response.Error,
"409": response.Error,
"500": response.Error,
"501": None,
}
_request = cls._prepare_request(
method="GET",
path="/api/v4/workloads/{workload_name}/stop",
values={"workload_name": workload_name},
params={},
files={},
headers={},
data=None,
body_type=None,
array_body=False,
)
if _dry_run:
return _request.to_dict()
_response = cls._make_request(
client=_client,
request=_request,
stream=False,
)
if _get_response:
return _response
cls._raise_api_error(
response=_response,
result_types=result_types,
error_type=response.Error,
)
@overload
@classmethod
def undeploy_workload(
cls,
workload_name: str,
staged: Optional[bool] = None,
*,
_dry_run: Literal[True],
_get_response: bool = False,
_client: Optional[SyncBaseClient] = None,
) -> dict[str, object]: ...
@overload
@classmethod
def undeploy_workload(
cls,
workload_name: str,
staged: Optional[bool] = None,
_dry_run: Literal[False] = False,
*,
_get_response: Literal[True],
_client: Optional[SyncBaseClient] = None,
) -> Response: ...
@overload
@classmethod
def undeploy_workload(
cls,
workload_name: str,
staged: Optional[bool] = None,
_dry_run: Literal[False] = False,
_get_response: Literal[False] = False,
_client: Optional[SyncBaseClient] = None,
) -> None: ...
[docs]
@classmethod
def undeploy_workload(
cls,
workload_name: str,
staged: Optional[bool] = None,
_dry_run: bool = False,
_get_response: bool = False,
_client: Optional[SyncBaseClient] = None,
) -> Optional[Union[dict[str, object], Response]]:
"""Undeploy Workload from the Edge System.
**Permission Required:** `kelvin.permission.workload.delete`.
``undeployWorkload``: ``POST`` ``/api/v4/workloads/{workload_name}/undeploy``
Args:
workload_name : :obj:`str`, optional
Unique identifier `name` of the Workload.
staged : :obj:`bool`
If true, the endpoint only undeploys the staged workload, if any.
"""
result_types = {
"200": None,
"400": response.Error,
"401": response.Error,
"404": response.Error,
"500": response.Error,
"501": None,
}
_request = cls._prepare_request(
method="POST",
path="/api/v4/workloads/{workload_name}/undeploy",
values={"workload_name": workload_name},
params={"staged": staged},
files={},
headers={},
data=None,
body_type=None,
array_body=False,
)
if _dry_run:
return _request.to_dict()
_response = cls._make_request(
client=_client,
request=_request,
stream=False,
)
if _get_response:
return _response
cls._raise_api_error(
response=_response,
result_types=result_types,
error_type=response.Error,
)