Open Source

Project Magnet, providing push-based shuffle, now available in Apache Spark 3.2

Co-authors: Venkata Krishnan Sowrirajan and Min Shen

We are excited to announce that push-based shuffle (codenamed Project Magnet) is now available in Apache Spark as part of the 3.2 release. Since the SPIP vote on Project Magnet passed in September 2020, there has been a lot of interest in getting it into Apache Spark. As of March 2021, 100% of LinkedIn’s Spark workloads’ shuffle data is served through push-based shuffle. As we productionized push-based shuffle at LinkedIn, we also actively engaged with the community to contribute it back to Apache Spark. The contribution comprised 30+ commits adding roughly 13k+ lines of code, reviewed by 21 contributors and committers from 6 different companies.

In this blog post, we capture the journey of Project Magnet from its inception to contributing it back to Apache Spark. 

What is push-based shuffle?

Push-based shuffle is an implementation of shuffle in which shuffle blocks are pushed from the mapper tasks to the remote shuffle services, in order to address shuffle scalability and reliability issues. In a nutshell, with push-based shuffle, a large number of small, random reads are converted into a small number of large, sequential reads, which significantly improves disk I/O efficiency and shuffle data locality.
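
To get a sense of the scale involved, consider a purely illustrative example (the numbers are ours, not from any specific workload): a shuffle stage with 10,000 map tasks and 10,000 reduce tasks produces 100 million shuffle blocks, so if that stage shuffles 1TB in total, the average block is only about 10KB. Serving each block individually means millions of small, random disk reads; merging the blocks of each reduce partition ahead of time lets reducers fetch a handful of large, sequential chunks instead.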

This is explained in greater detail in an earlier blog post, Magnet: A scalable and performant shuffle architecture for Apache Spark, which you can read for more information about how we achieve push-based shuffle.

Why is push-based shuffle needed?

The vanilla Spark shuffle mechanism strikes a good balance between performance and fault tolerance requirements. However, when operating it at LinkedIn’s scale, we have faced several kinds of challenges:

  1. We’ve seen shuffle service availability issues during peak hours, given the sheer number of nodes on the cluster and the scale of the shuffle workload.

  2. At LinkedIn, we use hard disk drives (HDD) to store shuffle data. With the clusters being multi-tenant, running many Spark applications simultaneously, the shuffle services access the data in random order to serve requests, which severely impacts disk throughput, increasing shuffle wait times.

  3. Since the shuffle services are shared across multiple applications, one poorly tuned application can stress the shuffle services significantly, causing performance degradation to other applications as well.

Although at LinkedIn we run huge clusters with HDDs, running massive numbers of Spark applications daily, push-based shuffle also improves shuffle performance on smaller clusters with SSDs. This is validated by the benchmarks referred to in this post (see the section “HDD vs NVMe”). The improvement is mostly due to better shuffle data locality, which reduces the load on the shuffle services.

Project Magnet metamorphosis

From 2017 to the present, Spark usage within LinkedIn has grown 3x year-over-year. With the rapid growth of Spark usage, we started seeing scaling issues, specifically on the shuffle layer. By the end of 2019, we started seeing that, on average, more than 50% of stage failures were Spark shuffle fetch failures, which caused expensive stage retries leading to workflow SLA violations.

In order to address these issues, the Spark team at LinkedIn started looking into various possible solutions and prototyped an implementation of push-based shuffle. Our open source philosophy is to stay as close as possible to the open source releases of Apache Spark, in order to avoid large divergences from the upstream codebase. This helps us provide our internal users with the latest features and performance enhancements. Given the results we saw with push-based shuffle internally, as well as its broader applicability, we submitted a SPIP proposal to contribute Project Magnet back to the community. The proposal was well received by the Apache Spark community, which supported us throughout the process and provided guidance on making push-based shuffle work better: handling application retries, handling indeterminate stage retries, and enabling seamless interplay with Adaptive Query Execution (AQE).

Earlier this year, we rolled out push-based shuffle to 100% of Spark workloads on all of our clusters. Currently, push-based shuffle is handling around 15-18PB of daily shuffle data. Some of the high-level improvements in performance metrics following the push-based shuffle rollout are:

  1. Overall reduction of 16% in compute resource consumption

  2. 45% of workflows saw at least 10% reduction in job runtime

  3. 30x increase in overall shuffle data locality ratio

These performance numbers were calculated by comparing workflows that shuffle roughly similar amounts of data, with push-based shuffle enabled and disabled, over a period of time, in order to isolate the improvements attributable to push-based shuffle.

To sum up, push-based shuffle reduces both job runtime and compute resource consumption. These improvements are explained in greater detail in this article: Bringing Next-Gen Shuffle Architecture To Data Infrastructure at LinkedIn Scale.

Try push-based shuffle yourself

Now that Apache Spark 3.2.0 has been released, you can also try out push-based shuffle yourself. Here we provide basic instructions on how to enable it and a simple example to see it in action.

Shuffle server configuration
Currently, push-based shuffle is only supported with YARN as the cluster manager. To enable push-based shuffle for Spark applications on the cluster, include spark-3.2.0-yarn-shuffle.jar in the NodeManager classpath, set the configuration spark.shuffle.push.server.mergedShuffleFileManagerImpl to org.apache.spark.network.shuffle.RemoteBlockPushResolver, and restart all the NodeManagers.
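
For reference, below is a minimal sketch of what this server-side setup can look like, assuming the Spark external shuffle service is registered as a YARN auxiliary service in yarn-site.xml and the push-based shuffle setting is supplied through a spark-shuffle-site.xml file placed on the NodeManager classpath (the setting can also live in the NodeManager's Hadoop configuration; adjust to your deployment):

  <!-- yarn-site.xml: register the Spark external shuffle service as a YARN auxiliary service -->
  <property>
    <name>yarn.nodemanager.aux-services</name>
    <value>spark_shuffle</value>
  </property>
  <property>
    <name>yarn.nodemanager.aux-services.spark_shuffle.class</name>
    <value>org.apache.spark.network.yarn.YarnShuffleService</value>
  </property>

  <!-- spark-shuffle-site.xml (on the NodeManager classpath): enable merged shuffle file management -->
  <property>
    <name>spark.shuffle.push.server.mergedShuffleFileManagerImpl</name>
    <value>org.apache.spark.network.shuffle.RemoteBlockPushResolver</value>
  </property>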

Spark configuration
To enable push-based shuffle for a Spark application, set the configuration spark.shuffle.push.enabled to true.

To further tune push-based shuffle on both the client and server side, there are additional configurations that are documented here.
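
As an illustration, a minimal client-side setup could look like the spark-defaults.conf fragment below. Note that push-based shuffle relies on the external shuffle service, so spark.shuffle.service.enabled must also be true; which further tuning configs you need depends on your cluster and workload:

  # spark-defaults.conf: minimal client-side settings for push-based shuffle on YARN
  spark.shuffle.service.enabled    true
  spark.shuffle.push.enabled       true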

A simple example
Below is a one-line code snippet that shuffles roughly 1TB of data, which you can use to see the speedup with push-based shuffle.

sc.parallelize(1 to 8000).repartition(100).flatMap( _ => (1 to 15000000).iterator.map(num => num)).repartition(500).count

Note:

  1. If you are testing in a dedicated cluster with large memory nodes, the speedup might be smaller due to the operating system caching the shuffle data.

  2. If you are testing in a small cluster with only a few nodes, then you also need to set the following configs (a full launch command is sketched after this list):

  • spark.shuffle.push.mergersMinThresholdRatio = 0.01

  • spark.shuffle.push.mergersMinStaticThreshold = 1
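
Putting it all together, here is one way a quick test could look: a sketch of a spark-shell launch on YARN with the configs above, assuming a small test cluster (adjust the values to your environment):

  spark-shell \
    --master yarn \
    --conf spark.shuffle.service.enabled=true \
    --conf spark.shuffle.push.enabled=true \
    --conf spark.shuffle.push.mergersMinThresholdRatio=0.01 \
    --conf spark.shuffle.push.mergersMinStaticThreshold=1

Once inside the shell, paste the one-line snippet above and compare the stage times against a second run with spark.shuffle.push.enabled set to false.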

Future work

In terms of future work, we are already contributing back some of the additional performance enhancements we have implemented internally. Some of them are listed below:

  1. Adaptive shuffle merge finalization - SPARK-33701

  2. Adaptive fetch of shuffle merger locations - SPARK-34826

  3. Improve locality for push-based shuffle, especially for join-like operations - SPARK-33574

  4. Push-based shuffle metrics - SPARK-33573 and SPARK-36620

  5. Support YARN NodeManager work-preserving restart feature with push-based shuffle - SPARK-33236

Specifically, once SPARK-33701 and SPARK-34826 are resolved, the need to tweak push-based shuffle configs based on cluster size and workload characteristics will be eliminated.

Also, we are considering supporting push-based shuffle in other deployment modes, like standalone and Kubernetes. Currently, we operate our clusters on-premises, with collocated storage and compute. However, LinkedIn is migrating to Azure; therefore, we are evaluating ways to use push-based shuffle in the context of compute/storage disaggregated clusters. In its current implementation, push-based shuffle is well suited for batch computations, but it can be further adapted to continuous streaming workloads as well.

Feedback welcome

With the Apache Spark 3.2 release, we are really excited to share the benefits of push-based shuffle with the community. Please give it a try and let us know your feedback! We look forward to ideas as well as contributions that you might have to leverage push-based shuffle to unlock other avenues.

Acknowledgments

We would not have been able to make it this far without the tremendous support offered by the community in contributing push-based shuffle back to Apache Spark. Special shout-out to Mridul Muralidharan for shepherding the OSS contribution, and to other committers, including Yi Wu, Thomas Graves, Attila Zsolt Piros, Dongjoon Hyun, and Gengliang Wang, for their thorough and extensive code reviews. We would also like to thank Sunitha Beeram, Yuval Degani, Min Shen, Erik Krogen, and Ye Zhou for reviewing this article, and the LinkedIn Spark team for extensively testing the Apache Spark 3.2 release candidate with push-based shuffle before its release.