TopicGC: How LinkedIn cleans up unused metadata for its Kafka clusters

Introduction

Apache Kafka is an open-source event streaming platform where users create Kafka topics as units of data transmission, and then publish to or subscribe to those topics with producers and consumers. While most Kafka topics are actively used, some are no longer needed because business needs changed or because the topics themselves are ephemeral. Kafka itself has no mechanism to automatically detect and delete unused topics. This is usually not a big concern, since a Kafka cluster can hold a considerable number of topics, from hundreds to thousands. However, if the topic count keeps growing, it will eventually hit a bottleneck and disrupt the entire Kafka cluster. The TopicGC service was built to solve this exact problem. In production, it relieved pressure on Kafka by deleting ~20% of topics, and improved Kafka’s produce and consume performance by at least 30%.

Motivation

As the first step, we need to understand how unused topics can cause pressure on Kafka. Like many other storage systems, all Kafka topics have a retention period, meaning that for any unused topics, the data will be purged after a period of time and the topic will become empty. A common question here is, “How could empty topics affect Kafka?” 

Metadata pressure

For topic management purposes, Kafka stores topic metadata in multiple places, including Apache ZooKeeper and a metadata cache on every single broker. Topic metadata contains information about partition and replica assignments.

Let’s do a simple calculation: topic A has 25 partitions and a replication factor of three, meaning each partition has three replicas. Even if topic A is no longer used, Kafka still needs to store the location information of all 75 replicas somewhere.
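The same arithmetic scales to a whole cluster. Here is a minimal sketch in Python; the function name and the 1,000-topic cluster are illustrative assumptions, not figures from our clusters:

```python
def replica_count(partitions: int, replication_factor: int) -> int:
    """Number of replica assignments Kafka must track for one topic."""
    return partitions * replication_factor


# Topic A from the text: 25 partitions, replication factor of three.
topic_a = replica_count(25, 3)

# Hypothetical cluster with 1,000 unused topics shaped like topic A.
cluster_total = sum(replica_count(25, 3) for _ in range(1000))

print(topic_a)        # 75
print(cluster_total)  # 75000
```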

The effect of metadata pressure may not be that obvious for a single topic, but it can make a big difference if there are a lot of topics. The metadata can consume memory from Kafka brokers and ZooKeeper nodes, and can add payload to metadata requests. 

Fetch requests

In Kafka, follower replicas periodically send fetch requests to the leader replicas to stay in sync with the leader. Even for empty topics and partitions, the followers still try to sync with the leaders. Because Kafka does not know whether a topic is permanently unused, it always forces the followers to fetch from the leaders. These redundant fetch requests lead to more fetch threads being created, which causes extra network, CPU, and memory utilization. They can also dominate the request queues, causing other requests to be delayed or even dropped.

Controller initialization

The Kafka controller is a broker that coordinates and manages the other brokers in a Kafka cluster. Many Kafka requests have to be handled by the controller, so controller availability is crucial to Kafka.

On controller failover, a new controller has to be elected and take over the role of managing the cluster. Before it can act as the controller, the new controller needs some time to load the metadata of the entire cluster from ZooKeeper; this is called the controller initialization time. As mentioned earlier in this post, unused topics generate extra metadata that makes controller initialization slower and threatens Kafka availability. Issues can arise when the ZooKeeper response is larger than 1MB. For one of our largest clusters, the ZooKeeper response has already reached 0.75MB, and we anticipate that it will hit a bottleneck within two to three years.
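To put those numbers in perspective, the sketch below works out the yearly growth rate implied by going from 0.75MB to 1MB in two or three years. It assumes steady compound growth, which is a simplification introduced here for illustration; the function name is hypothetical.

```python
def implied_annual_growth(current_mb: float, limit_mb: float, years: float) -> float:
    """Compound annual growth rate that takes current_mb to limit_mb in `years`."""
    return (limit_mb / current_mb) ** (1.0 / years) - 1.0


for years in (2, 3):
    rate = implied_annual_growth(0.75, 1.0, years)
    print(f"{years} years: {rate:.1%} per year")
# 2 years: 15.5% per year
# 3 years: 10.1% per year
```

Under that assumption, metadata growth of roughly 10% to 15% per year would be enough to reach the 1MB mark in the stated window.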

Service design

While designing TopicGC, we kept in mind a number of requirements. Functionally, the system must define criteria for deciding whether a topic should be deleted, continuously run the garbage collection (GC) process to remove unused topics, and notify users before deleting their topics.

Additionally, we identified non-functional requirements for the system. The requirements include ensuring no data loss during topic deletion, removal of all dependencies from unused topics before deletion, and the ability to recover the topic states from service failures.

To satisfy those requirements, we designed TopicGC based on a state machine model, which we will discuss in more detail in the following sections.

Topic state machine

To achieve all of the functional requirements, TopicGC internally runs a state machine. Each topic instance is associated with a state, and several background jobs run periodically and transition the topic states when needed. Table 1 describes all possible states in TopicGC.

State Name            Description
USED                  The topic is being used.
UNUSED                The topic is not being used, and is empty.
NOTIFICATION_SENT     An email notification has been sent to the topic owner, informing them that the topic will be deleted after a certain amount of time.
USER_WAIT_DONE        The waiting period after the email notification has elapsed, and the topic is still unused.
WRITE_ACCESS_BLOCKED  Write access is blocked on the topic, meaning no more data can be written to it.
MIRRORING_DISABLED    The mirroring service is disabled for the topic.
DELETED               The topic is deleted from the Kafka cluster.
INCOMPLETE            Usage was detected during the topic deletion process.

Table 1: Topic states and descriptions
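One way to picture Table 1 is as an enumeration plus a table of allowed transitions. The sketch below is illustrative only: the state names come from Table 1, but the transition set, and the names ALLOWED and transition, are assumptions inferred from the workflow described in this post rather than the actual TopicGC implementation.

```python
from enum import Enum


class TopicState(Enum):
    USED = "USED"
    UNUSED = "UNUSED"
    NOTIFICATION_SENT = "NOTIFICATION_SENT"
    USER_WAIT_DONE = "USER_WAIT_DONE"
    WRITE_ACCESS_BLOCKED = "WRITE_ACCESS_BLOCKED"
    MIRRORING_DISABLED = "MIRRORING_DISABLED"
    DELETED = "DELETED"
    INCOMPLETE = "INCOMPLETE"


S = TopicState

# Assumed transitions: the forward path follows the order of Table 1, and any
# in-flight state falls back to INCOMPLETE (then USED) when usage is detected.
ALLOWED = {
    S.USED: {S.UNUSED},
    S.UNUSED: {S.NOTIFICATION_SENT, S.INCOMPLETE},
    S.NOTIFICATION_SENT: {S.USER_WAIT_DONE, S.INCOMPLETE},
    S.USER_WAIT_DONE: {S.WRITE_ACCESS_BLOCKED, S.INCOMPLETE},
    S.WRITE_ACCESS_BLOCKED: {S.MIRRORING_DISABLED, S.INCOMPLETE},
    S.MIRRORING_DISABLED: {S.DELETED, S.INCOMPLETE},
    S.INCOMPLETE: {S.USED},
    S.DELETED: set(),
}


def transition(current: TopicState, target: TopicState) -> TopicState:
    """Return target if the move is allowed, otherwise raise ValueError."""
    if target not in ALLOWED[current]:
        raise ValueError(f"illegal transition {current.name} -> {target.name}")
    return target


state = transition(S.UNUSED, S.NOTIFICATION_SENT)
print(state.name)  # NOTIFICATION_SENT
```

Keeping the transitions in a single table makes it easy for a background job to reject a step that skips a safeguard, such as deleting a topic whose write access was never blocked.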

TopicGC workflow

With the help of internal states, TopicGC follows a certain workflow to delete unused topics.

Figure 1: TopicGC state machine

Detect topic usage

TopicGC has a background job to find unused topics. Internally, we use the following criteria to determine whether a topic is unused:

  • The topic is empty
  • There is no BytesIn/BytesOut
  • There has been no READ/WRITE access event in the past 60 days
  • The topic was not created within the past 60 days

The TopicGC service fetches the above information from ZooKeeper and a variety of internal data sources, such as our metrics reporting system.
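The four criteria above can be combined into a single predicate. This is a minimal sketch, not the production check: the TopicStats type and all of its field names are hypothetical stand-ins for the data gathered from ZooKeeper and the metrics sources.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Optional

LOOKBACK = timedelta(days=60)  # window stated in the criteria above


@dataclass
class TopicStats:
    # Hypothetical fields; real values would come from ZooKeeper and metrics.
    size_bytes: int
    bytes_in_rate: float
    bytes_out_rate: float
    last_access: Optional[datetime]  # most recent READ/WRITE event, if any
    created_at: datetime


def is_unused(stats: TopicStats, now: datetime) -> bool:
    """True only when every one of the four criteria holds."""
    cutoff = now - LOOKBACK
    empty = stats.size_bytes == 0
    no_traffic = stats.bytes_in_rate == 0 and stats.bytes_out_rate == 0
    no_recent_access = stats.last_access is None or stats.last_access < cutoff
    not_new = stats.created_at < cutoff
    return empty and no_traffic and no_recent_access and not_new


now = datetime(2023, 1, 1)
idle = TopicStats(0, 0.0, 0.0, datetime(2022, 6, 1), datetime(2021, 1, 1))
fresh = TopicStats(0, 0.0, 0.0, None, datetime(2022, 12, 15))
print(is_unused(idle, now))   # True
print(is_unused(fresh, now))  # False
```

The second topic is empty and idle but was created only a few weeks earlier, so the "not newly created" rule keeps it out of the UNUSED state.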

Send email notification

If a topic is in the UNUSED state, TopicGC triggers the email sending service to look up the LDAP user info of the topic owner and send an email notification. This is important because we don’t know whether the topic is temporarily idle or permanently unused. In the former case, once the topic owner receives the email, they can take action to prevent the topic from being deleted.

Block write access

This is the most important step in the TopicGC workflow. Consider this case: if a user produces some data at the last second before topic deletion, that data will be lost along with the topic. Avoiding data loss is therefore a crucial challenge for TopicGC. To ensure the TopicGC service doesn’t delete topics that receive last-minute writes, we introduced a block-write-access step before topic deletion. Once write access is blocked on the topic, there is no chance that TopicGC can cause data loss.

Notice that Kafka doesn’t have a mechanism to “seal” a topic, so we leverage LinkedIn’s internal way of blocking topic access. At LinkedIn, we have access services that let us control access to all data resources, including Kafka topics. To seal a topic, TopicGC sends a request to the access service to block any read and write access to the topic.

Disable mirroring

The data of a topic can be mirrored to other clusters via Brooklin, a framework open-sourced by LinkedIn for streaming data between heterogeneous source and destination systems with high reliability and throughput at scale. Brooklin can be regarded as a wildcard consumer for all Kafka topics, so before deleting a topic we need to disable Brooklin mirroring for it. If the topic is deleted without informing Brooklin, Brooklin will throw exceptions about consuming from non-existent topics. For the same reason, if any other services consume from all topics, TopicGC should tell those services to stop consuming from the garbage topics before they are deleted.

Delete topics

Once all preparations are done, the TopicGC service triggers topic deletion by calling the Kafka admin client. The topic deletion process can be customized; in our case, we delete topics in batches. Because topic deletion can introduce extra load on Kafka clusters, we cap the number of concurrent topic deletions at three.
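Batched deletion with a cap of three can be sketched as follows. The delete_batch callback is a hypothetical hook standing in for the admin client call, and it is assumed to block until the cluster has finished deleting the given topics; the helper names are illustrative, not part of TopicGC.

```python
from typing import Callable, Iterable, Iterator, List

MAX_CONCURRENT_DELETIONS = 3  # limit stated in the text


def batches(topics: Iterable[str], size: int) -> Iterator[List[str]]:
    """Yield consecutive groups of at most `size` topics."""
    batch: List[str] = []
    for topic in topics:
        batch.append(topic)
        if len(batch) == size:
            yield batch
            batch = []
    if batch:
        yield batch


def delete_in_batches(
    topics: Iterable[str],
    delete_batch: Callable[[List[str]], None],
    size: int = MAX_CONCURRENT_DELETIONS,
) -> int:
    """Delete topics one batch at a time; return the number of batches issued."""
    count = 0
    for batch in batches(topics, size):
        delete_batch(batch)  # assumed to return only once the batch is deleted
        count += 1
    return count


# Record the batches instead of talking to a real cluster.
issued: List[List[str]] = []
n = delete_in_batches([f"topic-{i}" for i in range(7)], issued.append)
print(n)       # 3
print(issued)  # [['topic-0', 'topic-1', 'topic-2'], ['topic-3', 'topic-4', 'topic-5'], ['topic-6']]
```

Because each batch finishes before the next one starts, at most three deletions are in flight at any time.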

Last minute usage check

Before making any actual change to the topic (blocking write access, disabling mirroring, or deleting the topic), we run a last-minute usage check on it. This adds an extra layer of safety against data loss. If TopicGC detects usage at any point during the deletion process, it marks the topic as INCOMPLETE and starts recovering it back to the USED state.
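The pattern of checking usage before every destructive step can be sketched like this. The step callbacks and the is_in_use probe are hypothetical placeholders, the starting state is an assumption, and the recovery actions that would follow INCOMPLETE (such as restoring write access) are not shown.

```python
from typing import Callable, List, Tuple

# Each step is (state reached after the step, action to perform on the topic).
Step = Tuple[str, Callable[[str], None]]


def run_deletion_steps(
    topic: str,
    steps: List[Step],
    is_in_use: Callable[[str], bool],
) -> str:
    """Run each step only after a fresh usage check.

    Returns the last state reached, or "INCOMPLETE" as soon as usage is
    detected before a step.
    """
    state = "USER_WAIT_DONE"  # assumed starting point for the destructive steps
    for resulting_state, action in steps:
        if is_in_use(topic):
            return "INCOMPLETE"
        action(topic)
        state = resulting_state
    return state


log: List[Tuple[str, str]] = []
steps: List[Step] = [
    ("WRITE_ACCESS_BLOCKED", lambda t: log.append(("block_write", t))),
    ("MIRRORING_DISABLED", lambda t: log.append(("disable_mirroring", t))),
    ("DELETED", lambda t: log.append(("delete", t))),
]

print(run_deletion_steps("idle-topic", steps, lambda t: False))  # DELETED
```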

Impact of TopicGC

We launched TopicGC in one of our largest data pipelines, and were able to reduce the topic count by nearly 20%. In Figure 2, each color represents a distinct Kafka cluster in the pipeline.

Figure 2: Total topic count during TopicGC

Improvement on CPU usage

Deleting unused topics reduces the total number of fetch requests in the Kafka clusters and, as a result, CPU usage drops significantly. Total Kafka CPU usage fell by about 30%.

Figure 3: CPU usage improvement by TopicGC

Improvement on client request performance

Due to the CPU usage reduction, Kafka brokers are able to handle requests more efficiently. As a result, Kafka’s request handling performance improved, and request latencies dropped by up to 40%. Figure 4 shows the decrease in latency for metadata requests.

Figure 4: Kafka request performance improvement by TopicGC

Conclusion

Since we launched TopicGC to delete unused Kafka topics, it has deleted nearly 20% of topics and significantly reduced the metadata pressure on our Kafka clusters. Our metrics show that client request latencies dropped by up to 40% and CPU usage fell by about 30%.

Future plans

As TopicGC has shown its ability to clean up Kafka clusters and improve Kafka performance, we have decided to launch the service on all of our internal Kafka clusters. We hope that TopicGC will help LinkedIn use its Kafka resources more efficiently.

Acknowledgements

Many thanks to Joseph Lin and Lincong Li for coming up with the idea of TopicGC and implementing the original design. We are also grateful to our managers Rohit Rakshe and Adem Efe Gencer, who provided significant support for this project. Last but not least, we want to give a shout-out to the Kafka SRE team and the Brooklin SRE team for being helpful partners. With their help, we smoothly launched TopicGC and were able to see these exciting results.