Kafka is a distributed publish-subscribe messaging system. It is open-sourced under the Apache 2.0 license and has also been submitted as an Apache incubator project. A paper on this project was published at the NetDB ’11 conference.
This system is aimed at providing a publish-subscribe solution that can handle all activity stream data and processing at LinkedIn. Current usage of Kafka at LinkedIn includes:
- Collecting log data (e.g., frontend activity events and service metrics) and loading them into Hadoop/DWH for offline reporting and ad hoc analysis.
- Feeding log data directly to online applications, such as security, firehose (feeding Signal and LinkedIn Today), wotc, and realtime service dashboard.
- Deploying derived data (e.g., customized emails and connection strength scores for Cloud) from Hadoop to production.
Recently, we released Kafka v0.6 to open source and deployed it at LinkedIn. This is the first release since we open-sourced the project towards the end of 2010. Our philosophy is to successfully deploy any big feature in production at LinkedIn before making it available in an open-source release. Although this ensures a stable release, it can take several months to get the next version out. We are now working toward frequent, regular release cycles for Kafka.
In this release, we focused on enhancing the producer component. In addition to this, several important bug fixes and enhancements have made it into this release.
Here is a description of the producer features that we developed for v0.6:
Automatic load balancing
In v0.6, we introduced built-in automatic load balancing between the producers and the brokers in Kafka. In our own usage, we publish from a large number of heterogeneous machines, so it is desirable that the publisher not need any explicit knowledge of the cluster topology. Currently, we rely on a hardware load balancer to distribute the producer load across multiple brokers. An advantage of the hardware load balancer is its “healthcheck” functionality, which detects when a broker is down and forwards the producer request to another healthy broker. In v0.6, this “healthcheck” feature is provided by the cluster-aware producer. Producers discover the available brokers in a cluster, and the number of partitions on each, by registering watchers in ZooKeeper. Since the number of broker partitions is configurable per topic, ZooKeeper watchers are registered on the following events:
- new broker comes up
- broker goes down
- new topic is registered
- broker gets registered for an existing topic
Internally, the producer maintains an elastic pool of connections to the brokers, one per broker. This pool is kept up to date with connections to all the live brokers through the ZooKeeper watcher callbacks. When a producer request for a particular topic comes in, a broker partition is picked by the partitioner (see the section on semantic partitioning below). The corresponding producer connection is then taken from the pool and used to send the data to the selected broker partition.
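As a simplified sketch (not Kafka’s actual implementation), the elastic pool can be pictured as a map from broker id to connection that the ZooKeeper watcher callbacks mutate as brokers come and go; the class and method names below are our own illustration:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Simplified sketch of an elastic broker-connection pool.
// BrokerConnection is a stand-in for a real socket connection;
// brokerUp/brokerDown mimic the ZooKeeper watcher callbacks.
public class BrokerPool {
    static class BrokerConnection {
        final String host;
        final int port;
        BrokerConnection(String host, int port) { this.host = host; this.port = port; }
    }

    // One connection per live broker, keyed by broker id.
    private final Map<Integer, BrokerConnection> pool = new ConcurrentHashMap<>();

    // Invoked when a watcher reports that a new broker has registered.
    public void brokerUp(int brokerId, String host, int port) {
        pool.putIfAbsent(brokerId, new BrokerConnection(host, port));
    }

    // Invoked when a watcher reports that a broker has gone down.
    public void brokerDown(int brokerId) {
        pool.remove(brokerId);
    }

    // Look up the connection for the broker picked by the partitioner.
    public BrokerConnection connectionFor(int brokerId) {
        return pool.get(brokerId);
    }

    public int size() { return pool.size(); }
}
```

A concurrent map keeps the pool safe to update from watcher callback threads while produce requests read from it.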
Asynchronous dispatch
Asynchronous non-blocking operations are fundamental to scaling messaging systems. In Kafka, the producer provides an option to use asynchronous dispatch of produce requests (producer.type=async). This allows buffering of produce requests in an in-memory queue and batch sends that are triggered by a time interval or a pre-configured batch size. Since data is typically published from a set of heterogeneous machines producing data at variable rates, this asynchronous buffering helps generate uniform traffic to the brokers, leading to better network utilization and higher throughput.
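As a configuration sketch: producer.type=async comes from the description above, while the batching property names shown below (batch.size, queue.time) and their values are illustrative assumptions — check the release documentation for the exact keys:

```java
import java.util.Properties;

// Illustrative producer configuration for asynchronous dispatch.
// producer.type=async is from the text; the batching keys below
// are assumed names, shown only to make the two flush triggers
// (batch size and time interval) concrete.
public class AsyncProducerConfig {
    public static Properties build() {
        Properties props = new Properties();
        props.put("producer.type", "async"); // buffer requests in an in-memory queue
        props.put("batch.size", "200");      // flush after 200 buffered messages...
        props.put("queue.time", "5000");     // ...or after 5 seconds, whichever first
        return props;
    }
}
```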
Semantic partitioning
Consider an application that would like to maintain an aggregate count of profile visitors for each member. It would like to send all profile visit events for a member to a particular partition and, hence, have all updates for that member appear in the same stream for the same consumer thread. In v0.6, we added the capability to the cluster-aware producer to semantically map messages to the available Kafka brokers and partitions. This allows partitioning the stream of messages with a semantic partition function, based on some key in the message, to spread them over the broker machines. The partitioning function can be customized by providing an implementation of the kafka.producer.Partitioner interface, the default being the random partitioner. For the example above, the key would be member_id and the partitioning function would be hash(member_id)%num_partitions.
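The hash(member_id)%num_partitions function above can be sketched as a small class. In v0.6 the real hook is the kafka.producer.Partitioner interface; the standalone class and method signature here are our assumption, meant only to illustrate the mapping:

```java
// Sketch of a semantic partitioner for the profile-visit example.
// A real implementation would implement kafka.producer.Partitioner;
// this standalone version just shows hash(member_id) % num_partitions.
public class MemberIdPartitioner {
    // Map a member id to a partition in [0, numPartitions).
    public int partition(String memberId, int numPartitions) {
        // Mask the sign bit so the result is always non-negative,
        // even for negative hash codes.
        return (memberId.hashCode() & 0x7fffffff) % numPartitions;
    }
}
```

Because the function is deterministic in member_id, every profile visit event for a given member lands on the same partition and is consumed by the same thread.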