Given the proven power and capability of Apache Spark for large-scale data processing, we use Spark on a regular basis here at ZGL. To write Spark code that executes efficiently, it is extremely important to be aware of a set of tuning considerations and tricks. Unlike many other blog posts on Spark tuning, this post is intended to provide a concise checklist of such considerations that you can easily check your code against when you run into performance issues with Spark. The reader therefore needs a basic understanding of Spark's internal architecture and execution model and should be familiar with Spark concepts such as RDDs, transformations, and actions.
This list will be updated as we continue to learn about Spark performance optimization. The current version is created based on the following resources:
- Learning Spark Book – O’Reilly Media
- Spark documentation on tuning
- How-to: Tune Your Apache Spark Jobs (Part 1)
- How-to: Tune Your Apache Spark Jobs (Part 2)
- Tuning Java Garbage Collection for Apache Spark Applications
Data Serialization
- By default, Spark serializes objects using Java's ObjectOutputStream framework, which works with any class that implements the java.io.Serializable interface.
- Java's built-in serializer is flexible, but it is often slow and produces large serialized objects.
- Switch to the Kryo serializer through SparkConf; it is faster and more compact.
- You can register your own custom classes with Kryo. Kryo still works without registering custom classes, but it is wasteful because it has to store the full class name with each object.
- You may need to increase spark.kryoserializer.buffer to hold the large objects you will serialize.
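As a rough illustration of the settings above, the following Python sketch renders the relevant properties as spark-defaults.conf lines. The property names are Spark's own; the class name com.example.Event and the 1m buffer size are hypothetical placeholders, not recommendations.

```python
# Sketch: render Kryo-related Spark properties as spark-defaults.conf lines.
# "com.example.Event" and the "1m" buffer size are hypothetical placeholders.

def kryo_properties(classes_to_register, buffer_size="1m"):
    """Return Spark properties that switch serialization to Kryo."""
    return {
        "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
        # Registering classes spares Kryo from writing full class names.
        "spark.kryo.classesToRegister": ",".join(classes_to_register),
        # Initial size of Kryo's buffer; raise it when serializing large objects.
        "spark.kryoserializer.buffer": buffer_size,
    }

def render(properties):
    """Format properties one per line, as key, whitespace, value."""
    return "\n".join(f"{key} {value}" for key, value in sorted(properties.items()))

kryo_conf = kryo_properties(["com.example.Event"])
print(render(kryo_conf))
```

The same key/value pairs can also be set on a SparkConf object or passed as --conf flags to spark-submit.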
Garbage Collection
- When Java evicts old objects to make room for new ones, it needs to trace through all objects and discard the old ones. Thus, the cost of JVM GC is proportional to the number of Java objects.
- Data structures with fewer objects lower this cost (e.g., an array of Ints instead of a LinkedList).
- Persist objects in serialized form so that there is only one object (a byte array) per RDD partition.
- You can measure GC frequency and time spent by adding -XX:+PrintGCTimeStamps to the Java options.
- Too many or long-lasting GCs imply that there isn't enough memory left for executing tasks. For efficient memory usage, clean up cached/persisted collections once they are no longer needed.
- Advanced: adjust JVM Young (Eden, Survivor1, Survivor2) and Old Heap spaces.
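To make the GC logging bullet concrete, here is a small sketch that assembles executor JVM options. spark.executor.extraJavaOptions is a real Spark property; the companion flags -verbose:gc and -XX:+PrintGCDetails come from the Spark tuning guide rather than from this checklist, -XX:+UseG1GC merely stands in for options you might already pass, and the sketch assumes a Java 8 era JVM that still accepts these logging flags.

```python
# Sketch: build the executor JVM options that turn on GC logging.
# Assumes a Java 8 era JVM; "-XX:+UseG1GC" is only an example of a
# pre-existing option and is not required for GC logging.

GC_LOGGING_FLAGS = ["-verbose:gc", "-XX:+PrintGCDetails", "-XX:+PrintGCTimeStamps"]

def with_gc_logging(existing_options=""):
    """Append the GC logging flags to existing JVM options without duplicates."""
    options = existing_options.split()
    for flag in GC_LOGGING_FLAGS:
        if flag not in options:
            options.append(flag)
    return " ".join(options)

executor_options = with_gc_logging("-XX:+UseG1GC")
print(f'--conf "spark.executor.extraJavaOptions={executor_options}"')
```

With these options set, GC messages should show up in the executors' standard output each time a collection runs.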
Memory Management
- Execution memory is used for computation (e.g., shuffles, joins, sorts, and aggregations).
- Storage memory is used for caching and propagating internal data across the cluster.
- Jobs that do not use cache can use all space for execution, and avoid disk spills.
- Applications that use caching can reserve a minimum storage space whose data cannot be evicted by execution requirements.
- Set spark.memory.fraction to determine what fraction of the JVM heap space (minus a 300 MiB reserve) is used for Spark execution/storage memory. The default is 0.6 (60%).
- Set the JVM flag -XX:+UseCompressedOops if you have less than 32 GB of RAM to make pointers four bytes instead of eight.
- Avoid using executors with too much memory, as this often results in excessive garbage collection delays.
- Avoid using many very small executors, so that you can still benefit from running multiple tasks in a single JVM.
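The arithmetic behind spark.memory.fraction can be sketched as follows. The 300 MiB reserve and the spark.memory.storageFraction default of 0.5 come from the Spark configuration documentation rather than from this checklist, and the 4096 MiB heap is an arbitrary example value.

```python
# Sketch: estimate Spark's unified memory regions for a given executor heap.
# The 4096 MiB heap is an arbitrary example, not a recommendation.

RESERVED_MIB = 300  # set aside by Spark before the fraction is applied

def unified_memory(heap_mib, memory_fraction=0.6, storage_fraction=0.5):
    """Return (execution + storage region, storage immune to eviction) in MiB."""
    usable = (heap_mib - RESERVED_MIB) * memory_fraction
    protected_storage = usable * storage_fraction
    return usable, protected_storage

usable, protected = unified_memory(heap_mib=4096)
print(f"execution + storage region: {usable:.1f} MiB")
print(f"storage protected from eviction: {protected:.1f} MiB")
```

A job that caches nothing can use the whole region for execution, which is why jobs without caching are less likely to spill to disk.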
Partitioning and Parallelism
- It is recommended to have 2-3 tasks per CPU core in your cluster.
- Even though Spark attempts to infer a sensible number of partitions for your collections, sometimes you may need to tune the level of parallelism by optimizing the number of partitions.
- RDDs produced by jsonFile are partitioned based on the underlying MapReduce InputFormat that's used. HDFS input RDDs are typically partitioned based on the number of HDFS blocks the data resides on.
- RDDs that are derived from other collections simply inherit the number of partitions from the parent collection.
- Minimize the number of shuffles by using partitioning mechanisms that would allow the program to know that a set of keys will appear together on the same node.
- Hash partitioning: the keys that have the same hash value will appear on the same node.
- Range partitioning: the keys within the same range appear on the same node.
- Spark operations that involve shuffling data by key benefit from partitioning: cogroup(), groupWith(), join(), groupByKey(), combineByKey(), reduceByKey(), and lookup().
- Repartitioning (repartition()) is an expensive task because it moves the data around. If you are only decreasing the number of partitions, use coalesce() instead, which can avoid a full shuffle.
- If a collection is used only once, there is no point in repartitioning it; repartitioning is useful only if the collection is used multiple times in key-oriented operations.
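The idea behind hash partitioning can be illustrated without a cluster. The plain-Python sketch below is a toy model, not Spark's implementation: hash_partition and join_copartitioned are hypothetical helpers, and the records are made up. It shows that once two collections are partitioned by the same function, matching keys land in partitions with the same index, so a join can proceed partition by partition without moving data.

```python
# Toy model of hash partitioning (not Spark's actual implementation).

def hash_partition(records, num_partitions):
    """Place each (key, value) pair in the partition chosen by its key's hash."""
    partitions = [[] for _ in range(num_partitions)]
    for key, value in records:
        partitions[hash(key) % num_partitions].append((key, value))
    return partitions

def join_copartitioned(left_parts, right_parts):
    """Join two collections partition by partition; no record changes partition."""
    joined = []
    for left, right in zip(left_parts, right_parts):
        right_by_key = {}
        for key, value in right:
            right_by_key.setdefault(key, []).append(value)
        for key, value in left:
            for other in right_by_key.get(key, []):
                joined.append((key, (value, other)))
    return joined

# Made-up data keyed by integer user id.
users = [(1, "ana"), (2, "bo"), (3, "cy"), (4, "di")]
visits = [(1, "/home"), (3, "/cart"), (1, "/pay"), (4, "/home")]

user_parts = hash_partition(users, num_partitions=2)
visit_parts = hash_partition(visits, num_partitions=2)
result = join_copartitioned(user_parts, visit_parts)
print(sorted(result))
```

In Spark itself the equivalent step would be to partition both RDDs with the same partitioner before joining them, and to persist the partitioned result if it is reused.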
Action/Transformation Selection and Optimization
- Minimize shuffles on join() by either broadcasting the smaller collection or by hash partitioning both RDDs by keys.
- Use narrow transformations instead of wide ones as much as possible. In narrow transformations (e.g., map() and filter()), the data to be processed resides on one partition, whereas in wide transformations (e.g., groupByKey(), reduceByKey(), and join()), the data to be processed resides on multiple partitions and so needs to be shuffled.
- Avoid using groupByKey() for associative reduce operations; use reduceByKey() instead. With reduceByKey(), Spark combines output with common keys on each partition before shuffling the data.
- To join two grouped datasets and keep them grouped, use cogroup() rather than the flatMap-join-groupBy pattern.
- To repartition the data and have your records sorted within each partition, use repartitionAndSortWithinPartitions() instead of calling repartition() and then sorting within each partition, because it can push the sorting down into the shuffle machinery.
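Why reduceByKey() shuffles less than groupByKey() can be seen in a toy model. The sketch below is plain Python rather than Spark code; the helper names and the word-count style input are made up for illustration. It counts how many records would cross the shuffle with and without a per-partition combine step.

```python
# Toy model of the shuffle volume of groupByKey vs. reduceByKey.
from operator import add

def shuffle_without_combine(partitions):
    """groupByKey style: every record is sent across the shuffle as is."""
    return [record for part in partitions for record in part]

def shuffle_with_combine(partitions, combine):
    """reduceByKey style: merge values per key inside each partition first."""
    shuffled = []
    for part in partitions:
        local = {}
        for key, value in part:
            local[key] = combine(local[key], value) if key in local else value
        shuffled.extend(local.items())
    return shuffled

def reduce_after_shuffle(records, combine):
    """Final per-key reduction on the receiving side."""
    totals = {}
    for key, value in records:
        totals[key] = combine(totals[key], value) if key in totals else value
    return totals

# Made-up input spread over two partitions.
partitions = [
    [("a", 1), ("b", 1), ("a", 1)],
    [("a", 1), ("b", 1), ("b", 1), ("b", 1)],
]

grouped = shuffle_without_combine(partitions)
combined = shuffle_with_combine(partitions, add)
print(len(grouped), "records shuffled without a map-side combine")
print(len(combined), "records shuffled with a map-side combine")
```

Both paths produce the same totals after the final reduction; only the amount of data shuffled differs, and the gap grows with the number of repeated keys per partition.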
Caching and Persisting
- Persist or cache collections that will be used more than once.
- Persist collections after re-partitioning.
- If you need to cache large amounts of processed data, change Spark’s default cache operation:
- By default, Spark caches data in MEMORY_ONLY, so if there is not enough memory, Spark will just drop the old RDD partitions and recalculate them if they are needed. Instead, use the MEMORY_AND_DISK storage level, which spills the extra partitions to disk and simply rereads them when they are needed.
- Cache serialized objects instead of the raw ones (via MEMORY_ONLY_SER or MEMORY_AND_DISK_SER). Even though it slows down the caching process because of the serialization cost, it substantially reduces the time spent on garbage collection.
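The difference between the two storage levels can be sketched with a toy model. This is plain Python, not Spark's block manager: access_costs is a hypothetical helper, the partition names are made up, and the model ignores the initial computation of each partition. It only counts how later reads are served when half of the partitions fit in memory.

```python
# Toy model of two storage levels (not Spark's block manager).

def access_costs(level, partitions, memory_slots, reads):
    """Count how reads are served when only memory_slots partitions fit in memory."""
    in_memory = set(partitions[:memory_slots])
    overflow = set(partitions[memory_slots:])
    # MEMORY_AND_DISK keeps the overflow on disk; MEMORY_ONLY does not store it.
    on_disk = overflow if level == "MEMORY_AND_DISK" else set()
    costs = {"memory_hits": 0, "disk_reads": 0, "recomputations": 0}
    for partition in reads:
        if partition in in_memory:
            costs["memory_hits"] += 1
        elif partition in on_disk:
            costs["disk_reads"] += 1
        else:
            costs["recomputations"] += 1
    return costs

partitions = ["p0", "p1", "p2", "p3"]
reads = partitions * 3  # the collection is used three times

memory_only = access_costs("MEMORY_ONLY", partitions, memory_slots=2, reads=reads)
memory_and_disk = access_costs("MEMORY_AND_DISK", partitions, memory_slots=2, reads=reads)
print("MEMORY_ONLY:", memory_only)
print("MEMORY_AND_DISK:", memory_and_disk)
```

Whether a disk read beats a recomputation depends on how expensive the lineage of the collection is, so the better level is workload-specific.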
- OutOfMemoryError due to operations: increase parallelism so that each task's input is smaller. Since Spark reuses an executor JVM for many tasks, you can safely increase parallelism to more than the number of cores.
- Spark schedules tasks at the best locality possible. If there is data waiting to be processed, Spark will wait for a local CPU to free up. If that takes too long, Spark will move the data to a free CPU farther away. You can configure how long Spark waits for a CPU to free up if your tasks are long and you have poor locality.
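The last two points map onto two Spark properties, spark.default.parallelism and spark.locality.wait. The sketch below derives a parallelism value from the 2-3 tasks per core rule of thumb and prints both settings as spark-submit flags. The cluster size (10 executors with 4 cores each) and the 10s wait are hypothetical example values; the default locality wait is 3s.

```python
# Sketch: derive a parallelism setting and pair it with a locality wait.
# The cluster size and the "10s" wait are hypothetical example values.

def suggested_parallelism(num_executors, cores_per_executor, tasks_per_core=3):
    """Apply the 2-3 tasks per core rule of thumb (3 is used by default here)."""
    return num_executors * cores_per_executor * tasks_per_core

scheduling_conf = {
    "spark.default.parallelism": str(suggested_parallelism(10, 4)),
    # Default is 3s; a longer wait favors data locality over idle cores.
    "spark.locality.wait": "10s",
}

for key, value in sorted(scheduling_conf.items()):
    print(f"--conf {key}={value}")
```

Treat the computed number as a starting point and adjust it after checking task durations and input sizes in the Spark UI.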
We are pleased to be able to share our work and are excited by the possibilities. We would love your feedback and would like to connect with others interested in this work, so please get in touch. We are also looking for other amazing Scientists to join our team, so please check out our open roles.
Taraneh Khazaei is a Scientist at Zero Gravity Labs and Danny Luo is a co-op student who co-authored this amazing post.