Examples

The following examples show how to package and submit Spark applications with this package. In each example, the base image is built with the Spark image tool, as described in the Spark documentation.

Python

This example uses a small PySpark application that takes a num_points argument and estimates the value of Pi:

from __future__ import annotations

import random
import sys

from my_modules.pi import is_point_in_the_unit_circle
from pyspark.sql import SparkSession

if __name__ == "__main__":
    # Check if the number of arguments is correct
    if len(sys.argv) != 2:
        print("Usage: python script.py <num_points>")
        sys.exit(1)

    # Get the number of points from command-line arguments
    num_points = int(sys.argv[1])

    # Create a Spark session
    spark = SparkSession.builder.appName("Pi-Estimation").getOrCreate()

    # Generate random points within the unit square
    points = spark.sparkContext.parallelize(range(1, num_points + 1)).map(
        lambda _: (random.random(), random.random())
    )

    # Count points within the unit circle
    inside_circle = points.filter(is_point_in_the_unit_circle)

    # Estimate Pi
    pi_estimate = 4 * inside_circle.count() / num_points

    # Display the result
    print(f"Pi is approximately {pi_estimate}")

    # Stop the Spark session
    spark.stop()
The following Dockerfile packages the application on top of the Spark image:
FROM husseinawala/spark-py:v3.5.0

# Add the job and the modules
ADD my_modules /opt/spark/work-dir/my_modules
ADD job.py /opt/spark/work-dir/
# Update the PYTHONPATH to include the modules
ENV PYTHONPATH="${PYTHONPATH}:/opt/spark/work-dir"
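
The job imports is_point_in_the_unit_circle from my_modules.pi, but that module is not shown here. A minimal sketch of what it might contain, assuming the function receives one (x, y) tuple as produced by the map step in the job; the estimate_pi_locally helper is hypothetical and only illustrates the same estimate without a cluster:

```python
# my_modules/pi.py (hypothetical contents; the real module is not shown in this page)
from __future__ import annotations

import random


def is_point_in_the_unit_circle(point: tuple[float, float]) -> bool:
    """Return True when the point (x, y) lies inside or on the unit circle."""
    x, y = point
    return x * x + y * y <= 1.0


def estimate_pi_locally(num_points: int, seed: int = 0) -> float:
    """Compute the same estimate as the Spark job, in a single process.

    Points are drawn uniformly from the unit square, so the fraction that
    lands inside the quarter circle approaches pi / 4.
    """
    rng = random.Random(seed)
    inside = sum(
        is_point_in_the_unit_circle((rng.random(), rng.random()))
        for _ in range(num_points)
    )
    return 4 * inside / num_points


print(f"Pi is approximately {estimate_pi_locally(100_000)}")
```

Running the local helper before building the image is a cheap way to check the module logic independently of Spark.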

First, build the Docker image and push it to a registry that your cluster can reach, or load it directly into your cluster if you're using minikube or kind:

docker build -t pyspark-job examples/python

# For minikube
minikube image load pyspark-job
# For kind
kind load docker-image pyspark-job
# For remote clusters, you will need to change the image name to match your registry,
# and then push it to that registry
docker push pyspark-job
Then submit the job using the Python client:
from __future__ import annotations

import logging

from spark_on_k8s.client import ExecutorInstances, PodResources, SparkOnK8S

if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    spark_client = SparkOnK8S()
    spark_client.submit_app(
        image="pyspark-job",
        app_path="local:///opt/spark/work-dir/job.py",
        app_arguments=["100000"],
        app_name="pyspark-job-example",
        namespace="spark",
        service_account="spark",
        app_waiter="log",
        # If you test this locally (minikube or kind) without pushing the image to a registry,
        # you need to set the image_pull_policy to Never.
        image_pull_policy="Never",
        ui_reverse_proxy=True,
        driver_resources=PodResources(cpu=1, memory=512, memory_overhead=128),
        executor_resources=PodResources(cpu=1, memory=512, memory_overhead=128),
        # Run with 5 executors
        executor_instances=ExecutorInstances(initial=5),
    )
Or, using the CLI:
spark-on-k8s app submit \
  --image pyspark-job \
  --path local:///opt/spark/work-dir/job.py \
  --namespace spark \
  --name pyspark-job-example \
  --service-account spark \
  --image-pull-policy Never \
  --driver-cpu 1 \
  --driver-memory 512 \
  --driver-memory-overhead 128 \
  --executor-cpu 1 \
  --executor-memory 512 \
  --executor-memory-overhead 128 \
  --executor-initial-instances 5 \
  --ui-reverse-proxy \
  --logs \
  100000
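
When submissions are scripted from Python but the CLI is preferred over the client class, the argument list can be assembled programmatically. The sketch below only uses flags that appear in the command above; the build_submit_command helper is hypothetical and not part of the package:

```python
from __future__ import annotations

import shlex


def build_submit_command(
    image: str,
    path: str,
    name: str,
    app_arguments: list[str],
    namespace: str = "spark",
    service_account: str = "spark",
    executor_instances: int = 5,
) -> list[str]:
    """Assemble the argv list for `spark-on-k8s app submit` (hypothetical helper)."""
    return [
        "spark-on-k8s", "app", "submit",
        "--image", image,
        "--path", path,
        "--namespace", namespace,
        "--name", name,
        "--service-account", service_account,
        "--executor-initial-instances", str(executor_instances),
        "--logs",
        # Application arguments go last, after all options.
        *app_arguments,
    ]


command = build_submit_command(
    image="pyspark-job",
    path="local:///opt/spark/work-dir/job.py",
    name="pyspark-job-example",
    app_arguments=["100000"],
)
# Print a shell-quoted version for inspection.
# To actually submit, pass the list to subprocess.run(command, check=True).
print(shlex.join(command))
```

Passing the command as a list rather than a single string avoids shell quoting problems when application arguments contain spaces.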

Java

This example is similar to the previous one, but it's implemented in Java and built with Maven.

pom.xml:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.oss-tech.examples</groupId>
    <artifactId>java-job</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>11</maven.compiler.source>
        <maven.compiler.target>11</maven.compiler.target>
        <spark.version>3.5.3</spark.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.12</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.12</artifactId>
            <version>${spark.version}</version>
        </dependency>
    </dependencies>
</project>
The job class (src/main/java/com/oss_tech/examples/TestJob.java):
package com.oss_tech.examples;

import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;


public class TestJob {
  public static void main(String[] args) {
    // Check if the correct number of arguments is provided
    if (args.length != 1) {
      System.err.println("Usage: TestJob <num_points>");
      System.exit(1);
    }

    // Initialize Spark session
    final SparkSession spark = SparkSession.builder()
        .appName("Spark Java example")
        .getOrCreate();

    // Get the number of points from command-line arguments
    final int numPoints = Integer.parseInt(args[0]);

    Dataset<Row> points = spark.range(1, numPoints + 1)
        .selectExpr("rand() as x", "rand() as y");

    // Count points within the unit circle
    long insideCircle = points.filter("x * x + y * y <= 1").count();

    // Estimate Pi
    double piEstimate = 4.0 * insideCircle / numPoints;

    // Display the result
    System.out.println("Pi is approximately " + piEstimate);

    // Stop the Spark session
    spark.stop();
  }
}

As with the PySpark application, build the Docker image and push it to a registry that your cluster can reach, or load it directly into your cluster if you're using minikube or kind:

docker build -t java-spark-job examples/java

# For minikube
minikube image load java-spark-job
# For kind
kind load docker-image java-spark-job
# For remote clusters, you will need to change the image name to match your registry,
# and then push it to that registry
docker push java-spark-job
Then, submit the job using the Python client:
from __future__ import annotations

import logging

from spark_on_k8s.client import ExecutorInstances, SparkOnK8S

if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    spark_client = SparkOnK8S()
    spark_client.submit_app(
        image="java-spark-job",
        app_path="local:///java-job.jar",
        app_arguments=["100000"],
        app_name="spark-java-job-example",
        namespace="spark",
        service_account="spark",
        app_waiter="log",
        class_name="com.oss_tech.examples.TestJob",
        # If you test this locally (minikube or kind) without pushing the image to a registry,
        # you need to set the image_pull_policy to Never.
        image_pull_policy="Never",
        ui_reverse_proxy=True,
        # Run with dynamic allocation enabled, with a minimum of 0 and a maximum of 5 executors
        executor_instances=ExecutorInstances(min=0, max=5),
    )
Or, using the CLI:
spark-on-k8s app submit \
  --image java-spark-job \
  --path local:///java-job.jar \
  --namespace spark \
  --name spark-java-job-example \
  --service-account spark \
  --image-pull-policy Never \
  --executor-min-instances 0 \
  --executor-max-instances 5 \
  --executor-initial-instances 5 \
  --ui-reverse-proxy \
  --logs \
  --class com.oss_tech.examples.TestJob \
  100000

Driver configuration

This package manages Spark applications in Kubernetes by directly creating the driver pod and submitting the job in client mode. This approach bypasses some of Spark's built-in driver configuration mechanisms, causing certain settings to be ignored (e.g., spark.kubernetes.driver.podTemplateFile, spark.kubernetes.driver.label.[LabelName], ...).

To address this, the package provides internal implementations for some key driver configurations, ensuring greater control and flexibility when deploying Spark applications in Kubernetes.
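
Because some driver properties are not honored, it can help to scan a Spark configuration before submitting and flag the keys for which this page documents a submit_app argument. The sketch below is a hypothetical pre-flight check, not part of the package; the mapping is taken from the sections that follow, and whether each listed property is actually ignored is an assumption:

```python
from __future__ import annotations

# Spark property prefix -> submit_app argument documented on this page.
# Assumption: prefix matching is good enough for a quick pre-flight check.
DRIVER_CONF_ALTERNATIVES = {
    "spark.kubernetes.authenticate.driver.serviceAccountName": "service_account",
    "spark.driver.memory": "driver_resources",  # also matches spark.driver.memoryOverhead
    "spark.driver.cores": "driver_resources",
    "spark.kubernetes.driverEnv.": "secret_values",
    "spark.kubernetes.driver.volumes": "volumes / driver_volume_mounts",
    "spark.kubernetes.driver.node.selector.": "driver_node_selector",
}


def suggest_alternatives(spark_conf: dict[str, str]) -> dict[str, str]:
    """Map each driver property found in spark_conf to the argument to use instead."""
    suggestions = {}
    for key in spark_conf:
        for prefix, argument in DRIVER_CONF_ALTERNATIVES.items():
            if key.startswith(prefix):
                suggestions[key] = argument
                break
    return suggestions


conf = {
    "spark.driver.memoryOverhead": "128m",
    "spark.kubernetes.driverEnv.FOO": "bar",
    "spark.executor.memory": "1g",  # executor setting, not flagged
}
for key, argument in suggest_alternatives(conf).items():
    print(f"{key}: consider the {argument} argument of submit_app")
```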

Driver service account

Instead of providing spark.kubernetes.authenticate.driver.serviceAccountName=<some-service-account>, you can use:

spark_client.submit_app(
    ...,
    service_account="<some-service-account>",
)

Driver resources

Instead of providing spark.driver.memory, spark.driver.cores, and spark.driver.memoryOverhead, you can use:

from spark_on_k8s.client import PodResources

spark_client.submit_app(
    ...,
    driver_resources=PodResources(cpu=1, memory=512, memory_overhead=128),
)
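
When sizing a namespace quota, it helps to add up what the driver and executors will request. The sketch below uses a hypothetical ResourceRequest dataclass that mirrors the three PodResources fields used above. It assumes that memory and memory_overhead are expressed in MiB and that a pod's memory request is their sum, as with Spark's own Kubernetes scheduler; neither assumption is confirmed by this page:

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass(frozen=True)
class ResourceRequest:
    """Hypothetical stand-in mirroring the PodResources fields used above."""

    cpu: int
    memory: int  # assumed to be MiB
    memory_overhead: int  # assumed to be MiB

    @property
    def pod_memory(self) -> int:
        """Memory requested by one pod: heap plus overhead."""
        return self.memory + self.memory_overhead


def total_footprint(
    driver: ResourceRequest, executor: ResourceRequest, executor_count: int
) -> tuple[int, int]:
    """Return (total CPUs, total MiB) for one driver and executor_count executors."""
    total_cpu = driver.cpu + executor.cpu * executor_count
    total_memory = driver.pod_memory + executor.pod_memory * executor_count
    return total_cpu, total_memory


pod = ResourceRequest(cpu=1, memory=512, memory_overhead=128)
cpu, memory = total_footprint(pod, pod, executor_count=5)
print(f"{cpu} CPUs, {memory} MiB")
```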

Driver environment variables

Instead of providing spark.kubernetes.driverEnv.[EnvironmentVariableName]=<some-value>, the package provides a safer way: it creates an ephemeral secret and exposes its values as environment variables in the driver pod:

spark_client.submit_app(
    ...,
    secret_values={"EnvironmentVariableName": "some-value"},
)
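
To make the idea concrete, the sketch below builds a Kubernetes Secret manifest from a dictionary like the one passed to secret_values. It illustrates how Secret data is encoded in general; the build_env_secret helper and the secret name are hypothetical, and the manifest the package actually creates may differ:

```python
from __future__ import annotations

import base64


def build_env_secret(name: str, values: dict[str, str]) -> dict:
    """Build a Secret manifest whose keys can be exposed as environment variables."""
    return {
        "apiVersion": "v1",
        "kind": "Secret",
        "metadata": {"name": name},
        "type": "Opaque",
        # The `data` field of a Secret holds base64-encoded values.
        "data": {
            key: base64.b64encode(value.encode("utf-8")).decode("ascii")
            for key, value in values.items()
        },
    }


secret = build_env_secret(
    "driver-env-example",  # hypothetical name
    {"EnvironmentVariableName": "some-value"},
)
print(secret["data"])
```

Unlike a plain environment variable in the pod spec, the value lives in the Secret object, so it does not appear in the output of commands that print the pod definition.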

Driver volume mounts

Instead of providing the spark.kubernetes.driver.volumes.[VolumeType].[VolumeName].* properties, you can use:

from kubernetes import client as k8s

spark_client.submit_app(
    ...,
    volumes=[
        k8s.V1Volume(
            name="volume1",
            host_path=k8s.V1HostPathVolumeSource(path="/mnt/volume1"),
        ),
        k8s.V1Volume(
            name="volume2",
            empty_dir=k8s.V1EmptyDirVolumeSource(medium="Memory", size_limit="1Gi"),
        ),
        k8s.V1Volume(
            name="volume3",
            secret=k8s.V1SecretVolumeSource(
                secret_name="secret1", items=[k8s.V1KeyToPath(key="key1", path="path1")]
            ),
        ),
        k8s.V1Volume(
            name="volume4",
            config_map=k8s.V1ConfigMapVolumeSource(
                name="configmap1", items=[k8s.V1KeyToPath(key="key1", path="path1")]
            ),
        ),
        k8s.V1Volume(
            name="volume5",
            projected=k8s.V1ProjectedVolumeSource(
                sources=[
                    k8s.V1VolumeProjection(
                        secret=k8s.V1SecretProjection(items=[k8s.V1KeyToPath(key="key1", path="path1")])
                    ),
                    k8s.V1VolumeProjection(
                        config_map=k8s.V1ConfigMapProjection(
                            items=[k8s.V1KeyToPath(key="key1", path="path1")]
                        )
                    ),
                ]
            ),
        ),
        k8s.V1Volume(
            name="volume6",
            nfs=k8s.V1NFSVolumeSource(server="nfs-server", path="/mnt/volume6", read_only=True),
        ),
        k8s.V1Volume(
            name="volume7",
            persistent_volume_claim=k8s.V1PersistentVolumeClaimVolumeSource(
                claim_name="pvc", read_only=True
            ),
        ),
    ],
    driver_volume_mounts=[
        k8s.V1VolumeMount(name="volume1", mount_path="/mnt/volume1"),
        k8s.V1VolumeMount(name="volume2", mount_path="/mnt/volume2", sub_path="sub-path"),
        k8s.V1VolumeMount(name="volume3", mount_path="/mnt/volume3"),
        k8s.V1VolumeMount(name="volume4", mount_path="/mnt/volume4"),
        k8s.V1VolumeMount(name="volume5", mount_path="/mnt/volume5"),
        k8s.V1VolumeMount(name="volume6", mount_path="/mnt/volume6"),
        k8s.V1VolumeMount(name="volume7", mount_path="/mnt/volume7", read_only=True),
    ],
)
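
Kubernetes rejects a pod whose volume mount refers to a volume name that the pod does not declare, so with seven volumes it is worth checking the names before submitting. A minimal sketch, working on plain name lists so that it does not depend on the kubernetes client; the find_undeclared_mounts helper is hypothetical:

```python
from __future__ import annotations


def find_undeclared_mounts(volume_names: list[str], mount_names: list[str]) -> list[str]:
    """Return the mount names that do not match any declared volume."""
    declared = set(volume_names)
    return [name for name in mount_names if name not in declared]


volumes = [f"volume{i}" for i in range(1, 8)]  # volume1 .. volume7, as above
mounts = ["volume1", "volume2", "volume8"]  # volume8 is a deliberate typo

missing = find_undeclared_mounts(volumes, mounts)
if missing:
    print(f"Mounts without a matching volume: {missing}")
```

With the client objects used above, the two lists can be obtained with [v.name for v in volumes] and [m.name for m in driver_volume_mounts].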

Driver node selector

Instead of providing spark.kubernetes.driver.node.selector.[labelKey]=<labelValue>, you can use:

spark_client.submit_app(
    ...,
    driver_node_selector={"labelKey": "labelValue"},
)
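
A node selector restricts the driver pod to nodes that carry every listed label with exactly the given value. The sketch below models that rule on plain dictionaries to show which nodes would qualify; the node names and labels are made up for illustration:

```python
from __future__ import annotations


def node_matches_selector(node_labels: dict[str, str], selector: dict[str, str]) -> bool:
    """Return True when the node carries every selector label with the same value."""
    return all(node_labels.get(key) == value for key, value in selector.items())


selector = {"labelKey": "labelValue"}
nodes = {  # hypothetical nodes
    "node-a": {"labelKey": "labelValue", "zone": "eu-west-1a"},
    "node-b": {"labelKey": "otherValue"},
    "node-c": {"zone": "eu-west-1b"},
}

eligible = [name for name, labels in nodes.items() if node_matches_selector(labels, selector)]
print(eligible)
```

Extra labels on a node do not matter; only the keys named in the selector are compared.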

Driver tolerations

Instead of configuring tolerations for the driver pod via a pod template file, you can use:

from kubernetes import client as k8s

spark_client.submit_app(
    ...,
    driver_tolerations=[
        k8s.V1Toleration(
            key="key1",
            operator="Equal",
            value="value1",
            effect="NoSchedule",
        ),
        k8s.V1Toleration(
            key="key2",
            operator="Exists",
            effect="NoExecute",
        ),
    ],
)
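
The two tolerations above behave differently: Equal compares the taint's value, while Exists only requires the key to be present. The sketch below models the Kubernetes matching rule with two small stand-in dataclasses (hypothetical, defined here so the example runs without the kubernetes client):

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass(frozen=True)
class Taint:
    key: str
    effect: str
    value: str = ""


@dataclass(frozen=True)
class Toleration:
    key: str = ""
    operator: str = "Equal"
    value: str = ""
    effect: str = ""


def tolerates(toleration: Toleration, taint: Taint) -> bool:
    """Return True when the toleration matches the taint."""
    # An empty effect on the toleration matches every taint effect.
    if toleration.effect and toleration.effect != taint.effect:
        return False
    # An empty key on the toleration matches every taint key.
    if toleration.key and toleration.key != taint.key:
        return False
    if toleration.operator == "Exists":
        return True
    # Equal is the default operator and compares values.
    return toleration.operator in ("", "Equal") and toleration.value == taint.value


equal_toleration = Toleration(key="key1", operator="Equal", value="value1", effect="NoSchedule")
exists_toleration = Toleration(key="key2", operator="Exists", effect="NoExecute")

print(tolerates(equal_toleration, Taint("key1", "NoSchedule", "value1")))
print(tolerates(exists_toleration, Taint("key2", "NoExecute", "any-value")))
```

A pod can be scheduled on a node only if every taint on that node is matched by at least one of the pod's tolerations.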

Driver init containers

Instead of adding init containers to the driver pod via a pod template file, you can use:

from kubernetes import client as k8s

spark_client.submit_app(
    ...,
    driver_init_containers=[
        k8s.V1Container(
            name="init-container1",
            image="init-container1-image",
            command=["init-command1"],
            args=["init-arg1"],
        ),
        k8s.V1Container(
            name="init-container2",
            image="init-container2-image",
            command=["init-command2"],
            args=["init-arg2"],
        ),
    ],
)

Driver host aliases

Instead of providing host aliases (entries added to the pod's /etc/hosts file) for the driver pod via a pod template file, you can use:

from kubernetes import client as k8s

spark_client.submit_app(
    ...,
    driver_host_aliases=[
        k8s.V1HostAlias(
            hostnames=["foo.local", "bar.local"],
            ip="127.0.0.1",
        )
    ],
)
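
Each host alias becomes one line in the driver pod's /etc/hosts file, with the IP address followed by all of its hostnames. The sketch below renders the lines one would expect for the alias above; the render_hosts_entries helper is hypothetical, and the exact comment header and separator written by the kubelet are assumptions:

```python
from __future__ import annotations


def render_hosts_entries(host_aliases: list[tuple[str, list[str]]]) -> str:
    """Render (ip, hostnames) pairs as /etc/hosts lines."""
    lines = ["# Entries added by HostAliases."]
    for ip, hostnames in host_aliases:
        # One line per alias: the IP, then every hostname, tab-separated.
        lines.append("\t".join([ip, *hostnames]))
    return "\n".join(lines)


entries = render_hosts_entries([("127.0.0.1", ["foo.local", "bar.local"])])
print(entries)
```

Inside a running driver pod, the result can be compared against the output of cat /etc/hosts.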