How we improved latency through projection in Espresso
March 5, 2020
Espresso is LinkedIn’s document-oriented, highly available, and timeline-consistent distributed datastore. Clients, such as services fetching profile data, often read only a subset of a document’s fields from Espresso, for reasons of performance and cost. For example, one use case reads only a member’s first and last names, while another reads a member’s positions and education background.
In this blog post, we will share how we implemented a projection feature used by clients to select a subset of fields of a document stored in Espresso. From there, we will share a case study of rolling this feature out to one of the most critical services at LinkedIn—the identity services, which manage member profiles and privacy settings—and celebrate the ROI observed in improved latency and reduced cost to serve.
Let’s start with a quick introduction to the various technologies used in this post.
Apache Avro is a data serialization system. It uses schemas to specify the structure of the data being encoded. Unlike other serialization systems such as Protocol Buffers and Thrift, an Avro schema definition does not come with tag numbers, so a schema is needed to interpret Avro binary data. Avro has the concepts of a reader’s schema (the schema used when decoding the data) and a writer’s schema (the schema used when encoding the data). Through its schema resolution system, Avro is able to resolve the differences when the former differs from the latter. This will become important in our discussion of Espresso projection.
Rest.li is an open source framework developed at LinkedIn for building RESTful web services. Rest.li uses Pegasus to specify the schema of data models referenced in service API definitions, and supports the notion of projections at the API level.
Espresso is a document-oriented distributed datastore developed at LinkedIn. The diagram below gives a high-level architectural overview of the database design.
There are three main parts of Espresso: routers, storage nodes, and cluster management. Routers receive requests from online clients and forward them to storage nodes. A router does so by inspecting the URI to get the database name and record key, applying a hash function to determine the partition, and looking up the routing table for the appropriate storage nodes to send the request to. It also assembles the results and sends them back to the client. Storage nodes are the building blocks for scaling Espresso horizontally. A node stores and serves documents; provides transaction support for documents with the same partition key and secondary index; and maintains a commit log for replication. Espresso uses Apache Helix to monitor and maintain the cluster.
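The routing steps described above can be sketched roughly as follows. This is only an illustration: the `/<database>/<table>/<key>` URI layout, the partition count, the MD5-based hash, and the node names are all assumptions made for the example, not Espresso's actual scheme.

```python
import hashlib
from urllib.parse import urlparse

NUM_PARTITIONS = 8  # hypothetical partition count

# Hypothetical routing table: partition id -> storage node name.
ROUTING_TABLE = {p: f"storage-node-{p % 3}" for p in range(NUM_PARTITIONS)}


def parse_uri(uri: str) -> tuple[str, str, str]:
    """Split an assumed '/<database>/<table>/<key>' URI into its parts."""
    database, table, key = urlparse(uri).path.strip("/").split("/", 2)
    return database, table, key


def partition_for(key: str, num_partitions: int = NUM_PARTITIONS) -> int:
    """Hash the record key to a partition (MD5 is used only as a stable hash)."""
    digest = hashlib.md5(key.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % num_partitions


def route(uri: str) -> str:
    """Return the storage node that should serve this request."""
    _database, _table, key = parse_uri(uri)
    return ROUTING_TABLE[partition_for(key)]
```

Because the partition depends only on the record key, repeated requests for the same document always reach the same partition, which is what lets the router stay stateless apart from the routing table.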
In standard SQL databases, data is organized in records consisting of different fields, and the SQL SELECT statement is used to pick the fields that a client wants in a query. Fields are top-level parts of a table, and the structure inside a field is usually opaque to the database.
A document-oriented store like Espresso is schema-aware because it stores rich structured documents with named structures at multiple levels, addressed via a unique key (either single or composite). This naturally facilitates many key features such as secondary indexing, partial updates, and projections of fields within documents. Field projection will be the focus of this post. There are several reasons why support for field projections is necessary:
- Performance. Requesting and transferring more data degrades performance across multiple aspects. It takes more time to transfer the data over the network, to encode and decode the data on both sides, and to convert the deserialized data into the appropriate formats clients expect.
- Cost to serve. Transferring more data consumes more network resources due to increased usage of bandwidth. In addition, due to degraded performance, both clients and database servers will be spending more CPU cycles in each request, resulting in more capacity required for the same traffic.
- Access control. Projection support provides a way for applications to support access controls to various fields, especially for those applications maintaining critical data such as member profile data. Access control prevents data from being freely accessible by any applications which can call a service serving that data, providing an additional layer of security for member privacy settings.
Design and implementation
To support this feature, there were several hurdles to be addressed:
- How to retrieve a partial Avro object from the original object
- How to internally represent Avro projections
- How to translate Rest.li projections into Avro projections on the client side
- How to get the client to read the projected Avro binary
We will explore each challenge in this section.
Retrieving a partial Avro object
The first question to be addressed was how we should “project” the original object by selecting a subset of values from the original object. In our implementation, we utilize Avro’s schema resolution mechanism. As briefly noted earlier, Avro has both reader’s schema and writer’s schema concepts. An Avro implementation can read the data written in writer’s schema using a different reader’s schema. If the schema evolution is backward compatible, a newer version of the schema can be used as the reader’s schema and an older version can be used as the writer’s schema. For example, the reader’s schema can have an additional field with a default value. But it cannot have an additional field without a default value, since the newer schema will not be able to read data written by an older schema. Conversely, a newer version of the schema can be used as the writer’s schema and the older version can be used as the reader’s schema if the schema evolution is forward compatible. For example, the writer’s schema can remove a field with a default value, but cannot remove a required field without a default value since the older schema will not be able to read data written by the newer schema.
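The resolution rules in this paragraph can be illustrated on already-decoded records. The sketch below is a simplification written for this post, not Avro's implementation: `resolve`, `NO_DEFAULT`, and the field names are hypothetical, and records are plain dictionaries rather than Avro binary.

```python
NO_DEFAULT = object()  # marker for a reader field that declares no default


def resolve(record: dict, writer_fields: list[str], reader_fields: dict) -> dict:
    """Read a record written with `writer_fields` through a reader's schema.

    `reader_fields` maps each reader field name to its default value,
    or to NO_DEFAULT when the field has none.
    """
    resolved = {}
    for name, default in reader_fields.items():
        if name in writer_fields:
            # Present in both schemas: take the written value.
            resolved[name] = record[name]
        elif default is not NO_DEFAULT:
            # Reader-only field with a default: fill it in.
            resolved[name] = default
        else:
            # Reader-only field without a default: the schemas are incompatible.
            raise ValueError(f"no value or default for reader field {name!r}")
    # Writer-only fields are never copied, which is what makes projection work.
    return resolved


written = {"firstName": "Ada", "lastName": "Lovelace", "headline": "Engineer"}
writer = ["firstName", "lastName", "headline"]

names_only = resolve(written, writer, {"firstName": NO_DEFAULT, "lastName": NO_DEFAULT})
with_default = resolve(written, writer, {"firstName": NO_DEFAULT, "industry": "unknown"})
```

Here `names_only` drops `headline` because the reader's schema does not list it, and `with_default` picks up `industry` from its default. Asking for a reader field that is neither written nor defaulted raises an error, mirroring the incompatible case described above.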
This means when a client specifies a subset of fields (discussed in the next section), we can generate a new Avro schema from this subset of fields as a reader’s schema. For fields not in the reader’s schema but in writer’s schema, the values of the fields are skipped during schema resolution. This generated schema is cached so we don’t have to repeatedly compute the same schema. Specific to Espresso, the storage node fetches the whole Avro document, generates the reader’s schema, and performs schema resolution to produce the projected Avro document. Only the projected Avro document will be sent back to the router.
We made a few performance improvements in this step. These improvements are implemented in a library shared by both storage nodes and Espresso clients, so they help up-conversions and down-conversions in both places whenever the reader’s and writer’s schemas differ.
- Avro’s Java implementation provides a class called GenericDatumReader providing the capability of converting Avro binary into a GenericRecord, which is an instance of a record schema containing the actual data easily accessible by field name and indices. While we could convert the original Avro binary into a generic record first using the reader’s schema and then convert this generic record into Avro binary again for the client, this would be inefficient. Instead we implement direct copying from Avro binary to Avro binary. The same parser decoder used to construct generic records can be used to drive the copying to directly generate Avro binary.
- When generating the projected schema, we do not reorder the fields. In this way, we can process or skip fields without any reordering.
- When generating the projected schema, Avro’s default behavior is to generate symbols per thread per schema. This initially created a lot of memory pressure when we ramped up the feature and caused OutOfMemory problems. We optimized this by caching the generated schema in memory shared by all threads.
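To make the first two optimizations concrete, here is a minimal sketch of binary-to-binary projection for a toy subset of Avro: flat records whose fields are only `long` or `string`. It follows the Avro binary encoding for those two types (zigzag varints for longs, a length prefix followed by UTF-8 bytes for strings), but the function names and the sample record are assumptions for illustration. The production code drives the copy from Avro's parser rather than from hand-written loops like these.

```python
import io


def write_long(out, value: int) -> None:
    """Write a 64-bit integer as an Avro zigzag varint."""
    n = (value << 1) ^ (value >> 63)
    while n & ~0x7F:
        out.write(bytes([(n & 0x7F) | 0x80]))
        n >>= 7
    out.write(bytes([n]))


def read_long(src) -> int:
    """Read an Avro zigzag varint."""
    shift, n = 0, 0
    while True:
        byte = src.read(1)[0]
        n |= (byte & 0x7F) << shift
        if not byte & 0x80:
            break
        shift += 7
    return (n >> 1) ^ -(n & 1)


def encode_record(record: dict, fields: list) -> bytes:
    """Encode a flat record; `fields` is a list of (name, type) pairs."""
    out = io.BytesIO()
    for name, ftype in fields:
        if ftype == "long":
            write_long(out, record[name])
        else:  # "string"
            payload = record[name].encode("utf-8")
            write_long(out, len(payload))
            out.write(payload)
    return out.getvalue()


def project_binary(data: bytes, writer_fields: list, keep: set) -> bytes:
    """Copy kept fields from Avro binary to Avro binary, skipping the rest.

    Fields are visited in writer order and never reordered, so each one is
    either copied or skipped in a single forward pass; no intermediate
    record object is built.
    """
    src, dst = io.BytesIO(data), io.BytesIO()
    for name, ftype in writer_fields:
        sink = dst if name in keep else None
        if ftype == "long":
            value = read_long(src)
            if sink is not None:
                write_long(sink, value)
        elif ftype == "string":
            length = read_long(src)
            payload = src.read(length)  # raw bytes, never decoded to text
            if sink is not None:
                write_long(sink, length)
                sink.write(payload)
        else:
            raise ValueError(f"unsupported type in this sketch: {ftype}")
    return dst.getvalue()
```

For a record with fields `memberId`, `firstName`, `headline`, and `lastName`, calling `project_binary(data, fields, {"firstName", "lastName"})` yields the same bytes as encoding a two-field record directly, without ever decoding the string payloads.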
Representing Avro projection
The Avro specification doesn’t prescribe any way to represent projections. Internally, an Avro schema consists of a list of fields that can reference schema types to form a tree-like structure. To make it easy for clients to specify projections, we use the dot symbol. For example, the code below shows a snippet of the Profile schema in Espresso.
The schema depicts that a profile has a first name of various locales and a map from position ID to a position. To only get the first name and the title of all positions, we just need to specify a list of paths: firstName, positions.title. We do not need to use any wildcards to specify the title from all positions because Avro’s internal field representation does not need it. This also applies to arrays and unions. Using the dot syntax, we can specify the needed fields at any arbitrary level. This is also very straightforward for the clients to use.
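As a rough sketch of how dot paths can drive schema projection, the function below walks an Avro schema held as parsed JSON and keeps only the requested fields. The `PROFILE_SCHEMA` here is a hypothetical stand-in written for this example (it is not the actual Espresso Profile schema), and `project_schema` is an illustrative name rather than the shared library's API. Named type references are not resolved in this sketch.

```python
def build_tree(paths: list) -> dict:
    """Turn ['firstName', 'positions.title'] into a nested dict of names."""
    tree = {}
    for path in paths:
        node = tree
        for part in path.split("."):
            node = node.setdefault(part, {})
    return tree


def _project(schema, tree: dict):
    if not tree:
        return schema  # leaf of the projection: keep the whole subtree
    if isinstance(schema, list):  # union: project every branch
        return [_project(branch, tree) for branch in schema]
    if isinstance(schema, dict):
        kind = schema["type"]
        if kind == "record":
            # Keep requested fields only, preserving their original order.
            fields = [
                {**field, "type": _project(field["type"], tree[field["name"]])}
                for field in schema["fields"]
                if field["name"] in tree
            ]
            return {**schema, "fields": fields}
        if kind == "map":  # no wildcard needed: descend into the value type
            return {**schema, "values": _project(schema["values"], tree)}
        if kind == "array":
            return {**schema, "items": _project(schema["items"], tree)}
    return schema  # primitives and anything else pass through unchanged


def project_schema(schema: dict, paths: list) -> dict:
    """Return a new schema containing only the fields named by `paths`."""
    return _project(schema, build_tree(paths))


# Hypothetical schema for illustration only.
PROFILE_SCHEMA = {
    "type": "record",
    "name": "Profile",
    "fields": [
        {"name": "firstName", "type": {"type": "map", "values": "string"}},
        {"name": "lastName", "type": {"type": "map", "values": "string"}},
        {"name": "positions", "type": {"type": "map", "values": {
            "type": "record",
            "name": "Position",
            "fields": [
                {"name": "title", "type": "string"},
                {"name": "companyName", "type": ["null", "string"], "default": None},
            ],
        }}},
    ],
}

projected = project_schema(PROFILE_SCHEMA, ["firstName", "positions.title"])
```

The `positions.title` path passes straight through the map to the `Position` record, which is why no wildcard is required. The original schema is left untouched because each level is rebuilt rather than mutated.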
Translating from Rest.li to Avro
Rest.li itself comes with projection support. For example, the below is a snippet of the Profile schema used by the service serving profile data. Similarly to the Espresso schema, it defines three fields: firstName, lastName, and positions (with the Position schema defined separately).
In Rest.li, we do need to use wildcards for collections such as maps and arrays. The format differs depending on where the projection is represented. For example, to specify the first name and the titles of all positions when invoking a service on a command line, we would use something like firstName, positions:($*:(title)), where “$*” is a wildcard mask, meaning that it will get the title field for all positions in the map. The translation is very straightforward. Because Avro schemas don’t project slices of arrays or parts of maps, our schema projector doesn’t need the wildcard. We simply remove the wildcard (which is just a star symbol) from the in-memory representation.
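The translation can be sketched as a walk over a nested mask that drops wildcard levels. The nested-dictionary shape and the `$*` key below mirror the command-line form quoted above and are assumptions made for this example; the actual in-memory token and data structure may differ, and `mask_to_paths` is a hypothetical name.

```python
WILDCARD = "$*"  # assumed wildcard key, taken from the command-line form


def mask_to_paths(mask: dict, prefix: tuple = ()) -> list:
    """Flatten a nested projection mask into Avro-style dot paths."""
    paths = []
    for key, value in mask.items():
        # A wildcard level contributes no path segment of its own.
        segment = prefix if key == WILDCARD else prefix + (key,)
        if isinstance(value, dict):
            paths.extend(mask_to_paths(value, segment))
        else:
            paths.append(".".join(segment))
    return paths


# Equivalent of: firstName, positions:($*:(title))
restli_mask = {"firstName": 1, "positions": {"$*": {"title": 1}}}
avro_paths = mask_to_paths(restli_mask)
```

With this input, `avro_paths` contains `firstName` and `positions.title`, the same list of paths used in the Avro projection example earlier.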
Reading projected Avro binary
After all the work done on the Espresso side to return the projected Avro binary, the client must deserialize it and construct a GenericRecord out of the binary for further in-memory processing. In our implementation, the client repeats what Espresso does by generating and caching the projected schema. This piece of logic stays in a shared library that can be used by both Espresso and all clients. The client then invokes Avro’s datum reader to convert it into a generic record.
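One way to picture the caching of projected schemas on both sides is a small cache shared by all threads. The class below is a sketch under assumptions: the key shape (a schema identifier plus the set of requested paths), the `builder` callback, and the coarse lock are choices made for this example, not details of the shared library.

```python
import threading


class ProjectedSchemaCache:
    """Cache of projected schemas shared by all threads in a process."""

    def __init__(self, builder):
        # `builder(schema_id, paths)` is a hypothetical callback that
        # generates the projected schema on a cache miss.
        self._builder = builder
        self._lock = threading.Lock()
        self._cache = {}

    def get(self, schema_id, paths):
        # A frozenset makes the key independent of the order of the paths.
        key = (schema_id, frozenset(paths))
        with self._lock:
            if key not in self._cache:
                self._cache[key] = self._builder(schema_id, key[1])
            return self._cache[key]
```

Holding the lock while building keeps the sketch short and guarantees each projection is generated once; a production cache would more likely use finer-grained synchronization and bounded size.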
In a nutshell, the overall flow is as follows: the Espresso client receives a request from its own client (e.g., some frontend application) with a projection specified under the Rest.li framework. It then converts the Rest.li projection into the Avro projection format we defined, and sends the request with the Avro projection to an Espresso router. The router inspects the URI and forwards the request to the appropriate storage node(s). The storage nodes fetch the corresponding records in Avro binary format, generate the projected schema (or get it from the cache), and perform schema resolution to generate a projected Avro binary directly. This data is returned to the router, which in turn returns it to the client. Finally, the client generates the projected schema (or gets it from the cache) using the same library shared with Espresso, and converts the Avro binary into a generic record that is easy to process in memory.
Next, we will describe a case study where we apply the use of Avro projection to identity services.
LinkedIn employs a microservices architecture to deliver most member experiences. One of the most critical services is the identity service, which serves members’ profiles and privacy settings.
The software architecture of the identity services stack
The services in this diagram can be grouped into four categories. On the left side are 150+ clients requesting data, such as member profiles. An example of a client is the frontend application handling profile pages. On the bottom is Espresso, providing a scalable document store. In the middle, we have two services: identity and identity-mt. Identity is a service on top of Espresso exposing raw CRUD operations that defines various tables in Espresso for different entities, such as profiles, settings, privacy settings, etc. Identity-mt is a mid-tier service implementing business logic related to profile queries. It is also the place where member privacy settings are enforced. In order to do so, the mid-tier service calls a number of downstream services to fetch information such as invitation status, network distances, etc., shown on the right side of the diagram. The identity services are among the most scalable systems we have built at LinkedIn. At its peak, the identity service serves close to one million QPS. Almost all major features on the LinkedIn platform rely on this service to serve member data.
Because of the massive scale and the critical functionality of the system, we’re constantly looking for ways to improve its quality of service (measured by reliability, latency, etc.) as well as to reduce cost to serve. Before the Espresso projection was available, identity had to fetch full records from the database and filter them through other mechanisms. This has caused a few issues in the past that we knew could be resolved once and for all.
Let’s start with latency. A profile object has over 90 top-level fields, many of which are of complex types. Those complex types may contain many more subfields. For instance, the education field is an array of type Education, which itself contains fields such as school ID, start date, end date, etc. If a client only asks for a few fields, such as first name and last name, it makes little sense to fetch the whole document.
Next, let’s discuss the impact on reliability. All well-designed systems impose some limits to protect themselves from abusive clients. This applies to both Espresso and our service infrastructure, Dynamic Discovery. Both impose a 2MB limit on response sizes. If the response size is larger than 2MB, a 413 error is returned instead. This implies that if a client asks for more than one profile (perhaps up to 10) in a batch call, even if it only needs first name and last name, we would fetch the whole profiles for all of them, potentially exceeding 2MB and resulting in a bad member experience. Raising the limit is an option, but is not the most desirable solution, as profile size can continue to grow.
Finally, there is the cost-to-serve issue. The more data we fetch, the more data we need to transfer over the network. This requires more bandwidth and incurs more cost. Given the sheer scale of the service, we knew that fixing this issue could potentially have dramatic effects in reducing the cost to serve.
Applying Avro projection
While we had implemented some temporary hacks to mitigate the issues described earlier (e.g., falling back to single parallel gets if the combined response is larger than 2MB), we knew utilizing the new Espresso projection feature was the right way to go for a long-term fix.
First, we needed all clients to stop sending queries that requested whole profiles. This was not a trivial task for a service with 150+ clients. We achieved this via a horizontal initiative, a process within LinkedIn that runs every quarter for technical initiatives that require significant effort from multiple teams. After this initiative, we implemented a request blocker to make sure all requests containing field projections were represented as Rest.li projections.
Next, we translated the Rest.li projections into Avro projections expected by Espresso as described earlier.
We saw significant savings in latency and cost to serve alongside the feature ramp.
Mean response sizes
The graph below shows how much the mean response size of calls to profile-get has changed since ramping Avro projections. The mean response size dropped from ~26KB to ~10KB at 50% ramp, and from ~10KB to ~7KB at 100% ramp. This improved latencies both directly and indirectly. The services spent less time transferring bytes over the network. In addition, the services spent less time serializing and deserializing the bytes, both processes that consume memory and CPU cycles.
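As a quick back-of-the-envelope check, the relative reductions implied by these approximate figures can be computed directly. The inputs are the rounded values quoted above, so the results are equally approximate.

```python
def reduction(before_kb: float, after_kb: float) -> float:
    """Fractional drop in mean response size."""
    return (before_kb - after_kb) / before_kb


at_half_ramp = reduction(26, 10)  # baseline -> 50% ramp, about 0.615
at_full_ramp = reduction(26, 7)   # baseline -> 100% ramp, about 0.731
```

In other words, the mean response shrank by roughly 62% at 50% ramp and by roughly 73% once the feature was fully ramped.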
Latency in identity for fetching profiles
The following table shows a week-by-week comparison of average latencies at different percentile values in identity for fetching profiles. We saw latency reduction across all percentiles except p50. At p99.9 and p99, latency dropped by close to a quarter. Similar latency benefits were observed in other endpoints such as privacy settings.
|Percentile||Average Latency Without Projection (ms)||Average Latency With Projection (ms)||Latency Change|
Latency in identity-mt for fetching profiles
The following table shows a week-by-week comparison of average latencies at different percentile values in identity-mt for profile fetches. The savings are more moderate compared to identity, mainly because identity-mt also calls many other downstream services, as illustrated in the architectural diagram. However, we still saw an 8.85% latency reduction at p99.
|Percentile||Average Latency Without Projection (ms)||Average Latency With Projection (ms)||Latency Change|
413 errors in identity-mt
A 413 error in identity-mt indicates that the response size is too large (>2MB in this case). The following table shows that we almost eliminated such errors. The remaining 0.035 errors per second were caused by some legacy endpoints that hadn’t been migrated to use projections yet.
|Count Per Second Without Projection||Count Per Second With Projection||Error Count Change|
Thanks to Carol Dmello for supporting the feature ramp and debugging on the SRE side; Zac Policzer for helping debug memory issues; Chris Gomes for initial work on enabling the feature on identity services; Shun-Xuan Wang, Jean-Francois Desjeans Gauthier, Banu Muthukumar, and Jaren Anderson for reviewing the article; and Sriram Panyam and Bef Ayenew for their support on this project.