Expediting data fixes and data migrations
July 9, 2019
With over 630 million members, the LinkedIn platform delivers thousands of features that individually serve and store large amounts of diverse data. Protecting, maintaining, and serving data has always been of paramount importance for enriching the member experience and ensuring service reliability. In this blog post, we’ll address a critical part of data management involving ad hoc operations through data migrations or data fixes. More specifically, we’ll detail how we developed a centralized, scalable, self-service platform for implementing and executing both data fixes and migrations.
Let’s review the two primary types of data operations. Data migrations generally involve a process of collecting, transforming, and transferring data from one field, table, or database to another. Data fixes, on the other hand, generally involve selecting and transforming data in place. To elaborate on these types, let’s go through two examples:
Suppose member first names are not limited in length. A number of members have entered first names that are millions of characters long. A product decision is made to enforce a limit of 1,000 characters. A data fix might involve truncating the first names of all members with first names of length greater than 1,000 characters.
Suppose member profile data exists in a relational database that we would like to transfer to a NoSQL database. In the past, we migrated profile data from multiple Oracle databases to a NoSQL Espresso database in order to leverage existing technology developed internally by LinkedIn. This would be an example of a data migration.
From cleaning invalid or corrupted data, to migrating member data from legacy systems, these operation types are all use cases that we frequently encounter at LinkedIn.
As LinkedIn and its datasets continue to grow rapidly, executing these data operations becomes increasingly difficult at scale. Data fixes in particular can carry a weight of urgency as immediate solutions to production issues. For both types of data operation, reducing the engineering effort required provides immense member value by preserving data quality and expediting feature development, especially when data migrations can require several months to complete and verify. Historically, across the company, data operations have been conducted through a decentralized variety of mechanisms, including ad hoc scripts, use case-specific CLIs, newly deployed migration services, and many more.
To develop a generic platform that could replace these mechanisms, we had to keep in mind a few requirements and considerations:
Problem 1: Ease of use
This new system should maintain a level of simplicity such that any data owner can write a data fix or migration without significant effort. Achieving this requires abstracting concepts away from users writing jobs through simple interfaces, as well as flexible support for multiple languages, frameworks, and data engines (e.g., Java, Python, Pig, Hive, Spark). To reduce the engineering effort required for data operations, we need to abstract away multiple aspects such as resource/topology management, request throttling/scaling, code deployment, iterating data operations on records, and many more. The goal of these abstractions and interfaces is to improve both the developer experience and the speed of implementing data operations as much as possible.
Problem 2: Data correctness
With any data operation, we must ensure and maintain data correctness. This entails keeping data in a valid state for use by features/products and preventing data from entering a state such that member features break. Any data platform should focus on preventing these outcomes wherever possible by allowing for strong validation. For any software or engineering company at the scale of LinkedIn, designing systems that have perfect validation and will preempt all future data quality issues is close to impossible. With a multitude of data stores, schemas, and services, not all corner cases and holes in validation can be fully prevented or protected against. Any client, system, or script can compromise data quality. Similarly, any data operation job could do the same, potentially exacerbating pre-existing issues. Therefore, any existing validation must be honored regardless of whether data changes occur organically via live traffic or inorganically through a data operation platform.
Validation comes in all shapes and sizes. Simple validation can include type checks, formatting checks, range checks, and many more. Alternatively, complex rules can include checks across multiple tables within a database or cross-service calls to multiple sources of truth.
Often, these simple rules can be fully enforced at the database layer, but not all validation rules can or should be implemented this way. Enforcement is not always feasible at the database layer and may require an intermediate service, especially in the case of complex rules. To satisfy both use cases of simple and complex rules, it is imperative that a data platform be flexible enough to access any service or database that could contain necessary validation rules, ensuring any fixes or migrations maintain high data quality.
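To make the distinction between simple and complex rules concrete, here is a minimal sketch in Python. Every name in it (the `Rule` type, the rule functions, and the `lookup` callable standing in for a cross-service call) is hypothetical and chosen for illustration; it does not describe how any LinkedIn service actually validates data.

```python
from typing import Callable, List, Optional

# A rule returns an error message, or None when the record passes.
Rule = Callable[[dict], Optional[str]]


def first_name_is_string(record: dict) -> Optional[str]:
    # Simple rule: type check.
    if not isinstance(record.get("first_name"), str):
        return "first_name must be a string"
    return None


def first_name_within_limit(record: dict) -> Optional[str]:
    # Simple rule: range check on the length of the name.
    name = record.get("first_name")
    if isinstance(name, str) and len(name) > 1000:
        return "first_name exceeds 1,000 characters"
    return None


def make_known_member_rule(lookup: Callable[[int], bool]) -> Rule:
    # Complex rule: consults another source of truth. `lookup` is a
    # stand-in for a cross-service call (an assumption of this sketch).
    def rule(record: dict) -> Optional[str]:
        if not lookup(record.get("member_id")):
            return "member_id not found in source of truth"
        return None

    return rule


def validate(record: dict, rules: List[Rule]) -> List[str]:
    # Run every rule and collect the error messages, in rule order.
    errors = []
    for rule in rules:
        error = rule(record)
        if error is not None:
            errors.append(error)
    return errors


known_members = {1, 2}
rules = [
    first_name_is_string,
    first_name_within_limit,
    make_known_member_rule(lambda member_id: member_id in known_members),
]
valid_errors = validate({"member_id": 1, "first_name": "Ada"}, rules)
invalid_errors = validate({"member_id": 3, "first_name": "x" * 1001}, rules)
```

The first two rules could plausibly live at the database layer, while the third needs access to another system, which is why a data operation job must be able to call whichever service owns the rule.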
Problem 3: Scaling and throttling
When modifying large amounts of records, any data migration must be able to iterate on records quickly. For example, we recently needed to purge deprecated profile fields, such as associations and specialties, from millions of members as part of a decision to remove the elements of the profile that no longer seemed relevant to the product. Generally, data migrations may have to apply across millions or hundreds of millions of profile records, which requires an immense scaling of this data operation process. If we were to modify all 630 million members’ records using a data migration job that can only modify 10 records per second, this migration would take almost two years to complete! Our data platform must be able to modify large amounts of records quickly across large datasets.
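The two-year estimate follows directly from dividing the record count by the write rate. The short calculation below reproduces it; the 10,000 records per second figure is a hypothetical rate added only for comparison and is not a number from the platform.

```python
SECONDS_PER_DAY = 24 * 60 * 60


def migration_duration_days(total_records: int, records_per_second: float) -> float:
    """Days needed to touch every record once at a constant rate."""
    return total_records / records_per_second / SECONDS_PER_DAY


# 630 million records at 10 records per second: roughly 729 days.
days_at_10 = migration_duration_days(630_000_000, 10)

# Hypothetical comparison rate of 10,000 records per second: 17.5 hours.
hours_at_10k = migration_duration_days(630_000_000, 10_000) * 24
```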
On the other hand, complex systems will inevitably have bottlenecks and capacity limits (database, service, and event consumption limitations). High query frequency and volume can easily take down under-provisioned services or expose bottlenecks, so data operation rates require careful control. This system will require consistent throttling mechanisms to ensure services do not take on an unmanageable load.
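One common way to cap request rates is a token bucket. The sketch below is a generic illustration of that technique and not the platform's actual throttling mechanism; the class name, its parameters, and the injectable `clock` are assumptions made for this example.

```python
import time
from typing import Callable


class TokenBucket:
    """Allows short bursts up to `capacity` and a steady `rate_per_second`."""

    def __init__(
        self,
        rate_per_second: float,
        capacity: float,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self.rate = rate_per_second
        self.capacity = capacity
        self.clock = clock  # injectable so the bucket can be tested without sleeping
        self.tokens = capacity
        self.last_refill = clock()

    def try_acquire(self) -> bool:
        """Return True if one request may be sent now, False if it must wait."""
        now = self.clock()
        elapsed = now - self.last_refill
        # Refill in proportion to elapsed time, never beyond capacity.
        self.tokens = min(self.capacity, self.tokens + elapsed * self.rate)
        self.last_refill = now
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False
```

A job would call `try_acquire()` before each write and back off when it returns `False`, which keeps the downstream service's load bounded by the configured rate.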
Solution: Data operation platform
In order to solve these problems, we implemented a flexible self-service platform for users to rapidly implement, test, and execute data operation jobs. At a high level, the platform filters down large datasets to records that need to be operated upon and then iterates through each of these records to conduct corresponding updates to data until the job is complete.
The high-level components consist of the following:
User inputs: User-implemented dynamic and static job configurations, an offline script, and online data operation code.
Artifact repository: Repository for user-implemented job build artifacts.
Azkaban workflow: A reusable generic job workflow consisting of an offline filter phase and online mapper phase. Azkaban is a batch workflow job scheduler created at LinkedIn to run Hadoop jobs.
Existing data services and infrastructure: Existing data infrastructure and data services that the workflow interacts with to read and write data for executing a data operation job.
Throughout this blog post, we’ll delve into how these components interact with each other and how they function together to address the technical requirements.
Users of the platform may build data jobs for a wide variety of reasons or use cases to be supported by the platform. To handle this, the platform workflow is intentionally made generic such that the user inputs are used as plugins during the job workflow’s execution. This allows the platform to be agnostic to the details of each job’s implementation. The platform does not define the user logic but instead controls the structure, interfaces, and execution of data operation jobs.
This approach to abstraction addresses the first problem of ease of use by removing the need for:
Complex user implementations through simple interfaces provided by the platform
Maintaining and understanding the tedious details common among most jobs
Managing computing and storage resources required for job execution
The platform abstracts away these implementation details such that clients only provide the user inputs in the form of a static distributable and dynamic configurations.
The static distributable includes specifications defined once for each given job: an offline script, online data operation logic, and a static manifest configuration file. The offline script can be a Pig, Hive, or Spark job that narrows input datasets or sources down to a filtered set of records to be operated upon. The online data operation logic is an implementation of an interface for conducting necessary reads or writes based upon a single record result from the offline script. These three subcomponents are compiled and built into a JAR that is stored in the artifact repository and later downloaded during the Azkaban workflow job execution.
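To illustrate what a per-record interface of this kind might look like, here is a minimal sketch. The post does not show the real interface, and the JAR packaging suggests the real one is written in a JVM language, so Python is used here only for brevity. `DataOperation`, `apply`, `TruncateFirstName`, and the dictionary standing in for an online profile service are all hypothetical.

```python
from abc import ABC, abstractmethod


class DataOperation(ABC):
    """Hypothetical plugin interface: called once per filtered record."""

    @abstractmethod
    def apply(self, record: dict) -> bool:
        """Perform the reads/writes for one record; return True if data changed."""


class TruncateFirstName(DataOperation):
    """Example job logic: enforce a maximum first-name length."""

    def __init__(self, profile_store: dict, max_length: int = 1000) -> None:
        # `profile_store` stands in for a client of an online data service.
        self.profile_store = profile_store
        self.max_length = max_length

    def apply(self, record: dict) -> bool:
        profile = self.profile_store[record["member_id"]]  # online read
        name = profile["first_name"]
        if len(name) <= self.max_length:
            # The offline data may be stale; re-check before writing.
            return False
        profile["first_name"] = name[: self.max_length]  # online write
        return True


store = {7: {"first_name": "x" * 1500}}
operation = TruncateFirstName(store)
first_result = operation.apply({"member_id": 7})
second_result = operation.apply({"member_id": 7})
```

Re-reading the online record before writing makes the operation idempotent, so running it twice on the same record is harmless.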
The dynamic configurations provide easily modifiable parameters that users can adjust for each execution of a given job. To complete a data fix or migration, it’s likely multiple executions with different flavors will be required. Some examples of these modifications include request rates, throttling limits, data center fabrics, or other environment variables.
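As a rough picture of how such per-execution parameters might be organized, the sketch below models them as an immutable record and derives a second run from the first. The field names, the fabric names, and the `dry_run` flag are assumptions for illustration, not the platform's real configuration schema.

```python
from dataclasses import dataclass, replace


@dataclass(frozen=True)
class JobRunConfig:
    """Hypothetical per-execution settings for one data operation job."""

    fabric: str                   # which data center fabric to run against
    max_requests_per_second: int  # throttling limit for online calls
    max_concurrent_tasks: int     # how far to scale out
    dry_run: bool = True          # assumed safety flag, not from the post


# A cautious first execution with low rates.
canary = JobRunConfig(
    fabric="staging",
    max_requests_per_second=5,
    max_concurrent_tasks=1,
)

# A later execution of the same job with different parameters.
full_run = replace(
    canary,
    fabric="production",
    max_requests_per_second=200,
    max_concurrent_tasks=16,
    dry_run=False,
)
```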
Once users provide the above components, they are ready to run their jobs. After the platform downloads and processes the user inputs, the job begins with the filter phase. This phase narrows down the records from a given dataset to a closer representation of the records that need to be fixed or migrated. For example, even though 630 million members exist, we may only need to fix 1 million of those member records. Filtering allows jobs to iterate only on the records that actually matter and reduces unnecessary data operations.
Filter phase workflow
With the downloaded distributable JAR from the artifact repository, the platform will first extract the offline script and static manifest. Then, the offline script will be executed to read defined datasets from HDFS and apply filtering logic based upon the values and records from those datasets. Finally, the filter phase will produce a limited set of filtered records as output to be operated upon in the mapper phase.
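The filtering logic for the first-name example could be sketched as follows. On the platform this step would be a Pig, Hive, or Spark script reading from HDFS; plain Python over an in-memory list is used here only to show the shape of the logic, and the function and field names are hypothetical.

```python
from typing import Iterable, Iterator


def filter_records(snapshot: Iterable[dict], max_length: int = 1000) -> Iterator[dict]:
    """Yield the keys of records whose first name exceeds the limit."""
    for row in snapshot:
        if len(row["first_name"]) > max_length:
            # Emit only what the mapper phase needs to locate the record.
            yield {"member_id": row["member_id"]}


# Stand-in for an offline dataset snapshot.
snapshot = [
    {"member_id": 1, "first_name": "Ada"},
    {"member_id": 2, "first_name": "x" * 5000},
    {"member_id": 3, "first_name": "y" * 1001},
]
filtered = list(filter_records(snapshot))
```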
Once the job has produced a set of records to operate on, the workflow can conduct the necessary writes. The platform will then use the user-defined online logic to conduct reads and writes to online services or data stores.
Mapper phase workflow
In order to handle a large number of data operations, the platform must be able to dynamically scale computing resources. This is done through the use of Gobblin, a universal data ingestion framework for extracting, transforming, and loading large volumes of data. Gobblin libraries allow the platform to create tasks based upon an incoming data source, such as the filtered set of records in this case. With the online data operation logic, the workflow then constructs execution tasks to be queued, throttled, and carried out. This is how the platform addresses the third problem of scaling and throttling. Using the dynamic configurations provided, the platform scales and throttles the execution of these tasks.
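The idea of turning filtered records into tasks and pacing their execution can be sketched without any Gobblin APIs. The code below is a simplified, single-process illustration: `make_tasks`, `run_tasks`, and the fixed-delay pacing are assumptions of this sketch, whereas the real platform distributes tasks across computing resources.

```python
import time
from typing import Callable, Iterable, Iterator, List


def make_tasks(records: Iterable[dict], batch_size: int) -> Iterator[List[dict]]:
    """Group filtered records into fixed-size tasks."""
    batch: List[dict] = []
    for record in records:
        batch.append(record)
        if len(batch) == batch_size:
            yield batch
            batch = []
    if batch:
        yield batch  # final, possibly smaller, task


def run_tasks(
    tasks: Iterable[List[dict]],
    operation: Callable[[dict], object],
    max_per_second: float,
    sleep: Callable[[float], None] = time.sleep,
) -> int:
    """Apply `operation` to every record, pausing after each call.

    Returns the number of records processed. The pause is a simple
    fixed delay of 1 / max_per_second seconds.
    """
    interval = 1.0 / max_per_second
    processed = 0
    for task in tasks:
        for record in task:
            operation(record)
            processed += 1
            sleep(interval)
    return processed
```

Here `operation` would be the user-supplied online logic, and `max_per_second` would come from the dynamic configuration for that execution.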
Since the online logic is plugged into the tasks for execution, flexibility remains for calling existing online data services and databases and satisfying the requirement for data correctness. Therefore, users or owners of data services and databases can enforce validation rules as they so choose. Users can then ensure that their data operation logic calls the desired services or systems that contain those strict validation rules.
Lastly, the existing LinkedIn data infrastructure will propagate these job writes from databases into HDFS through ETL pipelines. As more and more updates propagate to HDFS, the number of filtered records will continue to decrease until the remaining filtered set becomes zero. Now, the loop is complete! We’ve successfully cleaned up invalid data or prepared legacy systems for deprecation and decommissioning!
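The feedback loop described here, in which each execution shrinks the filtered set until nothing remains, can be simulated in a few lines. This is a toy model: the deep copy stands in for ETL propagation to HDFS, `per_run_limit` stands in for whatever bounds a single execution, and all names are hypothetical.

```python
import copy
from typing import Callable, List


def run_until_clean(
    online_store: dict,
    needs_fix: Callable[[dict], bool],
    fix: Callable[[dict], None],
    per_run_limit: int,
    max_runs: int = 100,
) -> List[int]:
    """Repeat filter and mapper phases; return the pending count seen by each run."""
    pending_counts = []
    for _ in range(max_runs):
        # Stand-in for the ETL pipeline producing an offline snapshot.
        snapshot = copy.deepcopy(online_store)
        pending = [key for key, row in snapshot.items() if needs_fix(row)]
        pending_counts.append(len(pending))
        if not pending:
            break  # the filtered set is empty, so the job is complete
        for key in pending[:per_run_limit]:
            fix(online_store[key])  # online write
    return pending_counts


store = {i: {"first_name": "x" * 1500} for i in range(5)}
counts = run_until_clean(
    store,
    needs_fix=lambda row: len(row["first_name"]) > 1000,
    fix=lambda row: row.update(first_name=row["first_name"][:1000]),
    per_run_limit=2,
)
```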
Conclusion and next steps
By abstracting away a majority of the implementation details required for running data operation jobs, engineers can put aside tedious aspects—such as throttling, scaling, deployments, and resource management—allowing for faster fixing and migrating of data. Some simple data jobs can be implemented within a few hours or even a single day.
Currently, the platform supports offline to online jobs (an offline filtering phase with an online mapper phase). We plan to extend it to support additional job types:
Online to online: In some cases, offline data may be unavailable or unreliable, which may require filtering with online sources of truth directly.
Nearline to online: In other cases, it may be desirable to trigger migration logic on database changes or other nearline events instead of offline filtering.
We also plan to integrate existing quota systems into the data operation platform so that throttling can be controlled by the service receiving the data operation requests, rather than by the client side (the data operation platform) that triggers them. Enforcing limits at the receiving service can help ensure that systems are robustly protected against excessive data operation traffic.
This project would not have been possible without the backing and support from Sriram Panyam. Thanks to Arjun Bora, Charlie Summers, Abhishek Tiwari, Xiang Zhang, Ke Wu, Yun Sun, and Vince Liang for early feedback on the design. Thanks to Jingyu Zhu and Estella Pham for contributions to implementation and further design enhancements. Thanks to Jaren Anderson, Heyun Jeong, and Paul Southworth for feedback on this blog post.