The exabyte club: LinkedIn’s journey of scaling the Hadoop Distributed File System

May 27, 2021

Co-authors: Konstantin V. Shvachko, Chen Liang, and Simbarashe Dzinamarira

LinkedIn runs its big data analytics on Hadoop. During the last five years, the analytics infrastructure has experienced tremendous growth, almost doubling every year in data size, compute workloads, and in all other dimensions. It recently reached two important milestones.

  1. LinkedIn now stores 1 exabyte of total data across all Hadoop clusters.

  2. Our largest 10,000-node cluster stores 500 PB of data. It maintains 1 billion objects (directories, files, and blocks) on a single NameNode serving RPCs with an average latency under 10 milliseconds, making it one of the largest (if not the largest) Hadoop cluster in the industry.

From the early days of LinkedIn, Apache Hadoop was the basis of our analytics infrastructure. Many teams assisted in this effort to make Hadoop our canonical big data platform.

This blog post primarily focuses on Hadoop storage—HDFS. It gives an overview of achievements across multiple dimensions that lifted us to the current level of scalability. We talk about performance tuning that helped to scale HDFS namespace services and describe important existing features of HDFS and show their benefits in our environment. We also present engineering challenges of scaling out the satellite cluster intended to maintain a very large namespace of very small files. We explain the design of a new feature, Consistent Reads from Standby, which was developed to accommodate the exponential growth of the namespace workloads. Finally, we review new and existing components of HDFS, such as Selective Wire encryption, Encryption at Rest, and Wormhole that are designed to expand HDFS ecosystem to new use cases and to provide compliance with data protection laws. 

Historical scale

Figure 1 tracks three main metrics and shows the exponential growth of our largest Hadoop cluster.

  • chart-showing-growth-on-linkedins-largest-hadoop-cluster

Figure 1. Exponential growth of data, metadata, and compute on LinkedIn’s largest Hadoop cluster, 2015-2020. In 2020, we reached 500 PB space capacity, 1.6B namespace objects, and 50M GbHr compute.

  1. Total data storage used on the cluster—we ended 2015 with just over 20 PB of data, which has since grown to 500 PB.

  2. The number of namespace objects (directories, files, blocks) was 145 million in 2015. By the end of 2020, the cluster had three namespace volumes with a total of 1.6 billion objects and the largest volume surpassing 1 billion.

  3. GbHr measures the total amount of RAM used by all applications on the cluster over a day and reflects the workload on the cluster. The compute growth substantially accelerated over the last two years due to expansion of machine learning at LinkedIn.

High availability and rolling upgrades

In Hadoop Distributed File System (HDFS), the file system metadata is decoupled from data. The metadata consists of the directory tree, files, and blocks and is maintained in memory of a dedicated server called NameNode. The file data is partitioned into blocks and is replicated across multiple DataNodes for reliability and availability. The NameNode used to be a single point of failure (SPOF) in an HDFS cluster. The High Availability (HA) feature introduced in Hadoop 2 eliminated this limitation. It allows you to run multiple replicated NameNodes. The single Active NameNode receives all clients’ requests and publishes its journal transactions into a Journal Service. Potentially multiple Standby NameNodes consume the transactions and update their namespace state accordingly, which keeps them up to date so that they can take over in case Active NameNode fails.

HDFS provides a variety of choices to set up an HA cluster. We use Quorum Journal Manager (QJM) as the Journal Service and IP failover as the mechanism for clients to determine which NameNode is currently active. QJM consists of several servers—three, in our case—which form a quorum system that reliably stores NameNode transactions and serves them to standby nodes. With IP failover, all clients communicate to the Active NameNode using the same Virtual IP address (VIP) irrespective of which physical host is assigned as active. The VIP failover is handled by Linux system tools, allowing a transition between NameNodes to happen transparently to clients.

