Data Streaming/Processing

Load-balanced Brooklin Mirror Maker: Replicating large-scale Kafka clusters at LinkedIn

At LinkedIn, Apache Kafka is used heavily to store all kinds of data, such as member activity, log storage, metrics storage, and a multitude of inter-service messaging. LinkedIn maintains multiple data centers with multiple Kafka clusters per data center, each of which contains an independent set of data. Mirroring (i.e., replicating) Kafka topics across the clusters and data centers not only enables easy accessibility and analytics by aggregation of data from multiple data centers, but also fault tolerance and recovery during cluster or data-center wide outages. Additionally, mirroring enables the ability to isolate the data access for performance or security reasons and limits the data availability based on security domains. Streaming infrastructure at Linkedin mirrors more than seven trillion messages per day between the clusters using Brooklin (Github repo).

Mirroring at such a continuously increasing scale brings a unique set of challenges including: 

  • How to efficiently replicate large-scale Kafka clusters that have topics with variable throughput rates? 

  • How to determine the optimal number of Kafka consumers and distribute the replication workload across them to maximize performance (e.g., replication latency, throughput)? 

  • How to dynamically scale the Brooklin clusters based on changes in traffic patterns (e.g., daily, weekly, hourly patterns)? 

Our team has addressed these questions to reduce the operational overhead and hardware operating costs while achieving peak performance using Brooklin. This post walks through our journey to efficiently predict the Brooklin cluster size required to mirror the Kafka clusters, evenly distribute the load across all the Brooklin instances, and dynamically adapt to changing traffic patterns.

Let us start with the basics to understand this post better.

Brooklin background

What is Brooklin? 
Brooklin is an open-source distributed system that streams data between heterogeneous source and destination systems with high reliability and throughput at scale. It is easily extendable to new sources or destinations and designed to handle multi-tenancy and power hundreds of data pipelines across different systems.

image-of-the-overview-of-brooklin

Figure 1 shows an overview of Brooklin.

Brooklin Mirror Maker (BMM)
Brooklin provides mirroring functionality that replicates topics between Kafka clusters across data centers. Brooklin service consumes from one cluster and produces to another while ensuring that there is no data loss, that each message is delivered at least once, and minimal delay in the delivery. LinkedIn migrated from Kafka Mirror Maker (KMM) to BMM in 2018 and open-sourced Brooklin in 2019; switching from KMM to BMM brought stability and reduced the operational overhead of KMM. In this blog post, we covered the pain points of mirroring Kafka clusters using KMM and how Brooklin solves those problems. 

Datastream
A datastream describes a data pipe between two systems: 1) a source system to consume the data from and, 2) a destination system to produce the data to. Datastream allows the setting of the regex-based allowlist that defines which datasets from the source will get mirrored. For example, the Kafka topics in the source cluster matching against the allowlist regex get mirrored by the datastream. A Brooklin cluster handles hundreds of different datastreams simultaneously.

Datastream task
A task is a unit of work for the datastream processing and is associated with a datastream. Brooklin distributes all of the topic partitions as part of the datastream across the tasks for processing. The task is a long-running process responsible for replicating the records from the assigned subset of topic partitions. Each of these tasks runs concurrently.

Challenges of mirroring with Brooklin Mirror Maker

Let us dive deeper into the current challenges with mirroring.

Oversubscribed tasks & grouping of hot partitions

image-of-the-variable-load-problem

Figure 2 shows the variable load problem across the datastream tasks due to partition distribution based on the count. The height of a partition represents the load/throughput rate in that partition.

BMM and other similar replication systems for Kafka rely on partition distribution across the Kafka consumers, based on the partition count rather than the partition throughput/message rate because Kafka does not natively provide partition throughput metrics. As illustrated in Figure 2, grouping multiple partitions with high throughput results in an overall throughput requirement of more than what a task can handle. This results in higher latency to mirror the records, impacting the delivery SLAs. There is no direct way to identify or distribute the load to mitigate the issue. High latency is a serious concern in some mirroring pipelines that are very time-sensitive in near-line infrastructure and are business-critical. 

Task count determination
Uneven load distribution across the datastream tasks makes the task count determination unpredictable. We manually improvise the task count to handle any changes in the workload. The task count adjustment can result in a wastage of hardware resources because of additional reserved tasks. Moreover, this process is prone to human errors and is a significant operational overhead. The task count prediction is even more tricky with load-balancing, as it is a multi-dimensional bin packing problem.

Dynamic scaling
The workload for mirroring can vary over time. The current system does not have a way to dynamically scale the mirror maker cluster to optimize the hardware cost and reduce the operational cost of manual interventions. 

Identify the offending partitions
The current system cannot quickly identify the topic partitions having a throughput rate more than what a task can support. The topic may not have enough partitions to distribute the traffic or has a poor partitioning logic, resulting in skewed traffic distribution. It is critical to identify these topics to take corrective actions, as it impacts the mirroring of other topics. 

How are we solving these problems at Linkedin? 

