Skip to content

triggers

SparkOnK8STrigger

Bases: BaseTrigger

Watch a Spark application on Kubernetes.

Source code in `spark_on_k8s/airflow/triggers.py` (lines 27–89):
class SparkOnK8STrigger(BaseTrigger):
    """Watch a Spark application on Kubernetes.

    Deferred work is handed to ``AsyncSparkAppManager.wait_for_app`` for the
    configured driver pod. When that returns, a single ``TriggerEvent`` is
    emitted carrying the value of ``AsyncSparkAppManager.app_status``. If any
    ``Exception`` is raised while building the client or waiting, a single
    ``TriggerEvent`` with ``"status": "error"`` and the error text/stacktrace
    is emitted instead.

    Args:
        driver_pod_name: Name of the Spark driver pod to watch.
        namespace: Kubernetes namespace of the driver pod.
        kubernetes_conn_id: Airflow connection id passed to the async
            Kubernetes client manager.
        poll_interval: Forwarded unchanged to ``wait_for_app`` (presumably
            seconds between polls — confirm against ``AsyncSparkAppManager``).
    """

    def __init__(
        self,
        *,
        driver_pod_name: str,
        namespace: str = "default",
        kubernetes_conn_id: str = "kubernetes_default",
        poll_interval: int = 10,
    ):
        super().__init__()
        # Stored as plain attributes so ``serialize`` can round-trip them.
        self.driver_pod_name = driver_pod_name
        self.namespace = namespace
        self.kubernetes_conn_id = kubernetes_conn_id
        self.poll_interval = poll_interval

    def serialize(self) -> tuple[str, dict[str, Any]]:
        """Return ``(classpath, kwargs)`` used to re-create this trigger.

        The kwargs mirror the keyword-only ``__init__`` parameters exactly.
        """
        classpath = "spark_on_k8s.airflow.triggers.SparkOnK8STrigger"
        init_kwargs: dict[str, Any] = {
            "driver_pod_name": self.driver_pod_name,
            "namespace": self.namespace,
            "kubernetes_conn_id": self.kubernetes_conn_id,
            "poll_interval": self.poll_interval,
        }
        return classpath, init_kwargs

    async def run(self) -> AsyncIterator[TriggerEvent]:
        """Wait for the Spark app, then yield exactly one ``TriggerEvent``.

        Yields:
            On success, a payload with ``namespace``, ``pod_name`` and the
            ``status`` returned by ``app_status``. On any ``Exception``, a
            payload with ``status`` set to ``"error"`` plus ``error`` (the
            exception text) and ``stacktrace`` (the formatted traceback).
        """
        # Imported lazily, outside the try: an import failure is not turned
        # into an error event.
        from spark_on_k8s.utils.async_app_manager import AsyncSparkAppManager

        try:
            app_manager = AsyncSparkAppManager(
                k8s_client_manager=_AirflowKubernetesAsyncClientManager(
                    kubernetes_conn_id=self.kubernetes_conn_id,
                ),
            )
            await app_manager.wait_for_app(
                namespace=self.namespace,
                pod_name=self.driver_pod_name,
                poll_interval=self.poll_interval,
            )
            final_status = await app_manager.app_status(
                namespace=self.namespace,
                pod_name=self.driver_pod_name,
            )
            yield TriggerEvent(
                {
                    "namespace": self.namespace,
                    "pod_name": self.driver_pod_name,
                    "status": final_status,
                }
            )
        except Exception as exc:
            # Report the failure as an event rather than raising, so the
            # deferred task receives the error details in its payload.
            yield TriggerEvent(
                {
                    "namespace": self.namespace,
                    "pod_name": self.driver_pod_name,
                    "status": "error",
                    "error": str(exc),
                    "stacktrace": traceback.format_exc(),
                }
            )