Concourse: Generating Personalized Content Notifications in Near-Real-Time
May 25, 2018
Introduction to Notifications
Social media are computer-mediated platforms that facilitate creation and sharing of information, ideas, career interests, and other forms of expression via virtual communities and networks. In the last few years, notifications have been proven to be the mainstay for surfacing social media’s value to users and engaging members. Before the advent of notifications, these social media applications relied on a member’s intent for site visits and discovering information, without any prompting by the network.
Compared with emails, whose feedback loop (between email sent and an ensuing member visit) is typically hours or days, mobile app notifications shortened the feedback loop to minutes because of their inherently attention-grabbing nature. Also, notifications have higher engagement rates compared to emails, primarily due to the selection bias of app installers who are eligible to receive notifications. The greater effectiveness of notifications is accompanied by members’ expectation of a much higher relevance bar compared to email.
In general, there are two categories of notifications. The first is the class of notifications that do not need filtering, which include member-to-member(s) messaging (on Facebook Messenger, WhatsApp, LinkedIn messaging, etc.) and member-to-member invitations (on Facebook and LinkedIn). These are also called transactional notifications. The second category is the class of notifications that do need filtering to ensure high relevance of the event to the recipients (for example, a notification about daily news or new jobs). These notifications are also called non-transactional notifications. While a high-utility notification has the potential to engage a member, a low-utility one can annoy a member. Concourse is the platform we use to generate notifications that are of high utility to LinkedIn’s members, surfacing content that they would engage with to derive value. Our ultimate goal with Concourse is to generate only notifications that will be highly relevant to the member.
LinkedIn’s notification ecosystem
The notification has been an effective mechanism for surfacing LinkedIn’s value to members and getting members to engage with each other on the LinkedIn platform. As notifications are typically attention-grabbing by nature, the user’s experience of notifications is affected by several dimensions, namely: is the notification timely, relevant, actionable, and delivered via the right channel? LinkedIn serves a number of different notifications, including connection invites, job recommendations, daily news digests, and other activities within one’s social network. Some of these can be scheduled in advance, such as job recommendations, work anniversaries, and so on. Other content, such as a new job posting or a connection’s job change update, might benefit from a more timely distribution. Systems like Air Traffic Controller (ATC) are used to ensure a positive member experience when it comes to delivering relevant, timely notifications.
Notifications at LinkedIn
Legacy flow of content notification targeting and scoring
Besides all the above benefits, social activity-based notifications (a.k.a. content notifications) have also proven to be an incredibly powerful way to make content contributors feel heard, to move LinkedIn members through the engagement funnel, and to trigger viral loops that further increase engagement. Members post content on linkedin.com to share with other professionals in their network and engage them in conversations. LinkedIn’s news feed and notifications are among the most powerful ways to distribute relevant content to members and keep them engaged and informed. Project Pepper was conceived a few years ago and was designed to drive member engagement through targeted, relevant content push notifications.
Pepper notifications at LinkedIn
As shown in the above figure, the legacy pipeline to generate content notifications is a Hadoop-based offline workflow, i.e. it runs in batches several times a day. When members post or interact with content on the site, a Kafka message is generated and then written to LinkedIn’s shared HDFS clusters via the ETL process. Each invocation of the offline job examines all the Kafka messages ETL’d since the last run and generates the candidate notifications through a join between Kafka messages and LinkedIn graph data, which we call the “fanout graph.” Afterwards, the candidate messages are decorated with relevance features through a data join, and a machine learning-based model is used to score each candidate notification. The selected notifications are then passed over to ATC, which acts as a gatekeeper for all communications from LinkedIn to its members. The notification is ultimately delivered to members through the remaining part of the notifications ecosystem.
While using the legacy flow, Pepper was very successful as a prototype of delivering non-transactional notifications, driving a significant lift in key session metrics. However, several drawbacks in the offline flow limited the potential of non-transactional notifications:
- A 6-10 hour delay between activity generation and notification
- Prohibitively expensive data joins to support a larger targeting graph
Freshness of the notifications
Because multiple batch processes were involved in generating these notifications, a recipient would receive a notification of new content posted on the site several hours late. After such a delay, the original notification may no longer be relevant, for example if it was about a current event. Also, a recipient who interacts with the content that late gains little engagement and little chance to participate in the conversation, which will already have taken place by then because the content was distributed earlier via more real-time channels such as the feed.
Richer targeting of the notifications
Non-transactional notification use cases supported by Pepper heavily leveraged machine learning models to select the most relevant notifications among the generated candidates for each recipient. Typically, a richer targeting strategy (i.e. a larger fanout graph) will increase the coverage of LinkedIn members that are actually targeted by LinkedIn notifications. This is particularly helpful in engaging dormant quality members with LinkedIn. Additionally, a richer targeting strategy provides a larger pool of notifications for machine learning models to pick from and thus boosts their performance in selecting the most relevant notifications to send.
A system capable of enabling freshness and richer targeting of notifications is critical for taking LinkedIn’s notification business to the next stage, that is, getting content contributors to feel that they are heard promptly, triggering viral and conversational loops, and sending only the most relevant content to members. Given the limitations of the legacy offline Pepper workflow on these aspects, we decided to build a nearline system with low latency and better scalability in targeting and scoring.
Concourse as a platform
Concourse was designed to be LinkedIn’s near-real-time distributed targeting and scoring platform for generating content notifications. Concourse selects the most relevant recipients for a notification using various targeting criteria, business rules, and relevance. Concourse provides the following high-level functions:
- Fanout: Consumes a content piece and identifies all potential recipients.
- Scoring: For each <content, recipient> pair (a payload), Concourse fetches appropriate features and computes a model-based score on the fly to determine whether this payload will be delivered to the recipient or not.
Concourse was initially scoped to serve as the nearline replacement for the offline flow of feed notifications (Project Pepper) in its first iteration. Later on, Concourse was re-scoped to be the one solution to provide nearline fanout, feature decoration, and relevance scoring for all such LinkedIn use cases.
To this end, we wanted to build the system to scale to many use cases without having to maintain too much of the use-case-specific logic ourselves. We also wanted to try to provide the right abstractions to enable producer teams to express their fanout logic while keeping the maintenance costs of the system manageable.
The image below shows the high-level architecture for Concourse and the processing steps each notification goes through. Note that Concourse is not the service that sends the in-app or push notification, but is instead the service that makes the decision as to which notification is most relevant to the given recipient.
The workflow manager is the component that onboards new use cases to Concourse. This component standardizes the incoming requests’ schema, extracts features that are readily available in the input streams, and then pushes the standardized requests to the fanout stage’s Kafka queue. At this point, the Kafka messages are partitioned by actor, that is, the member who generated the content. This means that all content generated by an actor gets processed by the same instance of the Samza processor. This allows us to take advantage of localized caching while decorating actor features.
The fanout component retrieves the candidate recipients for each incoming payload using client-specified fanout logic and then produces a new payload for each candidate recipient. This is modelled as propagating messages between two nodes connected by an edge in a graph. Edges can be explicit, such as connections and follows between members and companies or follows between a topic and a member, or they can be implicit or derived edges, such as members in a certain geographic area with a certain title.
For the initial use cases that Concourse was built to support, fanout was defined as all members who are either connected to or are following the notification creator. This data is fundamental to LinkedIn and is available via an internal API called Edges. So, to perform the fanout, Concourse calls the Edges API to fetch all connections and followers of a content creator and iterates through this list, generating a candidate notification for each recipient, which will be consumed by the next stage. This stage also identifies recipients that could be reached via multiple edges and consolidates all these edges together, so that the scorer can pick the most relevant edge.
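The fanout step described above can be sketched roughly as follows. This is a minimal illustration, not Concourse's actual code: `fetch_edges` and `FAKE_EDGES` are hypothetical stand-ins for the Edges API, and the payload fields are invented for the example.

```python
from collections import defaultdict
from typing import Dict, Iterable, List, Tuple

# Hypothetical stand-in for the Edges API: actor -> (recipient, edge_type) pairs.
FAKE_EDGES: Dict[str, List[Tuple[str, str]]] = {
    "member:1": [
        ("member:2", "connection"),
        ("member:3", "follow"),
        ("member:2", "follow"),  # member:2 is reachable via two edges
    ],
}


def fetch_edges(actor: str) -> Iterable[Tuple[str, str]]:
    """Pretend network call; a real service would return results page by page."""
    return FAKE_EDGES.get(actor, [])


def fan_out(payload: dict) -> List[dict]:
    """Produce one candidate per recipient, consolidating all edges that reach them."""
    edges_by_recipient: Dict[str, List[str]] = defaultdict(list)
    for recipient, edge_type in fetch_edges(payload["actor"]):
        if edge_type not in edges_by_recipient[recipient]:
            edges_by_recipient[recipient].append(edge_type)
    return [
        {**payload, "recipient": recipient, "edges": edge_types}
        for recipient, edge_types in edges_by_recipient.items()
    ]


candidates = fan_out({"actor": "member:1", "item": "post:99"})
```

Here `member:2` receives a single candidate that carries both edges, which leaves the choice of the most relevant edge to the scorer.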
Machine learning-based scoring and decision making
The image below illustrates the First Pass Scoring component, which consumes candidate notification messages after fanout from the Kafka queue and calculates the partial score for each payload using the features retrieved in Concourse. In addition, this component also supports payload queuing and feature passing to the downstream ATC component. Once the recipient is identified for a message, the message stream is partitioned by recipient in the fanout stage. This is again done to take advantage of local caching for recipient- and edge-based features, as explained below.
For the purposes of scoring, the relevance algorithm needs to know information that is not directly a part of the notification data. This might include LinkedIn’s estimation of the strength of a connection between the content creator and candidate recipient, which would be learned from their interaction on the site and other sources, the country of the sender and recipient, and so on. This data is called feature data; to rank a notification, the ranker needs access to four sets of feature data:
- Features of the source (actor) (ActorFeatures).
- Features of the notification content (ItemFeatures).
- Features of the recipient (RecipientFeatures).
- Features of the relationship between the sender and recipient (EdgeFeatures).
This data is easily accessible at LinkedIn, either via online REST APIs or through tables in LinkedIn’s shared HDFS clusters. Actor features and notification content features can be fetched before we do the fanout and can be sent as part of the notification payload. These features do not change after fanout, and the volume of messages before fanout is much lower compared to after fanout. This allows us to make network calls to fetch this data without impacting throughput. We chose to implement this feature decoration functionality in a separate, “prefanout” Samza instance.
Since the recipient is identified only after fanout, features about the recipient and features of the relationship between the sender and recipient need to be fetched by the scorer before ranking a candidate notification. As mentioned earlier, the scorer cannot make network calls to fetch this data, so these features need to be made available in the scorer’s local Key-Value store to support fast lookups at scale. This data set is also large and often runs several terabytes. Therefore, we cannot push a full data set into every scorer instance, and the data needs to be partitioned across multiple scorer instances.
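To make the split between payload-carried and locally stored features concrete, here is a minimal scoring sketch. It assumes a logistic model with made-up weights and uses plain dictionaries in place of the scorer's local Key-Value store; the article does not describe the real model, so every name and number below is hypothetical.

```python
import math
from typing import Dict, Tuple

# Hypothetical local stores standing in for the scorer's local Key-Value stores.
recipient_features: Dict[str, Dict[str, float]] = {
    "member:2": {"recent_sessions": 0.8},
}
edge_features: Dict[Tuple[str, str], Dict[str, float]] = {
    ("member:1", "member:2"): {"connection_strength": 0.9},
}

# Hypothetical logistic-model parameters.
WEIGHTS = {
    "actor_popularity": 0.5,
    "item_quality": 1.0,
    "recent_sessions": 0.7,
    "connection_strength": 1.5,
}
BIAS = -2.0


def score(candidate: dict) -> float:
    """Merge pre-fanout features from the payload with local lookups, then score."""
    features: Dict[str, float] = {}
    features.update(candidate["actor_features"])   # decorated before fanout
    features.update(candidate["item_features"])    # decorated before fanout
    features.update(recipient_features.get(candidate["recipient"], {}))
    features.update(edge_features.get((candidate["actor"], candidate["recipient"]), {}))
    z = BIAS + sum(WEIGHTS.get(name, 0.0) * value for name, value in features.items())
    return 1.0 / (1.0 + math.exp(-z))


candidate = {
    "actor": "member:1",
    "recipient": "member:2",
    "actor_features": {"actor_popularity": 0.6},
    "item_features": {"item_quality": 0.7},
}
known = score(candidate)
unknown = score({**candidate, "recipient": "member:404"})  # no local features stored
```

In this sketch a recipient with no locally stored features simply falls back to the payload features, so it scores lower than a recipient with a strong edge to the actor.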
So we need to ensure that the scorer instance which ranks notifications for a recipient has feature data about that recipient, as well as feature data for all edges which include that recipient, cached locally. Samza also manages this for us. When a Samza job processes multiple Kafka topics, Samza can be configured to process data for the same partition across all topics in the same Samza instance. However, all the Kafka topics need to use an identical partitioning key and an identical number of partitions for this to work. This feature data is generated from multiple data sets, all of which have their own requirements. This necessitates another Samza job, called “features-processor,” which consumes data from multiple sources at LinkedIn, re-keys each message based on the recipient, and emits the data to a different Kafka stream. This Kafka stream is then consumed by the scorer job to index data in its local Key-Value stores.
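The re-keying idea can be illustrated with a small sketch. It assumes a simple hash partitioner (CRC32 is a stand-in, not the partitioner Kafka or Samza actually uses) and hypothetical record shapes; the point is only that records keyed by recipient, with the same partition count, land on the same partition as that recipient's candidate notifications.

```python
import zlib
from typing import Tuple

NUM_PARTITIONS = 4  # hypothetical; every co-partitioned topic must use the same count


def partition_for(key: str) -> int:
    """Stable hash partitioner (CRC32 is a stand-in for the real partitioner)."""
    return zlib.crc32(key.encode("utf-8")) % NUM_PARTITIONS


def rekey_by_recipient(record: dict) -> Tuple[str, dict]:
    """Emit the record under its recipient's ID, whatever key the source used."""
    return record["recipient"], record


# Hypothetical records about the same recipient, originally keyed differently.
member_record = {"source_key": "member:2", "recipient": "member:2", "recent_sessions": 0.8}
edge_record = {"source_key": "member:1", "recipient": "member:2", "connection_strength": 0.9}
candidate = {"actor": "member:1", "recipient": "member:2", "item": "post:99"}

partitions = {
    name: partition_for(rekey_by_recipient(record)[0])
    for name, record in [
        ("member", member_record),
        ("edge", edge_record),
        ("candidate", candidate),
    ]
}
```

All three entries in `partitions` share one value, which is what lets a single scorer instance hold both the candidate and the features it needs.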
Challenges and design decisions
In order to provide LinkedIn’s over 550 million members with the best possible user experience, Concourse is dedicated to generating the highest possible liquidity of candidate notifications and selecting the most relevant ones to be sent to our members in a timely manner by leveraging machine learning and AI technologies. In this regard, besides requirements on redundancy and low latency, Concourse faces the following challenges in scalability:
- Concourse needs to support scalable scoring on the order of millions of candidate notifications per second and make a send or no-send decision for each notification in real-time.
- To enable machine learning-based scoring, Concourse also needs to support scalable retrieval of features, which is on the order of hundreds of billions of records and several terabytes in data size.
A few design decisions and optimizations have been made throughout Concourse development to overcome the above challenges.
Leveraging Samza brings a number of benefits along with it:
- Concourse needs to store data to perform fanout and to rank notifications. Samza manages this state for us and takes care of backups and bootstrapping this data on a new machine.
- When a host goes down, Samza’s YARN scheduler automatically moves the job to a new host from the host pool.
- Samza’s YARN scheduler also efficiently uses the resources available on the host pool by scheduling multiple jobs on the same machines.
Partitioning metadata and scoring requests by recipient
The mission of the Concourse platform is to make personalized decisions for the recipient for each notification on the fly. Concourse adopts a recipient-based partitioning strategy for all the incoming Kafka topics in its scoring processor. This design ensures that all notifications and metadata for a specific recipient are routed to the same scoring host, which allows the candidate notifications to be scored in a distributed fashion and makes scoring scalable. Some superactors (such as Bill Gates) have extremely large numbers of connections and followers (in the millions), so each of their activities generates millions of candidate notifications and results in spiky traffic, which could overwhelm the system if not handled properly. Partitioning based on recipient balances the load across the scoring processors and thus enables Concourse to easily handle spiky traffic caused by superactors or influencers. Partitioning based on recipient also avoids the need to replicate the entire metadata set on every host, which would be prohibitively expensive in terms of storage. Instead, each host is only required to store metadata for the recipients whose member IDs hash to that host.
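A toy simulation shows why the choice of key matters for superactor traffic. The partition count, follower count, and CRC32-based partitioner below are all assumptions made for illustration.

```python
import zlib
from collections import Counter

NUM_PARTITIONS = 16  # hypothetical


def partition_for(key: str) -> int:
    """Stable hash partitioner (CRC32 is a stand-in for the real partitioner)."""
    return zlib.crc32(key.encode("utf-8")) % NUM_PARTITIONS


# One superactor post fanned out to 10,000 hypothetical followers.
candidates = [
    {"actor": "member:superactor", "recipient": f"member:{i}"}
    for i in range(10_000)
]

# Keyed by actor, every candidate lands on one partition (a hot spot).
load_by_actor = Counter(partition_for(c["actor"]) for c in candidates)

# Keyed by recipient, the same burst spreads across the partitions.
load_by_recipient = Counter(partition_for(c["recipient"]) for c in candidates)

hottest_by_actor = max(load_by_actor.values())
hottest_by_recipient = max(load_by_recipient.values())
```

With the actor as key, `hottest_by_actor` equals the whole burst; with the recipient as key, the busiest partition handles only a fraction of it.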
Storing external signals in local store
To enable machine learning model-based scoring, Concourse consumes various signals, like member metadata and edge metadata. To allow for scalable scoring (~500k QPS at peak), Concourse first retrieves these signals from both online data stores and offline HDFS and then puts this data into the RocksDB instance residing on the disk of the processing host. These local RocksDB stores allow for fast reads while scoring each candidate message and thus increase the scoring throughput. While the client-side latency of remote calls can range from a couple of milliseconds to hundreds of milliseconds, a read from RocksDB has a much lower latency, on the order of tens of microseconds.
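A back-of-the-envelope calculation shows what this latency gap means at peak load. It assumes one blocking lookup per scored candidate; the 2 ms and 50 µs figures are illustrative picks from the ranges quoted above, not measured values.

```python
def workers_needed(target_qps: int, latency_us: int) -> int:
    """Blocking workers required if each lookup takes latency_us microseconds."""
    return -(-target_qps * latency_us // 1_000_000)  # integer ceiling division


PEAK_QPS = 500_000         # "~500k QPS at peak" from the article
REMOTE_LATENCY_US = 2_000  # assumed best case: "a couple of milliseconds"
LOCAL_LATENCY_US = 50      # assumed: "tens of microseconds"

remote_workers = workers_needed(PEAK_QPS, REMOTE_LATENCY_US)
local_workers = workers_needed(PEAK_QPS, LOCAL_LATENCY_US)
```

Under these assumptions, sustaining the peak rate takes about 1,000 concurrent blocking lookups against a remote store but only about 25 against a local one.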
Distributing incoming traffic across data centers
To further boost the scalability of the Concourse platform, we have decided to let Concourse consume raw activity content from local Kafka topics instead of the ones aggregated across data centers. At LinkedIn, any Kafka event is first queued in a local Kafka topic (within each data center), which only contains events that are routed to the current hosting data center. Afterwards, each event is copied from the local Kafka topic to a global Kafka topic, which contains all the events that have been queued in all data centers. By consuming from local Kafka topics, Concourse hosts in a particular data center only need to handle traffic routed to that particular data center, which increases the processing capacity of Concourse fourfold compared with consuming from the global Kafka topic.
Over-partitioning Kafka topics
The number of Samza processes that process this workload in parallel is determined by the number of Kafka partitions in the topic between the fanout and scoring Samza jobs. A higher partition count implies a higher degree of parallelism, and each individual task then becomes more manageable in terms of CPU cost. So we over-partitioned the Kafka topic that feeds into the scoring job so that each process has to score at most a few thousand records per second. The scoring job also needs to store feature data, which is also partitioned. We also needed to ensure that each Samza job stores at most a few GBs of feature data for Samza to effectively distribute these jobs across the shared set of machines.
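The sizing logic can be written as a small calculation: the partition count has to satisfy both the per-task throughput limit and the per-task storage limit. Only the ~500k QPS peak comes from the article; the per-task limits and total feature size below are illustrative guesses within the ranges the text mentions.

```python
def ceil_div(numerator: int, denominator: int) -> int:
    """Integer ceiling division."""
    return -(-numerator // denominator)


def partitions_needed(peak_qps: int, max_qps_per_task: int,
                      feature_bytes: int, max_bytes_per_task: int) -> int:
    """Smallest partition count that respects both per-task limits."""
    by_throughput = ceil_div(peak_qps, max_qps_per_task)
    by_storage = ceil_div(feature_bytes, max_bytes_per_task)
    return max(by_throughput, by_storage)


GB = 10**9
TB = 10**12

count = partitions_needed(
    peak_qps=500_000,           # "~500k QPS at peak"
    max_qps_per_task=2_000,     # assumed: "a few thousand records per second"
    feature_bytes=3 * TB,       # assumed: "several terabytes"
    max_bytes_per_task=4 * GB,  # assumed: "a few GBs"
)
```

With these numbers, throughput alone would need 250 partitions, but the storage limit raises the requirement to 750, so storage is the binding constraint in this example.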
The feature data, which runs to several terabytes and multiple billions of records, is produced in Hadoop by joining multiple different data sets. Pushing this full data set from Hadoop, re-partitioning it, and indexing it locally in the scoring jobs takes almost a day, since these pipelines are shared and have a limited amount of bandwidth per stream. The scoring job also needs to process and index this data while it is ranking notifications in real time. Pushing this data also puts a lot of strain on the underlying Kafka systems.
We realized that each underlying data set gets refreshed at a different frequency. For example, one data set gets fully refreshed each week in a single large batch. Other data is more live in nature, that is, it is constantly getting updated based on live traffic. So, to scale the feature push job better, we implemented a solution that computes a delta in Hadoop, selecting only the rows that actually changed since the last push. This smaller data set is pushed every day, which significantly reduces the cost. We also push the full data set once a week over several days to reduce pressure on underlying systems.
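The delta computation amounts to comparing the current snapshot with the previously pushed one and keeping only new or changed rows. The sketch below does this in memory with dictionaries; the real job runs in Hadoop, and the row shapes here are hypothetical.

```python
from typing import Dict

Row = Dict[str, float]


def compute_delta(previous: Dict[str, Row], current: Dict[str, Row]) -> Dict[str, Row]:
    """Return only rows that are new or whose values changed since the last push."""
    return {key: row for key, row in current.items() if previous.get(key) != row}


previous_push = {
    "member:1": {"recent_sessions": 0.8},
    "member:2": {"recent_sessions": 0.1},
}
current_snapshot = {
    "member:1": {"recent_sessions": 0.8},  # unchanged, skipped
    "member:2": {"recent_sessions": 0.4},  # changed, pushed
    "member:3": {"recent_sessions": 0.2},  # new, pushed
}

delta = compute_delta(previous_push, current_snapshot)
```

This sketch ignores deleted rows; in a scheme like the one described, the weekly full push would be one natural place to reconcile them.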
Concourse is now powering all content recommendation notifications sent to members about their network. We realized that there are other use cases at LinkedIn which are not necessarily around content but which also require near-realtime fanout and relevance-based filtering and scoring. One such example is the notification that gets sent when a member updates their profile about a job or title change. We are in the process of onboarding other types of notifications to Concourse.
We also realized that fetching a list of candidate members from a service, such as fetching connections from the LinkedIn Graph, is just one method of performing fanout. Edges other than connections are also possible. For example, we might want to notify certain LinkedIn members working in the New York metropolitan area about highly relevant local news. Or we might want to notify certain LinkedIn members working in social media companies about new GDPR regulations. The set of members for such edges is very large, and it might not be practical to fetch them page-by-page using an online API. The approach we are considering here is to index these attributes in a local Key-Value store, partition it across multiple fanout instances, and compute the fanout by iterating through this table.
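The attribute-based approach under consideration might look something like the sketch below, where each fanout instance scans its own slice of the index. The index contents, field names, and `fan_out_by_attributes` helper are hypothetical.

```python
from typing import Callable, Dict, Iterator

Attributes = Dict[str, str]

# Hypothetical slice of the attribute index held by one fanout instance.
local_index: Dict[str, Attributes] = {
    "member:1": {"region": "New York", "industry": "social media"},
    "member:2": {"region": "London", "industry": "finance"},
    "member:3": {"region": "New York", "industry": "finance"},
}


def fan_out_by_attributes(payload: dict, index: Dict[str, Attributes],
                          matches: Callable[[Attributes], bool]) -> Iterator[dict]:
    """Scan the local partition and emit a candidate for every matching member."""
    for member_id, attributes in index.items():
        if matches(attributes):
            yield {**payload, "recipient": member_id}


local_news = {"item": "article:nyc-news"}
recipients = [
    candidate["recipient"]
    for candidate in fan_out_by_attributes(
        local_news, local_index, lambda attrs: attrs["region"] == "New York"
    )
]
```

Because each instance only iterates over its own partition of the index, the scan parallelizes across instances without any paged calls to an online API.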
Designing and releasing Concourse to production was a very challenging endeavor which would not have been as smooth without support of our partner teams: Samza, Kafka, Communications Relevance, and others. We'd also like to thank the following individuals for their support and contributions to this project: Banu Muthukumar, Chien-Chun Hung, Eric Brownrout, Ankit Gupta, Curtis Wang, Ajith Muralidharan, Viral Gupta, Shaunak Chatterjee, Shipeng Yu, and Parin Shah.
We look forward to continued collaboration as we expand Concourse’s capabilities!