FastIngest: Low-latency Gobblin with Apache Iceberg and ORC format
January 6, 2021
Data analytics and AI power many business-critical use cases at LinkedIn. We need to ingest data in a timely and reliable way from a variety of sources, including Kafka, Oracle, and Espresso, bringing it into our Hadoop data lake for subsequent processing by AI and data science pipelines. We use Apache Gobblin for data integration, including to ingest time-sensitive Kafka events emitted when members take certain actions like viewing a page, sharing a post, or commenting on someone else’s post. Previously, we were running MapReduce-based batch jobs to pull hundreds of terabytes of data every day from thousands of different Kafka topics and ingest them into HDFS, with a latency of about 45 minutes.
Today, we are proud to introduce a new evolution of Gobblin, called “FastIngest,” that improves ingestion speed and efficiency, as well as query performance. As part of this update, we’ve developed a new streaming-based Gobblin pipeline that reduces ingestion latency from 45 minutes to only 5 minutes, while remaining resource-efficient. In addition, Gobblin now supports the ORC file format and the Iceberg metadata catalog to provide our data customers with high query performance.
Since Iceberg is designed for slow-moving data and this is the first time it’s being used as a metastore for fast-moving data with a separation of data and metadata, we want to share the key decisions and tradeoffs we made in designing FastIngest to help other companies with similar use cases navigate these issues.
Data flow at LinkedIn
Why do we say goodbye to old reliable batch mode?
As described in previous blog posts, Apache Gobblin is a data management framework that was designed and built to address all phases of data lifecycle management (ingestion, retention, compliance, and export) for enterprises with large numbers of data assets. In addition to its ability to ingest data on a massive scale, Gobblin is highly extensible and provides many attractive features such as auto-scalability, fault-tolerance, and the ability to handle data model evolution. These factors help make Gobblin the data integration framework of choice both at LinkedIn and several other companies.
MapReduce-based batch ingestion has served us well for more than five years to ingest a large swath of LinkedIn’s data at scale. A single execution of such a Gobblin job ingests a fixed amount of new data. At the beginning of each execution, the Gobblin Kafka batch connector determines the set of Kafka topic partitions to process and groups them into a set of tasks. Each task contains the offset ranges for one or more topic partitions to consume in the current run. The tasks then run on mappers to actually consume records from Kafka, as determined by the offset ranges, and write these records into HDFS. When tasks finish, the last consumed offsets for each topic partition are committed to a state store, which determines the starting offsets for the next run. For more details about Gobblin’s architecture, please refer to the documentation here.
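To make the batch cycle concrete, here is a minimal Python sketch of the plan, consume, and commit loop described above. It is an illustration only, not Gobblin's code: the names `OffsetRange`, `plan_batch_run`, `group_into_tasks`, and `commit` are hypothetical, the round-robin grouping is a stand-in for the connector's actual grouping logic, and starting unseen partitions at offset 0 is an assumption.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class OffsetRange:
    topic_partition: str
    start: int  # first offset to consume (inclusive)
    end: int    # offset to stop at (exclusive)


def plan_batch_run(committed, latest):
    """Build one offset range per topic partition that has new data.

    `committed` plays the role of the state store; `latest` holds the newest
    offsets reported by Kafka at the start of the run.
    """
    ranges = []
    for tp, end in sorted(latest.items()):
        start = committed.get(tp, 0)  # assumption: unseen partitions start at 0
        if end > start:
            ranges.append(OffsetRange(tp, start, end))
    return ranges


def group_into_tasks(ranges, num_tasks):
    """Round-robin the ranges into at most `num_tasks` tasks (one per mapper)."""
    tasks = [[] for _ in range(num_tasks)]
    for i, offset_range in enumerate(ranges):
        tasks[i % num_tasks].append(offset_range)
    return [task for task in tasks if task]


def commit(committed, finished_ranges):
    """Record where each partition stopped so the next run starts from there."""
    updated = dict(committed)
    for offset_range in finished_ranges:
        updated[offset_range.topic_partition] = offset_range.end
    return updated
```

Because every range is fixed before the run starts, the amount of work is bounded, which is what makes batch mode simple to reason about and also what ties latency to how often the job runs.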
The batch mode, while reliable and simpler to operate in production compared to an “always-running” pipeline, does suffer from several limitations that make it hard (if not impossible) to guarantee low latency:
- In MapReduce (MR) mode, each Gobblin execution incurs overhead associated with setting up a new YARN application, i.e., acquiring containers and copying jars. This overhead can be amortized if the containers can be preserved for longer periods of time.
- Since different topic partitions have different volumes of data, despite the even distribution of tasks, there can still be skews in mapper execution times due to unevenness in data volume and variance in load across different hosts. In addition, Gobblin jobs execute with job-level concurrency disabled to maintain correctness. Thus, the slowest mapper will determine the latency for all of the topic partitions in a single job.
In addition to the above-mentioned limitations, our Kafka ingestion pipeline suffered from two additional limitations unrelated to the batch mode of execution.
First, the pipeline landed the data in Avro storage format. This resulted in an additional conversion pipeline to convert the row-oriented Avro formatted data to the more efficient columnar storage format, like ORC. This additional step introduced hours of latency for Presto and Spark pipelines that depend on the efficiencies provided by the ORC storage format.
Second, the ingestion pipeline registered hourly partitions in a Hive table as it wrote new data to HDFS. However, the Hive table format does not guarantee read/write isolation. As a workaround, downstream pipelines artificially introduce latency in their flows to ensure that all data for a given hourly partition is complete before they read it.
To address these limitations, we embarked on building FastIngest: a Gobblin-based Kafka-to-HDFS pipeline that:
- Continuously writes to HDFS using Gobblin’s streaming mode,
- Directly writes in ORC columnar format, and
- Uses Apache Iceberg’s table format to register metadata that guarantees read/write isolation and allows downstream pipelines to consume data incrementally on HDFS.
In this blog post, we will focus on data ingestion and only briefly introduce metadata ingestion, which we will discuss at greater length in a future post.
Hello, Gobblin FastIngest
In production, the data ingestion pipeline of FastIngest runs as a Gobblin-on-Yarn application that uses Apache Helix for managing a cluster of Gobblin workers to continually pull data from Kafka and directly write data in ORC format into HDFS with a configurable latency. At LinkedIn, we set this latency to 5 minutes based on experimentation and measurements of file counts on HDFS. Setting the latency to a lower value, while providing a better user experience, will produce more files, since one file is created per latency period. The architectural diagram below shows the main components of the pipeline and the interactions between them:
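The tradeoff between latency and file count is simple arithmetic. The sketch below assumes that each concurrently open output file is closed once per latency period; the function name and the figure of 1,000 concurrent writers are hypothetical and are not LinkedIn's numbers.

```python
def files_per_day(latency_minutes, concurrent_writers):
    """Estimate daily file count if each writer closes one file per latency period."""
    flushes_per_day = (24 * 60) / latency_minutes
    return int(flushes_per_day * concurrent_writers)


# Hypothetical cluster with 1,000 concurrently open output files.
five_minute_files = files_per_day(5, 1000)  # 288 flushes per writer per day
one_minute_files = files_per_day(1, 1000)   # 1,440 flushes per writer per day
```

Under these assumptions, dropping the latency from five minutes to one minute multiplies the number of files on HDFS by five, which is why the setting has to be chosen by measurement rather than set as low as possible.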
The Gobblin-on-Yarn application is launched via an application launcher running as an Azkaban driver job (shown as AzkabanGobblinYarnAppLauncher in the figure). The GobblinYarnAppLauncher is also responsible for creating a new Helix cluster, if necessary.
The Yarn RM starts up a GobblinApplicationMaster (AM) instance, which in turn initializes a locally hosted YarnService. This service is responsible for auto-scaling the application by requesting new Yarn containers as well as releasing unused containers as needed. The Gobblin AM, along with the Yarn containers it launches, joins the Helix cluster created by the GobblinYarnAppLauncher. The Gobblin AM loads the Gobblin data ingestion job config files, which are submitted as a Yarn resource. A Gobblin Kafka source running inside the AM then queries Kafka to get topic partition information and creates ingestion tasks, which are distributed to the Gobblin workers running inside the Yarn containers using Helix.
Each Gobblin task running inside a Yarn container reads data from one or more Kafka partitions, and writes data to files on HDFS. Periodically, each task:
- Publishes the output to HDFS.
- Emits a Gobblin Metadata Change Event to Kafka to be consumed by a metadata ingestion pipeline (more on this later).
- Commits Kafka offsets to a watermark store backed by Zookeeper.
- Emits tracking events and metrics that are consumed by a downstream pipeline for monitoring the health of the pipeline.
In migrating our data ingestion pipeline from the batch mode to the new architecture and providing low-latency ingestion as well as high query performance, we had to address a number of technical challenges, the most important of which are summarized below.
|Challenge|Batch mode|FastIngest (streaming mode)|
|---|---|---|
|Work size estimation|Start/end offsets are known before execution starts|End offset unknown; work size computed based on prediction of future traffic; traffic may change and require rebalance|
|Data writer|Ingest in Avro mode; convert to ORC in another job|Directly ingest with ORC; performance challenge with Hive ORC writer|
|Continuous data publish|Publish upon completion; fetch schema on start; fixed schema during the run|Tasks are long-running; tasks need to publish data periodically; schema may change|
|Monitoring|Short-running job, has a summary for all MR when job finishes|Thousands of containers; long-running job and no summary; hard to detect issues during run|
|Metadata registration|Register with Hive; job driver registers partitions after task completion|Register both Hive and Iceberg; periodic commits inside task|
Balanced work allocation
We needed a strategy to establish the work size for each topic partition and evenly distribute them across Gobblin workers, in order to keep up with the volume of incoming data while also having an efficient usage of resources. Also, we needed to rebalance the work dynamically because the data volume could vary from time to time.
Currently, the Gobblin framework does not support dynamically adding or modifying tasks of a running job. In order to trigger recomputation and rebalancing of work units across workers, we introduced the notion of a Replanner: a scheduled Azkaban job that detects environment changes, such as variations in data volume, consumption stalls, and new topic partition additions, and determines whether to restart the running Gobblin jobs. Once it determines that a Gobblin job should be replanned, the replanner writes the new job configuration file to a well-defined HDFS location. The ApplicationMaster (AM) periodically polls the HDFS location for new configuration files and restarts the data ingestion job(s) when it finds one.
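The replanner's decision can be sketched as a function over the signals listed above. This is a hedged illustration rather than the production logic: `PartitionStats`, `replan_reasons`, and both thresholds (a 2x change in volume, 15 minutes without progress) are hypothetical.

```python
from dataclasses import dataclass


@dataclass
class PartitionStats:
    planned_rate: float            # bytes/sec assumed when the job was last planned
    observed_rate: float           # bytes/sec measured recently
    seconds_since_progress: float  # time since offsets last advanced


def replan_reasons(planned_partitions, current_partitions, stats,
                   rate_change_ratio=2.0, stall_seconds=900.0):
    """Return the reasons to restart the job; an empty list means leave it running."""
    reasons = []
    added = sorted(set(current_partitions) - set(planned_partitions))
    if added:
        reasons.append(f"new partitions: {added}")
    for tp in sorted(stats):
        s = stats[tp]
        if s.seconds_since_progress > stall_seconds:
            reasons.append(f"stalled: {tp}")
        low, high = sorted((s.planned_rate, s.observed_rate))
        # Flag traffic that grew or shrank by at least the configured ratio.
        if high > 0 and (low == 0 or high / low >= rate_change_ratio):
            reasons.append(f"volume changed: {tp}")
    return reasons
```

In this sketch, a non-empty result is the trigger for writing a new job configuration file to the HDFS location that the AM polls.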
The interaction between the replanner and a Gobblin cluster can be depicted as follows:
Since the containers are already provisioned (except for new container allocations needed for cluster expansion), job restarts in a Gobblin-on-Yarn cluster are lightweight and bounded by the time required by Helix to do task assignments.
Besides the replanner, the Kafka source is also enhanced to use historical statistics (such as rate of traffic production and average record size) to estimate the work size of each topic partition and evenly distribute work across Gobblin workers. The traffic production into a Kafka topic is tracked by hour of day and day of week, to take into account daily and weekly variances. The traffic estimates and the record sizes are tracked and updated by the data pipeline inside the Kafka extractor (which is responsible for polling records from Kafka) and written to the watermark store along with the offsets when data is flushed.
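A simplified sketch of this estimation and distribution step follows. The `TrafficHistory` class and the `pack` function are hypothetical names, and the greedy "largest first, onto the least-loaded worker" heuristic is our choice for illustration; the post does not specify which packing algorithm the Kafka source uses.

```python
import heapq
from collections import defaultdict


class TrafficHistory:
    """Average bytes/sec per topic partition, bucketed by (day of week, hour)."""

    def __init__(self):
        self._sum = defaultdict(float)
        self._count = defaultdict(int)

    def record(self, tp, day_of_week, hour, records_per_sec, avg_record_bytes):
        key = (tp, day_of_week, hour)
        self._sum[key] += records_per_sec * avg_record_bytes
        self._count[key] += 1

    def estimate(self, tp, day_of_week, hour, default=0.0):
        key = (tp, day_of_week, hour)
        n = self._count.get(key, 0)
        return self._sum[key] / n if n else default


def pack(work_sizes, num_workers):
    """Greedy packing: place the largest partitions first on the least-loaded worker."""
    heap = [(0.0, worker) for worker in range(num_workers)]
    heapq.heapify(heap)
    assignment = {worker: [] for worker in range(num_workers)}
    for tp, size in sorted(work_sizes.items(), key=lambda kv: (-kv[1], kv[0])):
        load, worker = heapq.heappop(heap)
        assignment[worker].append(tp)
        heapq.heappush(heap, (load + size, worker))
    return assignment
```

Bucketing by day of week and hour means that a Monday 9 a.m. estimate is built only from earlier Monday 9 a.m. observations, so daily and weekly cycles do not get averaged away.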
Direct ORC ingest
A key decision we made as we began working on the new pipeline architecture was to write the data out directly in ORC format. This decision was prompted by the fact that we wanted to bring the benefits of the ORC format, such as improved I/O efficiency and predicate pushdown capabilities, to the compute engines like Spark and Presto, without having to incur the additional latency of the Avro-to-ORC conversion pipeline.
Given the goal of writing data directly in ORC format with low latency, we ran benchmarking experiments to compare the write performance of different ORC writer implementations and writer configurations. Based on the results (which are consistent with other published studies investigating ORC write performance), we decided to use the native ORC writer with dictionary encoding disabled. We concluded that disabling dictionary encoding was an acceptable tradeoff between writer performance and query performance, for two main reasons:
- The data landed by FastIngest is re-materialized by a daily compaction job, which enables dictionary encoding when writing out ORC data.
- Only a small fraction of the data queried by a user (i.e., the “recent” data) will typically be in a sub-optimal format.
Continuous data publish
Batch pipelines, which have a finite work size, publish data when tasks finish, fetch the latest schema at the start of the job, and use this schema to write out the data on HDFS. In contrast, the streaming pipeline, which has long-running tasks, must be able to publish data periodically and handle schema changes dynamically.
To this end, we introduced the concept of “Control Messages,” which are injected into the event stream and passed down the Gobblin construct chain. Each Gobblin construct (such as the converter and the writer) is enhanced to handle these messages when operating in the streaming mode.
To meet the ingestion SLA, the Kafka extractor injects a “Flush” control message into the Kafka event stream at regular intervals (every five minutes at LinkedIn, based on our latency settings). When the downstream writer sees the control message, it closes the current file and acknowledges the message. After the control message is “acked,” the extractor publishes the data files and then checkpoints the offsets.
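The flush protocol can be sketched in a few lines of Python. This is a toy model under stated assumptions, not Gobblin's implementation: `FLUSH`, `FileWriter`, `with_flush_messages`, and `run_pipeline` are hypothetical names, records are `(offset, payload)` tuples, and "publishing" just moves closed files into a list.

```python
import time

FLUSH = object()  # sentinel standing in for the "Flush" control message


class FileWriter:
    """Toy writer: buffers records in an 'open file' and closes it on flush."""

    def __init__(self):
        self.open_file = []
        self.closed_files = []

    def write(self, payload):
        self.open_file.append(payload)

    def on_flush(self):
        """Close the current file; returning True acknowledges the message."""
        if self.open_file:
            self.closed_files.append(self.open_file)
            self.open_file = []
        return True

    def drain_closed_files(self):
        files, self.closed_files = self.closed_files, []
        return files


def with_flush_messages(records, flush_interval_s, clock=time.monotonic):
    """Yield records unchanged, injecting FLUSH once the interval has elapsed."""
    last_flush = clock()
    for record in records:
        yield record
        if clock() - last_flush >= flush_interval_s:
            yield FLUSH
            last_flush = clock()


def run_pipeline(records, writer, flush_interval_s, clock=time.monotonic):
    published, checkpoint, last_offset = [], None, None
    for item in with_flush_messages(records, flush_interval_s, clock):
        if item is FLUSH:
            if writer.on_flush():  # wait for the ack
                published.extend(writer.drain_closed_files())  # publish data first
                checkpoint = last_offset                       # then checkpoint
        else:
            offset, payload = item
            writer.write(payload)
            last_offset = offset
    return published, checkpoint
```

The ordering in `run_pipeline` matters: because the checkpoint only advances after the files are published, a failure between the two steps leads to records being re-read rather than lost.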
To handle schema changes, we have a special converter that tracks the latest schema from the Kafka Schema Registry. In addition, it caches all the schemas it has seen since the beginning of the current execution. When it sees a record with a newer schema, it updates its latest schema and sends a “MetadataUpdate” control message to signal the downstream writer to close the file. If it receives a record with an old schema, it up-converts the record to the latest schema.
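A minimal sketch of such a converter is shown below. It makes several simplifying assumptions that are ours, not the post's: schema versions are ordered integers, a schema is a mapping from field name to default value, evolution only adds fields, and the schema travels with each record instead of being looked up in a registry. The names `SchemaTrackingConverter` and `METADATA_UPDATE` are hypothetical.

```python
METADATA_UPDATE = "MetadataUpdate"  # stand-in for the control message


class SchemaTrackingConverter:
    def __init__(self, initial_version, initial_fields):
        self.latest_version = initial_version
        # Cache of every schema seen during this execution.
        self.schemas = {initial_version: dict(initial_fields)}

    def convert(self, record, version, fields):
        """Return the items to pass downstream for one incoming record."""
        out = []
        self.schemas.setdefault(version, dict(fields))
        if version > self.latest_version:
            # Newer schema: tell the writer to close the current file first.
            self.latest_version = version
            out.append(METADATA_UPDATE)
        elif version < self.latest_version:
            # Older schema: up-convert by filling new fields with defaults.
            latest = self.schemas[self.latest_version]
            record = {name: record.get(name, default)
                      for name, default in latest.items()}
        out.append(record)
        return out
```

Closing the file on a schema change keeps every output file internally consistent, so a reader never has to reconcile two schemas within a single file.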
Monitoring and alerting
The data ingestion pipeline is instrumented to emit tracking events into Kafka, which are used to monitor the health of the ingestion for each topic partition. The tracking events, which are emitted by each Gobblin task, contain information such as low/high watermark, bytes consumed, record counts, and ingestion latency. In addition, the events contain information, such as the Yarn container ID, that allows pipeline operators to quickly locate a problematic container. Each Yarn container also reports container health metrics like process CPU load, overall system load, memory usage, and GC counts and durations. A separate Samza-based monitoring pipeline consumes these tracking events, populates a Superset dashboard, and alerts on pipeline health issues. The figure below describes the overall architecture of the monitoring sub-system.
For more details about our monitoring infrastructure, check out our previous blog post, "An inside look at LinkedIn’s data pipeline monitoring system."
Async metadata publishing
As described earlier, each Gobblin task commits data every five minutes. The data written to HDFS is accessed by compute engines such as Presto and Spark via metadata published by the data ingestion pipeline. Publishing metadata “inline” (and in particular, Iceberg metadata) along with the data has the following drawbacks:
- Too many concurrent Iceberg commits, resulting in frequent conflicts and retries, adding to the latency of metadata publishing (sometimes on the order of minutes),
- Too many metadata files exacerbating the small file problem on HDFS. Further, compute engines have to read out a large number of Iceberg manifests to calculate file splits and data diffs.
To address these limitations, we made a key design decision to asynchronously register metadata. Our main idea is to introduce a separate metadata ingestion pipeline that consumes file publish events (referred to as Gobblin Metadata Change Events or GMCE) emitted by the data pipeline, aggregates them, and registers metadata at periodic intervals. By consuming the GMCEs from Kafka, we were able to leverage the streaming pipeline architecture of the data pipeline for the metadata ingestion. The interaction between the data pipeline and the metadata pipeline can be visually depicted as follows:
Metadata ingestion pipelines leverage most of the architecture of streaming data ingestion pipelines, just using a different Gobblin writer to register into a metadata store instead of writing into HDFS.
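The aggregation idea can be sketched as follows. This is an illustration under assumptions: the GMCE is reduced to a table name plus a list of new file paths, `commit_fn` is a hypothetical callback standing in for a single metadata commit (it is not an Iceberg API call), and time is passed in explicitly to keep the example deterministic.

```python
from collections import defaultdict


class MetadataAggregator:
    """Buffers file-publish events and registers them in periodic batches."""

    def __init__(self, commit_fn, commit_interval_s):
        self.commit_fn = commit_fn
        self.commit_interval_s = commit_interval_s
        self._pending = defaultdict(list)
        self._last_commit = None

    def on_gmce(self, table, new_files, now):
        if self._last_commit is None:
            self._last_commit = now
        self._pending[table].extend(new_files)
        if now - self._last_commit >= self.commit_interval_s:
            self.flush(now)

    def flush(self, now):
        # One commit per table, no matter how many events were buffered.
        for table, files in sorted(self._pending.items()):
            self.commit_fn(table, list(files))
        self._pending.clear()
        self._last_commit = now
```

For example, four events spread over one interval and touching two tables result in only two commits, which is how aggregation reduces both commit conflicts and the number of metadata files.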
Tradeoffs between batch and FastIngest
In conclusion, migration to a streaming mode significantly reduces ingestion latency. It also yields other operability benefits, such as elimination of the spiky access patterns on Kafka servers. Writing data out directly in ORC format allowed us to deprecate an additional data conversion pipeline and reduced operability overhead of managing this pipeline. What’s more, by migrating to a new table format, i.e., Iceberg, we enabled snapshot isolation and incremental processing capabilities in our offline data infrastructure.
Supporting streaming mode, however, brings a number of challenges. For instance, the pipeline requires a dedicated allocation of resources in our multi-tenant compute cluster. Further, there is a higher degree of development effort involved in making the pipeline robust and operable compared to the batch pipeline.
Currently, the Gobblin framework only supports computing work units at the start of the job. For now, we overcome this limitation via a replanner job that forces job restarts in response to traffic pattern changes. However, job restarts are expensive, since a given job may be processing multiple topics in addition to the ones experiencing spikes. We are working on providing a fine-grained, container-level re-planning so that the pipeline will be able to add or modify the tasks to a running job to rebalance partitions.
Special thanks to Sudarshan Vasudevan and Shirshanka Das for their technical leadership, and to Lei Sun and Hung Tran for their contributions to this project. We also want to thank Krishnan Raman, Joey Salacup, and Jayavel Sengodan for their contributions to the monitoring infrastructure, which was instrumental in making this pipeline robust and operational.