Announcing Gobblin 0.7.0: Going Beyond Ingestion

About a year ago, we open sourced Gobblin, a universal data ingestion framework that aimed to solve data integration challenges faced by people working on big data problems.

In many previous blog posts, publications, and talks, we have described how LinkedIn uses Gobblin to ingest data at massive scale from a variety of sources into HDFS. Today, we are very excited to introduce several new features in Gobblin’s latest release, which mark important steps in Gobblin’s evolution from a data ingestion tool to a data lifecycle management tool. In addition to ingesting an ever-increasing amount of data, Gobblin now provides several value-added features that span the entire data lifecycle management stack.

After we succeeded in ingesting a large swath of LinkedIn’s data at scale, it became apparent that we needed to turn our attention toward making this large amount of data easy to manage.

Here is a sampling of the most common requirements that we were being asked to address:

  1. Can you also copy this data onto these other Hadoop clusters?
  2. Can you purge some rows for compliance reasons? Can this be done continuously?
  3. Can the data be automatically registered with Hive, Presto, etc.?
  4. Can you provide certain datasets in a more optimal format like ORC?
  5. Can you retain data for a period of time and then purge it on an ongoing basis?
  6. Can you guarantee that the data doesn’t have duplicates?
  7. Can I easily declare lifecycle management policies that apply to datasets, dataset groups, and clusters?

While many of the questions above don’t seem directly related to ingestion, we found that these requirements often involve data ingestion or data movement as a prerequisite. For example, a requirement to keep three copies of our PageView dataset on Hadoop clusters spread across the globe meant setting up pairwise HDFS-to-HDFS copies between clusters. Similarly, tasks like purging records for compliance need the same scalability, fault-tolerance, and state management capabilities as data ingestion jobs. To address these commonalities, we’ve layered Gobblin’s architecture logically as shown in the figure below.

The Dataset Definition abstractions sit at the lowest level and work with plain HDFS and Hive-based systems, as well as custom catalogs like Dali (LinkedIn’s Dataset API for Hadoop). Dataset Operators are the building blocks of data management, such as ingestion, replication, and retention; they do the bulk of the heavy lifting and interact with the data systems through the dataset definition layer. Finally, the Data Management layer provides declarative configuration to orchestrate data ingestion, movement, and lifecycle processing. Let’s revisit the earlier requirement of keeping data synced across multiple clusters and see how the new features help us solve it elegantly.

DistCp is a command-line tool provided by Hadoop to copy data between Hadoop clusters. For a long time, we had been using a souped-up version of DistCp, with slightly enhanced capabilities built into a wrapper layer. Even with the wrapper, DistCp had several limitations. First, it is expensive: all file listing is done through the NameNode. Second, when one file causes a failure, the entire job fails. Finally, it has no notion of a dataset; all copies are done with file-level semantics. Consequently, there were recurring operational support issues with maintaining our DistCp pipeline. Making things worse, DistCp doesn’t provide operational metrics that could shed light on those issues.

The general and extensible nature of the Gobblin framework allowed us to address all of the challenges above. When designing the Gobblin-based DistCp pipeline, we opted for byte-oriented copying instead of record-oriented copying to keep the pipeline efficient, and added constructs to the traditional Gobblin flow that allow input streams to be represented as records. One fundamental limitation our Gobblin-based implementation solved was efficiently listing the files to be copied. Gobblin contains a DatasetsFinder interface that serves as an extension point for optimizing the discovery of datasets to copy, so we are no longer constrained to the Hadoop ls call (which can be very expensive for the NameNode) for file listings. Depending on the use case, we query the list of changed files through calls to the Hive metastore or, in the future, by watching the HDFS edit log. Further, dataset-aware optimizations are easy to incorporate.

Because Gobblin 0.7 has dataset management as a first-class building block, it is easy to provide fault isolation so that failures in one dataset do not affect others, and we are already observing far fewer operational issues as a result. Working on top of Gobblin also let us leverage Gobblin Metrics, which automatically emits notifications on failures as well as whenever data becomes available; this has greatly eased the operational burden of monitoring our numerous inter-cluster copy flows. The gist below highlights how to set up a Hive-based table copy between clusters.
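Since property names and class paths evolve between releases, treat the following as a rough sketch of such a job configuration rather than a verbatim copy of the gist; the keys, classes, and cluster URIs shown here are illustrative and should be checked against the Gobblin documentation for your version:

    # Illustrative Gobblin distcp job configuration for copying a Hive table
    # between clusters; keys and class paths are approximate, not verbatim.
    job.name=PageViewEventHiveCopy
    job.group=GobblinDistcp

    # Dataset definition layer: discover the tables to copy via the Hive metastore
    gobblin.dataset.profile.class=gobblin.data.management.copy.hive.HiveDatasetFinder
    hive.dataset.whitelist=tracking.page_view_event

    # Dataset operator layer: byte-oriented copy of the underlying files
    source.class=gobblin.data.management.copy.CopySource
    converter.classes=gobblin.converter.IdentityConverter
    writer.builder.class=gobblin.data.management.copy.writer.FileAwareInputStreamDataWriterBuilder
    data.publisher.type=gobblin.data.management.copy.publisher.CopyDataPublisher

    # Source and target filesystems (hypothetical cluster URIs)
    source.filebased.fs.uri=hdfs://source-cluster:8020
    writer.fs.uri=hdfs://target-cluster:8020
    data.publisher.final.dir=/data/external/page_view_event

Note how the dataset definition (which tables to copy) is declared separately from the operators that do the byte-level copying, mirroring the layering described above.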

After solving the single-pair data copy problem, we focused on simplifying the management of multi-cluster data copies. When there are multiple clusters, each running multiple jobs, and each job operating on multiple datasets, ease of configuration is critical. Different clusters, jobs, and datasets typically share a lot of common configuration, yet in some cases need their own unique settings. Our goal was to design a backend and a flexible client library for storing, managing, and accessing configuration that can be used to customize the processing of thousands of datasets across multiple clusters.

Gobblin’s configuration management module, built as an extension of Typesafe Config, is our attempt at solving this problem. It uses an immutable, versioned configuration store that represents configuration properties as a hierarchy. This allows us to provide global default values and dataset-specific overrides, as well as the ability to group configuration values that apply to a set of datasets by using tags. For example, we can tag a group of datasets as “high-priority” and associate specific configuration values with them. This reduces duplicated properties across clusters, jobs, and datasets, and, combined with the ability to roll back and audit every change, makes global configuration management much less error-prone.
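As a rough illustration, the snippet below shows how global defaults, a “high-priority” tag, and a dataset-level override might be expressed in Typesafe Config syntax; the key names and layout are assumptions made for this example, not the actual schema of the configuration store.

    # Illustrative Typesafe Config (HOCON) sketch; key names are placeholders.

    # Global defaults that apply to every dataset unless overridden
    defaults {
      retention.days      = 30
      replication.enabled = false
    }

    # A tag grouping datasets that need stronger guarantees
    tags.high-priority = ${defaults} {
      replication.enabled = true
      retention.days      = 90
    }

    # A dataset inherits the tag's values and overrides a single key
    datasets.PageViewEvent = ${tags.high-priority} {
      retention.days = 180
    }

Because values resolve hierarchically, adding a dataset to the “high-priority” group is a one-line change, and cluster- or job-level overrides compose in the same way.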

The figures below show an example of using this configuration mechanism to set up retention on two clusters (which we’ll call C1 and C2), with special provisions for two datasets (D1 and D2), so that we have the following (a configuration sketch follows the list):

  • A default retention of 30 days for all datasets on cluster C1
  • A default retention of 2 years for all datasets on cluster C2
  • Retention disabled for dataset D2 on cluster C1
  • A retention of 3 years for dataset D1 on cluster C2
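
In the same spirit, the sketch below expresses these four rules as two cluster-level defaults plus two dataset-level overrides; as before, the keys are illustrative stand-ins for the layout shown in the figures.

    # Illustrative Typesafe Config (HOCON) sketch of the retention rules above;
    # key names are placeholders, not the actual config store layout.

    # Cluster-level defaults
    clusters.C1.defaults {
      retention.enabled = true
      retention.days    = 30      # 30-day default retention on C1
    }
    clusters.C2.defaults {
      retention.enabled = true
      retention.days    = 730     # 2-year default retention on C2
    }

    # Dataset-specific overrides
    clusters.C1.datasets.D2 {
      retention.enabled = false   # retention disabled for D2 on C1
    }
    clusters.C2.datasets.D1 {
      retention.days = 1095       # 3-year retention for D1 on C2
    }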

This was a short preview of the new capabilities in Gobblin. Gobblin 0.7.0 also features a JDBC Writer for ingesting data into any RDBMS, automatic Hive registration of ingested datasets, and a number of other improvements and bug fixes. If you want to learn more, please join us for lively discussion, food, and drinks at the second Gobblin meetup, hosted on June 30th at LinkedIn’s Mountain View offices. We will go into greater detail about several of the topics mentioned in this post.

The Gobblin team includes Chavdar Botev, Issac Buenrostro, Pradhan Cadabam, Ying Dai, Abhishek Tiwari, Min Tu, and Vasanth Rajamani. Special thanks to Shrikanth Shankar, Kapil Surlaker, Shirshanka Das and Igor Perisic for their leadership, support and encouragement.

Thanks to Jinhyuk Chang who contributed the JDBC writer. Also, special thanks to our alumni Ziyang Liu and Sahil Takiar and our external committers, many of whom have contributed extremely important modules or fixed critical bugs: Joel “kadaan” Baranick, Eric Ogren, Andrew Hollenbach, Akshay Nanavati and Lorand Bendig.