Airflow and Spark: Running Spark Jobs in Apache Airflow

In this blog, we run and schedule Spark jobs on Apache Airflow. We build a Spark job that extracts data from an API, transforms the resulting JSON data, and loads it into an S3 bucket. We run Spark and Airflow locally and connect Airflow to Spark through a connection configured in the Airflow UI. So that our Spark application can write directly to the S3 bucket, we also set up an S3 access point and a dedicated AWS IAM role.

Introduction

Apache Spark is a unified analytics engine for large-scale data processing. It is a powerful open-source tool that provides high-level APIs in Java, Python, Scala, and R, and an optimized engine that supports general computation graphs for data analysis and a range of workloads. Spark is also known for its fast processing and its fault tolerance: because Spark tracks the lineage of each dataset, it can recompute partitions lost when an executor fails instead of failing the whole job. This matters in big data work, where you never know when something will go wrong, and an engine that keeps running through a resource failure protects critical business processes.

Spark also supports a rich set of higher-level tools, including Spark SQL for SQL and structured data processing, MLlib for machine learning, GraphX for graph processing, and Structured Streaming for incremental computation and stream processing. In this blog post, we use Spark to transform data coming from an API and to load it into an S3 bucket. Before we do that, let’s look at some of Spark’s key features.

A diagram showing the Spark ecosystem

Some features of Apache Spark are:

  • Fast processing speed
  • Ease of use
  • Powerful tools for different big data workloads
  • Real-time stream processing
  • The ability to run on distributed platforms and on a variety of systems

You can read more about these features here.

With that, let’s get straight to it. We are going to build a Spark job that extracts data from an API, transforms the JSON data, and loads it into an S3 bucket, and then run and schedule that job with Apache Airflow. Airflow is a powerful tool for orchestrating workflows, so we write two scripts: one for the Spark job and one for the Airflow DAG. For these scripts to run, Apache Airflow and Apache Spark must both be installed, so our plan is to install Airflow and Spark locally, along with all their dependencies. If you would like to learn more about Apache Airflow, please check out a blog post on it here. The code for this article is available on GitHub.

A diagram showing an ETL data pipeline

Installing Apache Spark locally

For Spark to run smoothly, make sure you have Java installed and the JAVA_HOME environment variable set to the Java installation folder. To make life easier, we have prepared a bash script that installs Spark and its dependencies and configures Spark to run locally. Because our job writes to S3, we also needed the Hadoop AWS JAR (hadoop-aws) and the AWS SDK bundle JAR; the script copies both into Spark's jars folder and packages all the commands needed to get Spark running locally. Look in the scripts folder of the GitHub repository to see what the bash script does. Make the script executable with chmod u+x scripts/spark_installation.sh, and run it with the command below.

bash scripts/spark_installation.sh
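  • Copy the lines below into your terminal and execute them one by one:
source ~/.bashrc
start-master.sh
# Replace the URL with your Spark master URL (the one on port 7077, shown at the top of the master web UI at http://localhost:8080).
# On Spark 3.1 and later, start-worker.sh is the new name for start-slave.sh.
start-slave.sh spark://XXXXXXXXXXXX:7077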
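Before starting the master and worker, it can help to confirm that the prerequisites from the installation step are in place. The sketch below is a hypothetical helper, not part of our repository: it assumes the installation sets SPARK_HOME and that the two JARs carry the usual hadoop-aws- and aws-java-sdk-bundle- file name prefixes.

```python
import os
from pathlib import Path

# Assumed file name prefixes of the JARs copied into Spark's jars folder
REQUIRED_JAR_PREFIXES = ("hadoop-aws-", "aws-java-sdk-bundle-")


def missing_spark_prerequisites(env=None):
    """Return a list of problems found in the local Spark setup (empty if none)."""
    env = os.environ if env is None else env
    problems = []
    if not env.get("JAVA_HOME"):
        problems.append("JAVA_HOME is not set")
    spark_home = env.get("SPARK_HOME")  # assumption: set by the installation script
    if not spark_home:
        problems.append("SPARK_HOME is not set")
        return problems
    jars_dir = Path(spark_home) / "jars"
    jar_names = [p.name for p in jars_dir.glob("*.jar")] if jars_dir.is_dir() else []
    # Each required JAR must match at least one file in $SPARK_HOME/jars
    for prefix in REQUIRED_JAR_PREFIXES:
        if not any(name.startswith(prefix) for name in jar_names):
            problems.append(f"no {prefix}*.jar found in {jars_dir}")
    return problems
```

Calling missing_spark_prerequisites() with no arguments checks the current environment; an empty list means both variables are set and both JARs were found.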

Installing Apache Airflow locally

To run Apache Airflow locally and schedule Spark jobs with it, we also wrote a bash script that installs the necessary packages, initializes the Airflow database, and performs the rest of the setup. You can clone the repository, add your own dependencies, and start scheduling with Airflow. Make the Airflow script executable with chmod u+x scripts/airflow_installer.sh, and run it with the command below. Importantly, the script asks you to enter an Airflow password; this is the password you will use to log in to the Airflow UI. With Airflow running locally, we can then set up the connection to Spark so that Airflow can trigger our Spark workloads. The screenshot below shows how we configured it.

bash scripts/airflow_installer.sh

Spark connection set up on the Airflow UI

Airflow Spark connection configuration page
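We configured the connection in the UI, but as an alternative, Airflow 2.3 and later can also read a connection from an environment variable named AIRFLOW_CONN_<CONN_ID> whose value is a JSON object. The sketch below builds such a variable for the spark_local connection used by our DAG; spark://localhost and port 7077 are placeholders for your own master URL. Connections defined this way are not stored in the metadata database, so they do not appear in the UI's connection list, but the scheduler and workers that inherit the variable can use them.

```python
import json


def spark_connection_env(conn_id: str, master_host: str, port: int = 7077):
    """Return an (env var name, value) pair defining an Airflow 'spark' connection
    in the JSON format supported by Airflow 2.3+."""
    # Airflow looks up AIRFLOW_CONN_<CONN_ID> with the connection id upper-cased
    name = f"AIRFLOW_CONN_{conn_id.upper()}"
    value = json.dumps({"conn_type": "spark", "host": master_host, "port": port})
    return name, value


# Placeholder master host: replace with your own Spark master
name, value = spark_connection_env("spark_local", "spark://localhost")
# An export line to paste into the shell that starts the Airflow scheduler
print(f"export {name}='{value}'")
```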

Set up S3 Access Point for our Spark and S3 bucket connection

In the load stage of our ETL process, Spark writes the transformed data into an S3 bucket. Spark reaches S3 through Hadoop filesystem connectors, and the URI scheme selects the connector. The plain s3:// scheme (s3://sparkjobresult/output) is not the recommended way to write to S3 from open-source Spark and Hadoop, so we use the S3A connector through the s3a:// scheme (s3a://sparkjobresult/output). Here is how we set up the AWS side.
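Only the scheme differs between the two forms: the bucket and key prefix stay the same, and the scheme decides which Hadoop connector handles the path. As a small illustration, the hypothetical helper below rewrites an s3:// or s3n:// path to its s3a:// form.

```python
from urllib.parse import urlparse


def to_s3a(uri: str) -> str:
    """Rewrite an s3:// or s3n:// URI to the s3a:// scheme used by Hadoop's S3A connector."""
    parsed = urlparse(uri)
    if parsed.scheme not in ("s3", "s3n", "s3a"):
        raise ValueError(f"not an S3 URI: {uri}")
    # Only the scheme changes; the bucket (netloc) and key prefix (path) are kept
    return parsed._replace(scheme="s3a").geturl()


print(to_s3a("s3://sparkjobresult/output"))  # s3a://sparkjobresult/output
```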

Set up IAM role for the S3 bucket

In the AWS console, navigate to the IAM Roles page and create a new role for your use case. Below is a sample of the policy attached to our role.

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "s3:*",
                "s3-object-lambda:*"
            ],
            "Resource": "*"
        }
    ]
}
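The policy above grants every S3 and S3 Object Lambda action on every resource, which is convenient for a local experiment but broad. As a sketch of a narrower alternative, assuming the role only needs to write our job's output to the sparkjobresult bucket, the snippet below generates a policy limited to that bucket. The action list is our assumption of what an S3A overwrite write needs; check the Hadoop S3A documentation on required permissions before relying on it.

```python
import json


def s3a_write_policy(bucket: str) -> dict:
    """IAM policy letting S3A list one bucket and read, write, and delete its objects."""
    bucket_arn = f"arn:aws:s3:::{bucket}"
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                # Bucket-level actions apply to the bucket ARN itself
                "Effect": "Allow",
                "Action": ["s3:ListBucket", "s3:GetBucketLocation",
                           "s3:ListBucketMultipartUploads"],
                "Resource": bucket_arn,
            },
            {
                # Object-level actions apply to keys inside the bucket;
                # DeleteObject covers mode='overwrite' removing the old output
                "Effect": "Allow",
                "Action": ["s3:GetObject", "s3:PutObject", "s3:DeleteObject",
                           "s3:AbortMultipartUpload"],
                "Resource": f"{bucket_arn}/*",
            },
        ],
    }


print(json.dumps(s3a_write_policy("sparkjobresult"), indent=4))
```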
Create an access point for your S3 bucket

With the IAM role created, we create an access point for the S3 bucket that stores the data coming from Spark. Following the steps shown in the screenshot, our access point policy looks like this:

{
	"Version": "2012-10-17",
	"Statement": [
		{
			"Sid": "Statement1",
			"Effect": "Allow",
			"Principal": {
				"AWS": "arn:aws:iam::ACCOUNT_IDENTIFIER:role/S3_Spark"
			},
			"Action": "s3:*",
			"Resource": "arn:aws:s3:us-east-1:ACCOUNT_IDENTIFIER:accesspoint/sparkjobs/object/*"
		}
	]
}
Airflow AWS S3 bucket configuration page
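S3A only sends requests through an access point if it is told to. Our job writes to the bucket path directly, but as a sketch, assuming a Hadoop version with S3A access point support (3.3.2 or later, as far as we know), a bucket can be mapped to an access point ARN with a per-bucket fs.s3a.bucket.<bucket>.accesspoint.arn property. Verify the property against the S3A documentation for your Hadoop version; the region, account ID, and names below are placeholders.

```python
def access_point_arn(region: str, account_id: str, name: str) -> str:
    """ARN of an S3 access point (without the /object/* suffix used in policies)."""
    return f"arn:aws:s3:{region}:{account_id}:accesspoint/{name}"


def s3a_access_point_conf(bucket: str, ap_arn: str) -> dict:
    """Hadoop settings that route s3a://<bucket>/ paths through an access point.
    Assumes S3A access point support in your Hadoop version."""
    return {f"fs.s3a.bucket.{bucket}.accesspoint.arn": ap_arn}


# Placeholder values: use your own region, account ID, and access point name
arn = access_point_arn("us-east-1", "123456789012", "sparkjobs")
conf = s3a_access_point_conf("sparkjobresult", arn)
print(conf)

# In the Spark job, apply these like the other S3A options:
# for key, value in conf.items():
#     hadoop_conf.set(key, value)
```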

With all of this set up, we design a Spark job that extracts data from an Indian mutual fund API, transforms the JSON data into a Spark DataFrame, and loads the data into an S3 bucket, along with an Airflow DAG that schedules the Spark job. Our Spark job looks like this:

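import json

import requests
from decouple import config  # assumed: python-decouple, reads settings from the environment or a .env file
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# AWS credentials come from the environment / .env file instead of being hard-coded
aws_access_key = config('AWS_ACCESS_KEY')
aws_secret_key = config('AWS_SECRET_KEY')

spark = SparkSession \
    .builder \
    .appName("DataExtraction") \
    .getOrCreate()

# Configure the S3A connector (needs the hadoop-aws and AWS SDK bundle JARs on the classpath)
hadoop_conf = spark.sparkContext._jsc.hadoopConfiguration()
hadoop_conf.set("fs.s3a.access.key", aws_access_key)
hadoop_conf.set("fs.s3a.secret.key", aws_secret_key)
hadoop_conf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")

# Extract: fetch the NAV history of scheme 118550 from the mutual fund API
response = requests.get("https://api.mfapi.in/mf/118550", timeout=30)
response.raise_for_status()
data = response.json()

# json.dumps writes the response on a single line, which is what Spark's
# default line-delimited JSON reader expects
with open("./api_data.json", "w") as data_file:
    data_file.write(json.dumps(data))

# The JSON reader infers the schema by default, so no inferSchema option is needed
raw_json_dataframe = spark.read.format("json").load("./api_data.json")

raw_json_dataframe.printSchema()
raw_json_dataframe.createOrReplaceTempView("Mutual_benefit")

# Transform: one row per element of the "data" array, with the "meta" fields on every row
dataframe = raw_json_dataframe \
    .withColumn("data", F.explode(F.col("data"))) \
    .select("data.*", "meta.*")

dataframe.show(100, False)

# Load: write CSV with a header row to S3 through the s3a:// scheme, replacing earlier output
dataframe.write.format('csv').option('header', 'true').save('s3a://sparkjobresult/output', mode='overwrite')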
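To make the transform step concrete, here is a plain-Python sketch of what explode followed by select("data.*", "meta.*") does to the API response, assuming the response has the shape {"meta": {...}, "data": [...]}. The sample payload is hypothetical and its values are made up. One difference: if data and meta shared a field name, Spark's select would produce two columns with that name, while this dict-based version lets the meta value win.

```python
def flatten_mf_response(payload: dict) -> list:
    """Plain-Python equivalent of explode("data") + select("data.*", "meta.*"):
    one row per element of payload["data"], with the "meta" fields copied onto each row."""
    meta = payload.get("meta", {})
    rows = []
    for entry in payload.get("data", []):
        row = dict(entry)  # the data.* columns (e.g. date, nav)
        row.update(meta)   # the meta.* columns, repeated for every entry
        rows.append(row)
    return rows


# Hypothetical, truncated payload with made-up values
sample = {
    "meta": {"scheme_code": 118550, "scheme_name": "Example Fund"},
    "data": [
        {"date": "02-01-2023", "nav": "10.0"},
        {"date": "01-01-2023", "nav": "9.9"},
    ],
    "status": "SUCCESS",
}
for row in flatten_mf_response(sample):
    print(row)
```

As in the Spark version, top-level fields other than data and meta (here, status) are dropped.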

Before running the Spark job on Airflow, we want to confirm from the command line that it completes successfully. If all goes well, we then stage the job on Airflow and run and schedule it from there. Confirm this with the command below.

spark-submit --master spark://XXXXXXXXXXXX:7077 spark_etl_script.py
A diagram showing the Spark job result window
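If you would rather not copy JARs into Spark's jars folder, spark-submit can resolve them at submit time with its --packages option, which takes comma-separated Maven coordinates and also fetches their dependencies. The hypothetical helper below builds the command as an argument list; the hadoop-aws version in the example is a placeholder and should match the Hadoop version your Spark build ships with.

```python
import shlex
from typing import List, Optional


def build_spark_submit_cmd(master_url: str, application: str,
                           packages: Optional[List[str]] = None) -> List[str]:
    """Build a spark-submit argument list for a standalone Spark master."""
    cmd = ["spark-submit", "--master", master_url]
    if packages:
        # Maven coordinates resolved at submit time instead of JARs in $SPARK_HOME/jars
        cmd += ["--packages", ",".join(packages)]
    cmd.append(application)  # the application file comes after all options
    return cmd


cmd = build_spark_submit_cmd(
    "spark://localhost:7077",                         # placeholder master URL
    "spark_etl_script.py",
    packages=["org.apache.hadoop:hadoop-aws:3.3.4"],  # placeholder version
)
print(shlex.join(cmd))
```

To run it from Python, pass the list to subprocess.run(cmd, check=True).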

Our Airflow DAG looks like the code below.  

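from datetime import timedelta
from airflow import DAG
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator  # apache-airflow-providers-apache-spark
from airflow.utils.dates import days_ago  # deprecated in newer Airflow releases; a fixed start_date is preferred

# Assumed default_args: the original snippet references them without showing them
default_args = {"owner": "airflow", "retries": 1, "retry_delay": timedelta(minutes=5)}

spark_dag = DAG(
    dag_id="spark_airflow_dag",
    default_args=default_args,
    schedule_interval=None,  # no schedule: runs only when triggered (Airflow 2.4+ also accepts `schedule`)
    dagrun_timeout=timedelta(minutes=60),
    description='use case of sparkoperator in airflow',
    start_date=days_ago(1),
)

# Submit the ETL script to the Spark master defined in the 'spark_local' connection
Extract = SparkSubmitOperator(
    application='/workspace/Spark-jobs/spark_etl_script.py',
    conn_id='spark_local',
    task_id='spark_submit_task',
    dag=spark_dag,
)

Extract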

With that in place, here are the steps to schedule and run the Spark job:

  • Create the dags folder inside AIRFLOW_HOME (~/airflow by default)
mkdir -p ~/airflow/dags
  • Move the Spark job DAG file into the Airflow dags folder
mv dags/spark_jobs_dag.py ~/airflow/dags
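The two commands above assume AIRFLOW_HOME is ~/airflow. A hypothetical helper that respects a custom AIRFLOW_HOME could look like the sketch below; it assumes the default dags_folder setting (a custom [core] dags_folder in airflow.cfg is not read) and, unlike mv, copies the file so the repository keeps its own copy.

```python
import os
import shutil
from pathlib import Path


def install_dag(dag_file: str, env=None) -> Path:
    """Copy a DAG file into $AIRFLOW_HOME/dags, creating the folder if needed.
    Falls back to ~/airflow, Airflow's default, when AIRFLOW_HOME is unset."""
    env = os.environ if env is None else env
    airflow_home = Path(env.get("AIRFLOW_HOME", "~/airflow")).expanduser()
    dags_dir = airflow_home / "dags"
    dags_dir.mkdir(parents=True, exist_ok=True)  # same effect as mkdir -p
    target = dags_dir / Path(dag_file).name
    shutil.copy2(dag_file, target)  # copy rather than move
    return target
```

For example, install_dag("dags/spark_jobs_dag.py") places the file where the scheduler looks for DAGs by default.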

If you have followed all the previous steps, you should now see the Spark job DAG in your Airflow UI. Click spark_airflow_dag and trigger the DAG. The results should look like the screenshots below.

A diagram showing the Spark job DAG running on Airflow
A diagram showing the Spark job DAG logs on Airflow
Result
Output in S3

Final Note

One problem you might run into with Airflow and Spark is dependency management. Make sure you install all the necessary libraries and pin dependency versions that match your Spark and Airflow installations. Note that we installed Spark and Airflow locally on our computer, but you can also run the whole setup in Docker containers. This video shows how to run and schedule Spark jobs on Airflow locally, with a different use case from the one shown here. The GitHub page for the deployment is available here, and it contains all the code for this deployment. With that, you are ready to start running and scheduling your Spark jobs on Airflow.
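A quick way to see which versions are actually installed is Python's importlib.metadata. The sketch below checks the distributions this tutorial relies on, including the provider package that supplies SparkSubmitOperator. pyspark only shows up if Spark's Python bindings were installed with pip, so None there does not necessarily mean Spark is missing; compare with the output of spark-submit --version and airflow version.

```python
from importlib.metadata import PackageNotFoundError, version


def installed_versions(packages):
    """Map each distribution name to its installed version, or None if it is not installed."""
    found = {}
    for name in packages:
        try:
            found[name] = version(name)
        except PackageNotFoundError:
            found[name] = None
    return found


for name, ver in installed_versions(
    ["pyspark", "apache-airflow", "apache-airflow-providers-apache-spark"]
).items():
    print(f"{name}: {ver or 'not installed'}")
```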

Cassandra.Link

Cassandra.Link is a knowledge base that we created for all things Apache Cassandra. Our goal with Cassandra.Link was to not only fill the gap of Planet Cassandra but to bring the Cassandra community together. Feel free to reach out if you wish to collaborate with us on this project in any capacity.

We are a technology company that specializes in building business platforms. If you have any questions about the tools discussed in this post or about any of our services, feel free to send us an email!