The distribution of partitions based on count is insufficient to replicate large-scale Kafka clusters efficiently. To achieve this, we need to distribute the partitions based on the per-partition throughput load. Because the partition level metrics are not readily available, we built a system that provides this information while also giving us the capability to dynamically scale the task count in a datastream to handle the changes in the workload.  

Elastic task count
We added support to have an elastic task count for a datastream, which enables dynamic scalability of the task counts based on the workload. The feature comes with upper and lower bounds to avoid a drastic change in the task count due to a sudden glitch. We then built a Partition Metrics Provider (PMP) that gathers and exposes all the required partition throughput information to get insights into the workload required for elastic task count. In the subsequent section, we will discuss the implementation of PMP.  

Partition Metrics Provider (PMP)

architecture-of0the-partition-metrics-provider

Figure 3 shows the architecture of the Partition Metrics Provider (PMP) that makes the partition-level metrics available. PMP uses open-source components like Cruise Control, Apache Samza, and Apache Pinot.

How to find the partition level metrics?
Cruise Control, which runs on all Kafka clusters at LinkedIn, is an open-source distributed service that monitors the Kafka clusters and automatically adjusts the resources to meet predefined performance goals. Cruise Control has been integrated into other systems/cloud providers, including Cloudera, Amazon MSK, Redhat AMQ Streams, Strimzi, Banzai Cloud, and Bloomberg. It emits partition metrics like the Kafka brokers, which host the partition and their disk usage, incoming byte rate, outgoing byte rate, and message rate, at a configured rate to a dedicated partition metrics sample store (i.e., a Kafka topic per cluster). Cruise Control takes actions based on the metrics generated for the topic partitions and pauses the collection of partition level metrics during an ongoing rebalance to avoid triggering another rebalance. The rebalance can run for hours, and the metrics are unavailable for that duration. To ensure partition metrics are available during rebalance, we added a feature in Cruise Control to enable the collection of these metrics to a different partition metrics sample store (i.e., a Kafka topic per cluster). We also ensure that we continue to collect metrics without accidentally triggering an additional rebalance operation by emitting to a separate topic. Through leveraging these Kafka topics, we were then able to build the PMP service. 

Apache Pinot-based system
After investigating the open-source and LinkedIn internal systems, we chose Apache Pinot to host the PMP service. Pinot is an open-source real-time distributed OLAP datastore that delivers scalable real-time analytics in low latency. It ingests data at a high rate, is immediately available to compute queries, and allows filtering results based on column fields like timestamp, data center, cluster, and topic. Further, Pinot efficiently computes aggregations like min, max, average, count, distinct, etc. and supports long data retention and SQL queries.

Apache Pinot can ingest real-time (in a few milliseconds) from only one Kafka topic for a table, which brings us two significant challenges: 

  • Merge Cruise Control Kafka topics from different Kafka clusters into one Kafka topic.

  • Kafka topics emitted by Cruise Control have their custom serialization format. Apache Pinot can ingest from any stream with a provided decoder, however, we configure all the Pinot tables with an Avro decoder at LinkedIn to avoid operational overheads, so we need to convert the records to Avro format.

Intermediate Samza pre-processor job
We architected an Apache Samza job that ingests the Cruise Control topics with custom serialized format from different Kafka clusters. This filters and transforms the records to Avro format and aggregates them to one Kafka topic.  Pinot then ingests the data from this Kafka topic in real-time.

Task count prediction and partition assignments in BMM

To manage the partition assignment based on load, we migrated the Kafka connectors in Brooklin from high-level Kafka consumer APIs to manual partition assignment APIs. The migration gave Brooklin significant power to customize the partition management.

We then added a new partition assignment strategy in Brooklin open-source that estimates the number of tasks required to mirror the Kafka topics in a datastream and distributes the partitions based on partition level metrics, which allows plug-in of the custom class that can provide these metrics. 

image-of-how-brooklin-interacts-with-pmp-and-monitoring-services

Figure 4 shows how Brooklin interacts with PMP and Monitoring services to do load-balanced partition distribution.

Datastream task count prediction

sample-pinot-query

Sample Pinot query to get last one-hour data

Brooklin queries PMP service on a rebalance trigger to fetch the partition metrics in the last X amount of hours and then predicts the number of tasks required to handle the traffic. Brooklin utilizes the topic level metrics if the partition level metrics are unavailable and also considers the maximum throughput performance achievable from one datastream task. It also has a configurable property to keep the throughput load at a certain threshold in the tasks and a buffer to handle sudden changes in the load. For example, suppose a task can process 10MBps compressed traffic, and the setup has a 70% threshold configuration. In this case, the task count will be predicted based on 10 * 0.7 = 7MBps compressed traffic and will reserve 3MBps for unknown traffic for each task. The task count prediction accounts for elasticity and gets bounded by minimum and maximum task configurations. The task count is directly proportional to the Brooklin cluster size and determines the Brooklin cluster size.

