Examples

The examples below show how to package and submit Spark apps with this package. The base image used in the examples is built with the Spark image tool, as described in the Spark documentation.

Python

This example uses a small PySpark application that takes a num_points argument and uses it to estimate the value of Pi:

from __future__ import annotations

import random
import sys

from my_modules.pi import is_point_in_the_unit_circle
from pyspark.sql import SparkSession

if __name__ == "__main__":
    # Check if the number of arguments is correct
    if len(sys.argv) != 2:
        print("Usage: job.py <num_points>", file=sys.stderr)
        sys.exit(1)

    # Get the number of points from command-line arguments
    num_points = int(sys.argv[1])

    # Create a Spark session
    spark = SparkSession.builder.appName("Pi-Estimation").getOrCreate()

    # Generate random points within the unit square
    points = spark.sparkContext.parallelize(range(1, num_points + 1)).map(
        lambda _: (random.random(), random.random())
    )

    # Count points within the unit circle
    inside_circle = points.filter(is_point_in_the_unit_circle)

    # Estimate Pi
    pi_estimate = 4 * inside_circle.count() / num_points

    # Display the result
    print(f"Pi is approximately {pi_estimate}")

    # Stop the Spark session
    spark.stop()
The following Dockerfile packages the application on top of the Spark image:
FROM husseinawala/spark-py:v3.5.0

# Add the job and the modules
ADD my_modules /opt/spark/work-dir/my_modules
ADD job.py /opt/spark/work-dir/
# Update the PYTHONPATH to include the modules
ENV PYTHONPATH="${PYTHONPATH}:/opt/spark/work-dir"
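
The Dockerfile copies a my_modules package whose contents are not shown here. A minimal sketch of what my_modules/pi.py could contain, assuming is_point_in_the_unit_circle receives the (x, y) tuple produced by the map step in job.py and returns a boolean (the actual module in the example directory may differ):

```python
# Hypothetical my_modules/pi.py: an assumed implementation of the helper
# imported by job.py, not necessarily the one shipped with the example.
from __future__ import annotations


def is_point_in_the_unit_circle(point: tuple[float, float]) -> bool:
    """Return True if the (x, y) point lies inside or on the unit circle."""
    x, y = point
    return x * x + y * y <= 1.0
```

Because the helper lives in a separate module, the executors need to be able to import it as well, which is presumably why the Dockerfile adds /opt/spark/work-dir to PYTHONPATH.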

First, build the Docker image and push it to a registry accessible by your cluster, or load it directly into the cluster if you're using minikube or kind:

docker build -t pyspark-job examples/python

# For minikube
minikube image load pyspark-job
# For kind
kind load docker-image pyspark-job
# For remote clusters, you will need to change the image name to match your registry,
# and then push it to that registry
docker push pyspark-job
Then submit the job using the Python client:
from __future__ import annotations

import logging

from spark_on_k8s.client import ExecutorInstances, PodResources, SparkOnK8S

if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    spark_client = SparkOnK8S()
    spark_client.submit_app(
        image="pyspark-job",
        app_path="local:///opt/spark/work-dir/job.py",
        app_arguments=["100000"],
        app_name="pyspark-job-example",
        namespace="spark",
        service_account="spark",
        app_waiter="log",
        # If you test this locally (minikube or kind) without pushing the image to a registry,
        # you need to set the image_pull_policy to Never.
        image_pull_policy="Never",
        ui_reverse_proxy=True,
        driver_resources=PodResources(cpu=1, memory=512, memory_overhead=128),
        executor_resources=PodResources(cpu=1, memory=512, memory_overhead=128),
        # Run with 5 executors
        executor_instances=ExecutorInstances(initial=5),
    )
Alternatively, submit it using the CLI:
spark-on-k8s app submit \
  --image pyspark-job \
  --path local:///opt/spark/work-dir/job.py \
  --namespace spark \
  --name pyspark-job-example \
  --service-account spark \
  --image-pull-policy Never \
  --driver-cpu 1 \
  --driver-memory 512 \
  --driver-memory-overhead 128 \
  --executor-cpu 1 \
  --executor-memory 512 \
  --executor-memory-overhead 128 \
  --executor-initial-instances 5 \
  --ui-reverse-proxy \
  --logs \
  100000
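
To sanity-check the value the job prints without a cluster, the same Monte Carlo estimate can be reproduced in plain Python. The sketch below is not part of the package; estimate_pi and its seed parameter are hypothetical names introduced for illustration:

```python
from __future__ import annotations

import random


def estimate_pi(num_points: int, seed: int | None = None) -> float:
    """Estimate Pi by sampling points uniformly in the unit square.

    The fraction of points with x^2 + y^2 <= 1 approximates the area of a
    quarter circle (Pi / 4), so multiplying it by 4 approximates Pi.
    """
    rng = random.Random(seed)  # seeded generator for reproducible runs
    inside = 0
    for _ in range(num_points):
        x, y = rng.random(), rng.random()
        if x * x + y * y <= 1.0:
            inside += 1
    return 4 * inside / num_points


if __name__ == "__main__":
    print(f"Pi is approximately {estimate_pi(100_000, seed=42)}")
```

With 100000 points the standard error of this estimator is roughly 0.005, so the value in the driver logs should be close to 3.14 but will vary from run to run.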

Java

This example is similar to the previous one, but it is implemented in Java and built with Maven.

pom.xml:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.oss-tech.examples</groupId>
    <artifactId>java-job</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>11</maven.compiler.source>
        <maven.compiler.target>11</maven.compiler.target>
        <spark.version>3.5.3</spark.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.12</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.12</artifactId>
            <version>${spark.version}</version>
        </dependency>
    </dependencies>
</project>
The job class (src/main/java/com/oss_tech/examples/TestJob.java):
package com.oss_tech.examples;

import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;


public class TestJob {
  public static void main(String[] args) {
    // Check if the correct number of arguments is provided
    if (args.length != 1) {
      System.err.println("Usage: TestJob <num_points>");
      System.exit(1);
    }

    // Initialize Spark session
    final SparkSession spark = SparkSession.builder()
        .appName("Spark Java example")
        .getOrCreate();

    // Get the number of points from command-line arguments
    final int numPoints = Integer.parseInt(args[0]);

    // Generate random points within the unit square
    Dataset<Row> points = spark.range(1, numPoints + 1)
        .selectExpr("rand() as x", "rand() as y");

    // Count points within the unit circle
    long insideCircle = points.filter("x * x + y * y <= 1").count();

    // Estimate Pi
    double piEstimate = 4.0 * insideCircle / numPoints;

    // Display the result
    System.out.println("Pi is approximately " + piEstimate);

    // Stop the Spark session
    spark.stop();
  }
}

As with the PySpark application, build the Docker image and push it to a registry accessible by your cluster, or load it directly into the cluster if you're using minikube or kind:

docker build -t java-spark-job examples/java

# For minikube
minikube image load java-spark-job
# For kind
kind load docker-image java-spark-job
# For remote clusters, you will need to change the image name to match your registry,
# and then push it to that registry
docker push java-spark-job
Then submit the job using the Python client:
from __future__ import annotations

import logging

from spark_on_k8s.client import ExecutorInstances, SparkOnK8S

if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    spark_client = SparkOnK8S()
    spark_client.submit_app(
        image="java-spark-job",
        app_path="local:///java-job.jar",
        app_arguments=["100000"],
        app_name="spark-java-job-example",
        namespace="spark",
        service_account="spark",
        app_waiter="log",
        class_name="com.oss_tech.examples.TestJob",
        # If you test this locally (minikube or kind) without pushing the image to a registry,
        # you need to set the image_pull_policy to Never.
        image_pull_policy="Never",
        ui_reverse_proxy=True,
        # Run with dynamic allocation enabled, with a minimum of 0 and a maximum of 5 executors
        executor_instances=ExecutorInstances(min=0, max=5),
    )
Alternatively, submit it using the CLI:
spark-on-k8s app submit \
  --image java-spark-job \
  --path local:///java-job.jar \
  --namespace spark \
  --name spark-java-job-example \
  --service-account spark \
  --image-pull-policy Never \
  --executor-min-instances 0 \
  --executor-max-instances 5 \
  --executor-initial-instances 5 \
  --ui-reverse-proxy \
  --logs \
  --class com.oss_tech.examples.TestJob \
  100000