Skip to content

setup_namespace

SparkOnK8SNamespaceSetup

Bases: LoggingMixin

Utility class to set up a namespace for Spark on Kubernetes.

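Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `k8s_client_manager` | `KubernetesClientManager` | Kubernetes client manager. When None, a new `KubernetesClientManager` is created. | `None` |
| `logger_name` | `str` | Logger name. When None, "SparkOnK8SNamespaceSetup" is used. | `None` |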
Source code in spark_on_k8s/utils/setup_namespace.py
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
class SparkOnK8SNamespaceSetup(LoggingMixin):
    """Utility class to set up a namespace for Spark on Kubernetes.

    Args:
        k8s_client_manager (KubernetesClientManager, optional): Kubernetes client manager.
            When None, a new ``KubernetesClientManager()`` is created. Defaults to None.
        logger_name (str, optional): logger name. Defaults to "SparkOnK8SNamespaceSetup".
    """

    def __init__(
        self,
        *,
        k8s_client_manager: KubernetesClientManager | None = None,
        logger_name: str | None = None,
    ):
        super().__init__(logger_name=logger_name or "SparkOnK8SNamespaceSetup")
        self.k8s_client_manager = k8s_client_manager or KubernetesClientManager()

    def setup_namespace(self, namespace: str, should_print: bool = False):
        """Set up a namespace for Spark on Kubernetes.

        This method creates a namespace if it doesn't exist, creates a service account for Spark
        if it doesn't exist, and creates a cluster role binding for the service account and the
        edit cluster role if it doesn't exist.

        Each existence check asks the API server only for the object with the expected name
        (``metadata.name`` field selector), instead of listing every object of that kind.

        Args:
            namespace (str): the namespace to set up
            should_print (bool, optional): whether to print logs instead of logging them.
                Defaults to False.
        """
        with self.k8s_client_manager.client() as client:
            api = k8s.CoreV1Api(client)
            self._ensure_namespace(api, namespace, should_print)
            self._ensure_service_account(api, namespace, should_print)
            rbac_api = k8s.RbacAuthorizationV1Api(client)
            self._ensure_cluster_role_binding(rbac_api, namespace, should_print)

    @staticmethod
    def _name_selector(name: str) -> str:
        """Return a field selector that matches a single object by its ``metadata.name``."""
        return f"metadata.name={name}"

    def _ensure_namespace(self, api: k8s.CoreV1Api, namespace: str, should_print: bool) -> None:
        """Create ``namespace`` unless a namespace with that name already exists."""
        # Filter server-side by name; the client-side membership test below keeps the
        # result correct even if the selector were ignored.
        existing = api.list_namespace(field_selector=self._name_selector(namespace)).items
        if namespace in {ns.metadata.name for ns in existing}:
            return
        self.log(msg=f"Creating namespace {namespace}", level=logging.INFO, should_print=should_print)
        api.create_namespace(
            body=k8s.V1Namespace(
                metadata=k8s.V1ObjectMeta(
                    name=namespace,
                ),
            ),
        )

    def _ensure_service_account(self, api: k8s.CoreV1Api, namespace: str, should_print: bool) -> None:
        """Create the ``spark`` service account in ``namespace`` unless it already exists."""
        existing = api.list_namespaced_service_account(
            namespace=namespace,
            field_selector=self._name_selector("spark"),
        ).items
        if "spark" in {sa.metadata.name for sa in existing}:
            return
        self.log(
            msg=f"Creating spark service account in namespace {namespace}",
            level=logging.INFO,
            should_print=should_print,
        )
        api.create_namespaced_service_account(
            namespace=namespace,
            body=k8s.V1ServiceAccount(
                metadata=k8s.V1ObjectMeta(
                    name="spark",
                ),
            ),
        )

    def _ensure_cluster_role_binding(
        self, rbac_api: k8s.RbacAuthorizationV1Api, namespace: str, should_print: bool
    ) -> None:
        """Bind the ``edit`` cluster role to ``namespace``'s ``spark`` service account.

        The binding is cluster-scoped and named ``spark-role-binding-<namespace>``, so each
        namespace gets its own binding; it is only created if no binding with that name exists.
        """
        role_binding_name = f"spark-role-binding-{namespace}"
        existing = rbac_api.list_cluster_role_binding(
            field_selector=self._name_selector(role_binding_name),
        ).items
        if role_binding_name in {crb.metadata.name for crb in existing}:
            return
        self.log(msg="Creating spark role binding", level=logging.INFO, should_print=should_print)
        rbac_api.create_cluster_role_binding(
            body=k8s.V1ClusterRoleBinding(
                metadata=k8s.V1ObjectMeta(
                    name=role_binding_name,
                ),
                role_ref=k8s.V1RoleRef(
                    api_group="rbac.authorization.k8s.io",
                    kind="ClusterRole",
                    name="edit",
                ),
                subjects=[
                    k8s.RbacV1Subject(
                        kind="ServiceAccount",
                        name="spark",
                        namespace=namespace,
                    )
                ],
            ),
        )

setup_namespace(namespace, should_print=False)

Set up a namespace for Spark on Kubernetes.

This method creates a namespace if it doesn't exist, creates a service account for Spark if it doesn't exist, and creates a cluster role binding for the service account and the edit cluster role if it doesn't exist.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `namespace` | `str` | The namespace to set up. | *required* |
| `should_print` | `bool` | Whether to print logs instead of logging them. | `False` |
Source code in spark_on_k8s/utils/setup_namespace.py
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
def setup_namespace(self, namespace: str, should_print: bool = False):
    """Set up a namespace for Spark on Kubernetes.

    This method creates a namespace if it doesn't exist, creates a service account for Spark
    if it doesn't exist, and creates a cluster role binding for the service account and the
    edit cluster role if it doesn't exist.

    Each existence check asks the API server only for the object with the expected name
    (``metadata.name`` field selector), instead of listing every object of that kind.

    Args:
        namespace (str): the namespace to set up
        should_print (bool, optional): whether to print logs instead of logging them.
            Defaults to False.
    """
    with self.k8s_client_manager.client() as client:
        api = k8s.CoreV1Api(client)

        # Namespace: filter server-side by name; the membership test still guards
        # correctness if the selector were ignored.
        existing_namespaces = api.list_namespace(field_selector=f"metadata.name={namespace}").items
        if namespace not in {ns.metadata.name for ns in existing_namespaces}:
            self.log(msg=f"Creating namespace {namespace}", level=logging.INFO, should_print=should_print)
            api.create_namespace(
                body=k8s.V1Namespace(
                    metadata=k8s.V1ObjectMeta(
                        name=namespace,
                    ),
                ),
            )

        # Service account "spark" inside the target namespace.
        existing_service_accounts = api.list_namespaced_service_account(
            namespace=namespace,
            field_selector="metadata.name=spark",
        ).items
        if "spark" not in {sa.metadata.name for sa in existing_service_accounts}:
            self.log(
                msg=f"Creating spark service account in namespace {namespace}",
                level=logging.INFO,
                should_print=should_print,
            )
            api.create_namespaced_service_account(
                namespace=namespace,
                body=k8s.V1ServiceAccount(
                    metadata=k8s.V1ObjectMeta(
                        name="spark",
                    ),
                ),
            )

        # Cluster-scoped binding of the "edit" cluster role, one per namespace.
        rbac_api = k8s.RbacAuthorizationV1Api(client)
        role_binding_name = f"spark-role-binding-{namespace}"
        existing_bindings = rbac_api.list_cluster_role_binding(
            field_selector=f"metadata.name={role_binding_name}",
        ).items
        if role_binding_name not in {crb.metadata.name for crb in existing_bindings}:
            self.log(msg="Creating spark role binding", level=logging.INFO, should_print=should_print)
            rbac_api.create_cluster_role_binding(
                body=k8s.V1ClusterRoleBinding(
                    metadata=k8s.V1ObjectMeta(
                        name=role_binding_name,
                    ),
                    role_ref=k8s.V1RoleRef(
                        api_group="rbac.authorization.k8s.io",
                        kind="ClusterRole",
                        name="edit",
                    ),
                    subjects=[
                        k8s.RbacV1Subject(
                            kind="ServiceAccount",
                            name="spark",
                            namespace=namespace,
                        )
                    ],
                ),
            )