Bridging Batch and Streaming Data Ingestion with Gobblin

September 28, 2015

Genesis


Less than a year ago, we introduced Gobblin, a unified ingestion framework, to the world of Big Data. Since then, we’ve shared ongoing progress through a talk at Hadoop Summit and a paper at VLDB. Today, we’re announcing the open source release of Gobblin 0.5.0, a big milestone that includes Apache Kafka integration.

Our motivation for building Gobblin stemmed from the operational challenges of building and maintaining disparate pipelines for different data sources across batch and streaming ecosystems. At one point, we were running more than 15 different kinds of pipelines, each with its own idiosyncrasies in error modes, data quality capabilities, scaling, and performance characteristics. Our guiding vision for Gobblin has been to build a framework that can support data movement across streaming and batch sources and sinks without requiring a specific persistence or storage technology.

Our first target sink was Hadoop’s ubiquitous HDFS storage system, and that has been our focus for most of the past year. All of LinkedIn’s data (hundreds of terabytes per day) needs to be aggregated into Hadoop before being combined in interesting ways to build insightful data products, surface meaningful business insights for executive and analyst reporting, and support experimentation-focused analysis. At LinkedIn, Gobblin is currently integrated with more than a dozen data sources, including Salesforce, Google Analytics, Amazon S3, Oracle, LinkedIn Espresso, MySQL, SQL Server, SFTP, Apache Kafka, patent and publication sources, and CommonCrawl.

Open Source


We open-sourced Gobblin earlier this year, and since the very early days we’ve been excited by the amount of engagement and activity on GitHub and in our discussion group. In the past few months, contributors from different companies and continents have committed important bug fixes. Additionally, the community has contributed important features such as a byte-oriented Kafka extractor and S3 integration. The 0.5.0 release has two big features: a) production-grade integration with Kafka as a data source, and b) support for operational monitoring and metadata integration.

Bye Bye Camus, Hello Gobblin


Camus was built by LinkedIn specifically to get Kafka data into Hadoop. Over the years, however, it has accumulated a fair amount of technical debt that would have taken quite a bit of work to unwind, and that work would have duplicated what we’re already doing in Gobblin. Most of the issues were related to operability, data integrity, and the flexibility to take advantage of different execution frameworks.

In the past few months, we’ve integrated Kafka as a supported data source for Gobblin. The figure above illustrates the Gobblin operator pipeline for Kafka ingestion. Compared to Camus, this gives us better support for robust hourly compaction, simpler configuration, and overall uniformity in debugging and analyzing ingestion performance and failures across all source types.
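To make the pipeline concrete, here is a minimal sketch in plain Java of the per-work-unit flow a Kafka ingestion task runs: extract a record, convert it (for example, raw bytes to an Avro record), apply row-level quality checks, and write to staging files that are published after the job succeeds. The interfaces and class below are simplified illustrations, not Gobblin’s actual API.

```java
import java.io.IOException;

// Simplified stand-ins for Gobblin's operator interfaces -- illustrative only.
interface Extractor<D> { D readRecord() throws IOException; void close() throws IOException; }
interface Converter<I, O> { O convert(I record); }
interface QualityChecker<D> { boolean passes(D record); }
interface RecordWriter<D> { void write(D record) throws IOException; void commit() throws IOException; }

final class KafkaIngestionTask<I, O> {
    private final Extractor<I> extractor;     // drains one topic-partition offset range
    private final Converter<I, O> converter;  // e.g. raw bytes -> Avro record
    private final QualityChecker<O> checker;  // row-level data quality policies
    private final RecordWriter<O> writer;     // staging files on HDFS

    KafkaIngestionTask(Extractor<I> e, Converter<I, O> c, QualityChecker<O> q, RecordWriter<O> w) {
        this.extractor = e; this.converter = c; this.checker = q; this.writer = w;
    }

    // Runs the extract -> convert -> quality-check -> write pipeline until the
    // extractor signals the end of its assigned offset range.
    void run() throws IOException {
        I raw;
        while ((raw = extractor.readRecord()) != null) {
            O record = converter.convert(raw);
            if (checker.passes(record)) {
                writer.write(record);
            }
            // Records failing quality checks would be counted and optionally
            // routed to an error sink rather than silently dropped.
        }
        writer.commit();   // staged output is published atomically after the job completes
        extractor.close();
    }
}
```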

At LinkedIn, Gobblin is currently ingesting about a thousand Kafka topics that stream an aggregate of hundreds of terabytes per day. Over the next quarter, we plan to migrate all Camus flows to Gobblin. The execution framework we currently run in production is based on MapReduce, but this lays the foundation for moving to different frameworks in the near future.

What’s Next: The Path to Continuous Ingestion


One of the biggest challenges with building a single ingestion framework for both batch and streaming is dealing with impedance mismatches between the source, the sink and the execution environment.

As described earlier, in production we currently run MapReduce-based batch ingestion jobs every 10 minutes on Hadoop to pull data from Kafka into HDFS, and we publish the data every hour. This has served us well because these batches are simple, idempotent, retriable units of work. However, there is an interesting efficiency cost. Every time a batch is set up, it needs to acquire schemas for all of the topics it is going to ingest and work with the resource scheduler to set up mappers; once mapper slots are acquired, it starts up the JVMs, pulls data down for a few minutes, persists checkpoints to disk, and then tears down. This cycle repeats every 10 minutes. We observed that during really busy periods on the cluster, we were spending a disproportionate amount of time in the setup phase compared to the time actually spent ingesting.
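To put illustrative (not measured) numbers on that overhead: if schema fetches, scheduler negotiation, and JVM startup consume three minutes of every ten-minute cycle, only 70 percent of wall-clock time goes to actual ingestion; if cluster contention stretches setup to six minutes, the useful fraction drops to 40 percent, even though the volume of data waiting in Kafka is unchanged.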

This motivated us to move to a classic streaming-based model where we ingest continuously. However, there is a different efficiency cost here: if you provision your ingestion job for the average throughput of the aggregate streams, data lag will suffer during peak times; if you provision for maximum throughput, resource utilization will be sub-optimal during off-peak times. Since we run these jobs on a shared, heavily multi-tenant Hadoop cluster, we don’t want to hog resources without good reason. The ideal deployment is one where we run Gobblin in continuous ingestion mode with the option to elastically expand or shrink the cluster as incoming data volume rises and falls, maintaining a configurable data lag.
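As a back-of-the-envelope illustration of that elasticity target, the sketch below (with made-up numbers and a hypothetical helper class, not anything in Gobblin) computes how many containers are needed to keep the estimated catch-up time within a configurable lag bound; the peak and off-peak calls show how widely the required cluster size can swing.

```java
// Hypothetical sizing helper: scale the container count so that the estimated
// time to drain the current backlog stays within a configurable lag target.
final class ElasticSizer {
    /**
     * @param backlogBytes            unconsumed bytes currently queued in Kafka
     * @param inRateBytesPerSec       current aggregate produce rate across topics
     * @param perContainerBytesPerSec sustained ingest rate of one container
     * @param maxLagSec               configurable data-lag target, in seconds
     * @return containers needed to absorb new data and drain the backlog in time
     */
    static int containersNeeded(long backlogBytes, double inRateBytesPerSec,
                                double perContainerBytesPerSec, double maxLagSec) {
        // Required aggregate rate = incoming rate + rate needed to drain the backlog.
        double requiredRate = inRateBytesPerSec + backlogBytes / maxLagSec;
        return (int) Math.ceil(requiredRate / perContainerBytesPerSec);
    }

    public static void main(String[] args) {
        // Peak: 500 GiB backlog, 2 GiB/s incoming, 100 MiB/s per container, 15-min lag target.
        System.out.println(containersNeeded(500L << 30, 2L << 30, 100L << 20, 900));  // 27
        // Off-peak: a small backlog and half the incoming rate need far fewer containers.
        System.out.println(containersNeeded(10L << 30, 1L << 30, 100L << 20, 900));   // 11
    }
}
```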

To implement this, we are leveraging two projects: Apache YARN for macro-level container allocation, and Apache Helix for micro-level resource assignment, fault tolerance, and re-allocation. We’ve previously talked about how these two projects can be combined to create auto-scaling distributed systems. Helix allows us to bin-pack work units within the acquired YARN containers and supports elastic scaling on demand, releasing containers or acquiring new ones from YARN depending on performance requirements (see the sketch below). This work is currently in flight, and we plan to roll it out in production next quarter. It will bring further latency reductions in our ingestion from streaming sources, improve resource utilization, and allow us to integrate seamlessly with streaming sinks. The Helix framework also gives us the flexibility to support other container-management frameworks such as Mesos and Kubernetes, and we welcome contributions from the open source community in this direction. Stay tuned for a future blog post with a more in-depth discussion of the design and implementation of this feature.
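To illustrate the bin-packing idea in plain Java (this is not the Helix API, just the underlying scheme), here is a first-fit-decreasing sketch that packs work units into containers of fixed capacity, acquiring a new container only when nothing fits:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Illustrative first-fit-decreasing packing of work-unit sizes into containers.
// Assumes each work unit fits within a single container's capacity.
final class WorkUnitPacker {
    /** Returns, per container, the list of work-unit sizes assigned to it,
     *  growing the container list on demand. */
    static List<List<Long>> pack(List<Long> workUnitSizes, long containerCapacity) {
        List<Long> sorted = new ArrayList<>(workUnitSizes);
        sorted.sort(Collections.reverseOrder());          // place the largest units first
        List<List<Long>> containers = new ArrayList<>();
        List<Long> remaining = new ArrayList<>();         // free capacity per container
        for (long size : sorted) {
            int target = -1;
            for (int i = 0; i < containers.size(); i++) { // first container that fits
                if (remaining.get(i) >= size) { target = i; break; }
            }
            if (target == -1) {                           // "acquire" a new container
                containers.add(new ArrayList<>());
                remaining.add(containerCapacity);
                target = containers.size() - 1;
            }
            containers.get(target).add(size);
            remaining.set(target, remaining.get(target) - size);
        }
        return containers;
    }
}
```

Sorting largest-first keeps the container count close to optimal, and shrinking the cluster is the mirror image: when aggregate work-unit size drops, whole containers empty out and can be released back to YARN.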

Team, Community and Outreach


The Gobblin team from left to right: (sitting) Chavdar Botev, Issac Buenrostro, Ying Dai
(standing) Pradhan Cadabam, Min Tu, Ziyang Liu, Yinan Li, Sahil Takiar, Abhishek Tiwari

At LinkedIn, we’ve been fortunate to have stellar partners who have all helped make Gobblin a better product. We would like to give a shout out to our data services and Hadoop operations team, the Espresso and Kafka teams, the Bizo and Lynda teams, and the Content ingestion team. Externally, we’re excited to see the community adoption of Gobblin and are working on making it even easier to use and extend. Special thanks to Kapil Surlaker, Greg Arnold and Alex Vauthey from the management team for their constant encouragement and support.

We’re going to be talking about Gobblin, Pinot, Kafka, Samza, and our latest invention, Dali, at LinkedIn’s second annual Big Data Perspectives event at our NYC R&D office in the Empire State Building this week. Hope to see you there!
