Auto-Tuning Pinot Real-Time Consumption

July 11, 2019

Pinot, a scalable distributed columnar OLAP data store developed at LinkedIn, delivers real-time analytics for site-facing use cases such as LinkedIn's Who viewed my profile, Talent insights, and more. Pinot uses Apache Helix for managing cluster resources and Apache Zookeeper to store metadata. Pinot has wide adoption at LinkedIn, ranging from internal dashboards to site-facing applications.

Pinot supports batch data ingestion (referred to as “offline” data) via Hadoop, as well as real-time data ingestion via streams such as Kafka. Pinot uses offline and real-time data to provide analytics on a continuous timeline from the earliest available rows (could be in offline data) up to the most recently-consumed row from the stream.

Serving queries on data while rows are being ingested from a real-time stream poses a unique set of challenges. Pinot has been chipping away at these, and getting better over time.

Pinot stores data in shards called "segments." During query execution, Pinot processes segments in parallel, and merges results across segments to construct the final response for a query. Offline data is pushed into Pinot as pre-built segments (offline segments) and stored in the Segment Store (see Architecture diagram). These segments are stored as ImmutableSegment objects (addition, deletion, or modification of rows is not possible on these segments). On the other hand, real-time data is consumed on a continuous basis from the underlying stream partitions into segments called MutableSegments (or, “consuming” segments). These segments allow for the addition of rows to them (rows can still not be deleted or updated, though). MutableSegments store rows in uncompressed (but still columnar) form in volatile memory (discarded on restart).

Every so often, the rows in a MutableSegment are compressed and persisted by “committing” the segment into the Segment Store as an ImmutableSegment. Pinot then moves on to consume the next set of rows from the stream partition into a new MutableSegment. The key question here is: “At what point (or, how often) should Pinot decide to commit a consuming segment?”

Committing segments too frequently ends up with many small segments for a table. Since Pinot queries are processed at segment level, having too many segments results in increased overhead for processing queries (number of threads spawned, metadata processing, etc.), resulting in higher query latencies. 

On the other hand, committing segments less frequently can result in servers running out of memory, since new rows get added into MutableSegments all the time, expanding the memory footprint of these segments. Furthermore, servers can be restarted at any time (at LinkedIn, we push new code every week), causing the MutableSegment to discard all rows and re-start consuming from the first row of the MutableSegment again. This by itself is not a problem (Pinot can ingest back-logged data at a very high rate), but it is possible that the underlying stream topic has retention configured so that the first row of the MutableSegment has been retained out. In this case, we lose data—not good!

It turns out the answer depends on several factors—like ingestion rate and number of columns in the schema, to name a few—that vary across different applications. Pinot provides some configuration settings (e.g., a setting for maximum number of rows in a MutableSegment) that address these variations, but there were still questions from administrators regarding how to set the correct values for those settings on a per application basis. Experimenting with different settings (or combinations thereof) for each application was not a scalable solution, given Pinot’s adoption rate at LinkedIn. In this blog, we will explain how we implemented auto-tuning of real-time consumption that eliminated the experimentation process completely and helped administrators scale to Pinot’s adoption rate.

In order to understand the problem and the solution better, it is useful to go over the Pinot real-time architecture in some more detail.

Pinot real-time ingestion

Pinot real-time servers create a PartitionConsumer object for each stream partition they are directed (by Helix) to consume. If the table is configured to have q replicas and there are p partitions of the stream, then there will be (p * q)  instances of PartitionConsumer objects across all the servers for the table. If there are S servers serving this table, then each server will have ⌈(p * q)/S ⌉ PartitionConsumer instances.

The figure below is an illustration of how PartitionConsumer objects are distributed across Pinot real-time servers.

  • servers-consuming-stream-of-p-partitions

Real-time servers consuming from a stream of p partitions

Helix ensures that more than one replica of any stream partition is never consumed in the same real-time server. (Therefore, we must set S >= q, otherwise table creation will not succeed).

Pinot assumes that the underlying stream partition has messages that are ordered according to their arrival within the partition, and that each message is located at a specific “offset” (essentially a pointer to the message) in the partition. Each message of the stream partition is translated into a row in the MutableSegment. Each MutableSegment instance has rows from exactly one stream partition. The metadata for a MutableSegment (in Zookeeper) has the offset in the partition from which consumption should start for that segment. This starting offset value applies to all replicas of the MutableSegment. Pinot controller sets the value of the starting offset in the segment metadata at the time the segment is created (which is either when the table is first created, or, when the previous segment in that partition is committed).

