Deployment of Exabyte-Backed Big Data Components

Co-authors: Arjun MohnotJenchang HoAnthony QuigleyXing LinAnil AlluriMichael Kuchenbecker

LinkedIn operates one of the world’s largest Apache Hadoop big data clusters. These clusters are the backbone for storing and processing extensive data volumes, empowering us to deliver essential features and services to members, such as personalized recommendations, enhanced search functionality, and valuable insights. Historically, deploying code changes to Hadoop big data clusters has been complex. As workloads and clusters grow, operational overhead becomes even more challenging, including rack maintenance, hardware failures, OS upgrades, and configuration convergence that often arise in large-scale infrastructure.

To help smooth our deployment experience, we built a rolling upgrade (RU) framework. Our RU framework ensures that our big data infrastructure, which consists of over 55,000 hosts and 20 clusters holding exabytes of data, is deployed and updated smoothly by minimizing downtime and avoiding performance degradation. The framework incorporates proactive measures to monitor critical events, provides deployment insights, and issues proper notifications to enable seamless deployments. As a result, the framework significantly reduces the need for manual intervention by introducing automatic pauses and resumes of the deployments based on multiple factors, as well as considering maintenance activities and the cluster’s health.

In this post, we’ll explain how we built our RU framework to power a frictionless deployment experience on a large-scale Hadoop cluster, achieving a >99% success rate free from interruptions or downtime and reducing significant toil for our SRE and Dev teams.

Historical deployment challenges

In the past, our approach to deploying big data components relied heavily on Secure Shell Protocol (SSH), which led to a series of challenges for our deployment operations. Scalability was a significant concern, as using SSH for multiple connections simultaneously caused delays, timeouts, and reduced availability. These SSH-based processes consumed resources, negatively impacting our server and service performance.

The potential for incorrect upgrades also became a risk, since the low observability and reduced reliability of the framework could lead to missing blocks, data corruption, and the possibility of an “under-replicated block storm,” where the namenode attempted to replicate data blocks to datanodes needing more replications. All of these contributed to cluster performance degradations and increased client-side latency, often resulting in timeouts during data reads.

The lack of a state data store, detailed reporting, integration with other tools, and orchestration capabilities also imposed significant manual overhead. This made verifying changes and managing complex workflows challenging, as the deployment framework failed to monitor critical metrics like RPCQueueTimeAvgTime for namenode, which needs to be less than 10 milliseconds, that are essential for meeting strict service level agreements (SLAs).

The historical upgrade system couldn’t adapt to architectural changes like the introduction of an observer namenode (Now handling a massive influx of read requests – 150K QPS – from services such as Trino), ZKFC auto-failover, HDFS federation, etc. These limitations impacted our ability to support our Hadoop infrastructure, with over 1 billion file objects for the larger clusters and approximately 600,000 YARN applications running daily, highlighting the need for a more robust and scalable deployment approach for the Hadoop Distributed File System (HDFS).

The new Rolling Upgrade framework

The new RU orchestration design significantly enhanced our big data components deployment process. Utilizing gRPC routines, the orchestrator communicates with agents and guides the process with thorough inspections before and during deployment. Scheduled evaluations periodically check missing blocks, under-replicated blocks, the safe-mode status of the namenode, configuration consistency, cluster capacity, and hardware failures.

The system maintains a robust deployment history, seamlessly integrating with tools like version drift to detect deployed version differences. During cluster degradations, the framework auto-pauses and resumes, mitigating potential intricacies. Highly customizable, it adapts to architectural changes and simplifies the addition of custom validations. Moreover, the system proactively alerts on deployment issues, providing signals with relevant inGraphs metrics, dashboards, and runbooks, reducing manual intervention and enhancing issue resolution efficiency.

Various deployment strategies are now available, including full deployment for all live nodes, canary deployment targeting a specific node percentage, tailored deployment on targeted hosts, and a restart-only mode performing service restarts without executing the actual deployment (Dry Run). The new orchestrator agent design offers versatility and significantly improves the big data deployment process, making it smoother and less prone to issues.

Architecture

The RU framework, written in Golang, consists of the orchestrator and the agent. The orchestrator coordinates the rolling upgrade process by executing the defined commands and managing interactions between multiple agents via gRPC routines, as depicted in Figure 1. The orchestrator’s implementation of the upgrade procedure’s logic performs most of the work.

Diagram of architecture
Figure 1: High-level rolling framework architecture

Agents run across all the nodes, such as namenodes and datanodes. They are responsible for updating the big data infrastructure, and agents get deployed during the node setup. The agent operates as a system service with enforced resource limitations, such as a Nice value set to 19, a memory limit of 512MB, and a CPU quota of 200%, using Cgroups to ensure it does not introduce additional load to the host. Agents ensure that the service is up to date by checking the deployed version of the component, as illustrated in Figure 2. If it’s not the correct version, agents make the necessary updates and restart the service when they receive the signals from the orchestrator.

Diagram of Agent state transition diagram
Figure 2: Agent state transition diagram

