
Distributed tier merge: How LinkedIn tackles stragglers in search index build


Co-authors: Andy Li and Hongbin Wu

Indexing plays a key role in modern search engines for fast and accurate information retrieval, and the ability to swiftly build indexes is crucial for LinkedIn to provide up-to-date information, such as candidates to recruiters and job posts to members. In some instances, such as when a member profile is missing and therefore not searchable, we need to build and deploy a fresh index as part of the solution. To do this, we need an indexing pipeline whose end-to-end index build time is both fast and predictable.

LinkedIn’s indexing pipeline was designed using Hadoop MapReduce (MR) back in 2014, and as the volume of data we’ve needed to index has grown, the pipeline began encountering scaling issues. For major search products, it eventually took considerable time to finish even at its best performance, and could easily run beyond several hours. This hurt both service availability and site operability.

According to the metrics we collected, “stragglers” in MR clusters contributed the most to the delay when an indexing job ran beyond its normal execution time. In distributed computing engines like MapReduce or Spark, data is partitioned into small shards that are processed on assigned machines; when machines are slow to complete a user’s job due to issues like network problems, CPU constraints, disk throttling, or hardware failures, we refer to those machines as “stragglers.” In this post, we share the distributed tier merge (DTM), the mechanism we developed to deal with stragglers in the context of search index build, and show how it saves 30-40% of indexing time for LinkedIn’s major search products.

Background and challenges

At LinkedIn, search index build includes two stages: events generation (EventsGen for short) and index generation (IndexGen). EventsGen consumes datasets delivered by the extract-transform-load (ETL) pipeline and generates index events for Galene, the search architecture powering multiple search products at LinkedIn; IndexGen then combines those events into documents and indexes them to produce Lucene indexes. The overall flow is illustrated in Figure 1.

Figure 1: Search index build at LinkedIn

IndexGen has four stages: combiner, divider, indexer, and merger (CDIM), each of which is an MR job with its own mapper and reducer implementations. The combiner composes Galene events into documents, and the divider ensures that indexers index similarly sized documents to avoid data skew. To expedite the indexers, the output from the combiner and divider is split into small shards, each small enough to be handled by a single machine; these shards are then distributed to machines inside the MR cluster for indexing. The outputs from the indexers are subpartitions (an index containing a large amount of data is too big to fit on a single machine, which is why we divide the data into multiple partitions, each built from several subpartitions). Finally, the subpartitions belonging to the same partition are merged back into a single segment of the Lucene index; the main purpose of merging into a single segment is to achieve better performance at query time. Figure 2 illustrates the inputs and outputs of CDIM.

Figure 2: Inputs and outputs of the CDIM process
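To make the merger step concrete, here is a minimal Java sketch of merging several subpartition indexes into a single Lucene segment with IndexWriter.addIndexes followed by forceMerge(1). This only illustrates the underlying Lucene operation (assuming a Lucene 8+ API); the class name and path parameters are hypothetical, not LinkedIn’s actual merger code.

```java
import java.nio.file.Paths;

import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;

public class SubpartitionMerge {

    /** Merges the given subpartition indexes into one index with a single segment. */
    public static void mergeToSingleSegment(String outputPath, String... subpartitionPaths)
            throws Exception {
        try (Directory out = FSDirectory.open(Paths.get(outputPath));
             IndexWriter writer = new IndexWriter(out, new IndexWriterConfig())) {
            Directory[] inputs = new Directory[subpartitionPaths.length];
            for (int i = 0; i < subpartitionPaths.length; i++) {
                inputs[i] = FSDirectory.open(Paths.get(subpartitionPaths[i]));
            }
            writer.addIndexes(inputs); // pull every subpartition into the target index
            writer.forceMerge(1);      // collapse everything into one Lucene segment
            for (Directory in : inputs) {
                in.close();
            }
        }
    }
}
```

Note that forceMerge(1) is exactly the expensive, single-machine step discussed next: it rewrites all the merged data into one segment, which is why it dominates the build time.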

The subpartitions of the same partition are merged on a single machine, and an indexing job can have many subpartitions to merge. Although stragglers can originate at either indexers or mergers, a machine merging subpartitions is much more likely to become a straggler than a machine building a subpartition. One reason is that stragglers in the subpartition build are easy to mitigate by increasing indexing parallelism, which is impossible when performing the merge. Another reason is that the merge computation is both CPU and disk I/O intensive compared to the index computation, which can further exacerbate resource shortages on the machine.

More recently, we successfully migrated IndexGen to Apache Spark. After migrating the indexers to Spark, we improved the speed of building indexes by about 30% with parallelism similar to the MR counterpart; however, we didn’t observe the same improvement in the overall end-to-end build time. These are the major reasons why:

  • Lucene doesn't support distributed merge out of the box: the merger has to merge all subpartitions on a single machine. This means that we cannot leverage standard big data techniques to speed things up by increasing parallelism.

  • The merger actually takes most (60-70%) of the IndexGen time for major search products when indexing jobs run at their normal speed; therefore, the reduction we gained from executing the combiner, divider, and indexer in Spark is not very significant.

  • The straggling mergers can easily eat up all the gains from executing the combiner, divider, and indexer in Spark. 

    • For major search products, mergers can run for hours even at their best performance, and a straggling merger can magnify the slow merging computation significantly; quite often, the merge time is doubled.

    • Take the name search product as an example: after migrating to Spark, the runtime of the combiner, divider, and indexer was reduced by 30%. The merger, however, remains the same and takes about 70% of the IndexGen time; because the 30% savings applies only to the non-merge share of the pipeline, the overall improvement is merely 12%, even assuming the merger's best performance. When mergers become stragglers, such a gain is simply negligible.

Unfortunately, stragglers are common and inevitable in any distributed computing engine, and popular mitigation techniques such as speculative execution cannot help us here, for the following reasons:

  • The average merge operation needs hours to complete; by the time the resource manager speculates that some mergers are straggling and launches backup mergers, it's already too late.

  • Even with backup mergers up and running, the latency is still high, because the backup mergers need hours to complete, too. 

Approach

Figure 3: Distributed tier merge

The distributed tier merge (DTM) is motivated by the tiered merge policy (TMP) in Lucene, and is illustrated in Figure 3.

Assume we have a total of N subpartitions to merge.

  • At tier 0, we run M merges at the same time, each merging N/M subpartitions

  • At tier 1, we further run L merges at the same time, each merging M/L of the indexes produced at tier 0

  • We cascade down until, at some tier, the number of remaining indexes drops below a predefined value K, or reaches 1 (see the sketch below)
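As a hedged illustration of this cascade (not LinkedIn's actual scheduler), the sketch below computes how many merges run at each tier. The class name, the fixed per-merge fan-in, and the stop threshold K are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;

public class TierPlan {

    /**
     * Returns the number of merge tasks at each tier, assuming every merge
     * combines up to fanIn inputs, until fewer than k indexes (or just 1) remain.
     */
    public static List<Integer> mergesPerTier(int numSubpartitions, int fanIn, int k) {
        if (fanIn < 2) {
            throw new IllegalArgumentException("fanIn must be at least 2");
        }
        List<Integer> tiers = new ArrayList<>();
        int remaining = numSubpartitions;
        while (remaining > 1 && remaining >= k) {
            int merges = (remaining + fanIn - 1) / fanIn; // ceil(remaining / fanIn)
            tiers.add(merges);
            remaining = merges; // each merge emits exactly one output index
        }
        return tiers;
    }

    public static void main(String[] args) {
        // e.g., 64 subpartitions with a fan-in of 4, merged all the way down to one index
        System.out.println(mergesPerTier(64, 4, 1)); // prints [16, 4, 1]
    }
}
```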

In Lucene, TMP runs on a single machine and tries to improve merge performance by using multi-threading; in other words, all merges at the same tier can be performed concurrently. However, this cannot give us an obvious saving in merge time, because:

  • The merge operation is CPU expensive and disk I/O heavy, so running multiple merge threads on a single machine makes that machine a hotspot, vulnerable to resource shortages and contention.

  • The TMP is hard to tune; parameters such as how many subpartitions to merge at each tier are tied to factors such as the total input data size, and inappropriately chosen parameters can make the merge even slower (see the configuration sketch below).

  • Last but not least, once the merger starts straggling at a machine, the magnified latency can offset its gain easily. 

Therefore, as we saw with moving the combiner, divider, and indexer to Spark, the improvement made by TMP is not big enough to change the game. 
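For reference, the tuning knobs mentioned above correspond to settings on Lucene's TieredMergePolicy. The sketch below shows how they are typically configured on an IndexWriterConfig; this is a generic example of the public Lucene API (roughly the 8.x-era signatures), not LinkedIn's configuration, and the values are placeholders.

```java
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.TieredMergePolicy;

public class TmpSettings {

    /** Builds an IndexWriterConfig with an explicitly tuned tiered merge policy. */
    public static IndexWriterConfig tunedConfig() {
        TieredMergePolicy tmp = new TieredMergePolicy();
        tmp.setSegmentsPerTier(10.0);        // how many segments a tier may hold before merging
        tmp.setMaxMergeAtOnce(10);           // fan-in: how many segments one merge combines
        tmp.setMaxMergedSegmentMB(5 * 1024); // upper bound on a merged segment's size
        return new IndexWriterConfig().setMergePolicy(tmp);
    }
}
```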

How DTM works

The core idea of distributed tier merge (DTM) is to dispatch the individual merges of a TMP to different executors in the Spark cluster and coordinate them to complete the TMP in a distributed fashion. For this to work, the following assumptions must hold:

  1. Each merge of a given number of subpartitions must run faster on its own machine than it would if multiple merges were launched concurrently on one machine. This is intuitive, because there is less resource contention among mergers when merges run on separate machines.

  2. The final merge into a single Lucene segment must take less time (in fact, much less) than merging all subpartitions at once.

To confirm assumption 2, we took 12 subpartitions and measured the merge time at each tier of a 2-tier TMP. Table 1 shows the experiment results using different parameters. The final merge clearly takes much less time than merging everything at once: for example, merging the 2 indexes produced at tier 1 requires only 82 seconds, less than one third of the time needed to merge the 12 subpartitions produced by the indexers directly.

Table 1: Experiment results using different parameters

  3. To execute the TMP in a distributed fashion, stragglers must be mitigated effectively.

Let Pts be the probability that any single task in a job becomes a straggler. Then Pj = 1 - (1-Pts)^m is the probability that a job with m tasks has at least one straggler, and for a tier with n such jobs, the probability of seeing at least one job with a straggler is:

Pt = 1 - (1-Pj)^n = 1 - ((1-Pts)^m)^n = 1 - (1-Pts)^(m*n)

Therefore, Pt > Pj whenever n > 1, which means that, compared with running a single job, any given tier is more likely to see a straggler. In practice, however, not all tiers have stragglers at once, and by choosing a proper number of tiers, the probability that every tier is hit by a straggler, which is the product over all T tiers,

Pall = Π_t (1 - (1-Pts)^(m*n)), where t = {1, 2, ..., T},

can still come out lower than the single-job probability.
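These formulas are easy to sanity-check numerically. The sketch below uses an assumed per-task straggler probability and assumed task counts (purely for illustration) to show both effects: a single tier is more likely to see a straggler than a single job, yet the product over tiers can come out lower.

```java
public class StragglerProbability {

    /** P(a job with m tasks has at least one straggler). */
    static double pJob(double pTask, int m) {
        return 1 - Math.pow(1 - pTask, m);
    }

    /** P(a tier of n jobs, each with m tasks, sees at least one straggler). */
    static double pTier(double pTask, int m, int n) {
        return 1 - Math.pow(1 - pTask, (double) m * n);
    }

    public static void main(String[] args) {
        double pTask = 0.01; // assumed per-task straggler probability

        // One big merge job with 50 tasks:
        System.out.printf("single job: %.2f%n", pJob(pTask, 50));     // ~0.39

        // A tier of 4 such jobs is MORE likely to see a straggler...
        System.out.printf("one tier:   %.2f%n", pTier(pTask, 50, 4)); // ~0.87

        // ...but the probability that EVERY tier is hit is a product, which can be lower:
        double pAll = pTier(pTask, 50, 4) * pTier(pTask, 10, 2);
        System.out.printf("all tiers:  %.2f%n", pAll);                // ~0.16
    }
}
```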

Meanwhile, if assumptions 1 and 2 hold, each merge at each tier completes sooner. A shorter completion time means a lower probability that a merger becomes a straggler during its execution, and a shorter time to finish the merge even when stragglers do appear.

Assumption 3 implies an opportunity to finish the merge in DTM sooner, no matter how many tiers have stragglers. This is crucial for judging the effectiveness of DTM since, as discussed earlier, any performance improvement made by optimizing the indexers alone can easily be offset by stragglers.

Impact

As of 2020, the following LinkedIn search products have been migrated to Spark IndexGen powered by DTM, and satisfactory results have been observed across all of them, as shown in Table 2, where each build time improvement is measured on the average end-to-end indexing time over multiple executions.

Product name        Build time improvement
Name search         40% less
People search       41% less
Careers search      45% less
Recruiter search    30% less

Table 2

Galene Spark IndexGen not only helps maintain low variance in index build time, but also supports new and advanced features that expand the scope and use cases of LinkedIn’s search tech stack, including:

  • Forward index compression: In Galene, indexed document data is stored as Lucene binary doc values, with the offsets to the fields encoded as fixed-length integers. In Spark IndexGen, they are encoded as variable-length integers, which become more compact as the number of fields in the forward index increases (see the encoding sketch after this list).

  • Virtual shard grouping (VSG): VSG maps the data in a search cluster to logical groups and uses this mapping at query time to limit a query to a single group, instead of a full scatter-gather. This allows a new group to be introduced without disturbing the service, and also paves the way for new capabilities such as topical sharding.
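As an aside on the compression bullet above, variable-length integer encoding stores small values in fewer bytes. The sketch below implements the common 7-bits-per-byte scheme (the same general idea as Lucene's VInt format) purely as an illustration; the class name is hypothetical, and the post does not specify the exact encoding Spark IndexGen uses.

```java
import java.io.ByteArrayOutputStream;

public class VarIntEncoding {

    /** Encodes a non-negative int as 7-bit groups; the high bit marks continuation. */
    static byte[] encode(int value) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        while ((value & ~0x7F) != 0) {
            out.write((value & 0x7F) | 0x80); // low 7 bits, continuation bit set
            value >>>= 7;
        }
        out.write(value); // final byte, continuation bit clear
        return out.toByteArray();
    }

    public static void main(String[] args) {
        System.out.println(encode(5).length);       // 1 byte (vs. 4 for a fixed-length int)
        System.out.println(encode(300).length);     // 2 bytes
        System.out.println(encode(1 << 28).length); // 5 bytes (large values cost more, but are rare)
    }
}
```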

Conclusion

By distributing individual merges to their own mergers in a Spark cluster, distributed tier merge (DTM) not only speeds up the merging computation, but also significantly reduces the chance of mergers becoming stragglers. DTM-powered Spark IndexGen has demonstrated its effectiveness in fighting stragglers in search index builds, and ensures LinkedIn can deliver fresh indexes quickly to its products for a better member experience.

Acknowledgments

Thanks to Hongbing Wu and Dave Kolm of the search infrastructure management team for their trust and support on every aspect of this project. Thanks to SungJu Cho and Choong Soon Chang for their insightful discussions and shepherding, and thanks to every member of the search indexing and i18n teams for their assistance.

Special thanks to Erik Krogen and Sunitha Beeram of the Spark team for their continuous support during the journey; it would not be possible without them.

Last but not least, thanks to the Recruiter and Careers teams, the early adopters of Galene Spark IndexGen and DTM.