Stream Processing Hard Problems – Part 1: Killing Lambda

June 27, 2016

We live in an age where we want to know relevant things happening around the world as soon as they happen; an age where digital content is updated instantly based on our likes and dislikes; an age where credit card fraud, security breaches, device malfunctions and site outages need to be detected and remedied as soon as they happen. It is an age where events are captured at scale and processed in real time. Real-time event processing (stream processing) is not new, but it is now ubiquitous and has reached massive scale.

There are many hard problems in stream processing. This is the first in a series of posts where I will discuss some of the important problems that we have faced and are trying to solve at LinkedIn.

In this post, I will focus on the main reasons why people routinely use the Lambda architecture in stream processing applications and suggest alternatives. There has been a lot of prior material explaining Lambda, so I will skip the basics here. Lambda solves some important problems for stream processing applications. However, there are some key issues with the Lambda architecture: for example, the duplicated development effort of building both the hot (nearline) and cold (offline) paths of the processing pipeline, the additional overhead of reprocessing, and the overhead of merging the results of the online and offline processing before serving.

Note that this post is not meant to cover offline data analysis scenarios where existing Hadoop and Spark-based batch solutions work well. Although LinkedIn uses Apache Samza for stream processing, most of the discussion in this post is applicable to other streaming systems as well.

Let us dive into the reasons why developers lean towards using a duplicated (nearline+offline) processing model.

1. Making stream processing generate accurate results

This is a well-researched area and there are some great papers written on this subject. However, it is not always easy for people to understand why stream processing doesn’t always generate accurate results. To make the problem obvious, let me zoom into an example of how this problem manifests itself routinely at LinkedIn.

LinkedIn is deployed in multiple geographically-distributed data centers. To react to site issues, we transparently failover user traffic between data centers many times a week. Now imagine that we have a stream processing application which joins a stream (Kafka topic) of events generated when a user views an advertisement (AdViewEvent) with another stream of events generated when a user clicks on an advertisement (AdClickEvent). This application produces an AdQualityEvent indicating whether an ad was good or bad. The application logic could be as simple as: If a user clicks on an ad within one minute of seeing it, then the ad is good.

  • Stream-processing-1-lambda-1

As you can see in the above picture, the events from both data centers are replicated, so that the stream processor gets the superset of all events from both data centers.

Under normal operations the stream processor gets the AdClickEvent within one minute of receiving the AdViewEvent. However, when there is a data center failover, the AdViewEvent from a member session might be produced on DataCenter1, but the AdClickEvent could be produced in DataCenter2 as depicted below.

  • Stream-processing-1-lambda-2

During a data center failover like the example above, we could have a “late arrival,” i.e. the stream processor might see the AdClickEvent a few minutes after the AdViewEvent. A poorly written stream processor might deduce that the ad was a low-quality ad when the ad might actually have been good. Another anomaly is that the stream processor might see the AdClickEvent before it sees the corresponding AdViewEvent. To ensure that the output of the stream processor is correct, there has to be logic to handle this “out of order message arrival.”
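The two anomalies above can be handled by joining on the time each event was generated rather than the time it arrived, and by buffering whichever side shows up first. The following is a minimal sketch under stated assumptions, not how Samza or any LinkedIn job implements it: the `AdJoiner` class, its method names, and the idea that every event carries a `(user, ad)` key and an event timestamp in seconds are all hypothetical. It only emits a result once both events have been seen; deciding that a click will never arrive requires a retention period and a timer, which this sketch leaves out.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Tuple

CLICK_WINDOW_SECONDS = 60  # "within one minute of seeing it", per the rule above

Key = Tuple[str, str]  # (user, ad); a hypothetical join key


@dataclass
class AdJoiner:
    """Joins views and clicks on event time, whichever side arrives first."""
    views: Dict[Key, float] = field(default_factory=dict)
    clicks: Dict[Key, float] = field(default_factory=dict)
    emitted: List[Tuple[str, str, str]] = field(default_factory=list)

    def _emit_if_joined(self, key: Key) -> None:
        # Only emit once both sides of the join are buffered.
        if key in self.views and key in self.clicks:
            delay = self.clicks[key] - self.views[key]
            quality = "good" if 0 <= delay <= CLICK_WINDOW_SECONDS else "bad"
            self.emitted.append((key[0], key[1], quality))

    def on_view(self, user: str, ad: str, event_time: float) -> None:
        self.views[(user, ad)] = event_time
        self._emit_if_joined((user, ad))

    def on_click(self, user: str, ad: str, event_time: float) -> None:
        self.clicks[(user, ad)] = event_time
        self._emit_if_joined((user, ad))
```

With this shape, a click generated 30 seconds after the view is classified as good even if the click reaches the processor several minutes late, or before the view does, because the decision depends only on the two event timestamps.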

In the example above, the geo-distributed nature of the data centers makes it easy to explain the delays. However, delays can exist even within the same data center due to GC issues, Kafka cluster upgrades, partition rebalances, and other naturally occurring distributed system phenomena.

Doesn’t Lambda Architecture fix this issue?

At LinkedIn, many source event streams get sent to both the real time Samza-based stream processing system and to the Hadoop and Spark-based offline batch processing system.

A common assumption is that since batch processing happens within a much larger window of time (e.g. one hour), the inaccuracies caused by late and out of order arrival of data usually only impact the window of time at the edges of the interval. For example, you could have late arrivals that impact the first five minutes and the last five minutes of a job that processes an hour's worth of data offline in Hadoop. Is a five-minute window in a job that processes 60 minutes of data insignificant? The answer depends on the kind of application you are developing. In most cases, these inaccuracies are not acceptable. The inaccuracies become even more obvious if you run your batch job every 15 minutes or so, because the affected edges then make up a larger share of each run.

At LinkedIn, to address such inaccuracies in batch processing for some of the high value data sets, we employ the following correctness checks:

For example, to process the events between 12 p.m. and 1 p.m., we would start the Hadoop job at 1:20 p.m. This gives 20 minutes for the data to be mirrored via the Kafka pipeline and moved into HDFS. The Kafka ecosystem at LinkedIn has a service called Audit Service which keeps track of how many messages each production cluster has published onto a set of topics over a period of time. Before the hourly Hadoop job starts, it queries Audit Service to find out how many events were produced into Kafka in the last hour for the topics in question. It then checks to see if the number of events that have been ingested into HDFS from Kafka approximately matches the number of events that were produced in the production cluster.
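The check described above boils down to comparing two counts with some tolerance. Here is a minimal sketch of that comparison; the function name and the 0.1% default tolerance are assumptions made for illustration, and the two counts are passed in as plain integers rather than fetched from Audit Service or HDFS, whose interfaces are not described in this post.

```python
def counts_match(produced: int, ingested: int, tolerance: float = 0.001) -> bool:
    """Return True if the ingested count is within `tolerance` of the produced count.

    `produced` stands for what the audit reports for the hour in question and
    `ingested` for what landed in HDFS; both are hypothetical inputs here.
    """
    if produced == 0:
        return ingested == 0
    return abs(produced - ingested) / produced <= tolerance
```

For instance, `counts_match(1_000_000, 999_500)` passes with the default tolerance, while `counts_match(1_000_000, 990_000)` does not, in which case the hourly job would not start.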

As you can see, getting accuracy out of a batch job requires a lot of care and attention. Although the above approach improves the accuracy of the batch job, it suffers from the issue that if there are delays in the pipeline then the correctness check fails and the batch job fails to run. It also forces the batch job to start running only after a period of time (20 minutes in the above example) to account for the process of moving the data into HDFS.

Instead, most users want applications to process events as soon as they are generated but also have the ability to update the results when late arrivals and out of order events happen.

So how can we improve accuracy?

The MillWheel and Dataflow papers from Google have detailed discussions on this subject. Tyler Akidau continues to explain in this blog how a stream processor can deal with late and out of order message arrivals. Most streaming platforms out there are adding plumbing to hide the complexities of late and out of order arrivals from application developers.

LinkedIn and several other companies use Apache Samza for stream processing. In addition to other key improvements in Samza, we are working on a set of core operators to make it easy for applications to do windowing and joins over event streams with highly accurate results. The core logic is the same:

  1. Store all input events for a much longer retention period in the Samza job. If the Samza application is doing pure aggregations (like average/sum of, etc.) then it is not necessary to store all the input events. Instead we only need to store the result of the aggregation for every time window.
  2. When there is a late arrival, the Samza job can re-emit the output of the window(s) impacted by the late arrival.
    1. If it is a standard tumbling (non-overlapping) window, then only one window’s output has to be re-emitted.
    2. If it is a sliding (overlapping) window, then a single event could fall in multiple windows, and hence the output of all the impacted windows has to be re-emitted.
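The steps above can be sketched for a pure aggregation (a count per window), where only the per-window result is stored rather than every input event. This is an illustrative sketch, not the Samza operator API: `window_starts` and `WindowedCounter` are hypothetical names, times are integer seconds, and eviction of old windows after the retention period is omitted.

```python
from collections import defaultdict
from typing import Dict, List, Tuple


def window_starts(event_time: int, size: int, slide: int) -> List[int]:
    """Start times of every window [start, start + size) that contains event_time.

    slide == size gives tumbling windows; slide < size gives sliding windows.
    """
    starts = []
    start = (event_time // slide) * slide  # latest window that can contain the event
    while start > event_time - size:
        starts.append(start)
        start -= slide
    return sorted(starts)


class WindowedCounter:
    """Keeps one count per window and re-emits every window an event touches."""

    def __init__(self, size: int, slide: int) -> None:
        self.size = size
        self.slide = slide
        self.counts: Dict[int, int] = defaultdict(int)  # window start -> count

    def on_event(self, event_time: int) -> List[Tuple[int, int]]:
        """Update the impacted windows; return (window_start, count) pairs to (re-)emit."""
        updates = []
        for start in window_starts(event_time, self.size, self.slide):
            self.counts[start] += 1
            updates.append((start, self.counts[start]))
        return updates
```

With 60-second tumbling windows, a late event at time 130 that arrives after an event at time 190 re-emits only the window starting at 120. With a 60-second window sliding every 30 seconds, an event at time 125 updates two windows, those starting at 90 and at 120.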

You can learn more about the details in this design document. As you might imagine, a lot of data has to be stored to recompute and re-emit the results for the impacted windows as a result of a late or out of order arrival. Some systems store all this data in memory, causing instability when the event retention time is higher than a few minutes (depending on the event rate). Some systems store it all in remote databases. This works and even scales out well. However, due to the high CPU cost (serialization) and network overhead of remote calls, such solutions require a significantly larger hardware footprint. What makes Samza unique in building the above functionality is its first-class, production-ready support for “local state” with an embedded, fault-tolerant RocksDB-based key value store. More information on Samza’s state management can be found here. I also intend to focus a future post on the details of “state management.”

2. Reprocessing

Let us examine a scenario at LinkedIn that requires reprocessing of data. LinkedIn allows you to set whatever you want as your current job title. You could say that you are the “chief data nerd” at your company. We need to know what you really do in your company to offer relevant recommendations pertaining to jobs, news, etc. To solve this we have a Samza-based event processor that listens to changes made to an Espresso-based profile database using our existing change capture system: Databus. For each update made by a member to their profile, it consults a machine learning model (derived offline) and makes an educated guess on what the member's title (and other attributes) might really be. This machine learning model gets updated periodically (sometimes multiple times a week). When the model changes, we need to reprocess the profiles of all existing LinkedIn members so that they are standardized using the latest model.

Many people do this kind of reprocessing offline using Hadoop or Spark. This implies that the core application logic has to be reimplemented for the batch processing system. If the stream processor makes calls to live services or online databases, then porting the logic to the batch system is not straightforward. Most Hadoop grids are significant in size (hundreds of thousands of machines). If batch jobs on such a grid make calls to online services/databases, they would easily max out the quotas specified for those systems and impact the site. As a result, the application logic in the nearline and batch systems is not exactly the same.

Databus provides a feature that we call “bootstrapping” whereby a consumer can optionally read and process the entire database directly from a database backup without impacting the serving latency of the actual database. With Databus, whenever an application has to reprocess the entire dataset, we just start a Samza-based stream processor to read from Databus in bootstrap mode. 

The above approach does have one important implication. Bootstrapping or reprocessing can take a couple of hours, especially because we limit the overall throughput of the system. During this time we want to continue to run the online stream processor in parallel until the bootstrap processor is caught up. This does raise the question of where the resultant data set is stored. Do we have separate data stores, or can we merge the results of the online processor and the bootstrap processor into one database? Both options are possible and have their own pros and cons. Having separate data stores requires a serving layer to be responsible for merging the results (similar to the Lambda architecture). At LinkedIn, we are also working on a new store for serving derived data, called Project Venice, which promises to seamlessly handle merging the output of the online and bootstrap processors. In the absence of such a data store, the online and bootstrap processors output their results to a common Kafka topic. Another Samza job is responsible for merging these results and writing them to a database like Espresso.
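One way to think about the merge job is as a "newest source data wins" upsert. The sketch below is an assumption about how such a merge could work, not a description of the actual LinkedIn job: the `StandardizedTitle` record, its `profile_version` field (a hypothetical sequence number of the profile change a result was derived from), and the `merge` function are invented for illustration, and a plain dictionary stands in for the database. The point is that the bootstrap processor reads from a backup, so its results may be based on older profile data than results the online processor has already written.

```python
from typing import Dict, NamedTuple


class StandardizedTitle(NamedTuple):
    member_id: int
    title: str
    profile_version: int  # hypothetical: sequence number of the source profile change


def merge(store: Dict[int, StandardizedTitle], record: StandardizedTitle) -> bool:
    """Upsert `record` unless the store already holds a result from newer profile data.

    Returns True if the store was updated.
    """
    current = store.get(record.member_id)
    if current is None or record.profile_version >= current.profile_version:
        store[record.member_id] = record
        return True
    return False
```

If the online processor has already written a result for profile version 7, a bootstrap result derived from version 5 of the same profile is dropped, regardless of the order in which the two records appear on the common topic.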

In the above scenario the entire database (from the beginning of time) has to be reprocessed. There are other scenarios where only a few hours' worth of data has to be reprocessed. For example, suppose a bug gets introduced in the stream processor during an application upgrade. If you have good monitoring, then within minutes to a few hours you will realize what happened. Assuming the processing is idempotent, you would revert the application, rewind the input stream (supported in messaging systems like Kafka, AWS Kinesis, Azure Event Hubs, etc.), and reprocess the data.
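The idempotency assumption is what makes the rewind safe. A minimal sketch, using an in-memory list as a stand-in for the input stream and a dictionary as the output store (both assumptions, as is the `process` function itself): because each output is written as an upsert keyed by the input key, replaying a range of offsets overwrites the bad results instead of adding to them.

```python
from typing import Callable, Dict, List, Tuple


def process(
    log: List[Tuple[str, int]],
    start: int,
    end: int,
    transform: Callable[[int], int],
    store: Dict[str, int],
) -> None:
    """Apply `transform` to log[start:end] and upsert each result by key."""
    for offset in range(start, end):
        key, value = log[offset]
        store[key] = transform(value)  # overwrite, never accumulate


log = [("a", 1), ("b", 2), ("c", 3)]
store: Dict[str, int] = {}

process(log, 0, 1, lambda v: v * 10, store)  # correct version handles offset 0
process(log, 1, 3, lambda v: -1, store)      # buggy upgrade handles offsets 1 and 2
process(log, 1, 3, lambda v: v * 10, store)  # revert, rewind to offset 1, replay
```

After the replay, the store holds the same values it would have held had the bug never shipped. A job that incremented counters instead of overwriting keyed results would double count on replay, which is why the idempotency assumption matters.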

What are the caveats of this approach?

You have to keep a few things in mind when you do reprocessing in your online stream processor:

  1. When you rewind and start reprocessing in a stream processing job, and the job is directly updating a database that serves user requests, you have to worry about potentially impacting site latency. Hence it is important to govern the concurrency of the stream processor. In a framework like Samza, there is a way to control the total parallelism of your job.
  2. Although there is no technical size limit to what you can reprocess in a streaming system like Samza, there comes a point where you might need thousands of machines to do the reprocessing in a reasonable amount of time, e.g. for a dataset which is hundreds of terabytes or more. At LinkedIn, such datasets are reprocessed only in our Hadoop grids. This separation of clusters avoids network saturation and other denial-of-service (DoS) triggers in our online clusters.
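Besides limiting parallelism, another common way to protect a serving database during reprocessing is to cap the write rate. The sketch below shows a token bucket, which is a swapped-in, generic technique and not a Samza feature or configuration; the `TokenBucket` class and its parameters are hypothetical, and the clock is injectable so the behavior can be checked without sleeping.

```python
import time
from typing import Callable


class TokenBucket:
    """Allows at most `rate_per_sec` operations per second, with bursts up to `capacity`."""

    def __init__(
        self,
        rate_per_sec: float,
        capacity: float,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self.rate = rate_per_sec
        self.capacity = capacity
        self.clock = clock
        self.tokens = capacity
        self.last = clock()

    def try_acquire(self) -> bool:
        """Take one token if available; the caller should back off when this returns False."""
        now = self.clock()
        # Refill in proportion to elapsed time, never beyond capacity.
        self.tokens = min(self.capacity, self.tokens + (now - self.last) * self.rate)
        self.last = now
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False
```

A reprocessing job would call `try_acquire()` before each database write and pause briefly when it returns False, so the replay proceeds at a bounded rate no matter how fast the input can be read.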

3. Ease of programmability and experimentation

There is a higher bar for creating a nearline “always-running” stream processing application. Many times the logic is much more complex or cumbersome to express in Java as opposed to a high level language like SQL. Application developers and data scientists enjoy the development agility of using higher level languages like Hive/Pig to express their logic in Hadoop. However, most open source stream processing frameworks don’t have first-class SQL capabilities. A good example of a commercial stream processor which has first-class support for SQL over real-time streams is Azure Stream Analytics. In the absence of SQL on streams, at LinkedIn many nearline jobs are instead implemented on Hadoop/Spark and are configured to run at shorter intervals. Some other applications end up implementing only simple logic in their nearline stream processing job, but have more comprehensive processing done in their offline Hadoop/Spark job. They then merge the results in classic Lambda style and make them available for consumption.

This is a well-known gap and is now an active area of development in stream processing frameworks. Julian Hyde describes an effort to have SQL support on Samza using Apache Calcite. This effort is not production-ready yet, so we don’t use it at LinkedIn. In the absence of first-class SQL support, developers do use CEP (complex event processing) frameworks in conjunction with a stream processing system to provide a higher level abstraction on streams. Shuyi Chen from Uber gave a presentation at the recent Stream Processing Meetup @ LinkedIn, explaining how they have embedded an open source CEP engine (Siddhi) into their Samza-based stream processors, making it easier to implement business rules without having to dive down into Java/Python.

In addition to high-level language support, our Hadoop grids also provide a good model for iterative experimentation. Doing the same experimentation in the online/nearline systems is a lot more cumbersome, and additional care has to be taken in order not to negatively impact online systems when accessing event sources and databases. At LinkedIn, most relevant data sets are periodically copied into Hadoop/Spark grids and are ready for use by offline batch jobs. As a result, many users would rather do most of their experimentation offline in Hadoop. Fortunately, stream processors like Samza already run on YARN (and Mesos in the future). As a result, it is easy to get them up and running on a Hadoop grid. We are currently developing an HDFS system consumer for Samza which will allow the application logic to be written once, with the ability to switch the input between HDFS, Kafka, Databus and other sources. We hope to make it available in a release of Samza later this year.

Join us

If you would like to hear more about this, then please join me and Yi Pan for our session on “Lambdaless Stream Processing @ Scale in LinkedIn” at the upcoming Hadoop Summit in San Jose on June 29th, 2016.

We are hiring! If you are passionate about building large-scale distributed streams infrastructure, then please reach out to me or one of the members of the LinkedIn Streams Infrastructure team.