
Managing "Exploding" Big Data

What is the shape of your big data?

While we do love to talk about the size of our big data—terabytes, petabytes, and beyond—perhaps we are not paying due recognition to the shape of it.

Big data comes in a variety of shapes. The Extract-Transform-Load (ETL) workflows are more or less stripe-shaped (left panel in the figure above) and produce an output of a similar size to the input. Reporting workflows are funnel-shaped (middle panel in the figure above) and progressively reduce the data size by filtering and aggregating.

However, a wide class of problems in analytics, relevance, and graph processing have a rather curious shape of widening in the middle before slimming down (right panel in the figure above). It gets worse before it gets better.

In this article, we take a deeper dive into this exploding middle shape: understanding why it happens, why it’s a problem, and what we can do about it. We share our experiences with real-life workflows from a spectrum of fields, including Analytics (A/B experimentation), Relevance (user-item feature scoring), and Graph (second degree network/friends-of-friends).

As you may have guessed already, the data explodes whenever we join (that is, denormalize) two or more tables on keys that are not unique in one or more of the tables. The shape is, therefore, determined by the relational operators. However, a key observation here is that the shape per se (the widened middle) is not the real problem. What we really need to look at is how the end-to-end computation is divided across stages (Map-Reduce tasks or Spark stages). And often, the stage boundaries occur right where the data is the widest (we will see several examples in this post). This is bad, because it means we need to move the exploded data across stage boundaries, by writing it to HDFS or shuffling it. It is disquieting to find that we spend more time and cluster resources on moving the data than actually working on it!
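
To make the explosion concrete, here is a tiny back-of-the-envelope sketch (illustrative Python, with made-up row counts): the joined output grows as the product of the key multiplicities on the two sides.

# Why joining on non-unique keys explodes: each key contributes
# count_left(key) * count_right(key) rows to the join output.
from collections import Counter

left_keys = ["m1"] * 1000 + ["m2"] * 2000   # e.g., one row per (member, recommended item)
right_keys = ["m1"] * 50 + ["m2"] * 50      # e.g., one row per (member, metric, day)

left, right = Counter(left_keys), Counter(right_keys)
joined_rows = sum(left[k] * right[k] for k in left.keys() & right.keys())
print(joined_rows)  # 150,000 joined rows from only 3,100 input rows: the wide middle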

It would be ideal if we could influence our stage planning such that the exploded data never leaves the stage boundary. That is, as soon as the data explodes, we should be able to reduce it right away. This post describes our three novel join-and-aggregation strategies on Hadoop, their application areas, and their impact as summarized below:

Domain    | Application                           | What it was before                                             | Our strategy             | What it is now
Relevance | Model scoring for user-item features  | 6 hours for joins, high Hadoop resources                       | 2D Partitioned Hash Join | Joins in 10 seconds!
Graph     | 2nd degree network/friends-of-friends | 3-4 hours                                                      | Matrix-Multiply Join     | 10 minutes!
Analytics | A/B testing                           | 40 hours for scaled-down data (could not finish for full data) | Mesh Join                | 5-6 hours!

A wide middle in relevance

The first example is drawn from the world of relevance, where we would like to recommend “items” (e.g., jobs to apply for, feeds to view, groups to join, other members to connect to, etc.) to our members. We begin with a list of members and, for each member, a list of items to recommend. We have already determined the features for each member, each item, and each member-item pair. We have also trained a model on these features, and our job scores each pair in an offline (Hadoop) environment. The goal is to rank the pairs and keep the top K recommendations, which are uploaded to the online serving store. We may further rescore or rerank recommendations online based on real-time signals, but the bulk scoring still happens offline.

Let’s see the exploding data in action. We begin with three tables: (a) the member-item pair features table, (b) the member features table, and (c) the item features table (top section in the figure below). This is a good normalized data model for data at rest. To score the features, we need to join these three tables together (middle section in the figure). The member-item table has two foreign keys, neither of which is unique, since we recommend several items to a member and can recommend the same item to several members. The denormalization stage explodes the data size. The final scoring stage reduces the data by replacing the feature vectors with numerical scores.

[Figure explodingdata4: the normalized tables, the denormalization join, and the scoring stage]

In the middle section of the figure above (the denormalization stage), the three tables are joined with each other to produce a single denormalized table. This three-way join is accomplished in two steps: (1) join the first two tables to produce the wide intermediate table, then (2) join this intermediate table with the third table. With either Map-Reduce or Spark, this requires two data shuffles: one of the first two tables (step 1) and one of the intermediate and third tables (step 2). Map-Reduce further imposes the penalty of a roundtrip to HDFS for the intermediate data. To repeat from earlier, our problem is not the shape of the workflow, but rather how the stages are divided across the flow: we have to move this widened intermediate data across stage boundaries.
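
For concreteness, a naive version of this plan might look like the following PySpark sketch (the paths and column names are hypothetical, not our production code); each join shuffles its inputs, including the already-widened intermediate table.

# Hypothetical PySpark sketch of the naive two-step, three-way join.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("naive-denormalize").getOrCreate()

pair_features = spark.read.parquet("/data/pair_features")      # (member, item, pair feature columns)
member_features = spark.read.parquet("/data/member_features")  # (member, member feature columns)
item_features = spark.read.parquet("/data/item_features")      # (item, item feature columns)

# Step 1: shuffle pair_features and member_features on member.
wide = pair_features.join(member_features, on="member")

# Step 2: shuffle the widened intermediate table and item_features on item.
denormalized = wide.join(item_features, on="item")
# Scoring happens only after the exploded data has crossed two shuffle boundaries.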

What we would like is to avoid moving the widened data across the stages. That is, whenever the data expands, we should be able to process it right away (in this case, score the features). The following describes our 2D Partitioned Hash Join strategy for joining on two foreign keys.

2D Partitioned Hash Join
As you will see, this strategy is a hash join (it builds a hash table from one of the tables), is partitioned so that it scales horizontally across the cluster, and is two-dimensional, joining on two keys simultaneously. The denormalized data never leaves the stage boundary on either foreign key.

The figure below illustrates the strategy. The member features table is hash-partitioned on member into “M” partitions (which we will call slices). Each slice is stored as a hash table with member as the key and features as the value. The item features table is similarly hash-partitioned into “I” slices, again as hash tables. The numbers of slices are chosen such that one slice from each table can be loaded in memory. The pair features table is partitioned into a cross product of M*I partitions using a custom function. A neat thing here is that we don’t need separate partitioning jobs; the partitioning can be piggybacked on the upstream workflow stages that generate these features, effectively making it free.
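
The partitioning arithmetic can be sketched roughly as follows (the slice counts and hash function are illustrative, not our exact custom partitioner); the important property is that pair partition p depends on exactly one member slice and one item slice.

import zlib

M = 64  # number of member-feature slices (chosen so one slice fits in a mapper's memory)
I = 16  # number of item-feature slices

def member_slice(member_id: str) -> int:
    # A stable hash (unlike Python's built-in hash) so that every job agrees on slice assignment.
    return zlib.crc32(member_id.encode()) % M

def item_slice(item_id: str) -> int:
    return zlib.crc32(item_id.encode()) % I

def pair_partition(member_id: str, item_id: str) -> int:
    # One of M * I pair partitions: partition p needs only member slice p // I and item slice p % I.
    return member_slice(member_id) * I + item_slice(item_id)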

[Figure explodingdata5: the 2D Partitioned Hash Join]

The left side of the figure above shows the partitioned tables. A mapper is launched for each pair partition; it loads the corresponding hash table slices of the member and item features. In the figure above, the logic is shown for a mapper that processes partition 4 of the member features and partition 1 of the item features. Thereafter, for each record, we look up the member and item features from the hash tables and score them right away! The code is given in pseudo-form: setup is called once per mapper, map is called for each record, and load_member_slice/load_item_slice load the slice files directly from HDFS.
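
The pseudo-code itself lives in the figure, so here is a rough, self-contained Python rendition of the same mapper logic; load_member_slice, load_item_slice, and score are simplified stand-ins for the real helpers.

def load_member_slice(slice_id):
    # In production this reads a hash table slice file directly from HDFS;
    # here it is faked with an in-memory dict of member -> feature vector.
    return {"alice": [0.2, 0.7], "bob": [0.9, 0.1]}

def load_item_slice(slice_id):
    # item -> feature vector
    return {"job42": [0.5, 0.5], "group7": [0.3, 0.8]}

def score(member_vec, item_vec, pair_vec):
    # Stand-in model: dot product of member and item features plus the pair features.
    return sum(m * i for m, i in zip(member_vec, item_vec)) + sum(pair_vec)

class PairScoringMapper:
    def setup(self, pair_partition_id, I):
        # Called once per mapper: load only the two slices this pair partition needs.
        self.member_features = load_member_slice(pair_partition_id // I)
        self.item_features = load_item_slice(pair_partition_id % I)

    def map(self, member, item, pair_vec):
        # Called per record: look up both feature vectors and score immediately,
        # so the denormalized row never leaves this mapper.
        return member, item, score(self.member_features[member],
                                    self.item_features[item], pair_vec)

mapper = PairScoringMapper()
mapper.setup(pair_partition_id=4 * 16 + 1, I=16)  # member slice 4, item slice 1, as in the figure
print(mapper.map("alice", "job42", [0.05]))       # the scored (member, item) triple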

Observe how we have shifted our data read/write/shuffle patterns: previously, we were shuffling and writing the intermediate data. With the 2D Partitioned Hash Join, we have completely avoided that. Our cost now is reading the same partitions of the member and item features tables from several mappers. This is better, as we are reading the pre-join data (which is smaller) and the reads are fully parallelized (which is faster).

The net effect? Loading the slices in memory takes less than 10 seconds. So, the two joins that used to take several hours are now done in seconds!

A wide middle in graphs

A key value of LinkedIn’s professional network is discovering people you should know or reconnecting with old or lost connections. Over the years, the People You May Know (PYMK) feature has been helping members build a healthy and thriving professional identity and network. One component of discovering connections (presented in simplified form here) is the second degree network (also called the friends-of-friends network). If you and I are not already connected but share several common connections, it is a good indicator that our professional identities overlap and we should “get in touch.”

The “standard” technique for generating this on a cluster is actually rather simple. The graph is stored as an adjacency list: each row of data is a member and a list of their connections—e.g., {member: Adam, connections: [Bob, Charlie, David]}. The mappers load the partitions of this graph and generate the pairs of second degree connections as:

{Bob, Charlie, mutual: Adam}
{Bob, David, mutual: Adam}
{Charlie, David, mutual: Adam}

That is, each input row produces several output rows. If each member has, on average, 100 connections, the mappers will produce roughly 10,000 times as many rows for the next stage of computation. This is the exploding data!

Fortunately for us, this huge intermediate dataset is subsequently reduced in two ways. First, even though there may be, say, 50 mutual connections between Bob and Charlie, we group them and only keep the count of common connections (instead of 50 rows from the mappers, we keep one row in the reducer). Second, we discard pairs with few (say, fewer than three) common connections. Both factors taken together bring the data down by several orders of magnitude.
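
A toy Python version of this standard approach (graph and threshold are made up, and the post’s threshold of three is lowered to two just to keep the example small) makes both the fan-out and the subsequent reduction explicit.

from collections import defaultdict
from itertools import combinations

graph = {
    "Adam": ["Bob", "Charlie", "David"],
    "Eve":  ["Bob", "Charlie"],
}

# "Map": every adjacency-list row fans out into one row per pair of its connections.
pairs = []
for mutual, connections in graph.items():
    for a, b in combinations(sorted(connections), 2):
        pairs.append((a, b, mutual))          # e.g., ("Bob", "Charlie", "Adam")

# "Reduce": collapse the exploded rows into a count per pair, then filter.
counts = defaultdict(int)
for a, b, _ in pairs:
    counts[(a, b)] += 1

MIN_COMMON = 2                                # the post uses 3; 2 keeps this toy example non-empty
second_degree = {pair: c for pair, c in counts.items() if c >= MIN_COMMON}
print(second_degree)                          # {('Bob', 'Charlie'): 2}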

If we were to think in relational algebra, we are joining the connections relation with itself on a key that is not unique on either side. This, as we now know, leads to data explosion.

Matrix-Multiply Join
We view the graph, G, as a 2D (sparse) matrix with sources as rows and destinations as columns. The original graph is preprocessed twice. First, the graph is transposed, hash partitioned, sorted within each partition on the dest vertex, and stored on HDFS. We call this the “left partitioned graph.” The second step is a little unconventional: the graph is partitioned on the dest vertex but sorted on the source vertex within each partition, and also stored on HDFS. We call this the “right partitioned graph.”

[Figure explodingdata6: the Matrix-Multiply Join]

Next, we launch a mapper for each cross-product pair of slices from the right and left graphs. Each mapper loads its right graph slice in memory as a hash table and scans one record at a time from its left graph slice. Observe that the logic in the mappers enumerates all second degree pairs but immediately compacts and filters them. Even though the data has exploded, it is also reduced right away within the same mapper.
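
Below is a simplified, self-contained Python sketch of one such mapper. It assumes each slice maps a mutual-connection vertex to the neighbors that fall in that slice’s vertex range and glosses over the exact partitioning and sort orders described above; the point is that enumeration, compaction, and filtering all happen before anything leaves the mapper.

from collections import defaultdict

def run_mapper(left_slice, right_slice, min_common=3):
    # The right slice is loaded fully in memory as a hash table.
    right = dict(right_slice)
    counts = defaultdict(int)                 # (left vertex, right vertex) -> mutual connection count

    # The left slice is streamed one record at a time.
    for mutual, left_neighbors in left_slice:
        for a in left_neighbors:
            for b in right.get(mutual, []):
                if a != b:
                    counts[(a, b)] += 1       # compact the exploded pairs right away

    # Filter before anything is emitted from the mapper.
    return {pair: c for pair, c in counts.items() if c >= min_common}

left_slice = [("Adam", ["Bob"]), ("Eve", ["Bob"]), ("Mallory", ["Bob"])]
right_slice = [("Adam", ["Charlie"]), ("Eve", ["Charlie"]), ("Mallory", ["Charlie"])]
print(run_mapper(left_slice, right_slice))    # {('Bob', 'Charlie'): 3}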

The net effect? This job that used to take several hours and shuffle trillions of records now runs in 10 minutes!

A wide middle in analytics

Our XLNT platform for A/B tests has been an indispensable driver behind LinkedIn’s data-driven culture. XLNT allows for easy design and deployment of experiments and provides automatic analysis. All of the A/B tests we conduct have a clear end goal of improving our products for the benefit of our members. We also have very clear guidelines about what we will not test, which include experiments that are intended to deliver a negative member experience, have a goal of altering members’ moods or emotions, or override existing members’ settings or choices.

The actual computation and strategy are presented in simplified form here. For full details, please refer to our VLDB 2014 paper.

The offline process operates on two tables:

  • Experimentation table, with schema {member, experiment, variant}

  • Metrics table, with schema {member, metric, value}

The offline computation process first joins the two tables on member (the join key) and subsequently groups on experiment, variant, and metric (the grouping keys) while applying statistical aggregation functions on the values.

SELECT experiment, variant, metric, STATISTICAL_AGG_FUNCTION(value)
FROM experiments JOIN metrics ON experiments.member = metrics.member
GROUP BY experiment, variant, metric

Observe that our join key (member) is not unique in either table: a member is exposed to multiple experiments, and there are several metrics for a member over several days. We’ll also point out that the statistical function is “non-additive” in nature, which means it is not possible to partially aggregate before the join (for advanced readers: it is pseudo-distributive on member, but fortunately not holistic).

We now see the pattern once again: the joins on non-unique keys explode the data, while the later stage aggregation brings it down. Again, the standard join strategies with Map-Reduce or Spark would need to move the exploded data across the execution stages. Our Mesh Join strategy described below is able to join (on one key) and aggregate (on other keys) in the same stage.

Mesh Join
We partition the two tables on member consistently; that is, both tables have an equal number of partitions, and the member sets in the ith partitions of both tables are identical. There are two other tricks to the partitioning. First, the number of partitions is dynamically determined such that each partition of one table (say, metrics) can be fully loaded in memory. Second, even though the data is partitioned on the join key, each partition is sorted on the grouping key first. All of these tricks are possible with the standard Map-Reduce API, albeit requiring custom partitioner and comparator classes.

The figure below illustrates how Mesh Join operates. A block of the metrics table is loaded into memory on each mapper, and the matching block of the experiments table is streamed past it, so each mapper processes exactly one pair of matching blocks from the two tables.

[Figure explodingdata7: the Mesh Join]

Within both tables, we can create subunits of partitions that are pivoted on the respective grouping keys: the metrics table has a subunit per metric, while the experiments table has a subunit per (experiment, variant). The Mesh Join is a series of nested loop joins over the cross product of these subunits. We only need to store one subunit of the experiments table in memory at a time, and the joined output can be aggregated right away.
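
Below is a compressed Python sketch of one Mesh Join mapper over a matching pair of blocks. The blocks and a simple sum aggregate are illustrative stand-ins (the real statistical functions are non-additive, as noted earlier); the point is the nested loop over subunits, with aggregation applied as soon as rows are joined.

from collections import defaultdict
from itertools import groupby
from operator import itemgetter

# Metrics block: (metric, member, value), sorted on the grouping key (metric).
metrics_block = [
    ("clicks", "alice", 3.0), ("clicks", "bob", 1.0),
    ("pageviews", "alice", 10.0), ("pageviews", "bob", 7.0),
]
# Experiments block: (experiment, variant, member), sorted on (experiment, variant).
experiments_block = [
    ("expA", "control", "alice"), ("expA", "treatment", "bob"),
]

output = defaultdict(float)  # (experiment, variant, metric) -> aggregate

# Nested loop join over the cross product of subunits: only one (experiment, variant)
# subunit is held at a time and joined against each metric subunit of the in-memory block.
for (exp, var), exp_rows in groupby(experiments_block, key=itemgetter(0, 1)):
    members = {member for _, _, member in exp_rows}
    for metric, metric_rows in groupby(metrics_block, key=itemgetter(0)):
        for _, member, value in metric_rows:
            if member in members:                      # the join on member
                output[(exp, var, metric)] += value    # aggregate right away

print(dict(output))  # e.g., {('expA', 'control', 'clicks'): 3.0, ('expA', 'control', 'pageviews'): 10.0, ...}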

The net effect? Our previous attempts with hand-optimized Map-Reduce for this workflow would take upwards of 40 hours, which we bring down to under six hours with this Mesh Join strategy implemented in Cubert.

Conclusion

It pays to get your big data computation trimmed down!

Look at how your execution engine divides the computation into stages, and check whether the stage boundaries fall where the shape is the widest. If so, try to partition the input data so that operators that reduce the data size (aggregations, transformations, etc.) can be applied in the same stage where the data explodes.

We look forward to a future where these strategies are built into the execution engines themselves, which could then infer the shape of the flows and automatically generate efficient stage plans.

Acknowledgements

The work described here is the result of collaboration across teams: Min Shen, Jeremy Simpson, Myunghwan Kim, Heloise Logan, and Ya Xu, with support from Shirshanka Das, Igor Perisic, Hema Raghavan, Shrikanth Shankar, Kapil Surlaker, Suja Viswesan, and our alumni Jie Bing, Krishna Puttaswamy, and Srinivas Vemuri.