Examples
Here are some examples of how to package and submit Spark apps with this package. In the examples, the base image is built with the Spark image tool, as described in the Spark documentation.
Python
In this example, we use a small PySpark application that takes a parameter num_points
to calculate the value of Pi:
from __future__ import annotations
import random
import sys
from my_modules.pi import is_point_in_the_unit_circle
from pyspark.sql import SparkSession
if __name__ == "__main__":
    # Validate the command line before starting Spark, so that a bad
    # invocation fails fast without creating a session.
    if len(sys.argv) != 2:
        print("Usage: job.py <num_points>", file=sys.stderr)
        sys.exit(1)

    # Get the number of points from command-line arguments
    try:
        num_points = int(sys.argv[1])
    except ValueError:
        print(f"num_points must be an integer, got {sys.argv[1]!r}", file=sys.stderr)
        sys.exit(1)
    if num_points <= 0:
        # The estimate divides by num_points, so zero would raise ZeroDivisionError.
        print(f"num_points must be positive, got {num_points}", file=sys.stderr)
        sys.exit(1)

    # Create a Spark session
    spark = SparkSession.builder.appName("Pi-Estimation").getOrCreate()
    try:
        # Generate one random point in the unit square per requested sample
        points = spark.sparkContext.parallelize(range(1, num_points + 1)).map(
            lambda _: (random.random(), random.random())
        )

        # Keep only the points accepted by is_point_in_the_unit_circle
        inside_circle = points.filter(is_point_in_the_unit_circle)

        # Estimate Pi from the fraction of points that fall inside the circle
        pi_estimate = 4 * inside_circle.count() / num_points

        # Display the result
        print(f"Pi is approximately {pi_estimate}")
    finally:
        # Always stop the Spark session, even if the job fails
        spark.stop()
FROM husseinawala/spark-py:v3.5.0
# Copy the job and the modules it imports into the image.
# COPY is used instead of ADD because these are plain local files.
COPY my_modules /opt/spark/work-dir/my_modules
COPY job.py /opt/spark/work-dir/
# Update the PYTHONPATH to include the modules (key=value form; the
# whitespace-separated "ENV key value" form is a legacy syntax)
ENV PYTHONPATH="${PYTHONPATH}:/opt/spark/work-dir"
First, build the Docker image and push it to a registry accessible to your cluster, or load it into your cluster's local registry if you're using minikube or kind:
# Build the image from the examples/python directory and tag it pyspark-job
docker build -t pyspark-job examples/python
# For minikube: load the locally built image into the cluster
minikube image load pyspark-job
# For kind: load the locally built image into the cluster
kind load docker-image pyspark-job
# For remote clusters, you will need to change the image name to match your registry,
# and then push it to that registry
docker push pyspark-job
from __future__ import annotations
import logging
from spark_on_k8s.client import ExecutorInstances, PodResources, SparkOnK8S
if __name__ == "__main__":
    # Show the client's INFO-level log output while the app is submitted.
    logging.basicConfig(level=logging.INFO)

    client = SparkOnK8S()
    client.submit_app(
        # Where the app runs and under which identity
        namespace="spark",
        service_account="spark",
        app_name="pyspark-job-example",
        # What to run: the job script inside the image, with num_points=100000
        image="pyspark-job",
        # If you test this locally (minikube or kind) without pushing the image to a registry,
        # you need to set the image_pull_policy to Never.
        image_pull_policy="Never",
        app_path="local:///opt/spark/work-dir/job.py",
        app_arguments=["100000"],
        # Resources requested for the driver and for each executor
        driver_resources=PodResources(cpu=1, memory=512, memory_overhead=128),
        executor_resources=PodResources(cpu=1, memory=512, memory_overhead=128),
        # Run with 5 executors
        executor_instances=ExecutorInstances(initial=5),
        app_waiter="log",
        ui_reverse_proxy=True,
    )
