Skip to content

async_client

KubernetesAsyncClientManager

Kubernetes async client manager.

Parameters:

Name Type Description Default
config_file str

Path to the Kubernetes configuration file. When not set, falls back to Configuration.SPARK_ON_K8S_CONFIG_FILE.

NOTSET
context str

Kubernetes context. When not set, falls back to Configuration.SPARK_ON_K8S_CONTEXT.

NOTSET
client_configuration Configuration

Kubernetes client configuration. When not set, falls back to Configuration.SPARK_ON_K8S_CLIENT_CONFIG.

NOTSET
in_cluster bool

Whether to load the in-cluster config. When not set, falls back to Configuration.SPARK_ON_K8S_IN_CLUSTER.

NOTSET
Source code in spark_on_k8s/k8s/async_client.py
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
class KubernetesAsyncClientManager:
    """Kubernetes async client manager.

    Every argument left at ``NOTSET`` falls back to the matching ``Configuration.SPARK_ON_K8S_*``
    setting, so an explicit ``None`` can still be passed to override a configured value.

    Args:
        config_file (str, optional): Path to the Kubernetes configuration file. Falls back to
            ``Configuration.SPARK_ON_K8S_CONFIG_FILE``.
        context (str, optional): Kubernetes context. Falls back to
            ``Configuration.SPARK_ON_K8S_CONTEXT``.
        client_configuration (k8s.Configuration, optional): Kubernetes client configuration.
            Falls back to ``Configuration.SPARK_ON_K8S_CLIENT_CONFIG``.
        in_cluster (bool, optional): Whether to load the in-cluster config. Falls back to
            ``Configuration.SPARK_ON_K8S_IN_CLUSTER``.
    """

    def __init__(
        self,
        config_file: str | ArgNotSet = NOTSET,
        context: str | ArgNotSet = NOTSET,
        client_configuration: k8s.Configuration | ArgNotSet = NOTSET,
        in_cluster: bool | ArgNotSet = NOTSET,
    ) -> None:
        # Identity checks against the NOTSET sentinel keep explicit falsy values
        # (None, False, "") distinguishable from "argument not provided".
        self.config_file = (
            config_file if config_file is not NOTSET else Configuration.SPARK_ON_K8S_CONFIG_FILE
        )
        self.context = context if context is not NOTSET else Configuration.SPARK_ON_K8S_CONTEXT
        self.client_configuration = (
            client_configuration
            if client_configuration is not NOTSET
            else Configuration.SPARK_ON_K8S_CLIENT_CONFIG
        )
        self.in_cluster = in_cluster if in_cluster is not NOTSET else Configuration.SPARK_ON_K8S_IN_CLUSTER

    @asynccontextmanager
    async def client(self) -> k8s.ApiClient:
        """Create a Kubernetes client in a context manager.

        The client comes from :meth:`create_client` and is closed when the ``async with`` block
        exits, whether normally or through an exception.

        Examples:
            >>> import asyncio
            >>> from spark_on_k8s.k8s.async_client import KubernetesAsyncClientManager
            >>> async def get_namespaces():
            ...     async with KubernetesAsyncClientManager().client() as async_client:
            ...         api = k8s.CoreV1Api(async_client)
            ...         namespaces = [ns.metadata.name for ns in (await api.list_namespace()).items]
            ...         print(namespaces)
            >>> asyncio.run(get_namespaces())
            ['default', 'kube-node-lease', 'kube-public', 'kube-system', 'spark']

        Yields:
            k8s.ApiClient: Kubernetes client.
        """
        async_client = await self.create_client()
        try:
            yield async_client
        finally:
            await async_client.close()

    async def create_client(self) -> k8s.ApiClient:
        """Load the Kubernetes configuration and create a Kubernetes client.

        This method could be overridden to create a Kubernetes client in a different way without
        overriding the client method.

        Returns:
            k8s.ApiClient: Kubernetes client.
        """
        if self.in_cluster:
            config.load_incluster_config()
            return k8s.ApiClient()

        await config.load_kube_config(
            config_file=self.config_file,
            context=self.context,
            client_configuration=self.client_configuration,
        )
        # NOTE(review): assumes `config`/`k8s` are the kubernetes_asyncio modules. There,
        # load_kube_config only updates the global default configuration when no
        # client_configuration is given; a supplied object is filled in place instead. Passing
        # the same object on makes the client use what was just loaded. With None, ApiClient
        # falls back to the default configuration exactly as before.
        return k8s.ApiClient(configuration=self.client_configuration)

client() async

Create a Kubernetes client in a context manager.

Examples:

>>> import asyncio
>>> from spark_on_k8s.k8s.async_client import KubernetesAsyncClientManager
>>> async def get_namespaces():
...     async with KubernetesAsyncClientManager().client() as async_client:
...         api = k8s.CoreV1Api(async_client)
...         namespaces = [ns.metadata.name for ns in (await api.list_namespace()).items]
...         print(namespaces)
>>> asyncio.run(get_namespaces())
['default', 'kube-node-lease', 'kube-public', 'kube-system', 'spark']

Yields:

Type Description
ApiClient

k8s.ApiClient: Kubernetes client.

Source code in spark_on_k8s/k8s/async_client.py
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
@asynccontextmanager
async def client(self) -> k8s.ApiClient:
    """Yield a Kubernetes client and close it once the ``async with`` block is left.

    The client is obtained from ``self.create_client()``.

    Examples:
        >>> import asyncio
        >>> from spark_on_k8s.k8s.async_client import KubernetesAsyncClientManager
        >>> async def get_namespaces():
        ...     async with KubernetesAsyncClientManager().client() as async_client:
        ...         api = k8s.CoreV1Api(async_client)
        ...         namespaces = [ns.metadata.name for ns in (await api.list_namespace()).items]
        ...         print(namespaces)
        >>> asyncio.run(get_namespaces())
        ['default', 'kube-node-lease', 'kube-public', 'kube-system', 'spark']

    Yields:
        k8s.ApiClient: Kubernetes client.
    """
    api_client = await self.create_client()
    try:
        yield api_client
    finally:
        # Runs on normal exit and when the ``async with`` body raises.
        await api_client.close()

create_client() async

Load the Kubernetes configuration and create a Kubernetes client.

This method could be overridden to create a Kubernetes client in a different way without overriding the client method.

Returns:

Type Description
ApiClient

k8s.ApiClient: Kubernetes client.

Source code in spark_on_k8s/k8s/async_client.py
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
async def create_client(self) -> k8s.ApiClient:
    """Load the Kubernetes configuration and create a Kubernetes client.

    This method could be overridden to create a Kubernetes client in a different way without
    overriding the client method.

    Returns:
        k8s.ApiClient: Kubernetes client.
    """
    if self.in_cluster:
        config.load_incluster_config()
        return k8s.ApiClient()

    await config.load_kube_config(
        config_file=self.config_file,
        context=self.context,
        client_configuration=self.client_configuration,
    )
    # NOTE(review): assumes `config`/`k8s` are the kubernetes_asyncio modules. There,
    # load_kube_config only updates the global default configuration when no
    # client_configuration is given; a supplied object is filled in place instead. Passing
    # the same object on makes the client use what was just loaded. With None, ApiClient
    # falls back to the default configuration exactly as before.
    return k8s.ApiClient(configuration=self.client_configuration)