State transition data is transmitted back to the orchestrator, and based on the response type, the orchestrator decides on its success, skipped, failures, etc., and updates its state in the data store on the progress of its execution. This real-time synchronization enables the orchestrator to make informed decisions. The orchestrator evaluates critical metrics, conducts pre-, mid-, and post-flight checks, and maintains deployment integrity (refer to Figure 3) that empowers the orchestrator to adapt to unexpected events, and ensure the deployment’s robustness. The deployment process continues by working in batches and systematically upgrading components, which enhances operational efficiency and increases the deployment’s reliability, making it a resilient approach for upgrading the big data components.

Diagram of a generalized rolling upgrade deployment flow
Figure 3: Generalized rolling upgrade deployment flow

Namenode deployment overview

The namenode is the central component of HDFS and is responsible for storing the metadata information about files and directories in the HDFS cluster. This metadata includes the namespace, file permissions, and the mapping of data blocks to datanodes. We have CRS (Consistent Reads from Standby) enabled in most of our clusters for high availability. It consists of four namenodes, named “HA1”, “HA2”, “HA3”, and “HA4”. At any given point in time, there is always one active (HA1) and a variable number of observer (HA3) namenodes (Framework supports multiple observer namenodes), and the other two are standby namenodes (HA2, HA4).

During the upgrade procedure, the framework automatically manages and changes the state of each namenode. There are eight stages in the upgrade plan, each with specific steps and verification points to ensure the procedure’s correctness. At the beginning of the upgrade process, in stage one, the orchestrator performs pre-flight checks, which include:

  1. Validity of the deployment package.
  2. No concurrent upgrades are happening within the cluster.
  3. The active namenode count should be one.
  4. RPCQueueTimeAvgTime needs to be under 40 milliseconds.
  5. Accessibility of all namenodes.
  6. 0 missing blocks.
  7. Under-replicated blocks are less than half a million.
  8. No namenode should be in safe mode.
  9. HDFS capacity is less than 85%.
  10. FSImage’s Last Checkpointing time to be under 18 hours.
  11. All namenodes have the same number of live nodes.

Various mid-flight checks are executed through the deployment life cycle, including verifying that required packages are available on the hosts where an upgrade is in progress. These checks also confirm the absence of active HDFS alerts and that the upgrading hosts have no hardware failures.

The complete process involves the following stages (refer to Figure 4):

Initial state of cluster: HA1 (active), HA2 (standby), HA3 (observer), HA4 (standby)

  1. Stage 1: Pre-checks for all the namenodes
  2. Stage 2: Upgrade standby namenode (HA4)
  3. Stage 3: Upgrade standby namenode (HA2) 
  4. Stage 4: Transition HA4 to the observer 
  5. Stage 5: Transition observer to standby (HA3 to standby)
  6. Stage 6: Upgrade standby namenode (HA3)
  7. Stage 7: Failover active namenode from HA1 to HA2 (this doesn’t incur any visible client-side downtime, since they handle the transitions through their inbuilt retry mechanism).
  8. Stage 8: Upgrade standby namenode (HA1 – previously active)

Final state of the cluster: HA1 (standby), HA2 (active), HA3 (standby), HA4 (observer)

Diagram of a high-level representation of orchestrator interacting with namenode for upgrade
Figure 4: High-level representation of orchestrator interacting with namenode for upgrade

Datanode deployment overview

datanode is a type of worker node in an HDFS cluster. It’s responsible for storing the original data by dividing it into blocks and serving it to clients when requested. Here is how our RU framework deploys the datanode in an HDFS cluster (see Figure 5 below):

  1. The orchestrator gets the list of live datanodes from the namenode and aggregates them based on the racks.
  2. It sends a batch of hosts for an upgrade. We use a batch size of three worker nodes and a sleep time of 30 seconds between each batch.
  3. The agent running on the datanode receives the request from the orchestrator and checks the host’s hardware, required packages, and datanode service status. Based on these factors, it gracefully stops the datanode process (Clients will retry all read requests for that datanode when it becomes operational again, within a timeout threshold - of under 30 seconds). The agent then upgrades the required RPM package for the datanode component.
  4. Finally, the agent starts the datanode and verifies whether the node reports back to the namenode. Depending on the status, it sends the report back to the orchestrator.
  5. After upgrading every small set of datanodes (e.g., 15/30 DNs), the orchestrator validates certain conditions and pauses in case any one of the following validations fails:
    1. No missing blocks.
    2. Under-replicated blocks are less than half a million.
    3. No active HDFS alerts are in place.
    4. RPCQueueTimeAvgTime needs to be under 40 milliseconds.
    5. No namenode should be in safe mode.
    6. HDFS capacity is less than 85%.
    7. All namenodes have the same number of live nodes.


Diagram of the high-level representation of orchestrator interacting with datanode for upgrade
Figure 5: The high-level representation of orchestrator interacting with datanode for upgrade

Version drift detection

