Improving performance and capacity for Espresso with new Netty framework
June 27, 2019
In this blog post, we’ll share how we migrated Espresso, LinkedIn’s distributed data store, to a new Netty4-based framework and achieved a large performance and capacity gain throughout the Espresso system as a result. In the larger scheme, this is particularly important since Espresso is a master data hub that serves many important applications across LinkedIn, and every improvement we implement has an impact on the user experience in one way or another.
First, we’ll provide a refresher on Espresso if you’re not familiar with it. Espresso is LinkedIn's scalable and elastic data-as-a-service infrastructure and, as mentioned, is a master data hub that serves many important applications across LinkedIn.
Here’s a 30,000-foot view of the overall Espresso ecosystem:
It’s important to note the Espresso operation can be divided into two parts:
- Control Path: The Control Path maintains and controls the state transitions of the different components.
- Data Path: The Data Path is how the data flows in the system. It includes three different operations, including online, nearline, and offline. The work we discuss in this post is mainly referencing the online operation, which services real-time client read/write and requires high availability and low latency.
Espresso is a horizontally-scaling, multi-colo data service, which has proved to be the most cost-effective and efficient for a large distributed system. However, even with commodity hardware, as the system grows larger and larger, the cost to build and maintain such a large system is significant. Therefore, improving the performance and capacity of each node is imperative to driving cost savings, considering the scale of the system.
About Netty framework
Netty is a Java application framework for networking services that is widely used at LinkedIn and in the industry. Netty provides high efficiency with a non-blocking, asynchronous, event-driven framework and high flexibility with a chained handler pipeline structure.
Here is an overview of protocol stacks in Netty framework:
The bottom of the stacks provide core functionalities like zero-copy-capable byte buffer, asynchronous event model, and universal APIs. The upper layers of the stacks provide comprehensive services like compression, SSL support, and network protocols like HTTP and WebSocket.
Netty migration on Espresso
The original Espresso data pipeline framework was developed around 2011 and the technology has evolved with new features and capabilities since then. However, most frameworks eventually run their course and we decided it was time to migrate the old system to a new Netty4 framework for a myriad of reasons:
To modernize the infrastructure, upgrade libraries, and enhance capabilities, security, and performance.
To support new features such as HTTP2, streaming, TLS1.3, and more.
To make fundamental changes to the Espresso online data path, including the new thread model and direct (off-heap) memory allocation with buffer pool.
When building the new framework, there were specific features we were set on implementing and building, which included a new thread model for I/O threads, better memory management with direct buffer pooling, streamlining the asynchronous pipeline, and providing native epoll support for socket. This section will expand on each of these.
A new thread model for I/O threads
We implemented a new thread model to avoid inter-thread locking between I/O threads. Thread-local variables are widely used to prevent contention between threads and improve CPU cache hit-rate. In the two following images, you can compare and contrast our old data flow with our Netty4 flow.
Direct buffer pool for better memory management
Instead of putting memory allocation load on JVM heap for transient memory usage, the direct buffer pool is now used for better memory management. The buffer pool directly allocates and manages the memory from the operating system, thus reducing the JVM heap GC pressure and related memory copy operation.
This diagram shows how the direct buffer pool is used to offload request/response data buffers from traditional JVM heap to the direct buffer pools.
Streamline the asynchronous handling pipeline
We made the online data pipeline asynchronous, where only the Store layer is still running in synchronous mode, to fit the JDBC requirements. The HTTP and Espresso layers are fully asynchronous.
In the asynchronous pipeline, the executions are non-blocking. The next execution in the queue does not need to wait for the previous execution to finish before starting the execution. This greatly improves the throughput and reduces the latency of the system.
Provide native epoll support for socket
In a typical Espresso cluster, there are thousands of TCP connections from routers to each storage node. We chose to use Netty’s native epoll approach, instead of the Java NIO epoll approach, because we found epoll to be more efficient for managing a large number of connections.
The advantages of the native epoll approach include: 1) it implements the edge-triggered model, which performs better than the level-triggered model of Java NIO epoll; and 2) the native epoll generates less garbage on the JVM heap, which reduces the GC pressure for the system.
Deployment and the result
Metrics for performance measurement
For a large system migration, it is important to compare the system performance before and after the migration. This generates evidence on what works and what does not, providing guidance for future work and references for other related projects.
In this project, the following metrics were used to measure the performance.
JVM GC: JVM OldGen GC and Young Gen GC are measured to show the heap usage.
Latency: P99 and max latencies of client requests are measured to show the health of the service.
Capacity: For capacity and throughput, we measured RCU/WCU (see the definition in Capacity Improvement section) and QPS.
In the following section, we’ll look at some of our initial results in these categories.
Production cluster results
The production Espresso system is composed of multiple clusters based on the internal customer profiles. Each cluster is configured to best fit the specific needs of the customer. From the traffic access pattern’s point of view, we can categorize the clusters into read-heavy and write-heavy clusters.
For both read-heavy and write-heavy clusters, we saw large latency improvements after the migration.
For read-heavy clusters, this latency improvement is mainly due to the new thread model asynchronous pipeline improvement, and the native epoll support.
For write-heavy clusters, since writing is slower and more expensive compared to reading, memory consumption is generally much higher than in read-heavy clusters. This leads to the direct memory buffer pool taking more load in the write-heavy cluster. Thus, we see the higher JVM GC improvement in the write-heavy cluster. From a latency point of view, in addition to the improvements in the read-heavy cluster, the JVM GC reduction in write-heavy clusters further improves the overall latency.
Here we select two sample clusters, one with write-heavy, one with read-heavy, to show the improvement after migration.
Sample cluster 1 (write-heavy)
In this sample, the migration happened on Sept. 28, and we saw large improvements in JVM GC and latency. For OldGen GC, we saw about a 100x reduction; we saw a 10x reduction for YoungGen GC; and we saw a latency reduction of 60%.
Espresso Storage Node - Total Latency
Sample cluster 2 (read-heavy)
In this sample, the migration happened on Oct. 18, as shown in the graph. We saw large improvements in latency, with a 30% reduction on P99 and max latency after migration.
Espresso Storage Node - Total Latency
Espresso measures the cost of a request based on how many bytes are processed in the storage node. Therefore, we measure RCU/WCU, in addition to QPS (queries per second), for capacity.
RCU: Read Capacity Unit – Up to 8K bytes read is counted as 1 read capacity unit (RCU)
WCU: Write Capacity Unit – Up to 1K bytes written is counted as 1 write capacity unit (WCU)
The cost of 1 RCU is approximately 0.6 of 1 WCU, depending on the hardware.
To measure the capacity improvement, we ran a series of tests to compare the difference between the old system (Netty3) and the new system (Netty4).
The capacity of RCU/WCU is defined as the maximum throughput of RCU/WCU that can be achieved within a specific SLA on a storage node.
We used the following SLA metrics in tests:
Latency: P50 → 10ms, P99 → 30ms
GC pressure: reasonable GC, no anomaly
Capacity with different data sizes
When reviewing the results of RCU/WCU capabilities with different data sizes, we saw:
About 100% RCU improvement across small to large data sizes for read operations
About 60-100% WCU improvement on different data sizes for write operations
Capacity with different QPS levels
We also measured against a fixed data size, looking at the performance differences under different traffic loads. For this scenario, we examined a fixed 4KB data size and looked at the performance of JVM GC and the latencies under different QPS traffic loads.
For GC, we saw large improvements across all QPS levels—from 500 to 20K QPS.
For max latency, we saw a similar improvement threshold, with latency improvement returns significantly increasing when QPS reaches 2K and higher.
This is no small feat, as capacity improvement can directly reduce the cost of serving of the system.
Finding the “sweet spot”
The “sweet spot” is the data size that achieves the best RCU/WCU throughput. In other words, we are striving to find the most cost-effective way to run the system.
As we can see from the following diagram, the point of tradeoff is between CPU-bound (small data size) and Memory-bound (large data size). For our current Espresso system, the “sweet spot” of data size is around 40 KB.
To optimize resource usage, it is in the best interest of both Espresso users and the Espresso team to have the request data size be close to the “sweet spot.” For example, the read request with 100B data size costs about the same as the GET request with 7KB data size. Therefore, aggregating multiple reads of small data sizes into a larger data size of around 40KB would save considerable resources.
Along this journey, we learned a lot from our successes, but equally from the challenges we encountered. We felt it was important to share those lessons.
CPU affinity, or…not
CPU affinity typically plays an important part in performance for SMP (Symmetric Multi-Processor) Architecture, which is common in today’s commodity computer hardware.
CPU caches are 5-100 times faster than Memory access.
Improved CPU cache hit-rate would bring a big performance gain.
Espresso is a heavy-loaded, muti-threading JVM application and we felt if we could improve the CPU cache-hit rate by applying CPU affinity, it’d be a big win. We spent time in this area with available tools and libraries, but found it difficult to achieve CPU affinity and cache line alignment within the Java system. For cache line alignment, there are some workarounds like padding and @Contended annotation, but they are hard to use and are only targeted for part of the problems.
Eventually, we abandoned this effort.
Memory leak detection
Memory leak can be a disaster for mission critical systems like Espresso. By off-loading the memory from JVM heap to the buffer pool, we improved the memory footprint and system performance. On the other hand, this also requires dual diligence to managing the memory buffers with allocate/free and reference cnt, just like a native language programmer does.
There are two approaches we used to effectively detect memory leaks in the system:
Built-in Netty framework tool to detect memory leaks at the development stage.
Turn on JVM option '-Dio.netty.leakDetectionLevel=advanced'
For stress testing and production, we developed utilities to monitor the buffer pool stats. Here is a sample of the buffer pool stats:
ALL Arenas Stats:
All Active: 27348 alloc: 8693759 dealloc: 8666411
Normal: 5201 alloc: 7976932 dealloc: 7971731
Small: 21903 alloc: 465891 dealloc: 443988
Tiny: 244 alloc: 250936 dealloc: 250692
Huge: 0 alloc: 0 dealloc: 0
In the above memory allocation stats, it shows the number of total allocations and deallocations and the active (in use) buffers for each buffer pool. By monitoring the number of active buffers over time, we can detect if there is any memory leak issue in the system.
For complex system changes like Espresso Netty4 migration, we found new challenges in Test and Deployment. On testing, we identified a need for better stress and performance testing tools, in addition to a better testing/canary in a production-grade setup. For deployment, we found existing tools are geared for stateless services, while config change and validation is difficult.
HTTP2 for Espresso
HTTP2 is supported over the Netty4 framework. Implementing HTTP2 for Espresso would bring the following benefits:
Efficient binary protocol to reduce data transport overhead.
Resolve the router connection pool scalability issue.
Provide a foundation for end-to-end streaming on Espresso.
To expand on that last point, enabling end-to-end data streaming for Espresso would allow for an asynchronicity in multi-read that would decrease latency. The current multi-read latency without streaming is at least 3-5 times greater than that of a single read, with the additional time/memory being spent waiting on the slowest response. Also with streaming, responses with a large amount of content can be divided into pieces and sent as a stream, which reduces memory GC pressure because we no longer need to hold large amounts of data in memory. This would allow for response size limits to be removed in Espresso.
Migrating an existing, large distributed system with new technologies, while at the same time maintaining the system up and running with committed SLA requirements, is non-trivial. By completing this project, we modernized the foundation of Espresso with significant performance and capacity improvement, paving the way for the new development and growth in the future.
This work would not have been possible without the contributions and help from many folks at LinkedIn. I would like to thank my colleagues Antony Curtis, Song Lu, Abhishek Andhavarapu, and Ning Xu for their invaluable contributions and feedback. Special thanks to Wei Song, Gaojie Liu, Banu Muthukumar, and Jaren Anderson for reviewing this blog post. Great thanks also to the leadership team Alok Dhariwal and Ivo Dimitrov for their continuous support and guidance throughout this work.