In addition to eliminating the SPOF, HA is also crucial for rolling upgrades. Before the introduction of HA, a restart of the NameNode was required for any software or hardware upgrade. This is a disruptive procedure, since the NameNode startup can take up to an hour, especially on a large cluster. During this time, all jobs on the cluster must be suspended. On an HA cluster, upgrades can instead be done in a rolling fashion. First, one of the Standby NameNodes is upgraded with new software and restarted. Then, the Active NameNode fails over to the upgraded standby and is subsequently upgraded and restarted. Then, the DataNodes are restarted with the new software. DataNode restarts can be performed in batches, as long as each batch belongs to the same rack, since HDFS is designed to tolerate a rack failure without loss of data. The process of rolling upgrades is automated with a specialized script running as a Jenkins job.

HA and rolling upgrades allowed us to eliminate disruptive system upgrades, improving cluster availability for the users. 

Java tuning

The ever-growing file system namespace requires periodic increases of Java heap size on the NameNode because it keeps the entire metadata in RAM for low latency access. As of this writing our largest NameNode is set to use a 380 GB heap to maintain 1.1B of namespace objects. Such a large heap requires elaborate tuning in order to provide high performance and avoid long pauses due to full garbage collections.

Java heap generations
Java heap is generally divided into two spaces: Young generation and Tenured (Old) generation. Young generation is used for transient objects, while Tenured generation is for long-lived objects that survive multiple rounds of garbage collection, such as file Inodes and blocks on the NameNode. As the workload on the NameNode increases, it generates more temporary objects in the Young generation space, while the growth of the namespace increases the Tenured space. In practice, the namespace growth correlates with the cluster expansion by adding more worker nodes, which proportionally increases the number of clients and the overall workload on the NameNode. Therefore, it is important to keep an optimal ratio between the two spaces. We currently keep the ratio between the Young and Tenured generations at around 1:4. By keeping Young and Tenured spaces appropriately sized, it is possible to completely avoid full garbage collections, which would result in a many-minutes-long outage due to the huge heap size.

Non-fair locking
NameNode is a highly multithreaded application. It uses a global read-write lock, which protects the entire namespace, to control concurrency. Traditionally, the write lock is exclusive—only one thread can hold it—while the read lock is shared, allowing multiple reader threads to run while holding it. Locks in Java support two modes:

  1. Fair: locks are acquired in FIFO order (Java default)

  2. Non-fair: locks can be acquired out of order

Fair locking tries to schedule lock acquisition by a thread in the order it was requested. So, while a writer thread waits its turn to acquire the write lock it actually blocks all the reader threads that requested the read lock after the writer thread, even though those reader threads could run in parallel with the current reader(s). Thus, non-fair mode allows readers to go ahead of the writers. This may look unfavorable for writers, but in practice it substantially improves the overall NameNode performance, since the workload is substantially skewed towards read requests, which comprise 95% of all namespace operations.

Dynamometer is a standard Hadoop tool open-sourced by LinkedIn to benchmark and stress test HDFS NameNode based on real-world workload traces. We first used Dynamometer to emulate the production workloads with fair and non-fair locking and then confirmed it in production. Figure 2 shows a latency drop in the middle of the graph when non-fair locking was deployed, illustrating that non-fair mode delivers an order of magnitude better performance over the fair mode.

  • graph-showing-decrease-in-latency

Figure 2. Non-fair locking improves NameNode RPC latency by 10x.

Satellite cluster

The small files problem
HDFS is optimized for maintaining large files and provides high throughput for sequential reads and writes that are essential for batch data processing systems. Small files cause problems for most file systems, however, and HDFS is not an exception. The problem is well understood; see, e.g., HDFS Scalability: The limits to growth and the sequel Apache Hadoop: the Scalability Update.

A file is small if its size is less than a block size, which on our clusters is set to 512MB. Small files in HDFS disproportionally inflate metadata compared to the aggregate size of data referenced by it. Since all metadata is stored in NameNode’s RAM, it becomes a scalability limit and a performance bottleneck. Block-to-file ratio reflects how many small versus large files are in the system: if all files are small, then the ratio is 1, while if all files are large, then the ratio is at least 2.

