Skip to content

sync_client

KubernetesClientManager

Kubernetes client manager.

Parameters:

Name Type Description Default
config_file str

Path to the Kubernetes configuration file. When not provided, falls back to Configuration.SPARK_ON_K8S_CONFIG_FILE.

NOTSET
context str

Kubernetes context. When not provided, falls back to Configuration.SPARK_ON_K8S_CONTEXT.

NOTSET
client_configuration Configuration

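Kubernetes client configuration. When not provided, falls back to Configuration.SPARK_ON_K8S_CLIENT_CONFIG.

NOTSET
in_cluster bool

Whether to load the in-cluster configuration instead of a kubeconfig file. When not provided, falls back to Configuration.SPARK_ON_K8S_IN_CLUSTER.

NOTSET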
Source code in spark_on_k8s/k8s/sync_client.py
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
class KubernetesClientManager:
    """Kubernetes client manager.

    Every argument left as ``NOTSET`` falls back to the matching attribute of
    ``Configuration``.

    Args:
        config_file (str, optional): Path to the Kubernetes configuration file. Falls back to
            ``Configuration.SPARK_ON_K8S_CONFIG_FILE``.
        context (str, optional): Kubernetes context. Falls back to
            ``Configuration.SPARK_ON_K8S_CONTEXT``.
        client_configuration (k8s.Configuration, optional): Kubernetes client configuration.
            Falls back to ``Configuration.SPARK_ON_K8S_CLIENT_CONFIG``.
        in_cluster (bool, optional): When truthy, the in-cluster configuration is loaded and
            ``config_file``, ``context`` and ``client_configuration`` are not used. Falls back
            to ``Configuration.SPARK_ON_K8S_IN_CLUSTER``.
    """

    def __init__(
        self,
        config_file: str | ArgNotSet = NOTSET,
        context: str | ArgNotSet = NOTSET,
        client_configuration: k8s.Configuration | ArgNotSet = NOTSET,
        in_cluster: bool | ArgNotSet = NOTSET,
    ) -> None:
        # NOTSET (rather than None) marks "argument not given", so an explicit None passed by
        # the caller is kept as-is instead of being replaced by the Configuration value.
        self.config_file = (
            config_file if config_file is not NOTSET else Configuration.SPARK_ON_K8S_CONFIG_FILE
        )
        self.context = context if context is not NOTSET else Configuration.SPARK_ON_K8S_CONTEXT
        self.client_configuration = (
            client_configuration
            if client_configuration is not NOTSET
            else Configuration.SPARK_ON_K8S_CLIENT_CONFIG
        )
        self.in_cluster = in_cluster if in_cluster is not NOTSET else Configuration.SPARK_ON_K8S_IN_CLUSTER

    @contextmanager
    def client(self) -> k8s.ApiClient:
        """Create a Kubernetes client in a context manager.

        The client is built with ``create_client`` and closed when the ``with`` block exits,
        whether or not the block raised.

        Examples:
            >>> from spark_on_k8s.k8s.sync_client import KubernetesClientManager
            >>> with KubernetesClientManager().client() as client:
            ...     api = k8s.CoreV1Api(client)
            ...     namespaces = [ns.metadata.name for ns in api.list_namespace().items]
            ...     print(namespaces)
            ['default', 'kube-node-lease', 'kube-public', 'kube-system', 'spark']

        Yields:
            k8s.ApiClient: Kubernetes client.
        """
        _client = None
        try:
            _client = self.create_client()
            yield _client
        finally:
            # _client is still None when create_client() itself failed: nothing to close.
            if _client:
                _client.close()

    def create_client(self) -> k8s.ApiClient:
        """Load the Kubernetes configuration and create a Kubernetes client.

        This method can be overridden to create a Kubernetes client in a different way without
        overriding the client method.

        Returns:
            k8s.ApiClient: Kubernetes client.
        """
        if self.in_cluster:
            config.load_incluster_config()
            return k8s.ApiClient()

        config.load_kube_config(
            config_file=self.config_file,
            context=self.context,
            client_configuration=self.client_configuration,
        )
        # When a client_configuration object is supplied, load_kube_config fills that object
        # in place and does not install it as the library-wide default, so it must be handed
        # to the ApiClient explicitly. With None, ApiClient uses the default configuration
        # that load_kube_config has just set.
        return k8s.ApiClient(configuration=self.client_configuration)

client()

Create a Kubernetes client in a context manager.

Examples:

>>> from spark_on_k8s.k8s.sync_client import KubernetesClientManager
>>> with KubernetesClientManager().client() as client:
...     api = k8s.CoreV1Api(client)
...     namespaces = [ns.metadata.name for ns in api.list_namespace().items]
...     print(namespaces)
['default', 'kube-node-lease', 'kube-public', 'kube-system', 'spark']

Yields:

Type Description
ApiClient

k8s.ApiClient: Kubernetes client.

Source code in spark_on_k8s/k8s/sync_client.py
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
@contextmanager
def client(self) -> k8s.ApiClient:
    """Provide a Kubernetes client for the duration of a ``with`` block.

    The client comes from ``self.create_client()`` and is closed once the block exits,
    including when the block raises.

    Examples:
        >>> from spark_on_k8s.k8s.sync_client import KubernetesClientManager
        >>> with KubernetesClientManager().client() as client:
        ...     api = k8s.CoreV1Api(client)
        ...     namespaces = [ns.metadata.name for ns in api.list_namespace().items]
        ...     print(namespaces)
        ['default', 'kube-node-lease', 'kube-public', 'kube-system', 'spark']

    Yields:
        k8s.ApiClient: Kubernetes client.
    """
    # A failure while creating the client propagates before anything needs closing.
    api_client = self.create_client()
    try:
        yield api_client
    finally:
        if api_client:
            api_client.close()

create_client()

Load the Kubernetes configuration and create a Kubernetes client.

This method can be overridden to create a Kubernetes client in a different way, without having to override the client method.

Returns:

Type Description
ApiClient

k8s.ApiClient: Kubernetes client.

Source code in spark_on_k8s/k8s/sync_client.py
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
def create_client(self) -> k8s.ApiClient:
    """Load the Kubernetes configuration and create a Kubernetes client.

    This method can be overridden to create a Kubernetes client in a different way without
    overriding the client method.

    Returns:
        k8s.ApiClient: Kubernetes client.
    """
    if self.in_cluster:
        config.load_incluster_config()
        return k8s.ApiClient()

    config.load_kube_config(
        config_file=self.config_file,
        context=self.context,
        client_configuration=self.client_configuration,
    )
    # When a client_configuration object is supplied, load_kube_config fills that object in
    # place and does not install it as the library-wide default, so it must be handed to the
    # ApiClient explicitly. With None, ApiClient uses the default configuration that
    # load_kube_config has just set.
    return k8s.ApiClient(configuration=self.client_configuration)