Version drift can be a significant concern in maintaining a large-scale system. This issue typically arises when some hosts undergo operating system upgrades or maintenance, necessitating their temporary removal from the rotation. During these periods of downtime, the RU framework may skip these hosts for the upgrade process. However, once these hosts return to the system, version drift identifies these nodes and prompts the RU framework to deploy the latest configurations.

The version drift framework consolidates data from various sources, as shown in Figure 6, to create a comprehensive list of worker nodes currently running outdated versions. This framework operates on the scheduler, periodically polls relevant metrics, aggregates data, and determines which nodes have drifted. For real-time monitoring of the system’s health and version information, each worker node runs a process called “grid health stats.” This process continuously sends metadata information to Kafka, including health reports and version data, among other details. This data is then extracted, transformed, and loaded by Apache Gobblin and stored in the HDFS. The data is accessible through Hive and Trino, allowing queries for different dates and timestamps. A hardware checks service also collects and maintains hardware failure reports for the worker nodes in a MySQL database.

The version drift framework queries the Namenode JMX to obtain the live datanodes currently reporting to the namenode and analyzes version drift through the grid health stats for all those nodes. It ensures that nodes with active hardware failures are skipped, submitting a filtered set of healthy hosts back to the RU framework for the required upgrades. Ultimately, version drift ensures the timely and reliable upgrade of all healthy drifted nodes, helping to meet compliance requirements for live nodes in the system.

Diagram of version drift detection overview
Figure 6: Version drift detection overview

Challenges and lessons learned

Building the Rolling Upgrade framework presented several significant challenges. The deployment complexity was evident from the start, as we had to deal with multiple moving components, including Linux containers, System Init, hardware checks, configuration management, version drift, namenode and datanode health metrics, and intricacies of the HDFS itself. Managing corner cases for these components required attention to detail to prevent deployment failures and potential impacts on our production environment. Coordinating overlapping events and the progression of rolling upgrades across many hosts was another substantial challenge. We needed a deep understanding of system dependencies to ensure a smooth deployment process.

Prioritization was essential in building this robust framework. We initially focused on the stability of the deployment framework and only later addressed secondary priorities like environment dependencies, customization ability for future architectural changes, version drift, and observability. We also developed a thorough testing process with the newer requirements and enhancements by simulating the deployment process on the test clusters. These became essential steps in our strategy to mitigate potential disruptions during the RU execution on production clusters. Through these measures, we aimed to ensure a smoother and more reliable upgrade process of our big data system in the face of this multifaceted deployment challenge.

Accomplishments 

The deployment platform exhibits a high success rate of over 99% for datanode deployment (In 1% of the failed nodes, it also includes the skipped hosts due to hardware issues). In comparison, deploying the namenode component approaches a success rate of 100% without apparent issues, as highlighted in Figure 7. There were zero production incidents throughout the Rolling Upgrade deployment, and it has successfully demonstrated its production readiness by providing a reliable and robust service. The framework has been extensively utilized in numerous namenode and datanode deployments, solidifying its role as a foundational service for HDFS.

Moreover, the namenode restart-only deployment mode has proven to be a reliable solution for addressing issues like GC pauses. In cases where these prolonged pauses are unavoidable, this mode provides a robust mechanism to execute active namenode failovers and service restarts promptly, ensuring the continued reliability of the cluster’s ability to handle traffic seamlessly. The restart-only deployment mode also played a pivotal role in the reliable restart of all grid services for the SSL certificate renewal on each grid host. These improvements represent a substantial advancement in the current state of the art in end-to-end deployment of the big data components.

Image of the success rate of HDFS deployment across multiple clusters over time
Image of the success rate of HDFS deployment across multiple clusters over time
Figure 7: The success rate of HDFS deployment across multiple clusters over time

Conclusion

Through collaborative endeavors and ongoing efforts, we developed a deployment framework for big data components that places paramount importance on system stability and guarantees reliability throughout the different phases of the deployment lifecycle. By incorporating state transitions, metric assessments, and comprehensive pre-, mid-, and post-flight checks, this in-house framework ensures the precision and reliability of the systems while upgrading the components in the intricate landscape of the big data systems holding exabytes of data.

Acknowledgments

The successful development and implementation of our Rolling Upgrade (RU) framework, which plays a pivotal role in ensuring the seamless deployment and updates of our extensive big data infrastructure, would not have been possible without the contributions, support, and dedication of many individuals and teams. First and foremost, we extend our heartfelt gratitude to the Grid Hadoop Team, our talented and dedicated developers, and SREs who have been instrumental in shaping this deployment framework. We sincerely appreciate the continuous support and guidance provided by our leadership team, including Sandhya RamuAyyappadas RavindranAndy SantosaAmit BalodeChen QiangMario CarvalhoVirajith JalapartiAkbar KM, and Colin Pereira.

We also want to sincerely thank the peer reviewers of this blog, including Akbar KM and Venu Ryali, for their valuable feedback and insights. Furthermore, a special mention goes to our incredible TPM, Jayshree Chatterjee, for her outstanding leadership and drive in overseeing this project.