Partition assignment based on throughput load
The strategy intelligently distributes the partitions across the tasks ensuring that all of the tasks have a similar throughput processing workload. The partition distribution logic ensures not to group hot partitions and does not require the tasks to have the exact partition count. The assignment remains sticky unless a full rebalance is triggered. Brooklin uses the configurable default throughput rate for partitions with unknown throughput metrics and if the PMP service is unavailable, the strategy switches to the default sticky round-robin partition distribution. The current design is a foundation that allows the interfaces to be extended and has a future scope to involve additional parameters to consider while balancing the load.

image-of-partition-assignment-based-on-load

Figure 5 shows the partition assignment based on the load. 

Monitoring service
To focus on message delivery latency, we added a monitoring service that triggers rebalance in Brooklin whenever it notices that the latency has crossed a certain threshold. The rebalance typically completes within a few seconds and uses sticky assignments in other scenarios, like new topic onboarding, to avoid full rebalance. Brooklin rebalances the datastream tasks based on the current traffic. The monitoring service has an extensive capability to monitor the overall load and trigger rebalances during off-peak traffic time. This powerful capability promotes dynamic and automated shrinking and expanding of the clusters based on traffic load while also minimizing hardware usage while running in cloud environments, thereby optimizing cost and operational issues/overheads.

Performance evaluation 

Setup
This setup, which has two Brooklin clusters for comparison, evaluates the correctness and performance of the Brooklin clusters. Both clusters replicate the same Kafka topics from the source Kafka cluster to different topics in the same destination Kafka cluster. The tests subject the Brooklin clusters to the same load, and different destination topics ensure completeness via Audit. The audit ensures that the source and destination Kafka topic record counts match. The control cluster runs BMM without load balancing, and the candidate cluster runs Load-balanced BMM. Both the Brooklin clusters have similar hardware configurations that run in the destination data center so it performs remote read and local write operations.

image-of-the-topology-of-the-performance-evaluation-setup

Figure 6 shows the topology of the Performance Evaluation Setup. The control Brooklin cluster runs BMM without load-balancing, and the candidate Brooklin cluster runs BMM with load-balancing. Both the clusters mirror the same Kafka topics from the source to the destination Kafka cluster.

Key results
Control cluster without load-balancing had a skewed distribution for all of the collected metrics, and the deviation was significant. It was unclear from the graphs how many tasks a Brooklin host was running. The experiments showed that it was hard to achieve optimal performance just with regular partition distribution in the case of heterogeneous workloads. On the other hand, the candidate cluster running load-balanced BMM showed a well-balanced load across the tasks. The estimated tasks were sufficient to handle the traffic load and we saw minimal deviation in the collected metrics across the hosts. The Brooklin hosts with twice the number of tasks could process twice the number of records—this pattern allowed for easy prediction of the number of tasks supported on one Brooklin host. 

Inbound network rate

figure-comparing-the-deviation-in-the-inbound-network-rate-between-the-control-cluster-versus-candidate-cluster

 Figure 7 compares the deviation in the inbound network rate between the control cluster (without load balancing) v/s candidate cluster (with load balancing). There is a significant deviation in the control cluster v/s minimal deviation in the candidate cluster. Each line represents a Brooklin host. It is clear from the graphs that some Brooklin hosts in the candidate cluster are running twice the number of tasks between 10:00 and 12:00. 

Total record bytes processed rate

figure-of-the-deviation-in-the-total-bytes-processes-rate-in-the-control-cluster-versus-the-candidate-cluster

Figure 8 shows the deviation in the total bytes processed rate in the control cluster (without load balancing) v/s candidate cluster (with load balancing). Each line represents a Brooklin host with one or more datastream tasks running. The result is similar to the other metrics.

Total records processed rate

figure-of-the-deviation-in-the-total-records-processed-rate-in-the-control-cluster-vesrus-the-candidate-cluster

Figure 9 shows the deviation in the total records processed rate in the control cluster (without load balancing) v/s candidate cluster (with load balancing). Each line represents a Brooklin host, with one or more datastream tasks running. The graph reflects balanced tasks in the candidate cluster, which is critical for heterogeneous workloads.

Summary

Load-balanced BMM provides an efficient solution to the existing challenges of replicating large-scale Kafka clusters,  enabling the balanced workload distribution across the tasks based on partition throughput. The feature provides a capability to make load-aware task count predictions while ensuring efficient utilization of the task capacity. The feature also supports dynamic adaptation to the changes in the live traffic and promotes dynamic scaling of the datastreams while drastically reducing the manual resizing overhead. The feature generates alerts to resize the static clusters, which is super helpful in the case of manual cluster management. The feature also enables easy identification of the offending topic partitions breaking the SLAs using an analytics dashboard powered by Pinot. The feature is already in production, and we are witnessing a significant reduction in operational overhead.

Acknowledgments

I would like to thank Jhora Zakaryan, Keith Ward, and Sonam Mandal for their significant contributions to this feature. This feature would not be complete without the valuable guidance of Adem Efe Gencer, Rayman Matharu, and Subbu Subramaniam. Lastly, thanks to Aditya Toomula, Erik Holstad, Swathi Koundinya, and Clark Haskins for their continued guidance and support.