Managing Distributed Tasks with Helix Task Framework
January 24, 2019
Co-authors: Junkai Xue and Hunter Lee
Stateless tasks are widely used in large-scale data processing systems, and many systems that rely on Apache Helix have asked for a stateless task management feature to be added to it. Recently, our team decided to explore new ways to manage stateless tasks, in addition to our ongoing work to support Helix. The result of these efforts is Helix Task Framework, an engine that powers the distributed execution of stateless tasks. In this post, we'll introduce Helix Task Framework, which is available in Helix as an advanced feature.
Apache Helix is a generic cluster management framework used for the automatic management of partitioned and replicated distributed systems hosted on a cluster of nodes. Helix automatically adjusts and replaces partitions and their replicas in the case of node failure and recovery, cluster expansion, and reconfiguration.
Helix separates cluster management from application logic by representing an application’s lifecycle through a declarative state machine. The Apache Helix website describes in detail the architecture of Helix and how it can be used to manage distributed systems. There are also a few public blog posts available that provide a walk-through of Helix’s internals.
Helix was initially developed at LinkedIn and currently manages critical infrastructure, such as Espresso (a NoSQL data store), Brooklin (a streaming data pipeline service), Venice (a derived data serving platform), and the infrastructure for offline data storage and processing. More information about these systems can be found here.
Distributed tasks on Task Framework
Helix Task Framework supports distributed execution of both short-running and long-running tasks. Task Framework partitions a job into tasks and schedules them to be executed on Helix Participants (nodes) in the cluster. The actual scheduling is accomplished by assigning tasks to available Participant nodes, and these tasks undergo stages of execution and are labeled accordingly with corresponding states in the Helix Task StateModel.
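As a rough illustration, the states a task passes through can be modeled as follows. The names are based on Helix's `TaskPartitionState` (a subset is shown); treat this as a simplified sketch, not the authoritative definition:

```java
// Simplified model of the states a task moves through in the Helix Task
// StateModel. Names follow Helix's TaskPartitionState; this is an
// illustration, not the framework's actual definition.
public class TaskStates {
    enum TaskPartitionState {
        INIT,        // task has been created but not yet started
        RUNNING,     // task is currently executing on a Participant
        STOPPED,     // task was paused/stopped by the user
        COMPLETED,   // task finished successfully (terminal)
        TIMED_OUT,   // task exceeded its timeout (terminal)
        TASK_ERROR,  // task failed with an error (terminal)
        DROPPED      // task was dropped from the Participant (terminal)
    }

    // A task is finished once it reaches a terminal state.
    static boolean isTerminal(TaskPartitionState s) {
        return s == TaskPartitionState.COMPLETED
            || s == TaskPartitionState.TIMED_OUT
            || s == TaskPartitionState.TASK_ERROR
            || s == TaskPartitionState.DROPPED;
    }
}
```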
Task Framework can then trigger tasks for immediate execution or schedule them for later, all in parallel. Helix Controller oversees all task executions in a cluster, avoiding the split-brain problem and enabling failure resilience by way of finite state machines and constraints defined by Helix.
There are three levels of abstraction for tasks: a workflow, a job, and a task. A workflow consists of one or more jobs. A job consists of one or more tasks. A task is defined as the smallest, mutually-independent, executable unit of work whose logic is implemented by the user. The following subsections delve further into the actual use cases of these abstractions.
There are two types of workflows: a generic workflow and a job queue.
A generic workflow is a directed acyclic graph (DAG) representation of dependencies among jobs. These dependencies explicitly define the order in which jobs must be run. Depending on the DAG, there may exist multiple possible orderings.
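To make the DAG ordering concrete, here is a small self-contained sketch (illustrative only, not Helix code) that computes one valid execution order for a set of job dependencies using a topological sort:

```java
import java.util.*;

// Illustrative sketch (not Helix API): given job dependencies forming a DAG,
// compute one valid execution order via Kahn's topological sort.
public class JobDag {
    // deps maps each job to the jobs it depends on (its parents).
    static List<String> topologicalOrder(Map<String, List<String>> deps) {
        Map<String, Integer> inDegree = new HashMap<>();
        Map<String, List<String>> children = new HashMap<>();
        for (String job : deps.keySet()) {
            inDegree.putIfAbsent(job, 0);
            for (String parent : deps.get(job)) {
                inDegree.merge(job, 1, Integer::sum);
                children.computeIfAbsent(parent, k -> new ArrayList<>()).add(job);
                inDegree.putIfAbsent(parent, 0);
            }
        }
        // Jobs with no unfinished parents are ready to run.
        Deque<String> ready = new ArrayDeque<>();
        for (Map.Entry<String, Integer> e : inDegree.entrySet()) {
            if (e.getValue() == 0) ready.add(e.getKey());
        }
        List<String> order = new ArrayList<>();
        while (!ready.isEmpty()) {
            String job = ready.poll();
            order.add(job);
            for (String child : children.getOrDefault(job, List.of())) {
                if (inDegree.merge(child, -1, Integer::sum) == 0) ready.add(child);
            }
        }
        return order; // shorter than the job count if the graph has a cycle
    }
}
```

Any ordering in which every job appears after all of its parents is valid; when jobs have no dependency between them, they may also run concurrently.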
A job queue is a special type of workflow. A job queue is different from a regular workflow in that the dependency among jobs is linear. As such, a job will not be scheduled until all other jobs in front of it in the queue have run and completed successfully. However, note that Task Framework allows users to change this behavior—for example, by modifying configuration parameters, you can allow a job to run even though there is a failed job somewhere before it in the dependency chain.
Task Framework also allows workflows to be either one-time or recurrent. A generic workflow as described above is one-time by default and will be discarded from Task Framework’s metadata store upon completion. Recurrent workflows or job queues, however, are suitable for use cases in which business logic needs to run on a regular basis. To create a recurrent workflow, the user must create what is called a “template.” This template contains the information and parameters (such as frequency, start time, task logic, etc.) for creating a one-time instance of a workflow. Task Framework will then automatically create and submit workflows at regular intervals as defined by the given template, eliminating the need for users to resort to other scheduling tools, such as cron.
A job is the next level of abstraction in Task Framework. A job can have many identical or non-identical tasks. Whether a job completes successfully or not depends upon the status of its tasks; for example, if its tasks do not complete within the timeout threshold or fail for whatever reason, then the job will be marked as timed out or failed. A job will only be marked as completed when all of its tasks have completed successfully.
There are two types of jobs supported in Task Framework: generic jobs and targeted jobs. A job is a generic job by default. It consists of one or more tasks, which will be distributed over multiple nodes per the assignment generated by Helix Controller. A targeted job, however, binds to a “target,” which is a partition of a non-task Helix resource (a “Helix resource” here denotes an entity managed by Helix, such as a database partition). Suppose a distributed database system running on Helix wants to run a back-up job. That means the back-up tasks must be scheduled and run precisely on the nodes where the “target” database partitions are. A targeted job accomplishes exactly that; it has a “target” resource (the distributed database in this example), and its tasks will run on all machines serving partitions of that target resource. It's also possible to specify the state (defined by Helix’s StateModel) of the target, meaning the targeted tasks will run only on the machines where a replica of the specified target partition is in the target state.
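As a sketch of how a targeted job might be configured with Helix's builder APIs (the resource name "MyDB", the command "BackupTask", and the target state "MASTER" are illustrative assumptions, not values from this post):

```java
// Sketch based on Helix's JobConfig/Workflow builder APIs; names are illustrative.
JobConfig.Builder backupJob = new JobConfig.Builder()
    .setCommand("BackupTask")            // task logic registered under this command
    .setTargetResource("MyDB")           // the non-task Helix resource to bind to
    .setTargetPartitionStates(
        Collections.singleton("MASTER")); // run only where the replica is in this state

Workflow backupWorkflow = new Workflow.Builder("BackupWorkflow")
    .addJob("backup", backupJob)
    .build();
```

With this configuration, one backup task is created per partition of "MyDB" and scheduled on whichever node hosts that partition's MASTER replica.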
A task is the smallest independent unit of work whose logic is implemented by the user. Internally, each individual task follows the Task StateModel defined by Helix. The assignment and scheduling in Task Framework take place at a task granularity. The following is a code snippet of the Task interface:
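Based on the Apache Helix documentation, the interface looks roughly like this:

```java
public interface Task {
  /**
   * Execute the task. Invoked by the framework on the assigned Participant;
   * the returned TaskResult indicates completion, failure, or a retry request.
   */
  TaskResult run();

  /** Signal the task to stop executing, e.g., when its job is stopped. */
  void cancel();
}
```

The user implements `run()` with the actual task logic, and the framework drives it through the Task StateModel.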
Powering distributed systems: Applications of Task Framework
Managing distributed database jobs in Espresso
Espresso used to have a traditional backup method, where each node ran a backup scheduler locally, backing up every partition it hosted. This approach had the following two limitations.
Lack of a central, coordinated scheduler: Since backups were scheduled independently on each node, and each node was unaware of what other nodes were backing up, there were many duplicate backups of the same partitions, one for every replica of the partition. Although nodes sent asynchronous messages to each other to keep track of which partition was being backed up, this was unreliable due to race conditions. Worse still, sometimes no backup was produced for some partitions at all!
Lack of task progress monitoring: The backup schedulers sent messages to the storage node to trigger a backup task. However, this message only started the backup task; we still needed a way to track its progress. For tracking, we sent frequent getStatus messages to Helix, each of which created a new ZNode in ZooKeeper. This approach soon proved problematic due to the large volume of messages spawned when trying to back up several hundred partitions.
Espresso now runs its backup jobs on Task Framework. This is beneficial because Espresso was already using Helix’s generic resource management functionality for its database partitions. Helix Controller is aware of state changes taking place around database partitions and is therefore able to easily determine which tasks need to be bound to which database partitions. Additionally, Helix Controller acts as the centralized scheduler that can control the number of concurrent tasks and determine when backup tasks should be run (to avoid peak traffic hours). The following section provides a more detailed walk-through.
How Espresso runs backup tasks on Task Framework
Task Framework requires users to implement two Java interfaces: Task and TaskFactory. Helix will use TaskFactory to generate instances of Task at runtime. The next step involves telling Espresso’s nodes (called storage nodes by Espresso and Participants by Helix) that we will be running tasks on them. This is done by registering TaskFactory to StateMachineEngine (accessed via HelixManager). Then, Espresso’s central controller/scheduler is ready to create and submit tasks using a set of APIs in TaskDriver. TaskDriver provides an easy-to-use set of APIs that allow users to create, run, stop, and delete workflows. In addition, users may retrieve the state information of each workflow through TaskDriver.
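Putting the pieces together, the wiring might look roughly like the following. This is a sketch based on Helix's task tutorial: `BackupTask` and the workflow name are illustrative, and `manager` is assumed to be an existing `HelixManager` for the cluster:

```java
// On each storage node (Helix Participant): register a TaskFactory so the
// node knows how to instantiate the user-defined Task at runtime.
Map<String, TaskFactory> taskFactoryReg = new HashMap<>();
taskFactoryReg.put("BackupTask", context -> new BackupTask(context));

StateMachineEngine engine = manager.getStateMachineEngine();
engine.registerStateModelFactory("Task",
    new TaskStateModelFactory(manager, taskFactoryReg));

// On the controller/scheduler side: create and submit workflows via TaskDriver.
TaskDriver driver = new TaskDriver(manager);
Workflow workflow = new Workflow.Builder("EspressoBackup")
    .addJob("backup", new JobConfig.Builder().setCommand("BackupTask"))
    .build();
driver.start(workflow);

// TaskDriver also supports stopping, deleting, and querying workflows, e.g.:
// driver.stop("EspressoBackup");
```

The key point is the division of roles: Participants only know how to construct tasks, while TaskDriver (used by the central scheduler) decides what runs and when.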
Monitoring and operations
To enhance operability and the user experience, Helix provides a lightweight REST service, Helix REST 2.0, as well as Helix UI. Helix UI talks to Helix REST to manage all configurations and track Task Framework status.
As stated above, Espresso uses targeted jobs for backups. Helix UI is very useful for spotting any abnormalities in the execution of jobs. A user can easily access Helix UI to monitor the status of existing workflows and jobs and perform simple operations. Furthermore, Helix UI supports authentication, operation authorization, and audit logging, which adds a layer of security for distributed applications built with Helix.
Recent performance and stability improvements
Minimizing redundant ZNode creation
IdealState and ExternalView are the metadata records reflecting the future status and current status of a Helix-defined resource. In older implementations of Task Framework, a job was treated as a Helix resource, like a DB partition, for example. This meant that an IdealState ZNode and an ExternalView ZNode were generated for every job at creation. This was problematic because Task Framework jobs and generic Helix resources are different in nature: jobs tend to be short-lived, transient entities that are created and removed frequently, while generic Helix resources tend to be enduring entities that stay and continue to serve requests. The creation and removal of so many ZNodes was therefore proving costly and no longer appropriate for jobs. These ZNodes would stay in ZooKeeper until the specified data expiry time (usually one day or longer). It was even worse for recurrent jobs: one set of IdealState/ExternalView ZNodes was created for the original job template, and another set was created for each scheduled run of the job.

An improvement was made so that the IdealState ZNode of a job is only generated when the job is scheduled to run, and is removed immediately once the job completes. In practice, only a few jobs should be running concurrently for each workflow, so only a small number of IdealState ZNodes exist per workflow at any given time. In addition, a job's ExternalView ZNode is no longer created by default.
Problem with scheduling recurring jobs
We discovered that the timer was not stable when scheduling recurring jobs from recurrent workflows. This was mostly because we set a timer for each job when it was added to a recurrent job queue, and maintaining a large set of timers for all current and future jobs was prone to bugs. In addition, during Helix Controller's leadership hand-off, these timers were not properly transferred to the new leader Controller. This was fixed by making Task Framework set a timer for each workflow instead of for each job. Helix now needs to track far fewer timers, and during a leadership hand-off, the new leader Controller scans all existing workflows and resets the timers appropriately.
Task metadata accumulation
We observed that task metadata (ZNodes) was quickly piling up in ZooKeeper when Task Framework was under heavy load, continuously assigning tasks to nodes. This was affecting performance and scalability. Two fixes were proposed and implemented: periodic purging of job ZNodes and batch reading of metadata. Periodically deleting jobs in terminal states effectively shortened the metadata retention cycle, which reduced the amount of task metadata present at any given point of execution. Batch reads reduced the amount and frequency of read traffic, solving the problem of overhead from redundant reads from ZooKeeper.
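The purging idea can be sketched with a simplified, self-contained example (hypothetical names; not the actual Task Framework code). Jobs that have reached a terminal state are removed from the metadata store on a schedule:

```java
import java.util.*;

// Illustrative sketch (not Helix internals): periodically purge jobs that
// have reached a terminal state, shortening the metadata retention cycle.
public class JobPurger {
    enum JobState { IN_PROGRESS, COMPLETED, FAILED, TIMED_OUT }

    static boolean isTerminal(JobState s) {
        return s != JobState.IN_PROGRESS;
    }

    // Remove all terminal jobs from the metadata map; returns how many were purged.
    static int purgeTerminalJobs(Map<String, JobState> jobMetadata) {
        int before = jobMetadata.size();
        jobMetadata.values().removeIf(JobPurger::isTerminal);
        return before - jobMetadata.size();
    }

    // In a real system, the purge would run on a timer, e.g.:
    // scheduler.scheduleAtFixedRate(() -> purgeTerminalJobs(store), 5, 5, TimeUnit.MINUTES);
}
```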
Next steps for Task Framework
Restructuring metadata hierarchy in ZooKeeper
As discussed in previous sections of this post, the performance of Task Framework is directly affected by the number of ZNodes present in ZooKeeper. Helix stores ZNodes for workflow and job configurations and contexts (the current status of workflows/jobs) in one flattened directory. This means that every data change, in theory, triggers a read of all ZNodes in the directory, which can cause a catastrophic slowdown when the number of ZNodes under the directory is high. Although small improvements like batch reads have alleviated the problem, we identified that the root of the issue is the way ZNodes are stored. In the immediate future, a new ZNode structure will be introduced to Helix so that Task Framework ZNodes reflect the hierarchical nature of workflows, jobs, and tasks. This will greatly reduce ZooKeeper read and write latency, enabling Task Framework to execute more tasks, faster.
More advanced distribution algorithm and strategy
Task Framework uses consistent hashing to compute an assignment of tasks to available nodes. There are two ways in which task distribution could be improved. First, Helix Controller currently computes an assignment of tasks in every run of its pipeline. Note that this pipeline is shared with Helix’s generic resource management, which means that some runs of the pipeline have nothing to do with Task Framework, causing Helix Controller to compute a task assignment for naught. In other words, we have observed a fair amount of redundant computation. Second, consistent hashing might not be the appropriate assignment strategy for tasks. Intuitively, matching tasks with nodes should be simple: whenever a node is able to take on a task, it should do so as soon as possible. With consistent hashing, some nodes may be busy executing tasks while other nodes sit idle.
The producer-consumer pattern has been identified to be a more appropriate model for distributing tasks over a set of nodes—the producer being the Controller, and the set of available nodes being the consumers. We believe that this new distribution strategy will greatly increase the scalability of Task Framework.
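To make the contrast concrete, here is a minimal, self-contained sketch of the producer-consumer pattern (illustrative only, not the planned Helix implementation). Tasks go onto a shared queue, and each node pulls the next task the moment it is free, so no node sits idle while work remains queued:

```java
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative sketch: the controller acts as a producer placing tasks on a
// shared queue; each node acts as a consumer that pulls the next task as soon
// as it becomes free. Tasks are not pinned to nodes by hashing.
public class ProducerConsumerDemo {
    public static int runTasks(int numTasks, int numNodes) {
        BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
        AtomicInteger executed = new AtomicInteger();

        // Producer (controller): enqueue all tasks up front.
        for (int i = 0; i < numTasks; i++) {
            queue.add(executed::incrementAndGet);
        }

        // Consumers (nodes): each node drains tasks whenever it is free.
        ExecutorService nodes = Executors.newFixedThreadPool(numNodes);
        for (int n = 0; n < numNodes; n++) {
            nodes.submit(() -> {
                Runnable task;
                while ((task = queue.poll()) != null) {
                    task.run();
                }
            });
        }
        try {
            nodes.shutdown();
            nodes.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return executed.get();
    }
}
```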
Task Framework as an independent framework
Helix started as a framework for generic resource/cluster management, and Task Framework was developed by treating jobs as a special resource with its own state model. However, we now have LinkedIn users only using the Task Framework functionality of Helix. Task Framework has seen its share of growth, and to meet the ever-increasing scalability needs of its users, we have decided that its separation from generic resource management is inevitable.
The separation work is threefold: 1) reduce resource competition; 2) remove needless redundancy; and 3) remove deployment dependency. Currently, the work of both Task Framework and the generic resource management framework is managed by a single, central scheduler: Helix Controller. That means a single Controller runs in one JVM with no isolation, and we cannot prevent a slowdown in resource management from affecting the assignment and scheduling of tasks, and vice versa. In other words, there is resource competition between the two. To address this, we need to split the single Helix Controller into two separate Controllers running independently: a Helix Controller and a Task Framework Controller.
This separation naturally solves the problem of redundant computation incurred in the pipeline, as mentioned in the previous section. Changes in generic resources will no longer trigger an unnecessary pipeline run in the Task Framework Controller. We also plan to extend this separation to the deployment/repository level so that new deployments and rollbacks do not affect each other. We believe that this will not only improve the overall performance of both components, but also improve the experience for developers.
In this post, we have explored Task Framework, a growing component of Apache Helix that helps you manage distributed tasks. We have also discussed the current limitations of the system and the improvements we are making to position Helix's Task Framework as a reliable piece of distributed infrastructure here at LinkedIn as well as for others in the open source community.
We would like to thank our engineers Jiajun Wang and Harry Zhang and our SRE, Eric Blumenau. Also, thanks to Yun Sun, Wei Song, Hung Tran, Kuai Yu, and other developers and SREs from LinkedIn's data infrastructure for their invaluable input and feedback on Helix. In addition, we want to thank Kishore Gopalakrishna, other committers from the Apache Helix community, and alumni Eric Kim, Weihan Kong, and Vivo Xu for their contributions. Finally, we would like to acknowledge the constant encouragement and support from our management: Lei Xia, Ivo Dimitrov, and Swee Lim.