Streaming Data Pipelines with Brooklin

Near-realtime (nearline) applications drive many of the critical services within LinkedIn, such as notifications, ad targeting, etc. These applications need continuous and low-latency access to data, but moving data quickly and reliably from the storage layer to the nearline applications is a non-trivial problem. Building one-off data pipelines that serve the requirements of every application and dataset combination is not sustainable, as it slows down development and makes the infrastructure unmanageable.  

At LinkedIn, we have hundreds of nearline applications processing data that are distributed across multiple stores like Espresso, Oracle, MySQL, HDFS, Kafka, Azure Event Hubs, and AWS Kinesis. We want to get to a state where application developers focus solely on processing events and not on building pipelines to move data. Brooklin was built to address this need. Comparable solutions in the streaming ecosystem include Facebook’s Wormhole and Kafka Connect. There are a few different ways to address similar problems in this space, and we believe that the approach we’ve taken with Brooklin is particularly suited to realizing our goals.

What is Brooklin?

Brooklin is a data ingestion service that can be used to stream data from multiple streaming sources to different destinations. It can stream data from messaging systems (Kafka, Event Hubs, etc.), databases (Oracle, MySQL, etc.), datastores (NFS, etc.) and publish to destinations such as Kafka, Event Hubs, HDFS, etc. At LinkedIn, we use Kafka as the default platform for message delivery, and because of that, Brooklin publishes to a Kafka topic in the majority of use cases. We are also actively working on building support for publishing directly to HDFS. For scenarios where applications are running in Azure, Brooklin uses Azure Event Hubs as the default destination stream.

Brooklin is a critical part of LinkedIn’s streaming infrastructure, and its usage typically falls into two buckets: a Change Data Capture (CDC) service, or a bridge connecting systems in different environments.  

Change Data Capture (CDC)
In many large-scale deployments, the source-of-truth databases serve online queries. These queries have strict (real-time) latency requirements, but can compete for database resources with data processing applications. This competition impacts performance and availability of online queries, and the problem is magnified when the number of applications or size of the dataset grows. One approach to resolve the resource contention is to use the Change Data Capture (CDC) pattern. In this model, instead of consuming directly from databases, data processing applications consume from a change stream. The change stream contains an event for each write to the database, which gives applications access to all the database changes without impacting the online queries.

Brooklin can be used to produce a change stream for databases. A few possible approaches include streaming from the database change log and using database triggers to generate a change stream. At LinkedIn, Brooklin is used as a CDC service to stream change events from Espresso and Oracle databases.

Let’s look at a few examples where CDC can be used. Assume you want to write an application that notifies a member each time one of their connections changes their profile (e.g., a job change) on the LinkedIn site. If the database that stores the member profile is configured as a Brooklin source, then writes to this database will be published as a stream of change events to the destination stream (Kafka topic). The notification application can now be built as a Kafka consumer that processes each event and updates that member’s connections.
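
As a rough sketch of what such a notification application could look like, here is a plain Kafka consumer loop. The topic name "ProfileChangeEvents", the broker address, and the string-encoded event payload are assumptions made for illustration, not Brooklin conventions.

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ProfileChangeNotifier {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");  // assumed broker address
    props.put("group.id", "profile-change-notifier");
    props.put("key.deserializer",
        "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer",
        "org.apache.kafka.common.serialization.StringDeserializer");

    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
      // "ProfileChangeEvents" is a hypothetical destination topic name.
      consumer.subscribe(Collections.singletonList("ProfileChangeEvents"));
      while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
        for (ConsumerRecord<String, String> record : records) {
          // Each record is one change event; fan the update out to the member's connections.
          notifyConnections(record.key(), record.value());
        }
      }
    }
  }

  private static void notifyConnections(String memberId, String changeEvent) {
    // Placeholder for the application's notification logic.
    System.out.printf("Notify connections of %s about: %s%n", memberId, changeEvent);
  }
}
```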

Another application that may be interested in profile updates could be a search index. Soon after a profile update, we would like the member’s profile to show up in search results with the new job position. To keep the search indices fresh, a search application can consume events from the same change stream and update its corresponding indices in near-realtime. To enable offline (batch) processing of profile updates, we would need to generate regular snapshots of the profile database in the offline grids. ETL applications can subscribe to change events and merge them with previous snapshots to generate newer snapshots.

Bridge connecting systems in different environments
It is common for data to be spread across different environments (public cloud and company data centers), geo-locations, or different deployment groups (clusters, DMZ, etc.). Typically, each environment adds additional complexities due to differences in access mechanisms, serialization formats (JSON, Avro, Thrift), compliance, security requirements (encryption, data obfuscation, etc.), or the need for custom monitoring or firewall exceptions. A dedicated service that can move data seamlessly across environments is preferred, because all of these complexities can be handled within a single service and abstracted away from application development. For example, at LinkedIn, we have built bridges using Brooklin that stream data from Azure Event Hubs and AWS Kinesis to Kafka topics. In this model, application developers can focus solely on business logic and consuming from Kafka. Another common use case for a bridge is to mirror Kafka topics across different Kafka clusters.

Architecture

Brooklin is a distributed and stateless service. A typical deployment of Brooklin has multiple server instances coordinating with each other, and each server instance is responsible for streaming a portion of the data (partitions) from the source to the destination. Zookeeper is used for coordination between instances, for storing metadata, and for distributing task assignments.  

A Brooklin pipeline is provisioned by creating a datastream object. The datastream object describes a data pipeline: it includes information about the source, the destination, the serializer and deserializer, and metadata such as credentials, encryption settings, and processing instructions.
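
To make this concrete, here is a minimal sketch of the kind of information a datastream definition carries. The class and field names below are illustrative placeholders, not Brooklin’s actual datastream schema.

```java
import java.util.Map;

/** Illustrative only: these fields do not reflect Brooklin's actual datastream schema. */
public final class DatastreamDefinition {
  private final String name;            // e.g. "member-profile-cdc"
  private final String sourceConnector; // e.g. "EspressoConnector"
  private final String sourceUri;       // e.g. "espresso://MemberDB/Profile"
  private final String destinationUri;  // e.g. "kafka://ProfileChangeEvents"
  private final String serde;           // e.g. "avro"
  private final Map<String, String> metadata; // credentials, encryption, processing hints

  public DatastreamDefinition(String name, String sourceConnector, String sourceUri,
      String destinationUri, String serde, Map<String, String> metadata) {
    this.name = name;
    this.sourceConnector = sourceConnector;
    this.sourceUri = sourceUri;
    this.destinationUri = destinationUri;
    this.serde = serde;
    this.metadata = metadata;
  }
  // getters omitted for brevity
}
```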

Below is the layout of the Brooklin server components.

[Figure: Brooklin server components]

Datastream management API
This is a REST endpoint to create, update, manage, or delete datastream objects. It stores datastream objects in Zookeeper. At LinkedIn, we have a self-service portal called Nuage that facilitates creating and managing infrastructure resources, including Brooklin.
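
For illustration, creating a datastream through such a REST endpoint might look roughly like the following; the host, port, path, and payload fields are hypothetical and do not reflect Brooklin’s actual REST schema.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class CreateDatastream {
  public static void main(String[] args) throws Exception {
    // Hypothetical payload and endpoint path, for illustration only.
    String payload = "{"
        + "\"name\": \"member-profile-cdc\","
        + "\"source\": \"espresso://MemberDB/Profile\","
        + "\"destination\": \"kafka://ProfileChangeEvents\""
        + "}";

    HttpRequest request = HttpRequest.newBuilder()
        .uri(URI.create("http://brooklin.example.com:8080/datastream"))
        .header("Content-Type", "application/json")
        .POST(HttpRequest.BodyPublishers.ofString(payload))
        .build();

    // Send the request and print the status code returned by the service.
    HttpResponse<String> response =
        HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
    System.out.println("Status: " + response.statusCode());
  }
}
```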

Coordinator
A coordinator is present in all server instances, and one of the coordinators is chosen as a leader. The leader coordinator computes task assignments based on datastream objects stored in Zookeeper and assigns tasks to all active server instances. The leader election and coordination between server instances is done via Zookeeper using well-known Zookeeper recipes. All coordinators (leader and followers) get notified when there is a new task assignment.  Tasks describe the source and destination types, source stream, and partitions. The coordinators hand over tasks to the connector components for data ingestion.
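
As a hedged illustration of the leader-election piece (not Brooklin’s actual code), here is how a server instance could elect a leader using Apache Curator’s LeaderLatch, one of the well-known Zookeeper recipes. The Zookeeper connection string and znode path are assumptions.

```java
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class CoordinatorElectionSketch {
  public static void main(String[] args) throws Exception {
    // Connect to an assumed Zookeeper ensemble.
    CuratorFramework zk = CuratorFrameworkFactory.newClient(
        "zk1:2181,zk2:2181,zk3:2181",
        new ExponentialBackoffRetry(1000, 3));
    zk.start();

    // Every instance participates in the latch; Zookeeper picks exactly one leader.
    LeaderLatch latch = new LeaderLatch(zk, "/brooklin/coordinator/leader");
    latch.start();
    latch.await();  // blocks until this instance becomes the leader

    // The leader would now compute task assignments from the datastream
    // objects stored in Zookeeper and distribute them to the other instances.
    System.out.println("This instance is the leader coordinator");
  }
}
```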

Connectors
Connectors are responsible for reading events from a data source. Each connector receives tasks from the coordinator, reads from the specified source(s)/partition(s), and sends the data to the producers for publishing onto the destination. A connector needs to periodically checkpoint the state of its current read position in the source stream, to ensure no data loss in the case of instance failures or reassignment of datastream tasks.
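
The sketch below shows the general shape of a connector contract with checkpointing; the interface, its nested types, and its method names are hypothetical and only meant to illustrate the responsibilities described above.

```java
import java.util.List;
import java.util.Map;

/** Hypothetical connector contract, for illustration only; Brooklin's real API is richer. */
public interface SourceConnector {

  /** Placeholder for the task handed down by the coordinator. */
  record Task(String datastreamName, String source, List<Integer> partitions) {}

  /** Placeholder for one event read from the source. */
  record SourceEvent(int partition, long offset, byte[] key, byte[] value) {}

  /** Called when the coordinator (re)assigns tasks to this instance. */
  void onAssignment(List<Task> tasks);

  /** Read the next batch of events from the assigned source partitions. */
  List<SourceEvent> poll(long timeoutMs);

  /**
   * Persist the current read position per partition so that, after a failure
   * or task reassignment, another instance can resume without data loss.
   */
  void checkpoint(Map<Integer, Long> offsetsByPartition);
}
```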

Producers
Producers publish data to the destination streams. The framework supports multiple producers publishing to different destination types, which could be messaging systems, databases, data stores, etc.

Capabilities

One of the motivations for building Brooklin was to make it easy for application developers to readily access data. This, along with the requirements of streaming applications and our experience from building Databus (an earlier change capture service), influenced most of the Brooklin feature set. At present, Brooklin has the core capabilities to address the needs of a majority of streaming applications while providing the flexibility for future requirements.

Pluggable sources and destinations
Brooklin can stream from different source types and supports producing to different destination types. The connector and producer APIs are standardized and allow developers to implement support for additional source and destination types.

Dynamic provisioning
The REST management endpoint enables application developers to dynamically provision, reconfigure, or tear down data pipelines within minutes.

Multitenant
Brooklin is designed to be a heavily multitenant cluster and can simultaneously power several hundred data pipelines across the different source and destination types.

Read and write optimized
From our experience with operating Brooklin, we have noticed that it is very common for many applications to be interested in the same source. To address this, data pipelines corresponding to the same source are deduped to the same destination stream. Deduplication reduces the number of transactions (reads and writes) as well as storage and network costs. However, some applications might prefer a dedicated destination stream to control the number of partitions, retention policies, quotas, etc. Brooklin supports this option (referred to as BYOT: Bring Your Own Topic), and in this case, Brooklin reads the events only once from the source and publishes to multiple destination streams.
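
Under BYOT, each event is still read from the source once and then fanned out to every requested destination stream. A bare-bones illustration of that fan-out with a Kafka producer follows; the topic names and payload are made up for the example.

```java
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ByotFanOut {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");  // assumed broker address
    props.put("key.serializer",
        "org.apache.kafka.common.serialization.ByteArraySerializer");
    props.put("value.serializer",
        "org.apache.kafka.common.serialization.ByteArraySerializer");

    // Hypothetical BYOT topics requested by two different applications.
    List<String> destinationTopics = List.of("app-a-profile-topic", "app-b-profile-topic");

    try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
      byte[] key = "member-123".getBytes();
      byte[] value = "profile-change-event".getBytes();
      // The event was read from the source once; publish it to each
      // application's dedicated destination stream.
      for (String topic : destinationTopics) {
        producer.send(new ProducerRecord<>(topic, key, value));
      }
      producer.flush();
    }
  }
}
```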

Guarantees
Brooklin supports “at-least-once” delivery guarantees in addition to maintaining the order of events (in-order delivery) within a partition. However, Brooklin does not guarantee ordering across partitions.  

Samza integration
Apache Samza tasks use system consumers and system producers to read and write messages from and to underlying systems (e.g., Kafka, Databus, Hadoop, etc.). Brooklin leverages this: any existing Samza system consumer or producer can be used as a Brooklin connector or producer. We are evaluating whether we can take a similar approach for Apache Storm spouts and other stream processing systems.

Pluggable serializer and deserializer
At LinkedIn, we standardize on Avro in most cases as the common format for data systems, which greatly simplifies data management and exchange. However, when integrating with external systems, we encounter a variety of data formats; common encoding formats include JSON, Avro, Protobuf, and Thrift. Brooklin supports pluggable serializers and deserializers (SerDe), and the SerDe configuration for a data pipeline can be established dynamically.
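
A pluggable SerDe boils down to a small contract along the lines of the hypothetical one below; the interface and method names are illustrative, not Brooklin’s actual API.

```java
/**
 * Hypothetical SerDe contract, for illustration only; Brooklin's actual
 * serializer/deserializer interfaces may differ.
 */
public interface PayloadSerDe<T> {

  /** Encode an in-memory record into the wire format (e.g. Avro, JSON). */
  byte[] serialize(T record);

  /** Decode bytes read from the source back into an in-memory record. */
  T deserialize(byte[] payload);
}
```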

Bootstrap support
Bootstrap is mostly relevant to Change Data Capture scenarios. At times, applications need a complete snapshot of the data store. This could happen when the application starts for the first time or when it needs to reprocess the entire dataset because of a change in processing logic. Brooklin supports bootstrapping by having two destination streams: one that has the complete snapshot of the database and one that has the change feed. Applications start by consuming the bootstrap stream and can later switch to the change stream. The Brooklin Samza system consumer abstracts this functionality of starting from a bootstrap topic and then seamlessly switching to the change-capture stream.
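
To show what that hand-off looks like if an application does it by hand against Kafka (the Brooklin Samza system consumer hides these details), here is a rough sketch. The topic names, single-partition assumption, and broker address are all hypothetical.

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class BootstrapThenSwitch {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");  // assumed broker address
    props.put("group.id", "profile-search-indexer");
    props.put("key.deserializer",
        "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer",
        "org.apache.kafka.common.serialization.StringDeserializer");

    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
      // Hypothetical topics for the snapshot and the change feed.
      TopicPartition bootstrap = new TopicPartition("ProfileBootstrap", 0);
      consumer.assign(Collections.singletonList(bootstrap));
      consumer.seekToBeginning(Collections.singletonList(bootstrap));
      long end = consumer.endOffsets(Collections.singletonList(bootstrap)).get(bootstrap);

      // Phase 1: replay the full snapshot until we reach its current end.
      while (consumer.position(bootstrap) < end) {
        for (ConsumerRecord<String, String> r : consumer.poll(Duration.ofMillis(500))) {
          applyToIndex(r.key(), r.value());
        }
      }

      // Phase 2: switch to the change-capture stream for ongoing updates.
      consumer.unsubscribe();
      consumer.subscribe(Collections.singletonList("ProfileChangeEvents"));
      while (true) {
        for (ConsumerRecord<String, String> r : consumer.poll(Duration.ofMillis(500))) {
          applyToIndex(r.key(), r.value());
        }
      }
    }
  }

  private static void applyToIndex(String key, String value) {
    System.out.printf("Apply %s -> %s%n", key, value);  // placeholder for index update
  }
}
```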

Transaction support
Many databases have transaction support, and for these sources, Brooklin connectors can ensure transaction boundaries are maintained.

Brooklin applications
Applications consume Brooklin events directly from the destination stream by using a client specific to the destination system. At LinkedIn, the destination stream is a Kafka topic, so applications typically consume events either through a Samza job or a Kafka consumer. Brooklin also ships with a Samza system consumer, which makes it easier to write Brooklin applications.

Brooklin in production

Brooklin has been running successfully for LinkedIn workloads since October 2016. It is currently the default change-capture solution for Espresso and Oracle sources and streams events from 150+ sources, serving roughly 12.5 billion messages per day. Brooklin is also being used as a bridge to stream data between LinkedIn data centers (Kafka) and the cloud.

What next?

We plan to build connectors to support additional data sources (MySQL, Kinesis, etc.) as well as destinations (HDFS, DocDb, Kinesis, and Couchbase). We also plan to add server-side processing, such as filtering, projections, repartitioning, encryption, data obfuscation, etc. These would allow data to be ingested into the pipeline in the desired form right from the beginning. Towards this goal, we are exploring integration with Gobblin tasks. Continuing with LinkedIn’s tradition of contributing to the developer community, we intend to open-source this framework in 2018.

Acknowledgments

We want to thank Kartik Paramasivam, Swee Lim, and Igor Perisic for supporting the Brooklin engineering team throughout our journey. We also are grateful to the Kafka, Samza, Nuage, and Gobblin teams for being wonderful partners. Finally, a huge shout out to the members of the Brooklin engineering team: Greg Roelofs, Srinivasulu Punuru, Hai Lu, Peng Du, Celia Kung, Wei Song, Aditya Toomula, Deepthi Sridharan, Miguel Sanchez, Ed Yakabosky, Clark Haskins, Rafael Alvarez, Justin Kaufman, Thomas Law, Indrajeet Kumar, Sameer Singh, Venu Ryali and Akash Vacher who put their heart and soul into this product and made it what it is today.