Building the Contacts Platform at LinkedIn
October 23, 2018
Members can choose to import their contacts from third-party address books and calendars into LinkedIn to help them grow their professional network. We use this contact information to suggest relevant connections to members (“People You May Know,” or PYMK). For legacy reasons, until recently, we had two different systems that maintained contact and calendar data separately.
This post discusses our strategy and the work involved to re-architect the contacts and calendar ecosystem (PINT – Personalized INTelligence) to build a single source of truth for all contact and calendar data. We’ll focus on the steps we took to keep the services running at all points while managing the migration of data, as well as ensuring no impact to member-facing products. Our project code name is “Godwit,” after the family of migratory birds that fly each winter from Alaska to New Zealand. Godwits are known to undertake the longest non-stop flight of any migratory bird, making them a worthy namesake for our large and involved migration.
Our move to a new contacts platform reinforces LinkedIn’s commitment to data privacy, providing the opportunity to integrate new technology and features that increase transparency and control for members.
Since the contacts and calendar products were originally built by different teams, the PINT team ended up with quite a fragmented architecture, making onboarding any new use case a challenge.
We had two services and three different databases supporting contacts and calendar use cases. Address Book Service, written in Java, acted as a backend service to ingest contacts from the email import flow and mobile uploads. Connected Service, which originally came as a part of LinkedIn’s acquisition of ConnectedHQ, is written in Python and periodically syncs contacts from third-party sources. Oracle was used by the Address Book Service to store contact data, while Connected Service used MySQL and Espresso to store both contact and calendar data. These deployments of Oracle and MySQL had significant footprints, with close to 50 and 100 shards, respectively. Espresso was used to process mobile calendars to create notifications for members to learn more about the people they were meeting. Since the data was distributed across these three DBs, our Extract, Transform, and Load (ETL) process had to take periodic snapshots of all three databases and make them available in the Hadoop Distributed File System (HDFS) for all offline analysis, like PYMK. These offline jobs would process the data and compute the suggested connections for the member, making those suggestions available to the frontends.
There were challenges with using a fragmented architecture, including: maintaining and syncing multiple services and databases, dealing with different data models across these systems while onboarding new use cases, manual DBA intervention to run ALTER TABLE scripts on all the shards for any schema evolution, and huge monetary costs in licensing and provisioning hardware to support multiple databases. On top of that, without support for LinkedIn’s standard Rest.li framework in our Python stack, it was quite difficult and hacky for other online services to interact with this system. For these reasons, we decided to build a single source of truth for all contact and calendar data.
Re-architecting the system
As we started this project of re-architecting the system, we had many challenges ahead of us.
We wanted to establish a single source of truth, which required us to migrate hundreds of terabytes of data and 40+ client services. On top of that, we were constrained by the need to migrate with zero downtime, while keeping both systems in sync.
Deciding on the right technology
Before we built out the new system, we needed to decide which database platform would best fit our needs. After careful evaluation, we felt that Espresso met all the requirements for our use cases. Espresso is a fault-tolerant, distributed NoSQL database, and it provided a more scalable approach to our ever-increasing demand. It also provided quick lookups of member data without the need for global indexes. Espresso’s auto-purging ability reinforces our commitment to data privacy by purging all of a member’s data if they delete their LinkedIn account. Also, being an in-house technology, Espresso is well-integrated within the LinkedIn ecosystem and has good support from engineers and SREs.
Building the right data model
Since we were going to migrate from two relational databases (Oracle and MySQL) to a document-based key-value store (Espresso), re-designing the schemas was a challenge in and of itself. We carefully examined all the tables and fields in our legacy systems, their existing relations and how they actually impacted the system, and their usage patterns, optimizing more for performance than for storage. We had to de-normalize some of the data in favor of better performance, created secondary tables to match our query patterns, and avoided a global index on the primary table.
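To make the de-normalization concrete, here is a minimal sketch of folding child-table rows from a relational model into a single document keyed by member, the shape a key-value store like Espresso favors. All table and field names are illustrative, not LinkedIn's actual schema.

```python
def denormalize(contact_row, emails, phones):
    """Fold child-table rows (emails, phones) into one contact document."""
    return {
        "memberId": contact_row["member_id"],    # partitioning key
        "contactId": contact_row["contact_id"],  # sub-key within the member
        "fullName": contact_row["full_name"],
        # Rows that lived in separate relational tables are embedded,
        # trading storage for single-read lookups with no global index.
        "emails": [e["address"] for e in emails],
        "phones": [p["number"] for p in phones],
    }

doc = denormalize(
    {"member_id": 42, "contact_id": 7, "full_name": "Ada Lovelace"},
    emails=[{"address": "ada@example.com"}],
    phones=[{"number": "+1-555-0100"}],
)
print(doc["emails"])  # one read returns the whole contact
```

The storage cost of repeating data is accepted in exchange for serving a contact with a single keyed read.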
At LinkedIn, all the database and API schemas need to go through the Data Model Review Committee (DMRC) to make sure they meet common standards and to discover common pitfalls. DMRC consists of many senior engineers at LinkedIn and is often quite helpful in making sure you have designed your data model in the right way so that it can be easily evolved in the future.
Use of personal data routing
Typically at LinkedIn, we replicate member data in all of our data centers to provide quick access to member data anywhere around the globe. This also provides an easy way of switching traffic between data centers in case of failover.
The contacts dataset is one of the biggest datasets at LinkedIn and is growing very rapidly. Following a similar replication architecture was going to cost a lot in terms of hardware for our Espresso clusters. To combat this problem, we leveraged personal data routing (PDR) for the contacts dataset, originally built by the Messaging team. Instead of writing and replicating members’ contacts data in every single data center, we decided to write data only to each member’s primary and secondary data centers; two data centers are sufficient for disaster recovery. This decision reduced the hardware cost by a factor of N/2, where N is the total number of data centers operated by LinkedIn.
One major downside of utilizing PDR is that we must make a cross-colo call whenever the traffic layer shifts traffic away from a member’s primary data center. However, after careful evaluation, we found that this impacts less than 10% of members, and we were able to optimize so that a cross-colo call adds only a few milliseconds to page load time. This seemed a manageable tradeoff for going with PDR.
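The routing decision can be sketched as follows: look up the member's primary and secondary data centers, serve locally when possible, and fall back to a cross-colo call otherwise. The routing table and data-center names here are hypothetical.

```python
# Hypothetical PDR lookup table: member -> (primary DC, secondary DC).
PDR_TABLE = {
    1001: ("dc1", "dc3"),
    1002: ("dc2", "dc4"),
}

def route(member_id, serving_dc):
    """Return the DC to fetch from, and whether the call is cross-colo."""
    primary, secondary = PDR_TABLE[member_id]
    if serving_dc in (primary, secondary):
        return serving_dc, False  # local read, no extra latency
    return primary, True          # cross-colo call (adds a few ms)

print(route(1001, "dc1"))  # ('dc1', False)
print(route(1002, "dc1"))  # ('dc2', True)
```

Only requests landing outside both of a member's data centers pay the cross-colo cost, which is how the impact stays under 10% of members.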
We ended up with a three-tier architecture for our backend:
Tier 1: The middle tier that talks to the Personal Data Routing Service to learn where a member’s data is and then routes the request to the appropriate data center.
Tier 2: The backend service that gets data only from local Espresso clusters. It fetches the data from one of the N-1 clusters, where N is the number of data centers.
Tier 3: The Espresso Cluster Pairs that actually store the contacts data. We maintain two replicas of the data: one acts as a primary, which serves all application requests, and the other acts as a failover. For example, Cluster Pair 1-2 and Cluster Pair 2-1 contain the exact same data, and any data written into one is replicated into the other.
Since a member’s data can be placed on any one of the C(N, 2) unordered pairs of data centers, the total number of unique Espresso cluster pairs is N * (N - 1) / 2. Theoretically, each data center hosts N - 1 Espresso clusters, with each cluster pair holding approximately 2/(N * (N - 1)) of the data.
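The cluster-pair arithmetic can be checked with a short sketch that enumerates the unordered data-center pairs (the number of data centers here is illustrative):

```python
from itertools import combinations

def cluster_pairs(n_datacenters):
    """Enumerate unique Espresso cluster pairs: one per unordered pair
    of data centers, i.e. C(N, 2) = N * (N - 1) / 2 pairs in total."""
    return list(combinations(range(1, n_datacenters + 1), 2))

N = 4  # illustrative number of data centers
pairs = cluster_pairs(N)
print(len(pairs))                 # N * (N - 1) / 2 pairs
# Each data center appears in N - 1 pairs, and each pair holds
# roughly 2 / (N * (N - 1)) of the data.
fraction_per_pair = 2 / (N * (N - 1))
print(fraction_per_pair)
```

With four data centers, for instance, there are six unique cluster pairs, each holding roughly a sixth of the data.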
Building the new service
We needed to build a new service that could support, in the new architecture, all the use cases that had been supported by our legacy systems. After careful analysis, we were able to separate member-facing flows from non-member-facing flows. For example, when a member imports their address book, they are only waiting to see the contacts from that address book so they can invite them to LinkedIn or connect with them; we can store the contacts in our database asynchronously for future member needs. All the online use cases were designed using the Rest.li framework, and all asynchronous use cases were designed using Kafka.
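The split between member-facing and non-member-facing work can be sketched as follows: the import call returns parsed contacts immediately, while persistence is enqueued for later (Kafka in the real system; a plain in-process queue stands in for it here, and all names are illustrative).

```python
import queue

# Stand-in for a Kafka topic carrying deferred contact writes.
write_queue = queue.Queue()

def import_address_book(member_id, raw_contacts):
    """Member-facing path: normalize and return contacts right away;
    persistence happens off the request thread via the queue."""
    contacts = [c.strip().lower() for c in raw_contacts]
    for contact in contacts:
        # Non-member-facing path: a consumer will persist this later.
        write_queue.put((member_id, contact))
    return contacts  # the member sees these immediately for invites

shown = import_address_book(42, ["Ada@Example.com ", "bob@example.com"])
print(shown)
print(write_queue.qsize())  # 2 pending asynchronous writes
```

Keeping the database write off the synchronous path is what lets the import flow stay fast even when the storage tier is busy.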
Migrating the data
Migrating hundreds of terabytes of data was itself going to be a gargantuan task. Most of LinkedIn’s datasets are ETL’ed into HDFS; similarly, daily snapshots of the Oracle and MySQL contacts databases were copied into HDFS. We created Map-Reduce scripts in Pig to read these snapshots and convert the data into our new Espresso format. The scripts were scheduled and run using the Azkaban workflow manager. We then used the Espresso Bulk Loader tool provided by Espresso engineers to bulk load the data from HDFS into the Espresso clusters; it first copied the data from HDFS to the local NFS filer in each cluster and then used Espresso APIs to upload the data while maintaining all the indexes.
The Map-Reduce scripts needed to load data from multiple tables, each of them close to 100 TB, run some conversion logic, and then output to HDFS, again into multiple tables. Running such a resource-intensive Map-Reduce job required a lot of tuning of the number of mappers, reducers, and mapper size using Dr. Elephant to finish the job successfully in a reasonable amount of time.
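The conversion step can be illustrated with a small sketch: rows from the legacy tables are grouped by (member, contact) key and merged into one document in the new format. In production this ran as Pig Map-Reduce jobs over HDFS snapshots; plain Python stands in here, and the row shapes are hypothetical.

```python
from collections import defaultdict

def convert(oracle_rows, mysql_rows):
    """Merge snapshot rows from both legacy stores into one document
    per (member_id, contact_id), the new Espresso-style key."""
    docs = defaultdict(dict)
    for row in oracle_rows:  # address-book contacts
        key = (row["member_id"], row["contact_id"])
        docs[key].update({"name": row["name"], "source": "oracle"})
    for row in mysql_rows:   # synced third-party contacts
        key = (row["member_id"], row["contact_id"])
        docs[key].setdefault("source", "mysql")  # keep earlier source
        docs[key]["email"] = row["email"]
    return dict(docs)

out = convert(
    [{"member_id": 1, "contact_id": 9, "name": "Ada"}],
    [{"member_id": 1, "contact_id": 9, "email": "ada@example.com"}],
)
print(out[(1, 9)])
```

In the real jobs, this merge is exactly the kind of join-then-group step whose mapper and reducer counts needed tuning at the 100 TB scale described above.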
Ensuring data privacy
Our members’ data is highly sensitive. We worked very closely with the security team on this migration and made sure we took the right steps in securing our member data. In addition, if a member deletes their contacts from LinkedIn, we needed to ensure the contacts are deleted from both systems. We built an online workflow that captured every single delete in the legacy system and made sure we deleted the data in the new system being built.
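A minimal sketch of that delete-propagation workflow: every delete against the legacy store is captured as an event and replayed against the new store, so a removed contact disappears from both systems. The store and event shapes are hypothetical stand-ins for the real services.

```python
# Toy stand-ins for the two systems and the captured delete events.
legacy_store = {(42, 7): {"name": "Ada"}}
new_store = {(42, 7): {"name": "Ada"}}
delete_log = []

def delete_contact(member_id, contact_id):
    """Online path: delete from legacy and capture the event."""
    legacy_store.pop((member_id, contact_id), None)
    delete_log.append((member_id, contact_id))

def apply_deletes():
    """Replay captured deletes against the new system."""
    while delete_log:
        new_store.pop(delete_log.pop(0), None)

delete_contact(42, 7)
apply_deletes()
print((42, 7) in legacy_store, (42, 7) in new_store)  # False False
```

Capturing deletes as replayable events matters because the new system was still being bulk-loaded: a delete must win even if it races an in-flight copy of the same record.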
Dual-writing the data
After successfully bulk uploading the data, we needed to start dual-writing the data to both the legacy system and the new system. At LinkedIn, we use InGraphs to monitor live traffic to our services. To dual-write successfully, we built a lot of custom metrics, emitted to InGraphs, to monitor our dual-write ramps.
Our legacy system also used to emit tracking events that got ETL’ed into HDFS so that data scientists could compute business metrics. If we emitted the same tracking events during the dual-write phase, this could lead to a lot of skewed business metrics and give incorrect reports about LinkedIn’s growth. To avoid this problem, we emitted the tracking events from the new system with the same schema but a different name, so that none of our business metrics were impacted.
To validate that our dual-writes were correct, we wrote a lot of custom Map-Reduce jobs in Hadoop to check whether all the data was correctly written. The jobs were very similar to our bulk load job, since they needed to read from all the tables in both the legacy and new systems, and they required a lot of tuning. Our first run of the jobs showed about 70% accuracy. We had to do a lot of analysis on the differences to determine the root causes, fix the issues, and then repeat the cycle. We repeated this process until we reached 99.8% accuracy.
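The core of those verification jobs is a record-by-record comparison that yields a match rate. A sketch of the logic, with small dicts standing in for the full table snapshots the Map-Reduce jobs actually scanned:

```python
def match_rate(legacy, new):
    """Fraction of keys whose records agree byte-for-byte in both stores."""
    keys = set(legacy) | set(new)
    matched = sum(1 for k in keys if legacy.get(k) == new.get(k))
    return matched / len(keys) if keys else 1.0

legacy = {1: "ada", 2: "bob", 3: "eve"}
new = {1: "ada", 2: "bob", 3: "EVE"}  # one mismatched record
print(match_rate(legacy, new))        # two of three records match
```

Taking the union of keys (rather than just the legacy keys) is important: it also catches records that exist only in the new system, not just records that failed to copy.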
Catching up the data
After successfully bulk uploading and doing online dual-writes, we needed to figure out the delta that was missed during this period. During the bulk load phase and while ramping the online dual-write phase, we had divided our member set into about a dozen buckets. We kept track of all the timestamps of bulk loads and online ramps for each bucket, which gave us a very good idea of what data was missing in the new system. With that knowledge, the catch-up phase was fairly straightforward and required re-running the same scripts that we ran for the bulk load phase, just with different date parameters.
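Those per-bucket timestamps bound the catch-up window directly: for each bucket, the missing delta is whatever was written between its bulk-load snapshot and the start of its online dual-writes. A sketch with illustrative bucket names and dates:

```python
from datetime import date

# Hypothetical per-bucket timestamps recorded during the migration.
buckets = {
    "bucket-01": {"bulk_load": date(2018, 2, 1), "dual_write": date(2018, 2, 20)},
    "bucket-02": {"bulk_load": date(2018, 2, 5), "dual_write": date(2018, 2, 18)},
}

def catchup_windows(buckets):
    """Date range to re-run the bulk-load scripts over, per bucket."""
    return {name: (b["bulk_load"], b["dual_write"]) for name, b in buckets.items()}

for name, (start, end) in catchup_windows(buckets).items():
    print(name, start, end)
```

Each window then becomes the date parameters for re-running the existing bulk-load scripts, which is why no new tooling was needed for catch-up.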
At the end of the final catch-up, we ran our data verification scripts one last time and saw a more than 99% data match between the two systems.
Switching the read traffic
After we were satisfied with the new system, we needed to switch about 40 clients to it. We first built a custom InGraphs dashboard that tracked traffic from each of our clients to both our old and new systems by making use of Autometrics. The new system obviously had zero traffic in the beginning. The dashboard helped us identify all the services calling into our old system. After getting that list of services, we started reaching out to their owners about the migration and prepared some code samples to help. InGraphs really helped us keep track of up-to-date progress and identify any issues.
During the clients’ ramps, we identified that our new system had slightly degraded performance. We considered this a major ramp blocker. We did a lot of performance analysis with the help of our call trees and identified potential performance bottlenecks.
The first bottleneck we found was that, for certain requests, the response size was quite large, causing a lot of network latency. We fixed this by introducing compression for all our responses.
The second bottleneck we identified was with the algorithm used for de-serializing responses from the database. We spent time optimizing that algorithm, resulting in a 50% improvement in de-serialization performance.
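The compression fix is easy to demonstrate: a large contact-list payload shrinks dramatically under gzip before going over the wire. The payload contents below are illustrative, and gzip stands in for whichever codec the wire protocol uses.

```python
import gzip
import json

# An illustrative large response: 1,000 contact records.
contacts = [{"name": f"contact-{i}", "email": f"c{i}@example.com"}
            for i in range(1000)]
raw = json.dumps(contacts).encode("utf-8")
compressed = gzip.compress(raw)

print(len(compressed) < len(raw))  # True: far fewer bytes on the wire
# The receiver decompresses and gets the identical payload back.
assert json.loads(gzip.decompress(compressed)) == contacts
```

Repetitive structured payloads like contact lists compress especially well, which is why compressing responses attacked the network-latency bottleneck directly; the CPU cost of compressing is the tradeoff, paid back by the smaller transfer.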
Migrating all the offline jobs
Similar to our upstream clients, we also needed to migrate our downstream clients, like PYMK, that process the contacts data offline. We had to look for all the jobs that accessed our datasets in Hadoop and identify their owners. There were a lot of jobs reading from our old datasets, owned by multiple teams. We provided them guidance on the migration, with a mapping between old datasets and new datasets. Once all the clients had successfully migrated, we locked our old datasets to ensure no one was accessing them and finally deleted all the old data.
Building the ETL pipeline
At LinkedIn, we only have HDFS clusters in some of our data centers. Most of the databases at LinkedIn replicate their data in all data centers. The ETL process for these databases requires taking a snapshot from only one data center.
With the use of PDR, we could not leverage this existing ETL process for our datasets, so we had to build a new ETL pipeline using Brooklin.
Since Cluster Pair 1-2 and Cluster Pair 2-1 have exactly the same data, we only need a snapshot from one of them. We could easily leverage the existing pipeline at LinkedIn to get snapshots of cluster pairs present in data center 1. However, for cluster pairs that do not exist in data center 1, we had to open a Brooklin stream across data centers to land their snapshots in HDFS.
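Planning which snapshots need a cross-colo stream reduces to partitioning the cluster pairs by whether they include the HDFS data center. A sketch, with illustrative data-center names:

```python
from itertools import combinations

def snapshot_plan(datacenters, hdfs_dc):
    """Partition the unique cluster pairs into those snapshottable
    locally in the HDFS data center and those needing a cross-colo
    Brooklin stream."""
    local, cross_colo = [], []
    for pair in combinations(sorted(datacenters), 2):
        (local if hdfs_dc in pair else cross_colo).append(pair)
    return local, cross_colo

local, remote = snapshot_plan(["dc1", "dc2", "dc3", "dc4"], hdfs_dc="dc1")
print(local)   # pairs with a replica reachable from dc1
print(remote)  # pairs needing a cross-colo stream into dc1
```

Because each cluster pair's two replicas are identical, one snapshot per pair suffices; only the pairs with no replica in the HDFS data center pay the cross-colo cost.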
After confirming all our clients had been migrated to the new system, we started working on decommissioning our old services and DBs. This was the last step of the migration, ensuring all the unused data was permanently removed from our systems. With a single source of truth for all contact and calendar data, we have improved the member experience by giving members more control over their data on LinkedIn. By mid-May 2018, we had successfully landed our Godwit in its new nest (PINT – Personalized INTelligence) and saved LinkedIn millions of dollars per year in licensing and maintenance costs.
Building the new platform from scratch and migrating from the legacy systems could not have been done without the hard work of many individuals. This was a year-long project with tireless effort by many team members across the New York and California offices, in particular Hari Gopalakrishnan, Ramana Isukapalli, Sriram Phani Kumar Palacharla, Mengjun Xie, and Jason Khoe. Kudos to every individual involved in this project; despite the time differences, we achieved this huge milestone with great planning and collaboration.