Open Source

Advanced schema management for Spark applications at scale

Co-authors: Walaa Eldin Moustafa, Wenye Zhang, Adwait Tumbde, Ratandeep Ratti

Introduction

Over the years, the popularity of Apache Spark at LinkedIn has grown, and users today continue to leverage its unique features for business-critical tasks. Apache Spark allows users to consume datasets using powerful, yet easy-to-use APIs such as the Dataset interface. The Dataset interface allows users to consume and process data using type-safe objects, where the schema of data is represented using classes that reflect the schema in a strongly-typed manner. For example, to process a dataset about companies in Scala, one would create a case class reflecting the Company schema. Below is a sample of such a case class:

Users can then read data and process it as if it is constructed out of “Company” objects. They can do something like:

However, schema classes do not necessarily have to be case classes. Spark encoders and decoders allow for other schema type systems to be used as well. At LinkedIn, one of the most widely used schema type systems is the Avro type system. The Avro type system is quite popular, and well-suited for our use for the following reasons:

First, it is the type system of choice for Kafka, our streaming data source that contains the majority of the website interactions and data that ultimately lands into our Hadoop clusters. Second, Avro also supports a strong type system encompassing standard, logical, union, and enumeration data types, making it friendly to object-oriented data processing. Finally, Avro comes with well-defined methodology and open source tools to generate type-safe schema classes.

In a large company like LinkedIn, where hundreds of engineers interact daily with Spark APIs to process ever-changing datasets, it is imperative to provide a schema management solution that works seamlessly at scale. 

In this blog post, we discuss how LinkedIn’s infrastructure provides managed schema classes to Spark developers in an environment characterized by agile data and schema evolution, and reliance on both physical tables and logical views to abstract and consume data.

Dali and the Hive Metastore

At LinkedIn, the Hive Metastore is the source of truth catalog for all Hadoop data. The Hive Metastore is managed by Dali. Dali is a data access and processing platform that is integrated to compute engines and ETL pipelines at LinkedIn to ensure consistency and uniformity in the access and storage of data. Dali utilizes the Hive Metastore to store data formats, data locations, partition information, and table information. Among other features, Dali also manages the definition of SQL views, as well as storing and accessing those definitions from the Hive Metastore.

One of the important pieces of information that Dali manages in the Hive Metastore is the Avro schema corresponding to each table. Going back to our company example, its Hive schema looks like this:

As seen above, the Hive schema lacks some of the characteristics that are usually required by developers programming against type-safe APIs, such as the Dataset API. Such limitations include:

  • Casing information: Field names are in lower case by default in Hive schemas; this is Hive’s way of supporting case insensitivity. Fields like “NumberOfEmployees”, or “StreetName”, have now become “numberofemployees” and “streetname”. Having such casing be the default in data processing applications makes them hard to maintain and imposes undesirable code readability issues.
  • Lack of support for Enum data types: Enumerations are not part of the Hive type system (or standard SQL in general). However, many schema definition systems, including Avro, support defining Enum fields as a way of enforcing a limited set of options to the value. For example, the State field in the Hive schema is represented by a String type instead of an Enum. When used in an object-oriented programming environment or in a type-safe manner, developers will want to deal with Enum data types to catch errors during compile time instead.
  • Nullability information: Fields in Hive schema are nullable by default. When used as a source of truth for nullability information in type-safe objects, null handling semantics would be required—however, this should not be the case.
  • Namespacing: Hive schemas are not name-spaced. Namespaces are commonly used in object-oriented programming to achieve disambiguation and better organization.

Therefore, Dali APIs offer the capability of associating an Avro schema with data that is being written through Dali APIs to specify the Avro schemas to use when writing the data. Dali persists the schema using the “avro.schema.literal” property of the Hive table.

The above class can be compiled to a Java counterpart, referred to as SpecificRecord, as it implements the Avro SpecificRecord interface, which can be used in applications processing this data in a type-safe manner. The following is a simplified version of such a Java class.

Availability of this schema information at the Hive Metastore level is a very powerful tool to streamline data processing that uses this type of schema. Instead of having users worry about constructing those schemas every time they read the data, the schema is always there to reflect the structure of the data. Also, because the Java SpecificRecord classes can be automatically constructed, user involvement becomes minimal. Spark enables this type of data processing through mechanisms of Encoders and Decoders. Given a SpecificRecord class, and an implementation of both Avro encoders and decoders, Spark can translate its native data (i.e., DataFrames represented as Rows) into classes of SpecificRecord type, and expose them to the user, giving the user the opportunity to directly manipulate type-safe SpecificRecord objects in their applications—but that is not everything. Let us explore further in the next section.

