Data Streaming/Processing

From daily dashboards to enterprise-grade data pipelines

Within hours of each day beginning, we ingest tens of billions of records from online sources into HDFS, aggregate them across 50+ offline data flows, and visualize 40+ metrics across our business lines to support a dashboard that provides company leaders with timely and critical business insights. Dubbed the Daily Executive Dashboard (DED), this dashboard contains critical growth, engagement, and bookings metrics that indicate the health of LinkedIn’s business. In this blog post, we’ll spotlight the challenges to DED’s delivery and performance posed by the growing complexity of computing and aggregating these metrics, and how this dashboard transformed our approach to data pipelines across the company.

Since DED's inception, the complexity of computing these metrics has grown not only in terms of the “what”—it now relies on 70+ online sources that span various business and product lines—but also in the “how,” for which there have been countless questions and challenges we’ve had to tackle over the years. How should data be ETL’ed (Extracted, Transformed, Loaded) from online sources to offline systems? How should data be stored and processed? How can we ensure data availability and quality? While the answers to these questions varied depending on the state and needs of the business, our data infrastructure has continually evolved such that DED has upheld its strict delivery requirements and withstood the test of time.

Growing pains

Under the hood, DED is not only at the mercy of a growing amount of data, but also relies on every layer of LinkedIn’s data technology stack. This includes our online applications triggering tracking events on member engagement, real-time data streaming systems (i.e., Kafka), offline data storage and flows, and our data analytics platforms. If any part of this end-to-end pipeline is not performing as intended or cannot handle the pressure of growing data volume, DED may not deliver as required. Thus, ensuring DED remains timely and performant has required accountability throughout the data technology stack, pushing our data ecosystem to scale through any growing pains, of which there have been many.

DED’s unique positioning at the center of business criticality, its dependencies across our data ecosystem, and its role as a benchmark for how each of those components is performing have put it at the forefront of major changes throughout LinkedIn’s data tech stack evolutions. This post will cover the major evolutions of DED and how we worked through several iterations and challenges along the way. For the purpose of this post, we will primarily focus on the offline data warehouse technologies involved in producing DED and how they have evolved. We’ll break down the history of DED across three distinct phases: surviving, scaling, and thriving.

Part 1: Survive

In the early years of LinkedIn, our startup mindset drove product and engineering teams to iterate quickly in order to find product-market fit and, subsequently, keep up with the rapidly growing business. Scale and survival were P0.

At the time of DED’s first implementation in 2009, Oracle was used in our data warehouse for data storage and processing, and was adopted for DED’s use case. This fit our initial needs because the data processing size for DED was within the capabilities of our internal Oracle infrastructure and we were able to iterate quickly to bootstrap a version of DED on our existing stack. 

Aside from Oracle, DED leveraged several other technologies within our data warehouse stack that were externally licensed or sourced. For example, we needed the capability to automate and coordinate an entire dependency tree of data ETL flows, pulling data for different business lines and from various upstream (e.g., Kafka, Oracle) sources, to produce the final set of metrics. For this, we utilized AppWorx as the scheduler and workflow executor to automate our data flows to aggregate and calculate the DED metrics. Informatica was used as an ETL tool and MicroStrategy was used to visualize the report. 

By early 2011, LinkedIn surpassed a billion page view events per day, which rapidly increased the demands on our data infrastructure. At that scale, our engineers were often working around the clock to salvage the database and play catch-up on our data flows. With the survival of DED at stake and our infrastructure becoming expensive to scale alongside the increasing data volume, the next step was moving processing to Teradata. While LinkedIn had begun adopting Hadoop at the time, it was still in its nascent stages and lacked tools for building Hadoop ETL flows. Furthermore, when evaluating Hadoop processing (i.e., using Pig) as an alternative, we found it lacked user friendliness and debuggability at the time, in addition to requiring high engineering effort and cost to migrate. Therefore, for the sake of business continuity and keeping our site “up,” Teradata was chosen to meet our growing demands in batch processing.

Figure 1: Past architecture of the Daily Executive Dashboard pipeline

Amidst these underlying data infrastructure changes that allowed DED to survive its growing data needs and meet its SLA of 5:00 a.m. PT delivery, a key attribute of our data warehouse (DWH) was that it remained a closed system. This meant that a central team, Big Data Engineering (BDE), built and maintained the DWH’s data flows and datasets, including those for DED, as well as other use cases requested by partners such as Data Science, AI, and Business Insights teams. This became both a blessing and a curse. 

On one hand, the team could work closely with the database administrators to control prioritization and resources. For DED, this was especially useful in ensuring that its dependent data flows could receive higher priority in resources than non-DED data processes, such that they could meet DED’s strict SLA. Furthermore, access to several components in the data infrastructure (e.g., AppWorx, Informatica) was also closed to all but a limited number of teams, which allowed for tight governance and ownership.

However, as data use cases and volume increased, this closed system became a bottleneck and single point of failure. With the overhead of maintaining a DWH that had grown to 1,400+ datasets, the team recognized not only that we needed to consolidate to provide more leverage, but also that, in order to effectively scale, partner teams would need the capability to build out their own data pipelines. This, coupled with the expensive cost of scaling and adding resources to a Teradata system that we had already started to outgrow, made it apparent that we needed to move on from merely surviving to the next stage: scaling.

Part 2: Scale

By 2018, LinkedIn had developed and rolled out various data tooling and platforms that enabled us to decentralize the development of our data warehouse, move away from the usage of legacy systems, and cut down on licensing costs. At that time, Kafka at LinkedIn was processing 4.6 trillion incoming messages per day. It was critical that our data infrastructure continue to cost-effectively scale to meet demand, uphold high reliability, and improve the productivity of our engineers. Notably, by then, technologies such as Hadoop, Spark, and Hive had matured both industry-wide and internally. Key internal developments, some of which have also been open sourced, that DED and our DWH now leverage include:

Workflow automation
  From:
  • AppWorx: a licensed tool that was utilized for automating and scheduling data flows. A closed system, as only a subset of engineers had access.
  To:
  • Azkaban: an open sourced tool, developed within LinkedIn, that automates jobs on Hadoop. An open system, as any engineer at LinkedIn can develop and deploy workflows on Azkaban.

ETL (Extract, Transform, Load)
  From:
  • Informatica: a proprietary tool that performed ETL from various sources to targets. For example, it was used to ETL from the ODS on Oracle to the DWH on Teradata. A closed system, as only a subset of engineers had access.
  To:
  • Camus: an open sourced solution developed within LinkedIn to ingest Kafka into Hadoop. One of several platform-specific ingestion frameworks within LinkedIn at the time.
  • Brooklin: an open sourced, distributed system, developed within LinkedIn, that enables streaming data between various source and destination systems and change-capture for Espresso and Oracle.
  • Gobblin: a distributed data integration framework leveraged within LinkedIn since 2014 to ingest internal (e.g., Kafka, Oracle, Brooklin) and external data sources to Hadoop. Replaced platform-specific ingestion frameworks (e.g., Camus) to provide operability and extensibility. Has also made strides to enhance query performance, benefitting downstream ETL flows (e.g., FastIngest).

ETL/ad-hoc querying languages
  From:
  • Teradata SQL
  To:
  • Pig: a data flow oriented programming language that was not chosen at the time of switching between Oracle and Teradata, but was later gradually adopted as our Hadoop infrastructure developed and warranted ETL/ad-hoc capabilities on large datasets.

Metric framework
  From:
  • Python, Unix, Java scripts/libraries: to develop and automate data flows that produced metrics/dimensions, various libraries were created to provide certain utilities, such as data triggers and date functions. However, these were primarily developed and utilized within individual teams, thus leading to duplicate/inconsistent frameworks.
  To:
  • Unified Metrics/Dimension Platforms (UMP/UDP): a common platform in which you can develop, maintain, and manage metric/dimension datasets. An open system, as any engineer can develop their own metric/dimension dataset use case within this centralized platform, thus also allowing for better governance.

Reporting
  From:
  • MicroStrategy: a licensed business intelligence tool that was utilized for creating and automating data visualizations, such as DED’s PDF report.
  To:
  • Retina: an internally developed reporting platform, custom fit to LinkedIn’s data visualization needs and supporting complex use cases, such as DED.

These shifts in our data infrastructure were monumental for use cases such as DED and our offline data warehouse. Migrating DED datasets and flows from the legacy stack to the new technologies, and testing whether they could meet DED’s strict SLA, pushed the envelope. As early adopters of this new tech stack, we formed a mutually beneficial relationship with the platform teams, as we could request features and improvements that ultimately benefited the rest of the organization, too. For example, a crucial component of our originally closed system was priority and resource allocation for our DED flows. Born out of this need, “High Priority Flows” was implemented in the UMP/UDP framework and has since been adopted for other critical data flows.

This movement away from our legacy systems of Teradata, AppWorx, and Informatica to open systems such as Hadoop, Azkaban, and UMP/UDP enabled decentralized dataset development within LinkedIn. There was no longer a single team bottlenecking metric and dimension dataset development. Instead, this enabled us to focus on consolidating the bloated DWH into fewer source-of-truth datasets, resulting in us serving DED metrics (as well as other metrics for the company) using only one-third as many datasets. This ultimately enabled greater leverage, data consistency, and governance in our DWH.

Figure 2: Current architecture of the Daily Executive Dashboard pipeline

All hands on deck
Adoption of this new architecture (DED on UMP) came with its own set of challenges. By the second quarter of adoption, we were only meeting DED’s SLA 81.13% of the time. The rate at which we were missing DED’s SLA was unacceptable, and thus an all-hands-on-deck effort began. Through this eight-month effort, the BDE, Site Reliability, and Spark teams worked closely together to implement solutions to the challenges at hand, improving SLA attainment to 99.22% in the most recent quarter.

Stability improvements
DED on UMP was originally set up as active-passive on two production Hadoop clusters, meaning the data flows that produced the necessary datasets and metrics for the final report ran in parallel on both clusters, while automation to generate and send out the report was only enabled on the active cluster. Failing over to the passive cluster was a manual, hands-on process for the SRE team. To automate this, we moved to an active-active setup on the two production Hadoop clusters and implemented a solution to send the DED report from whichever cluster completed the full report generation first.
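
For illustration, here is a minimal sketch of how a first-to-finish rule like this could be enforced, assuming some location that both clusters can reach and on which file creation is atomic; the production mechanism is not described in this post, and the paths and names below are hypothetical.

```python
import os

CLAIM_DIR = "/shared/ded/report_sent"  # hypothetical location visible to both clusters

def try_claim_send(report_date: str) -> bool:
    """Return True only for the caller that wins the race to send the report."""
    os.makedirs(CLAIM_DIR, exist_ok=True)
    marker = os.path.join(CLAIM_DIR, f"{report_date}.sent")
    try:
        # Mode "x" fails if the marker already exists, so exactly one cluster
        # (the first to finish generating the report) proceeds to send it.
        with open(marker, "x"):
            return True
    except FileExistsError:
        return False

if try_claim_send("2021-01-01"):
    pass  # generate and email the report from this cluster; otherwise do nothing
```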

However, a breaking code change rolled out to both clusters simultaneously, or data inconsistencies between the clusters, would defeat the purpose of an active-active setup. Thus, implementing incremental cross-cluster rollouts and a canary for testing changes was essential to ensuring the stability of DED amidst data flow and platform (i.e., UMP/UDP) changes. Furthermore, we engineered solutions to compare hourly data output across both Hadoop clusters and proactively alert on potential data quality issues, along with tools to monitor cross-cluster workloads and health.
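
As a rough sketch of the cross-cluster comparison idea, a lightweight check might compare the row counts of the same hourly output on each cluster and alert when they diverge; the paths, storage format, and threshold below are hypothetical rather than the production implementation.

```python
from pyspark.sql import SparkSession

# Hypothetical locations of the same hourly DED output on the two clusters.
PATH_A = "hdfs://cluster-a/data/ded/metrics/hourly/2021-01-01-12"
PATH_B = "hdfs://cluster-b/data/ded/metrics/hourly/2021-01-01-12"
TOLERANCE = 0.01  # flag anything beyond 1% row-count drift

spark = SparkSession.builder.appName("ded-cross-cluster-check").getOrCreate()

count_a = spark.read.format("orc").load(PATH_A).count()
count_b = spark.read.format("orc").load(PATH_B).count()

drift = abs(count_a - count_b) / max(count_a, count_b, 1)
if drift > TOLERANCE:
    # In production this would feed an alerting system; printing stands in here.
    print(f"ALERT: hourly output differs across clusters by {drift:.2%} "
          f"({count_a} vs {count_b} rows)")
```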

Performance improvements
Out of the approximately 40 data flows involved in DED on UMP’s dependency graph at the time of this all-hands-on-deck effort, only one did not increase in duration on both Hadoop clusters over the first seven months on the new infrastructure. On average, flow durations increased by 60% during that time, even though the data input size of those flows had increased by only 11% on average. This indicated that we not only needed to optimize our data flows to support an increasing amount of data, but also holistically check our Hadoop platform for other issues contributing to this disproportionate increase in runtime.

A variety of strategies were taken to optimize our long-running flows, depending on the underlying technology and infrastructure they leveraged. Long-running data flows written in Pig, for instance, were migrated to Spark, yielding an average 2-3x runtime improvement. The storage format of our upstream data was converted from Avro to a columnar format (e.g., ORC), which yielded up to a 2.4x runtime improvement with no additional tuning. More than ten DED data flows already written in Spark were further optimized by leveraging Spark features such as materialized views and bucketed tables to reduce the need for shuffle operations, as well as speculative execution to retry long-running Spark tasks. The LinkedIn Spark team also released enhancements to the Spark engine to improve shuffle efficiency (see blog post on Project Magnet).
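
To make these kinds of changes concrete, the PySpark sketch below shows speculative execution being enabled, columnar (ORC) input being read, and an intermediate dataset being bucketed on its join key so that downstream joins can skip the shuffle stage. The table, column, path names, and bucket count are hypothetical; the exact settings used for DED flows are not shown here.

```python
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("ded-flow-tuning-sketch")
    # Re-launch straggler tasks instead of waiting on a single slow executor.
    .config("spark.speculation", "true")
    .enableHiveSupport()
    .getOrCreate()
)

# Upstream data that has been converted from Avro to a columnar format (ORC).
events = spark.read.format("orc").load("/data/tracking/page_view_event")

# Persist an intermediate dataset bucketed and sorted on the join key so that
# downstream joins on member_id can avoid a shuffle.
(events
    .write
    .bucketBy(512, "member_id")
    .sortBy("member_id")
    .mode("overwrite")
    .saveAsTable("ded.page_views_bucketed"))
```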

This holistic approach to improving flow performance and monitoring also involved evaluating and fixing issues on the Hadoop platform itself. For example, we diagnosed and fixed an elusive bug that resulted in an accumulation of orphaned cgroups (Linux control groups), which locked computing resources in the Linux kernel on Hadoop nodes. By cleaning up these orphaned cgroups and releasing those server resources, we dramatically improved DED flows’ runtimes without any code changes. In one instance, a DED flow that had been taking as long as 3.5 hours saw its average runtime drop to 30-40 minutes.
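
As a rough illustration of the cleanup idea (the actual fix was applied at the platform level), a script along these lines could find container cgroups that no longer hold any process and remove them. It assumes cgroups v1 mounted under /sys/fs/cgroup and YARN's "hadoop-yarn" hierarchy, both of which are assumptions rather than details from the incident.

```python
import os

YARN_CGROUP_ROOT = "/sys/fs/cgroup/cpu/hadoop-yarn"  # assumed mount point and hierarchy

def orphaned_container_cgroups(root=YARN_CGROUP_ROOT):
    """Yield YARN container cgroup directories that no longer hold any process."""
    for name in os.listdir(root):
        path = os.path.join(root, name)
        if not (name.startswith("container_") and os.path.isdir(path)):
            continue
        with open(os.path.join(path, "cgroup.procs")) as procs:
            if not procs.read().strip():  # no live PIDs left in this cgroup
                yield path

if __name__ == "__main__":
    for cgroup in orphaned_container_cgroups():
        # Removing the empty cgroup directory releases the kernel resources
        # it was still holding on the Hadoop node.
        os.rmdir(cgroup)
        print(f"removed orphaned cgroup {cgroup}")
```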

Granted, even with these infrastructure and code improvements, the performance of our flows could still fluctuate given the variance in the Hadoop clusters. Hadoop nodes can become bottlenecked on system resources that the ResourceManager doesn’t track (such as CPU, Disk IO, Network IO), negatively impacting flow performance and making it difficult to measure the impact of our aforementioned improvements. Thus, additional monitoring and tools were put in place to enable platform and ETL engineers to isolate system issues from performance issues. While we had numerous alerts in place for our Hadoop infrastructure, their main focus was on the server side. To measure how the performance of Hadoop jobs was impacted, a set of representative flows reading from fixed input data was implemented to run periodically throughout the day. These flows served as a proxy for variance in the Hadoop clusters and enabled the team to better interpret the results from testing the performance-related code and tuning changes.
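
A representative flow of this kind can be as simple as a fixed workload over frozen input whose runtime is recorded on every run; because the input never changes, drift in the runtime reflects the cluster rather than the data. The sketch below illustrates the idea with hypothetical paths and columns.

```python
import time
from pyspark.sql import SparkSession

FIXED_INPUT = "/data/benchmarks/ded_fixed_sample"  # frozen copy of real input data

spark = SparkSession.builder.appName("ded-cluster-canary").getOrCreate()

start = time.time()
# A workload shaped like a typical DED aggregation: scan, filter, group, count.
groups = (spark.read.format("orc").load(FIXED_INPUT)
               .filter("is_valid = true")
               .groupBy("business_line")
               .count()
               .collect())
elapsed = time.time() - start

# In practice this number would be emitted as a metric and charted over time
# to separate cluster-side variance from genuine regressions in flow code.
print(f"canary run finished in {elapsed:.1f}s across {len(groups)} groups")
```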

Part 3: Thrive

Moving forward, the engineering innovations and challenges DED faces will not cease. It continues to be at the forefront of infrastructure migrations; for example, the DED team has been an early cloud computing adopter as part of LinkedIn’s effort to migrate to Azure. Furthermore, our DED flows are primarily written as daily batch processes, for which we are exploring micro-batching to spread processing over the entire day. This continuous exercise to ensure the performance and stability of our DED pipeline will also require implementing more productive and proactive alerting on data health, data flow performance, and data quality. 

The scope of this work extends far beyond delivering a dashboard to our executives’ inboxes by 5:00 a.m.; it is about codifying and pushing for best practices in ETL development and maintenance. Following DED’s example, other critical data pipelines have adopted its model for high availability and disaster recovery through active-active and redundant setups. In summary, we will continue to collaborate closely with infrastructure and platform teams to keep pushing the boundaries and ensure that not just DED but the entire data ecosystem thrives.

If you are interested in working on cutting-edge technology and the challenges of scaling high-visibility, business-critical projects, we would love to have the opportunity to work with you. Tip: to find opportunities related to the technologies and challenges mentioned in this post, use keywords such as “big data” or “distributed systems” in your LinkedIn job search.

Acknowledgments

It has been no small feat to develop and maintain the Daily Executive Dashboard, abstracting the underlying architectural migrations away from the end user and keeping its 5:00 a.m. SLA through all its changes. It takes a village, so thank you to the various teams involved: BDE, Data Flow/Ingestion SRE in the U.S. and Bangalore, ESR Data Science, Grid, Retina, and Spark.

To the countless teammates involved in the development of DED, spanning from individual contributors to managers and directors, thank you for all your hard work, dedication, and leadership in not only building DED but also pushing our data ecosystem forward to where it is today.

Special thanks to Hong Liu, Pavlo Pyatetskyi, Hoanh Dai, Ketan Duvedi, Ray Shannon, David Lu, Sunitha Beeram, and Jan Bob for helping edit and review this blog post.