Dynamometer: Scale Testing HDFS on Minimal Hardware with Maximum Fidelity

Co-authors: Erik Krogen and Min Shen


In March 2015, LinkedIn’s Big Data Platform team experienced a crisis. As the team was preparing to head home for the day, signs of trouble began trickling in: our internal users were reporting that their applications were stalling or timing out. Job queues were backing up, and SLAs would be missed. A bit of investigation indicated that operations performed against our primary HDFS cluster were taking up to two orders of magnitude longer than normal, or timing out completely. This came soon after expanding the cluster by 500 machines, specifically to help provide more capacity to our internal users. Inadvertently, rather than providing an improved user experience as intended, we made our service nearly unusable. By the time the issue was detected, it was non-trivial to remove the new machines, as they had already accumulated a significant amount of data that would need to be copied off before they could be removed from service. Instead, we worked quickly to solve the specific issue causing the performance regression (described in further detail below), and subsequently began to make plans for how to protect ourselves from similar mishaps in the future.

We realized that sometimes applying changes from the Apache Hadoop open source community, even from official releases, can be risky because performance and scale testing are not part of the standard release process. While this is surprising given that scalability is a core tenet of the design of HDFS, it starts to make sense when you consider the following factors:

  • Scale testing is expensive—the only way to ensure that something will run on a multi-thousand node cluster is to run it on a cluster with thousands of nodes.

  • HDFS is maintained by a distributed developer community, and many developers do not have access to large clusters.

  • Developers are aware that Apache community releases (as opposed to distributions like CDH and HDP) are not typically run in production.

Even beyond potential performance regressions in Hadoop code, simple configuration changes in our environment can sometimes have unforeseen consequences whose effects are blown out of proportion at our scale. We recognized the need to do performance testing in our own environment, but faced the difficulty that many issues are impossible to detect without running a cluster that is similar in size to what we use in production. Unfortunately, clusters of that scale are expensive, and we don’t have a spare one lying around that we can use for testing purposes; even if we did, generating sufficient and realistic load would also present a challenge. Many organizations deploying new software at scale go through a process in which changes are tested on successively larger installations, but this requires a lot of hardware, and users on those clusters become unwitting test subjects. We thought that there must be a better way, and to this end, we designed and built Dynamometer, a framework that allows us to realistically emulate the performance characteristics of an HDFS cluster with thousands of nodes using less than 5% of the hardware needed in production. The name is a reference to a chassis dynamometer, a tool used for performance measurement and testing of vehicles in which a moving road is simulated using a fixed roller.

In the interest of making Hadoop as robust and scalable as possible for everyone, we are open sourcing Dynamometer today. Feedback and contributions are greatly appreciated! Please continue reading to learn more about the story of its conception and implementation.

Background

LinkedIn is a data-driven company with over 530 million members. We have both an enormous amount of data to store and thousands of data scientists, engineers, and business analysts who need to access and analyze this data. Our primary data analytics needs are met by the Apache Hadoop ecosystem (including related technologies, such as Apache Hive and Apache Spark), which is used for a variety of mission-critical tasks that range from reporting to feature experimentation to machine learning. LinkedIn’s Hadoop users are constantly developing new applications and workflows, meaning we have a consistent need to expand our clusters. Over the span of only a few years, the size of LinkedIn’s primary Hadoop clusters grew from hundreds to several thousand nodes, and we now run hundreds of thousands of applications per day. Even at our current scale, we still see compute and storage capacity requirements roughly doubling each year. This means that we have to constantly look for ways to ensure that Hadoop will scale with us, and ensure that changes we make will not have a negative impact on the performance or scalability of our clusters, as they did in the incident described above.

Identifying HDFS scalability bottlenecks

To gain a proper understanding of how to evaluate the scalability of HDFS under varying conditions, we have to first identify the limiting factor of its performance. HDFS is a large and complex system; isolating a single limiting factor allows us to concentrate our efforts in a much more efficient manner than blindly testing the system as a whole.


Some examples of HDFS interactions. The NameNode is a centralized bottleneck for all requests requiring metadata, including reading/writing a file, renaming a file, and listing a directory. Note that most client processes are colocated on the same machines as the DataNode processes.

The architecture of HDFS is such that as a cluster grows, the NameNode is the primary bottleneck to scaling, due to a number of factors:

  • While data is distributed across all DataNodes, all metadata is tracked by a single NameNode. This process is additionally responsible for managing all DataNodes and facilitating all client interactions (excluding the transfer of actual data bytes).

  • As the cluster grows, there is more metadata to keep track of, such as files, directories, and information about each DataNode.

  • The way Hadoop colocates compute and storage resources means that each DataNode server is also used for executing users’ logic. As more DataNodes are added, the cluster’s compute capacity also expands, and our busy users happily consume these extra resources; thus, as the cluster grows, there are more requests submitted to the NameNode.

  • The load on a DataNode is mostly determined by the data blocks it stores locally, and we can always add more nodes to spread that load further, so DataNodes are typically not the bottleneck for the cluster’s performance.

As a result, our efforts to ensure that performance and scalability remain high are centered around the NameNode.
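To make the NameNode’s central role concrete, here is a minimal sketch using the standard Hadoop FileSystem API (the cluster URI and paths are hypothetical). Every call shown is a metadata operation answered entirely by the NameNode; no DataNode is contacted, because no data bytes are transferred.

    import java.net.URI;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class MetadataOpsSketch {
      public static void main(String[] args) throws Exception {
        // Hypothetical NameNode address; substitute your own cluster's URI.
        FileSystem fs = FileSystem.get(
            URI.create("hdfs://namenode.example.com:8020"), new Configuration());

        // Each call below is resolved by the NameNode's in-memory namespace.
        fs.mkdirs(new Path("/user/alice/reports"));
        fs.rename(new Path("/user/alice/reports"), new Path("/user/alice/reports-archive"));
        for (FileStatus status : fs.listStatus(new Path("/user/alice"))) {
          System.out.println(status.getPath());
        }
      }
    }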

The NameNode performance crisis


Larger clusters submit more operations, and in the presence of the discussed performance regression, larger clusters also require a longer time to complete each operation. This combination results in a superlinear performance impact.

In the process of investigating our original incident, we found that the performance regression was brought on by a seemingly innocuous change that had slipped into several upstream releases. The goal of the original change (discovered and fixed a few months later) was to fix a bug in how load was calculated when choosing DataNodes to replicate blocks onto. It seemed fairly benign, with only about two dozen lines of non-test code changes, but it turned out that it also significantly increased the time it took the NameNode to add a new block to a file. Increasing the duration of this operation is particularly bad because every access to the file namespace occurs under a single read-write lock, so an increase in block addition time is an increase in the time that all other operations are blocked. The increase in block allocation time is proportional to the size of the cluster, so its impact becomes more severe as clusters are expanded. Larger clusters also have a more intense workload and thus more block allocations, further amplifying the effects of this regression. As shown above, the compounding effects of increased operation rate and increased operation runtime mean that adding nodes can have a superlinear effect on overall NameNode load; similarly, some operations’ runtimes are affected by the number of blocks or files in the system, making aggregate scaling effects difficult to predict in advance. This was a prime example of a scaling issue that was difficult for us to predict or to notice on our smaller testing clusters, but which we should have been able to catch prior to deployment.
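The reasoning above can be illustrated with a small standalone sketch. This is not NameNode code: the lock usage is simplified, and the sleep is an arbitrary stand-in for a block allocation that has become slow. The point is simply that under a single namespace-wide read-write lock, one slow write-path operation delays every other operation, however cheap.

    import java.util.concurrent.locks.ReentrantReadWriteLock;

    // Illustrative only: a slow operation under the write lock stalls all readers.
    public class GlobalLockSketch {
      private static final ReentrantReadWriteLock NAMESPACE_LOCK =
          new ReentrantReadWriteLock(true); // fair mode, as in HDFS by default

      // Stand-in for adding a block to a file: runs under the write lock.
      static void addBlock(long simulatedWorkMillis) throws InterruptedException {
        NAMESPACE_LOCK.writeLock().lock();
        try {
          Thread.sleep(simulatedWorkMillis); // the regression made this step slower
        } finally {
          NAMESPACE_LOCK.writeLock().unlock();
        }
      }

      // Stand-in for a cheap metadata read such as listing a directory.
      static void listDirectory() {
        NAMESPACE_LOCK.readLock().lock();
        try {
          // read namespace state
        } finally {
          NAMESPACE_LOCK.readLock().unlock();
        }
      }

      public static void main(String[] args) throws Exception {
        Thread writer = new Thread(() -> {
          try {
            addBlock(200);
          } catch (InterruptedException ignored) {
          }
        });
        writer.start();
        Thread.sleep(10); // let the writer grab the lock first

        long start = System.nanoTime();
        listDirectory(); // must wait for the slow block allocation to finish
        System.out.printf("read waited ~%d ms%n", (System.nanoTime() - start) / 1_000_000);
        writer.join();
      }
    }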

The requirements for our solution

As discussed above, we reduced our HDFS scaling problem to a problem of NameNode scalability. We identified three key factors which affect its performance:

  • Number of DataNodes in the cluster;

  • Number and structure of objects managed (files/directories and blocks);

  • Client workload: request volume and the characteristics of those requests.

We needed a solution which would allow us to control all three parameters to match our real systems. To address the first point, we developed a way to run multiple DataNode processes per physical machine, and to easily adjust both how many machines are used and how many DataNode processes are run on each machine. The second point, in isolation, would be fairly trivial: start a NameNode and fill it with objects that do not contain any data. However, achieving a similar client workload significantly complicates this step, as explained below.

The last point is particularly tricky. The nature of requests can have huge performance implications on the system. Write requests are obviously more expensive than read requests; however, even within a single request type, there can be significant performance variations. For example, performing a listing operation against a very large directory can be thousands of times more expensive than performing a listing operation against a single-item directory, and this has significant implications for garbage collection efficiency and optimal tuning. To capture these effects, we set out with a requirement that our testing should be able to simulate exactly the same workload that our production clusters experience. This dictates that we not only execute the same commands, but that those commands are executed against the same namespace, hence the trivial solution to our second point is not sufficient.

Building off of these factors, and adding in a few additional requirements, we came up with the following list of goals:

  • The simulated HDFS cluster should have a configurable number of DataNodes and a file namespace which is identical to our production cluster.

  • We should be able to replay the same workload that our production cluster experienced against this simulated cluster. To plan for even larger configurations, we should be able to induce heavier workloads, for example by playing back a production workload at an increased rate.

  • It should be easy to operate. Ideally, every prospective change would be run through Dynamometer to determine whether it improves or degrades the performance of the NameNode.

  • The coupling between Dynamometer and the implementation of HDFS should be loose enough that we can easily test multiple versions of HDFS using a single version of Dynamometer and a single underlying host cluster.

The architecture

To meet the aforementioned requirements, we implemented Dynamometer as an application on top of YARN, the cluster scheduler in Hadoop. We rely on YARN heavily at LinkedIn for other Hadoop-based processing, so this was a natural choice that allowed us to leverage our existing infrastructure. YARN allows us to easily scale Dynamometer by adjusting the amount of resources requested, and helps to decouple the simulated HDFS cluster from the underlying host cluster.

There are three main components to the Dynamometer setup:

  1. Infrastructure is the simulated HDFS cluster.

  2. Workload simulates HDFS clients to generate load on the simulated NameNode.

  3. The driver coordinates the two other components.

The logic encapsulated in the driver enables a user to perform a full test execution of Dynamometer with a single command, making it possible to do things like sweeping over different parameters to find optimal configurations.

The infrastructure application is written as a native YARN application in which a single NameNode and numerous DataNodes are launched and wired together to create a fully simulated HDFS cluster. To meet our requirements, we need a cluster which contains, from the NameNode’s perspective, the same information as our production cluster. To achieve this, we first collect the FsImage file (containing all file system metadata) from a production NameNode and place this onto the host HDFS cluster; our simulated NameNode can use it as-is. To avoid having to copy an entire cluster’s worth of blocks, we leverage the fact that the actual data stored in blocks is irrelevant to the NameNode, which is only aware of the block metadata. We first parse the FsImage using a modified version of Hadoop’s Offline Image Viewer and extract the metadata for each block, then partition this information onto the nodes which will run the simulated DataNodes. We use SimulatedFSDataset to bypass the DataNode storage layer and store only the block metadata, loaded from the information extracted in the previous step. This scheme allows us to pack many simulated DataNodes onto each physical node, as the size of the metadata is many orders of magnitude smaller than the data itself.
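Dynamometer launches its DataNodes inside YARN containers, but the core trick of metadata-only block storage can be sketched with Hadoop’s own test utilities (MiniDFSCluster and SimulatedFSDataset, both shipped in the hadoop-hdfs test artifact). The snippet below is purely illustrative, not Dynamometer’s launcher, and the DataNode count is arbitrary:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hdfs.HdfsConfiguration;
    import org.apache.hadoop.hdfs.MiniDFSCluster;
    import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;

    public class SimulatedStorageSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new HdfsConfiguration();
        // Swap the DataNode storage layer for SimulatedFSDataset, so only block
        // metadata is kept in memory and no real block files are written to disk.
        SimulatedFSDataset.setFactory(conf);

        // Start one NameNode plus several metadata-only DataNodes in a single JVM.
        MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
            .numDataNodes(8) // illustrative count
            .build();
        try {
          cluster.waitActive();
          FileSystem fs = cluster.getFileSystem();
          fs.mkdirs(new Path("/dyno-sketch"));
          System.out.println("DataNodes running: " + cluster.getDataNodes().size());
        } finally {
          cluster.shutdown();
        }
      }
    }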


The driver launches two applications onto the host YARN cluster. Simulated clients and DataNodes are spread across the cluster and may be colocated.

To build a stress-testing job that meets our requirement of replaying the same workload that was experienced in production, we must first have a way to collect information about the production workload. The NameNode has facilities to record every request which it services; the resulting command log is known as the audit log. A heavily-loaded NameNode services tens of thousands of operations per second; to induce such a load, we need numerous clients to submit requests. To ensure that each request has the same effect and performance implications as its original submission, related requests (for example, a directory creation followed by a listing of that directory) must be replayed in a way that preserves their original ordering. To achieve this while distributing the replay across multiple nodes, we partition the audit log based on source IP address, with the assumption that requests which originated from the same host have more tightly coupled causal relationships than those which originated from different hosts. In the interest of simplicity, the stress-testing job is written as a map-only MapReduce job, in which each mapper consumes one partition of the audit log and replays the commands contained within against the simulated NameNode. During execution, we collect statistics about the replay, such as latency for different types of requests.
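As a rough illustration of the replay side, here is a simplified mapper sketch built only on the standard MapReduce and FileSystem APIs. It is not Dynamometer’s actual workload driver: the dyno.namenode.uri job property is a made-up name, only a few command types are handled, and the audit log parsing is deliberately naive.

    import java.io.IOException;
    import java.net.URI;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    // Greatly simplified sketch of an audit-log replay mapper. Each mapper reads one
    // partition of the audit log (pre-split by source IP) and re-issues the logged
    // metadata operations against the NameNode under test.
    public class AuditReplayMapperSketch
        extends Mapper<LongWritable, Text, NullWritable, NullWritable> {

      private FileSystem fs;

      @Override
      protected void setup(Context context) throws IOException {
        Configuration conf = context.getConfiguration();
        // "dyno.namenode.uri" is a hypothetical job property naming the simulated NameNode.
        fs = FileSystem.get(
            URI.create(conf.get("dyno.namenode.uri", "hdfs://localhost:8020")), conf);
      }

      @Override
      protected void map(LongWritable key, Text value, Context context) throws IOException {
        // Audit log entries contain fields such as: cmd=listStatus  src=/some/path  dst=null
        String cmd = field(value.toString(), "cmd=");
        String src = field(value.toString(), "src=");
        if (cmd == null || src == null) {
          return;
        }
        try {
          switch (cmd) {
            case "listStatus":
              fs.listStatus(new Path(src));
              break;
            case "getfileinfo":
              fs.getFileStatus(new Path(src));
              break;
            case "mkdirs":
              fs.mkdirs(new Path(src));
              break;
            default:
              // create, rename, delete, open, ... omitted in this sketch.
              break;
          }
        } catch (IOException e) {
          // A real replay would record the failure rather than silently ignoring it.
        }
      }

      private static String field(String line, String prefix) {
        for (String token : line.split("\\s+")) {
          if (token.startsWith(prefix)) {
            return token.substring(prefix.length());
          }
        }
        return null;
      }
    }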

Dynamometer in practice

LinkedIn is all about data-driven decision making, so it is important to us to have the metrics to back up key changes we make to our Hadoop clusters. Dynamometer has become a standard tool to evaluate new features, giving us hard data about performance impact under production scale and real workloads. We plan to integrate Dynamometer into our testing pipeline, allowing us to quickly catch performance regressions as they are introduced into the codebase. In addition to using Dynamometer to estimate the limits of NameNode performance, we have used Dynamometer to develop actionable plans in a number of areas; we will discuss a few of them here.

When upgrading our HDFS clusters from Hadoop 2.3 to 2.6, we first used Dynamometer to predict how the new version of NameNode would react to our workload. We found that a change to the storage format of the FsImage file introduced by HDFS-5698 resulted in a large increase in the memory footprint of the NameNode. Being aware of this in advance allowed us to properly adjust our JVM heap size and GC tuning parameters, potentially avoiding a disaster upon upgrading.

The entire file namespace stored by the NameNode is protected by a single read-write lock, the FSNamesystemLock. This can be a bottleneck for the request volume the NameNode can handle: all write operations are serialized, and even read operations may become blocked, since the lock is placed into fair mode by default (the opposite of the Java default). In fair mode, threads acquire the lock in approximate arrival order; in non-fair mode, readers may acquire the lock ahead of waiting writers in an effort to increase concurrency. The increased concurrency of non-fair mode comes at the cost of possible writer starvation, as a writer can be blocked indefinitely by a continuous stream of readers.
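The FSNamesystemLock is backed by Java’s ReentrantReadWriteLock, whose constructor takes the fairness policy, so the two modes can be contrasted directly in a few lines. In HDFS itself the mode is toggled through a NameNode configuration property (dfs.namenode.fslock.fair in the releases we ran; verify against your version) rather than in code.

    import java.util.concurrent.locks.ReentrantReadWriteLock;

    public class LockFairnessSketch {
      public static void main(String[] args) {
        // Fair mode (HDFS's default for the namespace lock): threads acquire the
        // lock in approximate arrival order, so waiting writers are not starved,
        // but readers queue up behind them instead of proceeding concurrently.
        ReentrantReadWriteLock fair = new ReentrantReadWriteLock(true);

        // Non-fair mode (the Java default): readers may acquire the lock ahead of
        // waiting writers, increasing read concurrency at the risk of writer starvation.
        ReentrantReadWriteLock nonFair = new ReentrantReadWriteLock(false);

        System.out.println("fair=" + fair.isFair() + ", nonFair=" + nonFair.isFair());
      }
    }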

The impact of the difference between fairness modes depends on the ordering of requests: for example, a burst of only reads followed by a burst of only writes would be unaffected by the locking mode, whereas a workload with closely interleaved operation types can be strongly influenced by it. This workload-dependent performance impact makes the audit trace replay capabilities of Dynamometer highly appropriate for this analysis. Using Dynamometer, we were able to verify that our write latencies would not be negatively affected by non-fair locking, and would actually improve due to an overall increase in the throughput of the NameNode. Below you can see a graph of the average time a request spent in the NameNode’s queue waiting to be serviced before and after we deployed non-fair locking on a production cluster, as well as a prediction of the same using Dynamometer:


The average request wait time shown as a function of load, as predicted on Dynamometer and observed in production.

We have also experimented with using the G1 garbage collector (G1GC) for our NameNode instead of the Concurrent Mark Sweep collector. While we can gather an idea of which tuning parameters to employ from an understanding of how the NameNode uses its heap memory, ultimately achieving a finely-tuned system requires a number of iterations of changing garbage collection tuning parameters and seeing how the system reacts. Dynamometer enabled us to become aware of a number of unexpected issues that, without tuning, would have resulted in disastrous consequences upon enabling G1GC on a production NameNode. It also enabled us to very finely tune each parameter because of the ease with which we were able to run tests with varying configuration values.

Finally, our team recently pushed out the 2.7.4 release of Hadoop. It is imperative that performance remains stable across maintenance releases, so we had to build a high level of confidence in the quality of the release. To this end, we used Dynamometer to verify the absence of noticeable performance regressions in the NameNode, even at our scale.

The similarity between a production environment and a Dynamometer environment allows for accurate testing; combined with the ease of running Dynamometer with various configurations and binaries, this provides us with a very powerful experimentation methodology.

Related work

A number of tools for measuring HDFS and NameNode performance have been developed in the past. Though all are useful, we found that they did not quite satisfy our requirements.

  • NNThroughputBenchmark is a utility for measuring NameNode performance on a single node; however, it does away with a few aspects that we find to be important, such as DataNodes and the RPC handling layer. It is meant to “reveal the upper bound of pure NameNode performance” rather than to set realistic real-world expectations.

  • TestDFSIO is a useful distributed benchmark but focuses on overall data I/O, utilizing DataNodes. We wanted to focus on NameNode performance specifically, since we found that the metadata operation throughput is our scale-limiting factor.

  • Synthetic Load Generator is a useful workload generator similar in form to that which we built for Dynamometer, but it generates its own synthetic workload based on input parameters that decide, for example, the percentage of read requests and write requests. One of our requirements was to have an identically matched workload, so this did not quite match our use case.

  • S-Live is similar to TestDFSIO and the Synthetic Load Generator. It tests an entire cluster rather than focusing on the NameNode, and also works off a distribution of operations to perform rather than attempting to match a production workload.

Acknowledgements

Dynamometer is the result of the combined efforts of many people. We would like to thank Carl Steinbach for originally proposing this project along with several elements of the design; Adam Whitlock, Subbu Subramaniam, and Mark Wagner, who worked on the initial proof-of-concept; Vinitha Gankidi, who converted Dynamometer into a YARN application; and Zhe Zhang, who helped us make the final push to get the system to where it is today. Lastly, big shout outs go to our team manager Suja Viswesan and to the members of LinkedIn’s Grid SRE team, who have supported our efforts throughout.