The algorithm to commit a segment involves a few steps, during which queries continue to be served from the MutableSegment. After a segment is committed, the MutableSegment is atomically swapped with the (equivalent) ImmutableSegment. The memory taken up by a MutableSegment instance is released after the last query on that instance is drained. All through this process, the application is unaware that any segment commit is going on. The algorithm to commit a segment is as follows:

  1. Pause consumption (until step 5).

  2. Execute steps of the segment completion protocol to decide which replica commits the segment.

  3. Build an ImmutableSegment out of rows in the MutableSegment.

  4. Commit the segment to the controller (In this step, the controller creates the next segment in the partition).

  5. Await signal (from Helix) for the next segment.

  6. Resume consumption when signal is received, indexing rows into a new MutableSegment.

This algorithm is illustrated in the figure below. The actual steps of segment completion are more involved, but we skip the details in this blog.

  • algorithm-illustration

The problem of provisioning

The characteristics of applications being provisioned can vary widely from one another. Here is a partial list of variations across applications:

  • The cost of holding a row in memory depends on the data schema (more columns means more memory).

  • Pinot uses dictionary encoding to optimize memory consumption (values in rows are stored as integer dictionary IDs that refer to the actual value in a dictionary). Therefore, a higher number of unique values of any column will consume more memory in the dictionary.

  • The rate at which events are ingested into a topic varies widely across applications, and even over time in any one application. For example, events could be coming in at a much higher rate on a Monday morning than on a Friday evening.

  • The number of stream partitions can vary across applications (see below for the impact).

  • We may provision different number of machines for an application with higher query loads than another with a lower query load.

In earlier versions of Pinot, we provided two configuration settings:

  • Maximum number of rows that can be held across all MutableSegments in a server (N).

  • Maximum time (T ) for which a MutableSegment can exist. After this time, the segment is to be committed, no matter how many rows are in the segment at that time. The administrator may set the value of T depending on the retention of the underlying stream.

If a server ended up owning k ( = ⌈(p * q)/S ⌉) partitions of a table, the Pinot controller sets the segment metadata to consume at most x (= N/k) rows. The PartitionConsumer is designed to stop consumption and start the commit procedure either upon reaching time T, or after consuming x rows into the MutableSegment. However, the variations across applications will require N to be different for each one.

There is one other thing the administrators had to consider before choosing N: Resident Memory size on each server (for both MutableSegments and ImmutableSegments):

  • Memory for a MutableSegment is (as far as possible) acquired at the time the MutableSegment is created. The amount of memory acquired is based on the threshold x set for that segment (therefore, to have a high value of x and not use the memory allocated is a waste).

  • The ImmutableSegment is resident in virtual memory until the retention time of the real-time table, at which point it is unloaded. A higher value of x would mean a smaller number of (larger) ImmutableSegment objects, and larger MutableSegment objects.

The total resident memory on a server will depend on the following:

  1. Number of stream partitions that the server hosts (k).

  2. Number of ImmutableSegments created during the retention period.

  3. Size of ImmutableSegments.

  4. Size of MutableSegments (dependent on x, and other things as outlined above).

The value of k depends on the number of servers deployed. An administrator may decide to deploy as many servers as necessary to support the query throughput, given the latency requirements.

As you can see, the number of variables quickly gets out of hand, and we seem to need one to estimate the other. In order to arrive at a working configuration setting, the administrators had to run benchmark experiments before provisioning a use case:

  1. Set up a table with some number of servers and a value of N.

  2. Consume from earliest offset in the stream partitions so that we get to have the ImmutableSegments in place (this is an approximation, since ingestion rate varies across time for any given stream topic, causing us to hit the time limit rather than row limit).

  3. Run the retention manager to retain out the older segments.

  4. If there is too much paging or we run out of memory, then change the number of servers or N (depending on segment sizes) and go back to step 1.

  5. Run a query benchmark firing queries at the rate the application expects to do so. If performance is not as desired, increase the number of hosts and go back to step 1, readjusting N as needed.

Arriving at the right configuration settings for an application took a few (sometimes several) days, not to mention the time spent by Pinot administrators while they had more urgent things to focus on.

Automatic tuning

