Spark Streaming: Common Pitfalls and Tips for Long-running Streaming Applications


WIP Alert: This is a work in progress. The current information is correct, but more content will probably be added in the future.

HEADS-UP - all information in this post refers mainly to Spark 2.x running on YARN (AWS EMR), but most of it is likely relevant for other versions/setups too!

Short Windows and Long Windows

You probably know you can call methods such as reduceByKeyAndWindow or just window to keep a live window of data, so that you can compute aggregate metrics over several batches at once.
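As a rough illustration of the two methods mentioned above, here is a minimal sketch. The socket source, host, port, object name and all durations are made up for the example; the only real constraint it relies on is that window and slide durations must be multiples of the batch interval.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object WindowedCounts {
  // Hypothetical durations; window and slide must be multiples of the batch interval.
  val batchInterval = Seconds(10)
  val windowLength = Seconds(60)
  val slideInterval = Seconds(10)

  def main(args: Array[String]): Unit = {
    // The master is expected to come from spark-submit.
    val conf = new SparkConf().setAppName("windowed-counts")
    val ssc = new StreamingContext(conf, batchInterval)

    // Hypothetical source: one event name per line on a local socket.
    val events = ssc.socketTextStream("localhost", 9999).map(line => (line, 1L))

    // Per-key counts over the last minute, recomputed every 10 seconds.
    val countsPerKey =
      events.reduceByKeyAndWindow((a: Long, b: Long) => a + b, windowLength, slideInterval)
    countsPerKey.print()

    // "Just window": keep the raw records of the last minute and count them.
    events.window(windowLength, slideInterval).count().print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```

Keep in mind that the longer the window, the more batches Spark has to keep around, so long windows cost memory.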

TODO split Main methods to share code and configurations

foreachRDD method

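It's always important to remember that the body of a foreachRDD block is executed in the driver; only the RDD operations you call inside it (map, foreachPartition and so on) are distributed to the executors. So you probably do not want to call .collect() on large RDDs there, because that pulls the whole batch into driver memory and may crash your driver.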
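A minimal sketch of the safer pattern, assuming a DStream of strings. The names ForeachRddSketch, sendToSink and writeOut are hypothetical, and the sink simply prints, standing in for whatever database or queue client you actually use.

```scala
import org.apache.spark.streaming.dstream.DStream

object ForeachRddSketch {
  // Hypothetical sink; replace with a real client in practice.
  def sendToSink(record: String): Unit = println(record)

  def writeOut(lines: DStream[String]): Unit = {
    lines.foreachRDD { rdd =>
      // This block runs on the driver, once per batch.
      // Calling rdd.collect() here would copy the whole batch into driver memory.

      // The function passed to foreachPartition runs on the executors instead.
      rdd.foreachPartition { partition =>
        partition.foreach(sendToSink)
      }

      // If you do need data on the driver, fetch a small, bounded amount.
      rdd.take(10).foreach(println)
    }
  }
}
```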

Disk usage

You can run out of space on HDFS (thereby crashing your app) when you have a cluster up for a long time.

For example, logs under /var/log/spark may pile up, especially if you have loose logging settings and/or print a lot of stuff to STDOUT.

You can check your current disk usage using commands such as

  • $ hadoop fs -df -h /

  • $ hadoop fs -du -h /

Configuring a log4j RollingFileAppender and setting its file location to YARN’s log directory prevents disk overflow caused by large log files, and the logs can still be accessed using YARN’s log utility.

(AWS EMR ONLY) Event Logs under hdfs:/var/log/spark/apps (when using a History Server)

This is a special case of the disk usage issue mentioned above.

This is where event logs are kept by default on AWS EMR (it runs on YARN) so that you can view them in the Spark History Server.

Thing is, this grows like crazy sometimes, especially for long-running applications, and it may crash your application or YARN cluster if you run out of space.

Spark event logs look like this (content truncated):

{"Event":"SparkListenerTaskEnd","Stage ID":1,"Stage Attempt ID":0,"Task Type":"ResultTask","Task End Reason":{"Reason":"Success"},"Task Info":{"Task ID":54,"Index":4,"Attempt":0,"Launch Time":1482870344233,"Executor ID":"2","Host":"ip-172-16-1-95.ec2.internal","Locality":"NODE_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":1482870344688,"Failed":false,"Accumulables":[{"ID":1122,"Name":"internal.metrics.executorDeserializeTime"

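The relevant configuration options for the event logs are listed in the Monitoring section of the Spark configuration options.

If you just want to disable event logs altogether, set spark.eventLog.enabled to false. Note that the Spark History Server will then have nothing to show for that application.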
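A small sketch of how this could be set programmatically, before the SparkContext is created. The object and method names are hypothetical; the two property names, spark.eventLog.enabled and spark.eventLog.compress, are regular Spark configuration options. The second helper shows a possible middle ground: keep the event logs but compress them.

```scala
import org.apache.spark.SparkConf

object EventLogSettings {
  // Turns event logging off entirely; the History Server will show nothing for this app.
  def withoutEventLog(conf: SparkConf): SparkConf =
    conf.set("spark.eventLog.enabled", "false")

  // Alternative: keep event logs but compress them to slow down disk growth.
  def withCompressedEventLog(conf: SparkConf): SparkConf =
    conf
      .set("spark.eventLog.enabled", "true")
      .set("spark.eventLog.compress", "true")
}
```

The same properties can also be passed on the command line, e.g. spark-submit --conf spark.eventLog.enabled=false.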

GC Issues

You generally want predictable behaviour from your streaming application, so you want to avoid long stop-the-world garbage collection pauses, i.e. collector settings that make the JVM do its garbage collection all at once, potentially freezing the application until it's done collecting the garbage.
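One way to act on this is to ask the executors' JVMs for a mostly-concurrent collector. The sketch below uses the CMS flag, which is an assumption that fits Spark 2.x on Java 8 (CMS was removed in newer JDKs, where -XX:+UseG1GC would be the usual choice). The object and method names are hypothetical; treat the flag as a starting point to measure, not a recommendation.

```scala
import org.apache.spark.SparkConf

object GcSettings {
  // Assumed flag for Java 8; swap in "-XX:+UseG1GC" on JDKs without CMS.
  val executorGcFlags = "-XX:+UseConcMarkSweepGC"

  // Applies the GC flag to executors only. In client mode the driver JVM is
  // already running at this point, so driver flags must be passed to
  // spark-submit (--driver-java-options) instead.
  def withConcurrentGc(conf: SparkConf): SparkConf =
    conf.set("spark.executor.extraJavaOptions", executorGcFlags)
}
```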

Gracefully stopping your apps to avoid losing data

As of recent versions, you can call ssc.stop() with an extra parameter, namely stopGracefully. For example:

ssc.stop(stopSparkContext = true, stopGracefully = true)

But the thing is, how do you obtain a reference to the context from outside?

One solution is to expose an HTTP REST endpoint from within your application using an HTTP library (e.g. akka-http) and call that endpoint when you want to shut down your application (the endpoint's handler can hold a reference to the streaming context).
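A minimal sketch of this idea. To keep it dependency-free it uses the HTTP server bundled with the JDK (com.sun.net.httpserver) rather than akka-http; the object name, port 8099 and the /shutdown path are made up for the example.

```scala
import java.net.InetSocketAddress
import com.sun.net.httpserver.{HttpExchange, HttpHandler, HttpServer}
import org.apache.spark.streaming.StreamingContext

object ShutdownEndpoint {
  // Starts a tiny HTTP server inside the driver. Port and path are hypothetical.
  def start(ssc: StreamingContext, port: Int = 8099): HttpServer = {
    val server = HttpServer.create(new InetSocketAddress(port), 0)
    server.createContext("/shutdown", new HttpHandler {
      override def handle(exchange: HttpExchange): Unit = {
        // Reply first, so the caller is not kept waiting for the stop to finish.
        val body = "stopping\n".getBytes("UTF-8")
        exchange.sendResponseHeaders(202, body.length.toLong)
        exchange.getResponseBody.write(body)
        exchange.close()

        // A graceful stop can take a while, so run it on its own thread.
        new Thread("graceful-stop") {
          override def run(): Unit = {
            ssc.stop(stopSparkContext = true, stopGracefully = true)
            server.stop(0)
          }
        }.start()
      }
    })
    server.start()
    server
  }
}
```

In the application you would call ssc.start(), then ShutdownEndpoint.start(ssc), then ssc.awaitTermination(); a request such as curl -X POST http://<driver-host>:8099/shutdown would then trigger the stop. The endpoint is unauthenticated and the driver host must be reachable from wherever you call it, so this is only a starting point.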

TODO what about yarn.nodemanager.sleep-delay-before-sigkill.ms?

Unpersisting RDDs after using them

In newer Spark versions, this is done by default, so there's nothing you need to do here.

See the configuration option spark.streaming.unpersist; its default value is true.

Keeping State

TODO example

Make sure any state you keep (using functions like mapWithState) does not grow in an unbounded fashion; otherwise you risk crashing your application due to lack of space.
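One way to keep state bounded is to put a timeout on the StateSpec so that idle keys are dropped. The sketch below keeps a running count per key; the object and value names and the 30-minute timeout are made up for the example. Note that mapWithState requires checkpointing to be enabled on the streaming context.

```scala
import org.apache.spark.streaming.{Minutes, State, StateSpec}
import org.apache.spark.streaming.dstream.DStream

object BoundedState {
  // Mapping function: adds the new value to the running total for the key.
  val updateCount = (key: String, value: Option[Long], state: State[Long]) => {
    if (state.isTimingOut()) {
      // Last call for a key that has been idle too long; its state is being
      // removed and cannot be updated here.
      None: Option[(String, Long)]
    } else {
      val total = state.getOption().getOrElse(0L) + value.getOrElse(0L)
      state.update(total)
      Some((key, total))
    }
  }

  def runningCounts(events: DStream[(String, Long)]): DStream[(String, Long)] = {
    // Keys with no new data for 30 minutes (hypothetical value) are dropped.
    val spec = StateSpec.function(updateCount).timeout(Minutes(30))
    events.mapWithState(spec).flatMap(_.toList)
  }
}
```

If a key has a natural end (a session closing, for instance), calling state.remove() in the mapping function frees its state right away instead of waiting for the timeout.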

Also, you need to delete any saved state if you change the classes of the objects being saved. This can be done by logging into the master node and deleting stuff on HDFS (we generally save stuff on HDFS under /tmp/spark).

This means that if you add attributes to the classes of saved objects, or even just rename those classes, you need to delete any saved state before launching the new version of your application.

