Stream processing with Apache Samza - Current and Future
January 26, 2016
At LinkedIn, events pertaining to application and system monitoring, member behavior tracking, inter-application communication, etc., are all ingested into our pub-sub messaging system (Kafka). A staggering 1.3 trillion events are published into Kafka per day with peaks of 4.5 million messages/sec per cluster (these numbers don’t correlate directly with site traffic). Another super-critical source of events, often overlooked in discussions of stream processing, is database updates. We use Databus as our change capture system to make these updates available for downstream processing. For example, every time a LinkedIn member changes his or her profile, a ton of downstream applications need to know what happened and react appropriately. Currently we process more than a trillion events per week out of Databus.
Apache Samza is the open source event/stream processing platform we use at LinkedIn to process this deluge of events in real time. Several other companies like Netflix, Uber, MetaMarkets, Intuit, etc. also use Samza (a top level Apache project) extensively for their stream processing needs.
In the past we have written about the rise of Samza at LinkedIn. A lot happens in the course of a few years. In this post we will discuss the current state of Samza at LinkedIn and some of the awesome improvements that have been made as part of the new Samza 0.10 release. We will also discuss the main new areas where we expect to focus on Samza over the next year.
Big thanks to the 37 distinct contributors in the open source community for submitting patches in this new release!
How is Samza used to improve LinkedIn’s real-time user experience?
Here are some of the applications that use Samza at LinkedIn:
- Air Traffic Controller: It is critical that our members have a good experience with the emails and notifications they receive. As explained in detail in this post, instead of every application at LinkedIn sending emails independently to members, all emails to members are funnelled through a centralized email distribution Samza job that aggregates all email requests per member and sends a summarized email to the member based on certain policies and windowing criteria.
- Ad Relevance: Samza is used in this application to calculate click through rate (CTR) for the ads. In this fairly typical CTR application Samza is used to join an ad-click events stream with an ad-impression event stream and aggregate on many dimensions to calculate CTR.
- Feed: Relevant content in the news feed is a great way for our members to stay informed about their professional interests. Fresh, relevant content is even better! We use Samza to determine feed quality for each member by joining an event stream that has “user view” events, with another event stream which has all the feed items that were served to the client. Billions of events are processed each day in this application. More details can be found here.
- Standardization and Relevance pipeline: In a previous post we discussed details of a standardization and relevance platform at LinkedIn which is built on top of Samza. For example, a member coming into LinkedIn can mark his title as “chief data nerd.” The title standardizer uses machine learning models and rules to standardize the title for use in job and other ad recommendations.
- Monitoring impact on site speed during A/B Testing: This application is used to monitor metrics in real time during an A/B test with the goal of observing the impact on site speed over 10-minute windows during the test and potentially generate alerts based on violations.
- As a transformation pipeline: In order to integrate with existing data systems like our Graph engine and Pinot (analytics database), developers at LinkedIn use Samza as an easy way of doing schema transformations and simple aggregations to generate events into their Kafka-based real-time event input stream.
- Call Graph: As explained in this previous post, every single member activity on LinkedIn generates many service calls. Every service call is tracked in Kafka. This Samza application aggregates all service call events based on a unique ‘TreeID’ for the specific member activity. This job is used to generate reports on site health at LinkedIn. This is an old application, but it is interesting because it processes a million messages a second.
- Security: As you can imagine, we have built security infrastructure at LinkedIn to monitor and protect the site against denial of service and other security breaches. This set of Samza applications analyzes the stream of events to detect security violations in real time, generates alerts, and performs corrective action.
In addition to the above there are several other applications of Samza at LinkedIn ranging from general event processing, to a news relevance pipeline, to online machine learning.
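To make the Ad Relevance join a little more concrete, here is a minimal Python sketch of a CTR computation. It is not Samza code and not the production job; the event fields (`impression_id`, `campaign`) and the function `compute_ctr` are assumptions made for illustration. Clicks are joined to impressions by id, and the ratio is aggregated on a single dimension.

```python
from collections import defaultdict

def compute_ctr(impressions, clicks):
    """Join clicks to impressions by impression_id and return CTR per campaign."""
    # Index impressions by id so each click can be joined to its campaign.
    campaign_of = {imp["impression_id"]: imp["campaign"] for imp in impressions}

    shown = defaultdict(int)
    clicked = defaultdict(int)
    for campaign in campaign_of.values():
        shown[campaign] += 1
    for click in clicks:
        campaign = campaign_of.get(click["impression_id"])
        if campaign is not None:  # ignore clicks with no matching impression
            clicked[campaign] += 1

    return {campaign: clicked[campaign] / shown[campaign] for campaign in shown}

impressions = [
    {"impression_id": 1, "campaign": "jobs"},
    {"impression_id": 2, "campaign": "jobs"},
    {"impression_id": 3, "campaign": "learning"},
    {"impression_id": 4, "campaign": "jobs"},
]
clicks = [{"impression_id": 2}, {"impression_id": 3}, {"impression_id": 99}]
ctr = compute_ctr(impressions, clicks)
```

A real streaming job would do this incrementally over a time window rather than over complete lists, but the join key and the aggregation are the same idea.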
What are the main advantages of using Samza?
There are many other event/stream processing systems out there, and most of them provide a horizontally scalable model for event processing. In our experience, two things set Samza apart.
1. An efficient model for building high performance stateful applications
Most event processing applications need to either read data from a remote database or maintain internal state to produce results. In our experience, the primary bottleneck in these applications is either the I/O itself or the CPU spent performing the I/O.
Let us split this discussion into two parts.
a. Managing state that is derived as a byproduct of the event processing.
Let us take the scenario of “controlled email distribution to members.” The goal is to ensure that our members have a good experience with emails, so all emails to members are funnelled through this central Samza application. Previously, if a member got ten independent connection requests, they would get ten emails. Now we keep track of all new connection requests for each member and periodically send one aggregated email summarizing all of them. To achieve this, we have to keep temporary state about each member’s activity and trigger emails at specific periods.
There are multiple ways to build this.
- Using a remote database. We could update the count and other information of connection requests per member in a remote database. This model can be useful in the case where the throughput is low enough. However in scenarios that require very high scale, the remote access on every call is going to kill performance. Independent of performance, provisioning a DB for this kind of high temporary read-write data is expensive.
- Using the local RocksDB store provided by Samza. Samza has built-in support for an embedded store that is maintained per Samza task. Using the built-in RocksDB store, we can maintain the information for every member locally on the Samza processor until the job is ready to produce the aggregated email to be delivered to the member. At our scale, the Samza model works great.
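The local-state approach can be sketched in a few lines of Python. This is an illustration only, not the Samza API: the class `DigestTask`, its `process` and `window` methods, and the event fields are hypothetical, and a plain dictionary stands in for the task-local RocksDB store.

```python
from collections import defaultdict

class DigestTask:
    """Buffers connection requests per member and emits one summary per window."""

    def __init__(self):
        # Stand-in for the task-local store (a dict here, RocksDB in the text).
        self.pending = defaultdict(list)

    def process(self, event):
        # Called once per incoming event; only local state is touched.
        self.pending[event["member_id"]].append(event["from"])

    def window(self):
        # Called periodically; emits one aggregated email per member, then clears state.
        emails = [
            {"to": member, "count": len(senders), "senders": senders}
            for member, senders in sorted(self.pending.items())
        ]
        self.pending.clear()
        return emails

task = DigestTask()
for sender in ["ann", "bob", "cho"]:
    task.process({"member_id": 7, "from": sender})
task.process({"member_id": 9, "from": "dee"})
emails = task.window()
```

Member 7 receives one email covering three requests instead of three separate emails, and no remote database call is made on the per-event path.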
b. Accessing “adjunct data” to process events.
Let us consider this example: when a member views a web page, it generates a pageView event. In order to process this event, the first step might be to access the member’s profile data and potentially annotate the event before the event is processed further. In this article we will refer to this as “accessing adjunct data.”
One possible approach would be to make a direct call to our member database as part of processing the events. In addition to the potential performance issues, this approach also exposes our member database to a very high read per second workload. This puts our site performance at risk if the event processing application has to work through a big backlog.
The solution that we prefer at LinkedIn is to maintain a minimal partitioned clone of the member database alongside the Samza processor. Essentially the Samza job listens to the change stream from the member database (using Databus) and copies the changes to the local RocksDB store. As a result, in order to process the pageView events the application now just has to do a local RocksDB lookup. This model works efficiently for partitioned NoSQL databases (e.g. Espresso, DynamoDB etc.) as the local RocksDB copy is also partitioned across the Samza job. This approach also works well for non-partitioned but small databases (say 50MB) as the overhead of keeping a copy of the database on every Samza processing machine is low.
In many scenarios the local RocksDB “adjunct” data is populated from a Kafka topic which receives data derived offline in Hadoop/Spark.
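As a rough sketch of the adjunct data pattern, the Python below keeps a local copy of member profiles up to date from a change stream and uses it to annotate page views. The class `PageViewAnnotator`, the record fields, and the `industry` attribute are all hypothetical; a dictionary stands in for the local RocksDB store, and no Databus or Samza API is used.

```python
class PageViewAnnotator:
    """Keeps a local copy of member profiles and uses it to annotate page views."""

    def __init__(self):
        self.profiles = {}  # stand-in for the local store

    def on_profile_change(self, change):
        # Apply one record from the database change stream to the local copy.
        if change.get("deleted"):
            self.profiles.pop(change["member_id"], None)
        else:
            self.profiles[change["member_id"]] = change["profile"]

    def on_page_view(self, event):
        # Local lookup only; no call is made to the member database.
        profile = self.profiles.get(event["member_id"], {})
        return {**event, "industry": profile.get("industry")}

annotator = PageViewAnnotator()
annotator.on_profile_change({"member_id": 7, "profile": {"industry": "software"}})
annotated = annotator.on_page_view({"member_id": 7, "page": "/jobs"})
unknown = annotator.on_page_view({"member_id": 8, "page": "/feed"})
```

Because the change stream and the page view stream are partitioned by the same key in this model, each task only needs the slice of the profile data that matches its own partitions.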
Our performance team has explained the performance tradeoffs pertaining to how the state is managed in your Samza application in this article.
2. Ability to decouple the application logic from the underlying message transport.
There are many event sources in most companies. At LinkedIn, both Kafka and Databus are critical sources of events/data. To enable this, Samza has a first class notion of pluggable message sources (consumers) and sinks (producers). At LinkedIn we use the Kafka and Databus system consumers. Other companies use event sources like MongoDB change log, ZeroMQ or AWS Kinesis directly with Samza. This ability to keep the business logic independent of the underlying message transport apis is critical.
The alternative approach is to explicitly move data from all different sources into a standardized messaging system (e.g. Kafka) before processing. This works very well in many scenarios, but in other scenarios the additional hardware cost, operational complexity and increased runtime latencies of this approach are not acceptable.
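The idea of keeping business logic independent of the transport can be illustrated with a small Python sketch. The `Consumer` and `Producer` protocols and the in-memory `ListConsumer` and `ListProducer` classes are hypothetical names invented for this example; they are not Samza's actual interfaces.

```python
from typing import Callable, Iterator, Protocol

class Consumer(Protocol):
    def poll(self) -> Iterator[dict]: ...

class Producer(Protocol):
    def send(self, message: dict) -> None: ...

class ListConsumer:
    """In-memory source; a Kafka or Databus consumer would expose the same method."""
    def __init__(self, messages):
        self.messages = list(messages)

    def poll(self):
        return iter(self.messages)

class ListProducer:
    """In-memory sink that records everything sent to it."""
    def __init__(self):
        self.sent = []

    def send(self, message):
        self.sent.append(message)

def run(consumer: Consumer, producer: Producer, logic: Callable[[dict], dict]) -> None:
    # The business logic never sees which transport is behind the interfaces.
    for message in consumer.poll():
        producer.send(logic(message))

source = ListConsumer([{"member_id": 1, "page": "/jobs"}])
sink = ListProducer()
run(source, sink, lambda m: {**m, "seen": True})
```

Swapping the source or sink means supplying a different object with the same methods; the `logic` function stays untouched.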
What’s new in Samza?
Here are some of the top features that are now available with the latest Samza 0.10 release.
1. Host Affinity and State reuse
Stateful jobs are a key focus area in Samza. But to really have a stateful job in production we have to think about how the job is going to behave during upgrades, temporary application or machine failures or during application maintenance.
Specifically, during application upgrades, the Samza containers get re-distributed across the machines in the cluster. This means that any local state that was accumulated in RocksDB by Samza has to be re-seeded from Kafka. Once the state is reasonably big, this can take a long time (some of our applications took 15-20 minutes to reseed the state). For a real time application, this doesn’t work. In addition, when a large amount of state has to be read it can saturate the network and possibly impact other applications in your production datacenter. Besides upgrades, this issue also occurs when a single machine fails or gets recycled, or when a Samza application is stopped temporarily.
With the new “host affinity” feature, when a Samza job starts, the mapping from host to Samza container is stored durably in what we call the “coordinator stream.” When the Samza container is stopped (during restart), we cleanly close the RocksDB store and store an offset file indicating that the RocksDB store is consistent as of a particular offset in the changelog. During a restart, we request YARN to allocate the container on the same machine it was running on earlier.
When the job comes up, it checks if there is existing local state in RocksDB and whether the RocksDB store was “cleanly” closed. If both checks pass, the job is able to continue where it left off and reuse all of its accumulated local state instead of re-seeding it from Kafka.
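The clean-close check described above can be sketched as follows. This is a simplified Python illustration under assumed names, not Samza's implementation: the file names `OFFSET` and `state.json`, the functions `clean_shutdown` and `restore`, and the list-based changelog are all hypothetical.

```python
import json
import os
import tempfile

OFFSET_FILE = "OFFSET"  # hypothetical file name for this sketch

def clean_shutdown(store_dir, state, changelog_offset):
    with open(os.path.join(store_dir, "state.json"), "w") as f:
        json.dump(state, f)
    # Written last: its presence marks the store as consistent up to this offset.
    with open(os.path.join(store_dir, OFFSET_FILE), "w") as f:
        f.write(str(changelog_offset))

def restore(store_dir, changelog):
    """Return (state, number of changelog entries that had to be replayed)."""
    offset_path = os.path.join(store_dir, OFFSET_FILE)
    if os.path.exists(offset_path):
        with open(offset_path) as f:
            start = int(f.read())
        with open(os.path.join(store_dir, "state.json")) as f:
            state = json.load(f)
        # In this sketch the marker is removed on startup, so a later crash
        # is not mistaken for a clean close.
        os.remove(offset_path)
    else:
        state, start = {}, 0  # no clean marker: rebuild everything
    for key, value in changelog[start:]:
        state[key] = value
    return state, len(changelog) - start

changelog = [("a", 1), ("b", 2), ("a", 3)]
with tempfile.TemporaryDirectory() as store_dir:
    clean_shutdown(store_dir, {"a": 3, "b": 2}, changelog_offset=3)
    warm = restore(store_dir, changelog)  # offset file present: nothing to replay
    cold = restore(store_dir, changelog)  # offset file gone: full replay
```

Both restores end with the same state, but the warm one replays no changelog entries, which is where the time savings for large stores come from.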
2. Broadcast Stream
There are some scenarios where you would want to broadcast events/messages to the entire Samza application. For example, you might want to dynamically change the behavior of the entire application by changing some “configuration parameters.” Prior to “Broadcast stream” there was no easy way to broadcast such dynamic changes to the entire application without restarting the Samza application.
This has been contributed to Apache Samza by the open source community.
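Conceptually, a broadcast message reaches every task, while an ordinary message reaches only the task that owns its partition. The Python below is a toy model of that difference; `ThresholdTask`, `deliver`, and the message fields are hypothetical and do not reflect how Samza configures or routes broadcast streams.

```python
class ThresholdTask:
    """Flags values above a threshold that can be changed at runtime."""

    def __init__(self, threshold):
        self.threshold = threshold

    def process(self, message):
        if message["type"] == "config":
            self.threshold = message["threshold"]
            return None
        return message["value"] > self.threshold

def deliver(tasks, message, broadcast=False):
    if broadcast:
        # Every task sees the message.
        return [task.process(message) for task in tasks]
    # Ordinary messages go to the single task that owns the key.
    task = tasks[message["key"] % len(tasks)]
    return [task.process(message)]

tasks = [ThresholdTask(10) for _ in range(3)]
data = {"type": "data", "key": 4, "value": 15}
before = deliver(tasks, data)
deliver(tasks, {"type": "config", "threshold": 20}, broadcast=True)
after = deliver(tasks, data)
```

The same data message is flagged before the configuration change and not flagged after it, without any task being restarted.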
3. Coordinator Stream
Previously, job configuration was passed to the job over the command line as an environment variable. This meant that the size of the configuration was bounded, and several jobs were bumping against the limit. The other issue was that the configuration was static, so any change in config required the job to be bounced.
In addition as explained in the section on “Host Affinity,” there was a need to persist “metadata” pertaining to the running job durably. For example, the mapping of host->container->task was required to be stored durably for the ‘host affinity’ feature described above.
In this new release we have a centralized durable stream called “coordinator stream” which can be used to store configuration and other relevant system information. In essence, the job configuration is stored durably (e.g. in a log-compacted Kafka topic). This provides a central place where dynamic configuration about the job can be changed. Each config element is stored as a separate message in Kafka. With this model the size of the configuration of the job can be arbitrarily large.
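To see why one message per config element works well with log compaction, consider this small Python sketch. It models a compacted log as a list of key/value pairs where the latest value per key wins and a `None` value deletes the key. The function `compact` and the config keys shown are illustrative assumptions, not the coordinator stream's actual message format.

```python
def compact(log):
    """Replay a log of (key, value) messages, keeping the latest value per key."""
    latest = {}
    for key, value in log:
        if value is None:
            latest.pop(key, None)  # a None value acts as a delete marker
        else:
            latest[key] = value
    return latest

log = [
    ("job.name", "email-digest"),
    ("task.window.ms", "60000"),
    ("task.window.ms", "30000"),  # a later message overrides the earlier one
    ("debug", "true"),
    ("debug", None),              # config element removed
]
config = compact(log)
```

Changing a single setting means appending a single small message, and the full configuration can grow without hitting a command-line or environment-variable size limit.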
4. RocksDB TTL
Many applications in Samza have a requirement to clean up/expire data in the state store. For example, some applications use the state store as a durable cache. The RocksDB TTL feature provides automatic expiration of old data in Samza’s state store. In the absence of this feature, applications needed to write cumbersome and often inefficient logic to periodically clean up old data.
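The behavior an application gets from a TTL store can be sketched in Python as below. This is only a model of the semantics: the class `TtlStore` is hypothetical, it expires entries lazily on read, and RocksDB's own expiration mechanism works differently internally. The clock is injected so the example is deterministic.

```python
import time

class TtlStore:
    """Key-value store whose entries expire ttl_seconds after they are written."""

    def __init__(self, ttl_seconds, clock=time.monotonic):
        self.ttl = ttl_seconds
        self.clock = clock
        self.data = {}

    def put(self, key, value):
        self.data[key] = (value, self.clock() + self.ttl)

    def get(self, key):
        entry = self.data.get(key)
        if entry is None:
            return None
        value, expires_at = entry
        if self.clock() >= expires_at:
            del self.data[key]  # expired entries are dropped when read
            return None
        return value

now = [0.0]  # fake clock so the example does not need to sleep
store = TtlStore(ttl_seconds=60, clock=lambda: now[0])
store.put("member:7", "cached profile")
fresh = store.get("member:7")
now[0] = 61.0
expired = store.get("member:7")
```

With expiration handled by the store, the application code no longer needs its own periodic cleanup pass.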
5. New System Producers for HDFS and ElasticSearch
Samza now has first class integration to write the output of the application directly to HDFS and ElasticSearch. HDFS producer and ElasticSearch producer were contributed by the Open Source community!
The following features have already been developed and will be open sourced in an upcoming release of Apache Samza.
6. Seamless upgrade of Samza Framework
Samza is currently designed to operate as a multi-tenant stream processing service.
One of the big pain points right now is that Samza developers have to package Samza into their application and completely manage deployment of Samza. The issue here is that it is a lot of work for developers to keep on upgrading to newer versions of Samza to pick up new fixes and features. This model doesn’t work at LinkedIn as we have grown to a large number of applications.
In this new model, the applications (if they so choose) will no longer have to package Samza as part of their application. Instead Samza binaries will be pre-installed on the Samza machines. The applications will load the version of Samza that is pre-installed in the cluster.
7. Start and Stop Capability
As the name suggests, we are working on exposing a REST API to start and stop individual jobs. Oftentimes, we find that a Samza job is not behaving the way it should and needs to be stopped and restarted. This REST API will let you perform these operations (and more in the future) without requiring you to understand YARN.
8. Static partition assignments to enable non-YARN deployments of Samza (e.g. EC2)
Netflix has recently contributed this feature that allows Samza to be used without YARN by having a static set of partition assignments for the containers in a Samza job. This cool feature enables Netflix to run Samza applications in AWS EC2 instances without any YARN dependency.
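One simple way to picture a static assignment is a fixed, deterministic function from partition number to container. The Python sketch below uses a modulo rule; the function `assign` is hypothetical and this is not necessarily the scheme the contributed feature uses.

```python
def assign(partitions, container_count):
    """Statically map each partition to a container using a modulo rule."""
    assignment = {container: [] for container in range(container_count)}
    for partition in sorted(partitions):
        assignment[partition % container_count].append(partition)
    return assignment

assignment = assign(range(8), container_count=3)
```

Because the mapping depends only on the partition number and the container count, every container can compute its own share independently, with no cluster manager needed to coordinate the assignment.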
What’s coming next?
The Samza team at LinkedIn has spent most of the last two years working to operationalize Samza and make it suitable for production workloads. Here are some of the improvements we will focus on in 2016.
1. Samza as a library
Samza currently depends on YARN. Several companies, however, use Mesos, Kubernetes and other cluster management systems. Also, if a company is running in a cloud computing platform like AWS EC2/Microsoft Azure Compute/Google Compute, a cluster manager like YARN/Mesos might not be required at all.
To address these issues, the idea is to decouple the coordination logic in Samza from the YARN hosting logic. The static partition assignment feature mentioned in the section above takes a step in this direction. However, it is still very complex to configure and set up. This effort will make it seamless to run a Samza application on a compute platform of your choice.
2. Allow concurrency within a process
Samza currently ensures that separate applications live in separate containers (processes). Each process has a single thread of execution. The process isolation between applications makes the system more reliable and also makes it easier to debug. We surely want to retain this aspect. However the model of only a single thread per process is not very memory efficient from the point of view of a single application.
For example, Samza jobs often have to perform I/O to push results/data to databases. To perform more parallel I/O, applications end up allocating a lot of containers. The issue is that the memory corresponding to the JVM and other objects is not shared among processes. Also, on many occasions, all tasks in the Samza job might need to look up a shared system-wide cache or store. With many containers, the caches also have to be wastefully duplicated in every process.
To address this issue, we are working towards allowing for more concurrency within a single process in Samza.
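The memory argument can be illustrated with a short Python sketch: several threads in one process perform parallel I/O while sharing a single in-memory cache, instead of each process holding its own copy. The names `SHARED_CACHE`, `write_result`, and `flush` are hypothetical, and the blocking database write is simulated by a pure function.

```python
from concurrent.futures import ThreadPoolExecutor

# Loaded once per process and shared by all worker threads.
SHARED_CACHE = {"us": "United States", "in": "India"}

def write_result(event):
    # Stand-in for a blocking database write that needs a cache lookup.
    return f'{event["member_id"]}:{SHARED_CACHE[event["country"]]}'

def flush(events, max_workers=4):
    # Threads overlap their I/O waits while reading the same cache.
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(write_result, events))

results = flush([
    {"member_id": 1, "country": "us"},
    {"member_id": 2, "country": "in"},
    {"member_id": 3, "country": "us"},
])
```

Achieving the same parallelism with one thread per process would require several processes, each with its own runtime overhead and its own copy of the cache.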
3. First class support for DAG (Directed Acyclic Graph)
Many Samza applications consist of a set of linked Samza jobs. Today a Samza developer explicitly creates the links between Samza jobs. To simplify the developer’s experience, we want Samza to have a simple way of expressing a DAG. This would not only help simplify the configuration and creation of multi-stage event processing pipelines, it would also make them easy to deploy, monitor, and upgrade.
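As an illustration of what declaring a pipeline as a DAG buys you, the Python sketch below describes job dependencies in one place and derives a valid deployment order from them using the standard library's `graphlib` (Python 3.9+). The job names are hypothetical and this says nothing about what Samza's eventual DAG syntax will look like.

```python
from graphlib import TopologicalSorter

# Each job maps to the set of upstream jobs whose output it consumes.
pipeline = {
    "join-clicks-impressions": {"filter-clicks", "filter-impressions"},
    "aggregate-ctr": {"join-clicks-impressions"},
}

# Upstream jobs always appear before the jobs that depend on them.
deploy_order = list(TopologicalSorter(pipeline).static_order())
```

With the links expressed as data, tooling can deploy, monitor, or upgrade the stages in a consistent order instead of relying on each developer to wire the jobs together by hand.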
4. Window and Join operator
Currently applications have to write plumbing logic to perform joins across streams over a configured window of time. The time window could be either based on when the event arrived at the stream processor or when the event actually happened. Doing the processing based on event arrival time can sometimes lead to inaccurate output since events can arrive late as a result of delays in the event transportation pipeline. If the application is okay with ‘cutting the window’ based on event arrival time, then the plumbing logic is fairly straightforward. However, if the application requires accuracy of results, then the plumbing logic required to do the processing based on event creation time is non-trivial. As a result, many Samza applications at LinkedIn that require accuracy end up using Lambda architecture. This means they periodically process the entire dataset in a batch processing system (Hadoop/Spark). Note that doing offline processing in Hadoop without accounting for event transportation delays would also produce inaccurate results. However, given that typically such batch jobs are run over larger windows of time (one hour or one day), the inaccuracies happening at the edges of the window are less visible (and sometimes more acceptable).
We have started working on a Window and Join operator in Samza which will enable Samza applications to generate accurate results even in the presence of late event arrivals. With this, application developers will no longer have to duplicate their logic in an offline batch processing system just to fix accuracy issues in their real-time event processing application.
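The difference between windowing on arrival time and on creation time shows up even in a tiny example. In the Python sketch below, timestamps are seconds, windows are 10 minutes (600 seconds) wide, and one event is created just before a window boundary but arrives just after it. The function `window_counts` and the field names `created` and `arrived` are assumptions made for illustration.

```python
def window_counts(events, size, time_field):
    """Count events per tumbling window, keyed by the window's start time."""
    counts = {}
    for event in events:
        start = (event[time_field] // size) * size
        counts[start] = counts.get(start, 0) + 1
    return counts

events = [
    {"created": 595, "arrived": 605},  # delayed in transit across the boundary
    {"created": 598, "arrived": 599},
    {"created": 601, "arrived": 602},
]
by_arrival = window_counts(events, size=600, time_field="arrived")
by_creation = window_counts(events, size=600, time_field="created")
```

Windowing by arrival time puts the delayed event in the second window, while windowing by creation time puts it in the first, where it actually happened. This sketch sees all events at once; the hard part in a streaming system is deciding how long to keep a window open for late arrivals.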
Thanks to the Samza engineering and SRE teams at LinkedIn for helping to review this blog post and for all their contributions over the last year. Thanks to all the Apache Samza contributors for making it a better product. Finally, a big thanks to all Samza customers at LinkedIn and outside for their die-hard support and feedback!