In order to help administrators provision a use case, we decided to provide:

  • A target segment size setting for the committed segment. Pinot would attempt to create ImmutableSegment objects of this size.

  • A command line tool that helped the administrators choose the target segment size.

With these two in place, all that administrators need to do is to run the command line tool with a sample segment (generated from data previously gathered via ETL on the same topic). The tool outputs a few choices to pick from, depending on the number of servers that are needed for query processing. The administrator can then select one of the choices and provision the table, confident that it will work as desired with reasonable performance.

Command line tool

Given a sample segment, the tool estimates the resident memory on a host, and the segment size setting. The tool works by estimating the resident memory with these segment sizes.

Here is a sample output from RealtimeProvisioningHelper for a table:

The output shows, for different numbers of servers used and hours that a MutableSegment consumes data:

  • The total memory used in a server (for MutableSegments as well as ImmutableSegments).

  • Optimal Segment size setting.

  • The amount of memory that MutableSegments will use (Consuming Memory).

Each of these will vary according to the number of hours consumed, so the values are displayed for the different numbers as provided in the command line arguments. The administrator specifies the host counts that they are considering (in this case 8,10,12, or 14 hosts), a sample segment from consumed data (or sample segment from offline data), and the table configuration (for retention time, etc.). The utility prints out the matrix as above.

Based on the output, the administrator can choose to deploy 8, 10, 12, or 14 hosts, and choose the segment size limit appropriately as per the table. In the above example, if the administrator chooses to use 12 servers (say, based on query throughput requirements), then 10 hours seems to be utilizing memory optimally. The optimal segment size seems to be 360MB. So, the configuration would look like this (the other parameters of StreamConfigs are omitted for brevity):

    streamConfigs {

        "realtime.segment.flush.threshold.size": "0",

        "realtime.segment.flush.desired.size": "360M",

        "realtime.segment.flush.threshold.time": "10h"

    }

Based on the output of the tool, we know that if the PartitionConsumer commits a segment when the segment size is around 360MB, we should be utilizing resident memory optimally between MutableSegments and ImmutableSegments. Note that the 360MB size is that of an ImmutableSegment. As explained before, a MutableSegment is converted to an ImmutableSegment at the time of committing the segment, so it is a chicken-and-egg problem to determine the size of an ImmutableSegment before building one.

Recall that we stop consuming when we reach a row limit (x) or time limit (T). So, if we can somehow set the row limit for a segment in such a way that we can expect the resulting segment size to be near the target segment size, we should be good. But then, how do we estimate the number of rows that results in the desired segment size?

Estimating the row limit for a desired segment size

In order to come up with a row limit for a MutableSegment, we decided to take advantage of the fact that the controller is responsible for committing a segment as well as creating a new segment (which it does in one step, as shown in the picture above).

The idea is for the controller to decide the value of x for the next segment, so as to reach the desired segment size. At the time of segment completion, the controller estimates the number of rows that need to be consumed in the next segment based on the current segment size and the number of rows consumed in the current segment. 

ImmutableSegments have indices, dictionary, etc. in a compressed representation. So, the size of a segment may not vary linearly with the number of rows (e.g., the dictionary size is based on the number of unique values of a column and the average width of the column, no matter how many rows there are in the segment). Also, segment sizes can potentially vary a lot depending on the actual values in a single segment. 

Therefore, we take into account the past values of segment sizes while estimating the size of the next segment. Instead of maintaining the segment sizes over time, we maintain the ratio of segment size to number of rows, improving the ratio each time a segment completes, so that we can estimate the number of rows reasonably for the next segment.

Algorithm for setting the row limit

We assume that the ratio of segment size to number of rows is a constant for each table (say, R). Since there is a fixed overhead for creating a segment even with one row, R is not really a constant, but is a good approximation. Each time a segment completes, we compute the value of R and adjust the learned value R to be more accurate, as below:

Rn+1 = Rn * α + Rcurrent * (1 - α),    where 0 < α < 1

Here, Rcurrent is the row-count-to-size ratio of the current segment (i.e., the one that is in the process of completing). We choose α to be a number higher than 0.5 so that we weigh the learned value more than the new value.

The number of rows threshold for the next segment is computed as:

xn+1 = desiredSegmentSize / Rn+1

Also, it is possible that even though we set x for a segment to be some number x1, the PartitionConsumer could reach the time limit T after only x2 rows, where x2 < x1.

