
Unified Streaming and Batch Pipelines at LinkedIn: Reducing Processing Time by 94% with Apache Beam

Co-Authors: Yuhong Cheng, Shangjin Zhang, Xinyu Liu, and Yi Pan

Efficient data processing is crucial in reducing learning curves, simplifying maintenance efforts, and decreasing operational complexity. This, in turn, helps engineers develop and deploy data processing applications quickly and easily, powering various business requirements and enhancing the member experience on LinkedIn. In the past, we often used a Lambda architecture for processing jobs, meaning that our developers used two different systems for batch and stream processing. This created challenges: users had to maintain two different codebases, write separate code to read and write batch and streaming data, and learn and operate two different runtime engines.

To reduce this complexity, we began utilizing Apache Beam, which lets users write the processing logic once and run it as both batch and streaming jobs. By unifying these pipelines, we have reduced processing time by 94%.

With the unified Beam programming model, we can now run a single codebase that efficiently performs both real-time processing as a streaming job and periodic backfilling of data as a batch job, by leveraging the Apache Samza and Apache Spark runners.

In this blog post, we will share our progress, challenges, and lessons learned from implementing Apache Beam.

Background

Apache Beam is an open source, unified model for building both batch and streaming data-parallel processing pipelines. Using one of the open source Beam SDKs, users build a program that defines the pipeline. The pipeline is then executed by one of Beam’s supported distributed processing back-ends (e.g., Apache Samza, Apache Spark, and Apache Flink). This offers a way to easily create a large-scale common data infrastructure across different applications that consume the data. Please see Figure 1 for an overview of Beam.


Figure 1: Overview of Apache Beam (source)

At LinkedIn, Samza powers thousands of stream processing applications that process 2 trillion messages daily with large state and fault tolerance. For batch processing, we use Spark to handle sophisticated batch scenarios and process petabytes of data with our industry-leading external shuffling service and schema metadata store.

The Beam Samza runner executes a Beam pipeline as a Samza application and can also run it locally. The Beam Spark runner executes a Beam pipeline using Spark, just like a native Spark application. This is how Apache Beam makes a single, unified pipeline possible for both kinds of jobs.
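
In practice, which engine executes the pipeline comes down to the pipeline options passed at launch time. As a minimal sketch (LinkedIn’s actual deployment tooling is not shown in this post, and the flags assume the corresponding runner artifacts are on the classpath), the same main method can be started with --runner=SamzaRunner for the streaming job or --runner=SparkRunner for the batch backfill:

public static void main(String[] args) {
  // e.g., --runner=SamzaRunner for the streaming job,
  //       --runner=SparkRunner for the batch backfilling job
  PipelineOptions options =
      PipelineOptionsFactory.fromArgs(args).withValidation().create();
  Pipeline pipeline = Pipeline.create(options);

  // ... the same processing logic, regardless of the runner ...

  pipeline.run().waitUntilFinish();
}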

Use case

The use cases we focus on are pipelines that need both real-time computation to reflect immediate updates and periodic backfilling to refresh all the input data. Let’s take a look at one of them to explain the use case and the pain points we encountered with our previous solution.

Standardization is the process of mapping user inputs to a set of pre-defined IDs. For example, user input strings like job titles, skills, or education history are first mapped to internal IDs according to our taxonomies. The standardized data can then be used for search indexing or running recommendation models. These processes are made possible by a series of pipelines written in Beam and executed by the Samza runner.

The pipelines run AI models that join complex data such as job types and work experience to standardize user data for further use. There are two main scenarios: 1) real-time computation is needed to reflect immediate user updates, and 2) periodic backfilling is needed to redo standardization when new models are introduced. Figure 2 illustrates the pipeline.

Currently, real-time processing and backfilling are both executed as streaming jobs by the Samza runner. When doing backfilling, it is hard for a streaming job to meet the time and resource requirements:

  1. For every backfill, the job needs to handle 900 million member profiles at a rate of 40,000 profiles/sec; at that rate, simply reading the input takes 900,000,000 / 40,000 = 22,500 seconds, or roughly 6.25 hours.

  2. As the models become more and more complex, the backfilling jobs cannot finish within a reasonable time.

  3. The streaming cluster is not optimized for the spiky resource footprint of backfilling.


Figure 2: Standardization process including backfilling and real-time processing

Our Methods

Considering the issues mentioned previously, we initially implemented both a streaming job and a batch job to execute the same logic, following the Lambda architecture. While this helped, it still required excessive manual effort to build and maintain both pipelines. Users had to maintain two quite different codebases and two different tech stacks; engineers had to be familiar with different languages and go through two learning curves. If there were any issues, engineers also had to reach out to different infrastructure teams for support.

Instead, we decided that our goal would be to execute backfilling as a batch job using the Beam Spark runner, while users only need to write the code once with the Beam API, for better cost to serve as well as engineer productivity. To achieve this goal, we needed to support reading and writing different types of data sources as tables via the same Beam API in both streaming and batch environments.

Architecture

With our new architecture (shown in Figure 3), developers only need to develop and maintain a single codebase written in Beam. If the target processing is real-time, the job is deployed to the Samza cluster as a streaming job; if the target processing is a backfill, the job is deployed to the Spark cluster as a batch job. This unified streaming and batch architecture lets our team take advantage of both computing engines while minimizing development and maintenance efforts.


Figure 3: Unified streaming and batch processing architecture

Beam Pipeline

A Pipeline in Beam manages a directed acyclic graph of processing logic. PipelineOptions is an interface provided by Beam for configuring pipelines. PipelineOptionsFactory constructs a PipelineOptions, or any derived interface, which can be composed with any other derived interface of PipelineOptions.

With Beam APIs, developers can implement the processing logic the same way for both streaming and batch jobs. Figure 4 is an example of a Beam pipeline. This pipeline reads ProfileData, joins the data with sideTable, then applies a user-defined function called Standardizer(), and finally writes the standardized result to databases. This code snippet can be executed in both the Samza cluster and the Spark cluster.


Figure 4: An example of a Beam Pipeline
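
Since Figure 4 is rendered as an image, here is a rough sketch of what such a pipeline could look like in the Beam Java SDK. The Read, JoinWithSideTable, and Write transforms and the Standardizer DoFn are hypothetical stand-ins for the unified transforms discussed in the next sections, not the exact code from Figure 4.

PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();
Pipeline pipeline = Pipeline.create(options);

pipeline
    // Read member profile updates (Kafka in streaming, HDFS in batch; see Unified PTransform).
    .apply("ReadProfileData", new Read())
    // Join the profiles with the side table (see Unified Table Join).
    .apply("JoinWithSideTable", new JoinWithSideTable(sideTable))
    // Apply the user-defined standardization logic.
    .apply("Standardize", ParDo.of(new Standardizer()))
    // Write the standardized results (direct DB updates in streaming, a dataset in batch).
    .apply("WriteResults", new Write());

pipeline.run().waitUntilFinish();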

In the following sections, we will explain the code in Figure 4 with more implementation details, including unified PTransform and unified table join.

Unified PTransform

Although Beam pipelines share the same source code for both batch and streaming, there are still differences between streaming and batch during execution, most notably the IOs (input is read from one or more sources; output is written to one or more destinations).

In stream processing, input data comes from unbounded data sources, like Kafka. For batch processing, input data comes from bounded data sources, like HDFS. Similarly, on the output side, a streaming job can update the database directly, but a batch job will normally produce a dataset. So, different IO behaviors should be applied according to the pipeline type, as shown in Figure 5. To hide these IO differences, we extended Beam’s PTransform. A PTransform represents a data processing operation, or a step, in the pipeline.


Figure 5: An example of how Unified PTransform works with I/Os

Unified PTransform is a special PTransform that provides a unified interface to users but allows different implementations according to the pipeline type. Unlike the PTransform provided by Beam, any implementation of this interface needs to provide two expand() functions, one for streaming and one for batch, which allows for topology differences. At runtime, we detect the pipeline type and call the corresponding expand() accordingly.

public static class Read extends UnifiedPTransform<PBegin, PCollection<String>> {

  @Override
  protected PCollection<String> expandStreaming(PBegin pBegin) {
    // Streaming: read the unbounded input from a Kafka topic.
    return pBegin.getPipeline()
        .apply(KafkaIO.<String>read()
                      .withTopic(getStreamingInput()))
        .apply(...);
  }

  @Override
  protected PCollection<String> expandBatch(PBegin pBegin) {
    // Batch: read the bounded input files from HDFS.
    return pBegin.getPipeline()
        .apply(FileIO.match().filepattern(getBatchInput()))
        .apply(...);
  }
}
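
For reference, here is a minimal sketch of what the UnifiedPTransform base class could look like. This is an assumption based on the description above, not LinkedIn’s actual implementation; it detects the pipeline type via Beam’s standard StreamingOptions flag and dispatches to the matching expand() method.

public abstract static class UnifiedPTransform<InT extends PInput, OutT extends POutput>
    extends PTransform<InT, OutT> {

  // Subclasses supply separate topologies for the two execution modes.
  protected abstract OutT expandStreaming(InT input);
  protected abstract OutT expandBatch(InT input);

  @Override
  public OutT expand(InT input) {
    // Detect the pipeline type at expansion time and pick the matching topology.
    boolean isStreaming =
        input.getPipeline().getOptions().as(StreamingOptions.class).isStreaming();
    return isStreaming ? expandStreaming(input) : expandBatch(input);
  }
}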

Unified Table Join

By default, the Beam Join API provides join functionality using CoGroupByKey, regardless of whether the processing is batch or streaming. When applied to unbounded data, either non-global windowing or an aggregation trigger is needed to perform a CoGroupByKey. Also, CoGroupByKey requires data shuffling, which can demand significant resources.

During investigation and implementation, we found that joining via CoGroupByKey is not efficient when the side table supports key lookups. So we added a unified table join PTransform to optimize performance and reduce resource cost. Based on the table’s characteristics, the actual join can look up the keys from the table directly, shuffle the data to join, or even broadcast the table to do the join.


Figure 6: An example of how Unified Table Join works
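
To make the two strategies concrete, below is a rough sketch contrasting a shuffle-based CoGroupByKey join with a direct key-lookup join. The KeyLookupClient type and its lookup() method are hypothetical placeholders for a table client that supports point lookups; the actual transform and how it selects a strategy are not shown in this post.

// Strategy 1: shuffle-based join via CoGroupByKey (Beam's default approach).
static PCollection<KV<String, String>> joinByShuffle(
    PCollection<KV<String, String>> profiles,
    PCollection<KV<String, String>> sideTable) {
  final TupleTag<String> profileTag = new TupleTag<String>() {};
  final TupleTag<String> sideTag = new TupleTag<String>() {};
  return KeyedPCollectionTuple.of(profileTag, profiles)
      .and(sideTag, sideTable)
      .apply(CoGroupByKey.create())
      .apply(ParDo.of(new DoFn<KV<String, CoGbkResult>, KV<String, String>>() {
        @ProcessElement
        public void process(ProcessContext c) {
          // Pair every profile value with every side-table value sharing the same key.
          for (String profile : c.element().getValue().getAll(profileTag)) {
            for (String side : c.element().getValue().getAll(sideTag)) {
              c.output(KV.of(c.element().getKey(), profile + "|" + side));
            }
          }
        }
      }));
}

// Strategy 2: key-lookup join when the side table supports point lookups,
// which avoids the shuffle entirely. KeyLookupClient must be serializable.
static PCollection<KV<String, String>> joinByLookup(
    PCollection<KV<String, String>> profiles, KeyLookupClient table) {
  return profiles.apply(ParDo.of(new DoFn<KV<String, String>, KV<String, String>>() {
    @ProcessElement
    public void process(ProcessContext c) {
      String side = table.lookup(c.element().getKey());
      c.output(KV.of(c.element().getKey(), c.element().getValue() + "|" + side));
    }
  }));
}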

Performance Gains and Summary

After migrating our standardization to a unified stream and batch pipeline, the performance gains were encouraging. When we ran backfilling as a streaming job, the total memory allocated was over 5,000 GB-hours and the total CPU time was nearly 4,000 hours. After migrating to a unified Beam pipeline and running the same logic as a batch job, the memory allocated and the CPU time were both cut in half. The duration also dropped significantly when we ran backfilling using the unified Beam pipeline: from seven hours to 25 minutes.

Table: Performance comparison for the Apache Beam implementation

These comparisons are based on running the same backfilling logic as a streaming job versus as a batch job of the unified pipeline.

To summarize, with unified streaming and batch pipelines, we see the following wins: 

  1. Engineer productivity. Engineers can write the code once and run it in different environments, whether streaming or batch. With the same codebase, it is easier to develop and maintain.

  2. 50% resource reduction. We saved 94% of processing time and around 50% of resources with unified pipelines. Based on the cost-to-serve analysis, the operating cost was reduced by 11 times.

Future Work

Using Beam to write user logic in a single codebase is just the first step toward a truly end-to-end convergence solution. Moving forward, we’re also working to solve the following challenges:

  1. Data sources in the batch and stream environments are different (i.e., Kafka on one side and HDFS on the other). We used a conditional branch mechanism in the Beam source to hide the details and created a convergent data source abstraction. However, this abstraction is embedded in this particular Beam pipeline and needs to be replicated if we want to use the same source abstraction in other convergence Beam programs.

  2. There is a lack of temporal lineage between the physical source datasets for stream and batch jobs. Today, to ensure that the stream job continues processing the input data from where the batch job stops, we need to estimate the time-based overlap between the Kafka and HDFS input datasets of the convergence jobs and manually rewind the stream job’s start offsets by a few hours or days to ensure there is no gap. This is prone to toil and error.

  3. Users still need to learn how to run, tune, and debug two different runtime binary stacks (i.e., the Beam Samza runner in streaming and the Beam Spark runner in batch). The operational and maintenance costs of running on the two engines, and of maintaining the two runner codebases, are significant.

Acknowledgements

Like anything complex and high quality, this project was built from the ground up by an amazing group of engineers across the Managed-beam team, the Waterloo team, and the Grid team. Special thanks to Xinyu Liu for leading the project and providing technical guidance and support, and to our collaborators Daniel Gmach, Aaron He, and Fang Peng from the Waterloo team and Chen Qiang from the Grid team. Thanks also to the management team, Bingfeng Xia, Gary Yang, Anthony Asta, and Renu Tewari, and to previous manager Amir Rosenfeld for their help with team collaboration and support on the project. Last, but not least, many thanks to the reviewers of this blog, Hai Lu, Lucas Wang, and the LinkedIn Editorial team, for your reviews and suggestions.