Building Venice with Apache Helix

February 15, 2017

Editor's note: This blog has been updated.


Like many internet companies, LinkedIn has faced data growth challenges. Naturally, distributed storage systems became the solution to handle larger volumes of data and queries per second (QPS). But, aside from scaling issues, the variability in access patterns also grew quickly. For example, some scenarios require no more than simple put/get operations but insist on having low latency and high availability. In contrast, other scenarios require stronger consistency guarantees, more complex query patterns, and secondary indexes but are willing to suffer brief periods of unavailability in order to maintain these guarantees. There is no one-size-fits-all system to satisfy all needs perfectly, so LinkedIn has built specialized distributed storage systems that each offer different trade-offs in order to cover various scenarios.

Now, the question that comes up is: do we build all of these systems from scratch? The answer is a resounding no! Even though scenarios vary, there are common elements that are essential to any distributed system: cluster management, storage engines, security, etc. Thus, we build generic libraries and leverage them to accelerate our development work. For example, Helix is one of those generic tools that helps to manage large clusters at LinkedIn. This post presents how Helix’s cluster management capabilities have been leveraged in Venice, a distributed derived data serving platform.

Cluster management in a distributed system

All distributed storage systems are clusters composed of many machines. Managing such a cluster raises a few typical issues.

Data partitioning: If the dataset does not fit on a single machine, how do we distribute partitions among machines as evenly as possible? More interestingly, after machines join or leave the cluster, partitions need to be re-balanced across the remaining machines. All of those mappings need to be persisted somewhere in order to recover from cluster failures.

Fault tolerance: As we know, the possibility that one machine in the cluster might fail increases as we add more machines to the cluster. In a large cluster, we should therefore expect machine failures to happen frequently. In order to avoid data loss, we need sufficient redundancy, a mechanism to detect failure, and an automatic failover process. Obviously, the cluster manager must also be careful not to place multiple replicas of the same dataset within the same failure domain (machine, rack, etc.).

Replica state management: As a stateful service, the storage system typically cannot start serving requests immediately after joining the cluster, because it first needs to bootstrap the latest version of its state. On the flip side, before retiring a server, we need to make sure there is enough redundancy in the remaining replicas. Meanwhile, the data replica hosted on the server may have complex states during its lifecycle. For example, in a traditional primary/secondary model, one machine hosts the primary replica and receives all writes (and possibly all reads too, if strong consistency is required), so a replica assigned to a new machine must start from the bootstrap state to get all data from the primary; then its state changes to secondary and it may serve read requests (if stale reads are allowed). Furthermore, a secondary replica might be elected to become the new primary of a partition if the current primary crashes. Aside from making sure the server applies the replica’s state change, it’s also necessary to notify other servers and clients about that change, which can happen using a broadcast protocol or by updating a centralized metadata database.
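To make the fault tolerance point above concrete, here is a rough Python sketch of how the chance of seeing at least one failure grows with cluster size. It assumes that machines fail independently and uses a made-up per-machine failure probability of 0.1% per day; neither assumption comes from LinkedIn data.

```python
def prob_any_failure(machines: int, p_single: float) -> float:
    """Chance that at least one of `machines` independent machines fails."""
    # All machines survive with probability (1 - p_single) ** machines.
    return 1.0 - (1.0 - p_single) ** machines


if __name__ == "__main__":
    # 0.001 is a hypothetical daily failure probability for a single machine.
    for n in (1, 10, 100, 1000):
        print(f"{n:>5} machines: {prob_any_failure(n, 0.001):.1%}")
```

Under these assumed numbers, a 1,000-machine cluster would see at least one failure on roughly 63% of days, which is why failure detection and failover have to be automatic.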

Introducing Helix

Helix is a generic framework that we use to manage clusters at LinkedIn. It was initially developed for Espresso, a distributed document-based NoSQL database. Helix is widely used at LinkedIn, not only for distributed storage systems but also for streaming processes, search infrastructure, and analytics platforms.

The core concept of Helix is a state model, which is a finite state machine (FSM). In this FSM, you describe the possible states of your resource and the transitions between them. Helix then triggers the proper state transitions in order to reach the ideal state, or to roll back to the initial state when retiring a server.

Figure: Primary/secondary state machine

In addition to the state model for participants of a cluster, Helix defines three roles:

  • Controller: The controller is the brain of the system, responsible for monitoring changes in the cluster, deciding the mapping between data partitions and machines, and sending state transitions to servers. All metadata about the cluster is persisted in ZooKeeper, and ZooKeeper watchers are used to get notifications when servers join or leave the cluster and when replicas’ states change. There are redundant controllers running in leader/standby mode to avoid having a single point of failure for this critical component.
  • Participant: The participant is the real worker, storing data and serving requests. It responds to the controller’s requests to adopt new partitions, drop old ones, and change their state.
  • Spectator: The spectator is an observer that monitors the mapping between the partitions and servers. For example, it could be a client or a router that uses the mapping to find the proper participant to communicate with in order to satisfy read/write requests.

These three roles work together with the state model to complete the essential features of cluster management: data partitioning, failover, and replica state management.

Introducing Venice

Venice, a successor to Voldemort Read-Only, is LinkedIn’s next-generation derived data serving platform. A common pattern at LinkedIn is to compute a dataset on Hadoop that is derived from primary data sources (profile, connection, etc.) as well as event data (page views, etc.), then push that derived dataset regularly to Voldemort Read-Only as an immutable batch to serve online read requests. For example, in order to recommend People You May Know (PYMK), we regularly use offline jobs to rank second-degree connections for each member based on our relevance models, and the result is then pushed to Voldemort/Venice. Finally, when you open your LinkedIn connections page, the web servers read those results from Voldemort/Venice.

Real-time use cases are now becoming more prevalent, which demands a system that can serve data not only from a batch source (Hadoop), but also from a streaming source (such as Samza). For example, in the PYMK example above, a technique called “impression discounting” can be used as a near real-time signal to improve the relevance of the recommendations. The way this works is that whenever a user views some of their PYMK recommendations, the connections that are seen, but not acted upon, have their ranking lowered, making it less likely that they’ll be at the top of the list the next time the user looks. Venice aims to serve both batch and streaming data in a single unified system.
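As a purely illustrative sketch of impression discounting, the Python snippet below lowers a recommendation's score each time it has been seen without being acted upon. The exponential decay, the `decay` value of 0.8, and the function names are assumptions made for this example; they are not the relevance model used for PYMK.

```python
from typing import Dict, List


def discounted_score(base_score: float, impressions: int, decay: float = 0.8) -> float:
    """Shrink the offline score once per unanswered impression (hypothetical formula)."""
    return base_score * decay ** impressions


def rerank(base_scores: Dict[str, float], impressions: Dict[str, int]) -> List[str]:
    """Order candidates by their discounted score, best first."""
    return sorted(
        base_scores,
        key=lambda member: discounted_score(base_scores[member], impressions.get(member, 0)),
        reverse=True,
    )


if __name__ == "__main__":
    batch_scores = {"alice": 0.9, "bob": 0.8}  # computed offline (batch source)
    seen_not_acted = {"alice": 2}              # near real-time signal (streaming source)
    print(rerank(batch_scores, seen_not_acted))
```

The batch scores play the role of the Hadoop-derived dataset and the impression counts play the role of the streaming signal, which is the combination that a unified system has to serve.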

The diagram below describes:

  1. Venice’s components;
  2. External systems that Venice depends on;
  3. Data flow and metadata requests.

Note: Every box in the diagram is distributed across many nodes. For simplicity, they are shown as single boxes, but that does not imply that there is any single point of failure in this architecture.

Figure: Venice architecture



  • Venice controller: Controls partition assignment, store creation, and other coordination tasks across the whole cluster. The controller itself is stateless; it persists all of the metadata in ZooKeeper.

  • Venice storage node: A stateful process which holds the data and index for serving low latency online reads. It does not support online writes; instead, it ingests data asynchronously from Kafka.

  • Venice router: A stateless process which exposes REST endpoints for reading data and metadata.

Data and metadata flow
You might have noticed that the data flow looks similar to the Lambda architecture. Venice supports Lambda architecture use cases, but it also supports batch-only use cases (just like Voldemort Read-Only) and streaming-only use cases (as described in the Kappa architecture).

  • Write path: The input dataset already exists on Hadoop as files or on Samza as a stream, and it is pushed into Kafka. Venice storage nodes consume data from Kafka, and then persist it to their local storage engine.

  • Read path: Clients connect to a router, which forwards the requests to the right storage nodes and returns the result. The router knows which storage node to contact because it is a spectator to the partition mapping maintained by the controller.

  • Metadata path: Before ingesting real data into Venice, the controller needs to create some resources, including storage engine instances, a Helix resource and a Kafka topic. Then, during the ingestion, the controller collects the status of each replica and sends commands to storage nodes using Helix’s message delivery system.
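The read path can be sketched in a few lines of Python. This is a simplified illustration, not Venice's router: the routing table layout, the CRC32-based key hashing, and the node names are all assumptions, and the replica states follow the names used in the state model section later in this post.

```python
import zlib
from typing import Dict, List

# Hypothetical routing table a spectator might hold:
# partition id (0..n-1) -> {storage node name: replica state}.
RoutingTable = Dict[int, Dict[str, str]]


def partition_for(key: bytes, partition_count: int) -> int:
    """Map a key to a partition with a stable hash (CRC32 is an assumption)."""
    return zlib.crc32(key) % partition_count


def nodes_for(key: bytes, table: RoutingTable) -> List[str]:
    """Return the storage nodes that can currently serve reads for `key`."""
    replicas = table[partition_for(key, len(table))]
    # Only replicas that finished ingesting are eligible for reads.
    return sorted(node for node, state in replicas.items() if state == "ONLINE")


if __name__ == "__main__":
    table: RoutingTable = {
        0: {"node1": "ONLINE", "node2": "BOOTSTRAP"},
        1: {"node2": "ONLINE", "node3": "ONLINE"},
    }
    print(nodes_for(b"member:42", table))
```

Because the router only reads this mapping and never owns it, any router instance can answer any request, which is what keeps the router tier stateless.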

Using Helix to manage a Venice cluster

The above description of Venice does not refer to the distributed data environment at all. You could imagine that if the dataset were small enough to fit on one machine, the data flow above would work very well. At LinkedIn’s scale, however, more than 100 TB of data need to be pushed every day, and several hundred thousand QPS need to be served. This cannot be handled by a single machine.

Now, let’s come back to the issues we mentioned at the start of this article: data partitioning, fault tolerance, and replica state management. As a distributed storage system, Venice must address all of these issues. Fortunately, we have Helix: a generic framework that keeps us from needing to build everything from scratch. We describe the Venice batch use case below.

Data partitioning
After getting the request to start ingesting data, the Venice controller calculates a proper partition count according to the size of the dataset, then creates a Helix resource for that dataset. There is a 1:1 mapping between a Venice store-version and its corresponding Kafka topic and Helix resource. Within a store-version, there is also a 1:1 mapping between a Venice storage node partition, a Kafka topic partition, and a Helix resource partition. The replication factor in Kafka is independent from the replication factor in the Venice storage nodes, however.
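The post does not spell out how the partition count is derived, so the following Python sketch only illustrates the idea under stated assumptions: a hypothetical target of 1 GiB per partition, and a hypothetical `store_vN` naming scheme shared by the store-version, the Kafka topic, and the Helix resource to reflect the 1:1 mapping.

```python
from typing import Dict


def partition_count(dataset_bytes: int, target_partition_bytes: int = 1 << 30) -> int:
    """Pick enough partitions that each holds at most the target size."""
    if dataset_bytes < 0 or target_partition_bytes <= 0:
        raise ValueError("dataset size must be >= 0 and target size must be > 0")
    # Integer ceiling division; always keep at least one partition.
    return max(1, -(-dataset_bytes // target_partition_bytes))


def resource_names(store: str, version: int) -> Dict[str, str]:
    """Use one name for all three entities so the mapping stays 1:1 (assumed scheme)."""
    name = f"{store}_v{version}"
    return {"store_version": name, "kafka_topic": name, "helix_resource": name}


if __name__ == "__main__":
    print(partition_count(5 * (1 << 30)))
    print(resource_names("pymk", 3))
```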

The Helix controller (which is a process embedded in the Venice controller) notices the newly created resource and finds a set of participant machines to hold this resource. During this process, the Helix controller ensures:

  1. Replicas of the same partition are not assigned to the same machine, which preserves redundancy;

  2. Replicas are distributed among the whole cluster as evenly as possible. Right now, Helix supports different allocation algorithms, including raw hashing, CRUSH, and consistent hashing.

For example, consider a scenario with four machines, a dataset with four partitions, and a replication factor of 3. The mapping between partitions and nodes is shown in the diagram below.

Figure: Venice data partition management
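The example scenario can be reproduced with a simple round-robin placement, sketched below in Python. This is not one of the Helix allocation algorithms mentioned above and the resulting mapping will not necessarily match the diagram; it only shows that both guarantees (no two replicas of a partition on one machine, and an even spread) can hold at once. The node names are hypothetical.

```python
from collections import Counter
from typing import Dict, List


def assign(partitions: int, machines: List[str], replication_factor: int) -> Dict[int, List[str]]:
    """Place replica r of partition p on machine (p + r) mod n."""
    n = len(machines)
    if replication_factor > n:
        raise ValueError("replication factor cannot exceed the number of machines")
    return {
        p: [machines[(p + r) % n] for r in range(replication_factor)]
        for p in range(partitions)
    }


if __name__ == "__main__":
    mapping = assign(4, ["node1", "node2", "node3", "node4"], 3)
    for partition, nodes in mapping.items():
        print(partition, nodes)
    # Count how many replicas each machine hosts.
    print(Counter(node for nodes in mapping.values() for node in nodes))
```

With four machines, four partitions, and a replication factor of 3, every machine ends up hosting three replicas, each from a different partition.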


State model
Venice’s state model consists of five states: OFFLINE, BOOTSTRAP, ONLINE, ERROR, and DROPPED.

  • OFFLINE: This is the initial state. The storage node has not started to read this partition from Kafka yet or the node is disconnected from ZooKeeper.

  • BOOTSTRAP: The storage node is consuming data from Kafka, but it’s not ready to serve read requests yet.

  • ONLINE: Once a replica becomes ONLINE, the storage node is able to serve online read requests for this partition.

  • DROPPED: In this state, data has been moved off of this server and all of the related resources have been cleaned up.

  • ERROR: If a storage node encounters any error while consuming data from Kafka, it will transition this replica to the ERROR state.

The state machine’s transitions are listed below:

Figure: Venice state machine

No. | From | To | Trigger | Action
1 | OFFLINE | BOOTSTRAP | The node is selected to be a participant. | Initialize Kafka consumer and local storage engine.
2 | BOOTSTRAP | ONLINE | As soon as a replica enters the BOOTSTRAP state. | Consume all data from Kafka, validating it and applying it into the local storage engine. This is a long-running transition.
3 | BOOTSTRAP | OFFLINE | Resource is deleted during the ingestion. | Close Kafka consumer and local storage engine.
4 | ONLINE | OFFLINE | Resource is deleted after ingestion, or partition is re-balanced to other nodes. | Close local storage engine.
5 | BOOTSTRAP | ERROR | Consumption failure. | Close Kafka consumer and reset offset.
6 | OFFLINE | ERROR | Initialization of the consumer or storage engine fails. | Close consumer and storage engine.
7 | OFFLINE, ERROR | DROPPED | Resources are deleted, or partition is re-balanced to other nodes. | Clean up all related resources, including local storage engine, Kafka offset, etc.
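The transition list above can be encoded as a small lookup table. The Python sketch below is only a model of the table for illustration; it does not use the Helix state model API, and the `Replica` class and `ALLOWED` name are made up for this example.

```python
from typing import Dict, Set

# Legal transitions, taken row by row from the table above.
ALLOWED: Dict[str, Set[str]] = {
    "OFFLINE": {"BOOTSTRAP", "ERROR", "DROPPED"},   # rows 1, 6, 7
    "BOOTSTRAP": {"ONLINE", "OFFLINE", "ERROR"},    # rows 2, 3, 5
    "ONLINE": {"OFFLINE"},                          # row 4
    "ERROR": {"DROPPED"},                           # row 7
    "DROPPED": set(),                               # terminal state
}


class Replica:
    """Tracks the state of one partition replica on one storage node."""

    def __init__(self) -> None:
        self.state = "OFFLINE"  # the initial state

    def transition(self, target: str) -> None:
        """Move to `target`, rejecting transitions the table does not list."""
        if target not in ALLOWED.get(self.state, set()):
            raise ValueError(f"illegal transition {self.state} -> {target}")
        self.state = target


if __name__ == "__main__":
    replica = Replica()
    replica.transition("BOOTSTRAP")  # start consuming from Kafka
    replica.transition("ONLINE")     # ingestion finished, ready for reads
    print(replica.state)
```

Note that, according to the table, an ONLINE replica cannot be dropped directly: it first goes OFFLINE (row 4) and only then to DROPPED (row 7).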

Fault tolerance
Venice is built from the ground up to be resilient to failures, thanks to replication. Moreover, Venice leverages Helix to achieve autonomous self-healing of every part of the system.

Controller failover: The Venice controller fails over in the same way as the Helix controller. A set of redundant Venice controllers consists of one leader and a few standby instances. If the leader fails, a new leader is elected and it rebuilds all local context from ZooKeeper before taking over the cluster.

Storage node failover: The Helix controller detects the storage node’s failure and re-assigns the partitions on this node to other nodes in the cluster. In the case of a short-term, transient issue, however, the mean time to repair (MTTR) of the node is usually shorter than the time needed to re-read its large partitions and rebuild them on other nodes. Therefore, we use Helix’s delayed rebalance feature to avoid causing big partition movements during small outages. In the Venice architecture, storage nodes always bootstrap their data from Kafka, which means that partitions can be recovered even if all replicas are lost due to correlated failures of many storage nodes.

Router failover: Routers are stateless and redundantly deployed, so clients will start using another router in case of failure.
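The idea behind the delayed rebalance mentioned under storage node failover can be sketched as follows. This Python snippet illustrates the decision only; it is not Helix's implementation or configuration, and the function name, the 60-second delay, and the timestamps are assumptions.

```python
from typing import Dict, List


def nodes_to_rebalance(offline_since: Dict[str, float], now: float, delay_seconds: float) -> List[str]:
    """Return nodes that have been offline for at least `delay_seconds`.

    Nodes that went offline more recently are left alone, on the bet that
    they will come back before their partitions could be rebuilt elsewhere.
    """
    return sorted(
        node for node, since in offline_since.items()
        if now - since >= delay_seconds
    )


if __name__ == "__main__":
    # node1 went offline 100 seconds ago, node2 only 10 seconds ago.
    offline = {"node1": 0.0, "node2": 90.0}
    print(nodes_to_rebalance(offline, now=100.0, delay_seconds=60.0))
```

The delay is a trade-off: a longer delay avoids unnecessary data movement during brief outages, while a shorter one restores full redundancy sooner after a real failure.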


As data has grown at LinkedIn, diverse data usage needs have given rise to different kinds of distributed storage systems. Building each system from scratch would be inefficient and error-prone, so we rely on Helix, a generic framework that solves the typical issues of managing a cluster: data partitioning, replica state management, and fault tolerance.

Based on the needs of real-time use cases, we built Venice: a derived data serving platform built to merge batch and streaming data sources, providing low latency access to derived datasets. In developing Venice, we leveraged Helix to distribute partitions evenly across the cluster and specify the logic behind various state changes, all without any single points of failure.


Designing, developing, and launching Venice has truly been a team effort, and I want to thank my colleagues Felix GV, Gaojie Liu, Matthew Wise, and Sidian Wu for their invaluable contributions and feedback. Special thanks to Lei Xia and Junkai Xue from the Helix team for their input and support on Venice. Finally, thanks to the leadership in supporting our efforts: Siddharth Singh, Ivo Dimitrov, and Swee Lim.

And finally, huge thanks to our incredible SREs who keep the system running: Akhil Ahuja, Steven Dolan, Warren Turkal, and Greg Banks.