Transport: Towards Logical Independence Using Translatable Portable UDFs

In a recent blog post, we touched upon Dali’s new architecture, which is designed to make data and logic seamlessly accessible and shareable across LinkedIn’s diverse environments. Dali achieves this vision by adopting the principles of physical and logical independence. Physical independence refers to enabling users to transparently access data regardless of its physical location, storage format, partitioning strategy, or even sharding strategy across multiple clusters. Logical independence refers to making Dali user-defined logic available in any engine or data processing framework, regardless of its language interface or data processing capability. As a layer for sharing data and logic, Dali strives to achieve these goals while maintaining virtually the same performance guarantees as the mission-critical systems it serves.

This blog post is the first in a series of articles where we detail the steps we are taking to realize this vision. In this post, we shed light on one of the most difficult, yet interesting, pieces for achieving logical independence: Translatable and Portable User-Defined Functions, or Transportable UDFs.

Dali logic is expressed in the form of SQL views. A SQL view is a set of logical transformations on a set of input datasets (which could be base datasets or other views). Dali views are extensively used across LinkedIn to summarize and aggregate data, clean noisy data, extract interesting or relevant information, apply privacy filters, or combine data from different sources to create insights and business value out of raw data. UDFs are used extensively in views to apply transformations that cannot be expressed using SQL expressions alone, and usually involve complex operations expressed in an imperative language such as Java. True logical independence can only be achieved if both the view logic and the UDF definitions are portable across engines. While the relational algebra expressions used to express view transformations can be mapped to the declarative language interfaces of different data processing engines, that is not the case with UDF definitions, which are imperative and opaque; hence the challenge.

UDF APIs differ vastly among data processing engines, since those APIs must take into account each engine’s internal data representation of choice, and must provide a way to connect that representation to the relational schema. This variation places a burden on application developers, who have to learn the UDF API and the internal data model of each engine, and then re-implement the same logic using different APIs whenever there is a need to move the logic from one engine to another, or even to share the same logic across engines. This introduces what we call UDF denormalization, i.e., harmful redundancy that negatively impacts productivity and craftsmanship.

Given these challenges, the question becomes: How can we enable our users to write user-defined function definitions once, and reuse them in any engine without sacrificing performance? To solve this challenge, we decided to pursue translatable, portable user-defined functions, a framework we refer to as Transport. While it seemed like a far-fetched idea at the beginning, fast forward to today: a number of users already employ these functions in production pipelines, we cover three engines (Hive, Presto, and Spark) and a data format (Avro), and we are expanding the set of engines the framework can be integrated with.

Transport is an API and a framework. Users implement their UDFs in the Transportable UDF API, and the framework transforms those UDFs into native UDFs for a variety of target engines. For example, a user can write a single implementation of their UDF against the Transport UDF API, and the framework can transparently convert the UDF to a native Hive UDF, as if the user had initially written it as a Hive UDF. Now, if the user wants to run this UDF in another engine, no problem! For example, if the user wants to run the same UDF as part of a Presto query, the framework can also transparently convert the UDF to a native Presto UDF, as if the user had initially written it as a Presto UDF.

Before describing how Transportable UDFs work, we first explore the motivation behind the idea by describing two phenomena that characterize today’s UDF landscape: UDF API disparity and UDF API complexity.

UDF API disparity

There are many engines in the data processing world, and each comes with its own set of features that make it suitable for certain use cases. Similarly, each engine comes with its own UDF API that differs considerably from one engine to another. In this section, we detail some of those differences, focusing on UDF APIs of three popular engines: Hive, Presto, and Spark.

UDF type validation and inference
UDF APIs usually offer their users some means to specify which data types the UDF expects (i.e., type validation), and how the output type of the UDF relates to its input types (i.e., type inference). Some UDF APIs, such as Presto’s, use type signatures to declare what types the UDF expects, while others, such as Hive’s and Spark’s, expect the user to write imperative code to express type validation and inference, by traversing the given type object tree.

Engine’s underlying data model
Different platforms use different data models to represent the data being processed in their execution engines, and therefore expose those data models directly in their UDF APIs as well. For example, Presto UDFs use Presto Type objects, along with data types such as Block, Slice, long, boolean, and double, to describe the data in those objects, while Hive UDFs use ObjectInspectors along with Objects. Similar differences exist for other engines, too.

The UDF definition API
Additionally, the APIs users implement to define a UDF differ from engine to engine. For example, Presto UDFs use special type annotations to declare that a class represents a UDF, Hive UDF classes extend Hive’s GenericUDF abstract class, and Spark UDFs implement either the Spark SQL UDF API or, alternatively, the Expression API.
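To make the contrast concrete, the following skeletal declarations sketch what the "same" UDF looks like in the Presto and Hive APIs. These are illustrations only (imports and method bodies are elided; class and function names are our own):

```java
// --- Presto: a UDF is an annotated static method ---
public final class ToUpperFunction {
  @ScalarFunction("to_upper")
  @SqlType(StandardTypes.VARCHAR)
  public static Slice toUpper(@SqlType(StandardTypes.VARCHAR) Slice input) {
    return Slices.utf8Slice(input.toStringUtf8().toUpperCase());
  }
}

// --- Hive: a UDF is a class extending GenericUDF ---
public class ToUpperUDF extends GenericUDF {
  @Override
  public ObjectInspector initialize(ObjectInspector[] arguments)
      throws UDFArgumentException { /* validate types, return output OI */ }

  @Override
  public Object evaluate(DeferredObject[] arguments) throws HiveException { /* logic */ }

  @Override
  public String getDisplayString(String[] children) { /* for EXPLAIN output */ }
}
```

Note that nothing about the two declarations is shared: not the entry-point shape, not the type declarations, and not the data types the logic operates on.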

UDF API features
Finally, not all UDF APIs offer the same set of features. For example, Hive UDFs offer hooks to add files to the MapReduce distributed cache, allowing UDFs executed on different workers to process those files. Such a feature does not exist in Presto or Spark UDFs. Even when a feature exists in multiple engines, the way it is expressed in the API can sometimes be fundamentally different. Presto UDFs allow users to declare which UDF arguments are nullable (i.e., can receive null values; otherwise the UDF automatically returns null), while Hive and Spark UDFs delegate null handling to the user. Presto implements UDF overloading (i.e., a UDF accepting more than one signature) by implementing the UDF class multiple times under the same function name; in Hive and Spark, users use a single class, but manually check whether the input types conform to one of the expected signatures.

UDF API complexity

Current engine APIs come with varying degrees of complexity and varying expectations of their users’ (i.e., UDF developers’) skill sets. For example, Hive UDFs require users to understand the Object + ObjectInspector model (ObjectInspectors are metadata that describe the type of the data stored in the corresponding Objects being processed by the UDF). Users are expected to capture the input ObjectInspectors upon UDF initialization, and inspect them level by level, in a tree-traversal fashion, to validate that they conform to the expected types. Furthermore, users are expected to bind type arguments explicitly, by capturing subtrees of the input ObjectInspectors and creating new ObjectInspector trees out of them to return as the output ObjectInspectors. Say a UDF expects a Map from a String to an Array of some type K, and returns a Map from K to K, and that at query execution time, the UDF is invoked with a Map from a String to an Array of Integers. The following ObjectInspector tree is passed to the UDF initialization method:

[Figure: input ObjectInspector tree for map(string, array(int))]

It is the developer’s responsibility to walk this tree, verify that the map, string, and array nodes match what the UDF expects, then capture the value of the type-parameter node (or, more accurately, the subtree), say K, and finally construct the return type by building a new tree that places a map node above two copies of the captured K subtree, as shown below.

[Figure: constructed output ObjectInspector tree for map(K, K), reusing the captured K subtree]
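Put in code, the initialization step just described might look like the following, sketched against Hive’s GenericUDF API (imports and some error handling elided; this is an illustration, not exact production code):

```java
@Override
public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
  // Walk the input tree: expect a single argument of type map(string, array(K))
  if (arguments.length != 1 || !(arguments[0] instanceof MapObjectInspector)) {
    throw new UDFArgumentException("Expected a single map argument");
  }
  MapObjectInspector mapOI = (MapObjectInspector) arguments[0];
  if (!(mapOI.getMapKeyObjectInspector() instanceof StringObjectInspector)
      || !(mapOI.getMapValueObjectInspector() instanceof ListObjectInspector)) {
    throw new UDFArgumentException("Expected map(string, array(K))");
  }
  // Capture the K subtree: the element ObjectInspector of the value array...
  ObjectInspector elementOI =
      ((ListObjectInspector) mapOI.getMapValueObjectInspector()).getListElementObjectInspector();
  // ...and build the output tree for map(K, K) out of two references to it
  return ObjectInspectorFactory.getStandardMapObjectInspector(elementOI, elementOI);
}
```

All of this code exists purely to validate types and bind K; none of it expresses the UDF’s actual logic.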

Furthermore, in Hive, even a single category of ObjectInspectors (e.g., StringObjectInspector) has multiple variants and implementations of the same top-level ObjectInspector class. Users who write a Hive UDF must be aware of these differences, and of the fact that the different implementations are not interchangeable. This creates code complexity in dealing with those interchangeability checks and guarantees.

Another form of API complexity can be found in Presto UDFs. Presto UDFs allow users to process container data types, such as Arrays, Maps, and Structs, through the Block API, an API for manipulating byte arrays. Consequently, users are exposed to the physical layout of complex data types inside byte arrays, and have to write multi-step, low-level operations to express simple high-level operations on those container types. Another example is the use of dependency injection to resolve the concrete runtime types corresponding to generic type parameters, and also to resolve UDF dependencies. Furthermore, when a UDF expects top-level generic types (i.e., it can take any type), it must be re-implemented for each matching top-level concrete type, growing the number of implementations exponentially with the number of generic arguments. While this problem can be mitigated by code-generating the UDFs, that is still a complex approach.
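As a small taste of that low-level style, here is a sketch of building and reading a Presto array(bigint) value by hand through the Type and Block APIs (exact signatures vary across Presto versions; this fragment assumes a hypothetical surrounding UDF):

```java
// Building an array(bigint) value element by element: the author works
// at the level of positions and physical writes, not a List<Long>.
long[] values = {1L, 2L, 3L};
BlockBuilder builder = BigintType.BIGINT.createBlockBuilder(null, values.length);
for (long v : values) {
  BigintType.BIGINT.writeLong(builder, v);  // physical write of one element
}
Block arrayBlock = builder.build();          // the finished array(bigint) value

// Reading element i back requires knowing the element type's physical accessor:
long element = BigintType.BIGINT.getLong(arrayBlock, 1);
```

Compare this with a simple `list.get(i)`-style operation; the gap only widens for nested Maps and Structs.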

Transportable UDFs

Driven by the vision of physical and logical independence, and motivated by the issues of API disparity and complexity, we designed the Transport UDF API, which lets users focus on the UDF logic rather than on implementing and adhering to platform-specific interfaces. Users write the logic once, and it can run on any platform. The high-level idea is to provide a standard data API that users implement their UDFs against. This standard data API can be implemented by any platform, using the native data types that platform would normally use in its native UDFs. For example, a Map object will appear to Presto as a Presto MapType along with a Block data type, and to Hive as a MapObjectInspector along with an Object. The Transportable UDF API expresses the actual logic by manipulating those standard data APIs, and can manipulate generic container types, such as “array(T)” or “map(K,V)”, through type signatures. Transportable UDFs are structured as abstract classes to be extended by the actual UDFs expressing the logic. Finally, the Transportable UDF API is called from a platform-specific wrapper that is automatically generated. In other words, a Transport UDF implementation is an intermediate representation (a common ground) of the logic that is platform-independent, but comes with two platform-specific faces readily provided to the user: beneath it, the platform-specific standard data API implementation, and above it, the platform-specific auto-generated wrapper. This architecture is depicted in the diagram below.

[Figure: Transport architecture — auto-generated platform-specific wrappers above, shared Transport UDF logic in the middle, platform-specific standard data API implementations beneath]

Transportable UDFs example

As described above, all the user needs to define is the Transportable UDF itself. Here we show an example of a Transportable UDF to illustrate the API’s simplicity and its focus on the logic rather than the platform details. The function takes two arrays as input (of any element types), and creates a map using the first array’s elements as keys and the second array’s elements as values, provided that both arrays are of equal length.
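A sketch of such a UDF, written against the Transport API (reconstructed here along the lines of the map_from_two_arrays example in the open-source Transport repository; minor details may differ):

```java
public class MapFromTwoArraysFunction extends StdUDF2<StdArray, StdArray, StdMap> {

  private StdType _mapType;

  @Override
  public List<String> getInputParameterSignatures() {
    // The UDF accepts two arrays of any element types K and V
    return ImmutableList.of("array(K)", "array(V)");
  }

  @Override
  public String getOutputParameterSignature() {
    // The output type is inferred from the bound K and V
    return "map(K,V)";
  }

  @Override
  public void init(StdFactory stdFactory) {
    super.init(stdFactory);
    _mapType = getStdFactory().createStdType("map(K,V)");
  }

  @Override
  public StdMap eval(StdArray a1, StdArray a2) {
    if (a1.size() != a2.size()) {
      return null;  // arrays must be of equal length
    }
    StdMap map = getStdFactory().createMap(_mapType);
    for (int i = 0; i < a1.size(); i++) {
      map.put(a1.get(i), a2.get(i));
    }
    return map;
  }
}
```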

In the example above, StdMap and StdArray are interfaces that provide high-level map and array operations to their objects. Depending on the engine where this UDF is executed, those interfaces are implemented differently to deal with the native data types used by that engine. getStdFactory() is a method used to create objects that conform to a given data type (such as a map whose keys are of the type of elements in the first array and whose values are of the type of elements in the second array). StdUDF2 is an abstract class to express a UDF that takes two parameters. It is parametrized by the UDF input types and the UDF output type.

Conclusions

In this blog post, we have introduced Transport, an API and framework for defining UDFs once and reusing them across platforms as native UDFs, thereby avoiding any sacrifice of performance. Transportable UDFs are one step among many that we are taking towards our vision of physical and logical independence. They are motivated by the challenges of today’s UDF API disparity and complexity. At LinkedIn, Transportable UDFs have proven to be a powerful tool for increasing our developers’ productivity, enabling them to share UDF logic and ensure data processing consistency across engines. We have explained how Transportable UDFs work at a high level, along with an example. There are more topics and details that we would like to share about Transport UDFs, but they are beyond the scope of this blog post. We will discuss those topics and other cool Transport UDF features in a dedicated future blog post. Stay tuned!

Acknowledgements

Transport UDFs would not have been possible without the continued support of Kapil Surlaker, Shrikanth Shankar, Vasanth Rajamani, and Adwait Tumbde; the valuable discussions with Carl Steinbach, Maneesh Varshney, Ratandeep Ratti, Shirshanka Das, and Will Vaughn; the valuable contributions of Shardul Mahadik; and the ongoing support of the rest of the Dali team: Rohan Agarwal, Chris Chen, Wenye Zhang, and Anthony Hsu. Transport UDFs received immense support from partner teams in Presto, Spark, Samza, the Unified Metrics Platform, and the Data Warehouse. Thank you to Mark Wagner, Min Shen, Fangshi Li, Srinivasulu Punuru, Khai Tran, Eric Sun, Chris Li, and Lu Li.