
Theory vs. Practice: Learnings from a recent Hadoop incident

Co-authors: Sandhya Ramu and Vasanth Rajamani

For companies and organizations, failure tends to be far more illuminating than success, and the lingering effects of a failure can be harmful if the team moves too quickly or does not resolve the issue in a thorough and transparent manner. We recently ran into a large incident that involved data loss in our big data ecosystem, and by reflecting on our diagnosis and response, we hope that our learnings will be insightful to others.

Here’s what happened: roughly 2% of the machines across a handful of racks were inadvertently reimaged. This was caused by procedural gaps in our Hadoop infrastructure’s host life cycle management. Compounding our woes, the incident happened on our business-critical production cluster.

Theory meets practice

Like most big data systems, our architecture has built-in safety nets that will save the day up to a certain threshold. At LinkedIn, we have two large Hadoop clusters that are independently populated with data. Within each cluster, every data block is replicated three times, providing rack-level failure redundancy. The ingestion pipelines in the two clusters follow completely independent paths, ingesting tracking data, database data, etc., in parallel. Below is a high-level architecture diagram:

Diagram: the two Hadoop clusters with their parallel, independent ingestion pipelines
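
For readers less familiar with HDFS internals, the rack-level redundancy mentioned above comes from the default block placement policy: one replica goes to the writer’s node and the other two go to two different nodes on a single other rack, so a block survives the loss of any one rack but not the loss of machines on both of its racks. Below is a minimal sketch of that policy with a purely hypothetical topology (this is not LinkedIn’s placement code):

```python
# A minimal illustration (not LinkedIn's placement code) of HDFS's default
# block placement policy: the first replica goes to the writer's node and the
# other two go to two different nodes on a single other rack, so every block
# spans exactly two racks and survives the loss of any one rack.
import random

def place_replicas(racks, writer_rack):
    """Return (rack, node) locations for the three replicas of one block."""
    local_node = random.choice(racks[writer_rack])
    remote_rack = random.choice([r for r in racks if r != writer_rack])
    node_a, node_b = random.sample(racks[remote_rack], 2)
    return [(writer_rack, local_node), (remote_rack, node_a), (remote_rack, node_b)]

# Hypothetical topology: 20 racks of 40 nodes each.
racks = {f"rack-{i}": [f"node-{i}-{j}" for j in range(40)] for i in range(20)}
replicas = place_replicas(racks, "rack-3")
assert len({rack for rack, _ in replicas}) == 2  # a block always spans two racks
print(replicas)
```

This two-rack property is why a failure spread across several racks at once, as in this incident, is the scenario that can take out all three replicas of a block.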

Our disaster recovery strategy has largely focused on restoring data from one cluster in the event of data loss in the other. Let’s compare how this practice held up against theory in the midst of a real and significant incident.

Large consequences of small losses

Compared to the loss of machines inadvertently spread across a handful of racks, the data loss was relatively small (~0.02%). However, we knew the incident would result in permanent data loss for the HDFS blocks whose three replicas all happened to sit on the affected racks. Had we lost every machine in the affected racks, the data loss would still have been small (~0.15%). Since we did not lose every machine in those racks, we expected the number to be smaller still (~0.03%). The actual number of lost blocks turned out to be even lower (~0.02%); we hypothesize that during the short interval in which we were losing machines, the Hadoop NameNode diligently re-replicated some of the blocks, reducing the overall impact. In aggregate, the issue affected 0.05% of all files.
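As a rough illustration of that combinatorics, here is a back-of-the-envelope Monte Carlo sketch. The topology and counts (100 racks of 40 nodes, 4 affected racks, half of each affected rack reimaged, roughly 2% of machines overall) are purely hypothetical stand-ins rather than our real cluster, and the placement model is the generic HDFS two-rack policy:

```python
# Back-of-the-envelope Monte Carlo: a block is permanently lost only when all
# three of its replicas sit on reimaged machines. Topology and counts are
# hypothetical; the placement model is HDFS's default two-rack policy.
import random

RACKS, NODES_PER_RACK, BLOCKS = 100, 40, 1_000_000  # hypothetical cluster
AFFECTED_RACKS = 4                                   # "a handful of racks"
REIMAGED_PER_AFFECTED_RACK = 20                      # ~2% of all machines

reimaged = {(rack, node) for rack in range(AFFECTED_RACKS)
            for node in random.sample(range(NODES_PER_RACK),
                                      REIMAGED_PER_AFFECTED_RACK)}

def place_block():
    """One replica on the writer's node, two on another rack (HDFS default)."""
    local_rack = random.randrange(RACKS)
    local_node = random.randrange(NODES_PER_RACK)
    remote_rack = random.randrange(RACKS - 1)
    if remote_rack >= local_rack:
        remote_rack += 1
    node_a, node_b = random.sample(range(NODES_PER_RACK), 2)
    return [(local_rack, local_node), (remote_rack, node_a), (remote_rack, node_b)]

lost = sum(all(replica in reimaged for replica in place_block())
           for _ in range(BLOCKS))
print(f"simulated fraction of blocks lost: {100 * lost / BLOCKS:.4f}%")
```

With these made-up numbers, the simulated loss typically lands in the low hundredths of a percent, the same order of magnitude as the actual loss, and the NameNode’s re-replication during the incident pushed the real number lower still.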

At LinkedIn, the vast majority of scheduled workflows (Spark-based applications) are triggered via Azkaban. Corruption in a small number of files can therefore cause a large number of Azkaban workflows to fail. The damaged files belonged to hot datasets that many workflows consume, and when an Azkaban flow reads a large dataset containing many files, even a single corrupt file will cause the flow to fail. The combination of these factors meant that while only 0.02% of the data was corrupted, 10% of our workflows failed (many of which were revenue-impacting).
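
To see how a 0.02% file corruption rate can translate into roughly 10% of flows failing, consider a flow that fails whenever any one of the files it reads is corrupt. The sketch below uses illustrative file counts, not measurements from the incident, and in practice the effect was amplified further because the corruption was concentrated in hot datasets:

```python
# Failure amplification: a flow fails if any of the N files it reads is corrupt,
# so even a tiny per-file corruption rate yields a much larger flow failure rate.
# The file counts below are illustrative, not measured values from the incident.
corruption_rate = 0.0002  # 0.02% of files corrupted

for files_read in (10, 100, 500, 1000):
    p_flow_fails = 1 - (1 - corruption_rate) ** files_read
    print(f"flow reading {files_read:>4} files fails with probability "
          f"{100 * p_flow_fails:.1f}%")
```

Under these assumptions, a flow touching a few hundred files already fails close to 10% of the time, which is the same ballpark as what we observed.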

To summarize, combinatorics worked in our favor to minimize the number of block losses. However, even this seemingly small amount of data corruption resulted in a significant impact on production.

Challenges

As alluded to earlier, our architecture is designed with two Hadoop clusters in different data centers, each with its own parallel data ingestion pipelines, so data lost in one cluster can be recovered from the other. When we first started responding to the incident, we assumed that we could easily recover all of the data generated in parallel across the two clusters, and that the primary challenge would be handling corruption in the intermediate data generated by Azkaban workflows, which is not replicated across clusters unless explicitly requested. (Due to several factors, such as capital expenditure, frequency of change, ease of regeneration, and recovery time objective, intermediate data generated by Azkaban workflows in one cluster is not replicated to the other by default.) We still encountered some unexpected challenges along the way.

The first discovery was that our recovery would require significantly more data copying than anticipated. This is because each Hadoop cluster, completely independent of the other, organizes the data slightly differently. We use a time-based data layout while ingesting new data, so the files in a particular folder can differ across clusters due to inherent non-determinism. Further, we compact older data into large files within a single daily partition for query efficiency and system performance. The culmination of these subtle differences meant that it was not straightforward to identify the lost files in the unaffected cluster; instead, we had to copy an entire day’s partition whenever there was even a small corruption inside it. For context, copying a single file might mean moving 1 GB of data, while copying an entire day’s partition could be ~1 TB. This increase in the amount of data to be copied lengthened the recovery time significantly.
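
To give a flavor of what this looked like operationally, the sketch below maps corrupt files reported by `hdfs fsck` back to the daily partitions that would need to be re-copied in full from the unaffected cluster. The paths, layout convention, and the commented-out distcp step are hypothetical illustrations rather than our actual tooling:

```python
# Sketch: map corrupt files (as reported by `hdfs fsck -list-corruptfileblocks`)
# to the daily partitions that must be re-copied whole from the healthy cluster.
# Paths, the layout convention, and the distcp step are hypothetical.
import re
import subprocess
from collections import defaultdict

DAILY_PARTITION = re.compile(r"(/.*/daily/\d{4}/\d{2}/\d{2})/")

def corrupt_partitions(root="/data/tracking"):
    """Return {daily_partition: [lines describing corrupt files]} under root."""
    out = subprocess.run(["hdfs", "fsck", root, "-list-corruptfileblocks"],
                         capture_output=True, text=True).stdout
    partitions = defaultdict(list)
    for line in out.splitlines():
        match = DAILY_PARTITION.search(line)
        if match:
            partitions[match.group(1)].append(line.strip())
    return partitions

if __name__ == "__main__":
    for partition, corrupt_files in corrupt_partitions().items():
        # Even one corrupt file forces a copy of the whole day (~1 TB vs ~1 GB).
        print(f"{len(corrupt_files):>4} corrupt files -> re-copy {partition}")
        # subprocess.run(["hadoop", "distcp",
        #                 f"hdfs://healthy-cluster{partition}",
        #                 f"{partition}.recovered"])
```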

To speed up the recovery process, we leveraged the following techniques:

  • At LinkedIn, we have QoS (Quality of Service) controls that manage our network resources by setting priorities for specific types of data on the network; member traffic takes precedence over HDFS data. To help our recovery efforts, network backbone paths were temporarily altered to provide twice the bandwidth, significantly speeding up data recovery.
  • We performed recovery in a data-driven manner. Recovery of datasets was prioritized based on a combination of the percentage of data lost in a given partition, the freshness of the data, and the usage level of the data lost (see the sketch after this list).
  • We coordinated the deletion of the corrupted data with flow owners. The challenge was that Azkaban flows can retrigger on a predefined schedule, and some flow owners strictly preferred not to run their flows on partial data (i.e., with the corrupted files deleted), while others were latency-sensitive and strongly preferred to restart computation. If more than one flow used the same piece of corrupted data, a global policy had to be agreed upon across flow owners.
  • When we added 2x network bandwidth to speed up data copies across clusters, a configuration error in the network routing tier put additional stress on the overall system, resulting in compounding failures. The stress from an accelerated large-scale recovery can sometimes be riskier than performing a gradual recovery that does not push the system well past its limits. While we understood this intellectually going in, we did not fully grasp the risk involved.
  • The team published a few options for flow recovery. Based on business impact and cost, application owners decide the degree of resilience for their respective flows. Some of our most critical business metric computation flows run on both clusters in an active/active HA (high availability) mode; these SLA-critical computation flows saw zero impact from this outage. However, this is also the most costly option. We provided three alternatives for flows without well-defined preexisting resiliency strategies: run the flow with partial data, wait for full data recovery (which could take several days), or run the flow in the D/R cluster that did not experience the failure. This last option worked well for some cases where Azkaban flows already ran in active/active mode; however, not all flows readily had this setup.
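
As referenced in the second bullet above, recovery prioritization was data-driven. A minimal sketch of that kind of scoring follows; the fields, weights, and paths are hypothetical illustrations, not our production logic:

```python
# Hypothetical scoring for data-driven recovery prioritization: partitions with
# a larger fraction of data lost, fresher data, and more downstream consumers
# are copied first. Weights, fields, and paths are illustrative only.
from dataclasses import dataclass

@dataclass
class DamagedPartition:
    path: str
    fraction_lost: float     # share of the partition's data that was lost (0-1)
    age_days: float          # how old the partition is
    downstream_flows: int    # how many flows consume it

def recovery_priority(p: DamagedPartition) -> float:
    freshness = 1.0 / (1.0 + p.age_days)            # newer data scores higher
    usage = min(p.downstream_flows / 50.0, 1.0)     # cap very hot datasets
    return 0.4 * p.fraction_lost + 0.3 * freshness + 0.3 * usage

damaged = [
    DamagedPartition("/data/tracking/page_views/daily/2020/08/20", 0.10, 1, 120),
    DamagedPartition("/data/databases/orders/daily/2020/07/15", 0.60, 37, 8),
]
for p in sorted(damaged, key=recovery_priority, reverse=True):
    print(f"{recovery_priority(p):.2f}  {p.path}")
```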

Given the scale of Hadoop at LinkedIn, we had to work with several flow owners (our customers) to home in on their strategies in real time. Some owners, whose flows were latency-sensitive, chose to immediately work off partial data, while others chose to wait for the complete recovery of the data.

The logistics for speedy recovery

While remote work meant we missed the traditional mechanism for organizing (i.e., gathering in person in a war room), we spun up a virtual team overnight across U.S. and India time zones. Organizationally, this looked like the following:

  • A hub-and-spoke model consisting of multiple tracks, each with a designated track lead. Each track lead established success criteria for recovery in their respective track.
  • Track-specific group messaging was set up to reduce chatter in the central team messaging. This provided a space for tracks to handle ad hoc queries as they surfaced during the multi-day recovery process. Similarly, additional messaging channels were created for specific customer groups to increase transparency and coordination.
  • Communication cadence was established with regular touch points among track leads and within tracks in smaller groups to keep everyone on the same page. 
  • The Grid team was responsible for tying all the tracks together and broadcasting email communications at a regular cadence about the status, while keeping a detailed log of the recovery.

Overall, recovery was split into two major initiatives: data recovery and flow recovery. While data recovery was handled by the Grid team, flow recovery had dedicated track leads who worked within their business units to identify the best approach for their flows. They then proceeded with the most viable option (see the options described above) and tracked recovery progress to share more broadly. The Grid team established a phased approach, based on global priority across business units, to keep our complex systems balanced and ensure that they would not tip over under the undue stress created by data recovery and flow recovery compute needs happening in tandem with regular workloads. The team added compute capacity gradually to keep up with the excess demand from the recovery.

Learnings institutionalized

There are always learnings at the end of a large incident. Here are several that we are following through on:

  • Establishing more robust and comprehensive host life cycle management for our Hadoop infrastructure.
  • Building a better understanding of our network behaviors across data centers under load and ensuring that an automated way to modify network routing on demand is in place.
  • We’re currently building our next-generation infrastructure on Azure, including the Hadoop stack. For the medium term, we will have an additional cluster built on a completely different tech stack, which should further help with redundancy.
  • Investigating the feasibility of other architectures as part of our Azure migration. For example, we could ingest the data once, copy the same data to a D/R cluster, and absorb the latency cost through data layout and query planning optimizations. We are in the process of adopting Apache Iceberg as our table format; with Iceberg, we should be able to perform targeted recovery of just the affected files far more easily. In the interim, for our current architecture, we have built several tools to aid with recovery (e.g., recovering all but the corrupt data, recovering large files from the other cluster more easily, etc.) and built runbooks around them for easy access.
  • Working towards auditing our flows to ensure that they have well-defined disaster recovery protocols.
  • Increasing the frequency of our fire drills, and reviewing scorecards of how flows performed in them against their stated recovery policies.
  • Continuing to invest in our lineage tooling, as it proved extremely useful in identifying dependencies between flows and data. This also offers the ability to understand the ecosystem’s connection graph end to end, which will be invaluable during a large coordination event like a disaster recovery.
  • Some flow owners are baking resiliency into their application workflows themselves. For example, latency-sensitive applications that generate key hourly and daily business metrics are making an explicit data staleness vs. resiliency tradeoff in the application logic itself (see the sketch after this list).
  • Improving our ability to project data availability SLAs for data recovery so that we can publish them quickly if an event of this nature were to happen again. Our internal data consumers can then use these SLAs to make informed choices within the recovery protocol.
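
As an example of the resiliency pattern mentioned above (baking the staleness-versus-resiliency tradeoff into application logic), here is a small sketch; the helper, dataset name, and lookback window are hypothetical rather than an actual LinkedIn workflow:

```python
# Sketch of a staleness-vs-resiliency tradeoff baked into a flow: read the
# freshest partition if it is healthy, otherwise fall back to the most recent
# healthy one and flag the output as stale instead of failing. The helper
# `partition_is_readable`, the dataset name, and the lookback are hypothetical.
from datetime import date, timedelta

def partition_is_readable(dataset: str, day: date) -> bool:
    # Stub: a real check might look for a _SUCCESS marker and consult the
    # published list of corrupt paths.
    return True

def latest_healthy_partition(dataset: str, today: date, max_lookback_days: int = 3):
    """Return (partition_date, is_stale) for the freshest readable partition."""
    for lookback in range(max_lookback_days + 1):
        candidate = today - timedelta(days=lookback)
        if partition_is_readable(dataset, candidate):
            return candidate, lookback > 0
    raise RuntimeError(f"no readable partition for {dataset} in the "
                       f"last {max_lookback_days} days")

partition, stale = latest_healthy_partition("tracking.page_views", date.today())
print(f"reading {partition} (stale={stale})")
```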

Acknowledgements

Our virtual team spanned large swaths of the organization, ranging from product engineering, AI and data science, and infrastructure to systems and networking. Acknowledging everyone individually would take more space than the contents of this blog! A heartfelt thank you to everyone who helped out, especially those on the virtual tiger team consisting of engineers from Grid, Artificial Intelligence, and Product engineering partners. Last, but not least, thanks to our data engineering leadership team for their unyielding support during the entire episode.