Skip to content

spark_sql

execute_sql_commands(spark_session, commands)

Executes a list of SQL commands on a Spark session.

Source code in `spark_on_k8s/airflow/scripts/spark_sql.py` (lines 20–29):
def execute_sql_commands(spark_session: SparkSession, commands):
    """
    Run every SQL statement in ``commands`` against ``spark_session``, in order.

    Each statement is logged before it runs. Its result (whatever
    ``spark_session.sql`` returns) is printed via ``.show(truncate=False)``.

    Errors are best-effort: any exception raised while logging, running, or
    showing a statement is logged with its traceback via ``logger.exception``.
    Processing then continues with the next statement. Nothing is re-raised,
    so callers are not notified of failures.

    :param spark_session: the Spark session used to run the statements.
    :param commands: an iterable of SQL statement strings.
    """
    for sql_statement in commands:
        try:
            logger.info(f"Executing: {sql_statement}")
            result_df = spark_session.sql(sql_statement)
            # truncate=False prints full column values instead of clipping them.
            result_df.show(truncate=False)
        except Exception:
            # Deliberately swallowed (after logging) so one bad statement does
            # not prevent the remaining ones from running.
            logger.exception(f"Error executing SQL command: {sql_statement}")

read_sql_file(file_path)

Reads an SQL file and splits commands by semicolon ";".

Source code in `spark_on_k8s/airflow/scripts/spark_sql.py` (lines 10–17):
def _split_sql_statements(sql_text):
    """Split ``sql_text`` on top-level semicolons, ignoring those inside quotes or comments."""
    statements = []
    current = []
    quote = None  # the active quote character (', " or `), or None outside quotes
    i = 0
    length = len(sql_text)
    while i < length:
        ch = sql_text[i]
        if quote is not None:
            current.append(ch)
            # Backslash escapes the next character inside string literals,
            # e.g. 'it\'s' must not be treated as closing the literal.
            if ch == "\\" and quote != "`" and i + 1 < length:
                current.append(sql_text[i + 1])
                i += 2
                continue
            if ch == quote:
                quote = None
            i += 1
            continue
        if ch in ("'", '"', "`"):
            quote = ch
        elif sql_text.startswith("--", i):
            # Line comment: copy it verbatim up to (not including) the newline.
            end = sql_text.find("\n", i)
            end = length if end == -1 else end
            current.append(sql_text[i:end])
            i = end
            continue
        elif sql_text.startswith("/*", i):
            # Block comment: copy it verbatim, including the closing "*/".
            # NOTE: nested block comments are not tracked.
            end = sql_text.find("*/", i + 2)
            end = length if end == -1 else end + 2
            current.append(sql_text[i:end])
            i = end
            continue
        elif ch == ";":
            statements.append("".join(current))
            current = []
            i += 1
            continue
        current.append(ch)
        i += 1
    statements.append("".join(current))
    # Drop statements that are empty or whitespace-only (e.g. from ";;" or a
    # trailing semicolon) and trim surrounding whitespace from the rest.
    return [stmt.strip() for stmt in statements if stmt.strip()]


def read_sql_file(file_path, encoding="utf-8"):
    """
    Read an SQL file and split it into individual statements on ";".

    Only semicolons outside quoted text and comments end a statement. Quoted
    text means single-/double-quoted string literals (with backslash escapes)
    and backquoted identifiers. Comments are ``--`` line comments and
    ``/* ... */`` block comments. Comment text is kept as part of the
    statement it appears in.

    :param file_path: path of the SQL file to read.
    :param encoding: text encoding used to decode the file. Defaults to UTF-8
        so results do not depend on the process locale.
    :return: list of non-empty, whitespace-stripped SQL statements, in file order.
    :raises OSError: if the file cannot be opened.
    :raises UnicodeDecodeError: if the file is not valid in ``encoding``.
    """
    with open(file_path, encoding=encoding) as f:
        file_content = f.read()
    return _split_sql_statements(file_content)