In this case, for the subsequent segment, we want to set the row limit to be more like x2, so that we always try to end the segments by reaching the row limit rather than the time limit (this goes back to not wasting memory allocated up front, as mentioned before).

Taking these factors into account, here is the final algorithm:

Note that the value of R is stored in local memory, not persistent store. It may happen that the lead controller needs to be restarted (e.g., for deployment, failure, etc.). In this case, another controller takes over leadership, and as per the algorithm, starts with a null value of R. However, the algorithm takes the first value of R from the completed segment, thus effectively transferring over the value to the new controller, with all the history of older segments.

Lastly, we run this algorithm only on one partition of a topic. Multiple partitions of a stream tend to have similar characteristics at similar times. For example, if 100 new articles appeared between 8 and 9am, the events for the clicks on those articles will probably follow a similar distribution across all partitions of the click-stream during that period. So, changing the value of R (which is applicable across all partitions of the table) whenever the segment completes for any partition is not a good idea, since we will be biasing the value of R towards recent segments more than we want to.

In practice, we see that all stream partitions of a topic result in more or less the same segment sizes, and complete more or less at the same time.

Results

The algorithm presented essentially computes the row limit for the next segment, given some history and characteristics of the current completing segment. Here is a graph that shows the size of the segment adjusting to reach the target segment size over the first 20 segments. The measurements are for a single partition of the stream topic of a table. The average event ingestion rate was 630 rows/sec, with the maximum being around 1,000 rows/sec.

  • results-by-size-and-number

The number of unique values (within a segment) in a column, the dictionary sizes, etc. can vary significantly between different segments, especially as we transition from a weekend to a weekday, or from a longer holiday period to a workday. Depending on the topic (Pinot serves over 50 topics in production), major world events, publications, new product launches, etc. can significantly change the characteristics of data, thus making it hard to predict the segment size by just using number of rows. Thus, the estimated number of rows for a segment could result in much larger (as is the case with the 500MB segment size target in the graph above) or much smaller segment size.

However, the wild variations typically happen during the initial learning period. Typically, tables are provisioned first and queries ramped up over time. 

Here is a graph that shows the segment size variation over 10+ days with the target segment size of 500M.

  • segment-size-variation-results

Here is the code for this algorithm.

Conclusion

We now provision all single tenant real-time tables based on the output of RealtimeProvisioningHelper. This has reduced our time to evaluate capacity from days to minutes, since the administrators do not need to try various combinations before provisioning a cluster, and can be fairly confident that once provisioned, the cluster will take the consumption load as specified.

Future work

As mentioned before, we try to acquire most of the memory needed for a MutableSegment at the time we start consumption. Allocating memory dynamically as rows come in is an option, but that leads to two problems:

  1. We will need to read-lock appropriate data structures while processing queries, and write-lock the structures when we expand them. We strive to minimize locking while processing queries on real-time consuming segments and try to avoid lock contentions, so adding more read-locks does not help in delivering low latency.

  2. In order to avoid wasting memory, we may allocate memory in small chunks, further exacerbating the lock contention.

This is one area that needs more work. The algorithm does stabilize over time, but sometimes it over-sizes segments during the learning phase. It will be good to avoid over-sizing altogether. For example, it will be useful to add another configuration for the maximum memory to be used for consuming segments. We can then stop consumption if we hit a certain limit on resident memory. Typically, over-sizing happens due to fluctuations in cardinality or width of columns (changing the dictionary size). If these are temporary, we really do not want to carry them over as learnings for future segments. Stopping consumption early in these cases will be useful.

Another area we will be working on in future is multi-tenant systems—where a single host may handle stream partitions of multiple tables. In this case, a single tool will not suffice to set the segment size. We need alternative mechanisms to continuously evaluate the memory utilization considering all MutableSegments in a host, no matter which table they belong to.

Watch out for future blogs and announcements!

Acknowledgements

We would like to thank all members of the Pinot team for their relentless efforts to make Pinot better: Dino Occhialini, Jean-Francois Im, Jennifer Dai, Jialiang Li, John Gutmann, Kishore Gopalakrishna, Mayank Shrivastava, Neha Pawar, Seunghyun Lee, Sunitha Beeram, Walter Huf, Xiaotian (Jackie) Jiang, and our engineering manager Shraddha Sahay and SRE manager Prasanna Ravi. Also, we would like to thank Ravi Aringunram, Eric Baldeschwieler, Kapil Surlaker, and Igor Perisic for their leadership and continued support.

Topics