Apache Zeppelin, Spark Streaming and Amazon Kinesis: Simple Guide and Examples

Apache Zeppelin, Spark Streaming and Amazon Kinesis: Simple Guide and Examples

Last updated:
Table of Contents

Zeppelin is a web UI for data processing tools. It can be used with spark and spark streaming (one possible source of data is Kinesis (also Kafka, Flume, etc)).

EMR supports Spark 1.6.0 and now it supports Zeppelin too.

Prerequisites

You need to add the Kinesis Jar as a dependency in Spark.

One way to do it is to change the last line in file /usr/lib/zeppelin/conf/zeppelin-env.sh so that it looks like this:

export SPARK_SUBMIT_OPTIONS="$SPARK_SUBMIT_OPTIONS --packages org.apache.spark:spark-streaming-kinesis-asl_2.10:1.6.0"

General Workflow

Suggested workflow for debugging Spark Streaming applications:

  1. Write your code paragraphs and run them

  2. Run ssc.start()

  3. Wait a couple of seconds (at least as long as your batchDuration)

  4. Run ssc.stop(stopSparkContext=false, stopGracefully=true)

  5. Wait until output is printed onto the screen

  6. Delete the checkpoints on DynamoDB

  7. Go to step 1.

Troubleshooting common mistakes

org.apache.spark.SparkException: Task not serializable

If running code outside of any block, use @transient to prevent things from being "sucked" into the context, because then Spark will need to serialize it.

For example:

@transient val ssc = new StreamingContext(sc,windowDuration)

ssc.remember(rememberDuration)

@transient val streams = (0 until 1).map { i =>
  KinesisUtils.createStream(ssc, appName, streamName, endpointUrl, regionName,
   InitialPositionInStream.LATEST, windowDuration, StorageLevel.MEMORY_ONLY)
}

streams.print()

ssc.start()

Nothing gets printed after you stop the context

There are a couple of possible reasons for this:

  • Kinesis uses a DynamoDB instance to save checkpoints so that it can know what data it needs to read. You should probably delete the entries in the DynamoDB table used by Kinesis in between ssc.start/ssc.stop cycles.

  • You didn't call print() on any streams.

  • You stopped the context before the first batch window closed.

Adding external Jar Dependencies to Zeppelin

If, for some reason, calling z.load("org.apache.spark:spark-streaming-kinesis-asl_2.10:1.6.0") doesn't work for you (I sometimes see this error when I try to use the context after calling load()), you can SSH into your instance and edit the file at /usr/lib/zeppelin/conf/zeppelin-env.sh and change the last line so that it looks like this:

export SPARK_SUBMIT_OPTIONS="$SPARK_SUBMIT_OPTIONS --packages org.apache.spark:spark-streaming-kinesis-asl_2.10:1.6.0"

java.lang.IllegalStateException: RpcEnv has been stopped

You have probably stopped the underlying SparkContext as well as the StreamingContext. You need to restart the SparkInterpreter.

click-on-interpreter Click on "Interperter" to open the interpreter view

restart-interpreter Click on "restart" for changes to take effect

To avoid this, always close the StreamingContext but leave the underlying SparkContext alive:

ssc.stop(stopSparkContext=false, stopGracefully=true)

java.lang.IllegalStateException: Adding new inputs, transformations, and output operations after stopping a context is not supported

You have probably stopped the SparkContext; you need to restart the SparkInterpreter. See the bullet point above for a solution.

Dialogue & Discussion