
Venice Hybrid: Doing Lambda Better

Over the last two years at LinkedIn, I’ve been working on a distributed key-value database called “Venice.” Venice is designed to be a significant improvement over Voldemort Read-Only for serving derived data. In late 2016, Venice started serving production traffic for batch use cases that were very similar to the existing uses of Voldemort Read-Only. Since then, we’ve built out the rest of the functionality to complete the dream that motivated the construction of Venice: the ability to consume data from both batch and streaming sources, which we’re calling “Venice Hybrid.”

In this blog post, we will cover some of the history of serving derived data and lambda architectures and explore how Venice solves the challenges inherent in combining batch and streaming data.

Derived data

LinkedIn operates an online service that serves traffic to over 500 million members. The operation of this application requires two distinct types of data: primary data and derived data. Primary data is what we normally think about when storing data for a web application. For example: when you update your LinkedIn profile with your new job title, that is primary data. When someone else looks at your profile and sees that job title, we are serving them the primary data you provided.

The canonical example of derived data is the People You May Know (PYMK) feature on the LinkedIn application. Based on primary data, including connections and interests, we compute or derive a list of recommendations. These are members we think you might want to connect with on our platform. Traditionally, we recalculate derived data periodically. We take a snapshot in time of connection information, profile data, and other signals; we use that to generate a list of recommendations that correspond to that snapshot in time.

Serving derived data requires a different type of database compared to serving primary data. Members cannot change their PYMK recommendations, so we don’t need to optimize for online writes. We do need to support very low-latency online reads and we need to optimize for very high-speed bulk ingestion of complete datasets. For a long time, LinkedIn has used Voldemort Read-Only to serve derived data. Voldemort Read-Only includes the Build and Push Hadoop job to do bulk ingestion of complete datasets. The Build and Push job creates indexed database binary files for each partition on each storage node right on Hadoop. Then the job coordinates the download of each partition by the different storage nodes. The Voldemort storage nodes can serve reads directly from these pre-indexed files, so the ingestion speed does not depend on any indexing or data organization operations on the local machine.

Lambda

Voldemort Read-Only represents a common model in the world of derived data: batch processing. At any point in time, we can generate a very accurate derived dataset by batch processing the current state of the world. However, that batch processing is expensive and slow, so we often run it only daily or even less frequently. But what about everything that happens between runs? Until the next execution, everything that happens on the LinkedIn application is ignored. There is another popular model that tries to address this shortcoming. That model is often called the lambda architecture.

In a lambda architecture, you have two different data processing systems. One of those systems is the batch processing and serving system we’ve already seen: batch processing on a platform like Hadoop, then loading into a datastore like Voldemort Read-Only for serving reads. The other system is the speed layer. The speed layer provides those up-to-the-minute updates so you don’t need to serve data that is up to a day or more old. At LinkedIn, we have stream data in Kafka that includes changes that members make to their profiles and many other events. We use Samza to process that stream data, and the Samza application writes into a database such as Espresso or Couchbase. This architecture allows us to generate near real-time updates to our recommendations based on live activity.

Often the speed layer has to make computational sacrifices in order to operate quickly on the stream data as it comes in. Batch processing still has the advantage of operating on the entire corpus of data, so in many architectures it still provides more accurate and reliable results. For this reason, there is a lot of value in continuing to do batch processing even when near real-time stream processing is available.

When a member comes to the PYMK page of the LinkedIn application, the PYMK service queries both the batch system (Voldemort Read-Only) and the speed layer, does some local reconciliation between the two, and serves a result.
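
To make that read-path cost concrete, here is a minimal sketch of what the client-side reconciliation can look like in a lambda setup. The class and interface names (RecommendationStore, LambdaPymkClient, and so on) are hypothetical stand-ins rather than LinkedIn’s actual APIs, and the merge rule shown (keep the fresher record) is just one possible policy.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of a lambda read path: the serving application queries
// two stores and reconciles the results itself.
public class LambdaPymkClient {

  /** A recommendation plus the time at which it was computed. */
  public static class Recommendation {
    final String memberId;
    final long computedAtMillis;

    Recommendation(String memberId, long computedAtMillis) {
      this.memberId = memberId;
      this.computedAtMillis = computedAtMillis;
    }
  }

  /** Minimal read-only view of a store; stands in for the batch and speed-layer clients. */
  public interface RecommendationStore {
    List<Recommendation> get(String viewerId);
  }

  private final RecommendationStore batchStore;  // e.g. backed by Voldemort Read-Only
  private final RecommendationStore speedStore;  // e.g. backed by a store fed by Samza

  public LambdaPymkClient(RecommendationStore batchStore, RecommendationStore speedStore) {
    this.batchStore = batchStore;
    this.speedStore = speedStore;
  }

  /** Queries both layers and keeps the fresher entry when the same member appears in both. */
  public List<Recommendation> getRecommendations(String viewerId) {
    Map<String, Recommendation> merged = new HashMap<>();
    for (Recommendation r : batchStore.get(viewerId)) {
      merged.put(r.memberId, r);
    }
    for (Recommendation r : speedStore.get(viewerId)) {
      merged.merge(r.memberId, r,
          (older, newer) -> newer.computedAtMillis >= older.computedAtMillis ? newer : older);
    }
    return new ArrayList<>(merged.values());
  }
}
```

Every application team that adopts a lambda architecture ends up writing some variant of this merge logic, which is exactly the duplication Venice Hybrid aims to remove.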

Doing Lambda better: Hybrid

Lambda architectures work, and they are widely used at LinkedIn and in the industry. They also have some downsides. The application needs to read from two different databases, which means it always has to wait for the slower of the two responses. The application is also only available when both databases are up. In addition, the application has to take on the complexity of integrating with two different database libraries and coordinating the logic of how to prioritize the results from each system.

Venice aims to solve these problems by providing a single database that accepts both batch and streaming ingestion and does the reconciliation at write time, as part of the infrastructure. Each application team no longer needs to reimplement a complex, multi-system lambda architecture; they can use one system that we provide and manage.

Another challenge in maintaining a lambda architecture is that processing batch data and stream data requires different systems. This problem is upstream of data serving, and Venice doesn’t address it; however, LinkedIn has contributed extensions to Samza that allow Samza code to operate on Hadoop data, potentially eliminating the need to maintain two different data processing code paths, since the same Samza code could run both on data in HDFS and on data streaming in Kafka. But that is a topic for another blog post.

Venice batch

We’ve published a few blog posts about Venice already, but I’d like to take a moment to go over the way that Venice works for ingesting and serving data from Hadoop. The big difference between Venice and Voldemort Read-Only is the ingestion path. The Voldemort Read-Only Build and Push job builds an indexed database file on Hadoop for each database partition, and loads all of those partitions onto the various storage nodes in the cluster. Venice uses Kafka as a sort of write buffer. Any system that writes to Venice does so by writing to a Kafka topic, and the Venice storage nodes consume from that Kafka topic, persisting each record locally in order to serve queries.

The core resource in Venice is a store. A store has schemas and owners, and is isolated from other stores. A store represents a single use case for a team using Venice. When a team pushes a dataset into their Venice store, they create a new version of that store. Every bulk push of data into Venice is a complete snapshot of the dataset. Before starting the push, Venice allocates a new Kafka topic and provides that topic to the system pushing data. We do bulk data pushes from Hadoop, and the push is a MapReduce job. That job uses a defined partitioning strategy to partition all of the records across the different partitions of the Kafka topic. Once the data is flowing into Kafka, the Venice storage nodes start consuming it. The physical instantiation of the store version in the Venice storage nodes is also partitioned and uses the same partitioning as we use in Kafka. This means that each partition of the store in Venice is able to consume from a corresponding partition in Kafka. For simplicity, the diagram below doesn’t illustrate the partitioning.
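
The crucial invariant in this flow is that the push job and the storage nodes agree on which key lands in which partition. The sketch below shows what such a deterministic partitioner can look like, assuming a simple CRC32 hash for illustration; this is not a description of Venice’s actual partitioner.

```java
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

// Illustrative sketch of a deterministic partitioner shared by the push job
// (when producing to the version topic) and the storage nodes (when mapping
// Kafka partitions to local store partitions). The hash function is an
// assumption for the sketch, not Venice's actual strategy.
public final class SketchPartitioner {

  /** Maps a serialized key to a partition in [0, numPartitions). */
  public static int partitionFor(byte[] key, int numPartitions) {
    CRC32 crc = new CRC32();
    crc.update(key);
    return (int) (crc.getValue() % numPartitions);
  }

  public static void main(String[] args) {
    byte[] key = "member:12345".getBytes(StandardCharsets.UTF_8);
    // The push job writes this record to Kafka partition p, and the storage
    // node hosting store partition p consumes it from that same Kafka partition.
    int p = partitionFor(key, 16);
    System.out.println("record goes to partition " + p);
  }
}
```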

[Diagram: Venice continues serving the current store version while a new version is ingested from its Kafka topic]

As you can see from the diagram, while we ingest a new version (in this case, version 8), Venice continues to serve data from the previous version. Only after consumption is complete do we switch over to serving the new version. The offline job that does the push writes an end-of-push control message into each partition after it has finished writing data. Once a storage node consumes that end-of-push message, we know it has completed consumption. Once every replica of every partition is done, we start serving the newly ingested version for requests to that store.
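
As a rough sketch, the decision to switch versions boils down to a check like the following; the names are illustrative and not Venice’s actual code.

```java
import java.util.List;
import java.util.Map;

// Hypothetical readiness check behind a version swap: the new version starts
// serving only once every replica of every partition has consumed the
// end-of-push control message.
public class VersionSwapCheck {

  public enum ReplicaState { CONSUMING, END_OF_PUSH_RECEIVED }

  /** @param replicaStates partition id -> state of each of that partition's replicas */
  public static boolean readyToServe(Map<Integer, List<ReplicaState>> replicaStates) {
    return replicaStates.values().stream()
        .allMatch(replicas -> replicas.stream()
            .allMatch(state -> state == ReplicaState.END_OF_PUSH_RECEIVED));
  }
}
```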

Venice generally retains one historical version of a store in case something goes wrong with a push, so that we can roll back to the previous version and minimize the impact to our members. Voldemort Read-Only has this feature as well.

Venice hybrid

We designed Venice to consume via Kafka in order to support ingestion from a stream processing system like Samza. A stream processing job can continuously write records to Kafka, and in turn, Venice can continuously consume updates in near real-time. Where a batch push may happen daily, a continuous stream of updates means that a derived dataset is serving data that is fresh to the minute, instead of up to a day stale. But as a colleague of mine is fond of saying, “You cannot remove complexity; you can only move it around”. Because we provide a single system for consuming data from both batch and streaming sources, Venice also needs to handle the complexities of reconciling these two data sources.

Data gaps
Venice strives to keep the reconciliation logic as simple as possible by using only one reconciliation strategy—last writer wins—though other reconciliation strategies could be supported. If a batch push comes into Venice and a subsequent streaming update then comes in for the same key, we want to serve the value that came in from the stream. But this simple strategy isn’t actually sufficient. Imagine we push a dataset from Hadoop. Before we can execute that push, we need to collect the data we’re going to work with, do extensive, time-consuming processing on that dataset to derive the fields we need to serve, and then start the large push. By the time ingestion is complete, the dataset may already represent a snapshot in time that is six hours old. If we only now start applying streaming updates on top of the bulk ingest, there are six hours of streaming updates that we’re missing—this is a data gap, and we must not allow such a data gap to exist.
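
Before moving on, a quick note on the last-writer-wins strategy itself: it falls out naturally from applying records in the order they are consumed from Kafka. The sketch below is a minimal illustration, with an in-memory map standing in for the storage node’s local storage engine.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Minimal illustration of last writer wins: records are applied in the order
// they are consumed from the version topic, so a later write for a key simply
// overwrites the earlier one. The map stands in for the local storage engine.
public class LastWriterWinsSketch {

  private final Map<String, byte[]> localStore = new ConcurrentHashMap<>();

  /** Called for every record consumed from Kafka, in offset order. */
  public void apply(String key, byte[] value) {
    if (value == null) {
      localStore.remove(key);     // a delete (tombstone) removes the key
    } else {
      localStore.put(key, value); // the most recently consumed value wins
    }
  }
}
```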

Multiple versions
The answer to data gaps leads us to the next question. In order for a streaming update from six hours ago to be captured, that update must apply to both the previous version, which has been live for the last six hours, and the new version, which needs to catch up to real time. To satisfy these needs, we create a dedicated Kafka topic for accepting streaming writes. This topic acts like a Venice-internal buffer that we can rewind for a new push.

[Diagram: the real-time buffer topic is replicated into the live version 7 topic while version 8 receives a bulk push from Hadoop]

Our real-time processing applications write directly into this dedicated real-time buffer topic, and Venice replicates messages from that topic into the current version topic. The real-time buffer topic follows the same partitioning as the version topics. This way, the storage nodes are able to continue consuming from a version topic, and simply keep consuming after reaching the end-of-push control message from the bulk push. In the diagram above, we have an existing version 7, which receives replicated messages from the real-time buffer topic. Meanwhile, the new version 8 initializes with a bulk push from an offline system such as Hadoop.
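
To give a feel for what the replication step involves, here is a minimal sketch that copies records from the real-time buffer topic into a version topic while preserving the partition. The topic names and the use of the plain Kafka consumer and producer clients are assumptions made for the sketch; they are not Venice’s actual internals.

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

// Illustrative buffer-replay loop: copy records from the shared real-time
// buffer topic into a version topic, keeping the same partition so that each
// storage node still sees one ordered stream per partition.
public class BufferReplaySketch {

  public static void replay(Properties consumerProps, Properties producerProps,
                            String realTimeBufferTopic, String versionTopic) {
    try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerProps);
         KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps)) {
      consumer.subscribe(Collections.singletonList(realTimeBufferTopic));
      while (true) { // a real implementation would handle shutdown and failures
        ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(500));
        for (ConsumerRecord<byte[], byte[]> record : records) {
          // Write to the same partition in the version topic as in the buffer topic.
          producer.send(new ProducerRecord<>(versionTopic, record.partition(),
              record.key(), record.value()));
        }
      }
    }
  }
}
```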

[Diagram: after the bulk push completes, a second replication stream rewinds the real-time buffer topic into the new version 8 topic]

After the offline job completes, Venice starts another replication from the real-time buffer into the new version 8 topic. This replication rewinds to start with the past six hours of real-time updates. The rewind time is configurable and will depend on the nature of the bulk-loaded dataset. Once the storage nodes have caught up on consumption, Venice can switch versions and start serving out of version 8.
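
One way to implement a configurable rewind is to translate “now minus the rewind time” into a starting offset per partition using Kafka’s timestamp index. The sketch below assumes the standard Kafka consumer API; how Venice actually computes the rewind point internally is not covered here.

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;

// Illustrative sketch: find the offset at which a rewind of `rewindTime`
// (e.g. six hours) should begin, for every partition of the real-time buffer
// topic, using Kafka's offsetsForTimes lookup.
public class RewindOffsets {

  public static Map<TopicPartition, Long> startOffsets(KafkaConsumer<?, ?> consumer,
                                                       String realTimeBufferTopic,
                                                       int numPartitions,
                                                       Duration rewindTime) {
    long rewindStartMillis = System.currentTimeMillis() - rewindTime.toMillis();
    Map<TopicPartition, Long> timestamps = new HashMap<>();
    for (int p = 0; p < numPartitions; p++) {
      timestamps.put(new TopicPartition(realTimeBufferTopic, p), rewindStartMillis);
    }
    Map<TopicPartition, Long> startOffsets = new HashMap<>();
    for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry
        : consumer.offsetsForTimes(timestamps).entrySet()) {
      // A null value means no message at or after that timestamp exists yet.
      if (entry.getValue() != null) {
        startOffsets.put(entry.getKey(), entry.getValue().offset());
      }
    }
    return startOffsets;
  }
}
```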

Bookkeeping
Any time a storage node begins ingesting a partition, we need some way to know when the ingestion is complete. For bulk loads, this is pretty simple—we look for the end-of-push control message. For real-time streaming updates, however, this gets more complicated. In the previous section, I hand-waved and said that we’re good to go as soon as the storage node has caught up. But how do we define this? Kafka identifies messages in a stream with an offset. Each offset points to one message, and we can query a Kafka topic and partition for the latest offset. Messages continue flowing into the system, so that latest offset is only valid for a moment. We can pick an offset threshold and define “caught up” to mean that the storage node has consumed an offset within that threshold of the most recent offset. But even this might not be sufficient. If the replication stream from the real-time buffer into the version 8 topic has just started and we are consuming faster than the replication stream is able to replicate data, then there may still be a backlog in the real-time buffer that doesn’t exist in the version 8 topic. In this case, we might be caught up with the version 8 topic, but not caught up with the real-time buffer.

[Diagram: the Start Of Buffer Replay (SOBR) control message in the version topic records the real-time buffer offset where replay begins]

When the replication stream begins, we embed a special control message called a SOBR, short for Start Of Buffer Replay. The SOBR specifies the offset in the real-time buffer topic where replication starts. When the storage node consumes the SOBR, it keeps track of the offset in the version topic where the SOBR exists and associates that offset with the SOBR-specified offset in the buffer topic. Now the storage node can query Kafka for the latest offset in the real-time buffer topic and infer its progress in catching up to the latest messages written into Venice, instead of just the latest messages written into the version topic.

For this example, I’ll assume that the topics have only one partition; in practice, this is done for every partition in the topic. The batch push for version 8 ends with an end-of-push message at offset 27. At this point in time, the real-time buffer topic is at offset 219. Venice writes a SOBR into the version 8 topic at offset 28, specifying that replay begins at offset 220 in the real-time buffer topic. A moment later, there are 12 more messages in the real-time topic, and the replication stream has started replicating messages from the real-time topic into the version 8 topic. Now when the storage node evaluates its status, it looks at the real-time offset that was embedded in the SOBR (220) and the latest offset in the real-time buffer (232). The difference, 12, is the portion of the real-time topic that needs to be consumed. The storage node read the SOBR from the version 8 topic at offset 28, and the latest offset the storage node has consumed is offset 32. This difference, 4, is the portion of the real-time topic that has already been consumed. 12 messages need to be consumed from the real-time topic and only 4 have been consumed, so the storage node has a lag of 8 messages.
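
Expressed as code, the calculation with the numbers from this example looks like the following; the method and parameter names are illustrative.

```java
// Sketch of the consumption-lag calculation described above.
public class ConsumptionLag {

  /**
   * @param latestRealTimeOffset    latest offset in the real-time buffer partition (232)
   * @param sobrRealTimeStartOffset real-time offset recorded in the SOBR (220)
   * @param latestConsumedOffset    latest offset consumed from the version topic (32)
   * @param sobrVersionOffset       version-topic offset where the SOBR was read (28)
   */
  public static long lag(long latestRealTimeOffset, long sobrRealTimeStartOffset,
                         long latestConsumedOffset, long sobrVersionOffset) {
    long toConsume = latestRealTimeOffset - sobrRealTimeStartOffset; // 232 - 220 = 12
    long consumed = latestConsumedOffset - sobrVersionOffset;        // 32 - 28 = 4
    return toConsume - consumed;                                     // 12 - 4 = 8
  }

  public static void main(String[] args) {
    System.out.println(lag(232, 220, 32, 28)); // prints 8
  }
}
```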

The bookkeeping is complicated, but it is the right calculation to evaluate storage node consumption lag. With this calculation, Venice is able to ensure that the new version is caught up with the real-time stream before going online.

Acknowledgments

Venice has been a really awesome system to build over the last two years, getting batch functionality into production and now hybrid. We’re currently migrating all the Voldemort Read-Only use cases at LinkedIn to Venice and looking forward to decommissioning our old workhorse. All of this has been a huge team effort, and any discussion of Venice would not be complete without mentioning everyone who has contributed on the team, both past and present. So a big thanks to Charles Gao, Felix GV, Gaojie Liu, Arun Thirupathi, Sidian Wu, and Yan Yan on the development team; all the support from Akhil Ahuja, Greg Banks, Kian Chung, Steven Dolan, Vinoth Govindaraj, Nirupam Mohari, Ali Poursamadi, Tofig Suleymanov, Brendan Harris, and Warren Turkal on our site reliability team; and the leadership from Siddharth Singh and Ivo Dimitrov.