InSearch: LinkedIn’s new message search platform
March 17, 2020
Co-authors: Suruchi Shah and Hari Shankar
The rise of instant messaging has changed how we communicate. Compared to the back-and-forth of email, we send and receive messages at much higher volumes and speeds. In the instant conversations we have, we also expect to be able to search for important phrases, moments, or references with ease. This rapid increase in the volume of data exchanged has brought a number of new engineering challenges in making messaging information discoverable in a scalable and efficient way.
In this blog, we discuss how we improved the messaging search experience by revamping our messaging backend architecture from the ground up and introducing a message search backend that we call InSearch.
If we were to support message search using LinkedIn’s traditional search infrastructure, the cost to build and serve the index nearline would be prohibitively high. This is because:
- The total volume of messaging data to be indexed is very high relative to other use cases.
- The rate of updates to the index is significantly higher, given the increase in message exchange (on the order of thousands of writes per second).
- This data needs to be encrypted at rest, as well as in transit, because messaging data is highly confidential.
Additionally, we observed that the ratio of search queries to messages being created was quite low. This makes reducing the cost of search infrastructure an important concern. We used this and other observations about the data and usage patterns to design a system that satisfies all our requirements while remaining cost-effective.
High-level architecture for InSearch
Message search is confined to a member’s own inbox. You can only search within your own inbox, so only your data needs to be indexed in memory for queries to be served quickly. Looking at usage, we also know that members who use message search tend to be power users, meaning they rely on our search functions often. This makes an ideal case for a per-member index and makes the index very cacheable, which led us to explore the idea of generating a member's index only when that member performs a search, and then caching the index.
Our Searcher service internally uses Lucene as the search library. A key requirement for working with highly confidential data, like messaging, is to ensure that all data is encrypted on disk. At the same time, we also need to be able to support a high rate of updates. Storing the index on disk would require a lengthy process for every update (reading the encrypted index from disk, decrypting it, updating it, encrypting it again, and persisting it back), which makes writes very inefficient. This matters because the system is fairly write-heavy.
Consequently, we traded off index creation speed for better write throughput. This was achieved by storing each raw document encrypted in a key-value store, with the key being a combination of memberId and documentId, and the value being the encrypted document (e.g., a message). Note that this is a simplified description: in production the design is more complex, since we have different document types and the actor is not necessarily a member. With this design, a new message is simply a new row added to the key-value store, making writes to the system very fast. We use RocksDB as our key-value store, as it has proven to be reliable and effective at LinkedIn (see examples of FollowFeed and Samza). It also doesn’t hurt that we have significant in-house expertise supporting it.
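To make the key design concrete, here is a minimal sketch of one possible composite key layout. The fixed-width big-endian encoding and the names `make_key`, `member_prefix`, and `parse_key` are assumptions for illustration; the actual InSearch key format is not described here. The point it shows is that when the member ID comes first and has a fixed width, all of a member's rows sort next to each other in an ordered store.

```python
import struct

# Hypothetical layout: 8-byte big-endian member ID, then 8-byte big-endian
# document ID. Big-endian keeps byte-wise order equal to numeric order.
_KEY = struct.Struct(">QQ")


def make_key(member_id: int, document_id: int) -> bytes:
    """Build the composite key for one (member, document) row."""
    return _KEY.pack(member_id, document_id)


def member_prefix(member_id: int) -> bytes:
    """Prefix shared by every key that belongs to the given member."""
    return struct.pack(">Q", member_id)


def parse_key(key: bytes) -> tuple:
    """Recover (member_id, document_id) from a composite key."""
    return _KEY.unpack(key)
```

With this layout, writing a new message is a single `put` of `make_key(member_id, document_id)` with the encrypted document as the value, and no existing row has to be read or rewritten.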
High-level Searcher flow with RocksDB key-value structure
When a member performs his or her first message search, we run a prefix scan of keys (the prefix being the ID of the member) from RocksDB. This gives us all the documents for the member, which are then used to construct the index. After the first search, the index is cached in memory. The results? We observed a cache hit ratio of around 90%, and a resulting overall 99th percentile latency on the order of 150 ms.
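The read path can be sketched as follows. This is a simplified stand-in, not the production code: `OrderedStore` imitates an ordered key-value store with a sorted list instead of RocksDB, a plain term-to-document map replaces the Lucene index, the `decrypt` callable is a placeholder for real decryption, and the LRU eviction policy is an assumption.

```python
import bisect
from collections import OrderedDict, defaultdict


def doc_key(member_id: int, doc_id: int) -> bytes:
    # Hypothetical fixed-width key so that keys sort by member, then document.
    return f"{member_id:020d}:{doc_id:020d}".encode()


class OrderedStore:
    """In-memory stand-in for an ordered key-value store."""

    def __init__(self):
        self._keys = []    # kept sorted
        self._values = {}

    def put(self, key: bytes, value: bytes) -> None:
        if key not in self._values:
            bisect.insort(self._keys, key)
        self._values[key] = value

    def prefix_scan(self, prefix: bytes):
        # Seek to the first key >= prefix, then read while the prefix matches.
        i = bisect.bisect_left(self._keys, prefix)
        while i < len(self._keys) and self._keys[i].startswith(prefix):
            yield self._keys[i], self._values[self._keys[i]]
            i += 1


class IndexCache:
    """Builds a member's index on first search and keeps it in an LRU cache."""

    def __init__(self, store, capacity, decrypt):
        self.store, self.capacity, self.decrypt = store, capacity, decrypt
        self._cache = OrderedDict()   # member_id -> {term -> set(doc_id)}
        self.builds = 0               # number of cache misses so far

    def _build(self, member_id):
        index = defaultdict(set)
        prefix = f"{member_id:020d}:".encode()
        for key, blob in self.store.prefix_scan(prefix):
            doc_id = int(key.split(b":")[1])
            for term in self.decrypt(blob).lower().split():
                index[term].add(doc_id)
        return index

    def search(self, member_id, term):
        index = self._cache.get(member_id)
        if index is None:                        # first search: build the index
            index = self._build(member_id)
            self.builds += 1
            self._cache[member_id] = index
            if len(self._cache) > self.capacity:
                self._cache.popitem(last=False)  # evict least recently used
        else:
            self._cache.move_to_end(member_id)
        return sorted(index.get(term.lower(), ()))
```

Only the first search for a member pays for the scan and the index build; later searches are served from memory until the entry is evicted.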
For writes, we insert encrypted data into RocksDB. If the index is cached, then the cached index is also updated by reading the updated document from the DB again. We also persist cached member IDs, so that their indexes can be reloaded into cache on startup. This keeps the cache warm—even after deployments.
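A rough sketch of this write path is shown below. Everything in it is illustrative: a dict stands in for RocksDB, encryption is omitted, the index is a simple term map rather than Lucene, and the JSON file used to remember cached member IDs (`warm_file`) is a hypothetical choice. The sketch only handles newly added documents; updating or deleting an existing document would also require removing its old terms.

```python
import json
from pathlib import Path


class Searcher:
    def __init__(self, warm_file: Path):
        self.store = {}             # (member_id, doc_id) -> text; stand-in for the DB
        self.cache = {}             # member_id -> {term -> set(doc_id)}
        self.warm_file = warm_file  # hypothetical file listing cached member IDs

    @staticmethod
    def _index_doc(index, doc_id, text):
        for term in text.lower().split():
            index.setdefault(term, set()).add(doc_id)

    def load_index(self, member_id):
        """Build and cache a member's index, then record that it is cached."""
        index = {}
        for (owner, doc_id), text in self.store.items():
            if owner == member_id:
                self._index_doc(index, doc_id, text)
        self.cache[member_id] = index
        self.warm_file.write_text(json.dumps(sorted(self.cache)))
        return index

    def write(self, member_id, doc_id, text):
        """Always write to the store; touch the index only if it is cached."""
        self.store[(member_id, doc_id)] = text
        index = self.cache.get(member_id)
        if index is not None:
            # Re-read the document from the store, as described above.
            self._index_doc(index, doc_id, self.store[(member_id, doc_id)])

    def warm_up(self):
        """On startup, rebuild the indexes that were cached before shutdown."""
        if self.warm_file.exists():
            for member_id in json.loads(self.warm_file.read_text()):
                self.load_index(member_id)
```

A write for a member whose index is not cached costs only the store insert, which is what keeps the common case cheap.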
Partitioning, replication, and backups
As with most distributed systems, replication and partitioning are how we deal with scalability and availability. The data is partitioned by a combination of member ID and document ID. This allows data for one member to be distributed across multiple partitions, and helps us scale horizontally for members with large inboxes since the index creation load can be shared by multiple partitions.
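As an illustration, partition assignment from the combined key might look like the sketch below. The hash function, the partition count, and the name `partition_for` are assumptions; the actual partitioning function is not specified in this post.

```python
import hashlib

NUM_PARTITIONS = 16  # hypothetical partition count


def partition_for(member_id: int, document_id: int,
                  num_partitions: int = NUM_PARTITIONS) -> int:
    """Map a (member, document) pair to a partition with a stable hash."""
    digest = hashlib.sha256(f"{member_id}:{document_id}".encode()).digest()
    return int.from_bytes(digest[:8], "big") % num_partitions
```

Because the document ID is part of the hash input, the documents of a single large inbox spread over many partitions, so each partition builds only a slice of that member's index.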
We have three live replicas for each searcher partition and one backup replica. Each replica independently consumes indexing events from Kafka streams. We have monitoring in place to ensure that no replica lags significantly behind its peers. The backup replica periodically uploads a snapshot of the database into our internal HDFS cluster. We also back up the Kafka offsets along with each snapshot. These offsets are used to ensure that a service starting up from a backup data set fully catches up on the data it is missing from Kafka before it begins serving.
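The role of the saved offsets can be illustrated with a small model. A Python list plays the part of one Kafka partition, and `Snapshot`, `Replica`, and the `serving` flag are hypothetical names; the real snapshot and restore machinery is not described here.

```python
from dataclasses import dataclass, field


@dataclass
class Snapshot:
    data: dict
    next_offset: int   # first log offset NOT yet reflected in data


@dataclass
class Replica:
    data: dict = field(default_factory=dict)
    next_offset: int = 0
    serving: bool = False

    def consume(self, log, upto=None):
        """Apply events from the log, starting at our own offset."""
        end = len(log) if upto is None else upto
        while self.next_offset < end:
            key, value = log[self.next_offset]
            self.data[key] = value
            self.next_offset += 1

    def snapshot(self) -> Snapshot:
        """Capture the data and the matching offset together."""
        return Snapshot(dict(self.data), self.next_offset)

    @classmethod
    def restore(cls, snap: Snapshot, log) -> "Replica":
        """Start from a backup, replay what was missed, then start serving."""
        replica = cls(dict(snap.data), snap.next_offset)
        replica.consume(log)      # catch up from the saved offset
        replica.serving = True    # only now is it safe to take traffic
        return replica
```

Storing the offset with the snapshot is what makes the replay exact: the restored replica neither skips events nor has to reprocess the whole stream.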
The source of truth for messaging data is Espresso tables. We consume updates from these tables using Brooklin streams into a Samza job, which then transforms these changelogs into the format required by the searchers for indexing. The stream processing job joins this stream with other datasets to decorate the IDs with the actual data to be used (e.g., decorating member IDs with their names). It also takes care of partitioning the data required by the searchers. As a result, each searcher host only has to consume data from the specific Kafka partitions that it hosts.
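Conceptually, the transformation step does something like the following. This is plain Python rather than Samza code, and the event fields (`member_id`, `document_id`, `sender_id`, `body`), the in-memory name lookup, and the hash-based partitioning are all assumptions made for the example.

```python
import hashlib


def decorate(changelog, member_names, num_partitions):
    """Turn raw changelog events into partitioned, decorated indexing events."""
    for event in changelog:
        routing_key = f"{event['member_id']}:{event['document_id']}".encode()
        digest = hashlib.sha256(routing_key).digest()
        yield {
            "partition": int.from_bytes(digest[:8], "big") % num_partitions,
            "member_id": event["member_id"],
            "document_id": event["document_id"],
            # Join step: replace the sender's ID with searchable text.
            "sender_name": member_names.get(event["sender_id"], ""),
            "body": event["body"],
        }
```

In a real stream job, the name lookup would be a join against another dataset rather than a dict, but the shape of the output is the same: each event already carries the partition it belongs to.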
The Broker service is the entry-point for search queries. It is responsible for:
- Query Rewriting: It rewrites the raw query (example: “apple banana”) into InSearch format (example: TITLE:(apple AND banana) OR BODY:(apple AND banana)) based on the query use-case. Different search queries may prioritize certain fields to search on with varying scoring parameters.
- Scatter gather operation: It fans out requests to Searcher hosts, collating the results from searchers.
- Retries: Brokers will retry requests on a different searcher replica in case of retriable failures or if a particular searcher is taking too much time.
- Re-ranking: The results from all searcher hosts are re-ranked for a final result set, and pruned based on pagination parameters.
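The responsibilities listed above can be sketched in a few functions. This is a toy model under stated assumptions: searchers are plain callables that return `(score, doc_id)` hits, `ConnectionError` stands for a retriable failure, timeout-based retries and query-term escaping are left out, and the function names are hypothetical.

```python
from concurrent.futures import ThreadPoolExecutor


def rewrite_query(raw: str, fields=("TITLE", "BODY")) -> str:
    """Require all terms, in any one of the given fields."""
    terms = raw.split()
    if not terms:
        return ""
    clause = " AND ".join(terms)
    return " OR ".join(f"{name}:({clause})" for name in fields)


def query_partition(replicas, query):
    """Try each replica of one partition until one answers."""
    last_error = None
    for searcher in replicas:
        try:
            return searcher(query)
        except ConnectionError as err:   # treated as retriable in this sketch
            last_error = err
    raise RuntimeError("all replicas failed") from last_error


def scatter_gather(partitions, query, offset, limit):
    """Fan out to every partition, merge, re-rank by score, and paginate."""
    with ThreadPoolExecutor(max_workers=max(1, len(partitions))) as pool:
        per_partition = list(
            pool.map(lambda replicas: query_partition(replicas, query), partitions)
        )
    merged = [hit for hits in per_partition for hit in hits]
    merged.sort(key=lambda hit: hit[0], reverse=True)   # highest score first
    return merged[offset:offset + limit]
```

Re-ranking happens after the merge because each searcher can only rank the documents in its own partition; the broker is the first place where all candidate hits are visible together.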
Brokers use our internal D2 ZooKeeper service, which maintains the list of searcher hosts for each partition, to discover the hosts to fan out to. We also ensure sticky routing on these hosts, so that requests for a given member go to the same searcher replica. This avoids rebuilding the index on multiple replicas and delivers a consistent search experience.
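How sticky routing is implemented is not covered in this post. One common technique that gives the same behavior is rendezvous (highest-random-weight) hashing, sketched below with hypothetical host names; it is shown only as an example of the idea, not as the mechanism InSearch uses.

```python
import hashlib


def pick_replica(member_id: int, replicas):
    """Choose the replica with the highest hash weight for this member."""
    if not replicas:
        raise ValueError("no live replicas for this partition")

    def weight(host: str) -> bytes:
        return hashlib.sha256(f"{member_id}|{host}".encode()).digest()

    return max(replicas, key=weight)
```

The choice depends only on the member ID and the set of live hosts, so every broker picks the same replica for a member without coordination. If some other replica goes away the choice does not change, and if the chosen replica goes away only its members move.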
As of today, all message search requests from the flagship LinkedIn app are now served by InSearch, and we are able to serve search requests with a 99th percentile latency of under 150 ms.
We are currently in the process of migrating several of our enterprise use cases into the new system, and evaluating other applications. Additionally, we are now starting to leverage the new messaging search system to accelerate improvements to the LinkedIn messaging experience.
The engineering team behind InSearch consists of Shubham Gupta, who envisioned and championed the project in the early days as well as played a key role in its design and development, Ali Mohamed, Jon Hannah, and us. InSearch was also an amazing team effort across organizations. We would like to thank our key partners from the messaging team—Pradhan Cadabam, Pengyu Li, Manny Lavery, and Rashmi Menon. This project would not have been possible without the strong support from our engineering leaders: Parin Shah, Michael Chernyak, Josh Walker, Nash Raghavan, and Chris Pruett.