The logging directory
We started the satellite cluster project with the block-to-file ratio at around 1.1, which means that 90% of files in our system were small. Our analysis of file size distribution revealed a system directory /system, primarily composed of very small files with an average size of less than 1MB, representing configurations and logs managed by Hadoop-owned services, such as YARN, Spark history servers, and other system applications. The directory contained half of the files of the entire namespace but used only a small fraction, 0.07%, of the cluster data capacity. Since the system directory is used exclusively by Hadoop internal systems, we could move it into a separate cluster without any user-facing changes. This prompted us to create a new (satellite) cluster with the NameNode of the same size as on the primary cluster, but with 100 times fewer DataNodes.

Bootstrapping the satellite cluster
Bootstrapping required us to move the logging data from the primary to the satellite cluster. While the amount of data (60TB) was relatively small, the number of files (100 million) was big, which presented the first challenge.

Initially, we tried a straightforward approach of copying all 100 million files using DistCp (distributed copy)—a standard Hadoop tool for large inter/intra-cluster parallel data copying. Because we had such a big number of files, the DistCp job got stuck on the initialization stage trying to collect all file paths that it needed to copy. The job produced such an exhaustive load on the NameNode that it became unresponsive and inaccessible for other clients during that time. We estimated that even if we split the copy job into multiple manageable steps, it would still take over 12 hours to complete. By that time, new data would have arrived and some of the copied data would become obsolete. The solution turned out to be non-viable.

We decided to instead build a custom HDFS-compatible file system driver, called FailoverFS, which always writes new logs to the satellite cluster, while presenting the combined read view of both clusters’ files. This allowed new jobs to write their logs to the satellite cluster, and services accessing those logs, such as the Spark history server, could read logs from both clusters. The log retention policy service, which is set to delete files older than 1 week, eventually removed the logs on the primary cluster.

Very large block reports
DataNodes in HDFS send periodic block reports to the NameNode to let it know of all block replicas they own. For each block, the report contains block id, its generation stamp, and the length. The block report is partitioned into chunks corresponding to blocks from the same volume (disk) of the DataNode. The NameNode processes each volume report contiguously, holding a global namespace lock.

The satellite cluster contains only 32 DataNodes, because the amount of data is small. Thus, we have a large number of blocks (roughly equal to the number of files) distributed among a small number of DataNodes, which makes the per-node replica count, and therefore the size of a block report, very big. For comparison, a DataNode on the satellite cluster contains 9 million block replicas, versus 200K replicas on the primary. This presented another challenge, as the block report processing time on the NameNode became overwhelming, making it unresponsive for other operations during the processing time.

The solution was to split each DataNode drive into ten virtual volumes. This partitions the block report into a larger number of smaller-sized volume chunks and makes the report processing on the NameNode more granular.

Overall, the introduction of the satellite cluster allowed us to manage another round of doubling of the infrastructure. It resulted in an improved block-to-file ratio of 1.4.

Consistent reads from Standby Node

Motivation and requirements
In the era of Moore's Law dramatically slowing down—and eventually ending by 2025—the main limiting factor for HDFS scalability becomes the NameNode performance, due to CPU speed restrictions. In anticipation of the next cycle of cluster growth, we realized that even though we can keep up with metadata object growth by increasing the heap size of the NameNode, we cannot rival the increase in metadata operation workloads with a single metadata server. The workload should be parallelized between multiple servers.

An HA-enabled HDFS typically has two NameNodes, one in Active and another in Standby state. Any Standby node is a replica of the Active. Coordination of the metadata state between Active and Standby nodes is handled by Quorum Journal Manager. The Active NameNode publishes journal transactions to QJM, while the Standby tails the transactions from QJM in the same order in which they were executed on Active. This creates an opportunity for reading metadata from Standby instead of the Active NameNode. The Active will remain the only NameNode serving write requests as the sole source of truth for namespace updates.

Our analysis of metadata workloads in production showed that reads comprise 95% of all namespace operations. Therefore, load balancing of reads should substantially improve the total throughput of metadata operations.

However, traditional Standby NameNodes in HDFS are used only for failover and reject any client operations. To overcome this issue, we introduced a notion of an Observer node, which is a Standby that also serves read requests. The Observer node along with all related logic was fully implemented as part of HDFS-12943. We deployed it on our clusters and have been running it in production for over a year.

