An inside look at LinkedIn’s data pipeline monitoring system
October 30, 2019
Monitoring big data pipelines often equates to waiting for a long-running batch job to complete and observing the status of the execution. The status can result in “Failed” or “Successful” or even “Incomplete.” From there, it’s the team’s job to understand the impact and troubleshoot the situation to identify a solution. This way of monitoring isn’t effective given the lack of visibility into the job progress and the amount of time involved in waiting for a job to complete.
In this blog, we explain how we designed an architecture to logically break down a big data pipeline into smaller measurable segments using events. We will walk you through an example of Kafka Ingestion Pipeline to illustrate the time and resources saved.
The pain point
After a migration effort, our Kafka data ingestion pipelines bootstrapped every Kafka topic that had been ingested up to four days prior. At the time, our monitoring was primarily focused on the pass and fail status of the job via a home-grown Azkaban job monitoring dashboard called Kyaraben. The colored icons next to each job represented the status of the last five job runs: green representing success, red representing failure, and blue representing a running job in progress.
Although the jobs were successfully completed, the ingestion jobs were deemed unhealthy under the hood. However, we didn’t have the metrics to tell us that we had experienced a bootstrap problem. All we knew was that the watermarks indicating where we were in the ingestion of a Kafka topic were no longer moving forward. Therefore, we often didn’t notice the problems until 1.5 days later when the downstream consumers—in this case, our engineers and data analysts—began to ask us why their data was unavailable. We quickly rolled back the change, but given the consumption rates into Hadoop Distributed File System (HDFS) at that time, the ingestion took almost two weeks to catch up after being behind 1.5 days.
From the time of this outage, we began to improve our monitoring capabilities. We lacked real-time monitoring and clarity in the resulting insights.
The scale of our monitoring use case
The data pipeline seemed like a black box and left many questions unanswered:
Did the job execution actually complete all the tasks successfully?
Did some datasets fail to be processed?
How long has the job not been processing any data?
Why is it taking so long to ingest data?
When will my data be available?
Is there an upstream issue that is causing delay in ingestion?
Is the pipeline impacted by environment instability like Hadoop slowness or Kafka replication issues?
We decided to tackle these problems using Run Time Job Level events and a set of aggregated Dataset level events.
Leveraging Gobblin events
The data ingestion into Hadoop is predominantly facilitated by Apache Gobblin, a distributed data ingestion framework. We use Gobblin to ingest data from several sources, the larger ones being Kafka, Espresso, and Oracle.
Gobblin events are fire-and-forget reports of milestones of the execution, enriched by metadata relevant to that milestone with context information derived from metadata tags. The various events are exposed as timers, counters, and gauges. From a Hadoop job perspective, they can be aggregated to provide the full picture.
For example, every time we finish processing a file, a “FilePublished” event is emitted to Kafka containing detailed information like the number of records read, number of records written, and the location where the file was published. These events can be used to get historical information on past executions, as well as detect and report failures.
Job level runtime monitoring
A typical batch job processes hundreds of datasets and can run for hours, depending on the size of the dataset. In order to understand what happens under the hood, we model the long running job as composed of multiple logical steps specific to the data pipeline. Gobblin emits job progression event specific to the step along with timing information.
These events are captured in time series and help us understand “when” an event occurred and “how long” it took to complete a logical step.
Below are some examples of events.
WorkUnit Creation Timer: Indicated the amount of time required to create Gobblin work units. The context varies depending on the source (i.e., Kafka, HDFS)
Job MapReduce(MR) RunEvent: Indicates how long it took for a MR Application to complete.
Job Run Timer: Indicates how long it took to complete Hive registration for datasets.
TaskFailed Event: Indicates that a Gobblin task has failed and includes the metadata that contains information about the specific datasets and the failure cause.
Unlike traditional metrics like load average, QPS, or latency, most batch metrics are not able to alert users on the basis of a single metric and need correlation between various metrics to understand what is happening to arrive at a potential root cause or avoid false alerts.
Let’s explore some scenarios.
Auto-triage issues: When a long running job triggers an alert using our internal automated health monitoring system, AutoAlerts, we check a sequence of other alerts before arriving at an appropriate cause.
As illustrated above, we create a plan using Nurse, an auto-remediation tool used widely within LinkedIn. Nurse plans are YAML-driven and typically consist of a sequence of steps to sequentially test the state of alerts (on whether they are set or unset) and branch out based on the results. This plan forms the basis of correlation of various other alerts in testing. In the above example, we have extended the correlation checks to Hadoop infrastructure metrics that were initially surfaced as delays in Gobblin JobRun Event.
Snippet from a Nurse plan
Avoiding false alerts: It is possible that we do not see any work units, a Gobblin-specific term that indicates the data to be processed, created for an extended period of time. This leads to the conclusion that the job is slow or hung. We can correlate with datasets/files “publish” events to make sure that there is still progress at the rate we expect and then possibly take the appropriate remediation.
Aggregated dataset level monitoring
To monitor ingestion at a dataset level, let’s take the example of clickstream data from various LinkedIn applications. The apps write to Kafka tiers which are then written into HDFS by Gobblin.
We aggregated the data from various metrics to understand key metrics:
Data ingestion lag
Ingestion Lag: Ingestion lag is defined as the number of source hours ingested within a given ingestion hour. A source hour is the full hour of the clock time when the data was produced. The ingestion hour is the full hour when it was ingested into Hadoop. Each source hour can be ingested across multiple ingestion hours.
For example, when the time is 9:30 a.m. (ingestion hour: 9), we can be ingesting data produced at 8:10 a.m. (source hour: 8). The ingestion lag gives insights into when in an event timeline our data is being consumed. In order to derive the lag information, we need to aggregate events from HDFS audit events.
We use a customized version of Flume to parse the raw HDFS log data into custom metric events. Running on the Hadoop NameNode, the Flume process parses the local HDFS audit event log, transforms them to Avro events, and emits them to Kafka.
We visualize lag by stacking bars representing the number of records ingested for a given source hour into columns representing the Gobblin ingestion hour. The most recent source hour is at the top of the stack and moves down to older source hours. As seen below, the “normal” graph shows that we are primarily ingesting two source hours per Gobblin ingestion hour and most ingestion is occurring in the top most recent source hour.
The graph on the right is labeled “abnormal” due to multiple older source hours being ingested per ingestion hour, which is an indicator of lag. Prolonged lag will force Gobblin to spend less attention consuming data for the most current source hour. The column to the far right in the abnormal graph shows that more attention is being spent for the previous source hour. When not kept in check, lag can lead to SLA misses for a given topic.
Data comes from Gobblin events that show record counts available at the source for a given mapper task and HDFS auditing events that show total records written per mapper task.
For a known ingestion time range, did any mappers during that time have data loss? When ingestion windows are known for a source hour, data loss can be used to guarantee that all records that arrived for a given source hour were written into HDFS.
Looking at the data from Gobblin events and HDFS auditing events, we can pinpoint which mapper tasks are losing data and, sometimes, report why. In the below MR task list showing data loss, all loss for the above tasks are caused by Avro decode errors. Another reason behind data loss can be due to late data from upstream sources. For each instance of data loss, we also have links to the consumption job, the MR job, and MR task attempt logs to investigate further into why the data loss occurred. Given that each job runs hundreds of times a day and produces several hundred mapper logs each, being able to pinpoint the exact log showing data loss is invaluable for the debugging process.
Now that we have the lag and the loss metrics, how do we measure the data availability for an event at a specific hour? We make use of two methods.
The faster method
In the faster method, data is scraped from the Kafka audit rest API in 5-minute intervals for the previous hour and provides times for stabilized message counts for both the producer and Kafka tiers.
This data is used to calculate:
Percentage available in HDFS as compared to the Kafka tier (available once we’ve tied data in with HDFS Auditing Events)
Late Kafka arrivals
Time to write data into HDFS once data has stopped updating in Kafka
The above metric is required if we want to know when a percentage of total data was available in HDFS.
With this method, we can usually certify a pass in as little as 15 minutes after the end of a source hour, but there are a few issues with this “faster” method:
Kafka Audit, which is an internal application used at LinkedIn to publish counts of events, relies on Samza that makes use of the “at least once” semantics. This can result in counting the same record more than once, which causes the producer or Kafka tiers to show more records than actually produced.
The method depends on costly operations. The audit consumers must open every message and parse the audit header for the timestamp.
The timestamp used by Kafka audit and the timestamp used to group the source hour directory on HDFS are not the same. They are usually milliseconds in difference, which causes small inaccuracies that do not usually cause problems for SLAs. With some events, however, this can cause significant differences, such as when a mobile device sets the source timestamp. Due to this alignment issue, some events will produce false negatives.
The slower method (also known as the Consumption Test method)
In order to alleviate some of the above concerns, we rely on a slower, but more precise method that makes use of three metrics calculated from Gobblin Tracking events and HDFS auditing events: the topic level lag metric, timestamp of the last file written into HDFS, and data loss.
The first metric checked for a given source hour is the topic level lag metric. This metric validates that the ingestion pipeline has moved past Kafka watermarks for the source hour in question by checking that the source hour was a minority contributor (less than 10% of total ingestion) for the ingestion hour occurring between two to three hours after the start of the source hour and that the topic ingestion lag was healthy. This method is slower due to the use of the lag metric. We need to look for lag for the source hour in question at the next whole hour after the 150-minute time window terminates or after three hours from the start of the source hour.
In the above example for the ingestion hour from 9 a.m. to 10 a.m., we are performing the lag test for the 7 a.m. source hour. The test on the left resulted in a pass due to the 7 a.m. hour only contributing 5% of the total ingestion and the majority of consumption was occurring for the 9 a.m. hour. The test on the right shows a failure because the 7 a.m. hour was not a minority contributor and the majority consumption was not occurring at the 9 a.m. hour.
The second metric that we look at comes from HDFS auditing events and is the timestamp of the last file written into HDFS. The timestamp shows if we have met the time-based requirement of our data availability requirements.
The final metric that is checked to validate data availability is data loss. Given the window of time that the source hour was consumed begins at the start of the source hour and ends at the time when the last file was written into HDFS, there should be no mappers associated with the topic source hour in regards to data loss.
If all three tests pass, we mark that the source hour as passed.
In the above example dashboard, the column labeled “HDFS SLA” marks source hours that have passed SLA. Hours tagged with just “PASS” make use of the faster method for calculating SLA. Hours tagged with “PASS (CT)” make use of the slower “Consumption Test” method.
What a data availability alert looks like
Now that we have the job and the dataset level metrics captured in TimeSeries Visualization and the Apache Superset, we will describe how these are tied together to serve as a platform for monitoring and alerting.
Samza: Samza is a distributed stream processing framework that is utilized to ingest Gobblin metrics. Samza can stream metrics from multiple Kafka topics and can be deployed on Hadoop YARN. We use Samza to aggregate Gobblin events using the windowing functionality of Samza and emit the aggregates to a time series visualization. There is also work underway to move other aggregations done on a dataset level to Samza SQL. Apache Samza is an open source project created at LinkedIn.
Apache Superset: Superset is a web-based application with a rich set of data visualizations based on D3 charts and a Flask backend. It also provides a rich set of data visualizations that is used to view aggregated metrics. Superset is backed by MySQL and is used to store both raw and aggregated metrics that we read from Kafka. The computed metrics are then populated as visualizations in various dashboards that are used by various offline/streaming teams to understand Gobblin ingestion status. The dataset level monitoring is completely visualized with Superset.
We also built a number of REST APIs based on the aggregated metrics in Superset that returns dataset information for a specific alert, which is then used to provide context and passed on to the remediation framework.
Nurse: Developed at LinkedIn, Nurse is an event-driven automation platform for workflow execution, data gathering and data remediation that provides integrations with various other infrastructure tools. We decided to extend Nurse to the offline world by adding a connector to Azkaban that serves as the Hadoop job scheduler. We created Nurse plans to correlate various alerts triggered by Superset and AMF and have used it as our engine to drive our remediation efforts.
Given the number of job runs processed on Hadoop, the need for auto remediation is quite high. Monitoring individual jobs can be tiring and error prone when issues are not properly diagnosed and when they happen intermittently at odd hours.
When a job execution does not seem to be making any progress, we need to take thread dumps in timed intervals to identify the root cause. Only once these are done and other logs are captured can we consider restarting the job.
When a problematic dataset arises in the job, we can programmatically blacklist it until corrective measures are taken.
In cases where metrics indicate that we do not have any issue with ingestion of a dataset or potential issue with Kafka audit counts or upstream, we can override the job parameters in Azkaban at runtime thereby mitigating the false alert and taking corrective action.
Once we identify a potential issue through Nurse after the correlation process, we try to remediate it by taking an appropriate action. The APIs that we built on top of Superset are able to obtain the dataset information or context necessary to act. An example is overriding Azkaban job level parameters automatically using flow overrides and launching ad hoc jobs to remediate issues.
Looking ahead, one of the next items we'll be focusing on is extending our auto remediation efforts to add functionality to potentially blacklist datasets that have been identified as bad actors in the data pipeline within our onboarding services.
As data at LinkedIn continues to grow and the number of teams depending on these datasets increase, we must be able to measure dataset SLO and publish datasets in a timely manner. Monitoring and alerting data pipelines in real time has been instrumental for pinpointing bottlenecks and helping the oncall engineer to quickly identify and triage issues without spending time digging into the logs.
In developing this framework, we leveraged most of the tools built at LinkedIn to detect data ingestion problems. This has enabled us to focus on developing applications and designing alerting plans for specific parts of the data pipeline rather than working on creating a monitoring infrastructure.
A big thank you to the Data Ingestion-SRE and ETL-Infra teams for all the great work that has been done to make data pipeline monitoring successful at LinkedIn. A huge shout out to Matt Knecht and Keshav Tyagi on their guidance on Nurse, as well as the other partner teams like Samza and Monitoring-Infra for helping us build the platform. Thanks to the leadership team, Jayavel Sengodan and Sandhya Ramu, for their continued support and investment in the platform. Lastly, a final thanks to our partners from the Engineering Communications team for reviewing the blog content: Anne Trapasso, Szczepan Faber, and Jaren Anderson.