Member/Customer Experience

Near real-time features for near real-time personalization

Co-authors: Rupesh Gupta, Sasha Ovsankin, Qing Li, Seunghyun Lee, Benjamin Le, and Sunil Khanal

At LinkedIn, we strive to serve the most relevant recommendations to our members, whether that’s a job they may be interested in, a member they may want to connect with, or another type of suggestion. In order to do that, we need to know their intent and preferences, which may be revealed through their actions on our application. For example, if a member visits LinkedIn and applies for a web developer job in San Francisco, then this action can reveal their intent to find a job, with a preference for web developer positions in San Francisco. Such information is leveraged by our various recommenders to better personalize recommendations for the member. For example, our job recommender can recommend other web developer jobs in San Francisco, our connection recommender can recommend web developers to connect with in San Francisco, and our feed recommender can recommend content related to the job market in San Francisco to this member.

However, there is usually a delay between when a member takes an action and when it can be leveraged to adapt recommendations for that member. This is because member activity data is typically processed periodically into features in a batch environment and then made available to recommender systems. A conventional feature pipeline is shown in Figure 1. Every time a member takes an action, an event containing information about the action is emitted to a data stream (such as Apache Kafka). These action events are periodically ETLed into an offline data store (such as HDFS). A batch job (such as an Apache Spark job) periodically reads the activity data of all members in this offline store and processes it into features. These features are then pushed to an online store (such as a key-value Venice store). When a member lands on a page that contains a recommendations module, the corresponding recommender system reads features from this online store and uses them in a model to score candidate recommendation items and return recommendations ordered by their scores.


Figure 1. A conventional feature pipeline for leveraging past actions of a member to personalize recommendations

Depending on the speed of ETL to the offline data store and the frequency of the batch job, this approach can introduce a delay of anywhere from a few hours to even days between when a member takes an action and when that action can be leveraged to adapt recommendations. This might mean missing several opportunities to better personalize the member’s experience. For example, let’s say that there is a delay of 24 hours, and that a member last visited LinkedIn one month ago and applied for jobs in San Francisco. Based on this member’s past activity data in the offline data store, we would infer that they prefer a job in San Francisco. However, this member might visit LinkedIn today and start applying for jobs in New York. This can reflect a change in the member’s preference, yet we would not be able to adapt job, connection, or feed recommendations for this member for the next 24 hours due to the delay.

Table 1 shows the impact of a simulated delay in leveraging a member’s job-apply actions in our job recommender model. In this table, we use a delay of 1 minute as the baseline for computing the impact on the model’s ROC-AUC (a measure of the model’s performance). It’s clear that the performance degrades as the delay increases.

Simulated delay | Impact on model’s ROC-AUC
1 minute        | 0%
1 hour          | -3.51%
6 hours         | -4.27%
24 hours        | -4.45%

Table 1. Impact of simulated delay in leveraging job apply actions of a member in our job recommender model. A one minute delay is used as a baseline.

In this blog post, we describe our solution for leveraging a member’s actions in near real-time to adapt recommendations for that member in near real-time. Our solution is based on the following two ideas:

  1. Recent actions of a member may be ingested into an online store. Only actions taken within the last few days need to be retained in this store, because features computed from these recent actions are meant to complement the features computed through the conventional (batch) feature pipeline, which covers older activity.

  2. Rather than precomputing features based on recent actions of a member, these features may be computed on-demand when recommendations need to be generated. The computation is fast, as typically only a small amount of data needs to be processed for these features. This allows us to use the most recent actions of a member for computing these features.

Computation of features based on a member’s past actions

We surveyed several AI teams at LinkedIn to understand how they compute features (through the conventional feature pipeline) based on a member’s past actions. We noticed a generic pattern in computation of a majority of these features. This computation comprises three steps:

Step 1: Get relevant actions taken by a member over a duration of time. For example, get all the job-apply actions taken by a member over the last 7 days.

Step 2: Look up certain attributes of the entities on which the above actions were taken. For example, look up the embedding (a numeric vector representation) of each job which the member applied to from the previous step.

Step 3: Perform a summarization operation on the attributes of all the entities. For example, compute the average of the embeddings of all the jobs from the previous step. 

The output of the third step is a feature. In our example, it’s an embedding that represents the kind of jobs that this member has preferred in the last 7 days. This is an example of a member feature based on the member’s past actions.

A recommender system might also compute a (member, item) pair feature based on the member’s past actions. Such a feature depends not just on the member for whom recommendations need to be generated, but also on the candidate recommendation item being scored. For example, the job recommender might use the following pair feature when scoring a candidate job recommendation, job_i, for a member: the number of times this member applied to any job in the same geographic location as job_i in the last 7 days. To compute this feature, the geographic location of each job would be looked up in Step 2, and a count of the number of jobs in each location would be performed in Step 3. The output of Step 3 would be a map that looks like this: {“new york” : 10, “new jersey” : 2}. When the recommender scores candidate job recommendations for this member, the value of this pair feature is 10 for a job based in New York and 0 for a job based in San Francisco.
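To make the pattern concrete, here is a sketch of how the pair feature above might be computed in the conventional batch pipeline as a single query over the offline activity data. The table and column names are assumptions for illustration, not our actual offline schema.

    -- Step 1: relevant actions (job applies by this member in the last 7 days)
    -- Step 2: join the geographic location of each applied job
    -- Step 3: summarize into a count of applies per location
    SELECT j.geo_location, COUNT(*) AS num_applies
    FROM job_apply_actions a
    JOIN job_attributes j ON a.job_id = j.job_id
    WHERE a.member_id = 111
      AND a.action_time >= CURRENT_TIMESTAMP - INTERVAL '7' DAY
    GROUP BY j.geo_location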

Summary of requirements

The generic computation pattern above helped us define the requirements for our desired solution. These requirements were:

Requirement 1: Ability to record any member action of interest within a few seconds.

Requirement 2: Ability to join any attributes of the entity on which an action was taken. 

Requirement 3: Ability to retrieve actions taken by a member (along with joined attributes) that meet certain criteria and compute features from those actions in less than 100 milliseconds. 

  • The retrieval criteria could be, say, the job-apply actions taken in the last N hours. N is no more than 96, as any feature based on actions taken by a member more than 96 hours ago should be obtained from the conventional feature pipeline. 

  • The 100-millisecond latency constraint ensures that recommendations are served to a member within a few hundred milliseconds after the member lands on a page that contains a recommendations module. 

Other than these hard requirements, we also wanted to achieve the following goals. We wanted the solution to be simple, easy to adopt, and easy to maintain. We also wanted the solution to allow us to leverage the same activity data across multiple recommender systems. For example, let’s say that the job recommender is using the solution to record job-apply actions and compute a feature—say, the average of the embeddings of all the jobs that the member applied to in the last 24 hours. Now, if the connection recommender wants another feature that is also computed from job-apply actions, such as the average of the embeddings of all the jobs that the member applied to in the last 6 hours, then it should be able to do so with minimal effort.

Representing actions in a standard schema 

Our first step towards designing a solution based on the above requirements was to define a standard schema for representing any member action. We chose to represent an action as:
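(The sketch below is reconstructed from the field descriptions that follow; the exact datatypes are assumptions.)

    {
      "actor":            string,      // ID of the member who took the action
      "actorAttributes":  JsonObject,  // attributes of the actor
      "verb":             string,      // type of action taken
      "verbAttributes":   JsonObject,  // attributes of the verb
      "object":           string,      // ID of the entity on which the action was taken
      "objectAttributes": JsonObject,  // attributes of the object
      "timestamp":        long         // time at which the action was taken
    }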

Code Block 1. A standard schema for representing any member action

where “actor” is the ID of the member who took the action, “actorAttributes” are attributes of the actor, “verb” is the type of action that was taken, “verbAttributes” are attributes of the verb, “object” is the entity on which the action was taken, “objectAttributes” are attributes of the object, and “timestamp” is the time at which the action was taken.

For example, if a member with ID 111 (who is from the internet industry) applies (from the mobile application) for a job with ID “job:222” (which is based in New York and has the embedding [0.1, 0.4, 0.9]) at Unix timestamp 345678, then this action can be represented in the above schema as:
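(The attribute key names below are assumptions for illustration; the values come from the example above.)

    {
      "actor": "111",
      "actorAttributes": {"industry": "internet"},
      "verb": "job-apply",
      "verbAttributes": {"platform": "mobile"},
      "object": "job:222",
      "objectAttributes": {"geo": "new york", "embedding": [0.1, 0.4, 0.9]},
      "timestamp": 345678
    }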

Similarly, a click on an article in the feed may be represented as:
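(All IDs and attribute values below are purely illustrative.)

    {
      "actor": "111",
      "actorAttributes": {"industry": "internet"},
      "verb": "feed-click",
      "verbAttributes": {"platform": "mobile"},
      "object": "article:567",
      "objectAttributes": {"topic": "job market"},
      "timestamp": 345900
    }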

While having a JsonObject datatype in a schema is usually discouraged, it gives us the flexibility to include any desired attributes of the actor, verb, or object for each type of action. This flexibility would otherwise have been hard to achieve.

Design of solution

With the standard schema for representing any member action, we designed our solution as shown in Figure 2. 


Figure 2. Our solution leveraging actions of a member in near real-time to adapt recommendations for that member in near real-time

We introduced an Apache Samza stream processor to listen for and process events corresponding to member actions of interest from Kafka. We chose to support the Samza SQL API for writing the processing logic in this processor. It limits the processing logic to simple operations such as filtering, stream-table joins, and projections, which helped us ensure that the stream processor is always simple and lightweight. The exact processing logic differs across use cases, but generally looks like the following (a Samza SQL sketch of such a statement appears after the list):

  1. Read an event corresponding to an action.

  2. Filter the event out if the action is not worth recording. For example, filter out an event if the ID of the member who took the action is null in the event. Such an event might be triggered when an action is taken by a bot.

  3. Join any required attributes of the actor, verb, or the object of the action. These attributes may be stored in external stores. For example, a Venice store may contain attributes of each job, such as its embedding and/or the geographic location where it is based.
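A hypothetical Samza SQL statement for the job-apply use case is sketched below. The topic names, the attributes table, and every field name are assumptions for illustration, and the exact join syntax depends on the deployment.

    -- Read job-apply events, drop events without a member ID (e.g., bot traffic),
    -- join job attributes from an external table, and emit records in the
    -- standard action schema to the output topic
    INSERT INTO kafka.memberActions
    SELECT
      e.memberId               AS actor,
      e.memberAttributes       AS actorAttributes,
      'job-apply'              AS verb,
      e.applicationAttributes  AS verbAttributes,
      e.jobId                  AS object,
      j.attributes             AS objectAttributes,
      e.eventTime              AS `timestamp`
    FROM kafka.JobApplyEvent AS e
    JOIN jobAttributes AS j ON e.jobId = j.jobId
    WHERE e.memberId IS NOT NULL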

The stream processor finally prepares and emits a new event in the standard action schema shown in Code Block 1. This event goes through Kafka again and gets ingested into an Apache Pinot store, which has the same schema as the standard action schema. Each action is ingested into this store as a new row. An example snapshot of the data in this store is shown in Figure 3.


Figure 3. An example snapshot of the Pinot store containing actions

The store is configured to retain data for 96 hours, which means that an action is deleted from the store 96 hours after it is ingested—this keeps the size of the store under control. The store is also configured to use “actor” as the primary key, so that the data is partitioned and sorted based on the “actor” column, which allows quick retrieval of actions taken by a specific member.
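An illustrative excerpt of such a table configuration is shown below; the table name and the number of partitions are assumptions, and ingestion and other settings are omitted.

    {
      "tableName": "memberActions",
      "tableType": "REALTIME",
      "segmentsConfig": {
        "timeColumnName": "timestamp",
        "retentionTimeUnit": "HOURS",
        "retentionTimeValue": "96"
      },
      "tableIndexConfig": {
        "sortedColumn": ["actor"],
        "segmentPartitionConfig": {
          "columnPartitionMap": {
            "actor": {"functionName": "Murmur", "numPartitions": 8}
          }
        }
      }
    }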

We chose Pinot as our store for several reasons. The main ones were:

  1. It supports near real-time ingestion of data from Kafka. 

  2. It can answer analytical queries with low latency. This allows computation of a variety of features from activity data in the store in less than 100 milliseconds.

  3. It is horizontally scalable. This allows multiple recommender systems to query from the same store.

  4. It supports purging of old data.

A recommender system can now query this Pinot store when recommendations need to be generated for a member. For example, if a member with ID 111 visits a page that contains the job recommendation module, then the job recommender can get attributes of all the jobs that this member applied to over the last 24 hours by issuing the following query and can then perform a summarization operation on the retrieved job attributes to create features.
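A sketch of such a query is shown below, assuming the Pinot table is named memberActions and the timestamp column holds epoch milliseconds.

    SELECT object, objectAttributes
    FROM memberActions
    WHERE actor = '111'
      AND verb = 'job-apply'
      AND timestamp >= now() - 86400000   -- last 24 hours
    LIMIT 1000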

If the connection recommender wants the attributes of all the jobs that this member applied to over the last 6 hours, then it can do so by issuing a slight variant of the above query:
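(Again a sketch; only the time window changes.)

    SELECT object, objectAttributes
    FROM memberActions
    WHERE actor = '111'
      AND verb = 'job-apply'
      AND timestamp >= now() - 21600000   -- last 6 hours
    LIMIT 1000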

In some cases, the summarization operation may be part of the Pinot query itself and performed inside Pinot. For example, the count of this member’s job applies per geographic location over the last 24 hours (a variant of the pair feature described earlier, restricted to the store’s retention window) can be computed with a single aggregation query.
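A sketch of such a query, assuming objectAttributes is stored as a JSON string with the location under a "geo" key:

    SELECT jsonExtractScalar(objectAttributes, '$.geo', 'STRING') AS location,
           COUNT(*) AS num_applies
    FROM memberActions
    WHERE actor = '111'
      AND verb = 'job-apply'
      AND timestamp >= now() - 86400000   -- last 24 hours
    GROUP BY jsonExtractScalar(objectAttributes, '$.geo', 'STRING')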

In some other cases, it may be better to join certain attributes after retrieving actions from the Pinot store. For example, let’s say that the embedding of each feed article is a vector of 800 dimensions. Then it may be better to avoid storing the embedding of the article as an objectAttribute of each feed-click action in the Pinot store, and to instead fetch those embeddings from the attributes store after getting the list of articles that the member clicked on in the recent past, as shown below.

Step 1: Retrieve the member’s recent feed-click actions from the Pinot store.
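(A sketch of the Pinot query for this step, using the same assumed table and columns as above.)

    SELECT object, timestamp
    FROM memberActions
    WHERE actor = '111'
      AND verb = 'feed-click'
      AND timestamp >= now() - 86400000   -- last 24 hours
    LIMIT 1000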

Step 2: Look up the embedding of each clicked article with a batched read from the attributes store.
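(A hypothetical illustration of the batched key-value lookup; it shows only the shape of the request and response, not an actual client API, and the IDs and values are made up.)

    Keys requested (article IDs from Step 1):
        ["article:567", "article:890", ...]
    Values returned (800-dimensional embeddings, truncated here):
        {"article:567": [0.12, 0.05, ...], "article:890": [0.33, 0.41, ...]}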

This two-step process may or may not increase the overall retrieval latency significantly, depending on the type of store that is called in Step 2. In cases where the overall latency is expected to increase significantly, the attributes may instead be stored in the Pinot store, provisioned with additional hardware.

Depending on the types of features required, the recommender system queries the Pinot and attributes stores appropriately to compute the near real-time features. It then uses these features, along with other features (such as those computed through the conventional feature pipeline), in a model to score candidate recommendations. The near real-time features can capture the short-term intent and preferences of a member, while the other features capture longer-term intent and preferences. After scoring, the recommender system also emits an event to log the computed features to HDFS. Since features based on a member’s actions can be very time sensitive (for example, a member may apply to two jobs within a minute, or click on two feed articles within seconds), logging them ensures that we have the correct value of these features associated with each impression of recommendations. This makes it easy to prepare training data for future iterations of the model.

Results

Our solution has been successfully adopted by several recommender systems at LinkedIn to leverage actions of a member in near real-time to adapt recommendations for that member in near real-time. It has been able to meet all the requirements: 

  • Member actions of interest can be recorded in the Pinot store within 0.1 to 15 seconds, depending on the frequency of the type of action. This means that if a member takes an action now, it can be leveraged to adapt recommendations for that member within the next few seconds.

  • Actions can be retrieved (along with attributes) in less than 50 milliseconds at a rate of over 20,000 queries per second with the appropriate number of Pinot servers.

  • New use cases can be onboarded within a few days.

  • The maintenance cost has been small.

It has also resulted in significant gains in business metrics. The gains realized from the job, feed, and search typeahead recommenders are shown below.

Metric | Impact
Job applies | +0.66%
Dismisses of job recommendations | -20%
Weekly active users | +0.03%

Table 2. Impact of near real-time features in the jobs recommender

Metric | Impact
Feed contributors (members who like, comment, share, post) | +0.27%
Sessions in which members engage with feed | +0.69%
Revenue from sponsored feed updates | +0.46%

Table 3. Impact of near real-time features in the feed recommender

Metric | Impact
Typeahead click-through rate | +0.31%
Abandoned typeahead sessions | -0.35%

Table 4. Impact of near real-time features in the search typeahead recommender

It was pleasing to observe an increase in the number of weekly active users (number of members who visit at least once within a week) when we added near real-time features to the jobs recommender. Increasing this metric is hard, and while 0.03% may seem small, it actually translates to a large increase in the absolute number of members. This increase was driven by better personalization of job recommendations for our new members, for whom we have the least amount of past activity data.

Limitations 

Although this solution works well for the types of member features and (member, item) pair features described above, it may not work well for other types of features based on recent actions of members. An example is item features based on recent actions of all members. An item feature is a feature that depends only on the candidate recommendation item being scored, such as a summarization of the embeddings of all the members who clicked on a feed article in the last 24 hours. This feature might help the feed recommender suggest articles to a member that have been gaining popularity among similar members in the recent past. Since there can be thousands of candidate recommendation items that need to be scored for a member, it becomes computationally expensive to compute the feature for each one of those items in a timely manner. Our solution for such cases is to pre-compute the feature in a separate streaming job and store it in an online key-value store for quick retrieval. This solution is currently under development.

Lessons learned and future work

Based on our experience thus far, we have learned the following lessons:

  • Always test your assumptions. When we started, there were several concerns about the feasibility of our solution. After performing load tests with real data and queries, we found that on-demand computation of features using Pinot is performant enough for many use cases.

  • Early beats perfect. Deploying the first few use cases to production revealed certain intricacies that we wouldn't have been able to foresee. These informed our future development plans. 

  • A short development cycle helps. Successful completion of this initiative required several iterations, and our short development cycle allowed us to iterate quickly. This was in large part due to prior investments in scaling machine learning productivity, our experimentation platform, and continuous deployment.

Armed with a better understanding of the advantages and limitations of the technologies we used, we are working to evolve our solution to support all types of near real-time features. If you are interested in solving problems like this, please get in touch with us or check out the LinkedIn careers page.

Acknowledgements

We would like to thank Jiaqi Ge, Aditya Toomula, Mayank Shrivastava, Minhtu Nguyen, Justin Zhang, Xin Yang, Ali Hooshmand, Yuankun Xue, Xin Hu, Qian Li, Hongyi Zhang, Marco V. Varela, Manas Somaiya, Shraddha Sahay, Raghavan Muthuregunathan, Anand Kishore, Daniel Gmach, Joshua Hartman, Shipeng Yu, Abhimanyu Lad, Tim Jurka, Romer Rosales, and many others who helped us.