The main requirements that defined our design decisions included:

  1. Strong consistency. Clients should always get a consistent view of HDFS regardless of which NameNode they are connecting to.

  2. High throughput of metadata operations.

  3. Transparency for existing applications. Client APIs should treat NameNode as a single service.

Consistency model
In HA settings Active, Standby, and Observer nodes follow an identical sequence of events, where each event modifies the state of the namespace. “Namespace state id” characterizes the state of the namespace as it evolves through modifications. State id is a number monotonically increasing with each modification. Every modification on the NameNode corresponds to a journal transaction, so the state id is implemented as the id of that transaction.

The states of the Active and Standby nodes are the same when they reach the same state id. But Standby is a follower of its leader, Active, and therefore is always behind, as it consumes transactions that the Active has already executed.

The Stale Read Problem: from a client viewpoint, the namespace state of Observer is the same as Active’s, except for a limited set of recently modified “new” objects. Even though it can obtain correct information for the majority of “old” files and directories from either of the NameNodes, the Observer may return outdated information about the “new” objects.

The Consistency Principle informally states that clients should never see past states. It is a common assumption, which guarantees that clients look forward through the history of events even when they switch between NameNodes. More formally:

  • If client C sees or modifies an object at state s1 at time t1, then in any future time t2 > t1, C will see the state of that object at s2 >= s1.

We distinguish two major scenarios in which the consistency principle can be violated:

  1. Read your own writes (RYOW)
    If an HDFS client modifies the namespace on Active and then reads from an Observer, it should be able to see the same or a later state of the namespace, but not an earlier one.

  2. Third-party communication (3PC)
    If one client modifies the namespace and passes that knowledge to other clients, the latter ones should be able to read from Observer the same or a later state of the namespace, but not an earlier state.

An example of RYOW is when a client creates a directory on Active and then tries to ls it via Observer. If Observer has not yet processed the corresponding mkdir transaction, it will respond that the directory does not exist. This violates the consistency principle.

The 3PC scenario can occur, e.g., during MapReduce or Spark job submission. First, the job client creates job configuration and JAR files on HDFS. Then, it notifies the YARN ResourceManager to schedule the job tasks. Each task reads the job configuration and JAR files during startup. If the files are still not available on the Observer due to delayed transactions, the job may fail. The third-party communication between the job client and the ResourceManager here happens outside of the HDFS protocol.

In the next sections we explain how our design addresses both scenarios.

Journal tailing: Fast path
In order to guarantee consistency and performance of reads from Observer, journal tailing—the process of fetching new transactions from the QJM—should be fast, minimizing the delay in state updates between Standby and Active nodes. With a traditional implementation of journal tailing, the delay is on the order of minutes—2 minutes by default—but on our large clusters, it was up to 8 minutes. This would increase latency of read operations to minutes, while the maximum tolerable threshold should not exceed 50 milliseconds.

Active NameNode publishes journal transactions to a quorum of Journal nodes comprising the QJM service. When a Journal node receives transactions, it is required to persist them to disk in order to prevent data loss in case of failure. The transactions are written in segments, where a new segment file is rolled based on a configurable time interval (2 mins by default) or the number of transactions received. In the legacy implementation, Standby nodes could tail only entire segments, reading the last segment from disk via an HTTP call once the segment file was finished (closed) by a Journal node.

HDFS-13150 proposed and implemented Fast Path tailing through the following key modifications:

  1. It introduced in-memory caching of recent journal transactions. Journal nodes serve cached transactions to Standby nodes from memory instead of disk.

  2. A Standby node can request a sequence of transactions starting from a specified startTxId. Journal nodes then return all known transactions from the starting id up to a specified maximum batch size. This allows for a fine-grained journal tailing, since typically such requests would return only a few of the latest unapplied transactions.

  3. A request for the latest transactions is an RPC call, which is faster than segment file transfers via HTTP. The RPC call is implemented as a quorum read from Journal nodes to guarantee that Standbys see committed transactions only.

