Reducing Apache Spark Application Dependencies Upload by 99%
March 9, 2023
Co-authors: Shu Wang, Biao He, and Minchu Yang
At LinkedIn, Apache Spark is our primary compute engine for offline data analytics such as data warehousing, data science, machine learning, A/B testing, and metrics reporting. We execute nearly 100,000 Spark applications daily in our Apache Hadoop YARN (more on how we scaled YARN clusters here). These applications rely heavily on dependencies (JAR files) for their computation needs. This is especially true for machine learning pipelines, which often involve multiple stages of data processing, model training, and evaluation, each stage requiring different software components and dependencies. Every day, we upload nearly 30 million dependencies to the Apache Hadoop Distributed File System (HDFS) to run Spark applications. Uploading these dependencies introduces substantial overhead to application runtime, with nearly 20% of the time spent on dependency preparation for a median-length application.
In this blog post, we will share our analysis of Spark Dependency Management at LinkedIn, highlight interesting findings, and present our design choice of using a simple user-level cache over more complex alternatives. We will also discuss our rollout experience and lessons learned. Finally, we will demonstrate the impact of accelerating all Spark applications at LinkedIn at the cluster level. Faster Spark jobs translate to increased data freshness, leading to an enhanced member experience by way of more relevant recommendations, timely insights, effective abuse protection, and other similar improvements.
Spark Dependency Management
Spark applications require external libraries, resources, and configurations to run. These dependencies often come in the form of JAR files.
Here is the current life cycle of a Spark Dependency:
Before Spark applications are submitted to YARN, the required dependencies are uploaded to HDFS via distributed cache, so that they are accessible by all Spark executors.
When the application is submitted to YARN, NodeManagers used to run Spark executors will download the required dependencies to their local file systems, a process known as "dependency localization." This ensures that the dependencies are available locally when a Spark executor begins running.
Dependency Usage Pattern
Recently conducted some benchmark evaluations on Big Data Pipelines running in LinkedIn, with a majority of them being Spark Jobs. We were surprised to find that Spark applications were facing substantial overhead during the dependency upload process:
On average, 20% of runtime was spent on uploading dependencies
In some extreme cases of short running jobs with many dependencies, we observed that up to 80% of the application's runtime was spent on uploading.
To better understand the uploading overhead at cluster level, we deep dived into our cluster metrics. Our analysis confirmed it was a huge issue at scale. Specifically, in one of our production clusters in April 2022, Spark applications uploaded over 146 million dependencies, but only 87,000 of them were unique dependencies. From user perspective, a typical Spark user normally uploads 132,000 dependencies per month, but only 440 of them are unique, which means a same jar could have been uploaded to HDFS 300 (132,000/440) times.
We conducted further analysis on the JAR usage patterns for each user in our cluster. Figure 1 illustrates the results, where each dot represents a user and the x-axis displays the number of unique JARs they uploaded each month, while the y-axis shows the percentage of unique uploaded JARs. The results show that the majority of our users are situated at the bottom of the graph, indicating that only a small portion of the JARs they upload each month are unique. So why are unique dependencies small from either cluster level or user level? It is mostly due to the frequency of dependency updates being much lower than the frequency of scheduled execution as well as many common dependencies being shared by many jobs.
Figure 1: Spark JAR Usage Pattern
Design Space Consideration
A dependency cache can be created to store commonly used dependencies and eliminate the need for repetitive uploading. This poses a question: should the cache be implemented at the user level (a separate cache for each user) or cluster level (one global cache for each cluster)?
When considering the number of dependencies that can be reduced, a cluster-level cache is the obvious choice. However, taking into account factors such as return on investment, maintenance, security, and scalability, a user-level cache becomes more advantageous.
The user cache is almost as effective as cluster cache. In terms of load placed on our infrastructure, the user cache can reduce the number of uploaded dependencies by 99.7%, while the cluster cache can reduce it by 99.94%. From a user's point of view, a cache with a 99.7% hit rate is almost equivalent to one with a 99.94% hit rate. Therefore, we can balance the costs of implementing and maintaining the cache against the marginal benefit of achieving the slightly higher hit rate. Yarn Shared Cache is a common example of a cluster-level cache implementation. However, an extremely important feature like strong authentication has not been completed yet (YARN-7282) since 2019, which is critical for enterprises like LinkedIn to provide strong security guarantees to our members. To utilize this feature, significant changes to YARN and Spark would be necessary, as well as the maintenance of additional services such as the Shared Cache Manager and Shared Cache Uploader.
Caching at the user level also removes the need for centralized global coordination, making it easier to scale and maintain. Additionally, the dependencies are stored within the user's own directory, simplifying access control and management of resources.
Based on our observations and analysis, we decided to implement the dependency cache at user level.
User Level Dependency Cache Implementation
To ensure the success of our implementation, we established the following objectives:
Reduce repeated uploads for each user.
Minimize infrastructure overhead.
Make the feature easy to use and maintain.
Ensure compatibility with existing applications with no user involvement.
At LinkedIn, we primarily schedule our batch workflows using Azkaban. Azkaban employs Jobtype plugins to support different types of jobs. For instance, Spark applications are submitted using the Spark JobType plugin, which takes care of all the required configuration tuning and submits the Spark job to the YARN cluster. Our dependency cache implementation resides inside Spark JobType. Figure 2 below illustrates the overall cache implementation. Specifically, it will perform the following steps:
Cache Cleanup: We cache dependencies within separate HDFS directories for each user, organized by month. Subdirectories older than two months are automatically cleaned up by the next execution, preventing the cache directory from becoming too large without the need for any additional services.
Cache Lookup: We look up dependencies in the current cache directory. The checksum of each dependency is calculated to confirm its integrity and detect any corruption and it is stored as part of the HDFS file name. Storing the checksum in the file name, instead of relying on the checksum stored internally by HDFS, allows us to efficiently retrieve checksums for all cached files by performing a single file listing operation instead of one checksum lookup call per file.
Dependencies Upload: We upload dependencies to HDFS, ensuring that multiple job submissions can operate concurrently without causing race conditions.
Job Configuration Update: We update spark.jars option, where unnecessary uploads get eliminated by replacing local dependencies provided by users with remote dependencies in users' cache. Dependencies which shouldn't be cached, such as SNAPSHOT JARs, are left as-is.
Figure 2: Spark User Level Dependency Cache Implementation
The design in Figure 2 provides a stable, scalable, secure, performant, and efficient dependency cache for Spark applications. Further optimization can be achieved by implementing parallel uploading and moving files from previous months to the current month prior to cleanup. Though our design is based on Azkaban Spark JobType, for other orchestrators like Apache Airflow, Flyte Workflow, the dependency cache can be implemented at SparkSubmitOperator.
Rollout with Caution
We have rolled out this feature to all clusters since it is not limited to a specific user or workload. Given the high volume of Spark applications that run daily, many of which are critical to LinkedIn members and business, we approached the rollout with care and consideration.
During our initial rollout, some of the testing flows were interrupted because the feature failed to resolve spark.jars when it contained invalid paths. As a result, the testing flows failed, and we had to roll back the feature. In preparation for our second rollout, we took the following actions:
We conducted extensive pattern analysis of how Spark users set their spark.jars configuration, which was included in our unit tests during development.
We gradually ramped this feature up within clusters and across clusters to minimize the risk of major failure caused by this feature.
We instrumented this feature to report feature-enabled and feature-completion status in addition to the Spark application's final status. This allows us to proactively monitor any failed Spark applications due to this feature during the ramp-up process.
Impact on Infrastructure and Business
This feature has been fully implemented at LinkedIn since November 2022, and its impact on our infrastructure and business metrics has been observed and validated as described in the following charts.
On a daily basis, almost 45TB (approx. 25 million) of JARs are cached, while only 200GB of JARs are not cached. Daily HDFS write operations have dropped by 25 million, which constituted approximately 10% of the total HDFS write operations.
Prior to implementing this feature, we were allocating more than 2,000 hours per day towards uploading those JARs. With the feature in place, the time required for this task has been reduced by a factor of 10.
This optimization has significantly improved the Spark application runtime for small-to-medium jobs, resulting in a median Spark Job runtime reduction from 9.4 minutes to 5.5 minutes, a decrease of 40%. For example, our anti-abuse models can refresh more frequently, providing a higher level of protection for our members.
Our project aligns with the "doing more with less" philosophy. We demonstrate the strength of data-driven system design, which enables us to identify the best practices that deliver the greatest infrastructure savings while reducing development and maintenance overhead. We will continue sharing our experiences and lessons learned with our fellow engineers to drive innovative and efficient infrastructure solutions.
The journey from identifying the issue to validating its impact has been a fulfilling experience. In addition to contributions from Shu Wang, Biao He, and Minchu Yang, the project was greatly contributed by Erik Krogen, Ye Zhou, and Mridul Muralidharan. The Spark team, including Aravind Patnam, Chandni Singh, Rob Reeves, and Min Shen (alumni), also played a key role. The project received support from sibling teams, including Binyao Jiang, Hao Qin (alumni), and Deepak Jaiswal from the COP team, Xing Lin and Virajith Jalaparti from the Storage team, Bowen Jiao and Chen Qiang from SRE team, and Yi Wu from the Anti-Abuse team. The management team, including Sunitha Beeram, Kunal Narula, Chen Xie, Keqiu Hu, Xiaotong Chen (alumni), and Rae Tewari, provided significant and sustained support. The design was further improved through reviews and discussions with experts such as Sreeram Ramachandran, Arvind Pruthi, Owen O'Malley, Eric Ogren (alumni), and Maneesh Varshney.