Skip to content

operator_links

Bases: BaseOperatorLink

Operator link for SparkOnK8SOperator.

It allows users to open the Spark job UI and the Spark history UI for tasks run with SparkOnK8SOperator.

Source code in spark_on_k8s/airflow/operator_links.py
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
class SparkOnK8SOperatorLink(BaseOperatorLink):
    """
    Extra link attached to SparkOnK8SOperator tasks.

    The link target is read back from XCom, where the two ``persist_*`` static
    helpers store either the Spark job UI URL or the Spark history UI URL.
    """

    name = "Spark Job UI"

    @staticmethod
    def persist_spark_ui_link(
        context: Context,
        task_instance: BaseOperator,
        spark_on_k8s_service_url: str,
        namespace: str,
        spark_app_id: str,
    ):
        """
        Build the Spark job UI URL and push it to XCom.

        The URL has the form ``<service url>/webserver/ui/<namespace>/<app id>`` and
        is pushed through ``task_instance.xcom_push`` under
        ``SparkOnK8SOperator.XCOM_SPARK_UI_LINK``.
        """
        # Imported inside the function (presumably to avoid a circular import
        # with the operators module -- confirm before moving it to module level).
        from spark_on_k8s.airflow.operators import SparkOnK8SOperator

        job_ui_url = f"{spark_on_k8s_service_url}/webserver/ui/{namespace}/{spark_app_id}"
        xcom_key = SparkOnK8SOperator.XCOM_SPARK_UI_LINK
        task_instance.xcom_push(context, key=xcom_key, value=job_ui_url)

    @staticmethod
    def persist_spark_history_ui_link(
        context: Context,
        task_instance: BaseOperator,
        spark_on_k8s_service_url: str,
        spark_app_id: str,
    ):
        """
        Build the Spark history UI URL and push it to XCom.

        The URL has the form ``<service url>/webserver/ui-history/history/<app id>``.
        It is pushed under ``SparkOnK8SOperator.XCOM_SPARK_UI_LINK``, i.e. the same
        key that ``persist_spark_ui_link`` uses.
        """
        # Imported inside the function (presumably to avoid a circular import
        # with the operators module -- confirm before moving it to module level).
        from spark_on_k8s.airflow.operators import SparkOnK8SOperator

        history_url = f"{spark_on_k8s_service_url}/webserver/ui-history/history/{spark_app_id}"
        xcom_key = SparkOnK8SOperator.XCOM_SPARK_UI_LINK
        task_instance.xcom_push(context, key=xcom_key, value=history_url)

    def get_link(self, operator: BaseOperator, *, ti_key: TaskInstanceKey) -> str:
        """
        Return the link stored in XCom for the given task instance key.

        The value is looked up under ``SparkOnK8SOperator.XCOM_SPARK_UI_LINK``; an
        empty string is returned when the stored value is missing or falsy.
        ``operator`` is not used by this implementation.
        """
        from spark_on_k8s.airflow.operators import SparkOnK8SOperator

        stored_link = XCom.get_value(ti_key=ti_key, key=SparkOnK8SOperator.XCOM_SPARK_UI_LINK)
        # Any falsy stored value collapses to the empty string.
        return stored_link or ""

Get the link to the Spark job UI or the Spark history UI.

Source code in spark_on_k8s/airflow/operator_links.py
58
59
60
61
62
63
64
65
66
67
68
def get_link(self, operator: BaseOperator, *, ti_key: TaskInstanceKey) -> str:
    """
    Return the link stored in XCom for the given task instance key.

    The value is looked up under ``SparkOnK8SOperator.XCOM_SPARK_UI_LINK``; an
    empty string is returned when the stored value is missing or falsy.
    ``operator`` is not used by this implementation.
    """
    from spark_on_k8s.airflow.operators import SparkOnK8SOperator

    stored_link = XCom.get_value(ti_key=ti_key, key=SparkOnK8SOperator.XCOM_SPARK_UI_LINK)
    # Any falsy stored value collapses to the empty string.
    return stored_link or ""

Persist Spark history UI link to XCom.

Source code in spark_on_k8s/airflow/operator_links.py
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
@staticmethod
def persist_spark_history_ui_link(
    context: Context,
    task_instance: BaseOperator,
    spark_on_k8s_service_url: str,
    spark_app_id: str,
):
    """
    Build the Spark history UI URL and push it to XCom.

    The URL has the form ``<service url>/webserver/ui-history/history/<app id>``.
    It is pushed through ``task_instance.xcom_push`` under
    ``SparkOnK8SOperator.XCOM_SPARK_UI_LINK``.
    """
    # Imported inside the function (presumably to avoid a circular import
    # with the operators module -- confirm before moving it to module level).
    from spark_on_k8s.airflow.operators import SparkOnK8SOperator

    history_url = f"{spark_on_k8s_service_url}/webserver/ui-history/history/{spark_app_id}"
    xcom_key = SparkOnK8SOperator.XCOM_SPARK_UI_LINK
    task_instance.xcom_push(context, key=xcom_key, value=history_url)

Persist Spark UI link to XCom.

Source code in spark_on_k8s/airflow/operator_links.py
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
@staticmethod
def persist_spark_ui_link(
    context: Context,
    task_instance: BaseOperator,
    spark_on_k8s_service_url: str,
    namespace: str,
    spark_app_id: str,
):
    """
    Build the Spark job UI URL and push it to XCom.

    The URL has the form ``<service url>/webserver/ui/<namespace>/<app id>`` and
    is pushed through ``task_instance.xcom_push`` under
    ``SparkOnK8SOperator.XCOM_SPARK_UI_LINK``.
    """
    # Imported inside the function (presumably to avoid a circular import
    # with the operators module -- confirm before moving it to module level).
    from spark_on_k8s.airflow.operators import SparkOnK8SOperator

    job_ui_url = f"{spark_on_k8s_service_url}/webserver/ui/{namespace}/{spark_app_id}"
    xcom_key = SparkOnK8SOperator.XCOM_SPARK_UI_LINK
    task_instance.xcom_push(context, key=xcom_key, value=job_ui_url)