Declarative Data Pipelines with Hoptimator

June 26, 2023

For the last several years, internal infrastructure at LinkedIn has been built around a self-service model, enabling developers to onboard themselves with minimal support. We have various user experiences that let application teams provision their own resources and infrastructure, generally by filling out forms or using command-line tools. For example, developers can provision Kafka topics, Espresso tables, Venice stores and more via Nuage, our internal cloud-like infra management platform. These self-service integrations are typically owned by the teams that build and support the underlying infrastructure.

However, we've found that this vertical self-service model doesn't work particularly well for data pipelines, which involve wiring together many different systems into end-to-end data flows. Data pipelines power foundational parts of LinkedIn's infrastructure, including replication between data centers. Just as important, a growing number of use cases are driven by developers building applications.

To support these use-cases, we have built convenient onboarding experiences for a small subset of data pipelines, including Kafka-to-Kafka replication and Espresso CDC (Change Data Capture). However, despite running on the same infrastructure (Brooklin), these two examples have slightly different onboarding experiences, as they deal with completely different data sources. Moreover, developing each of these onboarding experiences requires considerable time and effort. This means that developers frequently encounter gaps in self-service, requiring them to build their own solutions. 

To reduce onboarding friction across a growing number of use-cases, we've been working on a unified control plane for all data pipelines at LinkedIn. Instead of having unique user experiences for each specific use-case, we are building a unified experience which leverages our existing infrastructure under the hood. As part of that effort, we have developed a new end-to-end data pipeline orchestrator called Hoptimator.

Current Gaps in Self-Service

Depending on the systems involved, developers can often use Nuage, Azkaban, or a command-line tool to create a single "hop" from one system to another:

Source \ Destination | Kafka | Brooklin | Espresso | MySQL | HDFS    | Venice
---------------------+-------+----------+----------+-------+---------+-------
Kafka                | Nuage | N/A      |          |       | Azkaban |
Brooklin             | N/A   | N/A      | Nuage    | Nuage |         |
Espresso             |       |          | N/A      |       |         |
MySQL                |       |          |          | N/A   |         |
HDFS                 | auto  | Nuage    | Nuage    |       | N/A     | Nuage
Blob                 | CLI   |          |          |       |         |
Venice               | Nuage |          |          |       | Azkaban | N/A
Pinot                | Nuage |          |          |       | Azkaban |

Table 1: Partial listing of user onboarding experiences

The holes in the above chart – the majority of spaces – represent gaps where self-service does not exist yet. In those cases, creating an end-to-end data pipeline involves writing custom code to bridge the gaps. For streaming data pipelines, this involves writing stream processing jobs.

For example, to create an end-to-end data pipeline which brings data from Espresso into Pinot, we have self-service solutions for the Espresso→Brooklin hop and for the Kafka→Pinot hop, but not for the Brooklin→Kafka hop in between. A developer would need to write and operationalize a custom stream processing job to replicate their Brooklin datastream into a Kafka topic. A number of Samza and Beam jobs exist for such purposes.

The records streaming through these data pipelines often require transformation into a more convenient format. For example, Pinot ingestion by default expects records to be flat, with field names and types that are compatible with the Pinot table definition. It is unlikely that an Espresso table and a Pinot table happen to agree on these details. This sort of mismatch can occur between any pair of systems. Thus, data pipelines often involve some stream processing logic to transform records of one schema into another, filter out unnecessary records, or drop extra fields.
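
To make the mismatch concrete, here is a sketch of the kind of transformation such a pipeline might apply, written in Flink SQL. The table and field names (ProfileEvents, ProfileFlat, and so on) are hypothetical, invented for illustration:

```sql
-- Hypothetical example: reshape nested CDC records into the flat,
-- correctly-typed rows a destination table expects.
INSERT INTO ProfileFlat
SELECT
  profile.id                      AS member_id,   -- rename to match the sink
  profile.name                    AS member_name,
  CAST(eventTime AS TIMESTAMP(3)) AS event_time   -- align the field type
FROM ProfileEvents
WHERE profile.id IS NOT NULL;                     -- drop unusable records
```

Even a transformation this small is enough to force a custom stream processing job when no self-service hop exists.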

This means that data pipelines almost always require some form of stream processing in the middle. We have historically thought about these as two different technologies (e.g. Brooklin vs Samza), and have left it to developers to string them together. In order to provide an end-to-end data pipeline experience, we need a way to combine stream processing and data pipelines into a single concept.

Enter Flink

We've recently adopted Apache Flink at LinkedIn, and Flink SQL has changed the way we think about data pipelines and stream processing. Flink is often seen as a stream processing engine, and historically its APIs have reflected that. But since the introduction of the Table API and Flink SQL, Flink has evolved to support more general-purpose data pipelines.

This is in large part due to the Table API's concept of Connectors, which are not unlike the connectors of Brooklin or Kafka Connect. Connectors are the glue between different systems, and thus are associated with data pipelines. To some extent, the Table API subsumes Brooklin's use-cases by pulling Connectors into a converged stream processing platform.

This means we can express data pipelines and stream processing in the same language (SQL) and run them on the same runtime (Flink). End-to-end data pipelines that would normally span multiple systems and require custom code can be written as a bit of Flink SQL and deployed in one shot.
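
As an illustration, a single end-to-end hop can be expressed as a pair of connector-backed table declarations wired together by one INSERT. The table names, topics, and option values below are hypothetical, not LinkedIn configuration:

```sql
-- Declare a source and a sink via Table API connectors, then connect
-- them with a single INSERT. All names and options here are made up.
CREATE TABLE PageViews (
  memberId BIGINT,
  url      STRING
) WITH (
  'connector' = 'kafka',
  'topic'     = 'page-views',
  'properties.bootstrap.servers' = 'broker:9092',
  'scan.startup.mode' = 'earliest-offset',
  'format'    = 'json'
);

CREATE TABLE PageViewsByMember (
  memberId BIGINT,
  views    BIGINT,
  PRIMARY KEY (memberId) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic'     = 'page-views-by-member',
  'properties.bootstrap.servers' = 'broker:9092',
  'key.format'   = 'json',
  'value.format' = 'json'
);

INSERT INTO PageViewsByMember
SELECT memberId, COUNT(*) FROM PageViews GROUP BY memberId;
```

Deployed as one Flink job, this replaces what would otherwise be a hand-built bridge between two systems.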

Toward Declarative Data Pipelines

From a user perspective, the ideal end-to-end experience is a single authoring language (e.g. Flink SQL) for a single runtime (e.g. Flink) on a single big cluster. Users want to deploy a data pipeline with kubectl apply -f my-pipeline.yaml. The reality, however, is considerably more complex. A single end-to-end data pipeline at LinkedIn may span multiple purpose-built data plane systems (e.g. Brooklin, Gobblin), run on multiple stream processing engines (e.g. Samza, Flink), and talk to multiple storage systems (e.g. Espresso, Venice). Each of these may require manual onboarding, custom code, imperative API calls, and so on.

Starting from the ideal user experience and working backwards, we can imagine a declarative model for end-to-end data pipelines. This would present data pipelines as a single construct, but would implement them by assembling the required components from the data plane and compute layer. If the data pipeline requires some stream processing, we could automatically provision a Flink job. If part of the data pipeline requires an approval or review process, we could automatically trigger the workflow.

Leaning into the Kubernetes ecosystem, it's clear this would involve a sophisticated operator. This would take a custom resource spec (essentially, a YAML file) and turn it into various physical resources in the data plane. Ultimately, a single pipeline spec would result in new Flink jobs, Kafka topics, and so on.

However, it's not hard to imagine the proliferation of complex configuration that may result from such a model. It may be nice to have a single YAML file, but only insofar as that YAML file is itself simple.

To solve this problem, we started looking into expressing end-to-end data pipelines in SQL. We use streaming SQL extensively at LinkedIn, but existing SQL only expresses one "hop" of a data pipeline, e.g. from one Kafka topic to another. This has resulted in data pipelines that span hundreds of SQL statements. Ideally, an entire data pipeline could be codified as a single, high-level construct. What if an entire end-to-end, multi-hop data pipeline were just a SQL query?

Hoptimator: SQL-based Multi-hop Data Pipeline Orchestrator 

We've been building an experimental data pipeline orchestrator called Hoptimator. It's essentially a sophisticated Kubernetes operator that constructs end-to-end, multi-hop data pipelines based on SQL queries. Hoptimator's user experience is based on a high-level concept we call "subscriptions", each of which represents a materialized view. Given a subscription, Hoptimator automatically creates a data pipeline to materialize the corresponding view. This enables developers to create complex data pipelines with shocking simplicity:

$ cat my-subscription.yaml

apiVersion: ...
kind: Subscription
metadata:
  name: sample-subscription-1
  namespace: sample
spec:
  sql: SELECT "name", "age" FROM ESPRESSO."SampleTable"
  database: KAFKA

$ kubectl apply -f my-subscription.yaml

In response, Hoptimator might create a new Kafka topic, provision a Brooklin CDC datastream, deploy an auto-generated Flink job, etc. The Flink job will include all the configuration, DDL, SQL, connectors, etc that it needs to run.

Notice that the SQL above makes no mention of Brooklin at all. A Brooklin CDC datastream is implied when accessing an online database (in this case, Espresso). The resulting Flink job will read from the datastream, not from the database directly. This is important at scale, because we never want stream or batch processing jobs to impact online database performance.

Flink on its own can read and write to external systems via connectors, but Hoptimator provides a mechanism to incorporate arbitrary infrastructure into a pipeline. This has the potential to yield the best of both worlds: highly performant, purpose-built infrastructure like Brooklin and Gobblin, but folded into a Flink SQL-like experience. Under the hood, a subscription may involve multiple hops through various systems, and may leverage multiple auto-configured connectors.

To do this, Hoptimator has a plugin model enabling custom integrations with external systems like Espresso. Unlike Flink Connectors, Hoptimator's "adapters" do not deal with reading and writing to those systems directly. Instead, they express what external resources will be required by the pipeline. This may be something simple like an existing Kafka topic, or something complex like a new Brooklin CDC datastream, a new Couchbase cache, or some part of an existing data pipeline.

As you may have guessed, adapters are also declarative. Adapters do not need to do much work – they simply declare what resources are needed for part of a pipeline. For example, an adapter does not need to know how to create a Flink job – it just needs to generate a FlinkDeployment spec.
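
For example, if Flink jobs are managed by the Flink Kubernetes Operator, an adapter could emit a FlinkDeployment spec along these lines. This is only a sketch: the image, jar, and arguments are hypothetical placeholders, not Hoptimator's actual output:

```yaml
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: sample-subscription-1-flink
spec:
  image: flink:1.17                # hypothetical runtime image
  flinkVersion: v1_17
  serviceAccount: flink
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
  job:
    # A generated job that executes the subscription's SQL.
    jarURI: local:///opt/hoptimator/pipeline-runner.jar   # hypothetical
    args: ["SELECT \"name\", \"age\" FROM ESPRESSO.\"SampleTable\""]
    parallelism: 1
    upgradeMode: stateless
```

The adapter's job ends at producing the spec; the operator ecosystem handles the actual deployment.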

Yes, it's open source!

We are just getting started with Hoptimator, and there are currently no production workloads using it directly. However, the project has attracted a lot of internal interest and excitement, and it's being used to quickly prototype new data pipelines. We are focusing on specific use-cases with especially thorny onboarding processes, but we think the model is broadly appealing. That’s why we've recently open-sourced a big chunk of Hoptimator, including support for Kafka and Flink.

To get started, try using the RAWKAFKA adapter, which doesn't require a Schema Registry or other existing infrastructure. You can spin up a test environment easily:

$ git clone
$ cd Hoptimator
$ make quickstart

Once the cluster is initialized, you can generate test data using the built-in DATAGEN adapter:

$ cat random-names-subscription.yaml
kind: Subscription
metadata:
  name: random-names
spec:
  database: RAWKAFKA

$ kubectl apply -f random-names-subscription.yaml

For this simple subscription, Hoptimator will automatically provision a Kafka topic and a Flink job. Within a few seconds, you will see some random names appear on the new Kafka topic.

To further explore the tables and adapters available, you can launch the hoptimator SQL CLI, which is based on sqlline:

$ ./bin/hoptimator
> !intro
> !tables
> !q

Within the CLI, you can execute SQL statements without deploying anything:

$ ./bin/hoptimator
> SELECT * FROM RAWKAFKA."random-names" LIMIT 5;
> !q

To serve these queries, the CLI will generate Flink SQL, run it in-process, and tail the results. Run the !intro command to see additional examples, and !help for other commands.

Happy hopping!

Special thanks to Gerardo Viedma, Subbu Subramaniam, and Naveenkumar Selvaraj from the Pinot team, Abhishek Mendheka from the Flink team, Felix GV from the Venice team, Aditya Toomula, Vaibhav Maheshwari, Harshil Shukla, Eric Honer, Joseph Grogan, and intern Hui Wang from the Brooklin team for contributing to the design, review, and implementation of Hoptimator.