Challenges

As outlined in the previous section, Dali APIs ensure that the Hive schema is always augmented by the corresponding Avro schema (which is usually supplied by the user or dataset creator) when the metadata is persisted in the metastore. However, there are additional questions that arise when these schemas need to be managed in a large engineering organization:

  • How do application developers access the Avro versions of the schema so they can enjoy its type-safe characteristics? Do they need to worry about accessing the metastore, and using its APIs to extract the “avro.schema.literal”? Are they expected to compile Avro schemas to their corresponding SpecificRecord implementations?
  • How do developers add the SpecificRecord classes to their application class paths? Do they check their source code into their application code? Is that repetitive or hard to maintain? What if a developer accidentally edits the auto-generated code?
  • How do developers manage schema evolution? If the schema of the datasets evolve, will they miss new data? Will their applications break? Are they expected to refresh their application version of the schema continuously? Are they expected to build tooling for schema evolution?
  • How do developers manage classpath conflicts? If different applications include their own version of the schema captured at some point in time, those multiple versions will end up together on the classpath of an application that pulls both schema versions through transitive dependencies, resulting in a classpath conflict.
  • How do developers access data through views? Avro schemas are associated with base tables only. When views are defined on top of those base tables, view definitions are not associated with Avro schemas. There must be a derivation mechanism of view Avro schemas from base table Avro schemas.

Avro schemas as software artifacts

schema-management-workflow

Schema management workflow

With the scale of LinkedIn’s data processing ecosystem, it is mandatory to provide a solution that imposes minimal overhead on the data processing application development lifecycle. In addition to addressing the challenges stated in the previous section, we had to ensure that our solution relied on standard software development practices to include and build schemas. For that purpose, we built infrastructure to serve dataset schemas as software artifacts. This entails capturing Avro schemas of datasets from the Hive Metastore, generating their corresponding SpecificRecord classes, and then compiling and pushing them to an artifact repository manager (Artifactory, for example). Each table corresponds to an artifact that is named after the Hive database name and table names. When table schemas change, a new artifact is generated and pushed with a new version number.

From the application development perspective, application developers consume schemas using standard dependency management systems such as Gradle. For example, say our Company schema is associated with the Hive table, company, in the database example. Let us also say the corresponding generated artifact has the group Id “com.linkedin.dali.schema”, and artifact Id “example-company” (which is derived from the database and table names). The first version of such an artifact will be 0.0.1. Therefore, to include this schema in an application that processes company data, the build script should simply declare a dependency along the lines of:

compile com.linkedin.dali.schema:example-company:0.0.1

Since data schemas are integrated to a standard dependency management process, version conflicts are managed in a standard way as well through dependency management tools such as Gradle. This leads to automatically resolving dependency conflicts, and avoiding classpath conflict situations.

As the Company schema evolves, possibly by introducing new fields, new artifacts are generated with new version numbers, and again pushed to Artifactory, our artifact repository manager. Users can always be on the latest schema by simply declaring dependency on the latest artifacts.

compile com.linkedin.dali.schema:example-company:+

When table schemas change, derived view schemas potentially change as well. For example, if the Address field schema evolves to include the Unit Number, all views built on top of the Company table should expose that field as well. In the following section, we discuss our design choices for schema evolution management techniques for both tables and views.

Schema evolution management 

As new tables and views are created, and as table schemas evolve, schemas of views dependent on them also change. A change management mechanism should capture those changes, generate new Avro schemas for new tables or views, and publish new versions of their respective artifacts.

Snapshot- and event-driven models
In general, there are two broad schema evolution management models: snapshot-driven and event-driven. In the snapshot-driven model, our schema management system takes a snapshot of the metastore schema information at regular intervals, creates an artifact for each table or view, and publishes the artifacts to Artifactory. 

In the event-driven model, our schema management system captures change events on the metastore—such as those triggered by ALTER TABLE, CREATE TABLE, or CREATE VIEW commands—before extracting the new schema information and publishing it to the Artifactory (the next section will touch on how to generate Avro schemas for views). Trade-offs between the two models mainly revolve around schema freshness requirements. The event-driven model offers near real-time schema artifacts generation and publication.

Snapshot-driven algorithm
A very basic snapshot-driven algorithm for artifact generation—assuming S denotes the set of tables or views—looks like this:

Event-driven algorithm
Similarly, a very basic event-driven algorithm—assuming S denotes the ordered set of tables and views expressed in the event-driven model—is depicted below. Note that the loop in the following algorithm is a streaming loop, processing tables and views as their change events arrive.

If we assume events are processed serially, then all that is needed when creating or altering a view or a table is to generate and publish their respective artifact. When altering a view or table that other views may depend on, we publish new artifacts for the dependents as well. Below we relax the serial ordering assumption.

Out-of-order execution and parallelism

The snapshot-driven algorithm discussed in the previous section is order-independent, and hence, easily parallelizable. On the other hand, the event-driven algorithm is order-dependent. Parallelizing it breaks the event order assumption—even if events consumed in one thread (or unit of work) are ordered relative to each other, we cannot assume any ordering across threads. Consider the following order of events:

E1: CREATE TABLE T

E2: CREATE VIEW V1 AS SELECT * FROM T

E3: CREATE VIEW V2 AS SELECT * FROM V1

E4: ALTER TABLE T

Ideally, after this sequence of events, schemas for V1 and V2 should reflect the second version of table T schema (created at E4), rather than the first version (created at E1). Let us say that due to parallel execution, the events are seen in this order: E1, E3, E4, E2. If we apply the serial algorithm on this order, the following takes place:

  • E1: CREATE TABLE T: The first version of schema for table T is generated.

  • E3: CREATE VIEW V2 AS SELECT * FROM V1: Schema of view V2 is derived and will reflect the first version of table T schema. It will register what it directly depends on—in this case, V1.

  • E4: ALTER TABLE T: The second version of schema for table T is generated. Since T is not aware of any dependents, it does not refresh any further schemas.

  • E2: CREATE VIEW V1 AS SELECT * FROM T: Schema of view V1 is derived and will reflect the second version of table T schema. It will register what it depends on directly—in this case, T—but it is too late to be used, as ALTER TABLE T has already been processed. The ALTER TABLE T event can no longer be used to refresh V2, and ends up reflecting the first version of the schema instead of the second.

To address this situation, the algorithm should be tweaked to refresh schemas of the dependents (in this case, V2) even on CREATE events (in this case, V1), fixing the last step of the example above.

Deriving view schemas

diagram-of-a-view-schema-derivation

View schema derivation using view logical plan

Given a view definition and a set of Avro schemas corresponding to its base tables, we derive the Avro schema of the view. We use Coral to analyze the view and propagate the Avro schema types from the base table to the view through the logical plan. This way, we can preserve casing information, nullability information, and non-standard SQL data types such as Enums throughout the view definition. Given the logical plan of the query, we construct the Avro schema at each node. For example, a simple projection (without any functions applied) preserves the schema. Functions change the schema types based on the function inference mechanism. We implement a number of function transformations to preserve non-standard SQL types as much as possible. Renames preserve the schema structure, but alter the field names. Joins concatenate left and right schemas together, and filters do not change the schema.

What’s next?

In the future, we are considering incorporating more type systems beyond Avro into this framework. Essentially, we are considering developing new types of schema classes that do not require the encoding or decoding of Spark internal rows. Those schema classes would instead directly wrap internal rows in a type-safe manner. With a few extensions, the API of the schema classes can remain the same, while its implementation differs by engine dependent on engine’s internal representation, enabling the type classes to be used in multiple engines without paying a penalty in performance. For Spark, the underlying implementation will be internal Rows. 

This paves the way also for providing a platform-independent way of implementing transformations or writing UDFs on those type systems using our platform-independent Transport UDF framework, getting the best of both worlds and enabling type-safety and platform independence for both developer productivity and reproducibility, while allowing UDFs to still execute natively on each engine, to maximize performance.

Acknowledgements

This work would not be possible without the collaboration with our Foundation team, specifically Yu Feng, Dan Hata, and Thuc Nguyen. We are very grateful for the immense support and help we have received from our very own Big Data Platform team members: Shardul Mahadik, Suren Nihalani, Chris Chen, Nagarathnam Nuthusamy, Sushant Raikar, Carl Steinbach, and Shirshanka Das. Thank you to the executive team, Vasanth Rajamani, Eric Baldeschwieler, Kapil Surlaker, and Igor Perisic, for their continued support for this project and many others. Finally, thank you to our peer reviewers, Min Shen, Issac Buenrostro, SungJu Cho, and Michael Kehoe for their valuable feedback.