# Submit job.py from the pyspark-job image with the spark-on-k8s CLI.
# The trailing 100000 is the value the Python client example passes as
# app_arguments (the job's num_points).
spark-on-k8s app submit \
--image pyspark-job \
--path local:///opt/spark/work-dir/job.py \
--namespace spark \
--name pyspark-job-example \
--service-account spark \
--image-pull-policy Never \
--driver-cpu 1 \
--driver-memory 512 \
--driver-memory-overhead 128 \
--executor-cpu 1 \
--executor-memory 512 \
--executor-memory-overhead 128 \
--executor-initial-instances 5 \
--ui-reverse-proxy \
--logs \
100000
Java
This example is similar to the previous one, but it's implemented in Java (with Maven).
pom.xml:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<!-- Maven build definition for the Java example job -->
<modelVersion>4.0.0</modelVersion>
<groupId>com.oss-tech.examples</groupId>
<artifactId>java-job</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<!-- Compile for Java 11 -->
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>
<!-- Single Spark version shared by both Spark dependencies below -->
<spark.version>3.5.3</spark.version>
</properties>
<dependencies>
<!-- Spark artifacts; the _2.12 suffix is the Scala binary version they are built for -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>${spark.version}</version>
</dependency>
</dependencies>
</project>
src/main/java/com/oss_tech/examples/TestJob.java:
package com.oss_tech.examples;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
/**
 * Estimates Pi with a Monte Carlo simulation on Spark.
 *
 * <p>Takes a single argument, {@code num_points}: the number of random points
 * to draw in the unit square. The estimate is four times the fraction of
 * points that fall inside the unit circle.
 */
public class TestJob {
    /**
     * Entry point of the job.
     *
     * @param args exactly one element, the positive number of points to sample
     */
    public static void main(String[] args) {
        // Check if the correct number of arguments is provided
        if (args.length != 1) {
            System.err.println("Usage: TestJob <num_points>");
            System.exit(1);
        }

        // Parse and validate the argument before starting Spark, so that a bad
        // invocation fails fast without creating a session.
        long numPoints;
        try {
            numPoints = Long.parseLong(args[0]);
        } catch (NumberFormatException e) {
            numPoints = -1;
        }
        if (numPoints <= 0) {
            System.err.println("num_points must be a positive integer, got: " + args[0]);
            System.exit(1);
        }

        // Initialize Spark session
        final SparkSession spark = SparkSession.builder()
            .appName("Spark Java example")
            .getOrCreate();

        try {
            // One row per sample; the ids themselves are unused, so range(numPoints)
            // avoids the overflow-prone "numPoints + 1" upper bound.
            Dataset<Row> points = spark.range(numPoints)
                .selectExpr("rand() as x", "rand() as y");

            // Count points within the unit circle
            long insideCircle = points.filter("x * x + y * y <= 1").count();

            // Estimate Pi
            double piEstimate = 4.0 * insideCircle / numPoints;

            // Display the result
            System.out.println("Pi is approximately " + piEstimate);
        } finally {
            // Stop the session explicitly, as the Python example does, so the
            // application shuts down cleanly even when the job fails.
            spark.stop();
        }
    }
}
Similar to the PySpark application, you need to build the Docker image and push it to a registry accessible to your cluster, or load it into your cluster's local registry if you're using minikube or kind:
# Build the image from the examples/java directory and tag it java-spark-job
docker build -t java-spark-job examples/java
# For minikube: load the locally built image into the cluster
minikube image load java-spark-job
# For kind: load the locally built image into the cluster
kind load docker-image java-spark-job
# For remote clusters, you will need to change the image name to match your registry,
# and then push it to that registry
docker push java-spark-job
from __future__ import annotations
import logging
from spark_on_k8s.client import ExecutorInstances, SparkOnK8S
if __name__ == "__main__":
    # Show the client's INFO-level log output while the app is submitted.
    logging.basicConfig(level=logging.INFO)

    spark_client = SparkOnK8S()
    spark_client.submit_app(
        image="java-spark-job",
        app_path="local:///java-job.jar",
        app_arguments=["100000"],
        app_name="spark-java-job-example",
        namespace="spark",
        service_account="spark",
        app_waiter="log",
        # Main class of the job inside the jar
        class_name="com.oss_tech.examples.TestJob",
        # If you test this locally (minikube or kind) without pushing the image to a registry,
        # you need to set the image_pull_policy to Never.
        image_pull_policy="Never",
        ui_reverse_proxy=True,
        # Run with dynamic allocation enabled, with minimum of 0 executors and maximum of
        # 5 executors, starting with 5 (matches --executor-initial-instances 5 in the
        # equivalent CLI command)
        executor_instances=ExecutorInstances(min=0, max=5, initial=5),
    )
# Submit the jar from the java-spark-job image with the spark-on-k8s CLI,
# running the com.oss_tech.examples.TestJob class.
# The trailing 100000 is the value the Python client example passes as
# app_arguments (the job's num_points).
spark-on-k8s app submit \
--image java-spark-job \
--path local:///java-job.jar \
--namespace spark \
--name spark-java-job-example \
--service-account spark \
--image-pull-policy Never \
--executor-min-instances 0 \
--executor-max-instances 5 \
--executor-initial-instances 5 \
--ui-reverse-proxy \
--logs \
--class com.oss_tech.examples.TestJob \
100000