The use of Fast Path tailing is a configurable option. It is required for Observers. During startup or if a Standby node falls too far behind for reading from QJM cache, it automatically switches back to reading persisted segments, which has much higher latency but also higher throughput.

Performance evaluation of Fast Path showed substantial improvement from the original 2-minute lag:

  • The average client-perceived lag of an Observer node in RYOW scenario (the time in which a client could see a transaction on the Observer after applying it on the Active) is under 6 milliseconds.

  • The average transaction processing time, starting from the moment it is submitted on Active until it is applied on Observer, is 25-30 milliseconds.

Fast Path tailing allows Observers to keep their state very close to Active NameNode, but does not eliminate stale reads from Observer, which we will discuss next.

Read your own writes
NameNodes maintain their latest state id as LastWrittenStateId, which corresponds to the latest update of the namespace and the respective transaction written to NameNode’s journal.

We introduce LastSeenStateId for each HDFS client to indicate the latest state of the namespace observed by the client. It is automatically updated with every call to a NameNode and is set to the respective LastWrittenStateId. E.g., when client C sends a write request to Active node, the call once executed will also set C.LastSeenStateId = A.LastWrittenStateId of the Active. Now C sends a read request to Observer passing along its C.LastSeenStateId. The Observer verifies that its own O.LastWrittenStateId matches or exceeds client’s C.LastSeenStateId and postpones the request execution until it is caught up. The call to Observer also resets C.LastSeenStateId = O.LastWrittenStateId., so subsequent calls to Observer will not be delayed for C. The client can always switch to the Active without a delay because:

A.LastWrittenStateId >= O.LastWrittenStateId >= C.LastSeenStateId.

LastSeenStateId is passed seamlessly between servers and clients as part of the RPC header.

As we have seen with Fast Path tailing, the state lag of Observer to Active is small enough to be comparable to the time needed for a client to switch from one node to another. If an Observer falls behind the client state farther than an allowable threshold, it notifies the client, and the client switches over to another Observer or the Active NameNode.

LastSeenStateId guarantees that clients never see past states of the namespace in an RYOW scenario.

Third-party communication: msync()
In the 3PC scenario, one client C1 creates, e.g., a file on Active NameNode while another client C2 is expected to see it on an Observer. C2 has the knowledge that the file was created but does not know LastSeenStateId of C1, which can cause stale information to be read. We introduced a new HDFS API FileSystem.msync() to guarantee consistent reads for C2, along with a series of enhancements to avoid actually calling msync() in common cases.

msync() is similar to HDFS hsync(). The latter guarantees that the data is available for other clients to read, while msync() provides the same guarantee for metadata. When C2 calls msync() the client contacts the Active NameNode with the sole purpose of forcing the update of its C2.LastSeenStateId. Now that it has learned the current state of the namespace, which is guaranteed to be at least C1.LastSeenStateId, C2 can safely read the file from Observer.

