Bridging Offline and Nearline Computations with Apache Calcite
January 29, 2019
The existing Lambda architecture
With the evolution of big data technologies over time, two classes of computations have been developed for processing large-scale datasets: batch and streaming. Batch computation was developed for processing historical data, and batch engines, like Apache Hadoop or Apache Spark, are often designed to provide correct and complete, but high-latency, results. Streaming computation, on the other hand, was built to process recent data at low latency. The initial generation of streaming engines, like Apache Samza or Apache Storm, is often known for offering fresh, but sometimes inaccurate or incomplete, results. Additionally, it is cost-prohibitive for streaming engines to process historical data, an important capability required for backfilling past missing or invalid data. In an effort to get the best of both worlds, the Lambda architecture was introduced to place these two systems side-by-side and provide both accurate offline and best-effort near real-time (nearline) results together in a single view (see Figure 1).
Figure 1. The traditional lambda architecture
This is a great idea, but a major pain point of the Lambda architecture is that it requires users to develop two different code bases for carrying out the same logic: batch scripts, often in declarative languages like SQL, and nearline code, often in procedural languages like Java. Many problems arise from this duplication: developers have to do twice the work, and keeping the two code bases consistent during updates and maintenance is painful.
Several years ago, Apache Flink was introduced to unify both batch processing and stream processing in a single engine and provide a better streaming model with a guarantee of accuracy. However, migrating an existing computation platform into a new technology like Flink is not trivial work.
In this blog post, we show how we address the limitations of the Lambda architecture by maintaining a single (existing) batch codebase and building a technology to auto-generate streaming API code from batch logic.
Lambda architecture with a single batch code base at LinkedIn
The majority of our use cases for big data computation are for existing batch jobs, written in Pig, Hive, or Spark scripts and deployed in production. But at some point, our users realized they needed fresh results. We addressed this problem by auto-generating streaming code from batch logic and employing the Lambda architecture to transparently deliver merged results to our users.
Technically, our Lambda architecture is very similar to the traditional Lambda architecture above; however, our users only need to maintain a single code base, in Pig, Hive, or Spark. This code base serves as the single source of truth for what users want to compute. If they need fresh results, they just turn on the nearline flag.
In our Lambda architecture, we use Apache Kafka to deliver new events, which are then either consumed by Samza jobs in the streaming layer or ingested in HDFS via Apache Gobblin and then processed by Azkaban flows in the batch layer. Batch jobs in Azkaban can be scheduled at different time granularities, including monthly, weekly, daily, and hourly. We use Pinot, a realtime distributed OLAP datastore developed at LinkedIn, for the serving database. Pinot separately stores both nearline results and offline results, and makes them appear as a single view, called a hybrid table, to users. When a client makes a query to a Pinot table, Pinot will automatically figure out the latest timestamp where offline results are available, then fetch and merge appropriate results from both sources together, and return the merged results to the client.
Figure 2. Lambda architecture at LinkedIn
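Pinot's actual merge logic lives inside its broker, but the time-boundary idea behind hybrid tables can be illustrated with a minimal sketch. The classes and the per-timestamp map layout below are hypothetical simplifications, not Pinot's implementation:

```java
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Illustrative sketch of a hybrid-table merge: offline results are used up to
// the latest timestamp covered by offline data, and nearline results after it.
public class HybridTableSketch {
    // Each map goes from event timestamp to a metric value for that time bucket.
    public static Map<Long, Long> merge(NavigableMap<Long, Long> offline,
                                        NavigableMap<Long, Long> realtime) {
        // The boundary is the latest timestamp for which offline results exist.
        long boundary = offline.isEmpty() ? Long.MIN_VALUE : offline.lastKey();
        Map<Long, Long> merged = new TreeMap<>(offline);
        // Only nearline rows strictly newer than the boundary are kept, so
        // overlapping (possibly less accurate) nearline rows are discarded.
        realtime.tailMap(boundary, false).forEach(merged::put);
        return merged;
    }
}
```

Here the accurate offline value wins wherever both copies cover the same timestamp, which mirrors how a hybrid table prefers offline segments below the time boundary.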
At the heart of this system is a technology we developed to convert batch logic into Samza streaming code with Java APIs. The basic idea behind this is to use Apache Calcite relational algebra or Calcite logical plan as an intermediate representation (IR) that connects batch logic to streaming Java code. We first convert batch logic into relational algebra plans, and then use SQL processing techniques in Apache Calcite to optimize the plans and generate Java API code. In other words, the IR serves as a bridge.
In our first release, we built an end-to-end system to convert Pig Latin scripts, in which the majority of batch scripts at LinkedIn are written, into Samza high-level Java API code and Apache Beam Java API code that can be executed on Samza with Samza Beam runner. We are also currently working on converting Spark code into Beam Java API code.
Under the hood
For the rest of this blog post, we will explain in detail how we convert batch scripts into Java streaming code with an example to demonstrate the process. We assume a basic understanding of relational algebra, query processing, and programming languages on the part of the reader.
The process of converting batch scripts into Java streaming code is split into two phases:
Converting batch scripts into Calcite IR plans: Most parsers of batch languages parse scripts into some form of logical plan. For example, the Pig Grunt parser parses Pig scripts into Pig logical plans. We leverage these parsers and build converters that turn those logical plans into Calcite logical plans.
Generating Samza streaming code from Calcite logical plans: We leverage the query processing framework in Apache Calcite to build a Calcite-Samza planner that optimizes Calcite logical plans into Samza physical plans, and then generates Java code from Samza physical plans and streaming configuration parameters.
The following figure illustrates how we generate Beam Java API code from Pig scripts:
There are three components to be handled when converting Pig scripts to Calcite plans:
Schemas: Calcite data types are richer than Pig data types, so converting a Pig schema into a Calcite relational schema loses no semantic coverage.
Scalar expressions: Scalar expressions consist of scalar operands, such as columns, scalar variables, or constants, and operators on those operands, such as mathematical operators or function calls. These expressions appear in several Pig statements, e.g., a filter condition in a FILTER statement like “a < 1.” Scalar expressions in Pig are represented as expression trees and are converted into Calcite expression trees via a post-order traversal that rewrites expression nodes from the leaves up to the root.
Relational expressions: Relational expressions are expressions where operands are relations and operators are relational operators, like FILTER, PROJECT, JOIN. Pig logical plans are DAGs where source nodes are LOAD operators on input tables; sink nodes are STORE operators that save result relations into output destinations; and intermediate nodes are relational operators to carry out the transformation. We do a DFS traversal on each Pig logical plan, starting from STORE nodes, and convert each Pig relational operator into one or more Calcite relational operators. For example, a Pig FILTER is mapped directly to a Calcite filter while a Pig COGROUP is mapped into a combination of Calcite aggregates, projects, and an outer join.
In general, Calcite relational algebra is a superset of Pig Latin, so any feature supported by Pig Latin can be expressed in Calcite plans.
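The leaf-to-root rewrite of scalar expressions can be illustrated with a minimal sketch. The `Expr`, `Col`, `Const`, and `BinOp` classes below are hypothetical stand-ins for Pig's expression nodes, and the string output merely mimics the shape of a Calcite `RexNode`; the real converter builds `RexNode` objects via Calcite's builder APIs:

```java
// Illustrative sketch; Pig's LogicalExpression and Calcite's RexNode/RexBuilder
// APIs differ in detail from these toy classes.
public class ExprConverter {
    interface Expr {}
    record Col(String name) implements Expr {}
    record Const(Object value) implements Expr {}
    record BinOp(String op, Expr left, Expr right) implements Expr {}

    // Post-order traversal: children are converted before their parent,
    // mirroring how Pig expression nodes are rewritten from leaves to root.
    public static String toRex(Expr e) {
        if (e instanceof Col c) return "$" + c.name();
        if (e instanceof Const k) return String.valueOf(k.value());
        BinOp b = (BinOp) e;
        String left = toRex(b.left());    // convert the left subtree first,
        String right = toRex(b.right());  // then the right subtree,
        return b.op() + "(" + left + ", " + right + ")"; // finally the operator
    }
}
```

For the filter condition “a < 1”, this traversal visits the column, the constant, and then the comparison, producing a converted node for the whole tree.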
We will demonstrate the conversion through the following running example. Suppose we have an input table in HDFS named “dummy,” with several columns named after their data types. Here is a simple Pig script that reads the table and does a self join after some projects and filters (for those who are not familiar with Pig Latin, the Pig Latin language manual is a good reference).
Figure 4. A Calcite logical plan corresponding to the Pig script example
While Calcite logical plans are purely declarative, meaning that they merely tell us what to do, we need to obtain Samza physical plans to specify how to implement each logical operator. For example, if we want to join two input data sources, depending on the nature of each data source, we can implement the logical join in Samza as a stream-stream join or a stream-table join. In stream-stream joins, joining two different streams and self-joining a single stream require different configurations for efficiency.
As in any SQL processing engine, a logical plan first needs to be optimized into a physical plan that guides the system in how to implement each relational operator. This step is called the planning step, while the framework to do that is called a planner. We extend the existing Calcite planner, a top-down query planner implementing the Volcano query optimizer, to build our own Calcite-Samza planner by adding the following implementations on top of the Calcite planner:
Samza physical operators: There is a Samza physical operator corresponding to each Calcite logical operator. The physical operator provides all details necessary to execute the relational operator and implements methods to generate Samza high-level API code and Beam API code.
Set of rules to convert each logical operator to the corresponding physical operator: These rules are often referred to as implementation rules.
Other than our own implementation rules, we also leverage existing transformation rules in Calcite to convert the plan into a more optimized form. A transformation rule converts an existing plan into a more optimized plan while preserving the same semantics. For example, pushing a filter down toward the leaves of the plan tree lets the operators above it process fewer records.
In the running example, our Calcite-Samza planner will convert the logical plan into the following plan:
Figure 5. Generating the Samza physical plan from the Calcite logical plan
In this example, the Calcite-Samza planner makes two optimizations on the logical plan:
Pushing filters down below projects
Recognizing that the logical join is a self join and converting it into a stream-stream self join, where no repartitioning is needed because both sides are already co-partitioned by the join keys. A smaller join window size can also be set to save internal resources.
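The filter-pushdown transformation can be sketched on a toy plan tree. The `Rel` classes below are hypothetical stand-ins for Calcite's relational operators; a real Calcite rule for this, such as its filter-project transpose rule, must also rewrite the filter condition through the projection, which this sketch omits:

```java
// Illustrative sketch of a transformation rule on a toy relational plan tree.
public class FilterPushdown {
    interface Rel {}
    record Scan(String table) implements Rel {}
    record Project(Rel input, String fields) implements Rel {}
    record Filter(Rel input, String condition) implements Rel {}

    // If a Filter sits on top of a Project, swap them so the filter runs
    // closer to the scan and the operators above it see fewer records.
    public static Rel push(Rel rel) {
        if (rel instanceof Filter f && f.input() instanceof Project p) {
            return new Project(new Filter(p.input(), f.condition()), p.fields());
        }
        return rel; // rule does not match; leave the plan unchanged
    }
}
```

Applied to `Filter(Project(Scan))`, the rule yields `Project(Filter(Scan))`, the same shape of rewrite the planner performs on the running example.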
Beam API code generator
With the Samza physical plan in hand, we are now ready to generate runtime Beam API code. Similar to the Pig-Calcite converter, we need to make conversions for three major components:
Schemas: Our streaming runtime code works with Avro records from Kafka input streams, so we need to convert relational schemas into Avro schemas. We use Avro GenericRecord and supply its schema by generating an Avro schema string from the relational record type at compile time; these schema strings are parsed into static Avro schema objects at runtime.
Scalar expressions: Each Calcite scalar expression is converted into one or more Java statements that evaluate the expression. There are two major issues to be dealt with when generating Java statements:
Null-safe expressions: Whenever we access a field of an Avro record, the returned value can be null, which may cause a NullPointerException in statements that process the value. Thus, we must wrap the access in a null-safe expression so that further processing happens only when the value is not null.
Handling Pig UDFs: We recently released a portable UDF platform, which enables users to write UDFs once and evaluate them in a number of SQL engines such as Hive, Presto, and Spark. However, a plethora of Pig UDFs have been developed at LinkedIn, as they add flexibility to data processing with Pig. Thus, to support existing Pig scripts, we need to be able to execute those Pig UDFs directly in our generated code. Since Pig UDFs take a Pig Tuple as their sole parameter and may return Pig-specific data types like Tuple or DataBag, we need wrapper functions to convert back and forth between these and Avro records, Avro arrays, and Java data types. We also support Pig UDFs written in Python by parsing Python scripts into Java objects with the Jython library. In the example, we will show how we build a Pig Tuple from a list of parameters for Pig UDF calls in streaming code (see generated code below).
Relational expressions: Each Samza physical operator is mapped into one or more Java statements that call Beam APIs and return PCollection objects.
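The null-safe pattern that the generated code follows can be sketched as below. A plain `Map` stands in for an Avro `GenericRecord` (whose `get` may likewise return null), and the field name is purely illustrative:

```java
import java.util.Map;

// Sketch of null-safe field access in generated code; a Map stands in for an
// Avro GenericRecord, where record.get("field") may return null.
public class NullSafeAccess {
    // Evaluate a predicate on the field only when the field is present;
    // a null field simply yields false instead of a NullPointerException.
    public static boolean pageKeyIsFeed(Map<String, Object> record) {
        Object v = record.get("pageKey");
        return v != null && v.toString().equals("feed");
    }
}
```

Without the null guard, a record missing the field would crash the streaming job at runtime; with it, the row is simply filtered out.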
Our Calcite plan provides the logic to do the work, but in stream processing we also need extra configuration: input stream names, output streams, the timestamp field, window sizes for joins and aggregates, and so on. These parameters come from a stream configuration object, which is kept separate from the logic in the Calcite plan.
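As a rough sketch of the kind of parameters kept outside the Calcite plan, such a configuration object might look like the following; the class and field names here are purely illustrative, not our actual configuration keys:

```java
// Hypothetical sketch of a stream configuration object; the real configuration
// format at LinkedIn differs.
public class StreamConfigSketch {
    public final String inputStream;     // e.g., the Kafka topic for the source table
    public final String outputStream;    // the Kafka topic for the results
    public final String timestampField;  // event-time field used for windowing
    public final long joinWindowMinutes; // window size for stream-stream joins

    public StreamConfigSketch(String inputStream, String outputStream,
                              String timestampField, long joinWindowMinutes) {
        this.inputStream = inputStream;
        this.outputStream = outputStream;
        this.timestampField = timestampField;
        this.joinWindowMinutes = joinWindowMinutes;
    }
}
```

Keeping these operational knobs out of the plan means the same generated logic can be redeployed against different topics or window sizes without regenerating code.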
The final Beam pipeline for our streaming application example is generated from the above Calcite plan; the stream configuration is shown in the following figures, and you can check out the actual Java Beam API code.
In this blog post, we showed that batch and streaming can be seamlessly combined in a single system. We believe that a declarative framework that only requires users to specify logic (what to compute) and frequency (when to compute), while hiding the implementation details (how to compute), is important for users to quickly develop their big data analytics applications. In this sense, big data technologies are getting closer to traditional database systems in terms of usability, while still maintaining scalability.
This framework would not have been possible without the valuable contributions from various members of the Analytic Platform and Application and Samza teams at LinkedIn. Thanks to support from Shrikanth Shankar, Kapil Surlaker, Ravi Aringunram, and Ameya Kanitkar; contributions for the framework from Harsh Shah, Janki Akhani, Min Shen, Xinyu Liu, and Wei Song; and feedback on this blog post from Carl Steinbach, Arun Swami, and Walaa Eldin Moustafa. A special thanks to Xinyu Liu for his unlimited support on building, deploying, and debugging Samza applications.