
Asynchronous Processing and Multithreading in Apache Samza, Part II: Experiments and Evaluation

This post is the second in a series discussing asynchronous processing and multithreading in Apache Samza. In the previous post, we explored the design and architecture of the new AsyncStreamTask API and the asynchronous event loop. In this post, we focus on studying the performance of this feature with benchmark Samza jobs. Some of the interesting questions are:

  1. Can Samza scale well when a job is doing asynchronous remote I/O, or using multithreading?
  2. Does the capability of out-of-order processing increase the parallelism?
  3. Does building first-class support for asynchronous processing impact the performance of existing synchronous jobs?
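
Throughout this post, an "asynchronous task" means an implementation of the AsyncStreamTask interface introduced in the previous post. As a quick refresher, this is the shape of the interface (comments are ours):

    public interface AsyncStreamTask {
      // Kicks off processing of one incoming message. The task must invoke
      // callback.complete() or callback.failure(t) when processing finishes;
      // the event loop uses the callback to track in-flight messages and to
      // enforce the per-task max concurrency setting.
      void processAsync(IncomingMessageEnvelope envelope,
                        MessageCollector collector,
                        TaskCoordinator coordinator,
                        TaskCallback callback);
    }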

We did the experiments for jobs with remote data access, local data access, and CPU-bound computations. To summarize what we found:

  • Experiments with remote I/O jobs show superior performance: the benchmark job scales linearly as parallelism increases. Async I/O support delivers similar throughput with much lower CPU and memory utilization.
  • Experiments with RocksDB jobs show clear performance benefits when RocksDB access is frequent, though the improvement is far from linear.
  • For existing CPU-bound jobs, the results show that the cost of synchronization is negligible when all tasks run on the event loop thread. When such a job runs in a thread pool and its process calls are short, performance degrades significantly.

Part I: Remote I/O

Setup
We use a remote I/O job that consumes a PageViewEvent topic with 10 partitions and requests the member's inbox information for each event using REST calls. The call duration varies from 1 ms to 200 ms. The job runs on a single worker-node Yarn cluster with one container (one core) and 1 GB of memory.
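
For the baseline and the blocking I/O tests, the job implements the synchronous StreamTask interface. A rough sketch follows; PageViewEvent, Inbox, InboxClient, and the output stream name are hypothetical stand-ins for our internal types:

    import org.apache.samza.system.IncomingMessageEnvelope;
    import org.apache.samza.system.OutgoingMessageEnvelope;
    import org.apache.samza.system.SystemStream;
    import org.apache.samza.task.MessageCollector;
    import org.apache.samza.task.StreamTask;
    import org.apache.samza.task.TaskCoordinator;

    public class InboxEnrichmentTask implements StreamTask {
      // Hypothetical blocking client for the inbox REST endpoint.
      private final InboxClient client = new InboxClient("http://inbox-service/rest");

      @Override
      public void process(IncomingMessageEnvelope envelope, MessageCollector collector,
                          TaskCoordinator coordinator) {
        PageViewEvent event = (PageViewEvent) envelope.getMessage();
        // Blocking REST call: this thread waits 1 ms-200 ms for the response.
        Inbox inbox = client.getInbox(event.getMemberId());
        collector.send(new OutgoingMessageEnvelope(
            new SystemStream("kafka", "PageViewWithInbox"), inbox));
      }
    }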

Baseline
The baseline process rate, measured by running all tasks in Samza's legacy sequential execution model, is about 20 messages/sec.

Experiment results
We tested both the blocking I/O and asynchronous I/O cases.

  • Blocking I/O: The following figure shows the performance improvement from the baseline to multithreaded execution. The initial thread pool size is 10, the same as the partition count, and the process rate goes up to above 250 messages/sec. To test concurrency within a task, we then increase the task max concurrency to 3 and the thread pool size to 30, so each task can process 3 messages in parallel. The process rate goes above 1,000 messages/sec.
[Figure: process rate with blocking I/O, from the sequential baseline to multithreaded execution]
  • Async I/O: To evaluate the performance of asynchronous I/O, we use the asynchronous Rest.li client in an AsyncStreamTask (see the sketch after this list). This client uses efficient non-blocking HTTP connections and requires no threads to wait for I/O completion. We set the task max concurrency to 1 first, and then increase it to 3. The results are similar to the previous experiment with thread pool sizes of 10 and then 30, but CPU and memory utilization is much lower, since there are no extra threads waiting on I/O.
[Figure: process rate with async I/O at task max concurrency 1 and 3]
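
Rest.li client specifics aside, the sketch below shows the shape of such a task, using the JDK's HttpClient as a stand-in for any non-blocking HTTP client (the endpoint URL and key handling are assumptions). No thread blocks while the request is in flight; the Samza callback is completed from the client's response handler:

    import java.net.URI;
    import java.net.http.HttpClient;
    import java.net.http.HttpRequest;
    import java.net.http.HttpResponse;
    import org.apache.samza.system.IncomingMessageEnvelope;
    import org.apache.samza.system.OutgoingMessageEnvelope;
    import org.apache.samza.system.SystemStream;
    import org.apache.samza.task.AsyncStreamTask;
    import org.apache.samza.task.MessageCollector;
    import org.apache.samza.task.TaskCallback;
    import org.apache.samza.task.TaskCoordinator;

    public class AsyncInboxEnrichmentTask implements AsyncStreamTask {
      private final HttpClient client = HttpClient.newHttpClient();

      @Override
      public void processAsync(IncomingMessageEnvelope envelope, MessageCollector collector,
                               TaskCoordinator coordinator, TaskCallback callback) {
        // Assumption: the message key carries the member id.
        String memberId = String.valueOf(envelope.getKey());
        HttpRequest request = HttpRequest.newBuilder(
            URI.create("http://inbox-service/rest/inbox/" + memberId)).build();

        // Non-blocking send: the event loop thread returns immediately. The
        // handler below runs on the HTTP client's threads and completes the
        // callback, which releases one unit of task concurrency.
        client.sendAsync(request, HttpResponse.BodyHandlers.ofString())
            .whenComplete((response, error) -> {
              if (error != null) {
                callback.failure(error);
              } else {
                collector.send(new OutgoingMessageEnvelope(
                    new SystemStream("kafka", "PageViewWithInbox"), response.body()));
                callback.complete();
              }
            });
      }
    }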

Part II: RocksDB

Setup
We use a Samza job that reads from and writes to RocksDB. The job counts how many times each input message key has been seen: for each message, it extracts the key, looks it up in the local RocksDB store, and updates the count. The input stream has 8 partitions and is preloaded with 1G messages, each around 100 bytes. Each key is a unique number ranging from 1 to 1G, so every process call misses the cache and accesses RocksDB. The job uses one container and runs for 30 minutes.
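
The core of the job is a keyed counter backed by Samza's key-value store API. A minimal sketch against the classic task API follows; the store name, key serde, and value type are assumptions:

    import org.apache.samza.config.Config;
    import org.apache.samza.storage.kv.KeyValueStore;
    import org.apache.samza.system.IncomingMessageEnvelope;
    import org.apache.samza.task.InitableTask;
    import org.apache.samza.task.MessageCollector;
    import org.apache.samza.task.StreamTask;
    import org.apache.samza.task.TaskContext;
    import org.apache.samza.task.TaskCoordinator;

    public class KeyCountTask implements StreamTask, InitableTask {
      private KeyValueStore<String, Long> store;

      @Override
      @SuppressWarnings("unchecked")
      public void init(Config config, TaskContext context) {
        // "count-store" is a hypothetical store name, configured as a RocksDB store.
        store = (KeyValueStore<String, Long>) context.getStore("count-store");
      }

      @Override
      public void process(IncomingMessageEnvelope envelope, MessageCollector collector,
                          TaskCoordinator coordinator) {
        String key = (String) envelope.getKey();
        Long count = store.get(key);                     // RocksDB read (a cache miss in this test)
        store.put(key, count == null ? 1L : count + 1);  // RocksDB write
      }
    }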

Baseline
The baseline of the performance test processes about 55K messages/second in the legacy single-thread execution model.

Experiment results
Since the task is synchronous, we run the job in the built-in thread pool. The test results are the following:

Tests | Process envelopes (avg/sec) | User CPU utilization (%) | Heap used (mean, MB)
Baseline | 54,926 | 17.3 | 1,578
Multithreading (thread pool: 0) | 50,259 (-8.5%) | 16.6 (-4%) | 1,532 (-3%)
Multithreading (thread pool: 1) | 63,601 (+15.7%) | 24.2 (+40%) | 1,644 (+4%)
Multithreading (thread pool: 8) | 89,816 (+63.5%) | 40.9 (+136%) | 1,665 (+5%)
Multithreading (thread pool: 8, max concurrency: 3) | 119,308 (+117.2%) | 48.7 (+181%) | 1,685 (+7%)

When the thread pool size is 0, all tasks run on the event loop thread, and we see a small synchronization overhead. As the thread count increases, the multithreading model performs progressively better. In the last experiment, we also raise the max concurrency per task to 3, and both the process rate and CPU usage increase further. Heap memory usage stays roughly the same across all tests.
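
For reference, the two knobs varied in these tests map to the following Samza configuration properties; the values below reproduce the best-performing setup in the last row of the table:

    # Size of the container thread pool; 0 runs all tasks on the event loop thread.
    job.container.thread.pool.size=8
    # Maximum number of messages a single task may process concurrently.
    task.max.concurrency=3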

Part III: CPU-bound

Setup
Here, we conduct an experiment that compares single-node peak performance with the previously published results. We use the same test job, which draws randomly generated keys from a 1M key space, and set the cache size to 1M objects so there are no cache misses once the cache warms up. The job consumes from a topic with 48 partitions. Lookups and updates happen mostly in the cache, which is periodically flushed to RocksDB. The job has 24 containers and runs on a single worker-node Yarn cluster.
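
The in-memory object cache in front of RocksDB is sized through the store's cache configuration. A sketch, assuming the store is named "count-store" as in Part II:

    # Cache 1M objects so lookups and updates hit memory once the cache is warm.
    stores.count-store.object.cache.size=1000000
    # Writes are batched in front of RocksDB and flushed periodically.
    stores.count-store.write.batch.size=500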

Baseline
As mentioned in the published results, Samza can process around 1.1M messages per second on a single node.

Experiment results
When all tasks run on the asynchronous event loop thread (thread pool size 0), performance is on par with the baseline. After increasing the thread pool size to 1, performance degrades to about one-third of the baseline. This is caused by thread-scheduling overhead (about 5 microseconds on average on this host), which is much longer than the per-message process time. This study shows that for CPU-bound jobs, single-threaded execution is still optimal, and that the asynchronous event processing model performs on par with the legacy synchronous processing model in this case.

Execution mode | Process rate (messages/sec)
Baseline | 1.14 M
Multithreading (thread pool: 0) | 1.12 M
Multithreading (thread pool: 1) | 0.37 M

Summary

In practice, we see both performance and efficiency gains when running Samza jobs with the new asynchronous processing API, and we expect further improvements in resource utilization across our current Samza jobs.

Acknowledgements

Thanks to the Samza engineering teams at LinkedIn for their invaluable help with the design and implementation of this feature. Thanks to all Samza customers for their feedback on real-world use cases of remote data access.