Adding an explicit msync() call to existing applications contradicts our transparency requirement (#3) listed above. It is not feasible to expect all applications to change before the new feature is deployed in production. This prompted us to make clients automatically synchronize their states, thus avoiding explicit msync() in most cases.

  1. No-cost automatic client state synchronization on startup. When an HDFS client starts, it discovers the HA status of existing NameNodes. The namespace state id is piggybacked on these calls.

  2. Periodic synchronization forces clients to automatically call msync() after a configurable period of time.

  3. Always-msync mode is a special case of the former when the time period equals 0. It forces the client to automatically call msync() before every read request.

  4. Always-active mode. The client is configured not to use Observer for reads.

Startup synchronization (1) particularly solved the stale read problem for job submission frameworks, as discussed earlier. When a new MapReduce or Spark task starts, it instantiates a new HDFS client, which automatically catches up with the latest state of the Active node and thus prevents stale reads from Observer.

Always-msync (3) is an expensive option, as every metadata operation results in two RPC calls.

The enhancements above prevent explicit use of msync() for the majority of use cases. It remains necessary for a narrow type of long-running read-only clients. Such clients exclusively read from Observer and can be dragged to fall behind along with the Observer. They should either call explicit msync() at key moments controlled by the application logic or use always-active mode (4).

  • diagram-of-hdfs-cluster-architecture-with-consistent-reads-from-observer

Figure 3. HDFS cluster architecture with consistent reads from Observer.

Performance results
Our preliminary performance estimates using Dynamometer projected an overall gain of 2x in metadata operations throughput with consistent reads from Observer. The final results exceeded these expectations. On our largest production cluster, the total throughput of namespace operations increased 3x, with an average latency dropping to 1/3 of the previous. Now, the NameNodes perform 150K ops/sec on average, peaking at 250K ops/sec, with an average latency of 1-2 msec.

Scaling beyond the Hadoop ecosystem

This section is dedicated to the expansion of the HDFS ecosystem and its use cases by introducing existing features of HDFS and integrating them with internal tools, by developing new HDFS functionality, and by building new internal systems. This represents yet another dimension of scalability.

Port-based selective wire encryption
In recent years General Data Protection Regulation (GDPR) and California Consumer Privacy Act (CCPA) have put stringent requirements on access and collection patterns of personal data. Network security is an important part of data protection. A common practice in this regard is that local-area network (LAN) traffic, such as within a data center, is recommended to be encrypted and “links over wide area networks […] should always be encrypted due to the difficulty of guaranteeing the physical security of those channels.”

Sensitive data communications over WAN or LAN are always encrypted. However, encrypting traffic inside a data center incurs a performance penalty. We designed an approach that introduced multiple ports on HDFS NameNode for clients’ access to help solve this challenge. This way, one port is used to enforce encrypted communication, while another port accepts unencrypted calls. We further introduced firewall rules, which expose only the port for encrypted traffic to the outside clients. This guarantees that cross-data-center traffic is always encrypted, while intra-data-center communications remain efficient.

WebHDFS is a standard Hadoop tool, which provides REST APIs to access HDFS data over HTTP and HTTPS. Previously, we leveraged WebHDFS over secure HTTPS to achieve this goal. However, WebHDFS is slower than RPCs and direct data transfers, and it entails development costs, as we had to use different APIs for internal and external clients, thus implementing the same logic twice.

We thoroughly evaluated different solutions from open source, including the existing solution of selective encryption HADOOP-10335 based on allow lists. The major concerns with that approach are consistency and scalability, as it requires maintaining a list of allowed clients on all DataNodes. Keeping such configurations consistent across thousands of DataNodes is difficult, since adding or removing a single node requires changing all DataNodes on the cluster.

Our approach implements the following changes instead: 

  1. It generalizes NameNode RPC Server to listen on multiple ports. Each port has its own security configuration defining whether it enforces encryption or not.

  2. Clients can talk to different NameNode ports, which enables RPC call encryption when a client connects to the encrypted port.

  3. The NameNode sets a field in the block access token indicating the security policy it enforces. The token is returned to the client.

  4. The client in its turn presents the block access token to DataNodes. The DataNode validates the token and further enforces the same security policy as the NameNode. This warrants the data transfer encryption correlates with the RPC policies.

In addition, we apply the follow operational procedures:

  1. The firewall rules block the unencrypted NameNode port for cross-colo access.

  2. Clients are configured to connect to different NameNode ports depending on their location outside or inside the data center.

This approach bears minimal operational overhead. It requires only a few configuration changes to enable. Our benchmarks show that port-based selective encryption outperforms WebHDFS over HTTPS. We see 36-46% reduction in read/write latency and 56-85% increase of read/write throughput.

This feature was developed under HDFS-13541. See also our presentation at ApacheCon 2019.

Encryption at rest
A combination of wire encryption and HDFS encryption at rest enhances our compliance with data protection laws. Client-side transparent encryption at rest is a standard feature provided by HDFS. It allows users to set up certain directories as “encryption zones.” Any file written to an encryption zone is automatically encrypted with a file-specific data encryption key (DEK). HDFS clients transparently decrypt file data when reading them.

An encryption zone is associated with a dedicated encryption zone key, which is generated when the zone is created and is used to encrypt file DEKs. Encryption zone keys themselves are stored in a Key Management Service (KMS) and never leave it.

When a client creates a file, it asks KMS to generate a new DEK, which is subsequently used by the client to encrypt file data. KMS also encrypts the DEK with the corresponding encryption zone key and the encrypted DEK is stored as a file attribute on the NameNode. When reading a file, the HDFS client receives the encrypted DEK while opening the file, asks KMS to decrypt it, and then uses the DEK to decrypt file data. With client-side encryption, data remains encrypted both in transit and at rest.

LinkedIn has its own key management service, LiKMS, which is the only service certified and approved for managing cryptographic keys and secrets internally. We used pluggable interfaces such as KeyProvider supported by HDFS to integrate LiKMS with transparent encryption at rest.

The key goal of encryption at rest is protecting personal and confidential data from unauthorized access in persistent storage. There are different ways to address the goal.

  • Full disk encryption on the operating system level seems like the most straightforward approach. But although it provides adequate protection from unauthorized access to hardware devices, the data remains transparent when accessed via HDFS.

  • The drawbacks of application-level encryption include non-uniformity: every team needs to reimplement the same cryptographic techniques in their systems. It also propagates the problem of key sharing between data producers and data consumers to applications.

  • Column-level encryption targets specific columns containing sensitive information in stored datasets. The fine-grained encryption is efficient and convenient. The columns targeted for encryption are usually marked as such via annotations in the dataset schema. This, however, proved to be error-prone. As schemas evolve, human errors can cause improper annotations resulting in inadvertent leakage of confidential data.

We chose dataset-level encryption, which protects entire datasets that may contain confidential or private information. The approach utilizes HDFS encryption at rest with LiKMS and safeguards against all major threat models uniformly for all applications.

A lot of data produced on HDFS eventually needs to be transferred to other environments, such as online services that handle member queries. This data could belong to machine learning models, search indexes, etc. A few years ago, we noticed a proliferation of pipelines from different teams performing such transfers. Different teams came up with a variety of approaches. One of them was based on peer-to-peer file sharing techniques such as BitTorrent. While functional, the support burden and performance limitations left much to be desired.

The shared goal of these pipelines is distributing data from a single HDFS cluster to multiple instances of a geo-distributed service. We built Wormhole, a single library to facilitate this common pattern. A user simply performs a PUSH into Wormhole from their offline job and then a FETCH from each instance of their online app. The details of the data transfer are hidden under the hood.

  • diagram-showing-how-wormhole-streamlines-data-transfers-from-hdfs-to-online-services

 Figure 4. Wormhole streamlines data transfers from HDFS to online services.

Wormhole has two main ingredients to streamline data transfers. First, we leverage smaller HDFS clusters, called drop box clusters, that are located in each data center. When data is pushed into Wormhole, we replicate it to the set up drop boxes in the data centers with the service that requested the data. Service instances, which could number in the hundreds in each data center, will then fetch data from a colocated drop box cluster instead of fetching over the inter-data-center network. This minimizes the amount of cross data center traffic and provides low latency when the services fetch the data. The second ingredient is leveraging Apache Gobblin to perform fast distributed copies. Gobblin determines the appropriate level of parallelism, handles failure scenarios, and retries if needed.

In addition to the above infrastructure for efficient data transfer, Wormhole has an abstraction layer to help users manage data residing in it. This shim-layer provides simple versioning for datasets. Wormhole also groups together related datasets into collections we call namespaces. Access control can be done at the namespace or dataset level. Building Wormhole has greatly improved the onboarding experience of  online services that need to fetch data from HDFS. It provides a single pipeline into which we can invest sufficient resources to harden and optimize. Wormhole as an abstraction layer gives us the flexibility to exchange the underlying storage system for datasets with minimal user involvement, which is key in our journey into the cloud.


Building an Exabyte-scale infrastructure involves a continuous effort of many teams and individuals in creating new features and use cases, as well as supporting and evolving it. Big kudos to all. We would like to thank everybody who reviewed and helped improve this post: Ganesh Ananthakrishnan, Sunitha Beeram, Keqiu Hu, Jonathan Hung, Virajith Jalaparti, and Erik Krogen.