Creating a Spark Cluster on AWS EMR: a Tutorial


AWS Elastic MapReduce (EMR) is a way to remotely create and control Hadoop and Spark clusters on AWS.

You can think of it as something like Hadoop-as-a-service: you spin up a cluster (with as many nodes as you like), define the job you want to run along with its input and output locations, and pay only for the time your cluster was actually up.

In other words, you can work on massive problems on large clusters using AWS infrastructure, and a short job can cost as little as a couple of dollars, depending on the instance types and how long the cluster runs.

What's more, you don't need to spend any time configuring the cluster, the machines, or the software; it's all done for you.

Prerequisites: you need an AWS account, an installed and configured AWS CLI tool, and the default EMR roles (to create them, just run $ aws emr create-default-roles).
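Before going further, it can help to confirm the first two prerequisites from the command line. The sketch below is only one way to do that, not an official procedure: check_emr_prereqs is a hypothetical helper name, and it assumes that having the aws command on your PATH and a default region configured is a good enough sign that the CLI is set up.

```shell
# Hypothetical helper (not part of the AWS CLI): check that the
# aws command exists and that a default region has been configured.
check_emr_prereqs() {
    # 1. The AWS CLI must be available.
    if ! command -v aws >/dev/null 2>&1; then
        echo "aws CLI not found; install it first" >&2
        return 1
    fi
    # 2. A default region must be set (normally done by `aws configure`).
    local region
    region="$(aws configure get region 2>/dev/null || true)"
    if [ -z "$region" ]; then
        echo "no default region; run 'aws configure'" >&2
        return 1
    fi
    echo "aws CLI found, default region: $region"
}
```

If check_emr_prereqs succeeds, the remaining prerequisite is the aws emr create-default-roles command mentioned above.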

To see how to interact with your cluster, read Using the AWS CLI to manage Spark Clusters on EMR: Examples and Reference.

Simplest possible example

Create a 1-node Spark 1.5.0 cluster that terminates as soon as it is up. This is just an example to get you started.

The following bash command spins up a single-node Spark cluster on an m3.xlarge EC2 instance and then terminates it.

The cost is negligible because the cluster does no work: with --auto-terminate and no steps to run, it shuts down as soon as it has started. You are still billed for the short time the instance was up.

Adding a \ at the end of a line tells bash to continue the command on the next line.

$ aws emr create-cluster \
    --name "1-node dummy cluster" \
    --instance-type m3.xlarge \
    --release-label emr-4.1.0 \
    --instance-count 1 \
    --use-default-roles \
    --applications Name=Spark \
    --auto-terminate
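The trailing backslashes in the command above are ordinary shell line continuation, which you can try in isolation without touching AWS. A minimal sketch; the variable name joined is just for illustration:

```shell
# One logical command split over three physical lines.
# Each backslash must be the very last character on its line;
# a trailing space after it would break the continuation.
joined="$(echo one \
    two \
    three)"
echo "$joined"   # prints: one two three
```

The shell joins the three lines before running echo, which is exactly what happens to the multi-line aws emr create-cluster command.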

You can verify that it has been created and terminated by navigating to the EMR section on the AWS Console associated with your AWS account.

A few seconds after running the command, the top entry in your cluster list should look like this:

[Screenshot: cluster-before]

If you don't see the cluster in your cluster list, make sure the console is showing the same AWS region the cluster was created in.

After a few minutes, it will finish loading and then terminate:

[Screenshot: cluster-after]
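The same check can be made without the console. The sketch below assumes you noted the cluster ID that aws emr create-cluster prints when it succeeds; cluster_state is a hypothetical helper name, and the ID in the usage comment is a placeholder. If your cluster lives in a non-default region, the CLI's global --region option can be added to the call.

```shell
# Hypothetical helper: print the current state of one cluster,
# for example STARTING, RUNNING, TERMINATING or TERMINATED.
cluster_state() {
    aws emr describe-cluster \
        --cluster-id "$1" \
        --query 'Cluster.Status.State' \
        --output text
}

# Usage (placeholder ID; replace it with the one printed by create-cluster):
# cluster_state j-XXXXXXXXXXXXX
```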

Start a cluster and run a Custom Spark Job

To do real work on EMR, you need to submit an actual Spark job. Jobs are called steps in EMR parlance, and all you need to do is add a --steps option to the command above.

A custom Spark Job can be something as simple as this (Scala code):

import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.sql.{SQLContext,Row}

/**
 * Sample Job to showcase AWS EMR functionality.
 *
 * This job takes arguments: <s3_input_dir>, <s3_output_dir>
 *
 * It will load any json files located at <s3_input_dir>, load them into a DataFrame,
 * remove any rows that contain NULL values and write it back to <s3_output_dir>
 */
object ProcessData {

  def main(args: Array[String]): Unit = {

    if (args.length < 2) {
      System.err.println("Please set arguments for <s3_input_dir> <s3_output_dir>")
      System.exit(1)
    }

    val inputDir = args(0)
    val outputDir = args(1)

    val cnf = new SparkConf()
      .setAppName("Example AWS EMR Spark Job")

    val sc = new SparkContext(cnf)

    val sqlContext = new SQLContext(sc)

    // initializing the dataframe from json file
    val inputDF = sqlContext.read.json(inputDir)

    // schema is inferred automatically
    val schema = inputDF.schema

    // transform dataframe into RDD so that we can call filter
    // to remove any rows with Null values
    val cleanRDD = inputDF.rdd.filter { row: Row => !row.anyNull  }

    // then recreate the dataframe from the RDD
    val outputDF = sqlContext.createDataFrame(cleanRDD, schema)

    // write the dataframe back to the output dir
    outputDF.write.json(outputDir)

    sc.stop()
  }
}

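Once you've packaged the job into a JAR file, you need to upload it to S3. You just need to create a bucket and make sure the roles your cluster runs under can read from it. Granting read access to all authenticated users also works, but it exposes the bucket to every AWS account, not just yours.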

For more info on how to package a Spark job as a JAR file, please see Creating Scala Fat Jars for Spark on SBT with sbt-assembly Plugin.

Supposing you own an S3 bucket called my-example-bucket, you can upload the job JAR file (let's call it ProcessData.jar) to it from the command line as well:

$ aws s3 cp path/to/ProcessData.jar s3://my-example-bucket/jars/
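To confirm the upload before launching a cluster, you could ask S3 for the object's metadata. This is a sketch under assumptions: s3_object_exists is a hypothetical helper name, and because it discards the error output, a failure can also mean missing credentials or permissions rather than a missing file.

```shell
# Hypothetical helper: succeed only if the given object can be found.
# $1 = bucket name, $2 = object key.
s3_object_exists() {
    aws s3api head-object --bucket "$1" --key "$2" >/dev/null 2>&1
}

# Usage, with the bucket and key from the upload command above:
# s3_object_exists my-example-bucket jars/ProcessData.jar && echo "JAR is in place"
```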

The next thing to do is to create a file called step.json and add the description for the step you want to run.

It's quite self-explanatory; you need to tell AWS where your JAR is located and pass any parameters your job may need (in our case, two S3 URLs):

[
  {
    "Name": "Sample AWS EMR Spark Job",
    "Type":"CUSTOM_JAR",
    "Jar":"command-runner.jar",
    "Args":
    [
      "spark-submit",
      "--deploy-mode", "cluster",
      "--class", "ProcessData",
      "s3://my-example-bucket/jars/ProcessData.jar",
      "s3://my-example-bucket/input-files",
      "s3://my-example-bucket/output-files"
    ],
    "ActionOnFailure": "TERMINATE_CLUSTER"
  }
]
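If you reuse this step with different buckets, generating the file can save some editing. The following is a sketch, not part of the AWS CLI: write_step_json is a hypothetical helper that writes the same step definition with the bucket name substituted, assuming the same jars/, input-files and output-files layout used in this tutorial.

```shell
# Hypothetical helper: write the step definition for a given bucket.
# $1 = bucket name, $2 = output file (defaults to step.json).
write_step_json() {
    local bucket="$1"
    local out="${2:-step.json}"
    cat > "$out" <<EOF
[
  {
    "Name": "Sample AWS EMR Spark Job",
    "Type": "CUSTOM_JAR",
    "Jar": "command-runner.jar",
    "Args": [
      "spark-submit",
      "--deploy-mode", "cluster",
      "--class", "ProcessData",
      "s3://${bucket}/jars/ProcessData.jar",
      "s3://${bucket}/input-files",
      "s3://${bucket}/output-files"
    ],
    "ActionOnFailure": "TERMINATE_CLUSTER"
  }
]
EOF
}

# Usage: write_step_json my-example-bucket
```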

With step.json in your current directory, you can now run:

$ aws emr create-cluster \
    --name "Sample Spark Cluster with a single Job" \
    --instance-type m3.xlarge \
    --release-label emr-4.1.0 \
    --instance-count 1 \
    --use-default-roles \
    --applications Name=Spark \
    --steps file://step.json \
    --auto-terminate

The only difference is the extra --steps option; by adding it, you are instructing EMR to run that Job and then terminate.

It will consume any data found in the input directory passed as the first argument (in our example, s3://my-example-bucket/input-files) and write the result to the directory given by the next argument (in our case, s3://my-example-bucket/output-files).

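Note that the cluster needs read permission on the input location and write permission on the output location, or the job will fail. The output directory must also not exist yet, because Spark refuses to overwrite an existing directory by default.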
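Once the cluster has been launched, you can follow the job from the command line instead of the console. A minimal sketch, assuming you kept the cluster ID printed by create-cluster; wait_and_report is a hypothetical helper name. The waiter polls periodically and gives up after a fixed number of attempts, so a very long job may need the call repeated.

```shell
# Hypothetical helper: block until the cluster has terminated,
# then print each step's name and final state.
wait_and_report() {
    local cluster_id="$1"
    aws emr wait cluster-terminated --cluster-id "$cluster_id" || return 1
    aws emr list-steps \
        --cluster-id "$cluster_id" \
        --query 'Steps[].[Name,Status.State]' \
        --output text
}

# Usage (placeholder ID; replace it with your own):
# wait_and_report j-XXXXXXXXXXXXX
```

A step state of COMPLETED means the job ran to the end; FAILED usually points to a problem such as the permission or output-directory issues mentioned above.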
