Asynchronous Processing and Multithreading in Apache Samza, Part I: Design and Architecture
January 4, 2017
As part of the Apache Samza 0.11 release, we rebuilt Samza’s underlying event processing engine to use an asynchronous and parallel processing model. The new model is unique among current open source stream processors because it not only supports running traditional synchronous processing in parallel on multiple threads, but also provides first-class support for asynchronous processing, which is useful for applications that need to perform non-blocking I/O. With this support, a user job can now perform either in-order processing or out-of-order processing with certain processing semantics guaranteed. This post introduces the new Samza asynchronous API and model, explores the details of the asynchronous event loop, and finally discusses the semantics guaranteed when processing messages.
One of the common problems faced in stream processing is data access. The requirements, as discussed here, are to support the following access patterns at scale: read/write data, for maintaining the application state; and read-only data, for looking up adjunct data. To address these problems, modern stream processing frameworks have been advocating local data access, such as an in-memory store (Apache Spark, Apache Storm, Apache Flink) or a RocksDB store (Apache Samza), backed by durable storage such as HDFS or Kafka. When the data can be co-located with the processor, local data access greatly improves performance. As reported here, a Samza test job with a local RocksDB state store is able to process 1.1 million requests per second on a single machine (1.7T SSD, 48G RAM).
The challenges of remote data access, however, remain largely unaddressed: performance bottlenecks caused by slow I/O, parallelism limited by the execution unit (processes/threads), and increasing hardware cost at scale. These challenges are further compounded by the complexity of handling the ordering of event processing and checkpointing. For example, a Samza application named Air Traffic Control (ATC) is used for delivering LinkedIn emails and notifications to members. It aggregates multiple channels, such as member chats, activities, and network updates. These communications require looking up data from multiple remote data sources, including invitations, mailbox, connection graph, network feed, and comments. In the absence of efficient remote data access support, ATC has to manage its own threads to parallelize the remote data access for better performance and resource utilization. ATC also handles processing order and checkpointing internally, which dramatically increases development cost and code complexity.
Modern application/service platforms, such as Node.js and Play framework, address the remote data access problems by supporting asynchronous user applications. These applications make asynchronous I/O (or non-blocking I/O) calls to start I/O communication, and then perform other operations which do not depend on the results of the communication. This allows overlap of processing and I/O, with notification of I/O completion. A process can perform multiple I/O requests and allow the kernel to handle the data transfer details. As discussed here, this allows asynchronous I/O to boost application performance greatly, as well as reducing CPU and memory footprint.
To the best of our knowledge, none of the existing open source stream processors support asynchronous I/O applications. In terms of parallelism, most of them have concurrency on the task level (a task usually processes a single workload after grouping). Flink is the only processor that supports parallel instances of a task, but the ordering and checkpointing semantics are not specified.
Asynchronous processing and multithreading model
For the reasons listed above, Apache Samza will now provide first-class support for asynchronous processing. The new Samza asynchronous task API uses a callback-based approach to support asynchronous I/O. This allows easy integration with most async I/O libraries. For applications using synchronous processing, Samza enables parallelism among tasks through simple configuration. Samza further supports parallelism within a task, in case even more parallelism is required. If you are new to Apache Samza, this article introduces the basic concepts of Samza.
We had two requirements for the design of the Samza asynchronous API:
- Support asynchronous processing with non-blocking I/O calls.
- Support various concurrency libraries, such as Akka (actor-based), Parseq (task-based), and JDeferred (deferral-based).
To meet both requirements, we designed the API using the most primitive construct in concurrent programming: callbacks. Sequential execution can be implemented using synchronous callbacks, and asynchronous execution can be implemented using asynchronous callbacks triggered by the user thread. Furthermore, callbacks can be seamlessly integrated with concurrency libraries by invoking the callback after the concurrent computation is done. For example, multithreaded execution can be achieved by running the tasks in a thread pool and invoking the asynchronous callbacks upon completion.
The full AsyncStreamTask API is defined here. Below we use an example task to illustrate the API:
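The code from the original post is not reproduced here, so the following is a minimal sketch of such a task, assuming the Samza 0.11 `AsyncStreamTask` interface and the Jersey 2 asynchronous client; the class name and endpoint URL are illustrative:

```java
import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.client.InvocationCallback;
import javax.ws.rs.core.Response;

import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.task.AsyncStreamTask;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.TaskCallback;
import org.apache.samza.task.TaskCoordinator;

public class AsyncRestTask implements AsyncStreamTask {
  private final Client client = ClientBuilder.newClient();

  @Override
  public void processAsync(IncomingMessageEnvelope envelope,
                           MessageCollector collector,
                           TaskCoordinator coordinator,
                           TaskCallback callback) {
    // Fire a non-blocking HTTP request; processAsync returns immediately.
    client.target("http://example.com/resource").request().async().get(
        new InvocationCallback<Response>() {
          @Override
          public void completed(Response response) {
            // Runs on a Jersey client thread once the response arrives.
            response.close();
            callback.complete();  // tell Samza this message is fully processed
          }

          @Override
          public void failed(Throwable t) {
            callback.failure(t);  // propagate the error to the Samza engine
          }
        });
  }
}
```

Note that `processAsync` returns before the I/O completes; the engine only considers the message done once `callback.complete()` is invoked.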
In this example, the task makes an asynchronous request to some remote service (http://example.com/resource) using Jersey client, and triggers the task callback from the Jersey callback invocation thread. The callback will notify the Samza engine that the message has been processed completely by the task, and the next message will be dispatched for the task to process (assuming task max concurrency is 1).
In Samza, the event loop in a container is responsible for running multiple user tasks for consuming/producing messages, windowing, and checkpointing. (For more information on windows and checkpoints in Samza, see this article.) To support callback-based asynchronous processing, we implemented a brand new event loop. The following flowchart illustrates how the event loop works for one task:
The event loop works as follows:
When a message event (1) comes from the system consumers, Samza checks Cond 1. Internally, Samza keeps a counter of the outstanding callbacks for each task, and invokes task.processAsync() (step 1a) if the counter value is less than the max concurrency allowed. When the user task finishes the asynchronous processing, it triggers the callback from the user thread. The callback then notifies the event loop and updates the counter (step 1b).
When a window timer event (2) comes, Samza checks Cond 2 to make sure there are no outstanding task actions, and then invokes task.window() (step 2a). When the user window function completes (step 2b), the event loop is notified and continues to process future events.
When a checkpoint event (3) comes, Samza performs the same check as for a window event (2), and then invokes the checkpoint for the task (step 3a). When the checkpoint completes (step 3b), the event loop is notified and continues to process future events.
This loop (steps 1 to 4 in the flowchart) runs until all tasks have outstanding actions (processAsync, window, or checkpoint), at which point it blocks until the next task becomes available to handle new events.
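The per-task bookkeeping described above can be sketched as a small model; this is an illustration with hypothetical names, not the actual Samza implementation:

```java
import java.util.concurrent.atomic.AtomicInteger;

// Simplified model of the per-task state tracked by the async event loop.
class TaskState {
  final AtomicInteger pendingCallbacks = new AtomicInteger(0);
  final int maxConcurrency;

  TaskState(int maxConcurrency) {
    this.maxConcurrency = maxConcurrency;
  }

  // Cond 1: a new message may be dispatched only while the number of
  // outstanding callbacks is below the task's max concurrency.
  boolean canProcess() {
    return pendingCallbacks.get() < maxConcurrency;
  }

  // Cond 2: window/checkpoint require no outstanding task actions.
  boolean canWindowOrCheckpoint() {
    return pendingCallbacks.get() == 0;
  }

  void onDispatch() { pendingCallbacks.incrementAndGet(); }  // step 1a
  void onCallback() { pendingCallbacks.decrementAndGet(); }  // step 1b
}
```

With max concurrency 1 this degenerates to strict one-at-a-time dispatch; larger values allow multiple in-flight messages per task.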
Semantics and guarantees
In order to provide useful asynchronous processing in Samza core, we designed the following semantic models and guarantees. With them, the user job is relieved from error-prone synchronization so that it can truly focus on the processing itself.
With the above processing model, Samza provides multiple levels of parallelism. If the callback is invoked synchronously, all tasks will be run in the same event loop thread. In this scenario, the model provides the traditional Samza container-level parallelism. If the callback is invoked asynchronously, Samza supports task-level parallelism. Tasks can process in parallel without blocking each other. This is because the invocation of processAsync in one task doesn’t require waiting for the callback/window/commit from other tasks. Furthermore, Samza supports within-task-level parallelism. This is achieved by allowing multiple processAsync invocations for one task without waiting for the callbacks to complete. The max concurrency, configured by task.max.concurrency, is enforced when scheduling the next processAsync inside the event loop.
It is also straightforward to run synchronous tasks in parallel with multithreading. Samza ships with a built-in thread pool that makes multithreading work out of the box. Without any code change, the user can simply configure the property job.container.thread.pool.size and the tasks are then executed in parallel. However, asynchronous I/O is the preferred way to access remote data, due to its superior performance and better scalability for more parallelism.
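Both levels of parallelism are enabled purely through configuration. The property names below are the Samza configs mentioned in this post; the values are illustrative:

```properties
# Run synchronous tasks in parallel on a shared container thread pool.
job.container.thread.pool.size=16

# Allow up to 4 outstanding processAsync invocations per task
# (values greater than 1 permit out-of-order completion within a task).
task.max.concurrency=4
```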
Let’s discuss the ordering guarantees under different scenarios of parallelism. For parallelism on the container or task level, messages are guaranteed to be processed in order. For parallelism within a task, Samza guarantees processAsync will be invoked in order for a task. The processing or completion, however, can go out of order. With this guarantee, users can implement sub-task-level data pipelining with customized ordering and parallelism. For example, users can use a keyed single thread executor pool to have in-order processing per key while processing messages with different keys in parallel.
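A keyed executor pool of the kind described above can be sketched as follows (a hand-rolled illustration, not a Samza API): work items with the same key always land on the same single-threaded executor, so they run in order, while different keys proceed in parallel.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Routes work for the same key to the same single-threaded executor,
// giving per-key ordering with cross-key parallelism.
class KeyedExecutorPool {
  private final ExecutorService[] executors;

  KeyedExecutorPool(int poolSize) {
    executors = new ExecutorService[poolSize];
    for (int i = 0; i < poolSize; i++) {
      executors[i] = Executors.newSingleThreadExecutor();
    }
  }

  void submit(Object key, Runnable work) {
    // Same key -> same executor -> in-order execution for that key.
    int idx = Math.floorMod(key.hashCode(), executors.length);
    executors[idx].execute(work);
  }

  void shutdownAndWait() throws InterruptedException {
    for (ExecutorService e : executors) e.shutdown();
    for (ExecutorService e : executors) e.awaitTermination(5, TimeUnit.SECONDS);
  }
}
```

In an async Samza task, the `Runnable` would wrap the per-message work and invoke the task callback when done.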
Samza guarantees it will only checkpoint the messages that are completely processed. Samza uses a low watermark model for checkpointing. It maintains a queue of the completed callbacks, sorted by the callback sequence number. When checkpointing happens, the max offset of the contiguous callback sequence (the low watermark) will be committed. The head of the queue will advance to the next callback for the future checkpointing.
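The low-watermark bookkeeping can be illustrated with a small sketch (the names here are hypothetical; the real logic lives inside the Samza engine): completed sequence numbers are collected, and the checkpointable offset is the end of the contiguous completed prefix.

```java
import java.util.TreeSet;

// Tracks completed processAsync callbacks by sequence number and computes
// the low watermark: the highest sequence number such that every earlier
// message has also completed.
class CallbackWatermark {
  private final TreeSet<Long> completed = new TreeSet<>();
  private long nextToCommit = 0;  // first sequence number not yet committed

  void onComplete(long seq) {
    completed.add(seq);
  }

  // Advances past the contiguous run of completed callbacks and returns the
  // last safe-to-checkpoint sequence number, or -1 if none is safe yet.
  long lowWatermark() {
    long watermark = nextToCommit - 1;
    while (completed.contains(nextToCommit)) {
      completed.remove(nextToCommit);
      watermark = nextToCommit++;
    }
    return watermark;
  }
}
```

A callback that completes out of order (e.g., sequence 3 before 2) does not advance the watermark until the gap is filled, which is what keeps checkpoints from covering unprocessed messages.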
Memory visibility and happens-before semantics
The following semantics are guaranteed in the above processing model:
Event processing within Samza is thread-safe. You can safely access your job’s state in the local RocksDB key-value store, write messages, and checkpoint offsets in the task threads. Any other state or code shared between tasks, e.g., global variables or static fields, is not thread-safe if it can be accessed by multiple threads. Samza guarantees the mutual exclusiveness of process, window, and commit so there will be no concurrent modifications among these operations and any state change from one operation will be fully visible to the others.
Each processAsync is guaranteed to happen before the next invocation of processAsync of the same task. If task max concurrency is 1, the completion of the processing is guaranteed to happen before the next invocation of processAsync of the same task. If task max concurrency is greater than 1, there is no such happens-before constraint.
window is called when no invocations to processAsync are pending and no new processAsync invocations can be scheduled until it completes. Samza guarantees that all previous processAsync invocations happen before an invocation of window. An invocation of window is guaranteed to happen before any subsequent processAsync invocations. The Samza engine is responsible for ensuring that window is invoked in a timely manner.
checkpoint is guaranteed to only cover events that are fully processed. It happens only when there are no pending processAsync or window invocations. All preceding invocations happen before checkpointing, and checkpointing happens before all subsequent invocations.
Samza has made the first attempt to bridge the gap between stream processing and asynchronous I/O for remote data access, a pattern commonly found in modern asynchronous application/service platforms. Samza provides support for different granularities of parallelism, from the container level to concurrency within a task. Samza also supports both in-order and out-of-order processing. Finally, Samza provides practical semantic guarantees to reduce user job complexity.
Thanks to the Samza engineering teams at LinkedIn for the invaluable help with the design and implementation of this feature. Thanks to all Samza customers for their feedback on the use cases of remote access in practice.