
Supporting large fanout use cases at scale in Venice

Venice, which was developed in late 2015, is a key-value store platform built for serving read-heavy workloads and optimized for derived data. Since being deployed to production in 2016, it has become very popular for serving derived datasets to recommendation products inside LinkedIn. Venice handles single-get and small batch-get requests very efficiently and has scaled smoothly to accommodate organic traffic growth.

In late 2018, we started ramping up a more challenging use case: large batch-gets with hundreds, or even thousands, of keys per request. These requests resulted in large fanout, potentially touching every partition, and in much larger response payloads. Together, these factors prevented the Venice platform from delivering the required throughput within the latency SLA.

In this blog, we describe how we have evolved the platform to become significantly more performant and horizontally scalable for this class of feature store use cases. To become familiar with the Venice platform and the prior set of optimizations, please refer to our previous blog posts.

Use case analysis

Before analyzing the large fanout use case, let’s review the high-level architecture of the Venice read path.

Venice read path

[Image: Venice read path]

Helix/ZooKeeper
Helix is a generic cluster management framework used in many distributed systems. At LinkedIn, we use it to manage partition placement across multiple Venice servers. Later in this blog, we will explore it in more depth when discussing the partition assignment optimization strategy.

Venice thin client
Venice thin client is a client library that applications use to talk to the Venice router to perform single-key or multi-key lookups.

Venice router
Venice router is a stateless component that is aware of the partition placement done by Helix. It parses the incoming request, scatters sub-requests to the storage replicas hosting the requested keys, aggregates the responses, and returns the consolidated result to the requesting Venice thin client.

Venice server
Venice server is a stateful component that stores the partition replicas that Helix assigns to it in local storage.

Large fanout use case

One typical scenario is serving a PYMK (People You May Know) request, which is a very important product in the LinkedIn ecosystem. For this use case, the application sends 10k+ QPS to Venice, and each request contains 5,000 keys or more, which results in a response of about 5MB per request. At the same time, the latency requirement is very tight: ~100ms at p99.

This use case introduces many challenges, such as network bandwidth usage, a strict latency SLA, and large fanout. The aggregate network usage is more than 50 GB/s, over four times what even a modern 100 Gbps NIC can handle, and each request involves a lot of CPU-intensive work, such as database lookup, serialization/deserialization, SSL encryption/decryption, and computation. With the high key count and random key distribution, each request also touches almost every partition and, thus, every Venice server. As we scale the cluster horizontally, the fanout increases proportionally, and the p99 end-to-end latency gets worse because the long-tail latency of the fanout requests to the Venice servers becomes more dominant. Without optimizations, Venice would not be able to deliver the strict SLA for this use case.
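
To put the bandwidth number in perspective, here is the back-of-envelope math behind the 50 GB/s figure (a rough estimate based on the numbers above, ignoring protocol overhead):

  10,000 requests/s × 5 MB/response ≈ 50 GB/s ≈ 400 Gbps ≈ 4 × 100 Gbps NICs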

Optimization strategies

Using the above analysis, we will break down all the optimizations into the following three categories:

  1. Network bandwidth usage reduction

  2. Latency improvement

  3. Horizontal scalability

These three categories are also correlated. For example, network bandwidth reduction certainly improves scalability. This categorization is mostly used to describe the strategies with different focuses in this blog, and it has no relationship to the actual prioritization and execution order.

Network bandwidth usage reduction

Venice read compute
Many AI use cases store embeddings (vectors of floating-point numbers) in their Venice stores, and the applications apply machine learning operators, such as dot-product or cosine-similarity, to the raw response at runtime. The end result is a scalar, which is much smaller than the original embeddings, and these embeddings are the biggest contributors to the large response size. The idea of Venice read compute is to push the computation down to the Venice server layer, so that Venice only needs to return the final computed result, which reduces the response size significantly. With Venice read compute, the response size is reduced by 75% for one of the biggest use cases. Since most of our AI customers are already familiar with declarative languages, Venice read compute uses a declarative API to be user friendly. This also avoids arbitrary computation like a Java UDF (User Defined Function), which would not work well in a multi-tenant platform because Venice would have no control over the kind of business logic that gets executed. Currently, read compute only supports the following operators: projection, dot-product, cosine-similarity, hadamard-product, and count. We will gradually enrich its capabilities as we discover new use cases.

Here is a snippet of what the Venice read compute API looks like:

[Code snippet: Venice read compute API]
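
The original snippet is not reproduced here, so below is a minimal illustrative sketch of what a declarative read compute request could look like. The client type and builder method names are assumptions for illustration and may not match the exact Venice thin client API.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import org.apache.avro.generic.GenericRecord;

// Illustrative sketch only: the builder mirrors the declarative operators described
// above (projection, dot-product, cosine-similarity); names are hypothetical.
public class ReadComputeExample {

  /** Hypothetical declarative compute request builder. */
  public interface ComputeRequestBuilder {
    ComputeRequestBuilder project(String fieldName);
    ComputeRequestBuilder dotProduct(String embeddingField, List<Float> queryVector, String resultField);
    ComputeRequestBuilder cosineSimilarity(String embeddingField, List<Float> queryVector, String resultField);
    CompletableFuture<Map<String, GenericRecord>> execute(Set<String> keys);
  }

  /** Hypothetical client handle exposing the compute entry point. */
  public interface VeniceReadComputeClient {
    ComputeRequestBuilder compute();
  }

  public static CompletableFuture<Map<String, GenericRecord>> fetchScores(
      VeniceReadComputeClient client, Set<String> memberIds, List<Float> queryEmbedding) {
    return client
        .compute()                                                         // build a declarative request
        .project("jobTitle")                                               // return this raw field as-is
        .dotProduct("memberEmbedding", queryEmbedding, "score")            // computed on the Venice server
        .cosineSimilarity("memberEmbedding", queryEmbedding, "similarity") // computed on the Venice server
        .execute(memberIds);                                               // only scalars come back, not embeddings
  }
}
```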

Latency improvement

Switching to RocksDB
In 2018, we began experimenting with RocksDB in the Venice server, which had been using BDB-JE from the beginning. After lengthy validation and stabilization, we decided to switch to RocksDB in these large fanout clusters first. Garbage Collection (GC) pause time was reduced significantly, thanks to RocksDB's C++ implementation, and p99 latency improved by more than 50% due to its modern LSM-tree implementation.

After analyzing many use cases, we realized that we could leverage RocksDB's read-only mode for the subset of use cases whose data becomes read-only after the initial bootstrapping, which doubled throughput with reduced latency. For use cases with a high key count per request, it is not realistic to serve the data from disk given the tight SLA requirement, so the large fanout cluster was set up to keep as much data as possible in memory in the RocksDB block cache. We observed two challenges while leveraging the RocksDB block cache:

  1. There is a non-negligible extra memory overhead (~20%) to store all of the data in the RocksDB block cache, which increases hardware cost as the store size keeps growing.

  2. Each Venice store is a versioned database, and every bootstrapping produces a new version. When switching to the newly ingested version, the cache is cold, so explicit cache warming before the version switch is required to avoid latency spikes.

The RocksDB PlainTable format is an ideal storage format for this in-memory serving workload: it maintains a very compact format in memory and internally leverages mmap to keep the data in the OS page cache. With this format, memory usage is comparable to the actual data size on disk, and no explicit cache warm-up is required since all the data is cached in the OS page cache during bootstrapping. The RocksDB PlainTable format solves both pain points and improves latency by ~7x compared to the RocksDB block-based format.
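
As a rough illustration, this is roughly how PlainTable could be enabled through the RocksJava API. This is a minimal sketch, not Venice's actual server configuration; the option values (such as the prefix length) are placeholders.

```java
import org.rocksdb.Options;
import org.rocksdb.PlainTableConfig;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;

public class PlainTableExample {
  public static RocksDB openPlainTableStore(String dbPath) throws RocksDBException {
    RocksDB.loadLibrary();
    Options options = new Options()
        .setCreateIfMissing(true)
        // PlainTable relies on mmap-ed reads to serve data out of the OS page cache.
        .setAllowMmapReads(true)
        // PlainTable requires a prefix extractor for its hash-based index;
        // the prefix length here is a placeholder value.
        .useFixedLengthPrefixExtractor(8)
        // Switch from the default block-based table format to PlainTable.
        .setTableFormatConfig(new PlainTableConfig());
    return RocksDB.open(options, dbPath);
  }
}
```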

Routing optimization

Least-load replica selection
Typically, Venice maintains three replicas for each partition. In read-heavy clusters, Venice allocates more than three replicas, and because Venice is an eventually consistent platform, any ready-to-serve replica can serve incoming read traffic. Round-robin or random replica selection works well if all the replicas are healthy, but in practice, the status of each replica can change quickly because of spiky ingestion or non-deterministic GC behavior, and treating every replica equally all the time hurts long-tail latency. Instead, the Venice router adopts a queue-based mechanism to measure the healthiness of each replica: a queue stores the pending requests to each route, and the router chooses the least-loaded replica, based on pending request count, to serve the incoming request. With this strategy, a slow replica is prevented from receiving excessive load, which speeds up its recovery, and when the slow/bad replica recovers, traffic resumes instantly. Together with the "smart long-tail retry" feature described in the following section, when the Venice router hits a slow/bad replica, the retry will short-circuit the long-tail sub-request and help minimize unnecessary retries, which could otherwise hurt the capacity of the whole cluster since retries are quite expensive.
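
A minimal sketch of the least-loaded selection idea, assuming a per-replica counter of in-flight requests. This is illustrative pseudologic, not the actual router implementation.

```java
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative sketch: pick the replica with the fewest pending requests.
public class LeastLoadedSelector {
  // Pending (in-flight) request count per replica, updated as requests start/finish.
  private final ConcurrentHashMap<String, AtomicInteger> pendingByReplica = new ConcurrentHashMap<>();

  public String select(List<String> readyToServeReplicas) {
    // Choose the replica whose pending-request queue is currently the shortest.
    return readyToServeReplicas.stream()
        .min(Comparator.comparingInt(
            r -> pendingByReplica.computeIfAbsent(r, k -> new AtomicInteger()).get()))
        .orElseThrow(() -> new IllegalStateException("No ready-to-serve replica"));
  }

  public void onRequestSent(String replica) {
    pendingByReplica.computeIfAbsent(replica, k -> new AtomicInteger()).incrementAndGet();
  }

  public void onResponseReceived(String replica) {
    pendingByReplica.computeIfAbsent(replica, k -> new AtomicInteger()).decrementAndGet();
  }
}
```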

Smart long-tail retry
In the early stages of Venice, the Venice router introduced a latency-based retry mechanism called long-tail retry: a retry request is triggered when the original request takes more time than a predefined retry threshold. We noticed some shortcomings with this simple mechanism. For example, when a Venice router instance is experiencing heavy GC, many retry requests are triggered but their processing is delayed by the GC pause, making those retries useless and inefficient since they consume a lot of CPU/network resources. With the least-loaded replica selection mechanism described above, the Venice router already tries to avoid sending requests to overwhelmed replicas, which reduces the chance of triggering a retry. Smart long-tail retry reduces the retry load even further when the router itself is not in a healthy state, for example, when it is overwhelmed by too many requests. The new strategy tries to detect whether the retry was triggered by slowness of the replica in order to decide whether the retry request should proceed. For example, if the timestamp at which a retry request is actually sent is much later than the timestamp at which it should have been triggered (original request timestamp + retry threshold), the retry should be aborted, because the delay was mostly caused by slowness inside the router rather than the replica the original request was talking to. While this strategy is not ideal, since it cannot abort every useless retry, in our observation it skips a lot of unnecessary retries when the cluster is not in a steady state and helps speed up cluster recovery.
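
A minimal sketch of the abort condition described above. This is illustrative only; the threshold and slack values are placeholders, not the router's actual configuration.

```java
// Illustrative sketch: abort a retry whose scheduling was delayed by the router itself.
public class SmartLongTailRetry {
  private final long retryThresholdMs;  // latency threshold that triggers a long-tail retry
  private final long schedulingSlackMs; // tolerated delay between "should retry" and "can retry"

  public SmartLongTailRetry(long retryThresholdMs, long schedulingSlackMs) {
    this.retryThresholdMs = retryThresholdMs;
    this.schedulingSlackMs = schedulingSlackMs;
  }

  /**
   * @param originalRequestStartMs when the original request was sent to the replica
   * @param nowMs                  when the retry is about to be sent
   * @return true if the retry should proceed, false if it should be aborted
   */
  public boolean shouldRetry(long originalRequestStartMs, long nowMs) {
    long expectedRetryTimeMs = originalRequestStartMs + retryThresholdMs;
    // If the retry fires much later than expected, the delay most likely came from
    // the router itself (e.g., a GC pause), so retrying would only add useless load.
    return nowMs - expectedRetryTimeMs <= schedulingSlackMs;
  }
}
```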

Connection warming
The Venice router still talks the HTTP/1.1 protocol to the Venice server, and we are in the middle of validating HTTP/2. With HTTP/1.1, each Venice router instance needs to spin up many HTTP connections to each Venice server instance to serve a high request throughput. When there is a traffic shift or a router instance restarts, there would be a large latency spike, which could last for a couple of minutes, since HTTPS connection setup is time-consuming.

The Venice router has adopted the following connection warming strategies to mitigate these issues:

  1. When a router instance restarts, it will try to warm up a configured number of connections to each Venice server instance before serving any incoming traffic.

  2. When a router instance detects a newly joined Venice server instance, it will try to warm up connections to the new instance before putting it into the rotation.

  3. When the connection pool for a given route degrades over time, the router will asynchronously warm up connections to meet the minimum requirements of the connection pool.

Once the Venice router completely adopts HTTP/2, the connection warming strategy will become much simpler, since only a few HTTP/2 connections are required between each Venice router/server pair thanks to HTTP/2 multiplexing.
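
A minimal sketch of the third strategy listed above, asynchronously topping up a route's pool to a minimum size. The connection pool abstraction here is a hypothetical placeholder, not the router's actual code.

```java
import java.util.Map;
import java.util.concurrent.Executor;

// Illustrative sketch: keep every route's connection pool above a minimum size.
public class ConnectionWarmer {
  private final int minConnectionsPerRoute; // configured minimum pool size per Venice server
  private final Executor warmingExecutor;   // warming happens off the request path

  public ConnectionWarmer(int minConnectionsPerRoute, Executor warmingExecutor) {
    this.minConnectionsPerRoute = minConnectionsPerRoute;
    this.warmingExecutor = warmingExecutor;
  }

  /** Hypothetical pool abstraction: current size and the ability to open one more connection. */
  public interface ConnectionPool {
    int currentSize();
    void openNewConnection(); // includes the (expensive) TLS handshake
  }

  /** Periodically invoked to top up degraded pools without blocking request handling. */
  public void warmIfNeeded(Map<String, ConnectionPool> poolsByRoute) {
    poolsByRoute.forEach((route, pool) -> {
      int missing = minConnectionsPerRoute - pool.currentSize();
      for (int i = 0; i < missing; i++) {
        warmingExecutor.execute(pool::openNewConnection); // asynchronous warm-up
      }
    });
  }
}
```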

Venice streaming

Initially, the Venice thin client only offered blocking APIs, so the application would either receive a full response or nothing, which is not ideal for the AI use cases that serve recommendations: as long as the quality of the result is good enough, there is no need to always return the full set of results. In addition, with blocking APIs, the application can only start processing once the full response is received. This wait-and-process pattern has two consequences: high end-to-end latency, since the chained operations are not pipelined, and high GC overhead, since the application needs to buffer the full response for a long period before executing the processing logic. To mitigate these pain points, Venice built streaming support from the Venice router down to the Venice client. The following sections describe the behavior with and without Venice streaming.

Blocking API

  1. Venice client composes a batch-get request and sends it to the Venice router.

  2. Venice router finds out which Venice server instances host the partitions that the keys in the client request belong to.

  3. Venice router will send out a sub-request for each route.

  4. Venice router will wait for all the sub-responses from the requested Venice server instances.

  5. If any sub-response results in an error, the Venice router will return an error response to the Venice client.

  6. If all the sub-responses are good, the Venice router will consolidate them into a full response and return it to the Venice client in one go.

Streaming API

  1. Venice client composes a batch-get request and sends it to the Venice router.

  2. Venice router finds out which Venice server instances host the partitions that the keys in the client request belong to.

  3. Venice router will send out a sub-request for each route.

  4. Venice router will forward each response from a Venice server to the Venice client immediately.

  5. After collecting all the sub-responses, Venice router will send back a footer to indicate that the response is incomplete if any sub-response resulted in an error.

  6. The Venice client can start processing right away, as soon as it receives any records from the Venice router.

Steps 4 through 6 are the main differences. As you can tell, with the streaming API it is very natural for the application to receive a partial response in case there is an error from the backend or a timeout happens. After adopting the streaming API in the large fanout use cases, we observed a 15% end-to-end latency improvement at p99.
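
As a rough illustration of the consumption model (a sketch only; the callback and client interfaces shown here are hypothetical and may not match the actual Venice thin client API), a streaming batch-get lets the application process records as they arrive:

```java
import java.util.Optional;
import java.util.Set;

// Illustrative sketch of a streaming batch-get callback: records are processed as they
// arrive, and the completion signal tells the application whether the result is partial.
public class StreamingExample {

  /** Hypothetical streaming callback, mirroring the behavior described above. */
  public interface StreamingCallback<K, V> {
    void onRecordReceived(K key, V value);        // invoked per record, as soon as it arrives
    void onCompletion(Optional<Exception> error); // present error => partial response
  }

  /** Hypothetical client handle exposing a streaming batch-get. */
  public interface StreamingClient<K, V> {
    void streamingBatchGet(Set<K> keys, StreamingCallback<K, V> callback);
  }

  public static void fetch(StreamingClient<String, Float> client, Set<String> keys) {
    client.streamingBatchGet(keys, new StreamingCallback<String, Float>() {
      @Override
      public void onRecordReceived(String key, Float score) {
        // Start downstream processing immediately instead of buffering the full response.
        System.out.println(key + " -> " + score);
      }

      @Override
      public void onCompletion(Optional<Exception> error) {
        // A present error means some sub-responses failed: the result is partial but usable.
        error.ifPresent(e -> System.err.println("Partial response: " + e.getMessage()));
      }
    });
  }
}
```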

Fast-Avro
Fast-Avro was originally developed by RTBHouse, but over time, they stopped maintaining the project. LinkedIn offered to maintain the project and adopted it under the LinkedIn namespace, adding many new optimizations. At a high level, Fast-Avro is an alternative approach to Apache Avro serialization and deserialization: it relies on runtime code generation, which achieves better performance than the native implementation. This library is now widely adopted inside LinkedIn since it can support various Avro versions at runtime.

The Venice platform has fully adopted Fast-Avro, and in one major use case we observed about a 90% deserialization improvement at p99 on the application end, which makes it a must-have technology in the whole Venice ecosystem.
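
As a rough sketch of the drop-in idea (assuming the com.linkedin.avro.fastserde package from LinkedIn's avro-util project; the exact class name and constructor may differ across versions), swapping the reader is typically all that is needed:

```java
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DecoderFactory;

// Assumption: Fast-Avro's generic reader lives in com.linkedin.avro.fastserde;
// verify the class name against the library version in use.
import com.linkedin.avro.fastserde.FastGenericDatumReader;

public class FastAvroExample {
  public static GenericRecord deserialize(Schema writerSchema, Schema readerSchema, byte[] bytes)
      throws java.io.IOException {
    // Drop-in swap: the fast reader generates (and caches) specialized deserialization
    // code at runtime instead of walking the schema generically.
    // Vanilla Avro equivalent: new org.apache.avro.generic.GenericDatumReader<>(writerSchema, readerSchema)
    DatumReader<GenericRecord> reader = new FastGenericDatumReader<>(writerSchema, readerSchema);
    return reader.read(null, DecoderFactory.get().binaryDecoder(bytes, null));
  }
}
```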

Horizontal scalability

Helix group based routing
Venice has leveraged Helix as its cluster management framework from day one, and Helix has done a great job in the following areas: 1) making sure each Venice server instance is assigned a similar number of partition replicas, to achieve even distribution from the storage space perspective; and 2) making sure the replicas belonging to the same partition don't fall into the same zone, to achieve high availability.

Let us walk through an example to describe the problem we encountered when scaling the large fanout cluster horizontally. Assume there are six partitions with a replication factor of three, and there are six hosts spread across six zones. The partition assignment is as follows:

Hostname | Zone ID | Hosted partitions
H1 | R1 | P1, P3, P5
H2 | R2 | P1, P3, P5
H3 | R3 | P1, P4, P6
H4 | R4 | P2, P3, P6
H5 | R5 | P2, P4, P5
H6 | R6 | P2, P4, P6

For any request which touches every partition, there could be many different choices of hosts to satisfy all sub-requests:

H1, H3, H4

H2, H3, H4

H1, H6

H2, H6

… …

Any of these combinations can serve the incoming request; however, because the fanout size varies, the Venice router cannot consistently hit a group containing the minimal number of hosts, which significantly reduces the capacity of the whole cluster. In practice, when the total number of partitions is much higher than the number of hosts, a high key count request will touch almost every storage node, and the fanout size becomes proportional to the total number of Venice server instances.

To scale a read-heavy cluster, it is natural to increase the replication factor to serve more traffic, but without any special arrangement, the fanout size will increase proportionally during cluster expansion, leading to two issues:

  1. Every client request scatters out many smaller sub-requests, and each sub-request has a fixed amount of overhead when served by the Venice server.

  2. The chance of hitting long-tail latency increases rapidly as the fanout size increases.
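
To make the second point concrete, here is a rough back-of-envelope example, assuming sub-request latencies are independent (a simplification): if each replica responds within its own p99 latency 99% of the time, a request that fans out to 50 replicas stays entirely under that bound only 0.99^50 ≈ 61% of the time, so roughly 4 out of 10 requests hit at least one long-tail sub-request.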

When we experimented with this partition assignment strategy by doubling the replication factor (doubling the total number of Venice server instances), the read throughput capacity of the whole cluster didn't change at all, indicating that this partition assignment strategy was not horizontally scalable.

To make sure the fanout size stays constant when scaling out the cluster, Venice has adopted a new strategy based on the assumption that the total number of logical zones will be exactly the same as the replication factor. Note that a zone here is a concept created by Venice's use of Helix for partition shard management, and it is distinct from a physical rack. If we apply this strategy to our example, the zone count becomes three instead of six. The following table describes the partition assignment:

Hostname | Zone ID | Hosted partitions
H1 | R1 | P1, P3, P5
H2 | R1 | P2, P4, P6
H3 | R2 | P1, P4, P6
H4 | R2 | P2, P3, P5
H5 | R3 | P2, P4, P5
H6 | R3 | P1, P3, P6

Because the replication factor is exactly the same as the zone count, each zone contains a full copy of every partition. This means any high key count request can be served entirely by the host group belonging to a single zone, making the fanout size constant regardless of the replication factor. With this partition assignment strategy, Venice is able to scale the large fanout cluster horizontally, and our experiments confirmed this.
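
A minimal sketch of the group-based routing idea (illustrative only, not the actual Helix/Venice implementation): pick one zone per request and resolve every partition to the replica hosted inside that zone, so the fanout is bounded by the number of hosts in a single zone.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch: with one full replica set per zone, a whole request can be
// served inside a single zone, so fanout no longer grows with the replication factor.
public class GroupBasedRouting {
  /** partition -> (zone -> host hosting that partition in that zone) */
  private final Map<Integer, Map<String, String>> hostByPartitionAndZone;
  private final List<String> zones;

  public GroupBasedRouting(Map<Integer, Map<String, String>> hostByPartitionAndZone, List<String> zones) {
    this.hostByPartitionAndZone = hostByPartitionAndZone;
    this.zones = zones;
  }

  /** Groups the requested partitions by host, all within one chosen zone. */
  public Map<String, List<Integer>> planFanout(List<Integer> requestedPartitions, long requestId) {
    // Spread load across zones (e.g., round-robin by request id); each zone is self-sufficient.
    String zone = zones.get((int) (requestId % zones.size()));
    Map<String, List<Integer>> partitionsByHost = new HashMap<>();
    for (int partition : requestedPartitions) {
      String host = hostByPartitionAndZone.get(partition).get(zone);
      partitionsByHost.computeIfAbsent(host, h -> new ArrayList<>()).add(partition);
    }
    return partitionsByHost; // fanout size <= number of hosts in one zone
  }
}
```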

State of today

Over the past several years, we have learned and applied the described optimization strategies to the large fanout clusters. Today, these clusters can serve large batch requests within a very tight latency SLA and are able to scale horizontally with organic traffic growth.

Ongoing optimizations and future plans

HTTP/2 evaluation
We are in the middle of verifying HTTP/2 support between the Venice router and Venice server, and the initial results have shown a great improvement in GC behavior and better latency. In addition, the connection warming strategy in the Venice router can be simplified greatly, since the total number of HTTP/2 connections required is much smaller than with HTTP/1.1. In the future, we hope to explore more advanced features available in HTTP/2, such as header compression and server push, to improve performance even further.

Fast Client
At a high level, classic Venice uses a three-layer architecture: Venice thin client ↔ Venice router ↔ Venice server. The Venice fast client removes one hop, the Venice router, from this architecture by consolidating the routing layer into the client layer, so the architecture becomes two layers: Venice fast client ↔ Venice server. We expect to achieve better latency and higher scalability with this approach.

Acknowledgements

Brainstorming and verifying these performance tuning strategies has been a team effort. Here I would like to thank my colleagues Felix GV, Min Huang, Sidian Wu, Sourav Maji, Xun Yin, and Zac Policzer for their invaluable contributions and feedback. Special thanks to Peter Chng, Edwina Lu, Sumit Rangwala, and Amol Ghoting from the PYMK team for their input and suggestions. Thanks to the leadership team for supporting our efforts: Salil Gokhale and Ashish Singhai.

Finally, thanks to our incredible SREs who keep the site up and running: Abhishek Bhargava, Ali Poursamadi, Amit Balode, Andrew Goktepe, Arjun Shenoy, Karthik Appigatla, Kian Chung, Vinoth Govindaraj, and Zonia Harris.