
Building a better and faster Beam Samza runner

Co-authors: Yixing Zhang, Bingfeng Xia, Ke Wu, and Xinyu Liu

Since the Beam Samza runner was developed at LinkedIn in 2018, its adoption has grown to more than 100 Beam Samza jobs running in production. As our usage grew, we wanted to better understand how the Samza runner performs compared to other runners and identify areas for improvement. For stream processing platforms in general, better performance usually means supporting more use cases and reducing costs.

To do this, we used the Beam Nexmark suite to benchmark the performance of the Beam Samza runner. Using async-profiler, we identified performance bottlenecks in the Samza runner, which led us to four key improvements that collectively improved the benchmark score of the Beam Samza runner by 10x.

In this blog post, we will discuss the results of the benchmark suite and the optimizations implemented along the way. Similar steps can also be applied to benchmark and improve the performance of other Beam runners.

Background

Apache Beam is an advanced, open source, unified programming model for both batch and stream processing. It provides a rich, portable API layer for building sophisticated data-parallel processing pipelines that can be executed across a variety of execution engines, or runners, such as Flink, Samza, and Spark.

Apache Samza is a distributed stream processing framework with large-scale state support. Samza applications can be built locally and deployed to either YARN clusters or standalone clusters that use Zookeeper for coordination. Samza uses RocksDB to support large-scale state, backed by changelogs for durability, and performs incremental checkpointing using Kafka. Today, Samza powers thousands of streaming applications at LinkedIn, processing over 3 trillion messages per day.

In 2018, we developed the Beam Samza runner to leverage the unified data processing API, advanced model, and multi-language support of Apache Beam. It has been widely adopted at LinkedIn for processing complex streaming use cases such as sliding window aggregations and multi-stream session window joins.

Benchmarking with Nexmark

We knew the Beam Samza runner performed well for our use cases in production, but we wanted to find out how it performs more generally and in comparison to other runners, so we leveraged the Beam Nexmark suite for benchmarking. Nexmark is a suite of Beam pipelines, inspired by the continuous data stream queries in the Nexmark research paper, that has been used to benchmark the performance of the Beam SDK and its underlying runners since the Beam 2.2.0 release. Using async-profiler, a low-overhead sampling profiler for Java applications, we were able to identify performance bottlenecks in the Samza runner and improve its Nexmark benchmark score.

Setup
For the performance tests, we ran the Beam Nexmark suite on a dedicated Linux machine with the following hardware:

  • Intel® Xeon® processor (1.8 GHz, 16 cores)
  • 64 GB DDR4-2666 ECC SDRAM (4x16 GB)
  • 512GB SATA SSD

Nexmark settings
We ran the Nexmark benchmark suite in streaming mode with the Direct, Flink, and Samza runners. We configured the Flink and Samza runners' state store with RocksDB, a high-performance persistent key-value store; most LinkedIn services hold state much larger than the available memory, which is why we use RocksDB as the state backend at LinkedIn. We used the default state backend for the Direct runner, since it does not support RocksDB.

We set the bundle size to one for all runners, which means each element is processed as its own bundle; the Samza runner did not yet support bundling when we ran the benchmark.

Benchmark results
Below are the Nexmark benchmark results of the Direct, Flink, and Samza runners for the 15 Nexmark queries, labeled Q0 through Q14. In this chart, the X-axis represents the queries and the Y-axis represents their throughput in QPS (higher is better). For each query, the four bars represent, from left to right: the Direct runner, the Flink runner, the Samza runner before optimization, and the Samza runner after optimization.


Figure 1: Nexmark stream processing throughput with different runners

Before walking through the optimization steps, we want to show the final result first. Per the benchmarks above, Samza runner throughput improved by more than 10 times across all queries compared to the original Samza runner. After optimization, the Samza runner performed better than the other runners in our test environment.

  • For simple queries (Q0 to Q2), where the operations are lightweight and monitoring overhead is relatively high, the Samza runner's performance was greatly improved by the optimizations to metrics updates.
  • For more complex queries that use windows and state (Q3 to Q14), the Samza runner benefited substantially from both the state cache and the metrics update optimizations.
  • For Q13, a side-input join query, the Samza runner still has room for improvement.

Optimizations

Nexmark helped us quantify the throughput gap relative to other runners, and async-profiler let us profile the CPU usage of the Samza runner. By analyzing CPU hot spots, we gained a clear picture of how CPU resources were allocated, so we could identify and reduce the performance bottlenecks.

Below, we show how Samza runner throughput improved by 11.5 times overall, with improvements in two areas: metrics update and state cache.

Nexmark throughput            Metrics update improvement                      State cache   Total
                              Step 1    Step 2    Step 3    Overall
Improvement rate (avg.*)      3.58x     1.97x     1.37x     9.66x             1.19x         11.50x

Note: *The throughput improvement rate was calculated by averaging the increase rates of all 15 Nexmark queries. The optimizations were applied step by step, from Metrics update Step 1 through State cache, and each improvement rate is relative to the previous step.
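
Since the optimizations were applied sequentially, the per-step rates compose multiplicatively: 3.58 × 1.97 × 1.37 ≈ 9.66 for the metrics update work overall, and 9.66 × 1.19 ≈ 11.50 in total.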

Metrics update
Beam allows gathering metrics and exporting them to external systems. We found that metrics updates had a huge impact on the overall performance of the Samza runner: Nexmark throughput increased by 12 times on average when all Beam metrics were disabled in the Samza runner. This implies that calculating and updating the Beam metrics was far more expensive than the message processing itself. While disabling Beam metrics wasn't an option, this observation led us to reduce the metrics update cost to achieve better throughput. Here's a step-by-step overview of what we did:

  • Step 1. CPU time profiling of Nexmark Query0, a simple pass-through query, showed that the Samza runner called the updateMetrics() method four times for each bundle. This was a bug that incurred a significant performance cost. We removed the three unnecessary updateMetrics() calls and, according to the Nexmark benchmark results, Samza throughput improved 3.6 times across all fifteen queries. A sketch of the fix follows Figure 2.

Figure 2: four updateMetrics() calls shown in CPU profiling of Query0
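
To illustrate the change, here is a minimal, hypothetical sketch; MetricsReporter and OpAdapter are illustrative names, not the runner's actual classes. The point is that the metrics flush moves out of the per-callback paths and runs exactly once per bundle:

    // Hypothetical stand-in for the runner's metrics container.
    interface MetricsReporter {
      void updateMetrics();
    }

    final class OpAdapter<T> {
      private final MetricsReporter metrics;

      OpAdapter(MetricsReporter metrics) {
        this.metrics = metrics;
      }

      void processElement(T element) {
        // Process the element. Before the fix, metrics.updateMetrics() was
        // also invoked here and in other callbacks, four times per bundle.
      }

      void processWatermark(long watermarkMillis) {
        // Advance the watermark; no metrics flush here after the fix.
      }

      void finishBundle() {
        metrics.updateMetrics(); // after the fix: exactly one flush per bundle
      }
    }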

  • Step 2. Investigating the CPU profiling results further, we found that MetricKey.toString() took about 64% of the CPU time for each operation. The metric key is a large string whose parts were concatenated together with String.format(). We removed an unnecessary part from the metric key and switched to the plus operator to concatenate strings, which is more efficient than String.format() according to the benchmark results published by Redfin Engineering (see the sketch after Figure 3). With this change, Samza throughput improved by 2 times across all fifteen queries.

Figure 3: CPU profiling results unveiled high cost of MetricKey.toString()
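
For illustration, here is a minimal comparison of the two approaches; the real MetricKey has more parts than this hypothetical two-field example:

    final class MetricKeyExample {
      // Parses the format string and boxes varargs on every call.
      static String withFormat(String stepName, String metricName) {
        return String.format("%s:%s", stepName, metricName);
      }

      // Compiles to an efficient StringBuilder chain (or invokedynamic
      // concatenation on modern JDKs), avoiding format-string parsing.
      static String withConcat(String stepName, String metricName) {
        return stepName + ":" + metricName;
      }
    }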

  • Step 3. Another performance issue we identified was how the Samza runner updates metrics for each PTransform. The runner would update not only the metrics of the current PTransform, but also the metrics of all other PTransforms, which introduced redundant cost whenever a pipeline contained multiple PTransforms. For example, updating the extra PTransforms cost almost 23% of the CPU time for Query2, which uses simple filters. We modified this method to update only the metrics of the current PTransform, as sketched below. With this change, Samza throughput improved by another 1.4 times across all fifteen queries.
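
Here is a hypothetical before/after sketch of the Step 3 change; the container map and method names are illustrative, not the runner's actual code:

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    final class PerTransformMetricsUpdater {
      interface Container {
        void updateMetrics();
      }

      private final Map<String, Container> containers = new ConcurrentHashMap<>();

      // Before: every element paid O(number of transforms) to flush metrics.
      void updateAllTransforms() {
        containers.values().forEach(Container::updateMetrics);
      }

      // After: O(1) work, scoped to the transform that actually executed.
      void updateCurrentTransform(String transformName) {
        Container container = containers.get(transformName);
        if (container != null) {
          container.updateMetrics();
        }
      }
    }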

As the analysis above showed, the upper bound of the performance gain from metrics updates is about 12 times, which could only be reached by disabling all Beam metrics; of course, we don't want to simply disable all metrics. By reducing the cost of metrics updates, we improved Samza runner throughput by roughly 10 times on average, very close to that upper bound.

State cache
From the profiling results, we also noticed that encoding and decoding a job's intermediate state took a significant portion of the total execution time: almost 38% of the CPU time in Nexmark Query 4, as shown below. This was much more than expected, since we had already leveraged Samza's in-memory write-back LRU cache to cache hot items.


Figure 4: CPU time cost of encoding/decoding in purple

Analysis showed that the Samza runner cached only the serialized bytes, not the deserialized Java objects, so encoding and decoding still carried a big overhead.

Beam supports different coders for state key and value (de)serialization, so the caching layer needed to be able to deserialize bytes using the corresponding coder. Our solution is a wrapper class that holds the state data together with its coder in the cache layer, while only the bytes are persisted in RocksDB. On a cache miss, the caching layer reads the bytes from RocksDB and puts them in a wrapper object first; Beam then decodes them using the coder carried in the wrapper.

Here is our Beam state value cache strategy:

Before

                       Beam state           In-memory cache   RocksDB
  Write                value    encode →    byte[]            byte[]
  Read                 value    ← decode    byte[]            byte[]

Now

                       Beam state           In-memory cache   RocksDB
  Write                value                coder + value     encode → byte[]
  Read (cache hit)     value                coder + value     -
  Read (cache miss)    value    ← decode    byte[]            byte[]
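
To make the wrapper concrete, here is a minimal sketch assuming Beam's org.apache.beam.sdk.coders.Coder; the CachedStateValue class itself is illustrative, not the runner's actual implementation:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import org.apache.beam.sdk.coders.Coder;

    // Cache entry holding the decoded value together with its coder, so cache
    // hits skip (de)serialization entirely; only bytes go to RocksDB.
    final class CachedStateValue<T> {
      private final Coder<T> coder;
      private final T value;

      CachedStateValue(Coder<T> coder, T value) {
        this.coder = coder;
        this.value = value;
      }

      // Encode only when the value is flushed from the write-back cache to RocksDB.
      byte[] encode() throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        coder.encode(value, out);
        return out.toByteArray();
      }

      // Decode only on a cache miss, after reading bytes back from RocksDB.
      static <T> CachedStateValue<T> decode(Coder<T> coder, byte[] bytes) throws IOException {
        T value = coder.decode(new ByteArrayInputStream(bytes));
        return new CachedStateValue<>(coder, value);
      }

      T get() {
        return value;
      }
    }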

To benchmark the improvement from the new state cache, we ran the stateful Nexmark queries (Query 3 to Query 14) with a cache size large enough to hold all the state in memory. On average, the throughput of these stateful queries improved by 1.19 times.

Summary and future work

After benchmarking and optimizing the Apache Beam Samza runner, we found:

  1. Nexmark provides data processing queries that cover a variety of use cases. It helped us benchmark throughput performance in different areas with different runners, and it would be even better if Beam Nexmark could be extended to support multi-container scenarios.
  2. Async-profiler is a very useful profiling tool that helped us identify the root causes of performance bottlenecks, allowing us to take a more targeted approach to improving performance.
  3. Through this series of optimizations, the Beam Samza runner's benchmark performance is now on par with or better than the other runners using RocksDB as state storage. The optimizations addressed excessive metrics updates and cache serialization, and we are excited that the improved Samza runner will soon be in production at LinkedIn.
  4. Future work will include benchmarking the runner's parallelism and bundling: most runners can run tasks in parallel in thread pools, and bundling helps reduce the metrics update cost further, so we're excited to explore the potential performance improvements there. Side input handling can also be optimized to improve the performance of Query13.

Acknowledgements

Special thanks to Bingfeng Xia for his expertise in performance analysis and guidance on every aspect of the project, and to Xinyu Liu and Ke Wu for their technical guidance on the project. Many thanks to the Samza team and the Performance team for supporting this project. This project would not have been possible without the collaboration of both teams. Many thanks to the reviewers of this blog: Winston Zhang, David Schachter, Sharad Singhai, Hai Lu, and Boris Shkolnik. Last, but not least, thanks to the Beam open source community for providing useful tools for performance analysis.