Source code for kelvin.api.client.api.deprecated_workload

# Code generated by builder. DO NOT EDIT.
"""
Kelvin API Client.
"""

from __future__ import annotations

from collections.abc import Mapping, Sequence
from datetime import datetime
from typing import Optional, Union, overload

from typing_extensions import Literal

from kelvin.api.base.api_service_model import ApiServiceModel
from kelvin.api.base.data_model import KList
from kelvin.api.base.http_client.base_client import Response, SyncBaseClient

from ..model import requests, response, responses


[docs] class DeprecatedWorkload(ApiServiceModel): @overload @classmethod def apply_legacy_workload( cls, data: Optional[Union[requests.LegacyWorkloadApply, Mapping[str, object]]] = None, *, _dry_run: Literal[True], _get_response: bool = False, _client: Optional[SyncBaseClient] = None, **kwargs: object, ) -> dict[str, object]: ... @overload @classmethod def apply_legacy_workload( cls, data: Optional[Union[requests.LegacyWorkloadApply, Mapping[str, object]]] = None, _dry_run: Literal[False] = False, *, _get_response: Literal[True], _client: Optional[SyncBaseClient] = None, **kwargs: object, ) -> Response: ... @overload @classmethod def apply_legacy_workload( cls, data: Optional[Union[requests.LegacyWorkloadApply, Mapping[str, object]]] = None, _dry_run: Literal[False] = False, _get_response: Literal[False] = False, _client: Optional[SyncBaseClient] = None, **kwargs: object, ) -> None: ...
[docs] @classmethod def apply_legacy_workload( cls, data: Optional[Union[requests.LegacyWorkloadApply, Mapping[str, object]]] = None, _dry_run: bool = False, _get_response: bool = False, _client: Optional[SyncBaseClient] = None, **kwargs: object, ) -> Optional[Union[dict[str, object], Response]]: """Initiate final deploy action for the downloaded Workloads. Only valid for Workloads that were previously deployed with the keys `staged` set to true and `instantly_apply` set to false. **Permission Required:** `kelvin.permission.workload.update`. ``applyLegacyWorkload``: ``POST`` ``/api/v4/workloads/apply`` Args: data: requests.LegacyWorkloadApply, optional **kwargs: Extra parameters for requests.LegacyWorkloadApply - apply_legacy_workload: dict """ result_types = { "200": None, "400": response.Error, "401": response.Error, "404": response.Error, "406": response.Error, "500": response.Error, "501": None, } _request = cls._prepare_request( method="POST", path="/api/v4/workloads/apply", values={}, params={}, files={}, headers={}, data=data, body_type=requests.LegacyWorkloadApply, array_body=False, **kwargs, ) if _dry_run: return _request.to_dict() _response = cls._make_request( client=_client, request=_request, stream=False, ) if _get_response: return _response cls._raise_api_error( response=_response, result_types=result_types, error_type=response.Error, )
@overload @classmethod def deploy_legacy_workload( cls, data: Optional[Union[requests.LegacyWorkloadDeploy, Mapping[str, object]]] = None, *, _dry_run: Literal[True], _get_response: bool = False, _client: Optional[SyncBaseClient] = None, **kwargs: object, ) -> dict[str, object]: ... @overload @classmethod def deploy_legacy_workload( cls, data: Optional[Union[requests.LegacyWorkloadDeploy, Mapping[str, object]]] = None, _dry_run: Literal[False] = False, *, _get_response: Literal[True], _client: Optional[SyncBaseClient] = None, **kwargs: object, ) -> Response: ... @overload @classmethod def deploy_legacy_workload( cls, data: Optional[Union[requests.LegacyWorkloadDeploy, Mapping[str, object]]] = None, _dry_run: Literal[False] = False, _get_response: Literal[False] = False, _client: Optional[SyncBaseClient] = None, **kwargs: object, ) -> responses.LegacyWorkloadDeploy: ...
[docs] @classmethod def deploy_legacy_workload( cls, data: Optional[Union[requests.LegacyWorkloadDeploy, Mapping[str, object]]] = None, _dry_run: bool = False, _get_response: bool = False, _client: Optional[SyncBaseClient] = None, **kwargs: object, ) -> Optional[Union[responses.LegacyWorkloadDeploy, dict[str, object], Response]]: """Deploy an App from the App Registry as a Workload to a Cluster/Node. **Permission Required:** `kelvin.permission.workload.update`. ``deployLegacyWorkload``: ``POST`` ``/api/v4/workloads/deploy`` Args: data: requests.LegacyWorkloadDeploy, optional **kwargs: Extra parameters for requests.LegacyWorkloadDeploy - deploy_legacy_workload: dict """ result_types = { "201": responses.LegacyWorkloadDeploy, "400": response.Error, "401": response.Error, "404": response.Error, "409": response.Error, "500": response.Error, "501": None, } _request = cls._prepare_request( method="POST", path="/api/v4/workloads/deploy", values={}, params={}, files={}, headers={}, data=data, body_type=requests.LegacyWorkloadDeploy, array_body=False, **kwargs, ) if _dry_run: return _request.to_dict() _response = cls._make_request( client=_client, request=_request, stream=False, ) if _get_response: return _response cls._raise_api_error( response=_response, result_types=result_types, error_type=response.Error, ) return cls._process_response( response=_response, result_types=result_types, result_type=responses.LegacyWorkloadDeploy, )
@overload @classmethod def list_legacy_workloads( cls, search: Optional[Sequence[str]] = None, app_name: Optional[Sequence[str]] = None, app_version: Optional[Sequence[str]] = None, acp_name: Optional[Sequence[str]] = None, cluster_name: Optional[Sequence[str]] = None, node_name: Optional[Sequence[str]] = None, workload_name: Optional[Sequence[str]] = None, enabled: Optional[bool] = None, asset_name: Optional[str] = None, staged: Optional[bool] = None, download_statuses: Optional[Sequence[str]] = None, pagination_type: Optional[Literal["limits", "cursor", "stream"]] = None, page_size: Optional[int] = 10000, page: Optional[int] = None, next: Optional[str] = None, previous: Optional[str] = None, direction: Optional[Literal["asc", "desc"]] = None, sort_by: Optional[Sequence[str]] = None, fetch: bool = True, *, _dry_run: Literal[True], _get_response: bool = False, _client: Optional[SyncBaseClient] = None, ) -> dict[str, object]: ... @overload @classmethod def list_legacy_workloads( cls, search: Optional[Sequence[str]] = None, app_name: Optional[Sequence[str]] = None, app_version: Optional[Sequence[str]] = None, acp_name: Optional[Sequence[str]] = None, cluster_name: Optional[Sequence[str]] = None, node_name: Optional[Sequence[str]] = None, workload_name: Optional[Sequence[str]] = None, enabled: Optional[bool] = None, asset_name: Optional[str] = None, staged: Optional[bool] = None, download_statuses: Optional[Sequence[str]] = None, pagination_type: Optional[Literal["limits", "cursor", "stream"]] = None, page_size: Optional[int] = 10000, page: Optional[int] = None, next: Optional[str] = None, previous: Optional[str] = None, direction: Optional[Literal["asc", "desc"]] = None, sort_by: Optional[Sequence[str]] = None, fetch: bool = True, _dry_run: Literal[False] = False, *, _get_response: Literal[True], _client: Optional[SyncBaseClient] = None, ) -> Response: ... @overload @classmethod def list_legacy_workloads( cls, search: Optional[Sequence[str]] = None, app_name: Optional[Sequence[str]] = None, app_version: Optional[Sequence[str]] = None, acp_name: Optional[Sequence[str]] = None, cluster_name: Optional[Sequence[str]] = None, node_name: Optional[Sequence[str]] = None, workload_name: Optional[Sequence[str]] = None, enabled: Optional[bool] = None, asset_name: Optional[str] = None, staged: Optional[bool] = None, download_statuses: Optional[Sequence[str]] = None, pagination_type: Optional[Literal["limits", "cursor", "stream"]] = None, page_size: Optional[int] = 10000, page: Optional[int] = None, next: Optional[str] = None, previous: Optional[str] = None, direction: Optional[Literal["asc", "desc"]] = None, sort_by: Optional[Sequence[str]] = None, *, fetch: Literal[False], _dry_run: Literal[False] = False, _get_response: Literal[False] = False, _client: Optional[SyncBaseClient] = None, ) -> Union[ responses.LegacyWorkloadsListPaginatedResponseCursor, responses.LegacyWorkloadsListPaginatedResponseLimits ]: ... @overload @classmethod def list_legacy_workloads( cls, search: Optional[Sequence[str]] = None, app_name: Optional[Sequence[str]] = None, app_version: Optional[Sequence[str]] = None, acp_name: Optional[Sequence[str]] = None, cluster_name: Optional[Sequence[str]] = None, node_name: Optional[Sequence[str]] = None, workload_name: Optional[Sequence[str]] = None, enabled: Optional[bool] = None, asset_name: Optional[str] = None, staged: Optional[bool] = None, download_statuses: Optional[Sequence[str]] = None, pagination_type: Optional[Literal["limits", "cursor", "stream"]] = None, page_size: Optional[int] = 10000, page: Optional[int] = None, next: Optional[str] = None, previous: Optional[str] = None, direction: Optional[Literal["asc", "desc"]] = None, sort_by: Optional[Sequence[str]] = None, fetch: Literal[True] = True, _dry_run: Literal[False] = False, _get_response: Literal[False] = False, _client: Optional[SyncBaseClient] = None, ) -> KList[responses.LegacyWorkloadItem]: ...
[docs] @classmethod def list_legacy_workloads( cls, search: Optional[Sequence[str]] = None, app_name: Optional[Sequence[str]] = None, app_version: Optional[Sequence[str]] = None, acp_name: Optional[Sequence[str]] = None, cluster_name: Optional[Sequence[str]] = None, node_name: Optional[Sequence[str]] = None, workload_name: Optional[Sequence[str]] = None, enabled: Optional[bool] = None, asset_name: Optional[str] = None, staged: Optional[bool] = None, download_statuses: Optional[Sequence[str]] = None, pagination_type: Optional[Literal["limits", "cursor", "stream"]] = None, page_size: Optional[int] = 10000, page: Optional[int] = None, next: Optional[str] = None, previous: Optional[str] = None, direction: Optional[Literal["asc", "desc"]] = None, sort_by: Optional[Sequence[str]] = None, fetch: bool = True, _dry_run: bool = False, _get_response: bool = False, _client: Optional[SyncBaseClient] = None, ) -> Union[ Union[ KList[responses.LegacyWorkloadItem], responses.LegacyWorkloadsListPaginatedResponseCursor, responses.LegacyWorkloadsListPaginatedResponseLimits, ], dict[str, object], Response, ]: """Returns a list of Workload objects. The list can be optionally filtered and sorted on the server before being returned. **Permission Required:** `kelvin.permission.workload.read`. ``listLegacyWorkloads``: ``GET`` ``/api/v4/workloads/list`` Args: search : :obj:`Sequence[str]` Search and filter on the list based on the keys `name`, `title` (Display Name), `app_name`, `cluster_name` and `node_name`. All strings in the array are treated as `OR`. The search is case insensitive and will find partial matches as well. app_name : :obj:`Sequence[str]` A filter on the list based on the key `app_name`. The filter is on the full name only. The string can only contain lowercase alphanumeric characters and `.`, `_` or `-` characters. app_version : :obj:`Sequence[str]` A filter on the list based on the key `app_version`. The filter is on the full value only. The string can only contain lowercase alphanumeric characters and `.`, `_` or `-` characters. acp_name : :obj:`Sequence[str]` [`Deprecated`] A filter on the list based on the key `acp_name`. The filter is on the full name only. The string can only contain lowercase alphanumeric characters and `.`, `_` or `-` characters. cluster_name : :obj:`Sequence[str]` A filter on the list based on the key `cluster_name`. The filter is on the full name only. The string can only contain lowercase alphanumeric characters and `.`, `_` or `-` characters. If set, it will override acp_name node_name : :obj:`Sequence[str]` A filter on the list based on the key `node_name`. The filter is on the full name only. The string can only contain lowercase alphanumeric characters and `.`, `_` or `-` characters. workload_name : :obj:`Sequence[str]` A filter on the list based on the key `name`. The filter is on the full name only. The string can only contain lowercase alphanumeric characters and `.`, `_` or `-` characters. enabled : :obj:`bool` A filter on the list based on the key `status` (start/stop function in Kelvin UI) of the Workloads. asset_name : :obj:`str` A filter on the list based on Asset `name` associated with the Workload. The filter is on the full name only. The string can only contain lowercase alphanumeric characters and `.`, `_` or `-` characters. staged : :obj:`bool` A filter on the key staged. Using true will only show staged workloads, false will show all workloads. download_statuses : :obj:`Sequence[str]` pagination_type : :obj:`Literal['limits', 'cursor', 'stream']` Method of pagination to use for return results where `total_items` is greater than `page_size`. `cursor` and `limits` will return one `page` of results, `stream` will return all results. ('limits', 'cursor', 'stream') page_size : :obj:`int` Number of objects to be returned in each page. Page size can range between 1 and 10000 objects. page : :obj:`int` An integer for the wanted page of results. Used only with `pagination_type` set as `limits`. next : :obj:`str` An alphanumeric string bookmark to indicate where to start for the next page. Used only with `pagination_type` set as `cursor`. previous : :obj:`str` An alphanumeric string bookmark to indicate where to end for the previous page. Used only with `pagination_type` set as `cursor`. direction : :obj:`Literal['asc', 'desc']` Sorting order according to the `sort_by` parameter. ('asc', 'desc') sort_by : :obj:`Sequence[str]` """ result_types = { "200": responses.LegacyWorkloadsListPaginatedResponseCursor, "400": response.Error, "401": response.Error, "500": response.Error, } # override pagination_type # stream type is only supported for raw responses if not _get_response and pagination_type == "stream": pagination_type = "cursor" _request = cls._prepare_request( method="GET", path="/api/v4/workloads/list", values={}, params={ "search": search, "app_name": app_name, "app_version": app_version, "acp_name": acp_name, "cluster_name": cluster_name, "node_name": node_name, "workload_name": workload_name, "enabled": enabled, "asset_name": asset_name, "staged": staged, "download_statuses": download_statuses, "pagination_type": pagination_type, "page_size": page_size, "page": page, "next": next, "previous": previous, "direction": direction, "sort_by": sort_by, }, files={}, headers={}, data=None, body_type=None, array_body=False, ) if _dry_run: return _request.to_dict() _response = cls._make_request( client=_client, request=_request, stream=False, ) if _get_response: return _response cls._raise_api_error( response=_response, result_types=result_types, error_type=response.Error, ) if pagination_type == "limits": result = cls._process_response( response=_response, result_types=result_types, result_type=responses.LegacyWorkloadsListPaginatedResponseLimits, ) else: # default pagination_type is cursor result = cls._process_response( response=_response, result_types=result_types, result_type=responses.LegacyWorkloadsListPaginatedResponseCursor, ) if fetch: return cls._fetch_pages( client=_client, path=_request.path, api_response=result, method="GET", data=_request.data, result_types=result_types, error_type=response.Error, response_type=KList[responses.LegacyWorkloadItem], ) return result
@overload @classmethod def get_legacy_workload_configuration( cls, workload_name: str, *, _dry_run: Literal[True], _get_response: bool = False, _client: Optional[SyncBaseClient] = None, ) -> dict[str, object]: ... @overload @classmethod def get_legacy_workload_configuration( cls, workload_name: str, _dry_run: Literal[False] = False, *, _get_response: Literal[True], _client: Optional[SyncBaseClient] = None, ) -> Response: ... @overload @classmethod def get_legacy_workload_configuration( cls, workload_name: str, _dry_run: Literal[False] = False, _get_response: Literal[False] = False, _client: Optional[SyncBaseClient] = None, ) -> responses.LegacyWorkloadConfigurationGet: ...
[docs] @classmethod def get_legacy_workload_configuration( cls, workload_name: str, _dry_run: bool = False, _get_response: bool = False, _client: Optional[SyncBaseClient] = None, ) -> Optional[Union[responses.LegacyWorkloadConfigurationGet, dict[str, object], Response]]: """Retrieve the configuration of a Workload. **Permission Required:** `kelvin.permission.workload.read`. ``getLegacyWorkloadConfiguration``: ``GET`` ``/api/v4/workloads/{workload_name}/configurations/get`` Args: workload_name : :obj:`str`, optional Unique identifier `name` of the Workload. """ result_types = { "200": responses.LegacyWorkloadConfigurationGet, "400": response.Error, "401": response.Error, "404": response.Error, "409": response.Error, "500": response.Error, "501": None, } _request = cls._prepare_request( method="GET", path="/api/v4/workloads/{workload_name}/configurations/get", values={"workload_name": workload_name}, params={}, files={}, headers={}, data=None, body_type=None, array_body=False, ) if _dry_run: return _request.to_dict() _response = cls._make_request( client=_client, request=_request, stream=False, ) if _get_response: return _response cls._raise_api_error( response=_response, result_types=result_types, error_type=response.Error, ) return cls._process_response( response=_response, result_types=result_types, result_type=responses.LegacyWorkloadConfigurationGet, )
@overload @classmethod def update_legacy_workload_configuration( cls, workload_name: str, data: Optional[Union[requests.LegacyWorkloadConfigurationUpdate, Mapping[str, object]]] = None, *, _dry_run: Literal[True], _get_response: bool = False, _client: Optional[SyncBaseClient] = None, **kwargs: object, ) -> dict[str, object]: ... @overload @classmethod def update_legacy_workload_configuration( cls, workload_name: str, data: Optional[Union[requests.LegacyWorkloadConfigurationUpdate, Mapping[str, object]]] = None, _dry_run: Literal[False] = False, *, _get_response: Literal[True], _client: Optional[SyncBaseClient] = None, **kwargs: object, ) -> Response: ... @overload @classmethod def update_legacy_workload_configuration( cls, workload_name: str, data: Optional[Union[requests.LegacyWorkloadConfigurationUpdate, Mapping[str, object]]] = None, _dry_run: Literal[False] = False, _get_response: Literal[False] = False, _client: Optional[SyncBaseClient] = None, **kwargs: object, ) -> responses.LegacyWorkloadConfigurationUpdate: ...
[docs] @classmethod def update_legacy_workload_configuration( cls, workload_name: str, data: Optional[Union[requests.LegacyWorkloadConfigurationUpdate, Mapping[str, object]]] = None, _dry_run: bool = False, _get_response: bool = False, _client: Optional[SyncBaseClient] = None, **kwargs: object, ) -> Optional[Union[responses.LegacyWorkloadConfigurationUpdate, dict[str, object], Response]]: """Update the configuration of a Workload. **Permission Required:** `kelvin.permission.workload.update` **`WARNING!!`: Sending an empty configuration object will remove all configurations**. ``updateLegacyWorkloadConfiguration``: ``POST`` ``/api/v4/workloads/{workload_name}/configurations/update`` Args: workload_name : :obj:`str`, optional Unique identifier `name` of the Workload. data: requests.LegacyWorkloadConfigurationUpdate, optional **kwargs: Extra parameters for requests.LegacyWorkloadConfigurationUpdate - update_legacy_workload_configuration: dict """ result_types = { "200": responses.LegacyWorkloadConfigurationUpdate, "400": response.Error, "401": response.Error, "404": response.Error, "409": response.Error, "500": response.Error, "501": None, } _request = cls._prepare_request( method="POST", path="/api/v4/workloads/{workload_name}/configurations/update", values={"workload_name": workload_name}, params={}, files={}, headers={}, data=data, body_type=requests.LegacyWorkloadConfigurationUpdate, array_body=False, **kwargs, ) if _dry_run: return _request.to_dict() _response = cls._make_request( client=_client, request=_request, stream=False, ) if _get_response: return _response cls._raise_api_error( response=_response, result_types=result_types, error_type=response.Error, ) return cls._process_response( response=_response, result_types=result_types, result_type=responses.LegacyWorkloadConfigurationUpdate, )
@overload @classmethod def download_legacy_workload( cls, workload_name: str, address: Optional[bool] = None, *, _dry_run: Literal[True], _get_response: bool = False, _client: Optional[SyncBaseClient] = None, ) -> dict[str, object]: ... @overload @classmethod def download_legacy_workload( cls, workload_name: str, address: Optional[bool] = None, _dry_run: Literal[False] = False, *, _get_response: Literal[True], _client: Optional[SyncBaseClient] = None, ) -> Response: ... @overload @classmethod def download_legacy_workload( cls, workload_name: str, address: Optional[bool] = None, _dry_run: Literal[False] = False, _get_response: Literal[False] = False, _client: Optional[SyncBaseClient] = None, ) -> str: ...
[docs] @classmethod def download_legacy_workload( cls, workload_name: str, address: Optional[bool] = None, _dry_run: bool = False, _get_response: bool = False, _client: Optional[SyncBaseClient] = None, ) -> Optional[Union[str, dict[str, object], Response]]: """Download the Workload package file for offline installation on the Edge System. The system automatically generates the file for download if the package is not already available. **Permission Required:** `kelvin.permission.workload.read`. ``downloadLegacyWorkload``: ``GET`` ``/api/v4/workloads/{workload_name}/download`` Args: workload_name : :obj:`str`, optional Unique identifier `name` of the Workload. address : :obj:`bool` If true, the endpoint will return a direct URL to the workload package file. """ result_types = { "200": str, "202": None, "400": response.Error, "401": response.Error, "404": response.Error, "409": response.Error, "412": response.Error, "500": response.Error, } _request = cls._prepare_request( method="GET", path="/api/v4/workloads/{workload_name}/download", values={"workload_name": workload_name}, params={"address": address}, files={}, headers={}, data=None, body_type=None, array_body=False, ) if _dry_run: return _request.to_dict() _response = cls._make_request( client=_client, request=_request, stream=False, ) if _get_response: return _response cls._raise_api_error( response=_response, result_types=result_types, error_type=response.Error, ) return cls._process_str_response(response=_response)
@overload @classmethod def get_legacy_workload( cls, workload_name: str, *, _dry_run: Literal[True], _get_response: bool = False, _client: Optional[SyncBaseClient] = None, ) -> dict[str, object]: ... @overload @classmethod def get_legacy_workload( cls, workload_name: str, _dry_run: Literal[False] = False, *, _get_response: Literal[True], _client: Optional[SyncBaseClient] = None, ) -> Response: ... @overload @classmethod def get_legacy_workload( cls, workload_name: str, _dry_run: Literal[False] = False, _get_response: Literal[False] = False, _client: Optional[SyncBaseClient] = None, ) -> responses.LegacyWorkloadGet: ...
[docs] @classmethod def get_legacy_workload( cls, workload_name: str, _dry_run: bool = False, _get_response: bool = False, _client: Optional[SyncBaseClient] = None, ) -> Union[responses.LegacyWorkloadGet, dict[str, object], Response]: """Retrieve the parameters of a Workload. **Permission Required:** `kelvin.permission.workload.read`. ``getLegacyWorkload``: ``GET`` ``/api/v4/workloads/{workload_name}/get`` Args: workload_name : :obj:`str`, optional Unique identifier `name` of the Workload. """ result_types = { "200": responses.LegacyWorkloadGet, "400": response.Error, "401": response.Error, "404": response.Error, "500": response.Error, } _request = cls._prepare_request( method="GET", path="/api/v4/workloads/{workload_name}/get", values={"workload_name": workload_name}, params={}, files={}, headers={}, data=None, body_type=None, array_body=False, ) if _dry_run: return _request.to_dict() _response = cls._make_request( client=_client, request=_request, stream=False, ) if _get_response: return _response cls._raise_api_error( response=_response, result_types=result_types, error_type=response.Error, ) return cls._process_response( response=_response, result_types=result_types, result_type=responses.LegacyWorkloadGet, )
@overload @classmethod def get_legacy_workload_logs( cls, workload_name: str, tail_lines: Optional[int] = None, since_time: Optional[datetime] = None, *, _dry_run: Literal[True], _get_response: bool = False, _client: Optional[SyncBaseClient] = None, ) -> dict[str, object]: ... @overload @classmethod def get_legacy_workload_logs( cls, workload_name: str, tail_lines: Optional[int] = None, since_time: Optional[datetime] = None, _dry_run: Literal[False] = False, *, _get_response: Literal[True], _client: Optional[SyncBaseClient] = None, ) -> Response: ... @overload @classmethod def get_legacy_workload_logs( cls, workload_name: str, tail_lines: Optional[int] = None, since_time: Optional[datetime] = None, _dry_run: Literal[False] = False, _get_response: Literal[False] = False, _client: Optional[SyncBaseClient] = None, ) -> responses.LegacyWorkloadLogsGet: ...
[docs] @classmethod def get_legacy_workload_logs( cls, workload_name: str, tail_lines: Optional[int] = None, since_time: Optional[datetime] = None, _dry_run: bool = False, _get_response: bool = False, _client: Optional[SyncBaseClient] = None, ) -> Union[responses.LegacyWorkloadLogsGet, dict[str, object], Response]: """Get Workload Logs **Permission Required:** `kelvin.permission.workload.read`. ``getLegacyWorkloadLogs``: ``GET`` ``/api/v4/workloads/{workload_name}/logs/get`` Args: workload_name : :obj:`str`, optional Unique identifier `name` of the Workload. tail_lines : :obj:`int` Specify the number of the most recent log lines to retrieve, counting backwards from the latest entry. since_time : :obj:`datetime` UTC time of the starting point for log retrieval, formatted in RFC 3339. """ result_types = { "200": responses.LegacyWorkloadLogsGet, "400": response.Error, "401": response.Error, "404": response.Error, "409": response.Error, "500": response.Error, "501": str, } _request = cls._prepare_request( method="GET", path="/api/v4/workloads/{workload_name}/logs/get", values={"workload_name": workload_name}, params={"tail_lines": tail_lines, "since_time": since_time}, files={}, headers={}, data=None, body_type=None, array_body=False, ) if _dry_run: return _request.to_dict() _response = cls._make_request( client=_client, request=_request, stream=False, ) if _get_response: return _response cls._raise_api_error( response=_response, result_types=result_types, error_type=response.Error, ) return cls._process_response( response=_response, result_types=result_types, result_type=responses.LegacyWorkloadLogsGet, )
@overload @classmethod def start_workload( cls, workload_name: str, *, _dry_run: Literal[True], _get_response: bool = False, _client: Optional[SyncBaseClient] = None, ) -> dict[str, object]: ... @overload @classmethod def start_workload( cls, workload_name: str, _dry_run: Literal[False] = False, *, _get_response: Literal[True], _client: Optional[SyncBaseClient] = None, ) -> Response: ... @overload @classmethod def start_workload( cls, workload_name: str, _dry_run: Literal[False] = False, _get_response: Literal[False] = False, _client: Optional[SyncBaseClient] = None, ) -> None: ...
[docs] @classmethod def start_workload( cls, workload_name: str, _dry_run: bool = False, _get_response: bool = False, _client: Optional[SyncBaseClient] = None, ) -> Optional[Union[dict[str, object], Response]]: """Start running the Workload. **Permission Required:** `kelvin.permission.workload.update`. ``startWorkload``: ``GET`` ``/api/v4/workloads/{workload_name}/start`` Args: workload_name : :obj:`str`, optional Unique identifier `name` of the Workload. """ result_types = { "200": None, "400": response.Error, "401": response.Error, "404": response.Error, "409": response.Error, "500": response.Error, "501": None, } _request = cls._prepare_request( method="GET", path="/api/v4/workloads/{workload_name}/start", values={"workload_name": workload_name}, params={}, files={}, headers={}, data=None, body_type=None, array_body=False, ) if _dry_run: return _request.to_dict() _response = cls._make_request( client=_client, request=_request, stream=False, ) if _get_response: return _response cls._raise_api_error( response=_response, result_types=result_types, error_type=response.Error, )
@overload @classmethod def stop_workload( cls, workload_name: str, *, _dry_run: Literal[True], _get_response: bool = False, _client: Optional[SyncBaseClient] = None, ) -> dict[str, object]: ... @overload @classmethod def stop_workload( cls, workload_name: str, _dry_run: Literal[False] = False, *, _get_response: Literal[True], _client: Optional[SyncBaseClient] = None, ) -> Response: ... @overload @classmethod def stop_workload( cls, workload_name: str, _dry_run: Literal[False] = False, _get_response: Literal[False] = False, _client: Optional[SyncBaseClient] = None, ) -> None: ...
[docs] @classmethod def stop_workload( cls, workload_name: str, _dry_run: bool = False, _get_response: bool = False, _client: Optional[SyncBaseClient] = None, ) -> Optional[Union[dict[str, object], Response]]: """Stop running the Workload. **Permission Required:** `kelvin.permission.workload.update`. ``stopWorkload``: ``GET`` ``/api/v4/workloads/{workload_name}/stop`` Args: workload_name : :obj:`str`, optional Unique identifier `name` of the Workload. """ result_types = { "200": None, "400": response.Error, "401": response.Error, "404": response.Error, "409": response.Error, "500": response.Error, "501": None, } _request = cls._prepare_request( method="GET", path="/api/v4/workloads/{workload_name}/stop", values={"workload_name": workload_name}, params={}, files={}, headers={}, data=None, body_type=None, array_body=False, ) if _dry_run: return _request.to_dict() _response = cls._make_request( client=_client, request=_request, stream=False, ) if _get_response: return _response cls._raise_api_error( response=_response, result_types=result_types, error_type=response.Error, )
@overload @classmethod def undeploy_workload( cls, workload_name: str, staged: Optional[bool] = None, *, _dry_run: Literal[True], _get_response: bool = False, _client: Optional[SyncBaseClient] = None, ) -> dict[str, object]: ... @overload @classmethod def undeploy_workload( cls, workload_name: str, staged: Optional[bool] = None, _dry_run: Literal[False] = False, *, _get_response: Literal[True], _client: Optional[SyncBaseClient] = None, ) -> Response: ... @overload @classmethod def undeploy_workload( cls, workload_name: str, staged: Optional[bool] = None, _dry_run: Literal[False] = False, _get_response: Literal[False] = False, _client: Optional[SyncBaseClient] = None, ) -> None: ...
[docs] @classmethod def undeploy_workload( cls, workload_name: str, staged: Optional[bool] = None, _dry_run: bool = False, _get_response: bool = False, _client: Optional[SyncBaseClient] = None, ) -> Optional[Union[dict[str, object], Response]]: """Undeploy Workload from the Edge System. **Permission Required:** `kelvin.permission.workload.delete`. ``undeployWorkload``: ``POST`` ``/api/v4/workloads/{workload_name}/undeploy`` Args: workload_name : :obj:`str`, optional Unique identifier `name` of the Workload. staged : :obj:`bool` If true, the endpoint only undeploys the staged workload, if any. """ result_types = { "200": None, "400": response.Error, "401": response.Error, "404": response.Error, "500": response.Error, "501": None, } _request = cls._prepare_request( method="POST", path="/api/v4/workloads/{workload_name}/undeploy", values={"workload_name": workload_name}, params={"staged": staged}, files={}, headers={}, data=None, body_type=None, array_body=False, ) if _dry_run: return _request.to_dict() _response = cls._make_request( client=_client, request=_request, stream=False, ) if _get_response: return _response cls._raise_api_error( response=_response, result_types=result_types, error_type=response.Error, )