Bridging batch and stream processing for the Recruiter usage statistics dashboard
July 14, 2020
Co-authors: Khai Tran and Steve Weiss
Batch and streaming computations are often combined together in the Lambda architecture, but carry the cost of maintaining two different code bases for the same logic. We have previously shared on the blog a behind-the-scenes look at our approach into enabling the seamless translation of declarative batch code into streaming pipelines to this problem. In this blog, we will share how we leveraged the technology to build a consistent, near real-time metric dashboard for the Recruiter usage statistics page. We will first describe the dashboard and its requirements, and then elaborate on the process of building a solution to meet those requirements.
Recruiter usage statistics dashboard
The usage reporting dashboard helps companies learn about user habits over certain periods of time based on generated data. The reports identify which users are utilizing the product well and which users could be using the product more effectively. In the case of LinkedIn Talent Solutions, we are tasked with helping recruiters post jobs, search for talent, and send InMails to prospective candidates. Our customers rely on usage reports to see a variety of activities related to how their recruiters are sourcing and communicating with potential new hires. Specifically, it’s important that we take a close look at the rate of responses to InMails, a metric directly tied with a customer’s return on investment (ROI) in using LinkedIn Talent Solutions.
Recruiter statistic dashboard
Requirements for the dashboard
- Data freshness: To keep our customers informed, they should have access to the most recent data. The dashboard should present metrics up to the last 10 minutes. This requires the capabilities of a streaming engine.
- Backfill capability: The ability to re-compute past metrics for any changes from the inputs or the computation logic. This often requires the capabilities of a batch engine.
- Easy to maintain: While Lambda architecture is a common approach to combine the capabilities of both batch and streaming engines, the major drawback of this approach is that it requires us to express our computation into two different codebases: one for streaming and one for batch. This burdens code maintenance over time; we would like to keep only one code base that serves as the source of truth for generating metrics.
- Consistency: It should not matter which engines we used to compute metrics. Our customers should see the same metrics for the same time period regardless of when it was viewed.
- Support for both additive and non-additive metrics: Additive metrics are measures that can be aggregated across any dimensions (i.e., SUM, MAX, MIN), while non-additive metrics are measures that cannot be aggregated across any dimensions (i.e., COUNT DISTINCT). We need to support both additive metrics, such as number of profiles viewed, and non-additive metrics, such as number of unique searches performed, for the application.
To meet the above requirements, we leveraged our Calcite-based translation technology for auto-generated streaming Java code from batch Pig scripts to compute the metrics in both streaming and batch engines. Metrics computed from both streaming and batch engines are viewed as a single table stored in Apache Pinot, a real-time distributed OLAP datastore developed at LinkedIn, for serving client requests. When a client makes a query to the Pinot table, Pinot automatically figures out the time boundary between batch and stream data in the table, and uses it to compute the query results.
This solution meets three out of the five requirements listed above:
- Data freshness: The streaming engine helps produce low-latency metrics.
- Backfill capability: The batch engine provides the re-computation capability required for backfilling.
- Easy to maintain: We only need to maintain the Pig scripts as the single source of truth for defining our transformation logics because the architecture helps us auto-generate streaming code.
That leaves us with two remaining requirements:
- Consistency: In our event architecture, we run Apache Kafka in at-least-once mode, meaning that the same event may appear more than once. In the streaming pipeline, Samza consumes events directly from Kafka, causing duplicate events to be processed multiple times. On the other hand, the ETL system in the batch flow, Apache Gobblin, performs deduplication to make sure each event lands exactly once in HDFS before it is processed by the batch engine. Therefore, batch and streaming engines may produce different numbers for a single metric over the same period of time.
- Support for both additive and non-additive metrics: Computing additive metrics is straightforward in both streaming and batch engines because it is trivial to update these metrics with the arrival of new data. For example, to compute the number of messages sent each day from a recruiter in the streaming engine, we can use split message sent events into small windows of 10 minutes, and then add up the numbers from 6 * 24 = 144 such windows each day (6 windows per hour) to obtain the daily metrics. However, it is not that straightforward when computing non-additive metrics, such as computing the number of distinct searches performed each day by a recruiter. The traditional approach is to collect all search events for each day before doing distinct and aggregations on unique search session ids. However, that does not work in the streaming environment because waiting for all of the events of the day introduces a long delay, while adding up the results of the small windows would yield incorrect results.
We solved both of these problems by building event deduplication logic in Samza using the RocksDB local storage, and rewriting the batch scripts by inserting appropriate DISTINCT statements.
Event deduplication in Samza
We implemented the deduplication logic in Samza by using the anti-join pattern and a local RocksDB table. We save all events to deduplicate for each day (or any period of time) into the local RocksDB table and use it as a filter for incoming events. For any new event, if it already appears in the table, we simply discard it. Otherwise, we retain the event for further processing. We abstract the implementation details using the Apache Beam API.
Rewriting batch scripts
In the batch scripts, we inserted the DISTINCT statement right after reading each event data source. This additional statement does not change its computation results as those events were already deduplicated by the ETL system before; however, it can introduce extra performance overhead in the batch engine. It will be interesting to see if future work yields a smart optimizer that relies on table statistics or metadata to eliminate such redundant DISTINCT.
Non-additive metrics are computed in the batch script by first, using DISTINCT statements to eliminate duplicate events, and then, applying COUNT() aggregation function on dimensions (GROUP BY keys).
For the streaming side, our query translation system, as described in the previous blog, then translates those DISTINCT statements into the above Beam API for performing duplication in Samza.
Discrepancies across metrics computed from streaming engine vs. computed from batch engine
To validate the consistency of metrics, we compute metrics over the same day from both batch and streaming engines and measure the resulting discrepancies. The above chart shows the observed differences between the two engines with and without implementing event deduplication. There are 11 metrics used for the dashboard—among them, searchesperformed and activedays are non-additive, while the remaining are additive.
Without the deduplication logic, the discrepancies are already small, with an average of 0.05% for all metrics. The deduplication logic helps reduce the discrepancies to be negligible, with an average of 0.003%, meaning we only produce 3 incorrect counts for every 100,000 events. Among those 11 metrics, there were no discrepancies for the following three metrics: newtags, newsavedsearches, and newsavedsearchalerts.
However, we are aware that there are still discrepancies—albeit very small—across the two computation engines. We believe that it will take much more effort to eliminate those remaining discrepancies as both our streaming engine (Apache Samza) and our messaging system (Apache Kafka) have not yet guaranteed the exact-once semantic while data is copied across multiple clusters.
In this blog post, we showed a prototype of applying our single codebase Lambda architecture to build a near real-time dashboard that helps recruiters utilize our product better. We also presented our work in making the dashboard consistent over time by significantly reducing the discrepancies of metrics computed from the batch and streaming engines. We’re always committed to improving our product, and delivering more value to our members and customers. However, we have experienced through our production that the operating challenges of the Lambda architecture still remained as we needed two different teams, one for operating the batch engine and the other for operating the streaming engine, to serve one service. This can incur unnecessary friction when operating the service in production. Alternative approaches to combine streaming computation and batch computation into a single engine, like Apache Flink, can be a promising approach to solve this problem.
We would like to thank Plaban Dash, Kexin Fei, and Ping Zhu for their important contributions to build the offline and nearline flows. Thanks to support from LinkedIn leadership, Harsha Badami Nagaraj, Ameya Kanikar, Divyakumar Menghani, Vasanth Rajamani, Eric Baldeschwieler, and Kapil Surlaker; and valuable feedback from Shirshanka Das, Xinyu Liu, Maneesh Varshney, and Mayank Shrivastava to make this blog post better.