Skip to content

configuration

Configuration

Spark on Kubernetes configuration.

Source code in spark_on_k8s/utils/configuration.py
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
# Strings that mean "disabled" when a boolean flag is read from the environment.
_FALSE_ENV_VALUES = frozenset({"", "0", "false", "no", "off"})


def _env_flag(name: str, default: bool = False) -> bool:
    """Read environment variable *name* as a boolean flag.

    Returns *default* when the variable is unset. Otherwise the value is
    stripped and lower-cased, and is ``False`` only when it is one of the
    explicit false-like strings in ``_FALSE_ENV_VALUES``; any other value is
    ``True``.
    """
    raw = getenv(name)
    if raw is None:
        return default
    return raw.strip().lower() not in _FALSE_ENV_VALUES


def _env_optional_int(name: str) -> "int | None":
    """Read environment variable *name* as an ``int``.

    Returns ``None`` when the variable is unset or empty.

    Raises:
        ValueError: if the variable is set to a non-integer string.
    """
    raw = getenv(name)
    return int(raw) if raw else None


class Configuration:
    """Spark on Kubernetes configuration.

    Every attribute is read from the environment variable of the same name,
    once, when the class body is executed (i.e. at import time). Values that
    are declared as JSON are decoded with ``json.loads``; numeric values are
    converted with ``int``/``float`` and therefore raise ``ValueError`` at
    import time when the variable holds an invalid value.
    """

    # --- Application settings -------------------------------------------
    SPARK_ON_K8S_DOCKER_IMAGE = getenv("SPARK_ON_K8S_DOCKER_IMAGE")
    SPARK_ON_K8S_APP_PATH = getenv("SPARK_ON_K8S_APP_PATH")
    SPARK_ON_K8S_NAMESPACE = getenv("SPARK_ON_K8S_NAMESPACE", "default")
    SPARK_ON_K8S_SERVICE_ACCOUNT = getenv("SPARK_ON_K8S_SERVICE_ACCOUNT", "spark")
    SPARK_ON_K8S_APP_NAME = getenv("SPARK_ON_K8S_APP_NAME")
    # JSON object; defaults to an empty dict.
    SPARK_ON_K8S_SPARK_CONF = json.loads(getenv("SPARK_ON_K8S_SPARK_CONF", "{}"))
    SPARK_ON_K8S_CLASS_NAME = getenv("SPARK_ON_K8S_CLASS_NAME")
    SPARK_ON_K8S_PACKAGES = getenv("SPARK_ON_K8S_PACKAGES", "")
    # JSON array; defaults to an empty list.
    SPARK_ON_K8S_APP_ARGUMENTS = json.loads(getenv("SPARK_ON_K8S_APP_ARGUMENTS", "[]"))
    SPARK_ON_K8S_APP_WAITER = getenv("SPARK_ON_K8S_APP_WAITER", "no_wait")
    SPARK_ON_K8S_IMAGE_PULL_POLICY = getenv("SPARK_ON_K8S_IMAGE_PULL_POLICY", "IfNotPresent")
    # Only the (case-insensitive) string "true" enables this flag.
    SPARK_ON_K8S_UI_REVERSE_PROXY = getenv("SPARK_ON_K8S_UI_REVERSE_PROXY", "false").lower() == "true"

    # --- Driver / executor resources ------------------------------------
    SPARK_ON_K8S_DRIVER_CPU = int(getenv("SPARK_ON_K8S_DRIVER_CPU", 1))
    SPARK_ON_K8S_DRIVER_MEMORY = int(getenv("SPARK_ON_K8S_DRIVER_MEMORY", 1024))
    SPARK_ON_K8S_DRIVER_MEMORY_OVERHEAD = int(getenv("SPARK_ON_K8S_DRIVER_MEMORY_OVERHEAD", 512))
    SPARK_ON_K8S_EXECUTOR_CPU = int(getenv("SPARK_ON_K8S_EXECUTOR_CPU", 1))
    SPARK_ON_K8S_EXECUTOR_MEMORY = int(getenv("SPARK_ON_K8S_EXECUTOR_MEMORY", 1024))
    SPARK_ON_K8S_EXECUTOR_MEMORY_OVERHEAD = int(getenv("SPARK_ON_K8S_EXECUTOR_MEMORY_OVERHEAD", 512))

    # --- Executor instance counts (None when unset or empty) ------------
    SPARK_ON_K8S_EXECUTOR_MIN_INSTANCES = _env_optional_int("SPARK_ON_K8S_EXECUTOR_MIN_INSTANCES")
    SPARK_ON_K8S_EXECUTOR_MAX_INSTANCES = _env_optional_int("SPARK_ON_K8S_EXECUTOR_MAX_INSTANCES")
    SPARK_ON_K8S_EXECUTOR_INITIAL_INSTANCES = _env_optional_int("SPARK_ON_K8S_EXECUTOR_INITIAL_INSTANCES")
    SPARK_ON_K8S_EXECUTOR_ALLOCATION_RATIO = float(getenv("SPARK_ON_K8S_EXECUTOR_ALLOCATION_RATIO", 1.0))
    SPARK_ON_K8S_SCHEDULER_BACKLOG_TIMEOUT = getenv("SPARK_ON_K8S_SCHEDULER_BACKLOG_TIMEOUT", "1s")
    # Falls back to the (already resolved) scheduler backlog timeout above.
    SPARK_ON_K8S_SUSTAINED_SCHEDULER_BACKLOG_TIMEOUT = getenv(
        "SPARK_ON_K8S_SUSTAINED_SCHEDULER_BACKLOG_TIMEOUT", SPARK_ON_K8S_SCHEDULER_BACKLOG_TIMEOUT
    )

    # --- Secrets and startup --------------------------------------------
    # JSON object; defaults to an empty dict.
    SPARK_ON_K8S_SECRET_ENV_VAR = json.loads(getenv("SPARK_ON_K8S_SECRET_ENV_VAR", "{}"))
    # Comma-separated list; entries are not stripped. Empty list when unset or empty.
    SPARK_ON_K8S_DRIVER_ENV_VARS_FROM_SECRET = (
        getenv("SPARK_ON_K8S_DRIVER_ENV_VARS_FROM_SECRET").split(",")
        if getenv("SPARK_ON_K8S_DRIVER_ENV_VARS_FROM_SECRET")
        else []
    )
    SPARK_ON_K8S_STARTUP_TIMEOUT = int(getenv("SPARK_ON_K8S_STARTUP_TIMEOUT", 0))

    # --- Kubernetes client configuration --------------------------------
    SPARK_ON_K8S_CONFIG_FILE = getenv("SPARK_ON_K8S_CONFIG_FILE", None)
    SPARK_ON_K8S_CONTEXT = getenv("SPARK_ON_K8S_CONTEXT", None)
    # NOTE(review): the decoded JSON is passed as the first positional argument
    # of k8s.Configuration; confirm that this is the intended parameter.
    SPARK_ON_K8S_CLIENT_CONFIG = (
        k8s.Configuration(json.loads(getenv("SPARK_ON_K8S_CLIENT_CONFIG")))
        if getenv("SPARK_ON_K8S_CLIENT_CONFIG", None)
        else None
    )
    # Parsed as a real boolean flag: plain bool() on the raw string would turn
    # "false" or "0" into True because any non-empty string is truthy.
    SPARK_ON_K8S_IN_CLUSTER = _env_flag("SPARK_ON_K8S_IN_CLUSTER", False)

    # --- Pod scheduling and metadata (JSON objects, default empty) ------
    SPARK_ON_K8S_SPARK_DRIVER_NODE_SELECTOR = json.loads(
        getenv("SPARK_ON_K8S_SPARK_DRIVER_NODE_SELECTOR", "{}")
    )
    SPARK_ON_K8S_SPARK_EXECUTOR_NODE_SELECTOR = json.loads(
        getenv("SPARK_ON_K8S_SPARK_EXECUTOR_NODE_SELECTOR", "{}")
    )
    SPARK_ON_K8S_SPARK_DRIVER_LABELS = json.loads(getenv("SPARK_ON_K8S_SPARK_DRIVER_LABELS", "{}"))
    SPARK_ON_K8S_SPARK_EXECUTOR_LABELS = json.loads(getenv("SPARK_ON_K8S_SPARK_EXECUTOR_LABELS", "{}"))
    SPARK_ON_K8S_SPARK_DRIVER_ANNOTATIONS = json.loads(getenv("SPARK_ON_K8S_SPARK_DRIVER_ANNOTATIONS", "{}"))
    SPARK_ON_K8S_SPARK_EXECUTOR_ANNOTATIONS = json.loads(
        getenv("SPARK_ON_K8S_SPARK_EXECUTOR_ANNOTATIONS", "{}")
    )
    SPARK_ON_K8S_EXECUTOR_POD_TEMPLATE_PATH = getenv("SPARK_ON_K8S_EXECUTOR_POD_TEMPLATE_PATH", None)

    # --- Optional async client ------------------------------------------
    # kubernetes_asyncio may not be installed; when the import fails the
    # async client configuration is simply left as None.
    try:
        from kubernetes_asyncio import client as async_k8s

        SPARK_ON_K8S_ASYNC_CLIENT_CONFIG = (
            async_k8s.Configuration(json.loads(getenv("SPARK_ON_K8S_ASYNC_CLIENT_CONFIG")))
            if getenv("SPARK_ON_K8S_ASYNC_CLIENT_CONFIG", None)
            else None
        )
    except ImportError:
        SPARK_ON_K8S_ASYNC_CLIENT_CONFIG = None