Solving the data integration variety problem at scale, with Gobblin
February 24, 2021
Co-authors: Chris Li, Kevin Lau, and Subbu Sanka
Editor’s Note: Recently, the Apache Software Foundation (ASF) announced Apache® Gobblin™ as a Top-Level Project (TLP). For more information, visit https://gobblin.apache.org/ and https://twitter.com/ApacheGobblin.
Our big data ecosystem holds more than 1 exabyte of data and is still growing, and it ingests and processes upwards of seven trillion Kafka events per day. At this scale, data integration is an incredibly complex problem. This is not only because of the multitude and variety of online and offline systems within the company, but also because of the hundreds of third-party data providers and partners that we work with. Add the plethora of protocols and formats that each brings, and the permutations quickly become untenable.
The mission of the LinkedIn Data Integration team is to dramatically simplify the development and maintenance needed for seamless inter- and intra-company data exchanges. To achieve that goal, we strive to build leverageable connector libraries, plugins, and extensions in order to streamline and standardize data integration between LinkedIn and other third-party vendors’ systems. These products provide developers with over 80% of the code needed to create high quality integrations—with built-in capabilities for security, compliance and QoS management that each engineering team would otherwise have to build themselves.
Our vision as we build these products is that data integration in the cloud era should expect and seamlessly support a dynamic variety of industry standards, not just an established few. Therefore, our technology should be designed in such a way that a variety of standards are accommodated with constant scalability.
In this blog post, we will talk about our journey solving this rapidly evolving problem of integration in the big data space. Everything that we discuss is currently in production and used to serve our members, and many of the technologies are available to the open source community.
LinkedIn uses a distributed open-source platform called Gobblin for data integration. One part of the codebase that keeps growing extremely fast is the extractor library that interacts with various sources (i.e., connectors) because of the variety and the large number of B2B vendors. Previously, custom connectors were built to meet specific requirements. This “bag of connectors” strategy led to the following categories of problems:
Linearly growing engineering cost: The “bag of connectors” strategy dealt with the variety challenge by developing numerous variants of similar and common (e.g., HTTP/JSON) connectors, even though they shared primary elements like protocols or message formats. New use cases often cannot leverage existing connectors unless technical specifications are an exact match. In addition to the linear engineering costs, this strategy also leads to outsized maintenance costs.
Long(er) lead times: Learning the Gobblin framework well enough to develop a connector costs each development team weeks to months on average.
Varying levels of compliance and security: This is a second-order problem resulting from redundancy, but given the increasing requirements around compliance and privacy controls, not having consistent approaches to areas such as logging, audit, and data minimization has become a major issue.
To achieve the goal of efficiently supporting existing business cases and expediting go-to-market for new ones, we needed a different approach to address the variety problem. We could not simply refactor common elements into libraries; we needed to also create a configuration-driven framework that allowed for degrees of customization, which would ultimately enable rapid results.
The motivation behind Data Integration Library (DIL), a library of generic components backed by a multistage architecture, was to standardize and dramatically simplify the connector layer.
In contrast to the custom-built approach required by the "bag of connectors" strategy, DIL would provide all of the building blocks for data integration, and end users would only need to configure a standard pipeline to integrate with a new data source. As illustrated in the picture above, a small number of connectors supporting transfer protocols that cover the vast majority of the use cases are provided. In our solution, a configuration can be written in HOCON, YAML, or JSON, relieving the end users from writing (or maintaining) Java code. This can be a major advantage to companies that handle DevOps without a full-fledged engineering team, or for teams wanting to focus on their applications instead of having to become big data integration experts.
Having generic components, supported by a multistage architecture and a granular state store, is the key innovation that enables constant scalability, short lead times and consistent compliance.
We designed the connector framework with the variety of potential data sources in mind. The following is a real world scenario where multiple protocols and multiple data formats are being processed.
Given that we are now in the cloud era, the above scenario is becoming increasingly common. Therefore, while typical data integration tools on the market are still designed with individual integration points in mind, we believe data integration should work with multiple systems by nature. The following design shows how that was achieved with DIL.
While we might have endless different combinations of data transmission protocols, data formats, compression methods, pagination strategies, etc., the number of generic components that we can distill from those combinations is actually very limited.
The chart below illustrates this in an environment of N protocols, which are handled by the Source component in the above architecture, and M formats, which are handled by the corresponding Extractor component. Previously, we would have needed to build one connector for each of the MxN combinations. With DIL, we only need to build N connection handlers (Source) for the N protocols and M format handlers (Extractor) for the M formats. Therefore, we addressed an MxN problem with an M+N solution.
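To make the M+N idea concrete, here is a minimal Python sketch. It is not DIL's actual code: the handler names (`fetch_inline`, `fetch_file`), the registries, and the `ingest` function are all hypothetical, chosen only to show how any protocol handler can be paired with any format parser once the two concerns are separated.

```python
import csv
import io
import json

# Hypothetical protocol handlers (the "Source" role): each returns raw bytes.
def fetch_inline(location: str) -> bytes:
    # Stand-in for a real protocol handler such as HTTP or SFTP.
    return location.encode("utf-8")

def fetch_file(location: str) -> bytes:
    with open(location, "rb") as handle:
        return handle.read()

# Hypothetical format parsers (the "Extractor" role): bytes in, records out.
def parse_json(payload: bytes) -> list:
    return json.loads(payload.decode("utf-8"))

def parse_jsonl(payload: bytes) -> list:
    lines = payload.decode("utf-8").splitlines()
    return [json.loads(line) for line in lines if line.strip()]

def parse_csv(payload: bytes) -> list:
    return list(csv.DictReader(io.StringIO(payload.decode("utf-8"))))

PROTOCOLS = {"inline": fetch_inline, "file": fetch_file}                # N = 2
FORMATS = {"json": parse_json, "jsonl": parse_jsonl, "csv": parse_csv}  # M = 3

def ingest(protocol: str, fmt: str, location: str) -> list:
    """Compose any registered protocol handler with any registered parser."""
    return FORMATS[fmt](PROTOCOLS[protocol](location))

components_built = len(PROTOCOLS) + len(FORMATS)       # M + N
combinations_covered = len(PROTOCOLS) * len(FORMATS)   # M x N
```

With two protocols and three formats, five components cover six combinations. The gap widens quickly as either registry grows, since adding one protocol handler makes every existing format available over it.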
Building on top of the Gobblin execution framework, the modified architecture of DIL strives to decompose and abstract complex source API logic into multiple smaller units, or stages. In other words, this multistage framework breaks down a large, complex ingestion job into two or more smaller jobs and orchestrates their execution. Here is the system architecture depicting the major components of this framework:
Figure note: We are not extracting metadata from the source in the same data extraction job, as the combination of metadata and protocols can vary case by case; instead, we support having a separate metadata stage if needed.
While it is up to the end user to decompose a complex job, the framework provides a standard mechanism by integrating output from the source with metadata. This is called “secondary input,” as opposed to the job parameters, which are “primary input.” The metadata mixer is a key component to enable secondary input, taking in output from preceding stages and adding it to metadata to control the execution of the subsequent stages. This back propagation of information makes it possible to break large flows into smaller jobs.
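The secondary-input idea can be sketched in a few lines of Python. This is an illustration under assumptions, not the framework's implementation: `run_stage`, `list_reports`, and `download_report` are hypothetical names, and the two-stage "list, then download" flow is invented for the example. The point is only that records produced by one stage are merged with the job parameters to drive the next stage.

```python
from typing import Callable, Dict, List

Record = Dict[str, str]

def run_stage(primary: Dict[str, str],
              secondary: List[Record],
              extract: Callable[[Dict[str, str]], List[Record]]) -> List[Record]:
    """Run one stage once per secondary-input record, or once if there is none."""
    output: List[Record] = []
    for extra in (secondary or [{}]):
        # Merge job parameters (primary) with values from the preceding stage.
        params = {**primary, **extra}
        output.extend(extract(params))
    return output

# Stage 1 (hypothetical): list the reports that are available.
def list_reports(params: Dict[str, str]) -> List[Record]:
    return [{"report_id": "r1"}, {"report_id": "r2"}]

# Stage 2 (hypothetical): download one report; it needs report_id from stage 1.
def download_report(params: Dict[str, str]) -> List[Record]:
    return [{"report_id": params["report_id"], "account": params["account"]}]

primary_input = {"account": "acme"}
stage1_output = run_stage(primary_input, [], list_reports)
stage2_output = run_stage(primary_input, stage1_output, download_report)
```

Here the second stage never needs to know how the report IDs were discovered. It only sees them as additional parameters, which is what lets each stage stay small and generic.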
Granular state store
To further improve efficiency and recoverability, DIL also supports slicing a job into smaller parts, by time and by categorical attributes, with each part called a work unit. Therefore, this architecture supports a 2D matrix of work unit definitions, with time-based watermarks on one side and attribute-based watermarks on the other side.
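As a rough illustration of the 2D matrix, the following Python sketch crosses time slices with categorical attribute values to plan work units. The `WorkUnit` class, the helper names, and the sample dates and regions are assumptions made for this example and do not reflect Gobblin's or DIL's actual classes.

```python
from dataclasses import dataclass
from datetime import date, timedelta
from itertools import product
from typing import Iterator, List, Sequence, Tuple

@dataclass(frozen=True)
class WorkUnit:
    start: date      # inclusive lower time watermark
    end: date        # exclusive upper time watermark
    attribute: str   # categorical watermark, e.g. a region or object name

def time_slices(start: date, end: date, days: int) -> Iterator[Tuple[date, date]]:
    """Split [start, end) into consecutive slices of at most `days` days."""
    cursor = start
    while cursor < end:
        upper = min(cursor + timedelta(days=days), end)
        yield cursor, upper
        cursor = upper

def plan_work_units(start: date, end: date, days: int,
                    attributes: Sequence[str]) -> List[WorkUnit]:
    """Cross every time slice with every attribute value."""
    return [WorkUnit(lo, hi, attr)
            for (lo, hi), attr in product(time_slices(start, end, days), attributes)]

units = plan_work_units(date(2021, 1, 1), date(2021, 1, 4), 1, ["US", "EU"])
```

Three daily slices crossed with two attribute values yield six independent work units, each of which can be loaded, retried, or reset without touching the others.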
At runtime, each work unit maintains its own state in the state store. The granular state store keeps track of data loading status, and it also enables the recovery of individual work units. This standardized approach for processing large jobs proved to be a game changer by creating leverageable code to reduce complexity.
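The recovery behavior described above can be sketched as follows. This is a simplified in-memory stand-in, not the Gobblin state store: the `StateStore` class, the status strings, and the unit IDs are hypothetical, and a real store would persist state across job runs.

```python
from typing import Callable, Dict, Iterable, List

class StateStore:
    """In-memory stand-in; a real store would persist across job runs."""
    def __init__(self) -> None:
        self._status: Dict[str, str] = {}

    def get(self, unit_id: str) -> str:
        return self._status.get(unit_id, "PENDING")

    def set(self, unit_id: str, status: str) -> None:
        self._status[unit_id] = status

def run_job(unit_ids: Iterable[str],
            load: Callable[[str], None],
            store: StateStore) -> List[str]:
    """Run every work unit not yet committed; return the IDs attempted."""
    attempted: List[str] = []
    for unit_id in unit_ids:
        if store.get(unit_id) == "COMMITTED":
            continue  # already loaded in an earlier run, skip it
        attempted.append(unit_id)
        try:
            load(unit_id)
            store.set(unit_id, "COMMITTED")
        except Exception:
            store.set(unit_id, "FAILED")  # only this unit needs a retry
    return attempted

store = StateStore()
unit_ids = ["2021-01-01/US", "2021-01-01/EU", "2021-01-02/US"]
flaky = {"2021-01-01/EU"}  # simulate one unit failing on the first run

def load(unit_id: str) -> None:
    if unit_id in flaky:
        raise RuntimeError("simulated source outage")

first_run = run_job(unit_ids, load, store)
flaky.clear()  # the source recovers
second_run = run_job(unit_ids, load, store)
```

The first run attempts all three units and one fails. The second run touches only the failed unit, because the other two are already recorded as committed.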
Success story: Google API integration
Google Search Console (GSC) integration used to be done through a GSC Connector. Previously, the custom-built GSC Connector leveraged the Google Search Console API SDK to maximize data ingestion speed and volume and to make the most of API limits. However, using the SDK meant having highly customized connectors, because of SDK restrictions and upgrades. For example, the SDK version of the connector would have to be upgraded constantly to avoid using unsupported API versions. Another factor that made the old connector expensive was that Google APIs actually include many applications, each with its own SDK. The SDK-based approach therefore requires a multitude of customized connectors across Google applications.
With DIL, we built an integration with all Google APIs by composing and reusing generic components. The new approach bypassed the complexity in the SDK and achieved the same results with better development efficiency. The outcome is a simplified configuration, shorter lead time, lower maintenance cost, and greater scalability. Here is a before and after comparison.
While we have significantly simplified the design with a single generic HTTP connector, that alone is insufficient for asynchronous Google APIs and many others (Salesforce, Eloqua, etc.). The reason is that asynchronous API calls can be totally different between vendors. For example, Google Ads API has a four-step process for asynchronous requests, but Salesforce has a six-step process, and Eloqua has a five-step one.
In previous designs, each connector handled that multi-step asynchronous process on its own, which means the connectors were all unique. With the multistage architecture support in DIL, instead of managing all those steps in a single, massive job, we took a more distributed approach, running each step as a separate job and stitching those mini jobs together. The following diagram illustrates a before and after comparison of the workflow that ingests data from the asynchronous Google Ads API.
In the above diagram, the new multistage flow uses shared components that can be leveraged to integrate with many other REST API and OData data sources, whereas the complex job requires a vendor-specific connector. The streamlined multistage flow dramatically reduces the work needed to apply it to hundreds of objects, getting rid of many one-off implementations previously deemed “essential” to handle some special objects, such as those illustrated above for retrying.
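The sketch below shows, in Python, what stitching an asynchronous flow together from small stages might look like. It does not model the actual Google Ads, Salesforce, or Eloqua steps: `FakeAsyncApi` and its three-step submit, poll, and download flow are invented for illustration, as are the stage function names.

```python
from typing import Callable, Dict, List

class FakeAsyncApi:
    """Hypothetical vendor API: the report is ready on the third status check."""
    def __init__(self) -> None:
        self._checks = 0

    def submit(self, query: str) -> str:
        return "job-1"

    def status(self, job_id: str) -> str:
        self._checks += 1
        return "DONE" if self._checks >= 3 else "RUNNING"

    def download(self, job_id: str) -> List[Dict[str, str]]:
        return [{"job": job_id, "clicks": "10"}]

Context = Dict[str, object]

def submit_stage(api: FakeAsyncApi, ctx: Context) -> Context:
    return {**ctx, "job_id": api.submit(str(ctx["query"]))}

def wait_stage(api: FakeAsyncApi, ctx: Context, max_attempts: int = 5) -> Context:
    # Generic polling with a bounded retry count; a real stage would also
    # sleep between attempts.
    for attempt in range(1, max_attempts + 1):
        if api.status(str(ctx["job_id"])) == "DONE":
            return {**ctx, "attempts": attempt}
    raise TimeoutError("job did not finish within the allowed attempts")

def download_stage(api: FakeAsyncApi, ctx: Context) -> Context:
    return {**ctx, "records": api.download(str(ctx["job_id"]))}

def run_flow(api: FakeAsyncApi,
             stages: List[Callable[[FakeAsyncApi, Context], Context]],
             ctx: Context) -> Context:
    """Run the stages in order, passing each stage's output to the next."""
    for stage in stages:
        ctx = stage(api, ctx)
    return ctx

result = run_flow(FakeAsyncApi(),
                  [submit_stage, wait_stage, download_stage],
                  {"query": "clicks by day"})
```

Because each stage only reads from and adds to a shared context, a vendor with a longer process would need a longer list of stages rather than a new monolithic connector, and the retry logic lives in one reusable stage.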
Last but not least, all the genericity and decomposition will not work without a fine-grained state store, with which each generic step and work unit is managed independently and collectively, allowing them to be restarted or reset separately.
Benefits for open source community
At LinkedIn, we have used DIL across dozens of use cases to greatly simplify operational complexity, while allowing us to scale as a company. The direct benefit of the design is quicker time-to-market, and the dramatic reduction of lead time to onboard a new business initiative. The design also creates high ROI, as fewer resources are required to build and maintain the connector libraries.
While these benefits across LinkedIn alone have been very significant, consider the tens of companies that are using Gobblin as their data integration platform: there could be hundreds or thousands of customized connectors being developed and used in private, which means DIL could create a far greater impact within the community. For those looking to adopt this framework for their data integration needs, the standardized design will greatly reduce the upfront investments, which results in accelerated adoption.
DIL’s generic components, along with its multistage architecture and granular state store, greatly standardize and simplify data integration by applying the same principle that microservices use to replace monolithic applications. The new architecture decouples data source protocols from formats, and its protocol- and format-agnostic design has proven to be scalable in complex integration environments. At LinkedIn, we have replaced 89 independently maintained connectors with generic connectors. We believe this architecture will greatly simplify Gobblin with regard to its connector repository, which will surely benefit the open source community.
So far at LinkedIn, we have applied the above design to a few dozen data ingestion cases, positively impacting hundreds of valuable datasets. In the next few quarters, we plan to apply the same principles to outbound data.
Compared to ingestion, (outbound) egress involves additional requirements in terms of data security and compliance. Furthermore, we are targeting more usage patterns where users might initiate data pulling from their end, which will demand that the framework be able to serve data in more diversified ways.
At the time of this writing, Gobblin is being promoted to a top-level Apache project (TLP), and this work is being considered as a sub-repository under the upcoming Apache Gobblin project.
Lastly, while the current implementation of this design is based on the Gobblin framework, the same design principles could be applied to other frameworks as well.
This work is heavily inspired by our partners in Marketing (LMS/DSO), Sales (Sales Productivity and LSS), and Artificial Intelligence (AI). A number of engineering teams contributed to the development of the connector framework. For all their hard work and support, we give thanks to Yogesh Keshetty, Eric Song, Pravin Boddu, Zhisheng Zhou, Haoji Liu, Alex Li, Varun Bharill, Yan Nan, Harry Tang, Steven Chuang, Varuni Gupta, Cliff Leung, Jimmy Hong, Sudarshan Vasudevan, Zhixiong Chen, Azeem Ahmed, Jyothi Krishnamurthi, Elma Liu, Carlos Flores, Susan Sumida, and others who provided feedback to this blog.