Open Source

Scaling LinkedIn's Hadoop YARN cluster beyond 10,000 nodes

Co-authors: Keqiu Hu, Jonathan Hung, Haibo Chen, and Sriram Rao

At LinkedIn, we use Hadoop as our backbone for big data analytics and machine learning. With an exponentially growing data volume, and the company heavily investing in machine learning and data science, we have been doubling our cluster size year over year to match the compute workload growth. Our largest cluster now has ~10,000 nodes, one of the largest (if not the largest) Hadoop clusters on the planet. Scaling Hadoop YARN has emerged as one of the most challenging tasks for our infrastructure over the years.

In this blog post, we will first discuss the YARN cluster slowdowns we observed as we approached 10,000 nodes and the fixes we developed for these slowdowns. Then, we will share the ways we proactively monitored for future performance degradations, including a now open-sourced tool we wrote called DynoYARN, which reliably forecasts performance for YARN clusters of arbitrary size. Finally, we will describe Robin, an in-house service which enables us to horizontally scale our clusters beyond 10,000 nodes.

When it becomes an issue

Compared with Hadoop Distributed File System’s (HDFS) NameNode, where all file system metadata is stored in a single machine, the YARN resource manager is quite lightweight, maintaining a small amount of metadata. As a result, we ran into issues with scalability for HDFS earlier than we did with YARN, and we started investing in scaling HDFS in 2016. By comparison, YARN, the compute component in Hadoop, has been a very stable and peaceful piece in our infrastructure portfolio for a very long time.

Our clusters grow 2x year over year, and we knew that one day YARN scalability would become an issue, since the resource manager's single-threaded scheduling mechanism cannot sustain indefinite growth. Nonetheless, we never really invested in understanding YARN's scaling limit and assumed it would just work until the next technology surfaced; this approach worked until early 2019.

[Figure: LinkedIn cluster trends for HDFS space used, total NameNode objects, and YARN compute capacity]

Historically, we built two Hadoop clusters in one of our data centers: the primary cluster served main traffic and was bound by both storage and compute, and a secondary cluster, which was built for data obfuscation, was primarily storage bound with idle compute resources. In order to improve resource utilization, we merged the compute nodes from the secondary Hadoop cluster into our primary Hadoop cluster as a separate partition.

Unfortunately, things started falling apart about two months later.

Observations

After the merge of the compute nodes, the cluster had two partitions with ~4,000 and ~2,000 nodes (let’s call them “primary” and “secondary”). Soon, Hadoop users were experiencing hours-long delays before their job submissions were scheduled, even though abundant resources were available in the cluster.

When looking for the cause of the delay, we initially thought that the logic to handle software partitioning in Hadoop YARN was buggy, but we did not find any issues in that piece of code after debugging and investigation. We also suspected that increasing the size of the cluster by 50% overloaded the resource manager, and that the scheduler was not able to keep up. 

We took a closer look at the queue’s AggregateContainersAllocated metric, which indicates the container allocation speed. Pre-merge, the average throughputs were 500 containers per second for the primary cluster and 250 containers per second for the secondary cluster; post-merge, the aggregated average allocation speed was around 600 containers per second, but allocation speed also often dropped as low as 50 containers per second for extended periods of time (multiple hours).

We did a few rounds of profiling and found that some costly operations, like DNS lookups, were performed inside synchronized blocks, which limited parallelism. After moving those operations out of the synchronized blocks, we observed about a 10% improvement in throughput, but the delay was still significant for users.

Mitigate by redefining fairness

After parsing the audit logs from the resource manager, we noticed that the scheduler was often trapped scheduling containers in a single queue for long periods of time before switching to other queues. Even in periods of reasonable performance (600 containers per second), users in some queues experienced hours of delays, while users in other queues experienced almost no delays. Container allocation speed was normal for some queues, but had dropped to almost zero for others. This observation led us to revisit how the scheduler decides which queue to prioritize for scheduling containers. At LinkedIn, we use the Capacity Scheduler, which sorts the queues based on utilization and allocates containers to queues with lowest utilization first.

Assume we have two queues, A and B, with A 10% utilized and B 20% utilized; the scheduler will schedule containers for queue A before moving on to serve B. This works in most cases; however, in an environment with high container churn, a queue can be starved for long stretches. Say most of the running jobs in queue B are relatively long-lived, while the jobs in queue A are very short-lived. Since A is only 10% utilized, containers are scheduled in queue A over queue B. Because container churn is much higher in queue A, by the time the scheduler finishes one iteration of scheduling in queue A, A’s utilization may have stayed flat or even dropped (say, to 9.5%), still far below queue B’s, which has only dipped slightly (say, to 19%). Workloads submitted to queue B won’t be picked up by the scheduler until the utilizations converge and queue A’s exceeds queue B’s, which can take hours given the different characteristics of the two workloads. From an observer's point of view, the scheduler appears stuck scheduling workloads in queue A, while workloads in queue B are starved of resources.

We asked ourselves why this only became an issue after merging the two clusters and realized the workload in the primary partition’s queues mostly consisted of AI experiments and data analyses, implemented as longer-running Spark jobs, while the workload in the secondary partition’s queues was mostly fast-churning MapReduce jobs. If the resource manager could schedule containers arbitrarily fast, this wouldn’t be an issue; however, since the cluster merge reduced scheduling speed so significantly, an allocation fairness problem surfaced.

The mitigation we came up with was to pick queues with equal probability when the scheduler allocates containers; in other words, we choose the queue randomly rather than based on utilization. Voila! Our problem was temporarily mitigated. We later contributed the patch to Apache Hadoop.
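
To make the idea concrete, here is a minimal sketch in Java, contrasting utilization-ordered queue selection with the randomized selection we used as a mitigation. The Queue and QueuePicker types are hypothetical stand-ins, not the actual Capacity Scheduler classes or our upstream patch.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Random;

// Hypothetical stand-in for a scheduler queue; not a real YARN class.
class Queue {
    final String name;
    final double utilization;    // fraction of the queue's capacity in use
    final int pendingContainers; // outstanding container requests

    Queue(String name, double utilization, int pendingContainers) {
        this.name = name;
        this.utilization = utilization;
        this.pendingContainers = pendingContainers;
    }
}

class QueuePicker {
    private final Random random = new Random();

    // Default behavior: always serve the least-utilized queue first.
    // With high churn in that queue, other queues can starve for hours.
    Queue pickByUtilization(List<Queue> queues) {
        return queues.stream()
                .filter(q -> q.pendingContainers > 0)
                .min(Comparator.comparingDouble(q -> q.utilization))
                .orElse(null);
    }

    // Mitigation: give every queue with pending demand an equal chance.
    Queue pickRandomly(List<Queue> queues) {
        List<Queue> candidates = new ArrayList<>();
        for (Queue q : queues) {
            if (q.pendingContainers > 0) {
                candidates.add(q);
            }
        }
        return candidates.isEmpty()
                ? null
                : candidates.get(random.nextInt(candidates.size()));
    }
}
```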

Root cause of the inefficiency

Despite the queue fairness mitigation, the root cause of scheduling slowness was still not solved. We knew there was still an imminent scaling problem in our YARN cluster on the horizon. We had to dig deeper!

Looking back at the total scheduling throughput before and after the merge, in the best case we were at 80% performance parity (~600 containers per second vs. 750 containers per second), and in the worst case, we were at 7% performance parity (~50 containers per second vs. 750 containers per second). This gap intuitively led us to revisit the scheduling logic for partitions, where we found an irregularity that caught our attention. 

By default, YARN resource manager uses synchronous scheduling, i.e., nodes heartbeat to the resource manager, which triggers the scheduler to schedule outstanding containers on that node. Outstanding containers are submitted to either the primary or secondary partition, and if the container’s partition and the node’s partition do not match, the container will not be allocated on that node.

When scheduling applications in a queue, the scheduler iterates through them in First In, First Out (FIFO) order. Now suppose a node in the primary partition heartbeats to the resource manager; the scheduler picks queue A to schedule in, and the first 100 outstanding applications in queue A are requesting resources from the secondary partition. We found an inefficiency where the scheduler will still try to match containers in these applications to that node, even though the match will always fail. Since both partitions handled a non-trivial amount of workload, this generated a huge amount of overhead on each heartbeat, leading to the slowdown.

To solve this challenge, we optimized the logic so that if a node from the primary (or secondary) partition heartbeats to the resource manager, the scheduler only considers applications submitted to the primary (or secondary) partition when scheduling. After the change, we observed parity in total average throughput before and after the merge, and a 9x improvement in the worst-case scenario, when both partitions are busy with high container churn! We also contributed this optimization upstream.
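
A hedged sketch of the optimization, using hypothetical stand-in types rather than the real YARN scheduler classes: on each node heartbeat, applications whose requested partition can never match the heartbeating node are skipped instead of being needlessly evaluated.

```java
import java.util.List;

// Hypothetical stand-ins for scheduler types; not the actual YARN classes.
record SchedulerApp(String appId, String requestedPartition, int pendingContainers) {}
record SchedulerNode(String nodeId, String partition) {}

class PartitionAwareScheduler {

    // Before the fix: every pending application in the queue is considered,
    // even when its partition can never match the heartbeating node.
    void scheduleNaively(SchedulerNode node, List<SchedulerApp> fifoApps) {
        for (SchedulerApp app : fifoApps) {
            tryAllocate(app, node); // wasted work when partitions differ
        }
    }

    // After the fix: only applications targeting the node's partition are
    // iterated, so a heartbeat from a "primary" node never scans apps that
    // asked for the "secondary" partition.
    void schedulePartitionAware(SchedulerNode node, List<SchedulerApp> fifoApps) {
        for (SchedulerApp app : fifoApps) {
            if (!app.requestedPartition().equals(node.partition())) {
                continue; // skip guaranteed mismatches
            }
            tryAllocate(app, node);
        }
    }

    private void tryAllocate(SchedulerApp app, SchedulerNode node) {
        // Placeholder for the real allocation attempt.
    }
}
```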

What gets measured gets fixed

In order to approach the YARN scalability challenge, we followed the wisdom from our previous engineering lead David Henke, “what gets measured gets fixed.” The immediate next step was to build metrics and tooling to measure and monitor scalability.

Before our clusters hit the size they are today, any slowness experienced by users could be explained by a lack of resources in a user’s queue—a logistics problem that only affected the team running in that queue. To find the root cause of any slowness, we could simply find which workloads were consuming a disproportionate amount of resources in that queue and ask their owners to tune their workflows.

However, our clusters eventually reached a size where resource manager level scalability issues triggered user slowness. Thus, we needed (1) a way to measure and respond to resource manager slowness, and (2) a tool to forecast future resource manager performance as we continue to scale up cluster size and workload.

Setting up scalability metrics and alerting

We were able to leverage existing resource manager metrics for measuring performance issues. The most relevant ones were:

1) Apps pending:

[Chart: apps pending]

2) Container allocation throughput (“AggregateContainersAllocated”):

[Chart: container allocation throughput]

3) NodeManager heartbeat processing rate:

[Chart: NodeManager heartbeat processing rate]

The apps pending metric gives us an overall idea of what kind of performance the user is seeing. Having many apps pending means queues are backed up, and many users’ apps are not yet running.

On the resource manager side, the container allocation throughput metric tells us if the resource manager is not able to schedule containers fast enough; a low throughput sustained over a long period of time (e.g., 30 minutes) indicates that something is likely wrong. However, a low container allocation throughput alone does not indicate resource manager performance problems. For example, if the cluster is fully utilized and container churn is low, we may see low throughput simply because there are no cluster resources left to allocate.
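
As a rough illustration of how these signals can be combined (the thresholds and type names below are hypothetical, not our actual alerting configuration), an alert might only fire when throughput stays low across the whole window while apps are backing up and the cluster still has headroom:

```java
import java.util.List;

// One sample of the scheduler metrics discussed above; names are illustrative.
record SchedulerSample(double containersAllocatedPerSec,
                       int appsPending,
                       double clusterUtilization) {} // utilization in [0.0, 1.0]

class SchedulerAlert {
    // Hypothetical thresholds, for illustration only.
    private static final double LOW_ALLOCATION_RATE = 100.0; // containers/sec
    private static final int PENDING_APPS_THRESHOLD = 50;
    private static final double NEARLY_FULL = 0.95;

    // Fire only if, for every sample in the (e.g., 30-minute) window,
    // throughput is low, apps are backing up, AND the cluster still has
    // headroom -- low throughput on a full cluster is expected, not a bug.
    boolean shouldAlert(List<SchedulerSample> windowSamples) {
        return !windowSamples.isEmpty() && windowSamples.stream().allMatch(s ->
                s.containersAllocatedPerSec() < LOW_ALLOCATION_RATE
                        && s.appsPending() > PENDING_APPS_THRESHOLD
                        && s.clusterUtilization() < NEARLY_FULL);
    }
}
```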

In the capacity scheduler, we use the default setting of scheduling containers synchronously, i.e., on the node manager heartbeat. The node manager heartbeat processing rate metric tells us if there’s any slowness in this critical code path. For example, in one incident we noticed the resource manager was spending a lot more CPU cycles after a feature rollout. Using this metric helped us determine that the feature made changes to node heartbeat processing logic, which wastefully serialized and deserialized node manager heartbeat container status objects sent over the wire, leading to excess CPU cycles. After optimizing this code path, node update throughput increased dramatically, and resource manager CPU usage returned to its previous levels.

DynoYARN for projecting YARN scalability

Another important gap in assessing YARN scalability was being able to forecast future resource manager performance. While we knew from historical capacity analysis that our workload and cluster size grew 2x year over year, we didn’t know how the resource manager would respond to such growth, nor at what point resource manager performance would no longer be able to sustain these increases in workload. Similar to Dynamometer, the scale testing tool we wrote to evaluate future HDFS NameNode performance, we wrote DynoYARN, a tool to spin up simulated YARN clusters of arbitrary size and then replay arbitrary workloads on those clusters.

DynoYARN consists of two components: a “driver” for spinning up a simulated YARN cluster, and a “workload” for replaying a simulated workload on this cluster. Both are implemented as YARN applications; we are essentially running a YARN cluster within a YARN cluster, but one that needs far fewer resources than a real cluster of the same size. For example, to spin up a 1,200 node simulated cluster, the driver application will allocate a container to run the simulated resource manager, and containers to run the simulated node managers. Each of the latter containers can run multiple simulated node managers; in our setup, we can run 30 simulated node managers within a single 50GB container. Therefore, on 256GB hosts, we can run 5 containers with 30 simulated node managers each, or 150 simulated node managers on each real 256GB host. Thus, the 1,200 node simulated cluster only requires 1200/150 = 8 real hosts.
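
The sizing arithmetic above can be captured in a few lines; the constants simply mirror the example and are not DynoYARN configuration values:

```java
// Back-of-the-envelope sizing for a simulated cluster, mirroring the example
// above; these constants are illustrative, not DynoYARN configuration keys.
public class DynoYarnSizing {
    public static void main(String[] args) {
        int simulatedNodes = 1200;          // target simulated cluster size
        int nmsPerContainer = 30;           // simulated NMs per 50GB container
        int containerMemoryGb = 50;
        int hostMemoryGb = 256;

        int containersPerHost = hostMemoryGb / containerMemoryGb;        // 5
        int simulatedNmsPerHost = containersPerHost * nmsPerContainer;   // 150
        int realHostsNeeded =
                (int) Math.ceil((double) simulatedNodes / simulatedNmsPerHost); // 8

        System.out.printf("%d simulated nodes need %d real %dGB hosts%n",
                simulatedNodes, realHostsNeeded, hostMemoryGb);
    }
}
```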

For evaluating resource manager performance, the workload application parses audit logs from our production cluster, and feeds them into the driver application’s simulated cluster. Production workloads are faithfully replayed on the simulated resource manager. From the audit logs, we extract the number of containers each application requested, the memory/vcore requirements of each container, what time the application was submitted, and other metadata such as what user submitted the application (to emulate user limit constraints) and what queue the application was submitted to. The result is a simulation that is very close to the performance that we saw in production, since the workload is almost exactly the same.

To project future scalability, we implemented a feature in the workload application that allows us to modify the parsed audit logs to simulate projected workload. For example, one use case we often simulate is to “multiply” the production workload by a fixed number, e.g., 1.5x or 2x. In the 1.5x case, each application would have a 50% chance of being submitted twice; in the 2x case, each application would have a 100% chance of being submitted twice. Using this strategy, we retained the high-level workload patterns from production (e.g., proportion of Spark applications, proportion of long-running vs. short-running applications, etc.) while scaling them up to predict future performance.
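
A simplified sketch of that multiplier logic (the TraceApp shape and names are illustrative, not DynoYARN's actual trace format):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

// Illustrative sketch of the trace "multiplier": for a multiplier m in [1, 2],
// each parsed application is resubmitted once with probability (m - 1).
class WorkloadMultiplier {
    private final Random random = new Random();

    record TraceApp(String appId, long submitTimeMs, String queue) {}

    List<TraceApp> amplify(List<TraceApp> trace, double multiplier) {
        double duplicateProbability = multiplier - 1.0; // 1.5x -> 0.5, 2x -> 1.0
        List<TraceApp> amplified = new ArrayList<>(trace);
        for (TraceApp app : trace) {
            if (random.nextDouble() < duplicateProbability) {
                amplified.add(new TraceApp(app.appId() + "-copy",
                        app.submitTimeMs(), app.queue()));
            }
        }
        return amplified;
    }
}
```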

By rerunning the simulation on many fine-grained multipliers (e.g., 1.5x, 1.6x, 1.7x, 1.8x, 1.9x, 2x), we could get an accurate trend of how the resource manager’s performance changes as we incrementally scale up our production clusters. Below are the results of these simulations:

[Graph: application delays by workload multiplier]

Multiplier | Number of Node Managers | Applications Per Day | p95 application delay (minutes)
1.0        | 7,152                   | 237,472              | 4.633
1.5        | 10,728                  | 354,600              | 8.769
1.6        | 11,443                  | 377,962              | 10.278
1.7        | 12,158                  | 401,440              | 19.653
1.8        | 12,873                  | 424,540              | 22.815
1.9        | 13,588                  | 441,090              | 43.029

Scalability results

Our target is to keep p95 application delays at 10 minutes or below. Based on our simulations, we found that an 11,000 node cluster can keep application delays roughly within the 10 minute time window (an 11,443 node cluster gives us a 10.278 minute delay, only slightly above our 10 minute target), but a 12,000 node cluster will give us application delays of 19.653 minutes, far beyond our SLA.

From this forecast, we extrapolated (given our 2x year-over-year growth) when we would hit this limit, and therefore how much time we had before we would start to experience serious resource manager performance issues due to scaling.
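
As a rough illustration of that extrapolation, with workload doubling every year, the time to grow from the 1x scale in the table to the ~11,000-node limit is about log2(11,443 / 7,152) ≈ 0.7 years; a few lines make the arithmetic explicit:

```java
// Rough illustration of the growth extrapolation: with workload doubling every
// year, time until the simulated limit is log2(limit / current). The numbers
// are taken from the simulation table above.
public class GrowthExtrapolation {
    public static void main(String[] args) {
        double currentNodeManagers = 7_152;   // 1.0x row in the table
        double limitNodeManagers = 11_443;    // ~1.6x row, where p95 delay ~= 10 min
        double yearsUntilLimit =
                Math.log(limitNodeManagers / currentNodeManagers) / Math.log(2);
        System.out.printf("Roughly %.1f years of 2x growth until the limit%n",
                yearsUntilLimit); // ~0.7 years
    }
}
```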

Open sourcing DynoYARN

In addition to determining future scaling performance, at LinkedIn we use DynoYARN to evaluate the impact of large features before pushing them to production, and to ensure performance parity when upgrading our clusters to later upstream versions. For example, we used DynoYARN to compare resource manager performance when upgrading our Hadoop clusters from Hadoop 2.7 to 2.10. We also used this tool to A/B test the resource manager optimizations discussed earlier. It has been a useful tool for us in determining our YARN performance roadmap, and for confidently rolling out large resource manager features and upgrades. We think the YARN community can also benefit from it, so we are happy to announce that we are open sourcing DynoYARN on GitHub. The repo is available at https://github.com/linkedin/dynoyarn. Comments and contributions are welcome!

Going horizontally scalable with Robin

While we were able to quickly roll out several optimizations to mitigate bottlenecks that we discovered within the resource manager, it became clear that a single YARN cluster would soon no longer be able to sustain LinkedIn’s current compute growth (mainly due to the fundamental design limitation of a single-threaded scheduler). We therefore embarked on a journey for a long-term solution we could rely on for the next few years. 

We first evaluated two potential solutions from the open source Hadoop community, namely Global Scheduling and YARN Federation.

The main goal of Global Scheduling is to address complex resource placement requirements (e.g., anti-affinity) that the default heartbeat-driven scheduler cannot fulfill. It also introduces multi-threaded scheduling, combined with optimistic concurrency control, to improve overall cluster scheduling throughput. However, we did not observe a noticeable improvement over the default single-threaded scheduler in DynoYARN simulations with our production trace (likely due to excessive lock contention between scheduling threads or high rejection rates of container allocations at the final commit step). Given that scheduling optimizations within YARN had yielded only limited improvements relative to our growth rate, we did not pursue this direction further.

On the other hand, YARN Federation, designed specifically to address the scalability limits of a single YARN cluster, seemed to be a much more promising long-term option for us. It allows applications to span multiple clusters of tens of thousands of nodes while presenting the view of a single YARN cluster, which is ideal for keeping things fully transparent to YARN applications and our users as we add more clusters to accommodate future compute growth. However, we decided not to use it at LinkedIn for a few reasons:

  • The current implementation of the control plane (Global Policy Generator) is a CLI-based manual process, which cannot keep up with how dynamic our Hadoop clusters are. The operational complexity would also be significant, if not prohibitive.

  • It brings in new dependencies (MySQL for the policy store, ZooKeeper for the YARN registry) and requires many features, such as YARN Reservation, unmanaged AM, and AMRMProxy, that we have never tested or used. These complexities are significant for a team of our size.

Note that most of the design’s complexity comes from allowing YARN applications to span multiple clusters. If applications stay within the boundary of a single YARN cluster, we can avoid most of that complexity and instead build a domain-specific load balancer for YARN, much like a canonical L7 load balancer.

Robin

We envision our Hadoop cluster as a set of sub-clusters of ~5,000 nodes each, so every application can stay within the boundary of one sub-cluster. Under this assumption, we can build a cluster orchestrator to coordinate jobs among the underlying YARN clusters. Enter Robin: a load balancer we built at LinkedIn that dynamically distributes YARN applications across multiple Hadoop clusters.

At a high level, Robin provides a simple REST API that returns a YARN cluster for a given job. Before a job submission, YARN clients check with Robin to determine which cluster the job should be routed to, and then send the job to the correct cluster. Once submitted, the job stays within the cluster from start to finish. While applications are only allowed to consume up to the capacity of a single cluster, we have not found this to be a limitation for our workloads.
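
To illustrate the flow, here is a hypothetical sketch of the client-side interaction; the endpoint URL, path, and response format are assumptions for illustration and not Robin's actual API:

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Hypothetical sketch of a client asking a Robin-like router which YARN
// cluster to submit to; the URL, path, and response format are assumptions.
public class RobinClientSketch {
    private static final String ROBIN_URL = "http://robin.example.com:8080";

    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create(ROBIN_URL + "/v1/route?queue=data-analytics"))
                .GET()
                .build();

        // Assume the router answers with the target ResourceManager address,
        // e.g. "yarn-cluster-2-rm.example.com:8032".
        HttpResponse<String> response =
                client.send(request, HttpResponse.BodyHandlers.ofString());
        String targetCluster = response.body().trim();

        // The client would then point its YARN configuration
        // (yarn.resourcemanager.address) at targetCluster and submit the job,
        // which stays on that cluster from start to finish.
        System.out.println("Submitting job to " + targetCluster);
    }
}
```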

At LinkedIn, most workloads are managed by Azkaban, our workflow orchestration engine, which acts as a YARN client on behalf of end users. Azkaban has traditionally supported only a single physical Hadoop cluster; therefore, we had to retrofit it to support dynamic job submission and integrate it with Robin, so that it presents a single logical cluster to our end users even as we add physical clusters underneath it. As a result, most end users stay unaware of Robin (see the architecture diagram below).

[Diagram: Robin architecture]

While the overall idea of Robin and its design is straightforward, we had to address a few other issues worth mentioning here.

High availability: Azkaban is the core interface for our Hadoop end users. Every job execution in Azkaban depends on Robin, so it is crucial that Robin always stays up.

  1. Robin periodically checks the resources available at each YARN cluster in the background and makes its routing decision only based on the latest snapshot, so jobs can be routed even if the Robin to YARN connection fails intermittently. 

  2. Robin is designed to be stateless, so we can scale up and down by just adding or removing replicas. It is deployed on Kubernetes (K8s) for the self-healing capabilities that it provides.

Load balancing policy: Choosing the right load balancing policy is crucial to keep workloads at each cluster balanced and to reduce resource contention and job delay. We have experimented with a few policies, such as:

  1. Most absolute Dominant Resource Fairness (DRF) resources available, i.e., route a job to the cluster with the most raw resources available. For example, if cluster A has 20 TB out of 100 TB available, and cluster B has 30 TB out of 200 TB available, route the job to cluster B.

  2. Most relative DRF resources available, i.e., route a job to the cluster with the highest percentage of resources available. For example, if cluster A has 20 TB out of 100 TB available (20% headroom), and cluster B has 30 TB out of 200 TB available (15% headroom), route the job to cluster A.

  3. Queue-level absolute resources available, i.e., route a job to the cluster which has the most resources available in the queue that this job will run in. (Our clusters have identical queue structures.)

We simulated each policy in DynoYARN with production workload traces, and found that the most absolute DRF resources available policy minimizes application delays, making it the best policy for us.
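
A simplified sketch of the absolute-headroom policy next to the relative one (the type and field names are illustrative, and a full DRF computation would consider memory and vcores rather than the single memory number used here):

```java
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

// Simplified sketch of the "most absolute DRF resources available" policy:
// route to the cluster with the most raw headroom. Field names are
// illustrative; a full DRF computation would consider all resource types.
class RobinRoutingPolicy {

    record ClusterSnapshot(String name, long availableMb, long totalMb) {}

    // Absolute policy: a cluster with 30 TB free beats one with 20 TB free,
    // even if the latter has a higher *percentage* of headroom.
    Optional<ClusterSnapshot> pickMostAbsoluteHeadroom(List<ClusterSnapshot> clusters) {
        return clusters.stream()
                .max(Comparator.comparingLong(ClusterSnapshot::availableMb));
    }

    // Relative policy (for comparison): highest fraction of headroom wins.
    Optional<ClusterSnapshot> pickMostRelativeHeadroom(List<ClusterSnapshot> clusters) {
        return clusters.stream()
                .max(Comparator.comparingDouble(
                        c -> (double) c.availableMb() / c.totalMb()));
    }
}
```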

Data locality: Today, the performance of LinkedIn’s workloads relies heavily on data locality. As part of deploying Robin to our biggest production cluster, we’re splitting our existing YARN cluster into two equal-sized YARN sub-clusters, with the underlying HDFS cluster sharing the same nodes with both YARN sub-clusters. Because each job now only has access to 50% of the HDFS data, compared to 100% of the data it had access to before the split, there is some loss of data locality. To mitigate this, we had to allocate machines on each rack evenly between the two sub-clusters (a.k.a. rack striping) so that data can still be accessed from the same rack no matter which cluster a job runs on. This has proven effective in handling our current traffic at the rack and pod network switches.

Next steps

LinkedIn is actively migrating to Azure. As the next step, we are investigating the best way to manage and scale our YARN cluster in the cloud. There are quite a few exciting challenges in lifting our 10,000-plus node on-prem Hadoop YARN cluster to the cloud, including handling noisy neighbors, disk-utilization-aware scheduling to decrease costs, and job caching; at the same time, there are also thrilling opportunities to explore in Azure, like spot instances and autoscaling. We also plan to extend Robin to support routing across on-prem and cloud clusters, and later open source Robin. If you are interested in joining the journey to scale one of the largest Hadoop YARN clusters on the cloud, join us!

Acknowledgements

It takes a village to scale Hadoop YARN to what we have now. In addition to the efforts by the YARN dev team, we also want to call out the support from our partner teams. Special shout out to the folks in the Grid SRE team, who take care of all the operational work and stay late with us whenever there are site up issues: Tu Tran, Amit Balode, Mario Carvalho, Chen Qiang, Ganesh Ananthakrishnan, Savitha Ravikrishnan, Alex Augsburger, Alex Reznik, Aman Goel, Anuj Maurice, Ryosuke Morino, and Bowen Jiao; thanks to the COP team for the collaboration to integrate Robin with Azkaban: Janki Akhani, Deepak Jaiswal, Abhishek Tiwari, and Arvind Pruthi.

Large infrastructure efforts like scaling YARN require significant and sustained commitment from management: Zhe Zhang (alumni), Dhaval Patel, Vasanth Rajamani, Eric Baldeschwieler, Kapil Surlaker, and Igor Perisic, thank you for your unyielding support and guidance. We also received tremendous support from the open source Apache Hadoop YARN community.