FollowFeed: LinkedIn's Feed Made Faster and Smarter
March 31, 2016
Coauthors: Swapnil Ghike, Shubham Gupta
The feed is the default landing experience for LinkedIn’s members. It helps them discover the most relevant content, read original views from people that matter to them, and take part in unique conversations about things they care about. Dozens of LinkedIn products feature custom feed experiences, all of which are powered by a single infrastructure.
To support and continue to drive engagement, this infrastructure needed to transform. Our existing feed infrastructure, Sensei, suffered from increasing operational costs and could not keep pace with LinkedIn's growth in content and members. We needed an infrastructure that could dynamically sift through the content shared by a member’s network to deliver a list of relevant, ordered, and personalized content while satisfying all requirements of a highly available, low-latency application.
Twenty months ago, we set about building FollowFeed, which today powers all of LinkedIn’s feed experiences. Compared to Sensei, our previous feed infrastructure, FollowFeed has improved query performance (a fivefold reduction in the feed query’s 99th percentile latency) and data retention (a 20 times larger index) with a smaller hardware footprint (the number of servers was reduced by half).
We created FollowFeed to meet two primary requirements:
- Wider distribution: LinkedIn's member base has grown from 200 million to more than 400 million in the last three years. At the same time, LinkedIn's network graph has become denser, which means that content shared by a member gets distributed to a wider audience.
- Improved relevance: Because of increased content inventory, we needed algorithms that produce better recommendations in the feed, thereby requiring computationally complex scoring and ranking that can impact real-time performance.
Sensei, the generic search system built by LinkedIn, which previously powered our feed infrastructure, provided the following features:
- A distributed, elastic, real-time searchable database built on top of Lucene. This system could be horizontally sharded to cater to an increase in traffic.
- Built-in support for complex relevance algorithms to rank content records in feed.
Content records were ingested into Sensei’s searcher nodes via Kafka. Queries to Sensei were specified in BQL, a query language that carried both filtering and relevance logic. Requests were received by a broker service, which fanned them out to all searcher nodes. Each searcher node applied filtering logic to prepare a candidate set for the relevance algorithm. After applying the relevance algorithm, each node returned its top N records. Records from all searcher nodes were aggregated on the broker and sorted by relevance score, and the top N records from this sorted list were returned to the client. Because the relevance algorithm was embedded in the query itself, clients could easily A/B test different relevance algorithms.
Although Sensei could scale horizontally, supported relevance based feeds, and supported easy A/B testing of relevance algorithms, it needed to be replaced for the following reasons:
- Operational concerns: It was essential to keep the full inverted index in memory to meet the latency requirements of feed applications. As the number of content records in our index grew over time, it became necessary to add more hardware to keep the overall system performing well. The incremental addition of hardware made the system harder to operate and also increased the cost of operating it.
- Code complexity: Sensei was composed of smaller components: Bobo, Norbert, and Zoie. These components were implemented using a non-uniform collection of technologies and programming paradigms, which in some cases were also inconsistent with the rest of the LinkedIn stack.
- Shrinking leverage: Galene, LinkedIn’s new search engine, replaced Sensei for a variety of reasons. With no other major use case remaining in the engineering org for Sensei, the cost of maintaining and operating it remained the same, while leverage decreased.
- Senescence: Sensei ended up receiving a series of incremental performance hacks to keep it running in production and to stretch its scalability limits. This decayed the code quality, and further negatively impacted developer ramp time, feature commit time, and testing and operational complexity. Coupled with component ownership fragmented across the engineering organization, it was fairly challenging to keep Sensei ahead of LinkedIn’s growth curve.
FollowFeed: New Feed Infrastructure
FollowFeed generates the feed for a member upon request by aggregating content created by entities in which they are interested. An entity can be another member, a company, an educational institution, etc. To support such complex requests, we need an index that associates content created by an entity with the entity’s unique ID. This per-entity index is referred to as a timeline in this blog post.
A timeline is a chronologically ordered list of records by or about an entity. For example, there are many different records which go in a member’s timeline: member shares an article, member is mentioned in an article, etc. FollowFeed uses this concept of a timeline to compose the feed for the member.
The primary design requirements of the feed infrastructure are:
- Scalability: Architecture should be easily horizontally scalable for both reads and writes.
- Read Performance: As this system will support multiple feed use-cases, it’s very important that this system is efficient in terms of read performance.
- Relevance support: As mentioned earlier, it’s quite important to rank content records via relevance algorithms. Thus, it is essential to make relevance algorithms a first class citizen of this architecture.
We designed FollowFeed to meet the aforementioned requirements, guided by the following design considerations:
Bringing computation closer to data: While building distributed systems, there are two ways in which processing can be done on data:
- Data can be brought closer to the computation logic: Processing is done on data fetched from remote nodes.
- Computation can be brought closer to the data: Computation is performed on the nodes storing the data.
As described earlier, personalization of feed is critical for feed applications. This requires providing a large number of records as input to the relevance algorithms. Fetching such a large number of records from remote data stores will result in an increase in query latency. Hence, we decided to co-locate computation with data to reduce our query serving latencies.
Query on Demand vs. Pre-materialization: We considered two strategies for constructing a member’s feed.
- Pre-materialization or Fan-out-on-write model: Feed for each viewer is pre-computed, materialized and kept ready for retrieval using a simple lookup query. This is made possible by fanning out a content record to pre-materialized feeds of multiple entities.
- Query on demand or Fan-out-on-read model: Feed for a viewer is constructed at read time by retrieving content records indexed against a single column/field or a combination of different columns/fields in the record schema. This may result in fanning out queries to multiple nodes/partitions depending on the scaling model.
We evaluated both strategies and decided that the fan-out-on-read model is more efficient for our use case because of the following reasons:
- In the fan-out-on-write model, multiple copies of each content record are persisted, resulting in an increase in data size. In our experiments, we found that the data size with this model was 62 times greater than with the pull-based (fan-out-on-read) architecture.
- Since first class support for relevance algorithms is a must-have in our feed infrastructure, the pre-materialized feed needs to be persisted in the order determined by the relevance algorithms. This approach makes A/B testing of relevance models significantly harder because rolling out a new relevance model and accommodating the new relevance scores and ranks will result in grandfathering every pre-materialized feed (re-ordering, potentially evicting records with low score and ingesting records with higher score). This grandfathering step can be IO intensive, thus it can be slow and reduce the velocity of overall iteration and A/B testing. Slow grandfathering can also significantly impact the time required to resolve any ranking issues introduced by bad relevance models. Fan-out-on-read style architecture can be significantly easier to operate and improves speed of iteration for relevance A/B tests, since relevance scores can be computed on the fly with each query.
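The storage trade-off between the two models can be illustrated with a toy sketch. The follow graph, the post tuples, and the function names below are hypothetical and exist only for illustration; this is not FollowFeed's implementation.

```python
from collections import defaultdict

# Toy follow graph (hypothetical data): viewer -> entities the viewer follows.
follows = {
    "alice": ["carol", "dave"],
    "bob": ["carol"],
}
# Each post is (author entity, content record).
posts = [("carol", "p1"), ("carol", "p2"), ("dave", "p3")]


def fan_out_on_write(posts, follows):
    """Push model: copy each record into the stored feed of every follower."""
    followers = defaultdict(list)
    for viewer, entities in follows.items():
        for entity in entities:
            followers[entity].append(viewer)
    feeds = defaultdict(list)
    for author, record in posts:
        for viewer in followers[author]:
            feeds[viewer].append(record)
    return feeds


def build_timelines(posts):
    """Pull model: store each record exactly once, under its author."""
    timelines = defaultdict(list)
    for author, record in posts:
        timelines[author].append(record)
    return timelines


def fan_out_on_read(timelines, follows, viewer):
    """Pull model: assemble the feed at query time from followed timelines."""
    return [record for entity in follows[viewer] for record in timelines[entity]]


feeds = fan_out_on_write(posts, follows)
timelines = build_timelines(posts)
copies_on_write = sum(len(feed) for feed in feeds.values())
copies_on_read = sum(len(records) for records in timelines.values())
print(copies_on_write, copies_on_read)
```

Even in this tiny graph the push model stores more copies than the pull model, and the gap widens as the follow graph gets denser.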
In FollowFeed, timelines are logically organized as key-value pairs where the key is a tuple of entity id and content type, and the value is a list of content records in reverse chronological order. The feed index is composed of hundreds of millions of timelines, and the index is partitioned across a cluster of machines.
The index is over-partitioned and each index node hosts multiple partitions. The idea behind over-partitioning is to make cluster rebalancing easier by allowing entire partitions to be moved to the new nodes and avoiding splitting the existing partitions. If existing partitions are split into new partitions, the data ingestion and query routing logic will need to change to account for the new partition range, which will add operational complexity.
To be more specific, for the number of partitions, we chose an integer that is divisible by many smaller integers (we chose 720). This gives a wide range of choices for the number of machines that form the index node cluster (40, 45, 60, 80, 360, etc.), where each node hosts multiple partitions.
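As a quick check of the arithmetic, the sketch below lists the cluster sizes that divide 720 evenly and shows one way whole partitions could be placed on nodes. The round-robin placement is an assumption made for illustration; the post does not describe the actual placement scheme.

```python
NUM_PARTITIONS = 720  # partition count quoted in the post


def valid_cluster_sizes(num_partitions):
    """Node counts that give every node the same number of whole partitions."""
    return [n for n in range(1, num_partitions + 1) if num_partitions % n == 0]


def assign_partitions(num_partitions, num_nodes):
    """Hypothetical round-robin placement of whole partitions onto nodes."""
    if num_partitions % num_nodes != 0:
        raise ValueError("node count must divide the partition count")
    placement = {node: [] for node in range(num_nodes)}
    for partition in range(num_partitions):
        placement[partition % num_nodes].append(partition)
    return placement


sizes = valid_cluster_sizes(NUM_PARTITIONS)
placement = assign_partitions(NUM_PARTITIONS, 45)
print(len(sizes), "evenly dividing cluster sizes")
print(len(placement[0]), "partitions per node on a 45-node cluster")
```

Growing the cluster from 45 to 60 nodes, for example, only requires moving whole partitions; no partition has to be split.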
Deep Dive into the Index Node
Embedded Database for Persistence
For persistent storage, FollowFeed uses RocksDB, which is an embeddable persistent key-value store. RocksDB is built on top of LevelDB for server-side workloads, and it is optimized for low-latency access to fast storage such as SSDs. We built a JNI binding to enable interactions across language boundaries, and contributed this code back to the open source project.
A timeline is stored in RocksDB as a linked list of blobs (byte arrays). Each blob is a serialized representation of content records. The records within each blob and in turn the blobs themselves are put in reverse chronological order in the timeline. Each blob in the timeline is retrievable using a key, which is a serialized tuple of an entity ID, content type of the records in that timeline and the blob’s position in that timeline.
An update operation on the timeline is thus effectively an update operation on one of these blobs. Inserting new records into the timeline involves binary searching for the correct blob using metadata persisted per blob (also in RocksDB), and then performing a read-update-write operation. This metadata includes the timestamps of the oldest and the newest records in the blob as well as the total number of records in the blob. To keep the read-update-write operations cheap, it's important that each blob stay small and bounded in size. This is accomplished by creating a new blob when the original blob becomes bigger than a pre-configured size limit. In the case of an out-of-order update that inserts new data in the middle of a timeline, if the blob size limit is exceeded, the original blob is split into two blobs and both resulting blobs are persisted with their own unique keys.
A timeline is basically a linked list of blob keys, and the structure looks like: entityId_head -> entityId_n -> … -> entityId_0, where n is a non-negative integer. Please note that the numerical suffix in connected blob keys does not necessarily increase or decrease in any order from head to tail of the linked list.
Allow us to demonstrate the record insertion and blob split mechanism with three examples. In the figure above, there are three timelines on the left side:
- T0: m0_head -> m0_1 -> m0_0
- T1: m1_head -> m1_2 -> m1_1 -> m1_0
- T2: m2_head
For illustration purposes, let’s say that a record was inserted into each of m0_head, m1_1 and m2_head. Let’s also say that the insertion into m0_head did not push m0_head over the blob size limit, whereas the insertions into m1_1 and m2_head pushed those blobs over the limit. The result, as indicated on the right side of the above figure, is the following:
- m0_head absorbs the new record and continues to be the head of the list of blobs that form timeline T0. T0’s daisy chain of blobs continues to look the same.
- Timeline T1’s head remains unchanged; however, m1_1 splits into two blobs, m1_3 and m1_1. T1 now looks like m1_head -> m1_2 -> m1_3 -> m1_1 -> m1_0. The new blob’s suffix 3 is computed using some persisted metadata (the highest numerical suffix per timeline).
- In the case of timeline T2, the blob m2_head gets renamed to m2_0, and a new blob m2_head is added at the head of its daisy chain. T2 now looks like m2_head -> m2_0.
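The insertion and split behavior can be sketched in a few dozen lines. This is a simplified model under stated assumptions, not FollowFeed's code: a plain dict stands in for RocksDB, records are bare timestamps, the blob is located by a linear walk rather than the binary search over metadata described above, and an overflowing blob is divided in half (the post does not say how records are divided). `BLOB_LIMIT`, `Blob`, and `Timeline` are hypothetical names.

```python
from dataclasses import dataclass, field
from typing import Optional

BLOB_LIMIT = 4  # hypothetical maximum number of records per blob


@dataclass
class Blob:
    records: list = field(default_factory=list)  # timestamps, newest first
    next_key: Optional[str] = None               # key of the next (older) blob


class Timeline:
    def __init__(self, entity_id, store):
        self.entity_id = entity_id
        self.store = store         # plain dict standing in for RocksDB
        self.highest_suffix = -1   # per-timeline metadata (persisted in the real system)
        self.head_key = f"{entity_id}_head"
        store[self.head_key] = Blob()

    def _new_key(self):
        self.highest_suffix += 1
        return f"{self.entity_id}_{self.highest_suffix}"

    def chain(self):
        """Blob keys from head to tail."""
        keys, key = [], self.head_key
        while key is not None:
            keys.append(key)
            key = self.store[key].next_key
        return keys

    def insert(self, timestamp):
        # Walk from the head until the blob whose time range covers the record.
        prev_key, key = None, self.head_key
        while True:
            blob = self.store[key]
            if blob.next_key is None or (blob.records and timestamp >= blob.records[-1]):
                break
            prev_key, key = key, blob.next_key
        blob.records.append(timestamp)
        blob.records.sort(reverse=True)
        if len(blob.records) <= BLOB_LIMIT:
            return
        mid = len(blob.records) // 2
        newer, older = blob.records[:mid], blob.records[mid:]
        new_key = self._new_key()
        if prev_key is None:
            # Head overflow: the older records move to a numbered blob behind the head.
            self.store[new_key] = Blob(older, blob.next_key)
            blob.records, blob.next_key = newer, new_key
        else:
            # Out-of-order overflow: a new blob takes the newer half and is
            # linked in front of the original blob.
            self.store[new_key] = Blob(newer, key)
            blob.records = older
            self.store[prev_key].next_key = new_key


store = {}
t1 = Timeline("m1", store)
for ts in range(10, 120, 10):   # in-order inserts grow the chain from the head
    t1.insert(ts)
for ts in (45, 55):             # out-of-order inserts land in m1_1 and split it
    t1.insert(ts)
print(" -> ".join(t1.chain()))
```

With these inputs the chain ends up with the same shape as timeline T1 in the example: the split of m1_1 produces a blob with suffix 3 that sits between m1_2 and m1_1.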
We realized that deserialization was a major bottleneck in serving requests. To optimize latency, each node maintains a read/write-through cache of deserialized content records. Our implementation uses the open source Guava cache and leverages its functionalities such as the read-through LRU behaviour, limiting the total memory utilization by the size of the key-space or by computing "weights" of entities being cached.
Utilizing this cache effectively is mission-critical for FollowFeed. This translates to two requirements:
(1) The cache should provide high performance.
We usually deploy each index node with a large JVM footprint to cache as much data as possible. Experiments showed that due to the large size of this cache, concurrent evictions and reads became expensive. So using Guava’s configs, we partitioned the cache internally into multiple sub-caches. This optimization allows concurrent access to those sub-caches and also speeds up evictions.
(2) Good controls over what kind of data gets cached or gets evicted.
The memory used by the index node cache depends on two factors: the total number of timeline keys and the total number of records that are cached. Many timeline keys don’t have any records associated with them in the cache, because some members may not have shared new content in the feed for the past few days or may not have shared content of a certain type in the last few days. It is desirable to have good control over which timeline keys get evicted from the cache—those that have a non-empty list of records associated with them, or otherwise.
To achieve this, we use two instances of the Guava cache: one instance is called the fat cache, and the other is called the skinny cache. The fat cache holds timeline keys and a non-empty list of records associated with each one of them; the skinny cache contains keys that don't have any records associated with them. The fat cache’s holding capacity is defined in terms of its ‘weight’ (the total number of records it's holding), whereas the skinny cache’s holding capacity is defined in terms of its ‘size’ (the total number of keys it's holding). By sizing these two instances well, we could reach the right effectiveness of caching and eviction.
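The fat/skinny split can be sketched as follows. This is a simplified stand-in, not the Guava-based implementation: it uses strict LRU eviction on Python's `OrderedDict`, and the class name `TimelineCache` and its parameters are hypothetical.

```python
from collections import OrderedDict


class TimelineCache:
    """Two LRU caches: one bounded by record count, one bounded by key count."""

    def __init__(self, max_records, max_empty_keys):
        self.fat = OrderedDict()     # timeline key -> non-empty list of records
        self.skinny = OrderedDict()  # timeline key -> None (timeline known to be empty)
        self.max_records = max_records        # 'weight' limit of the fat cache
        self.max_empty_keys = max_empty_keys  # 'size' limit of the skinny cache
        self.fat_weight = 0

    def invalidate(self, key):
        if key in self.fat:
            self.fat_weight -= len(self.fat.pop(key))
        self.skinny.pop(key, None)

    def put(self, key, records):
        self.invalidate(key)
        if records:
            self.fat[key] = list(records)
            self.fat_weight += len(records)
            # Evict least recently used timelines until the weight fits.
            while self.fat_weight > self.max_records and self.fat:
                _, evicted = self.fat.popitem(last=False)
                self.fat_weight -= len(evicted)
        else:
            self.skinny[key] = None
            while len(self.skinny) > self.max_empty_keys:
                self.skinny.popitem(last=False)

    def get(self, key):
        """Return cached records, [] for a known-empty timeline, None on a miss."""
        if key in self.fat:
            self.fat.move_to_end(key)
            return self.fat[key]
        if key in self.skinny:
            self.skinny.move_to_end(key)
            return []
        return None


cache = TimelineCache(max_records=5, max_empty_keys=2)
cache.put(("m1", "share"), [1, 2, 3])
cache.put(("m2", "share"), [4, 5])
cache.put(("m3", "share"), [6])      # weight would be 6, so ("m1", "share") is evicted
for member in ("m4", "m5", "m6"):
    cache.put((member, "share"), [])  # the third empty key evicts ("m4", "share")
```

Because the two caches have independent limits, a flood of empty timelines cannot push record-bearing timelines out of memory, and vice versa.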
FollowFeed uses Avro as the serialization protocol for data storage. Since Avro needs reader and writer schema instances for ser/deser, we needed a schema registry that stored all versions of schemas with which the data was serialized and persisted. We chose to make the index node schema aware, which means that along with timelines, each index node also persists a Schema Registry of Avro schemas in RocksDB. This registry is used to serialize and deserialize content records. There is also an in-memory deserialized object representation of this registry to allow fast access to schema objects.
Index nodes support business logic-specific filtering using parameters such as privacy, geo-restrictions, content types, etc. Viewer specific data required for filtering, such as the viewer’s privacy settings and geo-location, is obtained from helper services. Also, FollowFeed defines a custom grammar for filtering that allows clients to specify different filtering criteria. To make FollowFeed’s filtering logic easier to comprehend, here is a SQL-like example of client-specified filtering criteria. (Please note that FollowFeed does not actually use SQL and has its own query language):
FollowFeed performs such filtering logic on records retrieved from the Guava cache. Records that pass filtering then become input to the relevance algorithms.
A key feature of FollowFeed is highly performant online relevance which ranks tens of thousands of content records per request. Low latencies are achieved by performing this relevance computation in parallel on index nodes that are serving a query. Each index node persists and caches relevance features of different entity and content types. These features are imperative in building a valid feature vector for ranking. A feature vector can be imagined as a mapping of content record onto an n-dimensional vector of numerical features.
One of the approaches we considered was to use an in-house scoring library. This library is essentially an inventory of data transformation and scoring modules, with a control flow that (1) generates a feature vector by performing a series of data transformations that parse a content record and create intermediate feature representations, and then (2) applies a regression (scoring) function to the feature vector and the model coefficient vector which is provided as input to the library. The result of the scoring function is the relevance score of the content record. Since FollowFeed scores millions of content records per second, we need the scoring function to be as efficient as possible. The data transformation steps in this library were heavy on iteration, hash-based lookups and intermediate data representation marshalling, thus these steps proved to be a bottleneck in achieving our latency SLA. At the same time, the ability to easily configure a few parameters to test and iterate through a variety of data transformations and relevance models implemented in a well-tested Java library was quite appealing. So, we came up with a new solution that retained the interface and configurability of this library, but made the implementation more efficient.
The solution was to auto-generate optimized Java code for the data transformation and scoring steps. The code generation mechanism can be thought of as being analogous to the optimization passes performed by a multi-pass compiler. However, a compiler’s optimization pass can strictly only observe the code flow without any knowledge of the inputs that may be provided to the program. Since we know the schemas of content records, and the exact data transformations and regressions that we need to perform, we can generate significantly optimized code that has certain assumptions about the input baked in. For the mobile feed use case, the optimized Java code takes about 50 microseconds (p99) to perform relevance computations on one content record, which is 15 times faster than the scoring library mentioned in the previous paragraph.
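The idea of baking known inputs into generated code can be shown in miniature. The sketch below is an illustration in Python rather than the Java code generator described above, and it assumes a plain linear scoring function; the feature names, weights, and function names are hypothetical.

```python
def score_generic(record, coefficients):
    """Interpreted path: iterate over the model and look up each feature by name."""
    return sum(weight * float(record.get(name, 0.0))
               for name, weight in coefficients.items())


def generate_scorer(coefficients):
    """Emit and compile source code with feature names and weights inlined.

    The generated function assumes every feature is present in the record,
    which is the kind of schema assumption that can be baked in when the
    record schema and the model are known ahead of time.
    """
    terms = " + ".join(f"{weight!r} * record[{name!r}]"
                       for name, weight in coefficients.items())
    source = f"def score(record):\n    return {terms or '0.0'}\n"
    namespace = {}
    exec(source, namespace)  # the source is built from trusted model configuration
    return namespace["score"], source


# Hypothetical model coefficients and content record.
coefficients = {"age_hours": -0.1, "likes": 0.5, "is_connection": 2.0}
record = {"age_hours": 3.0, "likes": 4.0, "is_connection": 1.0}

score_generated, generated_source = generate_scorer(coefficients)
print(generated_source)
```

The generated function has no loop and no model lookup left in it; the model has effectively been compiled into straight-line arithmetic over the record's fields.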
FollowFeed is designed to be eventually consistent, so all data ingestion is done asynchronously through Kafka. All the timeline data is present in a feed Kafka stream, which is a stream of all content records on LinkedIn—shares, likes, comments by members and by non-member entities such as companies, schools, etc. We have created an intermediary service called Partitioner, which consumes from the content record’s Kafka stream and republishes those events into another Kafka topic using a custom partitioning function. The resulting Kafka topic’s partition range matches the partition range of FollowFeed's index node cluster. To consume timeline data, an index node simply subscribes to the same partition range of the Kafka topic as the partition range of timeline data that it’s hosting. The use of Kafka for data ingestion into index nodes allows us to consume the data for a given partition into multiple replica index nodes thereby allowing us to easily add more replicas for a given partition set. Relevance features are similarly re-partitioned and ingested from a different Kafka stream.
As we will see below, the ease of adding more replicas results in high availability and read scalability, since all replicas serve read traffic. Also as you can probably guess at this point, the custom partitioning function used to re-partition feed events on the ingestion path is the same as the partitioning function used to route real-time queries to index nodes.
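A minimal sketch of sharing one partitioning function between the ingestion path and the query path follows. The hash choice (CRC32 of the entity ID) and the function names are assumptions for illustration; the post does not specify the actual function or which part of the timeline key it hashes.

```python
import zlib

NUM_PARTITIONS = 720  # partition count quoted in the post


def partition_for(entity_id, num_partitions=NUM_PARTITIONS):
    """Hypothetical partitioning function: a stable hash of the entity ID."""
    return zlib.crc32(entity_id.encode("utf-8")) % num_partitions


def repartition(events):
    """Ingestion path: group content records by their target partition."""
    by_partition = {}
    for event in events:
        by_partition.setdefault(partition_for(event["entity_id"]), []).append(event)
    return by_partition


def route(timeline_keys):
    """Query path: group (entity_id, content_type) timeline keys by partition."""
    by_partition = {}
    for entity_id, content_type in timeline_keys:
        by_partition.setdefault(partition_for(entity_id), []).append((entity_id, content_type))
    return by_partition


events = [
    {"entity_id": "member:1", "content_type": "share", "body": "hello"},
    {"entity_id": "company:7", "content_type": "share", "body": "we are hiring"},
]
keys = [("member:1", "share"), ("company:7", "share")]
ingested = repartition(events)
routed = route(keys)
```

Since both paths call the same function, a query for an entity's timeline always lands on a partition that ingested that entity's records.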
A feed query from a client is routed to the index nodes using a fan-out service called the broker. The broker receives feed requests from a midtier service. Each request includes (1) a list of entities that are of interest to the viewer, (2) use case specific filtering criteria, (3) the number of content records that should be returned, and (4) viewer specific data that would be used for use case specific filtering. The broker translates the entities and content types it received into timeline keys and packages these timeline keys into requests for the index nodes. These requests are fanned out to the index node cluster in two steps: (1) The broker determines a mapping between the timeline keys and the partitions they belong to, using the same custom partitioning function that is used on the ingestion path. (2) The broker is aware of the mapping between partitions/replicas and index nodes via D2, which is a library that provides name service and dynamic load balancing functionality using ZooKeeper. Thus, the broker can fan out requests to the appropriate index nodes.
The request received by an index node includes (1) a list of timeline keys whose records need to be retrieved, (2) the time range of the records that should be considered for filtering and relevance scoring, (3) the number of scored timeline records that should be returned, (4) viewer specific data that would be used for filtering, and (5) use case specific filtering criteria. The index node performs a batch-get and retrieves a list of content records from the underlying cache/storage using these parameters. These records undergo business logic specific filtering, and then get scored and ranked using the relevance function. From the resulting ranked list of records, the requested number of records with the highest scores are returned from each index node to the broker.
Replicas in the index node cluster are set up in master-master mode. So, all replicas serve real-time read traffic.
The broker then performs a pass of business logic specific deduplication and diversity related filtering. This additional pass of filtering (besides the filtering already performed by index nodes) is required to filter out records that are too similar to each other but were returned by different index nodes. Finally, the broker sorts the filtered list of records using relevance scores, and returns the requested number of records with the highest scores to the client.
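The two-level flow (filter, score, and top N on each index node, followed by deduplication and a final top N on the broker) can be sketched as below. The record fields, the filter, the scoring function, and the use of a URL as the similarity key are all hypothetical. The sketch also sorts before deduplicating so that the highest-scoring copy survives, which is a choice made here and not something the post specifies.

```python
import heapq


def index_node_top_n(records, passes_filter, score, n):
    """Index node: filter, score, and return the n best (score, record) pairs."""
    candidates = [(score(r), r) for r in records if passes_filter(r)]
    return heapq.nlargest(n, candidates, key=lambda pair: pair[0])


def broker_merge(node_responses, n, dedup_key):
    """Broker: merge per-node results, drop near-duplicates, keep the top n."""
    merged = sorted(
        (pair for response in node_responses for pair in response),
        key=lambda pair: pair[0],
        reverse=True,
    )
    seen, result = set(), []
    for record_score, record in merged:
        key = dedup_key(record)
        if key in seen:
            continue  # a higher-scoring record with the same key was already kept
        seen.add(key)
        result.append((record_score, record))
        if len(result) == n:
            break
    return result


def visible(record):
    return not record.get("hidden", False)


def by_likes(record):
    return float(record["likes"])


node_a = index_node_top_n(
    [{"id": 1, "url": "x", "likes": 5},
     {"id": 2, "url": "y", "likes": 1},
     {"id": 3, "url": "z", "likes": 9, "hidden": True}],
    visible, by_likes, n=2)
node_b = index_node_top_n(
    [{"id": 4, "url": "x", "likes": 3},
     {"id": 5, "url": "w", "likes": 4}],
    visible, by_likes, n=2)
feed = broker_merge([node_a, node_b], n=3, dedup_key=lambda r: r["url"])
print([record["id"] for _, record in feed])
```

Records 1 and 4 share a URL but live on different nodes, so only the broker is in a position to notice the overlap.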
Multi Data Center
In the multi-data center world, different viewers can be sticky-routed to different data centers. It’s imperative that a viewer’s feed requests should be served as efficiently as possible—preferably from the data center that the viewer is sticky-routed to. This means that the timeline data of the entities that the viewer is interested in should be available in the data center locally. Since this set of entities keeps evolving continuously, we decided to replicate and store timeline data of all entities in all data centers. This is achieved by ingesting data from an aggregated Kafka stream that includes content records published from all data centers.
The size of FollowFeed’s index node clusters currently in production is primarily a function of the read traffic, so the index node cluster sizes can be different in different data centers according to the percentage of the read traffic being sticky-routed to those data centers. Each data center hosts the Partitioner service that consumes the feed updates' Kafka stream and re-partitions events. Similarly, the broker nodes route queries to the appropriate index nodes according to the index node cluster configuration in that specific data center.
A/B testing support is baked in throughout the FollowFeed stack by integrating with XLNT, LinkedIn’s A/B testing platform. We routinely A/B test multiple relevance models at the same time on different segments of members; a relevance model can be chosen per viewer using a variety of criteria.
Operations: Debugging and Performance
Besides read scalability, A/B testing support, etc., a key focus during building FollowFeed was to bake operability into the system before it was launched in production.
This started with treating the persisted state as mission critical and translated into the following requirements:
- Monitoring of state: Monitoring includes real-time breakdown of the characteristics of data such as the count/rate of different content types, content types in cache versus persisted state, and also alerts on the state to catch any significant departures from the norms, etc.
- Recovery: Every stateful system needs the right tools to recover from state corruption. We implement this through reliable checkpointing of timeline data and recovery through backups. Data on individual storage nodes is backed up incrementally at fixed intervals to network-attached storage filers, which provide reliability through data redundancy. Backups are used to bootstrap index nodes, to bring FollowFeed clusters in new data centers online, to create more replicas, etc. We also back up the Kafka consumer's offsets so that during recovery, the index node can resume consuming from Kafka at the right log offset.
- Reliable admin tools: One-off admin tasks that affect the state should not result in outages. Tools for cluster rebalancing and re-sizing have been designed such that in case of errors the intermediate states can be safely rolled back. These tools are also continuously tested in the staging environment. We also have well-tested admin commands to retrieve and manage the state on the index nodes, such as commands to (1) obtain data from the cache or the database for a specified member ID, content type and time range, (2) delete cherry-picked data from the cache and/or database, (3) delete the caches entirely, (4) turn the Kafka consumption on or off, etc. These tools and commands are dummy-proofed using sufficient warnings and override requirements.
Predictable Performance 24x7
- To ensure that disk and network heavy operations such as backups do not interfere with real-time performance, we have dedicated a set of index nodes that are used only for ingesting and backing up data. These nodes do not serve real-time traffic and are used for periodic backups. These nodes also host more partitions than the nodes that are serving real-time traffic.
- The request/response loop between the broker and index nodes is asynchronous, which helps utilize broker threads more efficiently.
- We have set an SLA on the p99 latency at the broker. If a request to the broker results in a fan-out to n index nodes, then the broker needs to wait for responses from all n index nodes. In this case, higher latencies at any of these index nodes impact the latency at the broker. To address this, we have put a lot of emphasis on optimizing the index node’s performance through Read-Copy-Update synchronization, concurrency control through fine-grained locking, garbage generation and collection optimization, Java cache and RocksDB configuration, TCP tunings, avoiding connection churn, page flush and swappiness optimization, etc.
- To further mitigate the impact of higher latency percentiles at the index nodes, the broker fires duplicate requests to 1, 2, or 3 replica index nodes (all of these nodes host the same set of partitions), with appropriate time-outs inserted between the firing of each duplicate request. The broker waits for the first response to any of the duplicate requests and terminates the remaining in-flight requests as soon as that response is received.
- We leverage D2 to gracefully degrade performance in case a node gets overwhelmed with queries or its performance degrades during normal operations. D2 assigns and dynamically evaluates certain scores per broker and index node, and drops requests if the score drops below a pre-configured threshold.
- Performance regressions are caught as early as possible: We log and plot timeouts and errors, and there are routine performance tests before deploying newer builds to production that compare timeouts and errors of the new build against the last known good build.
- Email alerts are fired if the quality of service degrades in production due to decrease in throughput, increase in latency, exceptions, etc.
- At times, systems see performance and timeout issues for certain requests that are not reflected in the percentile graphs. One functionality that has helped us immensely in tracking down such corner case performance (and functionality) issues is to enable TRACE level logging for certain viewers’ requests using a dynamic config. The viewer specific logging also avoids log spam and a sudden drop in performance that typically happens when log levels are changed for the entire process through a JMX “backdoor” call.
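The duplicate-request strategy from the list above can be sketched with `asyncio`. This is a minimal illustration under stated assumptions, not the broker's implementation: `send` is a hypothetical coroutine that queries one replica, `hedge_delay` is the pause before the next duplicate is fired, and error handling is omitted (a failed first response propagates its exception).

```python
import asyncio


async def hedged_request(replicas, send, hedge_delay):
    """Query replicas one at a time, hedge_delay apart, and return the first response."""
    tasks = []
    try:
        for replica in replicas:
            tasks.append(asyncio.create_task(send(replica)))
            # Wait up to hedge_delay for any in-flight request before hedging again.
            done, _ = await asyncio.wait(
                tasks, timeout=hedge_delay, return_when=asyncio.FIRST_COMPLETED)
            if done:
                return done.pop().result()
        # Every replica has been asked; wait for whichever answers first.
        done, _ = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
        return done.pop().result()
    finally:
        # Terminate the requests that are still in flight.
        for task in tasks:
            if not task.done():
                task.cancel()


# Hypothetical replica latencies in seconds, for demonstration only.
LATENCY = {"replica-a": 0.2, "replica-b": 0.01}


async def fake_send(replica):
    await asyncio.sleep(LATENCY[replica])
    return f"response from {replica}"


if __name__ == "__main__":
    print(asyncio.run(hedged_request(["replica-a", "replica-b"], fake_send, 0.05)))
```

In the demonstration the slow first replica is overtaken by the duplicate sent to the second replica, so the caller sees the latency of the faster node plus the hedge delay.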
LinkedIn's newsfeed has come a long way, from just being a chronological set of records to using complex relevance algorithms to rank these records. To cater to these requirements and to support more feed queries due to an increasing user base, LinkedIn's newsfeed infrastructure has changed significantly during this time, from a generic search-based solution to a more feed-optimized indexing solution. We recently finished ramping all traffic from the Sensei-based architecture to FollowFeed, and we have seen significant improvements in key metrics:
- FollowFeed’s p99 latency for the mobile news feed is around 140 ms, which is five times faster than Sensei. As a result, we also saw a 150 ms improvement in p90 page load latency.
- FollowFeed can host roughly 20 times more data than Sensei.
- FollowFeed migration resulted in reducing overall capex cost by 50 percent compared to Sensei.
FollowFeed has been a huge undertaking over the last couple of years. This project could not have been accomplished without the tireless efforts of the feed platform team: Ankit Gupta, Elizabeth Bennett, Joseph Zemek, Min Cao, Nandhini Santhanam, Parin Shah, Roshan Sumbaly, Shubham Gupta, Shubh Mondal, Swapnil Ghike, Tom Chiang and Vivek Nelamangala. We would like to thank Aarthi Jayaram, Asher Feldman, Bee-Chung Chen, Guy Lebanon, Kevin Chang, Liang Zhang, Pannaga Shivaswamy and numerous teams within LinkedIn for partnering with us in developing and ramping this system. Last but not least, thank you Raghu Hiremagalur for leading and architecting this project.