Bases: BaseOperatorLink
Operator link for SparkOnK8SOperator.
It allows users to access the Spark job UI and the Spark history UI when using SparkOnK8SOperator.
Source code in spark_on_k8s/airflow/operator_links.py
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
class SparkOnK8SOperatorLink(BaseOperatorLink):
    """
    Operator link for SparkOnK8SOperator.

    The link is stored in XCom under ``SparkOnK8SOperator.XCOM_SPARK_UI_LINK``:
    the two ``persist_*`` static methods write a Spark job UI or Spark history
    UI URL under that key, and ``get_link`` reads it back.
    """

    name = "Spark Job UI"

    @staticmethod
    def persist_spark_ui_link(
        context: Context,
        task_instance: BaseOperator,
        spark_on_k8s_service_url: str,
        namespace: str,
        spark_app_id: str,
    ):
        """
        Build the Spark job UI URL and push it to XCom.

        :param context: passed through as the first argument of ``xcom_push``.
        :param task_instance: object whose ``xcom_push`` method stores the link.
        :param spark_on_k8s_service_url: base URL the UI path is appended to.
        :param namespace: namespace segment of the UI path.
        :param spark_app_id: application id segment of the UI path.
        """
        # Imported locally, presumably to avoid a circular import with the
        # operators module -- TODO confirm.
        from spark_on_k8s.airflow.operators import SparkOnK8SOperator

        link_key = SparkOnK8SOperator.XCOM_SPARK_UI_LINK
        ui_url = f"{spark_on_k8s_service_url}/webserver/ui/{namespace}/{spark_app_id}"
        task_instance.xcom_push(context, key=link_key, value=ui_url)

    @staticmethod
    def persist_spark_history_ui_link(
        context: Context,
        task_instance: BaseOperator,
        spark_on_k8s_service_url: str,
        spark_app_id: str,
    ):
        """
        Build the Spark history UI URL and push it to XCom.

        The URL is written under the same XCom key as the one used by
        ``persist_spark_ui_link``.

        :param context: passed through as the first argument of ``xcom_push``.
        :param task_instance: object whose ``xcom_push`` method stores the link.
        :param spark_on_k8s_service_url: base URL the history path is appended to.
        :param spark_app_id: application id segment of the history path.
        """
        # Imported locally, presumably to avoid a circular import with the
        # operators module -- TODO confirm.
        from spark_on_k8s.airflow.operators import SparkOnK8SOperator

        link_key = SparkOnK8SOperator.XCOM_SPARK_UI_LINK
        history_url = f"{spark_on_k8s_service_url}/webserver/ui-history/history/{spark_app_id}"
        task_instance.xcom_push(context, key=link_key, value=history_url)

    def get_link(self, operator: BaseOperator, *, ti_key: TaskInstanceKey) -> str:
        """
        Return the link stored in XCom for the given task instance key.

        :param operator: not used by this implementation.
        :param ti_key: task instance key used for the XCom lookup.
        :return: the stored value, or an empty string when it is missing or falsy.
        """
        # Imported locally, presumably to avoid a circular import with the
        # operators module -- TODO confirm.
        from spark_on_k8s.airflow.operators import SparkOnK8SOperator

        stored_link = XCom.get_value(ti_key=ti_key, key=SparkOnK8SOperator.XCOM_SPARK_UI_LINK)
        return stored_link or ""
|
get_link(operator, *, ti_key)
Get link to Spark job UI or Spark history UI.
Source code in spark_on_k8s/airflow/operator_links.py
58
59
60
61
62
63
64
65
66
67
def get_link(self, operator: BaseOperator, *, ti_key: TaskInstanceKey) -> str:
    """
    Return the link stored in XCom for the given task instance key.

    The value is read from the ``SparkOnK8SOperator.XCOM_SPARK_UI_LINK`` key.

    :param operator: not used by this implementation.
    :param ti_key: task instance key used for the XCom lookup.
    :return: the stored value, or an empty string when it is missing or falsy.
    """
    # Imported locally, presumably to avoid a circular import with the
    # operators module -- TODO confirm.
    from spark_on_k8s.airflow.operators import SparkOnK8SOperator

    stored_link = XCom.get_value(ti_key=ti_key, key=SparkOnK8SOperator.XCOM_SPARK_UI_LINK)
    return stored_link or ""
|
persist_spark_history_ui_link(context, task_instance, spark_on_k8s_service_url, spark_app_id)
staticmethod
Persist Spark history UI link to XCom.
Source code in spark_on_k8s/airflow/operator_links.py
42
43
44
45
46
47
48
49
50
51
52
53
54
55
@staticmethod
def persist_spark_history_ui_link(
    context: Context,
    task_instance: BaseOperator,
    spark_on_k8s_service_url: str,
    spark_app_id: str,
):
    """
    Build the Spark history UI URL and push it to XCom.

    The URL is stored under ``SparkOnK8SOperator.XCOM_SPARK_UI_LINK``.

    :param context: passed through as the first argument of ``xcom_push``.
    :param task_instance: object whose ``xcom_push`` method stores the link.
    :param spark_on_k8s_service_url: base URL the history path is appended to.
    :param spark_app_id: application id segment of the history path.
    """
    # Imported locally, presumably to avoid a circular import with the
    # operators module -- TODO confirm.
    from spark_on_k8s.airflow.operators import SparkOnK8SOperator

    link_key = SparkOnK8SOperator.XCOM_SPARK_UI_LINK
    history_url = f"{spark_on_k8s_service_url}/webserver/ui-history/history/{spark_app_id}"
    task_instance.xcom_push(context, key=link_key, value=history_url)
|
persist_spark_ui_link(context, task_instance, spark_on_k8s_service_url, namespace, spark_app_id)
staticmethod
Persist Spark UI link to XCom.
Source code in spark_on_k8s/airflow/operator_links.py
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
@staticmethod
def persist_spark_ui_link(
    context: Context,
    task_instance: BaseOperator,
    spark_on_k8s_service_url: str,
    namespace: str,
    spark_app_id: str,
):
    """
    Build the Spark job UI URL and push it to XCom.

    The URL is stored under ``SparkOnK8SOperator.XCOM_SPARK_UI_LINK``.

    :param context: passed through as the first argument of ``xcom_push``.
    :param task_instance: object whose ``xcom_push`` method stores the link.
    :param spark_on_k8s_service_url: base URL the UI path is appended to.
    :param namespace: namespace segment of the UI path.
    :param spark_app_id: application id segment of the UI path.
    """
    # Imported locally, presumably to avoid a circular import with the
    # operators module -- TODO confirm.
    from spark_on_k8s.airflow.operators import SparkOnK8SOperator

    link_key = SparkOnK8SOperator.XCOM_SPARK_UI_LINK
    ui_url = f"{spark_on_k8s_service_url}/webserver/ui/{namespace}/{spark_app_id}"
    task_instance.xcom_push(context, key=link_key, value=ui_url)
|