Introducing and Open Sourcing Ambry
LinkedIn’s New Distributed Object Store
May 16, 2016
Media content has become ubiquitous around the web, and almost all of LinkedIn's new features interact with media in some form or another. Profile photos, email attachments, logos, and influencer posts are a few examples of where photos, videos, PDFs, and other media types get uploaded and displayed to the end user. These media types get stored on our backend and are predominantly served by our Content Delivery Networks (CDNs). The backend storage system acts as the origin server for the content served by the CDNs.
As LinkedIn traffic grew, the system that we were using to store media content traditionally had increasing scalability, availability and operational issues. Two years ago, we revisited the technology we were using and started on a journey to fix these issues. Ambry was the outcome of that effort. Since we started sharing data about this internal project back in 2014, Ambry has made significant performance improvements, in both latency and network efficiency. Also, we have given presentations to a few companies about the system and have seen great interest in adopting it for their use case.
Today, we are announcing that Ambry is now available as an open source project under the Apache 2.0 license. Ambry is optimized to store and serve media. Media content has become critical for any website to increase user engagement, virality and monetization. Media pipelines will need to be supported by more companies, especially with the advancement of video and virtual reality. Ambry can play a critical role in this future, and in the future of any company that is interested in serving diverse kinds of media to a global audience.
Ambry is a distributed immutable object store that is highly available and very easy to scale. Ambry is optimized to serve immutable objects ranging from a few KBs to multiple GBs in size with high throughput and low latency. It also enables end-to-end streaming from the clients to the storage tiers and vice versa. The system has been built to work under an active-active setup across multiple data centers (DCs) and provides very cheap storage.
We found that there were hardly any open source options that could meet our needs for horizontal scalability, availability, and active-active data center configurations. The distributed file systems we found were not designed to work well for small objects, sacrificed availability for stronger consistency, did not focus on real-time use cases and were hard to operationalize. There were few object stores available and they were not mature, not optimized for variable sized objects and did not meet our performance goals to serve real-time traffic. We believe Ambry satisfies all the design goals and could be a critical part in building media pipelines in the future.
In this post, we will explore our previous architecture and its limitations, dive into how we built and deployed Ambry, the media ecosystem at LinkedIn, open source, and outline the future plans for the project.
Media Storage in 2013
The previous system used at LinkedIn was called media server (for lack of a better name) and consisted of filers for media file storage and a monolithic Oracle database to store the metadata. These systems were fronted by some stateless machines running on Solaris that routed the request to the appropriate filer or database. The filers were NFS mounted onto these stateless machines and were accessed remotely using Java’s File APIs. The frontends interfaced with a set of caches within the data center (DC) that ensured lower latency and protected the site from outages due to performance issues or unavailability of the downstream systems (filers/Oracle).
As the demand for media increased at LinkedIn, the legacy system had severe limitations in satisfying our requirements. We decided that it had to be replaced for the following reasons:
- Frequent availability issues: The legacy system had latency spikes every time there was an increase in metadata operations on the file. The number of metadata operations increased when a lot of small files were accessed. Each file operation went through multiple levels of translations (Java, NFS, Filers) and made it hard to debug. We had frequent outages which forced us to add a caching layer to alleviate some of that effect.
- Hard to scale: The underlying systems we were using to store data and metadata were monolithic. It was impossible to scale the metadata horizontally and it required a lot of manual process to add more hardware for the storage of the data.
- Inefficient support for small and large objects: The media dataset consists of trillions of small objects (50KB - 1MB) and billions of large objects (1MB - 1GB). The metadata operations proved to be very expensive when storing and managing trillions of small objects. Also, the old system did not have any end-to-end streaming support for large objects, which was a limiting factor in enabling new product use cases.
- Bad MTTR (Mean Time To Repair): Most parts of the old system were largely in a black box, which needed us to get support licenses and be on call to explain issues and fix them. This affected our MTTR, which would sometimes extend to multiple days. It was really hard to debug and move fast with no insights into the system. It was equally hard to collect the metrics that we needed as we were limited to only the metrics that were supported by these black box systems.
- Expensive: The old media store was expensive and became cost prohibitive as we reached more scale. It was obvious that we could not continue with the existing solution if we needed to manage media at scale.
This convinced us that we needed to find a better approach. We took it as an opportunity to look at different end-to-end solutions. We looked at many possible solutions including distributed file systems, storage appliances, cloud, and an in-house implementation. We thought deeply about our design goals, understood the tradeoffs and were convinced to build an in-house solution that fit our needs better.
How does Ambry work?
Before we get into the design and internals of Ambry, it would be useful to highlight what the design goals were when we started.
Highly Available and horizontally scalable
The system had to serve real-time traffic, and any unavailability could impact the site directly. This meant that the system had to be highly available. We wanted to target a monthly uptime percentage of at least 99.95 percent. That translates to a downtime of no more than roughly 22 minutes per month.
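As a quick sanity check on that availability target, the downtime budget can be worked out directly. This is plain arithmetic; the 30-day month is an assumption, and the result comes to about 21.6 minutes.

```python
def downtime_budget_minutes(uptime_percent: float, days_in_month: int = 30) -> float:
    """Minutes of downtime allowed per month for a given uptime percentage."""
    minutes_in_month = days_in_month * 24 * 60
    return minutes_in_month * (1 - uptime_percent / 100)


budget = downtime_budget_minutes(99.95)
print(f"{budget:.1f} minutes of downtime per 30-day month")
```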
Another pain point that we wanted to alleviate with the new system was to make cluster expansion as seamless as possible. In most distributed systems, cluster expansion is really hard to get right mainly because there are many moving parts and topology is really complex. We wanted expansion to be really simple and wanted it to work the first time every time.
Low operational overhead
Distributed systems are hard to manage. It is so important to automate frequent cluster operations to ensure that the system does not become an operational burden. However, complex system design makes automation hard to implement and run reliably, which is why we wanted a system whose design was simple, elegant, and automated.
Lower MTTR (Mean time to repair)
Failure is inevitable in distributed systems. Software and hardware fail all the time and for unexpected reasons. Good testing does alleviate some of this, but failures are always going to happen. What is more important is to ensure that we can repair the fault quickly and have the sub component back up and running. This would require a system that has a simple design, no single point of failure and few moving parts. We wanted a system that was designed based on these philosophies to enable a really low MTTR.
Active-Active cross DC
LinkedIn has multiple data centers and we are constantly expanding our presence across the globe. This means that all our systems need to support active-active configuration. This is the ability of a system to enable updates to the same object from different data centers.
Efficient for large and small media objects
Media traffic is unique in the sense that it comprises both small and large objects. Small objects are usually between 1K and 100K, while anything larger would fall under the large object bucket. It is usually hard to build systems that work well for different object sizes. The system would need to support trillions of small objects and billions of large objects. A large number of small objects tends to have high metadata overhead, create disk fragmentation and need a lot of random IO. Large objects need good memory management, end-to-end streaming and bounded resource utilization. We needed a system that can work well for both cases.
Finally, media tends to be storage bound. They quickly manage to take up a lot of storage because of their size. Another characteristic of media is that old data becomes “cold,” which means it does not get accessed much. There are many cost optimization techniques such as using denser hardware, tiered storage, erasure encoding and deduplication for these types of workload. We wanted to ensure that the system that hosts the media content can work efficiently on denser machines and has a design that makes it possible to apply the other cost optimization techniques easily.
10,000 feet view
At a high level, Ambry consists of a set of data nodes that are responsible for storing and retrieving data, front-end machines that route requests after some preprocessing to the data nodes, and a cluster manager that coordinates and maintains the cluster. The data nodes replicate data between themselves and support both intra and inter data center replication. The frontend interacts with the data nodes in the remote data center when read-after-write consistency is required. The frontend provides an HTTP API to POST, GET, and DELETE objects. Alternatively, the router library that the frontend uses can be used directly by clients for better performance. At LinkedIn, these frontend nodes act as the origin servers for the CDN.
Ambry is a handle store. This means that an object id is returned to the caller when an object is PUT into Ambry. This greatly simplifies the internal system design and helps to keep everything decentralized. The id has the appropriate information encoded in it that helps to locate the object on a GET. This also means that objects that are PUT in Ambry are immutable. It is trivial to build a system on top of Ambry that provides a key-value access pattern and simulates mutability.
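To illustrate the handle-store idea, here is a minimal sketch of an id that carries its own routing information. The layout (a partition number followed by a random UUID, base64-encoded) and the names `make_blob_id` and `parse_blob_id` are assumptions made for illustration, not Ambry's actual id format.

```python
import base64
import struct
import uuid


def make_blob_id(partition_id: int) -> str:
    """Pack the partition id and a random UUID into an opaque, URL-safe handle."""
    raw = struct.pack(">Q", partition_id) + uuid.uuid4().bytes
    return base64.urlsafe_b64encode(raw).decode("ascii")


def parse_blob_id(blob_id: str) -> tuple[int, uuid.UUID]:
    """Recover the partition id (used for routing) and the unique part of the handle."""
    raw = base64.urlsafe_b64decode(blob_id.encode("ascii"))
    (partition_id,) = struct.unpack(">Q", raw[:8])
    return partition_id, uuid.UUID(bytes=raw[8:])


blob_id = make_blob_id(partition_id=42)
partition_id, unique_part = parse_blob_id(blob_id)
print(blob_id, partition_id)
```

Because the caller never chooses the id, no central directory is needed to map names to locations: a GET only has to decode the handle.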
Ambry supports a REST API that is applicable to most of the use cases. However, there are some scenarios where better performance is required. In such cases, Ambry also supports using the router library in the client to directly stream bytes to and from the data nodes. The router library is currently blocking (synchronous) but we are actively working to provide a non-blocking (async) version. We also hope to provide multiple language support for the router library.
The REST API is described in detail here: https://github.com/linkedin/ambry/wiki/Rest%20API.
The clustermap controls the topology, maintains resource states and helps coordinate cluster operations. There are two parts to the cluster map:
- A hardware layout that contains the list of machines, disks in each of the machines and capacity of each of the disks. The layout also maintains the state of the resources (machines and disk) and specifies the hostname and ports (plain and SSL) that can be used to connect to the data nodes.
- A partition layout that contains the list of partitions, their placement information and their states. A partition in Ambry is a logical slice of the cluster. Typically, a partition has a numeric ID, a list of replicas that can span across data centers and a state that indicates if it is available for writes. Partitions are fixed-size resources and any data rebalancing across the cluster happens at the partition level.
The data nodes and the frontend servers have access to the clustermap and use their current view at all times to make decisions such as choosing an available machine, filtering down replicas and identifying location of an object.
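The two layouts can be pictured as a handful of plain records. The sketch below is only an illustration of the description above: the class and field names, the port numbers, and the `replicas_for_read` helper are all hypothetical and do not reflect Ambry's real clustermap schema.

```python
from dataclasses import dataclass, field


@dataclass
class Disk:
    mount_path: str
    capacity_bytes: int
    available: bool = True


@dataclass
class DataNode:
    hostname: str
    port: int          # plain-text port
    ssl_port: int
    datacenter: str
    disks: list[Disk] = field(default_factory=list)
    available: bool = True


@dataclass
class Replica:
    node: DataNode
    disk: Disk

    def is_up(self) -> bool:
        return self.node.available and self.disk.available


@dataclass
class Partition:
    partition_id: int
    replicas: list[Replica]
    writable: bool = True  # whether the partition accepts new writes


def replicas_for_read(partition: Partition, local_dc: str) -> list[Replica]:
    """Drop unavailable replicas and list the ones in the local data center first."""
    up = [r for r in partition.replicas if r.is_up()]
    return sorted(up, key=lambda r: r.node.datacenter != local_dc)


disk_a, disk_b = Disk("/mnt/0", 4 * 10**12), Disk("/mnt/0", 4 * 10**12)
node_a = DataNode("host-a.example.com", 6667, 7667, "dc1", [disk_a])
node_b = DataNode("host-b.example.com", 6667, 7667, "dc2", [disk_b])
partition = Partition(0, [Replica(node_b, disk_b), Replica(node_a, disk_a)])
print([r.node.hostname for r in replicas_for_read(partition, "dc1")])
```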
A storage node hosts replicas for different partitions. Typically, each storage node has N disks across which the replicas are distributed. The structure and management of these replicas are the same.
Each replica on disk is modelled as a preallocated log. All new messages get sequentially appended to the log. A message consists of the actual object chunk and associated metadata (system and user). This helps writes to be of very high throughput and avoids fragmentation on disk. The log serves two purposes: it is used to store the actual messages and also acts as the transaction log for each write operation. An index is used to map object ids to messages in the log. The index itself is a set of sorted file segments ordered from most recent to the oldest entries for efficient lookups. Each entry in the index maintains the offset of the message in the log, message attributes and some housekeeping fields. Each segment in the index also maintains a bloom filter to optimize the time spent doing actual disk IO. The persistence layer provides a tunable configuration to control when data gets flushed to disk. By default, the system is configured for better performance at the cost of durability.
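The log-plus-index idea can be sketched in a few lines. This is a deliberately simplified, in-memory model: the `ReplicaLog` class is hypothetical, a `bytearray` stands in for the preallocated file, and the segmented index and bloom filters are left out.

```python
class ReplicaLog:
    """A preallocated, append-only log with an index from blob id to location."""

    def __init__(self, capacity: int):
        self.log = bytearray(capacity)  # stands in for the preallocated file
        self.end = 0                    # offset where the next message is written
        self.index: dict[str, tuple[int, int]] = {}  # blob id -> (offset, size)

    def append(self, blob_id: str, message: bytes) -> int:
        """Write the message at the tail of the log and record it in the index."""
        if self.end + len(message) > len(self.log):
            raise OSError("log is full")
        offset = self.end
        self.log[offset:offset + len(message)] = message
        self.end += len(message)
        self.index[blob_id] = (offset, len(message))
        return offset

    def read(self, blob_id: str) -> bytes:
        """Look the blob up in the index, then read its bytes from the log."""
        offset, size = self.index[blob_id]
        return bytes(self.log[offset:offset + size])


store = ReplicaLog(capacity=16)
store.append("a", b"hello")
store.append("b", b"world")
print(store.read("b"))
```

Since every write lands at the tail, writes are sequential and existing messages are never rewritten in place.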
One of the important design challenges for supporting large objects is to ensure that all objects are streamed end to end and memory usage is bounded. All reads at the store do a zero copy of the bytes from the log to the network by using the sendfile API. This enables better performance by saving on extra system calls, ensuring that the bytes are not read into user memory and eliminating any need for buffer pool management.
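The same zero-copy technique is available from Python's standard library, which makes for a compact illustration. In this sketch the `send_blob` helper, the temporary file standing in for the log, and the local socket pair are all assumptions; `socket.sendfile` uses the operating system's sendfile call where it is available and falls back to ordinary sends elsewhere.

```python
import socket
import tempfile


def send_blob(sock: socket.socket, log_file, offset: int, size: int) -> int:
    """Send `size` bytes of the log starting at `offset` straight to the socket."""
    return sock.sendfile(log_file, offset=offset, count=size)


with tempfile.TemporaryFile() as log_file:
    log_file.write(b"headerPAYLOADtrailer")
    log_file.flush()
    server, client = socket.socketpair()
    with server, client:
        # Only the PAYLOAD region (offset 6, 7 bytes) is sent.
        sent = send_blob(server, log_file, offset=6, size=7)
        received = client.recv(sent)

print(sent, received)
```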
Recovery is probably the hardest to get right in a stateful system. Recovery is needed because systems and machines crash or data on the disk can get corrupt. On startup, the storage layer reads the log from the last known checkpoint and restores the index. The recovery also helps to restore an in-memory journal (more on that under replication). The log is the source of truth and is permanent. This helps to ensure that the index does not have to store the actual object which could make the index lookups very inefficient and provides the ability to restore from time zero if needed.
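A recovery pass of this kind can be sketched as a replay of the log from the checkpoint. The message framing below (a fixed header holding the id length and payload length) and the function names are hypothetical, chosen only to make the example self-contained.

```python
import struct

HEADER = struct.Struct(">HI")  # hypothetical framing: blob-id length, payload length


def frame(blob_id: str, payload: bytes) -> bytes:
    """Serialize one message as header + blob id + payload."""
    key = blob_id.encode("utf-8")
    return HEADER.pack(len(key), len(payload)) + key + payload


def recover_index(log: bytes, index: dict, checkpoint: int) -> dict:
    """Replay messages written after `checkpoint` and add them to the index."""
    offset = checkpoint
    while offset + HEADER.size <= len(log):
        key_len, payload_len = HEADER.unpack_from(log, offset)
        end = offset + HEADER.size + key_len + payload_len
        if end > len(log):
            break  # partial message at the tail (crash mid-write): ignore it
        key_start = offset + HEADER.size
        blob_id = log[key_start:key_start + key_len].decode("utf-8")
        index[blob_id] = (key_start + key_len, payload_len)  # payload offset, size
        offset = end
    return index


first = frame("a", b"hello")
log = first + frame("b", b"world!") + frame("c", b"xyz")[:-1]  # last write is torn
index = recover_index(log, {"a": (7, 5)}, checkpoint=len(first))
print(index)
```

Passing `checkpoint=0` with an empty index rebuilds everything from the start of the log, which is the "restore from time zero" case.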
The storage node is also responsible for keeping the replicas of a partition in sync. Every node has a replication service that is responsible for keeping the replicas in the local store in sync with all the remote replicas. The protocol identifies objects that are missing between two replicas and fixes them. Numerous optimizations have been done to ensure that replication is performant and reliable. Some of the key optimizations include batching partitions between two nodes to maximize throughput, using separate thread pools for intra- and inter-data-center replication for better isolation, maintaining an in-memory journal in the store for most recent objects, and prioritizing lagging replicas to speed up disk repairs. The in-memory journal is used to optimize the lookups for the most common case where the replicas are all caught up and they are just fetching newer objects. Replication also generates a change capture for every object that gets replicated. These events are then used offline to perform auditing.
The frontend servers provide an HTTP interface for clients to communicate. They are also responsible for setting the right headers for the CDNs, doing security validations (antivirus, objectionable content detection, etc.) and streaming the object to the router library and clients.
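The core of such a protocol, finding what a replica is missing and fetching it, can be sketched as follows. The `Replica` class, the integer token into the journal, and `replicate_once` are hypothetical simplifications; batching across partitions, thread pools, and prioritization are omitted.

```python
class Replica:
    """Holds blobs plus a journal of blob ids in the order they were written."""

    def __init__(self):
        self.blobs: dict[str, bytes] = {}
        self.journal: list[str] = []

    def put(self, blob_id: str, data: bytes) -> None:
        if blob_id not in self.blobs:
            self.blobs[blob_id] = data
            self.journal.append(blob_id)

    def ids_since(self, token: int, limit: int = 100) -> tuple[list[str], int]:
        """Return up to `limit` ids written after `token`, plus the new token."""
        batch = self.journal[token:token + limit]
        return batch, token + len(batch)


def replicate_once(local: Replica, remote: Replica, token: int) -> int:
    """Pull whatever the remote wrote after `token` that the local replica lacks."""
    ids, new_token = remote.ids_since(token)
    missing = [blob_id for blob_id in ids if blob_id not in local.blobs]
    for blob_id in missing:
        local.put(blob_id, remote.blobs[blob_id])
    return new_token


remote, local = Replica(), Replica()
for blob_id in ("a", "b", "c"):
    remote.put(blob_id, blob_id.encode() * 3)
local.put("b", b"bbb")
token = replicate_once(local, remote, token=0)
print(sorted(local.blobs), token)
```

When the replicas are already caught up, the remote's answer is an empty batch served entirely from the journal, which is the common case the in-memory journal is meant to make cheap.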
The router library contains the core of the request management logic and is embedded in the frontend. Alternatively, it can be used as a standalone library directly by the client for better performance.
The end to end lifecycle of a request is managed by the router. The router handles put, get and delete requests. For each of these request types the router tracks the number of successful and failed responses from the replicas to decide on a quorum, maintains chunk states, generates object Ids and invokes callbacks on success or failure.
Large objects are broken down into chunks and each chunk is routed independently across all partitions. Each of these chunks has an id that uniquely identifies it. The router generates a metadata object that contains the list of chunks and the order in which they need to be fetched. The metadata object is stored as a separate blob and its id becomes the id of the blob. On a read, the metadata blob is fetched and the chunks are retrieved and served to the client. The router pipelines the requests for these chunks to ensure that the next chunk is always ready to be sent to the client. The router also does memory management to ensure that the number of chunks that are in memory is fixed and configurable.
Failure detection is another critical function of the router that ensures high availability. The failure detection logic is responsible for proactively identifying resources that are down or in a bad state. The resources could be machines, disks, or partitions. Machines can fail, disks can stop working and partitions can be unavailable due to the process being down. The router marks the resources as unavailable to ensure that subsequent requests do not use them. On a put, only healthy partitions are chosen. On a get, the failed resources are ignored and requests are sent only to the healthy ones. The failure detector piggybacks on the actual requests, uses a heuristic to identify down resources, and proactively verifies whether the resources are back in a healthy state.
Ambry implements a multi-master strategy for writes and reads. This enables higher availability and reduces end-to-end latency by eliminating an extra hop which is typically present in a master-slave based system. Requests are typically sent to M replicas, and the router waits for at least N successful responses, where N <= M. The router prioritizes the replicas in the local data center when sending requests. It proxies the request to remote data centers if the quorum cannot be achieved locally. In practice, puts and deletes require a stronger quorum to ensure strong durability. Reads usually happen from a single replica but multiple requests are sent on failure to reduce the 99th percentile latency.
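The chunk-and-metadata scheme can be sketched with a dictionary standing in for the cluster. Everything here is an assumption made for illustration: the tiny chunk size, the JSON encoding of the chunk list, and the `put_blob` and `get_blob` names.

```python
import json
import uuid

CHUNK_SIZE = 4  # unrealistically small, to keep the demo readable


def put_blob(store: dict, data: bytes, chunk_size: int = CHUNK_SIZE) -> str:
    """Store each chunk under its own id, then store the ordered list of ids."""
    chunk_ids = []
    for start in range(0, len(data), chunk_size):
        chunk_id = uuid.uuid4().hex
        store[chunk_id] = data[start:start + chunk_size]
        chunk_ids.append(chunk_id)
    metadata_id = uuid.uuid4().hex
    store[metadata_id] = json.dumps(chunk_ids).encode("ascii")
    return metadata_id  # the metadata blob's id is the id of the whole blob


def get_blob(store: dict, blob_id: str):
    """Fetch the metadata blob, then yield the chunks one at a time, in order."""
    for chunk_id in json.loads(store[blob_id]):
        yield store[chunk_id]


store: dict = {}
blob_id = put_blob(store, b"hello world")
print(b"".join(get_blob(store, blob_id)))
```

Using a generator on the read path means only one chunk is held at a time, a simple stand-in for the bounded, configurable number of in-memory chunks described above.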
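One possible heuristic of this kind is sketched below: mark a resource down after a run of consecutive failures, then let a request through again after a back-off so that real traffic doubles as the health probe. The thresholds and the `FailureDetector` class are assumptions; the post does not spell out the heuristic Ambry uses.

```python
import time


class FailureDetector:
    """Marks a resource down after repeated failures and re-probes after a delay."""

    def __init__(self, max_failures: int = 3, retry_after: float = 30.0,
                 clock=time.monotonic):
        self.max_failures = max_failures
        self.retry_after = retry_after
        self.clock = clock
        self.failures: dict[str, int] = {}      # consecutive failures per resource
        self.down_since: dict[str, float] = {}  # when a resource was marked down

    def record(self, resource: str, success: bool) -> None:
        """Feed in the outcome of a real request (the detector piggybacks on traffic)."""
        if success:
            self.failures.pop(resource, None)
            self.down_since.pop(resource, None)
            return
        self.failures[resource] = self.failures.get(resource, 0) + 1
        if self.failures[resource] >= self.max_failures:
            self.down_since[resource] = self.clock()

    def is_available(self, resource: str) -> bool:
        marked = self.down_since.get(resource)
        if marked is None:
            return True
        # After the back-off, allow a request through to check for recovery.
        return self.clock() - marked >= self.retry_after


detector = FailureDetector(max_failures=2)
detector.record("disk-7", success=False)
detector.record("disk-7", success=False)
print(detector.is_available("disk-7"))
```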
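The local-first quorum rule can be sketched as below. For brevity this version contacts replicas one after another rather than in parallel, and the `send_with_quorum` function and its `send` callback are hypothetical names.

```python
def send_with_quorum(local_replicas, remote_replicas, send, required: int) -> bool:
    """Return True once `required` replicas acknowledge, trying local ones first."""
    successes = 0
    for group in (local_replicas, remote_replicas):
        for replica in group:
            try:
                send(replica)
                successes += 1
            except ConnectionError:
                pass  # only successful responses count toward the quorum
        if successes >= required:
            return True  # remote replicas are skipped if the local quorum is met
    return False


contacted = []


def send(replica: str) -> None:
    """Demo transport: replicas whose name starts with 'bad' are unreachable."""
    contacted.append(replica)
    if replica.startswith("bad"):
        raise ConnectionError(replica)


ok = send_with_quorum(["l1", "bad-l2", "l3"], ["r1"], send, required=2)
print(ok, contacted)
```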
The router also generates a change capture on every successful put or delete. The information in the change capture includes the blob id and the metadata associated with the blob. This information can be consumed by downstream applications to build their products. This could be a search service that updates its media index, a relevance system that needs to rerun its algorithms or a feed system that needs to display all the new media content.
Let us look at how simple put and get operations work. These operations are more complex than what is depicted here, but they are simplified for the sake of readability.
The client streams the object along with some metadata to the frontend. The frontend chunks the object as the stream arrives, chooses an available partition, generates the blob id for the blob or chunk and sends the request to W replicas. The frontend then waits for at least Q (<= W) successful responses before returning the blob id to the client. If the quorum did not succeed, the frontend reports an error. We have made this even better by choosing another partition when a quorum fails. This helps us to achieve even higher availability during puts and withstand more downstream system failures.
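The retry-on-another-partition behaviour can be sketched like this. The `put_chunk` function, the shape of the `partitions` mapping, and the boolean `send` callback are assumptions for illustration; a real put would also generate and return the blob id.

```python
def put_chunk(partitions: dict, chunk: bytes, send, write_quorum: int) -> int:
    """Try each writable partition in turn until one reaches the write quorum."""
    for partition_id, replicas in partitions.items():
        acks = sum(1 for replica in replicas if send(replica, partition_id, chunk))
        if acks >= write_quorum:
            return partition_id
    raise OSError("no partition reached the write quorum")


down = {"n1", "n2"}  # demo: two of partition 1's three replicas are unreachable


def send(replica: str, partition_id: int, chunk: bytes) -> bool:
    return replica not in down


partitions = {1: ["n1", "n2", "n3"], 2: ["n4", "n5", "n6"]}
chosen = put_chunk(partitions, b"chunk-bytes", send, write_quorum=2)
print(chosen)
```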
The client requests a blob by sending its id to the frontend. The frontend determines the partition from the id and retrieves the chunks pertaining to the blob from the data nodes. For each chunk, the frontend sends R requests in parallel to individual replicas. The frontend waits for Q (<= R) successful responses before sending the blob or chunk to the client. In practice, the only reason the requests are sent in parallel to multiple replicas of the same partition is to reduce the 99th percentile latency. This, however, adds more load to the data nodes. We hope to add future optimizations to send the subsequent requests only after a fixed timeout. This would ensure that in most scenarios we do not have to incur increased load on the data nodes.
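A parallel read of this kind can be sketched with a thread pool. This sketch assumes Q = 1 (the first successful response wins) and at least one replica; `read_chunk` and the `fetch` callback are hypothetical names.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def read_chunk(replicas: list, fetch) -> bytes:
    """Ask every replica in parallel and return the first successful response."""
    # Leaving the `with` block waits for the slower requests to finish; a real
    # router would cancel or ignore them instead.
    with ThreadPoolExecutor(max_workers=len(replicas)) as pool:
        futures = [pool.submit(fetch, replica) for replica in replicas]
        for future in as_completed(futures):
            try:
                return future.result()
            except Exception:
                continue  # this replica failed; wait for the next response
    raise LookupError("no replica returned the chunk")


def fetch(replica: str) -> bytes:
    """Demo transport: replica 'r1' is down, the others hold the chunk."""
    if replica == "r1":
        raise ConnectionError(replica)
    return b"chunk-bytes"


data = read_chunk(["r1", "r2", "r3"], fetch)
print(data)
```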
Operationalization is hard
Anyone who has experience in building distributed systems would be aware that the easiest phase in the entire lifecycle of a system is the coding phase. The operationalization phase (making the system actually work in production) is the hardest and needs tooling, measurement and extensive testing to ensure things work as expected. We did all of this and were able to proactively identify issues and fix them. This helped us ship high quality software with aggressive deadlines.
We incubated and open sourced Simoorg, a distributed failure inducing system that induces arbitrary faults in the cluster. We pushed continuous traffic in our test clusters and verified correctness of the system by injecting failures such as GC pauses, disk errors, node down and network partitioning. We were able to identify critical bugs through this process and fix them proactively. We would have identified most of the issues only in production if we had not invested in such a framework. We are also happy to note that Kafka, another popular open source project, also leverages this framework within LinkedIn.
Correctness test in production
This is another important testing procedure that is not followed by many systems to ensure their health in production. This is specifically helpful when a new version of the software gets canaried (deployed to subset of machines for validation) in production and engineers have to evaluate the health of the canary. The correctness test ensures that correct results are achieved by stressing all the available APIs with all possible combinations of input arguments. This helps us to proactively identify issues during canary instead of waiting for the clients to discover problems with those APIs at unpredictable times, with much worse consequences.
Most distributed systems are nontrivial to manage and it is important to have a really good set of admin tools to manage the cluster. We invested time right from the start of the project to identify all the critical tools that we would need to ensure that managing the cluster was really simple. Some of the important admin tools that we built include tools for adding more partitions and replicas to the system, data dump tools to deserialize the data on disk for debugging purposes, and tools for marking resources as down for maintenance.
As specified previously, each of the storage nodes emits a replication event when a new blob gets written to the disk. This event contains information about the blob and the source of the event. All of these events across all storage nodes are aggregated in Hadoop today. This makes it possible to audit whether replication actually wrote the blobs to all the replicas. However, this is not real-time today. We plan to build a real-time auditing system by consuming the events that are generated and reporting on the blobs that have not been successfully replicated. This would give us very fine grained insights into the state of the replicas for a blob.
Metrics and alerts
Every system needs to have relevant metrics and alerts to identify anomalies in the behavior of the system. This is very obvious but rarely gets done correctly. It is important to ensure that the right metrics are used for tracking, the number of metrics does not go overboard and the right alert thresholds are chosen. The entire set of metrics and alerts can be pretty useless without a disciplined approach to tracking and measuring them. We took a lot of care to do this right (or so we believe) and this has helped us immensely to stabilize the system at a more accelerated pace.
How did the migration go?
We had to migrate all the media content from the legacy system to Ambry while serving live traffic and without introducing any downtime. In addition, we had to meet some hard deadlines:
- The company was migrating all services off Spring RPC. We had four months from when we started building Ambry to support the new APIs and remove Spring RPC.
- A new data center was being set up and we did not want to deploy the legacy system there at a very high cost. This meant that we had eight months to finish Ambry in order to avoid deploying the legacy system.
- Finally, we wanted to decommission the Solaris operating system from our data centers and the legacy system was on it. This had a deadline of one year.
We decided to build Ambry in a way that we could achieve all these milestones. We started by building the frontend and proxying the request to the old system. We then migrated all the clients to use the new frontend. This was a huge effort but we were able to achieve the first deadline.
The next step was to get Ambry working end-to-end and have it set up only in the new data center. We then migrated all the data from the old system to Ambry. We added logic in code to ensure that we fell back to the old system in the remote data centers on failure in the new data center. This was going to incur more latency but we decided to take the risk. Essentially, Ambry behaved as a cache for the new data center. We were able to successfully meet the second deadline with this architecture.
We took time to operationalize and stabilize Ambry for the next few months after the new data center was launched. Once we were really confident about the system based on all our testing and auditing, we decided to pull the plug on the legacy system. We were able to achieve this within the one year deadline.
We have completed another year since the migration and Ambry has been the only source of truth for media at LinkedIn. This success was only possible due to careful planning and incremental development of the infrastructure.
Where does Ambry fit in LinkedIn’s media ecosystem?
Media infrastructure is the end-to-end pipeline for media which includes upload, storage, processing, metadata management and download of the content. Ambry is a critical piece of the infrastructure and it was really important to get the foundation right. With Ambry in place, we now continue to expand and focus on the rest of the ecosystem around it. We are actively building this space out and are always looking to hire great systems engineers who can help us build a world class media infrastructure.
Open Source is the way to go
We are very happy to announce that we have open sourced Ambry (https://github.com/linkedin/ambry). We believe this system will be immensely useful to the community to support real-time upload and serving of media content. We have detailed documentation available at https://github.com/linkedin/ambry/wiki and we hope to constantly improve it. Your feedback will be really useful to make this project better. If you are interested in contributing please look at https://github.com/linkedin/ambry/wiki/Development%20Guidelines. Please reach out to us at firstname.lastname@example.org if you have any issues using Ambry or questions about contributing to the codebase.
SIGMOD in June
We are super excited to also announce that a paper on Ambry has been accepted at SIGMOD 2016 as part of the industrial track. We would like to thank our intern Shadi Abdollahian for spending many hours coming up with the proposal and getting it published. The conference happens in June 2016. You can get more information here.
We are working on a lot of cool things as I write this post. We are currently focussed on making our frontend and routing tier non blocking as well as making the storage nodes rack aware. We hope to continue adding features to Ambry and build a very active open source community around it. Some of the current and potential projects are specified below. You can see the complete list here.
A blocking request typically holds on to a thread until the request is complete and does not support pipelining. To enable very high throughput and avoid resource starvation due to large objects, we need to make the router and frontend completely non-blocking. This will enable us to support much larger throughput and will further increase availability since operations will no longer be constrained by thread resources. The frontend implementation is complete and testing is in progress. The router library will be code complete very soon. You can follow the updates in the repository.
Modern data centers have single top of the rack switches to reduce cost. This means that the software needs to have the intelligence to overcome switch failures. We are building functionality in Ambry that would ensure that replicas of new partitions are placed across the data center in a rack aware way. This is currently in progress and you can follow the conversation on GitHub.
Currently, Ambry does not support the notion of a namespace. Namespaces are really useful to enforce controls at a group level. We would like to introduce the notion of a bucket or container in Ambry. This will help us define user groups, access controls, and quotas at the bucket level, which is much easier to maintain than at the object level.
Ambry currently supports encryption between data nodes. It also enables encrypted communication between the frontend and data nodes. However, we would like to make more progress on security. We would like to support authentication and authorization as well as support encryption at the REST level. We hope to start working on this once we have implemented buckets/containers.
This has been an amazing team effort from design to launching Ambry in production. We hope to actively develop Ambry in the open and help the community leverage it to build their applications. The engineers who were involved in this effort include Priyesh Narayanan, Gopalakrishna Holla, Sivabalan Narayanan, Casey Getz, Ming Xia, Tofig Suleymanov, Arjun Shenoy, Dmitry Nikiforov, Tianwei Li, Hardik Kheskani and Jay Wylie. It would not have been possible to conceive and execute successfully without the immense backing and support from Mammad Zadeh. We would also like to thank the rest of the management who have constantly been a source of encouragement and support including Greg Arnold, Alex Vauthey, Swee Lim and Igor Perisic.