Rebuilding messaging: How we bootstrapped our platform
July 21, 2020
Messaging has been a core part of LinkedIn since the day we launched, and the system behind it has evolved tremendously over the past 17 years. However, the architecture of our messaging backend remained largely unchanged until a recent ground-up overhaul. In the second part of our “Rebuilding messaging” series, we’ll talk about the large-scale migration of existing data to the new database, commonly referred to as bootstrapping: our journey moving data from our legacy system to the new system.
For context, we’ll quickly touch on a few of the motivations for the re-architecture:
- The backend was built as a monolith with no clear separation between data-access and business-logic layers. As business logic changed, those changes often bled into the data access layer due to lack of isolation.
- We accumulated a large amount of legacy code that not only made it impossible to improve the system as a whole, but also slowed down new product feature development.
- Due to the nature of the monolith, pieces of business logic had been contributed over the years by engineers from partner teams, and the core team lacked context on them. These pieces were critical to the business.
And so, we embarked on an initiative to build a brand new messaging system designed with the following principles:
- Build the right storage platform: Trust and privacy are our core tenets at LinkedIn. The new platform should include privacy and security requirements such as full encryption of confidential data in-flight and at rest.
- Speed, scale, and consistency: It should be the fastest and most reliable message delivery platform in the market.
- Support plug-in capabilities: The platform should be extensible so that developers can write plugins at various message delivery lifecycle stages to execute custom business logic.
- Support new use cases: The new platform should allow developers to build new features and iterate over them quickly.
Below is a simple representation of any persistence-based messaging application or web-based application that supports read/write in general. For the purpose of illustration, the diagram greatly simplifies the data flow that happens when member A sends a message to member B.
When member A sends a message to member B, the message travels via a backend service and then is persisted in the database. Member B then reads the message via the read API exposed by the service. In this blog, we will be referring to the old backend as “old-system” and the new backend as “new-system.”
Fundamentally, a messaging platform’s persistence layer comprises shared data and personal metadata associated with each message. For instance, when member A sends “Hi” to member B, the shared data is the actual message content, “Hi.” The time at which member B reads it is member B's personal data. If member A marks the message as important or starred, that’s considered member A’s personal metadata (not visible to member B) for this message.
In the database, we can either normalize the data or store it denormalized. Old-system chose to store the data denormalized, and the general benefit of this was improved read performance by eliminating the need for joins. It also provides complete isolation between the sender and recipient’s copies of the message. In the above example, both member A and member B have their own records in the database with the text “Hi” duplicated in each of their records. However, as the system grows, and as conversations increase in the number of participants, this approach soon becomes costly with a lot of duplication. Additionally, providing features, like editing and deleting messages, with low latency is almost impossible, since every participant’s copy needs to be modified.
To address these issues in new-system, we chose to normalize the data. This means that shared data and personal metadata are stored in separate tables, and personal metadata has foreign keys back to the shared content. In the above example, there’s a single copy of the message “Hi,” and member A and member B each have their own personal metadata record, which points to the record ID of that single “Hi” message.
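To make the normalized layout concrete, here is a minimal in-memory sketch. The class and field names (`SharedMessage`, `PersonalMetadata`, `MessageStore`) are hypothetical and chosen for illustration; they are not the actual new-system schema.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple


@dataclass
class SharedMessage:
    """Content stored once and seen by every participant."""
    message_id: str
    body: str


@dataclass
class PersonalMetadata:
    """Per-member state; message_id acts as a foreign key to SharedMessage."""
    member_id: str
    message_id: str
    read_at: Optional[int] = None
    starred: bool = False


class MessageStore:
    """Hypothetical in-memory stand-in for the two normalized tables."""

    def __init__(self) -> None:
        self.shared: Dict[str, SharedMessage] = {}
        self.personal: Dict[Tuple[str, str], PersonalMetadata] = {}

    def send(self, message_id: str, body: str, participants: List[str]) -> None:
        # One shared row, plus one personal row per participant.
        self.shared[message_id] = SharedMessage(message_id, body)
        for member_id in participants:
            self.personal[(member_id, message_id)] = PersonalMetadata(member_id, message_id)

    def edit(self, message_id: str, new_body: str) -> None:
        # A single write, no matter how many participants the conversation has.
        self.shared[message_id].body = new_body

    def view(self, member_id: str, message_id: str) -> Tuple[str, bool]:
        # Follow the foreign key from personal metadata to shared content.
        meta = self.personal[(member_id, message_id)]
        return self.shared[meta.message_id].body, meta.starred
```

With this layout, editing a message touches one shared record, while a star set by member A stays invisible to member B.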
A similar pattern of shared and personal data is seen in conversation-level data. Properties such as titles are shared, but the number of messages that a participant hasn’t read in a conversation is personal metadata.
We spent several quarters building out all the new microservices needed for new-system. Next, all existing messages in old-system had to be made available to read and update in new-system. At LinkedIn’s scale, having downtime to bootstrap the new databases was not an option. So we had to come up with a way to migrate billions of messages while millions of new ones were added and updated daily by members on the site.
We chose the classic approach of dual-writing/updating to both the old and new systems while serving read traffic only from old-system. We rolled out this dual-writing/updating change and gradually ramped it to members. Once 100% of live write traffic was being continuously replicated to new-system, we could take a snapshot of the data in old-system and move the 17 years’ worth of existing messages over to the new-system via offline flows (we chose Hadoop). We’ll talk about this in more detail in the next few sections.
As briefly touched upon earlier, a prerequisite for bootstrapping is the dual-writing of live traffic. Once that was set up, the bootstrap was performed by running massive Hadoop jobs and uploading the data into online databases.
The migration was executed in three phases. The goal of the first phase was to replicate every write and update action performed by the user on the site to the new-system in real time. The second phase focused on finding and reserving a place in the new-system for every existing historic message from the old-system. In this phase, we generated ID mappings between the two systems. The third phase took every record in the old-system’s databases written since day 1, applied schema transformations, and uploaded them to new-system’s databases at the ID chosen in phase 2. The three-phased approach allowed us to break a large problem into more manageable ones.
Phase 1: The dual-write
We called this phase online replication. As mentioned above, the end-objective for this phase was for every write and update action by the members to be replicated in (almost) real-time to the new-system. If not, new data written during and after the offline bootstrap would not be replicated to new-system. We built robust retry mechanisms that guaranteed eventual replication. The replication writes were completely transparent to the sender and recipient as they were asynchronous, and recipients continued to read from old-system’s databases.
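The pattern described above can be sketched as follows. This is a simplified illustration under stated assumptions, not our production code: `DualWriter`, `replicate`, and `drain_once` are hypothetical names, the old-system is modeled as a dict, and a real deployment would use a durable queue and a background worker rather than an in-memory deque.

```python
from collections import deque
from typing import Callable, Deque, Dict, Tuple


class DualWriter:
    """Writes to the old store synchronously and replicates asynchronously."""

    def __init__(self, old_store: Dict[str, str], replicate: Callable[[str, str], None]) -> None:
        self.old_store = old_store
        self.replicate = replicate  # hypothetical call into new-system; may raise
        self.pending: Deque[Tuple[str, str]] = deque()

    def write(self, key: str, record: str) -> None:
        # The member-facing write only touches the old store, so a failure in
        # new-system never affects the sender or the recipient.
        self.old_store[key] = record
        self.pending.append((key, record))

    def drain_once(self) -> int:
        """Attempt every pending replication; return how many still need a retry."""
        failed: Deque[Tuple[str, str]] = deque()
        while self.pending:
            key, record = self.pending.popleft()
            try:
                self.replicate(key, record)
            except Exception:
                # Keep the item so a later pass retries it (eventual replication).
                failed.append((key, record))
        self.pending = failed
        return len(failed)
```

Calling `drain_once` repeatedly (for example, from a scheduled worker) retries failed replications until none remain, while reads continue to be served from the old store.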
Phase 2: ID generation and mapping
As part of phase 2, a new field was introduced in the old-system’s database to store the ID of the message in the new-system. We generated synthetic new-system IDs for every single message and populated this field. Every message in the old-system maps to a corresponding conversation ID and message ID in the new-system.
The conversation ID, as the name suggests, represents the conversation a message belongs to. We chose UUIDs for conversation IDs so that they could be generated uniquely across offline and online systems.
One of the most important aspects of the conversation ID generation utility is that it has to be deterministic and consistent. Deterministic means that a given conversation in the old-system always generates the same new-system conversation ID every time the utility is called. This keeps the offline flows that generate these IDs idempotent, which matters because they can fail and be retried. Consistent means that the online systems that dual-write and the offline systems that bootstrap produce the same ID for the same conversation. Otherwise, a single conversation could be split into multiple conversations in the new-system.
Let’s say member A and member B have been messaging each other since 2010 and have 5 messages in conversation T1 in the old-system. We begin the bootstrap process of generating new-system IDs, and the Hadoop job maps T1 to, say, conversation UUID1 and message M1 in the new-system. Since this is an offline job, it could take a while to complete. While the job is running, member A decides to send the 6th message to member B. The 6th message is replicated to the new-system via the online dual-write logic mentioned above. The key is to make sure that the online dual-write logic also uses UUID1 as the conversation ID for message 6. If it generates a brand new UUID for the conversation, we will end up with two disjoint conversations between member A and member B in the new-system.
To solve this problem, we used version 5 (name-based) UUIDs, which are derived by hashing an input name. We hash some information about the conversation in the old-system, so the same UUID is generated in both offline and online flows. Hence, the 5 messages being migrated via the offline flow and the 6th message being dual-written online get the same conversation UUID in the new-system.
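As an illustration of why version 5 UUIDs fit this requirement, here is a short sketch using Python’s standard `uuid` module. The namespace value and the choice of the old-system thread ID as the hashed input are assumptions made for the example; the post only says that “some information about the conversation” is hashed.

```python
import uuid

# Hypothetical namespace: any fixed UUID works, as long as the online
# dual-write code and the offline Hadoop jobs share the same constant.
MESSAGING_NAMESPACE = uuid.uuid5(uuid.NAMESPACE_DNS, "messaging.example.com")


def conversation_uuid(old_thread_id: str) -> uuid.UUID:
    """Derive a new-system conversation ID from an old-system thread ID.

    uuid5 hashes (namespace, name) with SHA-1, so the same input always
    yields the same UUID, on any machine and on every retry.
    """
    return uuid.uuid5(MESSAGING_NAMESPACE, f"conversation:{old_thread_id}")


# The offline bootstrap and the online dual-write compute the ID independently.
offline_id = conversation_uuid("T1")
online_id = conversation_uuid("T1")
```

Because both flows derive the ID from the same input, no coordination or lookup between them is needed to keep message 6 in the same conversation as messages 1 through 5.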
Phase 3: Transform and bulk upload
Once the IDs were in place, we took a snapshot of the old-system database and ETLed it to a secure Hadoop cluster. We then ran a series of map-reduce jobs to transform the old-system schema over to the new-system schema. The output records were then bulk uploaded into the new-system databases.
The schema transformation blue box above is a complex framework we built as a Hadoop application. The diagram explains it in detail.
Messaging at LinkedIn has several features and use cases. Each use case has its own set of business rules during schema transformation. In order to fully parallelize the work of each business use case, we designed a framework in which our engineers can easily plug in any particular conversion logic independently. These are described as “transformers” in the diagram above.
As mentioned earlier, snapshots of old-system tables were ETLed to a secure Hadoop cluster. The various old-system tables were then joined into a composite structure that contains all fields necessary to derive records in the new-system schema. These composite messages were sent to a series of transformers. The output of each transformer was a database upload request object (the actual upload to the database hasn’t happened yet). Every transformer could optionally modify the output of a previous transformer or generate its own upload request object. The upload requests were persisted as files on HDFS.
Because old-system was denormalized (senders and recipients have their own copies), there are “n” old-system CompositeMessages for each message, one per participant. These had to be normalized to one copy in the new-system for shared content (like the actual message text, conversation title, etc.). Hence, the outputs of SharedContentTransformers were reduced by “dedupers,” which could accept custom dedup keys based on use case.
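The transformer chain and dedup step can be sketched roughly as below. This is a single-process illustration, not the Hadoop framework itself; `UploadRequest`, the two example transformers, and the composite message fields (`owner`, `new_message_id`, `body`, `read`) are hypothetical names introduced for the example.

```python
from dataclasses import dataclass
from typing import Callable, Dict, Hashable, Iterable, List


@dataclass
class UploadRequest:
    """A pending database write; nothing is uploaded at this stage."""
    table: str
    key: str
    payload: dict


# A transformer sees the composite message and the requests produced so far,
# and returns the (possibly modified or extended) list of requests.
Transformer = Callable[[dict, List[UploadRequest]], List[UploadRequest]]


def shared_content_transformer(msg: dict, requests: List[UploadRequest]) -> List[UploadRequest]:
    return requests + [UploadRequest("shared_message", msg["new_message_id"], {"body": msg["body"]})]


def personal_metadata_transformer(msg: dict, requests: List[UploadRequest]) -> List[UploadRequest]:
    key = f'{msg["owner"]}:{msg["new_message_id"]}'
    return requests + [UploadRequest("personal_metadata", key, {"read": msg["read"]})]


def run_transformers(composites: Iterable[dict], transformers: List[Transformer]) -> List[UploadRequest]:
    out: List[UploadRequest] = []
    for msg in composites:
        requests: List[UploadRequest] = []
        for transform in transformers:
            requests = transform(msg, requests)
        out.extend(requests)
    return out


def dedupe(
    requests: Iterable[UploadRequest],
    dedup_key: Callable[[UploadRequest], Hashable] = lambda r: (r.table, r.key),
) -> List[UploadRequest]:
    """Keep the first request seen for each dedup key."""
    unique: Dict[Hashable, UploadRequest] = {}
    for request in requests:
        unique.setdefault(dedup_key(request), request)
    return list(unique.values())
```

For a message with two participants, the two composite copies yield two shared-content requests with the same key, which the deduper collapses to one, while both personal metadata requests survive.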
Once transformers and dedupers wrote the database insert request payloads as files on HDFS, the database storage nodes pulled them from HDFS via high-throughput secure connections.
Once we uploaded the billions of records into the new-system, we were ready to switch live traffic over to the new-system. Before doing that we had to make sure the data was accurate. It was absolutely necessary to verify that members would be able to continue viewing and interacting with their mailbox as they had done in the old-system. To solve this problem, we built a sophisticated shadow verification system.
When member B opens a message on LinkedIn, the message is served by old-system backend services and old-system databases. The shadow verification service (depicted above) spins off an asynchronous process that reads the same message from the new-system backend service. The messages from both backends are normalized to the same client models and compared field by field, recursively. If the messages do not match, the service prints a tree-like log that identifies the fields that failed to match and the mismatched values. It also sends the failures to LinkedIn’s metric service, and the resulting metric acted as the gating factor for launching the new-system to all members.
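A recursive field-by-field comparison of this kind might look like the sketch below. It assumes both responses have already been normalized into plain dicts and lists with string keys; the function name `diff` and the path format are illustrative choices, not the verification service’s real output.

```python
from typing import Any, List, Tuple

Mismatch = Tuple[str, Any, Any]  # (field path, old-system value, new-system value)


def diff(old: Any, new: Any, path: str = "") -> List[Mismatch]:
    """Return every field where the two normalized models disagree."""
    if isinstance(old, dict) and isinstance(new, dict):
        mismatches: List[Mismatch] = []
        for key in sorted(set(old) | set(new)):
            child = f"{path}.{key}" if path else str(key)
            if key not in old or key not in new:
                # Field present on one side only.
                mismatches.append((child, old.get(key), new.get(key)))
            else:
                mismatches.extend(diff(old[key], new[key], child))
        return mismatches
    if isinstance(old, list) and isinstance(new, list):
        if len(old) != len(new):
            return [(f"{path}.length", len(old), len(new))]
        mismatches = []
        for index, (old_item, new_item) in enumerate(zip(old, new)):
            mismatches.extend(diff(old_item, new_item, f"{path}[{index}]"))
        return mismatches
    # Leaf values (or mismatched container types) are compared directly.
    return [] if old == new else [(path, old, new)]
```

An empty result means the two backends agree; a non-empty one gives the exact paths to log and count toward the error metric.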
We managed to drive the verification error metric to zero, but soon realized that verification was only being run for messages that members had read, which are mostly recent messages. We wanted a way to confirm that the billions of older messages (over 17 years’ worth) had been migrated correctly too. Since it was not scalable (nor actionable if there’s a verification failure) to verify every single message, we sampled a few million messages by year, type, geographic location, etc.
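One way to draw such a sample is stratified sampling, sketched below. This is an assumption about the general technique rather than a description of the job we ran; the function name, the fixed per-stratum quota, and the seeded random generator are illustrative choices.

```python
import random
from collections import defaultdict
from typing import Callable, Dict, Hashable, Iterable, List


def stratified_sample(
    messages: Iterable[dict],
    stratum_of: Callable[[dict], Hashable],
    per_stratum: int,
    seed: int = 0,
) -> List[dict]:
    """Pick up to `per_stratum` messages from every stratum.

    Sampling per stratum ensures that rare groups (an old year, an unusual
    message type) are represented instead of being drowned out by recent traffic.
    """
    rng = random.Random(seed)  # seeded so a rerun selects the same sample
    buckets: Dict[Hashable, List[dict]] = defaultdict(list)
    for message in messages:
        buckets[stratum_of(message)].append(message)

    sample: List[dict] = []
    for key in sorted(buckets):
        group = buckets[key]
        sample.extend(rng.sample(group, min(per_stratum, len(group))))
    return sample
```

For example, `stratified_sample(messages, lambda m: (m["year"], m["type"]), per_stratum=1000)` would pick up to 1,000 messages for every year and type combination.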
Before uploading 17 years’ worth of messages, we ran our schema transformation mappers for the sample and uploaded them to the new-system. We extended the shadow verification service to trigger for all messages supplied via file (the sample) rather than members having to request them on the site.
So the development cycle was the following:
- Write/modify schema transformers
- Run schema transformation map-reduce jobs for the sampled messages in the old-system snapshot
- Upload the transformed messages into the new-system databases
- Trigger shadow verification
- Monitor/inspect for verification error metrics and logs
- Repeat the steps above until verification errors are within SLA
This strategy allowed us to vet our schema transformation business logic before we spent resources and time uploading billions of messages into the new system.
The final bulk upload
After the transformers were accurate, we ran them against a complete old-system snapshot which included all messages ever sent on LinkedIn. There were a couple of learnings from this stage worth sharing.
Update conflicts: The bootstrap process is offline and based on a snapshot. Between the snapshot time and the final upload into the new-system's database, a member might have modified their message (let’s say they read the message). When the bootstrap map-reduce job completes and the records are uploaded into the database, the member’s modification might be overwritten by the stale snapshot value. To prevent this, we included the snapshot time as the If-Unmodified-Since header in every insert/update request to the database. The database keeps track of the last modified time for each row, and if a row’s last modified time is later than the snapshot time of a request, the update/insert request is dropped.
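The conditional write described above can be illustrated with a small in-memory model. The `Table` class and its methods are hypothetical stand-ins for the database; in particular, stamping a bootstrapped row with the snapshot time is an assumption made for this sketch.

```python
from typing import Any, Dict, Tuple


class Table:
    """Toy table that tracks a last-modified time per row."""

    def __init__(self) -> None:
        self.rows: Dict[str, Tuple[Any, int]] = {}  # key -> (value, last_modified)

    def online_write(self, key: str, value: Any, now: int) -> None:
        # Live dual-write traffic always wins and stamps the current time.
        self.rows[key] = (value, now)

    def bootstrap_write(self, key: str, value: Any, snapshot_time: int) -> bool:
        """Apply a snapshot record only if the row has not changed since the snapshot.

        Mirrors If-Unmodified-Since semantics: returns False when the write is dropped.
        """
        existing = self.rows.get(key)
        if existing is not None and existing[1] > snapshot_time:
            return False  # a newer online write exists; keep it
        # Assumption: bootstrapped rows are stamped with the snapshot time,
        # so retrying the same upload stays idempotent.
        self.rows[key] = (value, snapshot_time)
        return True
```

If a member reads a message after the snapshot was taken, the later bootstrap upload for that row is dropped, and the member’s read state survives.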
Table size: We used LinkedIn’s distributed NoSQL database in the new-system. While the issue of table size might be an implementation detail of our specific database, it is worth calling out since other databases might have a similar pattern. We noticed that uploading data into some tables was significantly slower than into others, even though the record sizes were similar. We discovered that the number of indices in a table plays a significant role in upload throughput. A single upload request fans out into multiple write requests, one for each index. Hence, upload time grows with the number of indices in a table, and upload throughput is roughly inversely proportional to it.
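As a back-of-the-envelope illustration of this fan-out, the sketch below estimates upload time from the number of indices. It is a simplified model under stated assumptions: every record produces exactly one write per index, the cluster sustains a fixed number of writes per second, and all numbers in the example are made up.

```python
def estimated_upload_seconds(records: int, indices: int, writes_per_second: float) -> float:
    """Estimate upload time when each record fans out into one write per index.

    Assumes indices >= 1 and a constant sustained write rate, which real
    clusters rarely deliver.
    """
    total_writes = records * indices
    return total_writes / writes_per_second


# Same record count and write rate, different index counts (illustrative numbers).
one_index = estimated_upload_seconds(1_000_000, 1, 10_000)
four_indices = estimated_upload_seconds(1_000_000, 4, 10_000)
```

Under this model, a table with four indices takes four times as long to load as a table with one, which is worth factoring into timeline estimates.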
Looking back, this data migration was the first in a string of successes that led to the launch of our new messaging platform. Key to this initial success was the substantial effort and careful planning behind a phased approach, the idempotent nature of our design, and the patience to do it right.
For those considering a similar migration, a key piece of advice we’d like to share is to hold off on committing to timelines until you are certain about your throughput. Our initial calculations were based on the ideal performance of our databases, and we soon found that real-world differences in hardware, the extended duration of the migration, and other factors caused multiple revisions to our timelines. In the end, this entire data migration effort took us nearly six months to complete. It’s important to remember that a data migration is a marathon, not a sprint.