Stream Processing Hard Problems Part II: Data Access
August 22, 2016
This post is the second in a series of posts that discuss some of the hard problems in stream processing. In the previous post, we explored the use of lambda architecture in stream processing and discussed techniques to avoid it. In this post, we’ll focus on one of the main bottlenecks in high scale stream processing applications: “accessing data.”
Before we dive into why data access is a hard problem in stream processing, here is some background information. At LinkedIn, we develop and use Apache Samza as our stream processing framework, Apache Kafka as our durable pub-sub messaging pipe, and Databus (and its next generation replacement) for capturing change events from our databases. Our streams infrastructure team gets feedback from application developers across the company (and from the open source community) on scalability, reliability, usability, and other problems that they encounter in their production applications. The learnings and techniques described in this post are in essence a summary of problems that we have faced so far and our efforts to address them. Although this post doesn’t require deep knowledge of Samza, a basic pre-read of Samza might help.
Data access requirements
Let us now dive in and explore why data access is challenging at scale. There are two main data access patterns we deal with:
Read-write data:
Here is a scenario at LinkedIn that illustrates the read-write data access pattern. Many applications at LinkedIn need to reach out to members, either through email or notification. To make sure our members don’t get too many emails, we have a Samza-based application called Air Traffic Control (ATC) that is responsible for controlling email and notification delivery to end users. For each member, ATC keeps track of the last time an email was sent to that member and of all the new requests to send mail to that member in the future. In essence, ATC maintains (reads/writes) state for every LinkedIn member.
Read-only data (adjunct data):
To illustrate this pattern, let us take an example where we want to build an application that listens to events generated every time a LinkedIn member clicks on an ad. This application needs to generate an AdQuality event, which highlights some of the features of the member who clicked on a particular ad. This AdQuality event would then be used for training a machine learning model that recommends ads. To process an AdClick event, the application looks up the member profile of the member who clicked on the ad. In this post, I will refer to such read-only access as accessing “adjunct” data.
Key attributes of the data being accessed
In addition to the two patterns described above, the following two key attributes of the data being accessed will have a big impact on your event processing architecture.
Is the data access “partitioned”?
In the scenarios described above, the input Kafka topic is partitioned on MemberID. To process these input events, the application looks up the member profiles dataset. The key point here is that since the input topic is already partitioned based on members (MemberID), every event processing node only accesses a mutually exclusive set of members. Later in the post, we will see how we are able to perform various caching optimizations when we have such ‘partitioned data access.’
Let us slightly augment the scenarios described above. Let us assume that to process each event, we also have to look up the Company database to get more information about the company where the LinkedIn member works. Now, every event processing node would need to look up potentially every company. This would be an example of non-partitioned access to the data.
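To make the distinction concrete, here is a minimal Python sketch. It assumes a hypothetical four-node job and uses CRC32 as a stand-in for whatever hash the real partitioner applies. The event tuples and names such as partition_for are made up for illustration.

```python
import zlib
from collections import defaultdict

NUM_PARTITIONS = 4  # hypothetical: one partition per processing node


def partition_for(member_id: str, num_partitions: int = NUM_PARTITIONS) -> int:
    """Stable hash partitioning; CRC32 is only a stand-in for the real partitioner."""
    return zlib.crc32(member_id.encode("utf-8")) % num_partitions


# Hypothetical AdClick events: (member_id, company_id)
events = [(f"member-{i}", f"company-{i % 3}") for i in range(100)]

members_by_node = defaultdict(set)
companies_by_node = defaultdict(set)
for member_id, company_id in events:
    node = partition_for(member_id)          # the event is routed by MemberID
    members_by_node[node].add(member_id)     # partitioned access
    companies_by_node[node].add(company_id)  # non-partitioned access

# Every member is looked up on exactly one node, while the same company
# can be looked up on several nodes.
```

Because the member sets per node never overlap, each node can cache "its" members safely. The company lookups give no such guarantee.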
How large is the data set?
When we compare solutions below, you will observe that the way you would scale your solution to access a 5MB dataset could be very different from what you would do to access a 5TB dataset. For example, you can store a 5MB dataset in its entirety on every processing node. However, you wouldn’t do that with a 5TB dataset.
Data access solutions
The following image illustrates the two broad data access solutions.
Using a remote store: This is the traditional model for building applications. Here, when an application needs to process an event, it makes a remote call to a separate SQL or No-SQL database. In this model, write operations are always remote calls, but reads can be performed on a local cache in certain scenarios. There are a large number of applications at LinkedIn that fall into this category.
Another pattern is to use a remote cache (e.g., Couchbase) that is fronting a remote database (e.g., Oracle). If the remote cache is used primarily for reading adjunct data, then applications use an Oracle change capture stream (using Databus) to populate the remote cache.
Using a local (embedded) store: This is not the traditional model. The idea here is that the state required for processing the events is co-located on the same hosts (machines) where the events are processed. In essence all state access is local, and hence, highly efficient.
Samza has first-class support for using a local embedded database. Samza supports embedding RocksDB into your event processor. This state is backed up by a Kafka log compacted topic, which allows the state to be resilient against any machine failures.
There are other frameworks, like Microsoft ServiceFabric, that have built-in support for local application state. ServiceFabric supports the notion of durable collections, which are stored in the local disk but are replicated among the processor instances for durability. It also lets these durable collections be automatically backed by Azure storage, which is geo-replicated and can support a much higher level of durability.
Where the event arrives versus where the event gets processed
The discussion of local versus remote storage is relative to where the event gets processed. Depending on the framework you use, the events might arrive on a different node compared to where the event gets processed.
Several frameworks, like Google Dataflow, support reading from non-partitioned input sources like Google Pub-Sub. In this model, upon event arrival, the processing runtime first figures out which processing node should process the event and then sends or redirects the event to the appropriate node for the actual processing. Microsoft Orleans is another example of this approach. Orleans exposes a virtual actor-based programming API whereby upon arrival an event first gets redirected to the appropriate node where it then gets processed.
Stream processing frameworks like Samza, Spark Streaming, and Flink inherently understand the partitioning of events in sources like Kafka, AWS Kinesis, and Azure EventHub, and hence don’t require an additional redirection before the events get processed.
If your application is bottlenecked by network bandwidth or compute power, the ability to process events on the same node where they arrive without an additional redirection can be an important performance optimization.
Independent of this redirection of events prior to processing, the discussion in this post regarding accessing data during event processing is still applicable for all event processing frameworks.
Data access solutions: Details
It is easier to compare the solutions by considering separately if the data being accessed is partitioned or not.
Partitioned data access
If the data access is partitioned, both the solutions (remote versus local data access) work well. Most applications at LinkedIn fall into this category.
Using a remote store
Let us explore how a remote store can be used efficiently for accessing read-write and read-only data.
Using a remote store for read-only/adjunct data
Distributed No-SQL databases (e.g., Espresso or Cassandra) can typically be accessed directly from the event processor without a cache in the middle. For a non-scaled out database (e.g., Oracle), it becomes important not to hammer the database directly from the event processor. Typically a remote cache, such as Couchbase, is used to protect the remote database and improve processing time. The following image illustrates how at LinkedIn we use change data capture from Oracle using Databus to keep the remote cache populated.
It is also possible to use a local LRU-based read-through cache embedded with the event processor to speed up reads even further.
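A local LRU read-through cache of this kind could look roughly like the sketch below. The class name, the loader callback (standing in for the remote lookup), and the hit/miss counters are all hypothetical; this is an illustration of the idea, not code from any LinkedIn system.

```python
from collections import OrderedDict
from typing import Any, Callable, Hashable


class ReadThroughLRUCache:
    """Serve reads locally; on a miss, call the loader and remember the result."""

    def __init__(self, loader: Callable[[Hashable], Any], capacity: int) -> None:
        if capacity <= 0:
            raise ValueError("capacity must be positive")
        self._loader = loader          # hypothetical remote lookup function
        self._capacity = capacity
        self._entries: "OrderedDict[Hashable, Any]" = OrderedDict()
        self.hits = 0
        self.misses = 0

    def get(self, key: Hashable) -> Any:
        if key in self._entries:
            self._entries.move_to_end(key)    # mark as most recently used
            self.hits += 1
            return self._entries[key]
        self.misses += 1
        value = self._loader(key)             # read through to the remote store
        self._entries[key] = value
        if len(self._entries) > self._capacity:
            self._entries.popitem(last=False)  # evict the least recently used key
        return value
```

A read-only cache like this has no invalidation, so entries can go stale if the remote data changes. A real deployment would likely add a TTL or consume change events.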
Using a remote store for read-write data
As explained above, with partitioned data access, you can assume that each node is accessing a mutually exclusive set of keys. This makes it possible to have a local embedded cache in front of your remote store in many scenarios. If you have to perform a write, you would update both the remote database and also the local cache. All reads could be from the local cache. This model would only work in scenarios where the stream processor is the only writer to the remote database. This model has another issue to consider. During an application upgrade or process recycle (for whatever reason), you would have to rebuild your cache. During this time, in addition to having an increased processing latency, there could also be a significant amount of load on the remote database, which you would have to provision for. This might still be acceptable for some workloads (especially when the size of the dataset is small). Typically a negative local cache has to be created as well to keep track of keys that do not exist to avoid cases where events in the input stream refer to records that don’t exist in the remote DB.
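As a rough illustration of this model, the sketch below puts a write-through local cache and a negative cache in front of a remote store. RemoteStore is a hypothetical in-memory stand-in for a real database client, and the sketch assumes the stream processor is the only writer and that stored values are never None.

```python
from typing import Any, Dict, Optional, Set


class RemoteStore:
    """Hypothetical stand-in for a remote database client; counts remote reads."""

    def __init__(self) -> None:
        self._rows: Dict[str, Any] = {}
        self.reads = 0

    def get(self, key: str) -> Optional[Any]:
        self.reads += 1
        return self._rows.get(key)

    def put(self, key: str, value: Any) -> None:
        self._rows[key] = value


class WriteThroughCache:
    """Local cache for the keys owned by this node (single-writer assumption)."""

    def __init__(self, remote: RemoteStore) -> None:
        self._remote = remote
        self._values: Dict[str, Any] = {}
        self._missing: Set[str] = set()  # negative cache: keys known not to exist

    def put(self, key: str, value: Any) -> None:
        self._remote.put(key, value)     # write the remote store first
        self._values[key] = value        # then update the local copy
        self._missing.discard(key)

    def get(self, key: str) -> Optional[Any]:
        if key in self._values:
            return self._values[key]
        if key in self._missing:
            return None                  # avoid a repeated remote miss
        value = self._remote.get(key)
        if value is None:
            self._missing.add(key)
        else:
            self._values[key] = value
        return value
```

After a restart both the cache and the negative cache start out empty, which is exactly the warm-up load on the remote database described above.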
Using a local store
Let us explore how local state can be used for accessing both read-only and read-write data.
Using a local store to access read-only/adjunct data
The following image illustrates how we use a local RocksDB database to look up adjunct/read-only data.
In the above example, both the AdClick Kafka topic and the member database updates in Databus are partitioned using the same key: “MemberId.” However, since the Kafka topic and the member database have different load characteristics, the number of partitions of the Kafka topic doesn’t match the number of partitions of the member database. As a result, we use another Samza job to re-partition the member updates stream to match the number of partitions of the AdClick event stream. The AdQuality processor has logic to read from the re-partitioned member update stream and populate a local RocksDB database. With this, processing the AdClick event will entail reading the embedded RocksDB-based member database as opposed to making remote reads from the Member database.
The above pattern is very efficient. However, it takes some work to wire up the above model. In contrast, it is trivial for a developer to just make a remote call to the Member database.
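The processing logic of a single task in this pattern can be sketched as follows. This is a simplified model, not Samza code: a plain dict stands in for the embedded RocksDB store, and the stream names ("member-updates", "ad-clicks") and the payload fields ("ad_id", "industry") are assumptions made for the example.

```python
from typing import Dict, Iterable, List, Optional, Tuple

# (stream name, member_id, payload); a None payload models a deleted member
Message = Tuple[str, str, Optional[dict]]


def run_ad_quality_task(messages: Iterable[Message]) -> List[dict]:
    """Process the co-partitioned messages that one task receives, in order."""
    member_store: Dict[str, dict] = {}  # stand-in for the embedded RocksDB store
    ad_quality_events: List[dict] = []
    for stream, member_id, payload in messages:
        if stream == "member-updates":
            # Keep the local copy of the member data up to date.
            if payload is None:
                member_store.pop(member_id, None)
            else:
                member_store[member_id] = payload
        elif stream == "ad-clicks":
            # Local read instead of a remote call to the Member database.
            profile = member_store.get(member_id)
            ad_quality_events.append({
                "member_id": member_id,
                "ad_id": payload["ad_id"],
                "industry": profile["industry"] if profile else None,
            })
    return ad_quality_events
```

The join only works because both streams use the same key and the same number of partitions, so the profile for a member always lands on the task that sees that member's clicks.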
Some production jobs at LinkedIn have a few TB of partitioned RocksDB-based local state. A few TB of state is fairly large. However, if your dataset is on the order of 100TB, then you probably want to use a remote database, which will allow you to scale your database independent of your event processing tier. It is much easier to share a remote database cost effectively among many stream processing applications.
Using a local store for read-write state
It is very easy to use a local store for read-write state in Samza, which is backed up using a durable log compacted Kafka topic. This works well for both simple applications where you might be doing an aggregation (like sum, average, median, etc.) over a window of time, or for performing more complex joins over multiple input streams (e.g., joining an AdViewStream with an AdClickStream).
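As one example of such an aggregation, the sketch below counts ad clicks per ad over fixed (tumbling) time windows, keeping the running counts in local state. The dict stands in for the local store, and the event shape (timestamp in milliseconds, ad id) is an assumption for illustration.

```python
from collections import defaultdict
from typing import Dict, Iterable, Tuple


def tumbling_window_counts(
    events: Iterable[Tuple[int, str]], window_ms: int
) -> Dict[Tuple[int, str], int]:
    """Count events per (window start, ad_id) using local state only."""
    if window_ms <= 0:
        raise ValueError("window_ms must be positive")
    state: Dict[Tuple[int, str], int] = defaultdict(int)  # stand-in for the local store
    for timestamp_ms, ad_id in events:
        window_start = timestamp_ms - (timestamp_ms % window_ms)
        state[(window_start, ad_id)] += 1  # read-modify-write, all local
    return dict(state)
```

Every event triggers one local read and one local write, which is the access pattern where an embedded store avoids a remote round trip per event.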
Local state can also be used for general purpose stateful event processing applications, like ATC (mentioned above). The ATC application uses RocksDB to store the last time we contacted each member and new application requests to contact members. It is then able to send a summary update to the members (in our effort to improve member experience).
In summary, embedded state in an event processor is useful to store intermediate application state. However, most event processing applications will store their final results in a separate remote database/data warehouse. It is not common to expose the embedded read-write RocksDB state of an event processor to queries from outside of the stream processing application.
Non-partitioned data access
If you have a read-write data access pattern and if the data access is not partitioned, then by definition you need a shared (remote DB) store across all the processing hosts.
If you have a read-only data access pattern, then you could have a local embedded cache in front of a remote store, or have a local store if the dataset is small and fits on every processing host—for instance, if the Company dataset could possibly fit on every processing host. However, if the Company dataset is large, then having a remote database, possibly with an external shared cache (e.g., Couchbase), would be the way to go.
Essential framework features for efficient data access
Incremental checkpoints versus full snapshots
Stream processing frameworks periodically (e.g., every 60 seconds) checkpoint the runtime state of the application to avoid a large restart time if the application is restarted. Many frameworks (e.g., Spark Streaming and Flink) currently don’t update the state incrementally. In other words, they save the entire application state on every checkpoint as opposed to just saving the state that was modified since the last checkpoint. This might be okay when the state is trivial, but once the application state is non-trivial, this model doesn’t scale. On the other hand, Samza saves only the incremental/delta changes on every checkpoint. This makes it ideal for applications that have a large runtime state.
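The difference in cost can be illustrated with a toy model. The sketch below is a simplification and not how Samza is implemented: it tracks which keys changed since the last checkpoint and appends only those to a list that stands in for a log compacted changelog topic. All names are hypothetical.

```python
from typing import Dict, List, Optional, Set, Tuple


class ChangelogBackedStore:
    """Toy key-value store that checkpoints only the keys changed since last time."""

    def __init__(self) -> None:
        self.state: Dict[str, str] = {}
        self._dirty: Set[str] = set()
        # Stand-in for a log compacted topic; a None value is a delete marker.
        self.changelog: List[Tuple[str, Optional[str]]] = []

    def put(self, key: str, value: str) -> None:
        self.state[key] = value
        self._dirty.add(key)

    def delete(self, key: str) -> None:
        self.state.pop(key, None)
        self._dirty.add(key)

    def checkpoint_incremental(self) -> int:
        """Append only modified keys; returns the number of records written."""
        written = 0
        for key in sorted(self._dirty):
            self.changelog.append((key, self.state.get(key)))
            written += 1
        self._dirty.clear()
        return written

    def full_snapshot_size(self) -> int:
        """Records a full snapshot would have to write at this point."""
        return len(self.state)


def restore(changelog: List[Tuple[str, Optional[str]]]) -> Dict[str, str]:
    """Rebuild state by replaying the changelog; later records win."""
    state: Dict[str, str] = {}
    for key, value in changelog:
        if value is None:
            state.pop(key, None)
        else:
            state[key] = value
    return state
```

With 1,000 keys in the store and 3 of them modified between checkpoints, the incremental checkpoint writes 3 records where a full snapshot would write 1,000.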
Async versus sync processing
Most of the popular open source stream processing frameworks today have a synchronous processing model. This means that accessing a remote store or making a service call in the middle of event processing can be a big bottleneck. Most of these frameworks also couple the max event processing parallelism to the parallelism in the input stream (e.g., number of Kafka partitions). Some applications work around these limitations by re-partitioning the input streams. However, this is not always reasonable. For example, if you are processing 1MB per second, it is not reasonable to have a 500-partition Kafka topic just to get 500 processing threads. Other applications manage their own processing threads and take full control of checkpointing in the application. This approach, however, increases the development cost and code complexity of the application.
To address these issues, we have built a truly async and parallel event processing model that will be part of the Samza 0.11 release (available Sept 2016). This improves application performance when accessing remote databases or services from a stream processor, as the remote calls can now be pipelined in a non-blocking fashion. Not all application developers are comfortable with the async programming paradigm. To achieve better parallelism within each container for synchronous processors, Samza users will be able to configure the number of processing threads. When applications use these features to parallelize or pipeline processing within a task, Samza will preserve its processing guarantees by making sure that checkpoints only cover events that are fully processed.
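The two ideas in this section, pipelining remote calls and checkpointing only fully processed events, can be sketched in Python with asyncio. This is not the Samza API. fetch_profile simulates a remote call with a short sleep, and the function names, the in-flight limit, and the offset bookkeeping are assumptions made for illustration.

```python
import asyncio
from typing import Iterable, List, Set, Tuple


async def fetch_profile(member_id: int) -> dict:
    """Hypothetical remote lookup, simulated with a short non-blocking sleep."""
    await asyncio.sleep(0.01)
    return {"member_id": member_id}


async def process_pipelined(
    member_ids: Iterable[int], max_in_flight: int = 4
) -> Tuple[List[dict], int]:
    """Issue remote calls concurrently, with at most max_in_flight outstanding."""
    semaphore = asyncio.Semaphore(max_in_flight)
    stats = {"in_flight": 0, "peak": 0}

    async def handle(member_id: int) -> dict:
        async with semaphore:
            stats["in_flight"] += 1
            stats["peak"] = max(stats["peak"], stats["in_flight"])
            try:
                return await fetch_profile(member_id)
            finally:
                stats["in_flight"] -= 1

    # gather returns results in input order, even if calls finish out of order
    results = await asyncio.gather(*(handle(m) for m in member_ids))
    return list(results), stats["peak"]


def safe_checkpoint_offset(completed: Set[int], last_checkpoint: int) -> int:
    """Highest offset such that every offset after last_checkpoint up to it is done."""
    offset = last_checkpoint
    while offset + 1 in completed:
        offset += 1
    return offset
```

For example, if the events at offsets 1, 2, 4, and 5 have completed but 3 is still in flight, safe_checkpoint_offset({1, 2, 4, 5}, 0) returns 2, so a restart would replay from offset 3 rather than skip it.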
Performance comparison between local and remote state
A remote database works very well when the reads/writes per second are not too high (e.g., < 10,000 requests per second). But as the requests per second goes up, this model becomes expensive in terms of hardware cost.
Local embedded databases, in contrast, work very well even as the throughput needs go up. In performance tests, we have been able to process 1.1 million requests per second on a single SSD-based machine. The state of this job was backed up by a log compacted (and replicated) topic in a 3-node Kafka cluster. When the same Samza job was converted to read from a remote database, we got less than 10,000 requests per second with an Espresso cluster with 3 SSD-based storage nodes. A 100X difference in performance is hugely significant.
This comparison does depend on the type of NoSql database and how it is being used. There are many things at play with remote database access. Some notable factors include:
In the above performance test, we expected high data consistency (e.g., read after write guarantee). If you relax the data consistency requirements of read-write operations, then you can use eventually consistent databases, which can have better performance characteristics. For example, we were not using the Espresso feature that allows reads to be performed on replicas.
In this test we used synchronous i/o and didn’t use batching. If the test were to be rewritten with async i/o and batching, I would hope to get a big improvement in performance.
The choice of the storage engine makes a big difference. In the above comparison it is worth noting that Espresso uses MySql with InnoDB. Some key-value data systems use simpler storage engines, which trade off performance for other capabilities.
The characteristics also change dramatically depending on whether the dataset is small enough to fit in memory in the database cluster.
Even if the remote database access were 10x faster, it would still fall short of the performance achieved by using local state.
What does it take to use local state in your production applications?
Looking at the performance numbers with local state, one might be tempted to use local state in every stateful stream processing application. In this section, I will discuss some of the key lessons that we have learned and implemented based on our experience with local state in Samza for some of the high throughput stream processing applications in LinkedIn. After reading this section, hopefully it will be clear that although local state is great for performance, there are many things that your stream processing ecosystem has to support to make local state feasible.
Application or infrastructure maintenance
Once a stream processor has an embedded database, it is critical that, during application or infrastructure maintenance, the data is not reshuffled across the cluster. If the processing tasks do not remain sticky to the host where they were previously running, then the runtime state has to get reseeded from the remote backup.
There are a few things that Samza does to make sure runtime state doesn’t get reshuffled during upgrades:
Samza durably records (in Kafka) the hosts where a job is executing. When a Samza job is restarted, Samza requests YARN to allocate the same set of hosts where the job was originally executing.
The embedded RocksDB database can be associated with each processing unit (“task”). Samza stores the database outside of the scope of YARN so that the state can live independent of the app lifecycle. A cleanup agent deals with garbage collecting the databases corresponding to applications which have been removed completely from the cluster.
Samza maintains a durable mapping of container→task. With this, when the application is restarted on the same set of hosts, Samza makes sure that the tasks restart on the same hosts where they were running before. The application resumes from where it left off without having to reseed any state from the durable state backup (Kafka).
Reseeding of application state during a restart is not a big problem when the state is trivial in size, but as the size becomes bigger, reseeding state will cause a large pause in your stream processing application. More information on this feature can be found here.
Auto-scaling the processing tier
If a Samza job has local state, then the addition or removal of hosts in the Samza cluster has to be handled with care. We don’t want the processing units (tasks) to be redistributed across all the nodes, as doing so will cause all the application state to move. Instead, what Samza does is carefully re-allocate the minimum set of tasks to the new processing containers that have been added. This allows you to scale your Samza job with local state without causing outages. This approach to minimizing the amount of state that moves is also useful in dealing with the case where a machine in the cluster dies.
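One way to picture "re-allocate the minimum set of tasks" is the sketch below. It is a hypothetical balancing routine, not Samza's actual algorithm: tasks stay where they are unless their container is gone or holds more than its fair share, and only those tasks are moved.

```python
import math
from typing import Dict, List


def rebalance(current: Dict[str, str], containers: List[str]) -> Dict[str, str]:
    """Return a new task -> container map that moves as few tasks as it can."""
    if not containers:
        raise ValueError("at least one container is required")
    tasks_by_container: Dict[str, List[str]] = {c: [] for c in containers}
    unplaced: List[str] = []
    for task in sorted(current):
        owner = current[task]
        if owner in tasks_by_container:
            tasks_by_container[owner].append(task)  # stays put, state stays local
        else:
            unplaced.append(task)                   # its container is gone
    # No container should hold more than its fair share of the tasks.
    cap = math.ceil(len(current) / len(containers))
    for owned in tasks_by_container.values():
        while len(owned) > cap:
            unplaced.append(owned.pop())
    # Hand the displaced tasks to the least loaded containers.
    for task in unplaced:
        target = min(containers, key=lambda c: len(tasks_by_container[c]))
        tasks_by_container[target].append(task)
    return {t: c for c, owned in tasks_by_container.items() for t in owned}
```

Growing a job with 8 tasks from 2 to 4 containers moves 4 tasks under this scheme, and losing one of 4 containers moves only the 2 tasks that lived on it. Every task that moves has to reseed its state, so fewer moves means a shorter pause.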
In order to avoid pauses when you expand the number of containers in a Samza application, there also have been discussions in the Samza community about starting a new Samza container first in ‘standby’ mode, where it would reseed the data for its assigned tasks in background and it would be changed to ‘active’ once the state is caught up. I am hopeful that this will materialize at some point in the future.
As you can see, auto-scaling a stream processor which has local state is not as simple as auto-scaling a stream processor which keeps all its state in a remote database.
Repartitioning of the ingestion pipeline
Local state just doesn’t work if you change the number of partitions in your partitioned input source (e.g., Kafka/Kinesis). This is because all the data in the system will have to be redistributed based on the new partition to key distribution. Today, we expect Kafka topics to be appropriately partitioned from the outset, with consideration of future scale needs. However, this can be a difficult policy to adhere to. To address this, we have started working on a solution that will allow you to expand capacity of your Kafka topic without impacting stream processing applications that use local state.
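A small sketch shows why a partition count change is so disruptive. It assumes simple hash-modulo partitioning with CRC32 as a stand-in hash (the real partitioner in your pipeline may differ) and hypothetical member keys, and it lists the keys whose partition changes when a topic grows from 8 to 16 partitions.

```python
import zlib
from typing import List


def partition_for(key: str, num_partitions: int) -> int:
    """Hash-modulo partitioning; CRC32 is only a stand-in for the real hash."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def keys_that_move(keys: List[str], old_count: int, new_count: int) -> List[str]:
    """Keys whose partition differs between the old and new partition counts."""
    return [
        k for k in keys
        if partition_for(k, old_count) != partition_for(k, new_count)
    ]


keys = [f"member-{i}" for i in range(1000)]   # hypothetical keys
moved = keys_that_move(keys, old_count=8, new_count=16)
fraction_moved = len(moved) / len(keys)
```

Every key in moved now arrives at a task whose local store has never seen it, while the state built up for that key sits on a different task.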
Dealing with disasters: Backup and restore of your data
Disasters happen. We just need to have a way of dealing with them. A key feature of any respectable (SQL or No-SQL) database system is the ability to back up the database periodically and save it away. In the event of a disaster (e.g., someone accidentally deletes the database), you can restore from your last backup and you are back in business (with possibly some minimal data loss). This is clearly a big attraction for application users to use a remote database (MySql, Oracle, etc.) that is being periodically backed up.
In the case of Samza, we use local RocksDB state, which is backed up by a Kafka log compacted (and replicated) topic. So we have to worry about the situation that could occur if the Kafka topic is accidentally deleted. We would need a mechanism to continuously and reliably copy the Kafka log compacted topic over to another safe location. Alternatively, many Samza jobs are set up to run in two data centers, so that if one data center has a problem, the other one takes over seamlessly.
We are working on making improvements in Samza to make it easier from an operational standpoint to handle disasters.
Remote state continues to be the more common pattern for accessing both read-only and read-write state from a stream processing application. This is because not all stream processing applications need to process tens of thousands of events per second. When accessing remote state, using an async processing model and performing async i/o in the stream processor can significantly improve efficiency. Having partitioned access to remote state allows you to use a local cache, which can dramatically improve read performance.
Local state can be very attractive in running high scale applications at fairly low cost. As described, we have put in a lot of work to make local state practical to use, and we have seen good success in using this pattern in several large scale applications at LinkedIn.
If you are interested in stream processing, please join us at our next stream processing meetup, where we have exciting talks on the latest developments in Kafka, Samza, and their surrounding ecosystem.
We are also hiring! If you are passionate about building large-scale distributed streams infrastructure, then please reach out to me or one of the members of the LinkedIn Streams Infrastructure team. You can find more information on the work being done in our teams in our previous blog on what’s new in Samza and our blog on Kafka improvements.