Graph Systems

Building the Activity Graph, Part 2

Co-authors: Vivek Nelamangala and Val Markovic

Editor’s note: In Part 1 of this series, we talked about how we built the Activity Graph and how we use it to deliver features to Activity searcher machines. In this post, we’ll talk about how we store this data so that it can be retrieved efficiently.

Quick background on FollowFeed

FollowFeed is the main Activity indexing solution at LinkedIn and powers the news feed on the homepage, in the app, and on many other pages. FollowFeed is designed to answer queries such as “Fetch a feed for viewer X, connected to members A, B, and C, following companies D and E, etc., sorted according to a specific algorithm.” In order to perform these kinds of searches efficiently, FollowFeed indexes all Activities as a time-ordered list of Activities for each member, company, etc., keyed by the URN of each of those actors.
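
As a rough illustration of that index shape, here is a minimal sketch. The class and method names (FeedIndexSketch, index, candidates) are hypothetical, and a plain newest-first sort stands in for the real ranking algorithm, which this post does not describe.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: one time-ordered list of Activities per actor URN.
final class FeedIndexSketch {
    static final class TimelineRecord {
        final String activityUrn;
        final long timestampMillis;

        TimelineRecord(String activityUrn, long timestampMillis) {
            this.activityUrn = activityUrn;
            this.timestampMillis = timestampMillis;
        }
    }

    private static final Comparator<TimelineRecord> NEWEST_FIRST =
            Comparator.comparingLong((TimelineRecord r) -> r.timestampMillis).reversed();

    // Actor URN (member, company, ...) -> that actor's Activities, newest first.
    private final Map<String, List<TimelineRecord>> timelines = new HashMap<>();

    void index(String actorUrn, TimelineRecord record) {
        List<TimelineRecord> timeline = timelines.computeIfAbsent(actorUrn, k -> new ArrayList<>());
        timeline.add(record);
        // Re-sorting keeps the sketch short; a real index would insert in order.
        timeline.sort(NEWEST_FIRST);
    }

    // Candidate selection: merge the timelines of every actor the viewer follows.
    List<String> candidates(List<String> followedActorUrns, int limit) {
        List<TimelineRecord> merged = new ArrayList<>();
        for (String actorUrn : followedActorUrns) {
            merged.addAll(timelines.getOrDefault(actorUrn, List.of()));
        }
        merged.sort(NEWEST_FIRST);
        List<String> activityUrns = new ArrayList<>();
        for (TimelineRecord record : merged.subList(0, Math.min(limit, merged.size()))) {
            activityUrns.add(record.activityUrn);
        }
        return activityUrns;
    }

    public static void main(String[] args) {
        FeedIndexSketch index = new FeedIndexSketch();
        index.index("urn:li:member:1", new TimelineRecord("urn:li:activity:10", 100L));
        index.index("urn:li:company:2", new TimelineRecord("urn:li:activity:11", 200L));
        System.out.println(index.candidates(List.of("urn:li:member:1", "urn:li:company:2"), 10));
    }
}
```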

FollowFeed also needs to respond to these queries in as short a time as possible to provide a good feed experience to our members. To be efficient, FollowFeed heavily relies on the most frequently accessed data being available in memory on the searcher nodes, and it will considerably slow down if data has to be fetched from disk.

A record representing an Activity in FollowFeed is called a TimelineRecord. It is critical to ensure that a TimelineRecord contains only the information needed to perform the core function of candidate selection; in other words, we must keep the size of each TimelineRecord as small as possible so that we can fit as much of the Activity index in memory as we can.

For the purposes of spam and low-quality filtering, we also need to store a set of content quality labels associated with each piece of content that can appear in the feed. These labels are used by the filtering code to decide whether or not any Activity referencing the content should appear in the member’s feed.

Content quality labels are a type of data we call “features.” Features refer to data derived from the original content; this data can be dense (the feature is present for every piece of content) or sparse (the feature is present for only some pieces of content). Content quality labels are sparse features, as the vast majority of content at LinkedIn does not have any labels restricting its distribution. An example of a dense feature would be the detected language for articles.

FollowFeed needs access to many features for each TimelineRecord to be able to perform the filtering and ranking functions.

The easiest way to store features in FollowFeed would be to store them as fields in the TimelineRecord. For sparse features such as content quality, one could argue that storing these labels de-normalized in each TimelineRecord would not consume a lot of memory. However, even for sparse data, we would need to have a field inside of the Java class representing a TimelineRecord. Having a field costs memory, as Java needs to use an extra eight bytes in every single TimelineRecord to potentially hold a pointer to the Java object containing the data.

The de-normalized approach to feature storage is even more memory inefficient for dense features, as multiple copies of the same feature data will probably exist on a machine. If a very popular article gets shared and liked by lots of members, each like or share creates a new Activity that will be indexed by FollowFeed. Since there would therefore be a large number of Activities, it is extremely likely that there would be multiple TimelineRecords in each searcher node that would have a reference to this article. If this article were to receive a new content quality label, that label information would have to be present multiple times, once for each related TimelineRecord in each searcher node. Any changes to this article data would also need to be made in multiple places. So while storing data de-normalized makes retrieval simple, it consumes needless memory and introduces a lot of complexity when it comes to keeping the information updated.

Our solution to this problem was to store the content quality labels and other feature data for content referenced by Activities in separate logical stores inside each searcher node. Thus, we would store the feature data in a normalized form. Once we have selected a list of candidate TimelineRecords for a request, we would then have to look up all feature data required for filtering and ranking purposes from each individual feature store. These lookups need to be fast (on the order of nanoseconds), as each feed request requires hundreds of thousands of lookups into each of these feature stores.
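
The normalized layout can be sketched as follows. This is an illustration under stated assumptions, not the FollowFeed code: the names are hypothetical, an in-memory map stands in for a feature store, and the rule "any label blocks the Activity" is a simplification made up for the example.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: TimelineRecords carry only a content URN; labels live in one place.
final class NormalizedFeatureSketch {
    static final class TimelineRecord {
        final String activityUrn;
        final String contentUrn;

        TimelineRecord(String activityUrn, String contentUrn) {
            this.activityUrn = activityUrn;
            this.contentUrn = contentUrn;
        }
    }

    // Content URN -> quality labels. Sparse: most content has no entry at all.
    private final Map<String, Set<String>> qualityLabels = new HashMap<>();

    // A single write covers every Activity that references this content.
    void putLabels(String contentUrn, Set<String> labels) {
        qualityLabels.put(contentUrn, labels);
    }

    // Filtering step: one feature lookup per candidate record.
    List<String> filter(List<TimelineRecord> candidates) {
        List<String> kept = new ArrayList<>();
        for (TimelineRecord record : candidates) {
            Set<String> labels = qualityLabels.getOrDefault(record.contentUrn, Set.of());
            if (labels.isEmpty()) { // simplified rule, assumed for illustration
                kept.add(record.activityUrn);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        NormalizedFeatureSketch store = new NormalizedFeatureSketch();
        List<TimelineRecord> candidates = List.of(
                new TimelineRecord("urn:li:activity:1", "urn:li:article:123"),
                new TimelineRecord("urn:li:activity:2", "urn:li:article:123"),
                new TimelineRecord("urn:li:activity:3", "urn:li:article:456"));
        store.putLabels("urn:li:article:123", Set.of("LOW_QUALITY"));
        System.out.println(store.filter(candidates));
    }
}
```

Because the label is stored once per piece of content, a like and a share of the same article are both filtered by a single update.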

Feature storage infrastructure

The infrastructure we built to do these lookups at scale is called EntityFeatureStore (or EFS for short). EFS consists of a bloom filter and a Caffeine cache instance, backed by RocksDB for on-disk storage. One RocksDB instance is used to store the data itself and another to store a snapshot of the in-memory cached keys.

An important feature of EFS is that an EFS instance never starts cold. We keep a copy of the keys stored in the in-memory cache in an instance of RocksDB which we call “CachedKeysStore.” Each time a record is added to, deleted from, or evicted from the cache, the change is reflected in the CachedKeysStore. When an instance of EFS is initialized (for example, after the application is restarted), code in EFS automatically warms up the cache. It does this by iterating over all the keys present in CachedKeysStore, fetching the data for each key from the corresponding RocksDB instance, and loading it into the cache. Warming up each instance of EFS before a FollowFeed searcher node starts serving live traffic ensures that the first few requests don’t pay a latency penalty because of cold caches.
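
The warm-up flow can be sketched as below. To keep the example self-contained, plain in-memory collections stand in for the two RocksDB instances and for the Caffeine cache, and all names are hypothetical.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of a cache that re-warms itself from a persisted key snapshot.
final class WarmStartSketch {
    private final Map<String, String> dataStore;   // stand-in for the RocksDB data instance
    private final Set<String> cachedKeysStore;     // stand-in for CachedKeysStore
    private final Map<String, String> cache = new HashMap<>(); // stand-in for the in-memory cache

    WarmStartSketch(Map<String, String> dataStore, Set<String> cachedKeysStore) {
        this.dataStore = dataStore;
        this.cachedKeysStore = cachedKeysStore;
        warmUp(); // runs before the instance is handed to callers
    }

    // Reload every key that was cached before the restart.
    private void warmUp() {
        for (String key : cachedKeysStore) {
            String value = dataStore.get(key);
            if (value != null) {
                cache.put(key, value);
            }
        }
    }

    // Writes go to the data store, the cache, and the key snapshot.
    void put(String key, String value) {
        dataStore.put(key, value);
        cache.put(key, value);
        cachedKeysStore.add(key);
    }

    // Eviction drops the key from memory and from the snapshot, but keeps the data on disk.
    void evict(String key) {
        cache.remove(key);
        cachedKeysStore.remove(key);
    }

    boolean isCached(String key) {
        return cache.containsKey(key);
    }

    public static void main(String[] args) {
        Map<String, String> data = new HashMap<>();
        Set<String> keys = new HashSet<>();
        WarmStartSketch first = new WarmStartSketch(data, keys);
        first.put("urn:li:article:1", "LOW_QUALITY");
        // Simulate a restart: a new instance over the same persisted stores.
        WarmStartSketch restarted = new WarmStartSketch(data, keys);
        System.out.println(restarted.isCached("urn:li:article:1"));
    }
}
```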

We also use a Caffeine loading cache inside EFS. If the cache receives a request for a key it doesn’t have, it loads it from RocksDB. On the write path, EFS is designed as a write-through cache: when a new record is added to the cache, we write the relevant data to the bloom filter, the Caffeine cache, and to the RocksDB instances backing the cache (the data store and the cached keys store).

EFS is typically used to look up sparse data in FollowFeed. So for a large number of records, we won’t have any values. However, Caffeine cache does not cache negative lookups, i.e., if we find no data in RocksDB for a particular key, the empty response is not cached. This means that if we don’t have a feature value for a particular key, the loading cache will cause disk IO every time, which is bad for performance. To avoid this, we store a special sentinel value for negative lookups. The notion of sentinel values is internal to EFS and is never leaked to callers.
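
The sentinel idea can be sketched with a small loading cache built on ConcurrentHashMap. This is not Caffeine and not the EFS code: the names are hypothetical and a map stands in for RocksDB. The point is only that a miss is stored as a private marker object and translated back to "no value" before it reaches the caller.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: cache negative lookups with an internal sentinel.
final class SentinelCacheSketch {
    private static final Object MISSING = new Object(); // never exposed to callers

    private final Map<String, String> disk;             // stand-in for RocksDB
    private final ConcurrentHashMap<String, Object> cache = new ConcurrentHashMap<>();
    final AtomicInteger diskReads = new AtomicInteger();

    SentinelCacheSketch(Map<String, String> disk) {
        this.disk = disk;
    }

    Optional<String> get(String key) {
        Object cached = cache.computeIfAbsent(key, k -> {
            diskReads.incrementAndGet();
            String value = disk.get(k);
            // Returning null would leave the key uncached, so store the sentinel instead.
            return value != null ? value : MISSING;
        });
        return cached == MISSING ? Optional.empty() : Optional.of((String) cached);
    }

    public static void main(String[] args) {
        SentinelCacheSketch efs = new SentinelCacheSketch(Map.of("urn:li:article:1", "SPAM"));
        efs.get("urn:li:article:2");
        efs.get("urn:li:article:2"); // served from the cached sentinel, no second disk read
        System.out.println(efs.diskReads.get());
    }
}
```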

Since some feature data is sparse, the vast majority of the calls to EFS end up being negative lookups (for content quality, over 99.99999% of calls). With the sentinel approach described above, we hold a cache entry for each miss! This is very memory inefficient; we have to do better.

Bloom filters for negative lookup performance

So, to optimize memory utilization while supporting this access pattern, we use a bloom filter in front of the cache. The bloom filter is built during EFS initialization from the data in RocksDB. When a lookup is made to an EFS instance, we first look up the key in the bloom filter. The bloom filter indicates whether the data set may contain the key, with a pre-configured false positive rate, which we typically set to 1%. Only if the bloom filter indicates that the key may be present in the data do we make a lookup into the cache (and thus possibly cause a lookup from disk). What this means is that for sparse data sets, the vast majority of requests to the EFS are satisfied by the bloom filter. A bloom filter sized for N keys at a 1% false positive rate needs roughly 10 bits per key, independent of key and value size, whereas a cache instance holding N key-value pairs must store every key and value in full. This translates to several gigabytes of memory space saved for large key sets.
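
To make the sizing and the lookup order concrete, here is a minimal sketch. It uses the standard bloom filter sizing formulas (m = -n ln p / (ln 2)^2 bits and k = (m / n) ln 2 hash functions) and a deliberately simple double-hashing scheme. It is not the Guava implementation; the names are hypothetical, and a map stands in for the cache and RocksDB behind the filter.

```java
import java.util.BitSet;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch: a bloom filter consulted before any cache or disk lookup.
final class BloomFrontedStoreSketch {
    // Standard sizing: m = -n * ln(p) / (ln 2)^2 bits for n keys at false positive rate p.
    static long optimalBits(long expectedKeys, double falsePositiveRate) {
        double ln2 = Math.log(2);
        return (long) Math.ceil(-expectedKeys * Math.log(falsePositiveRate) / (ln2 * ln2));
    }

    // Standard hash count: k = (m / n) * ln 2, at least 1.
    static int optimalHashes(long expectedKeys, long bits) {
        return Math.max(1, (int) Math.round((double) bits / expectedKeys * Math.log(2)));
    }

    private final BitSet bits;
    private final int bitCount;
    private final int hashCount;
    private final Map<String, String> store = new HashMap<>(); // stand-in for cache + RocksDB
    int storeLookups = 0;

    BloomFrontedStoreSketch(int expectedKeys, double falsePositiveRate) {
        this.bitCount = (int) optimalBits(expectedKeys, falsePositiveRate);
        this.hashCount = optimalHashes(expectedKeys, bitCount);
        this.bits = new BitSet(bitCount);
    }

    // Double hashing: derive the i-th bit position from two base hashes (illustrative only).
    private int position(String key, int i) {
        int h1 = key.hashCode();
        int h2 = Integer.rotateLeft(h1 * 0x9E3779B9, 16) | 1;
        return Math.floorMod(h1 + i * h2, bitCount);
    }

    void put(String key, String value) {
        for (int i = 0; i < hashCount; i++) {
            bits.set(position(key, i));
        }
        store.put(key, value);
    }

    boolean mightContain(String key) {
        for (int i = 0; i < hashCount; i++) {
            if (!bits.get(position(key, i))) {
                return false; // definitely absent
            }
        }
        return true; // present, or a false positive
    }

    Optional<String> get(String key) {
        if (!mightContain(key)) {
            return Optional.empty(); // answered by the filter alone
        }
        storeLookups++;
        return Optional.ofNullable(store.get(key));
    }

    public static void main(String[] args) {
        long bitsNeeded = optimalBits(1_000_000, 0.01);
        System.out.println(bitsNeeded + " bits, " + optimalHashes(1_000_000, bitsNeeded) + " hashes");
    }
}
```

A false positive only costs one extra lookup into the store, which then reports the key as absent, so correctness never depends on the filter.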

Because we would end up making hundreds of thousands of calls to the bloom filter for each request, and we have many requests being processed in parallel, we will potentially be making millions of bloom filter lookups per second with this system. Since our application also has to process writes to EFS coming in from the indexing system, we needed a thread-safe bloom filter.

We started by using the bloom filter implementation from the Guava library. Since this implementation is not thread-safe, we initially guarded access to the bloom filter using a read-write lock. As we expected, this proved to be a bottleneck. Thus, we needed a lock-free bloom filter. We couldn’t find an open-source one for Java that we liked, so we changed the Guava implementation to be lock-free and submitted a pull request to the project (which was later merged) so everyone could benefit from this work.
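
The general technique behind a lock-free bloom filter is to keep the bits in an array of atomic words and set each bit with a compare-and-set loop. The sketch below shows only that idea using java.util.concurrent.atomic.AtomicLongArray. It is not the code from the Guava pull request, and the class and method names are hypothetical.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicLongArray;

// Hypothetical sketch: a bit array that many threads can read and write without locks.
final class LockFreeBitsSketch {
    private final AtomicLongArray words;
    private final long bitCount;

    LockFreeBitsSketch(long bitCount) {
        this.bitCount = bitCount;
        this.words = new AtomicLongArray((int) ((bitCount + 63) / 64));
    }

    // Sets the bit; returns true only for the one call that actually flipped it.
    boolean set(long bitIndex) {
        int word = (int) (bitIndex >>> 6);
        long mask = 1L << (int) (bitIndex & 63);
        while (true) {
            long current = words.get(word);
            if ((current & mask) != 0) {
                return false; // already set by this or another thread
            }
            if (words.compareAndSet(word, current, current | mask)) {
                return true;
            }
            // Another thread changed this word in the meantime; retry with the new value.
        }
    }

    boolean get(long bitIndex) {
        long mask = 1L << (int) (bitIndex & 63);
        return (words.get((int) (bitIndex >>> 6)) & mask) != 0;
    }

    // Demo helper: every thread tries to set every bit; returns the number of successful flips.
    static long setAllConcurrently(LockFreeBitsSketch bits, int threadCount) {
        AtomicLong flips = new AtomicLong();
        Thread[] threads = new Thread[threadCount];
        for (int t = 0; t < threadCount; t++) {
            threads[t] = new Thread(() -> {
                for (long i = 0; i < bits.bitCount; i++) {
                    if (bits.set(i)) {
                        flips.incrementAndGet();
                    }
                }
            });
            threads[t].start();
        }
        for (Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return flips.get();
    }

    public static void main(String[] args) {
        LockFreeBitsSketch bits = new LockFreeBitsSketch(10_000);
        // Each bit is flipped exactly once, no matter how many threads race on it.
        System.out.println(setAllConcurrently(bits, 4));
    }
}
```

Readers never block, and writers only retry when two threads update the same 64-bit word at the same instant.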

While we initially built EFS to support normalized storage of content quality labels, we now have about seven different instances of EFS storing various types of feature data, including PathsToRoots for each Activity. We also have several new features being onboarded to FollowFeed, as this infrastructure enables much faster iteration on feature development.

Making decoration faster

Once the storage issues were resolved and the first use-case of content quality was fully powered by the Activity Graph, we started noticing ways in which we could further improve the member experience at LinkedIn by using the graph in new places. One of the first ideas that came to mind was improving decoration speed.

What is decoration?

LinkedIn follows a service-oriented architecture. Each service acts as a source of truth for data in its domain; for example, member data is owned by a service called Identity. Unique URNs are used to address each data record, e.g., member data records have URNs like urn:li:member:<ID>, where ID is a unique identifier for a record representing member data within the member domain.

LinkedIn’s data is also usually normalized. So if record A needs to reference data contained in another record B, the URN of record B is used inside record A rather than the data of record B being copied inside record A. The normalized data representation has a number of advantages, such as eliminating data duplication and maintaining referential integrity. When we render a record, all of these references need to be resolved to their actual data. This process is called decoration.

Let’s take the example of decorating an Activity of a member liking an article. This Activity is represented by a unique URN and the Activity record might look like this:

urn:li:Activity:123
Actor: urn:li:member:123
Verb: LIKE
Object: urn:li:article:123

To render this information in the UI, we need to fetch data for each of the referenced URNs: urn:li:member:123 and urn:li:article:123. The records behind those URNs might look like this:

urn:li:article:123
Title: Some article
Thumbnail: urn:li:image:123
Publisher: urn:li:company:123
Author: urn:li:member:234

urn:li:member:123
FirstName: First
LastName: Last
ProfilePicture: urn:li:image:456

After resolving the URNs directly referenced in the Activity itself, we now have more URNs to resolve, so this process continues recursively until we are out of URNs. If we visualize LinkedIn’s data as a directed graph with an edge from a referencing URN to a referenced URN, the decoration process needs to visit all nodes along these edges until root nodes are reached.

Doing this resolution recursively at runtime is very expensive, as multiple sequential network calls are performed. We only know which URNs to fetch at the second level of the graph after the record at the first level has been resolved, and the same holds for every level below it. A slow response from a single service will slow down the entire resolution process. At LinkedIn, the code to perform this process lives in the “deco library” that multiple frontend services depend on to provide a standard way of displaying data across all pages on the site.

The deco library has a number of optimizations to make this process faster. A node may be visited more than once; an example would be a member sharing an article they have written. In such cases, the library caches data for all nodes already visited and does not make a second network call (this cache exists for the lifetime of the request). The library also has safeguards to limit the depth of the graph it visits, as bugs in the system can inadvertently introduce loops.
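
The recursive walk, the per-request cache, and the depth guard can be sketched together as follows. This is not the deco library: the names, the depth limit of 8, and the rule "any field value starting with urn:li: is a reference" are assumptions for illustration, and a map stands in for the remote services. The sample records are the ones from the example above.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: recursive decoration with a per-request cache and a depth limit.
final class DecorationSketch {
    static final int MAX_DEPTH = 8; // assumed value, guards against accidental loops

    private final Map<String, Map<String, String>> services; // stand-in for remote services
    final Map<String, Map<String, String>> requestCache = new HashMap<>();
    int networkCalls = 0;

    DecorationSketch(Map<String, Map<String, String>> services) {
        this.services = services;
    }

    void decorate(String urn, int depth) {
        if (depth > MAX_DEPTH || requestCache.containsKey(urn)) {
            return; // too deep, or already resolved during this request
        }
        networkCalls++; // each fetch would be one sequential network call
        Map<String, String> record = services.getOrDefault(urn, Map.of());
        requestCache.put(urn, record);
        for (String value : record.values()) {
            if (value.startsWith("urn:li:")) {
                decorate(value, depth + 1); // referenced URNs are only known after the fetch
            }
        }
    }

    // The Activity, article, and member records from the example in this post.
    static Map<String, Map<String, String>> exampleServices() {
        Map<String, Map<String, String>> records = new HashMap<>();
        records.put("urn:li:Activity:123", Map.of(
                "Actor", "urn:li:member:123", "Verb", "LIKE", "Object", "urn:li:article:123"));
        records.put("urn:li:article:123", Map.of(
                "Title", "Some article", "Thumbnail", "urn:li:image:123",
                "Publisher", "urn:li:company:123", "Author", "urn:li:member:234"));
        records.put("urn:li:member:123", Map.of(
                "FirstName", "First", "LastName", "Last", "ProfilePicture", "urn:li:image:456"));
        return records;
    }

    public static void main(String[] args) {
        DecorationSketch deco = new DecorationSketch(exampleServices());
        deco.decorate("urn:li:Activity:123", 0);
        System.out.println(deco.networkCalls + " fetches for " + deco.requestCache.keySet());
    }
}
```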

For Activities, we realized that the PathsToRoots field in the Activity Graph, which we were originally using to prevent loops, contains the pre-materialized path that deco will traverse to render the Activity. This means we can use the PathsToRoots data to speed up Activity decoration!

The deco library now calls our new Activity Graph REST API and fetches the set of ancestor URNs. These are then resolved in parallel. This means that as the deco library decorates an Activity and discovers more URNs, the records for those URNs are already present in the deco cache, and the library does not need to make further network calls.
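
A sketch of the parallel step, assuming the set of ancestor URNs has already been fetched: every record is requested at once with CompletableFuture and placed in the request cache before decoration starts. The prefetch method and the fetch function are hypothetical stand-ins, not the deco library or the Activity Graph API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical sketch: resolve a known set of URNs in parallel to pre-fill the request cache.
final class PrefetchSketch {
    static Map<String, Map<String, String>> prefetch(
            Set<String> ancestorUrns, Function<String, Map<String, String>> fetch) {
        Map<String, Map<String, String>> cache = new ConcurrentHashMap<>();
        List<CompletableFuture<Void>> pending = new ArrayList<>();
        for (String urn : ancestorUrns) {
            // All fetches start immediately; none waits for another to finish.
            // fetch must return a non-null record, since ConcurrentHashMap rejects null values.
            pending.add(CompletableFuture.runAsync(() -> cache.put(urn, fetch.apply(urn))));
        }
        // Wait once for everything, instead of once per level of the graph.
        CompletableFuture.allOf(pending.toArray(new CompletableFuture[0])).join();
        return cache;
    }

    public static void main(String[] args) {
        Set<String> ancestors = Set.of("urn:li:member:123", "urn:li:article:123", "urn:li:image:456");
        Map<String, Map<String, String>> cache =
                prefetch(ancestors, urn -> Map.of("resolvedFrom", urn));
        System.out.println(cache.keySet());
    }
}
```

With the cache filled this way, total latency is bounded by the slowest single fetch rather than by the sum of the slowest fetch at each level.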

We are currently rolling out this feature to production for the news feed and expect it to reduce the latency for decoration significantly, which will also reduce the time we take to render the page, providing a better user experience.