Opal: Building a mutable dataset in the data lake
March 16, 2022
At LinkedIn, trusted data platforms and quality data pipelines are essential to meaningful business metrics and sound decision-making. Today, a considerable percentage of data at LinkedIn comes from online data stores. Whether the online data systems fall into SQL or NoSQL categories, we must move those online datasets to our data lake for further processing and business analytics.
However, it can be challenging to reflect online table updates, inserts, and deletes in the data lake, because the latter tends to be immutable. To balance data quality, latency, scan performance, and system scalability (in terms of both compute resources and operational cost), we developed Opal. We use Opal to ingest mutable data, like records from databases (Oracle, MySQL, Espresso, Venice, etc.). It builds a mutable dataset on top of an immutable file system without eagerly reconciling the ingested data files. At the same time, it retains the flexibility to query the snapshot view at any historical point in time, a capability called "time traversal."
Before and after Opal
Figure 1: Lumos and Opal data flow
Before Opal, a typical approach for building a snapshot dataset was to lay out the data in an LSM-tree format to facilitate the merging of a base snapshot and delta data. We call this system “Lumos.” An ingestion job consumes change streams from Brooklin and lands data bucketed by time. Another job reorganizes the change stream data into the delta format. Periodically, the snapshot is evolved by merging the base snapshot with the delta. Among those operations, rewriting files is inevitable, so we often encounter the write-amplification problem, where small updates waste system resources and make it hard for the system to keep pace with our continued growth in data volume and use cases. Also, the significant latency introduced by the merge makes it unrealistic to deliver reliable, fresh data in a timely manner.
Opal takes a different approach: read-time filtering, which uses validity metadata instead of a sort-merge to determine the latest row for a given key. This decoupling of the data layout from the filtering condition allows us to present an updated view of the data without requiring the data to be sorted and reorganized. When the filtering metadata is updated within a streaming ingestion job, the latency of a snapshot update can be much lower than with the legacy system.
Opal data format
To maximize full-table scan speed, Opal leverages a columnar file format (e.g., Apache ORC). Data in an Opal dataset is organized into logical “segments.” A segment is identified by a two-part id, (SegmentIdPart, SegmentIdSeq), and each row in a segment has a SegmentOffset. Together, these three values form the “Opal row id,” which must be unique within the dataset and globally identifies the record for an upstream mutation (update, insert, or delete). All three fields are part of the data file’s schema; carrying SegmentIdPart and SegmentIdSeq with each row makes it possible to combine multiple logical segments in a single data file. The mechanism for generating these values is up to the ingestion engine. For example, an incrementing SegmentIdPart can be assigned to a batch job whenever it starts; files within the same execution are assigned a unique SegmentIdSeq by incrementing a sequence number maintained in the ingestion state; and the SegmentOffset tracks the logical offset within a single segment. All of these values are populated at ingestion time to assign a unique row id to each ingested record.
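As a rough illustration of this row-id assignment, here is a minimal Python sketch (the function and field names are ours, not Opal's; a real ingestion engine would manage the part and sequence numbers in its job state):

```python
from typing import Dict, Iterable, Iterator

def assign_row_ids(records: Iterable[Dict], part: int, seq: int) -> Iterator[Dict]:
    """Stamp each ingested record with a unique Opal row id.

    part: SegmentIdPart, e.g., incremented once per batch job run.
    seq:  SegmentIdSeq, e.g., incremented per file within the run.
    The enumeration index becomes the SegmentOffset within the segment.
    """
    for offset, record in enumerate(records):
        yield {**record,
               "segmentIdPart": part,
               "segmentIdSeq": seq,
               "segmentOffset": offset}
```

Because the (part, seq) pair is unique per segment and the offset is unique within it, every emitted record carries a globally unique row id.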
Now, think of each mutation record as a key-value pair, where the key is the dataset's primary key. With the Opal columns added, it looks like this (keys A and B are updated across two job runs):
Figure 2: File data illustration
The mod_time here is a special column that records when the mutation happened. It comes from the upstream source and is usually a modification time or sequence number. We call it the “delta column,” and every Opal dataset must have one. Its use in computing the validity metadata is described later.
Opal maintains a validity metadata bitmap to determine whether a record is visible during the read time. This bitmap is created from two components: a dataset-level key store and segment-level segment logs. The key store provides the before-image information for each mutation, containing the Opal row id for the last valid record for every key in the dataset. Segment log entries track the validity state transition for each Opal row id, organized by the logical segment id (segmentIdPart, segmentIdSeq). The segment log entries are generated for the incoming change stream records by referencing the key store to determine whether the change invalidates an existing record.
You can imagine this component as a key-value store, where the key is the dataset's primary key. The main difference is that the value only contains the Opal-related columns: SegmentIdPart, SegmentIdSeq, SegmentOffset, and delta column (i.e., mod_time). Think of these values as a state associated with a given key. In the case of updates, we use the key to look up the previous state from the key store. This state tells us at which position (exact segment and offset) an earlier record needs to be invalidated.
In Figure 3, the key-store’s mutation is shown as a gray cylinder. A batch job can checkpoint the key-store to disk with versioning support.
Figure 3: Key-store updates
Once a key’s previous state is retrieved, we know which segment and offset need to be invalidated. This invalidation is represented by a SET_VALID_UNTIL log event, whose event time is set to the “delta column” value. Conversely, a SET_VALID_FROM event is recorded to indicate that a new value has been set at the same event time. These events could be written to a distributed log, but in our actively used implementation, they are written to a file on a distributed file system, with events for multiple segments written to a single segment log file.
Look at the previous example (Figure 3). When a mutation record (A, 1, 101, 3, 4000) is applied on top of the previous state, two log events are emitted (shown as bold text, green-colored rows in Figure 4). The record at offset 0 (segmentIdPart=1, segmentIdSeq=101) is invalidated first, immediately followed by a SET_VALID_FROM event at offset 3.
Figure 4: Key-store and segment log updates
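To make the flow concrete, here is a minimal Python sketch of applying one mutation: look up the key's previous state, emit the log events, and update the key-store (the function name, tuple layout, and dict-based store are our assumptions, not Opal's actual implementation):

```python
def apply_mutation(key_store, key, row_id, mod_time, is_delete=False):
    """Emit segment log events for one change record and update the key-store.

    key_store: {primary_key: (segmentIdPart, segmentIdSeq, segmentOffset, mod_time)}
    row_id:    (segmentIdPart, segmentIdSeq, segmentOffset) of the new record.
    """
    events = []
    prev = key_store.get(key)
    if prev is not None:
        # Invalidate the earlier record at its exact segment and offset.
        p_part, p_seq, p_off, _ = prev
        events.append(("SET_VALID_UNTIL", p_part, p_seq, p_off, mod_time))
    if is_delete:
        key_store.pop(key, None)
    else:
        # Mark the new record valid from the same event time.
        part, seq, off = row_id
        events.append(("SET_VALID_FROM", part, seq, off, mod_time))
        key_store[key] = (part, seq, off, mod_time)
    return events
```

Running it on the example above — previous state (A, 1, 101, 0, 1000), new record (A, 1, 101, 3, 4000) — yields a SET_VALID_UNTIL event for offset 0 and a SET_VALID_FROM event for offset 3, both at event time 4000.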
The complete series of validity events now forms the segment logs. The following segment log snippet shows the log events for segment (segmentIdPart=1, segmentIdSeq=101).
Segment logs for (segmentIdPart=1, segmentIdSeq=101)
Figure 5: Segment log
In order to quickly relate segment logs to a given data file, Opal implements some internal tracking metadata. This is needed because segment logs and data files do not always map one-to-one (due to compaction). The correlation between data and segment logs is achieved by a two-stage mapping:
Data to segment: Gives all the segments related to a data file. The metadata mapping segment ids to data files is stored in a state store called the “data management metadata.”
Segment to segment logs: Gives all the segment logs if a segment is specified. The metadata mapping segment ids to segment logs and segment bitmaps is stored in a state store called the “log management metadata.”
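Resolving a data file to its segment log files through this two-stage mapping can be sketched as follows (plain dicts stand in for the two metadata stores; the names are hypothetical):

```python
def segment_logs_for_file(data_file, data_mgmt, log_mgmt):
    """Resolve a data file to its segment log files via the two-stage mapping.

    data_mgmt: {data_file: [segment_id, ...]}   -- "data management metadata"
    log_mgmt:  {segment_id: [log_file, ...]}    -- "log management metadata"
    where segment_id is a (segmentIdPart, segmentIdSeq) tuple.
    """
    log_files = []
    for segment_id in data_mgmt.get(data_file, []):      # stage 1: data -> segments
        log_files.extend(log_mgmt.get(segment_id, []))   # stage 2: segment -> logs
    return log_files
```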
The structure of segment logs enables record-level “time traversal.” This means a client can select any time point and reconstruct a point-in-time view. In Figure 5, an instantaneous view of each row’s visibility is obtained after iterating all the log events. Initially, each segment has a bitmap with all bits cleared. While iterating the log events, a bit is set when a SET_VALID_FROM entry is encountered and cleared when a SET_VALID_UNTIL entry is encountered. When all the log events are processed, the bitmap for that segment is finalized and can be used for reading.
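The bitmap reconstruction just described might look like the following sketch, assuming each log event carries its type, segment offset, and event time, and that events are ordered by event time (a plain list of bits stands in for a real compressed bitmap):

```python
def build_bitmap(log_events, num_rows, as_of=None):
    """Replay segment log events to materialize a validity bitmap.

    log_events: (event_type, segment_offset, event_time) tuples for ONE
    segment, ordered by event time. If as_of is given, later events are
    skipped, yielding the point-in-time view ("time traversal").
    """
    bitmap = [0] * num_rows                       # all bits cleared initially
    for event_type, offset, event_time in log_events:
        if as_of is not None and event_time > as_of:
            continue                              # event is after the chosen time point
        bitmap[offset] = 1 if event_type == "SET_VALID_FROM" else 0
    return bitmap
```

Replaying with an `as_of` of 2000 against the earlier example would show the original record at offset 0 as still valid, because its invalidation at event time 4000 has not yet happened.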
Opal assumes all users by default want the latest view of a dataset, so if the time point is not specified, the pre-generated current bitmap is used. The current bitmap is updated whenever new data is made visible in the snapshot.
Ingestion and metadata generation
Data is ingested into the storage format described above with the addition of the Opal row identifier tuple (SegmentIdPart, SegmentIdSeq, SegmentOffset) added to uniquely identify each ingested row. The key-store keeps track of the latest row id for each key in the dataset, and the segment log is updated based on this information.
The data ingestion and metadata generation may be decoupled in separate batch processes or may be inlined in a streaming processing model.
Gobblin drives much of the offline ingestion at LinkedIn, so we leveraged Gobblin for ingesting data into the Opal format. With this approach, we were able to migrate existing ingestion pipelines for Brooklin and Venice to Opal by swapping in Gobblin components to manage the injection of the Opal row id tuple for each ingested record and to change the output format from Apache Avro to ORC. This allowed us to build on stable pipelines and take advantage of existing operational expertise and monitoring support.
The ingestion job updates the data management metadata to keep track of all files that have been ingested for the dataset.
Most of our ingestion flows are based on batch processing, so we pair the batch ingestion job with a batch metadata update job, called “MetaGen,” implemented as a Spark job. We also have an inline metadata update mode, which uses a RocksDB-based key-store for point lookups.
Batch metadata update
The MetaGen job takes as input the data files generated by the ingestion job and outputs updates to the segment log files and segment bitmap files that the Opal dataset reader uses to filter out inactive rows. It also updates the key-store with the latest location of each updated key, and updates the log management metadata to track the list of data files that have been processed. Batch metadata updates are more efficient for datasets with a high change rate relative to the overall data size.
The high-level logic is below:
Figure out the new files to process by comparing the data management state and the log management state.
Join the new data with the key-store to figure out whether a row is an update, insert, or delete.
Output segment log entries to a segment log file.
For each update or insert, emit a SET_VALID_FROM log entry.
For each update or delete, emit a SET_VALID_UNTIL log entry.
Materialize a new latest bitmap by applying the new segment log entries on top of the previous latest bitmap. This is an optimization to avoid generating a bitmap for current-time queries.
Update the key-store with the latest row id for each updated key.
Update the log management metadata to record the processed data files.
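The bitmap materialization step above (applying new segment log entries on top of the previous latest bitmap) can be sketched as follows; a set of valid offsets per segment stands in for the real bitmap files:

```python
def advance_bitmaps(bitmaps, log_entries):
    """Apply new segment log entries on top of the previous latest bitmaps.

    bitmaps: {(segmentIdPart, segmentIdSeq): set of valid offsets} -- a
    sparse stand-in for the per-segment bitmap files.
    log_entries: (event_type, segmentIdPart, segmentIdSeq, offset, event_time).
    Mutates and returns bitmaps.
    """
    for event_type, part, seq, offset, _event_time in log_entries:
        valid = bitmaps.setdefault((part, seq), set())
        if event_type == "SET_VALID_FROM":
            valid.add(offset)        # record becomes visible
        else:                        # SET_VALID_UNTIL
            valid.discard(offset)    # record is invalidated
    return bitmaps
```

Because only the new entries are applied, current-time queries can reuse the rolled-forward bitmaps without replaying the full log history.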
Inline metadata update
With inline metadata update, the Opal metadata is updated as part of the ingestion job. Data is not visible until it has been processed and the metadata is updated, so inline metadata update may reduce latency. In this mode, the key-store is stored in a low latency key-value store such as RocksDB, which supports point lookups and updates.
Many of our database datasets have a low change rate, but a large dataset size. This means that only a small portion of the key-store is updated during each ingestion batch. For these datasets, the inline metadata processing can be more efficient than the scan-based processing that occurs with the Spark MetaGen job.
The high-level logic is below:
For each ingested record:
Load the RocksDB key-store partition for the record's key from the distributed data store to the mutable local file system, if it has not already been loaded.
Look up the key.
If the key is found, then this is an update, so emit both a SET_VALID_FROM and a SET_VALID_UNTIL log entry.
If the key is not found, then this is an insert, so emit a SET_VALID_FROM log entry.
If the ingested record has a delete marker, then emit a SET_VALID_UNTIL log entry.
Update the key-store to associate the record's row id with the record's key.
Push the updated RocksDB key-store partitions from the local mutable file store to the distributed data store.
Push the segment log entries to the distributed data store.
Update the current bitmap to reflect the new segment log entries.
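The loop above, with lazy partition loading, might look roughly like this sketch (plain dicts stand in for the RocksDB partitions and the distributed store; the hash-based partitioning scheme is our assumption):

```python
NUM_PARTITIONS = 4  # hypothetical partition count

def partition_of(key):
    return hash(key) % NUM_PARTITIONS

def inline_update(records, remote_store):
    """Process ingested records, lazily loading key-store partitions.

    remote_store: {partition_id: {key: (part, seq, offset, mod_time)}} --
    a dict standing in for RocksDB partitions on the distributed store.
    Returns the emitted segment log entries.
    """
    local_cache = {}
    log_entries = []
    for rec in records:
        pid = partition_of(rec["key"])
        if pid not in local_cache:                     # load partition on demand
            local_cache[pid] = dict(remote_store.get(pid, {}))
        part_store = local_cache[pid]
        prev = part_store.get(rec["key"])
        if prev is not None:                           # update: invalidate old row
            log_entries.append(("SET_VALID_UNTIL", *prev[:3], rec["mod_time"]))
        if rec.get("is_delete"):
            part_store.pop(rec["key"], None)
        else:                                          # insert/update: validate new row
            rid = (rec["segmentIdPart"], rec["segmentIdSeq"], rec["segmentOffset"])
            log_entries.append(("SET_VALID_FROM", *rid, rec["mod_time"]))
            part_store[rec["key"]] = (*rid, rec["mod_time"])
    remote_store.update(local_cache)                   # push updated partitions back
    return log_entries
```

Only the touched partitions are loaded and pushed back, which is what makes this mode efficient for large datasets with low change rates.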
Reading
Figure 6: Reader operations
All Opal data files are stored under a base directory. Inside this base directory, the key-store, segment log metadata, and Opal versioning metadata are stored in sub-directories.
Steps for reading:
Filter the list of files to process only the active files. There may be inactive files due to compaction.
File splits may be generated for parallel processing in a distributed engine. This may be done from within Hive, Spark, Trino, etc.
For each record that is read:
Load the bitmap for the segmentIdPart and segmentIdSeq if required.
For current-time reads, the current bitmap is loaded if it has been pre-generated.
For point-in-time reads, a base bitmap before the point-in-time is loaded, then segment log entries are applied to bring the bitmap to the point-in-time state.
Check the offset for the record to see whether it is active. If the bitmap has a 0 for the offset, then the record is an invalidated record and is discarded.
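The final filtering step can be sketched as follows (a set of valid offsets per segment again stands in for the bitmap):

```python
def read_active(rows, bitmaps):
    """Filter scanned rows by validity bitmaps.

    rows: records carrying the Opal row id columns.
    bitmaps: {(segmentIdPart, segmentIdSeq): set of valid offsets}.
    Rows whose bit is 0 (offset absent from the set) are discarded.
    """
    for row in rows:
        valid = bitmaps.get((row["segmentIdPart"], row["segmentIdSeq"]), set())
        if row["segmentOffset"] in valid:
            yield row
```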
Compaction
Both Opal data and metadata occupy storage space. Opal adds background compaction jobs (with retention) to ensure storage usage is not impacted by the growth of ingested files. There are two types of compaction jobs running in batch mode: minor compaction and major compaction.
Minor compaction doesn’t rewrite history; instead, it merges historical files in a way that best serves reads. Both data and segment logs require compaction.
Data: All compacted data files are written as HDFS block-size files. This will reduce the total number of files if retention for old files is set.
Segment logs: When mutations happen, log events for the same segment, identified by (segmentIdPart, segmentIdSeq), can span multiple log files. Opal reduces the amount of random-access IO by grouping records for the same segment together.
Major compaction can purge unnecessary historical information up to a lookBack, a timestamp specified by the table administrator. The time between lookBack and the current time is considered the valid time-traversal window. Queries for views beyond this window are considered invalid and result in an error.
Data: Opal only maintains records that are still visible in the time-traversal window. If a dataset has a high churn rate, this compaction releases a lot of space.
Segment logs: Opal removes log events that fall outside the time-traversal window and are no longer needed, shrinking the log file size.
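The resulting time-traversal window check might be as simple as this sketch (the function name is hypothetical):

```python
def check_time_traversal(query_time, look_back, now):
    """Reject point-in-time queries outside the valid window [look_back, now].

    look_back: the earliest timestamp retained by major compaction.
    """
    if not (look_back <= query_time <= now):
        raise ValueError(
            f"time-traversal point {query_time} is outside [{look_back}, {now}]")
    return True
```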
Opal overlaps in functionality with systems like Apache Iceberg, Delta Lake, and Apache Hudi. At the time Opal was conceptualized, only Hudi was available in open source. Since one of the goals for Opal was to avoid read-time merges and minimize reliance on compaction, we did not pursue an approach based on Hudi. Iceberg and Delta Lake emerged as we were productionizing our first use cases, and they did not support the row-level mutation that is core to the Opal design and critical for our datasets with large base data and small change sizes; these datasets would incur substantial copy-on-write overhead. The Iceberg V2 specification has provisions for row-level deletes, and its implementation is in progress as of this writing. LinkedIn has adopted Iceberg for some use cases, so with the V2 spec, we will evaluate ways to integrate with or leverage it for our use cases. A detailed comparison of these related systems will be part of future Opal blogs.
Opal continues to support LinkedIn’s low-latency use cases, such as DED, spanning petabytes of data since 2020.
We’re also working to push Opal’s latency limit even further by using a streaming ingestion job in some instances. This takes the “inline metadata update” further, with nonstop data landing and metadata updates happening at the same time. Gobblin already supports streaming data ingestion, and while we haven’t integrated Opal with it yet, we don’t anticipate any technical blockers.
Meanwhile, as new use cases are onboarded, there will inevitably be users who want to write mutation records directly to the data lake in the Opal format. Well-designed high-level APIs should help generate data and metadata without ingestion jobs. Such an API might be a SQL-like statement, such as “insert into,” or a DataFrame API like the one Spark provides. Spark is already increasing its support for row-level operations, which can be leveraged to build these capabilities for Opal. In this mode, a key-store is not needed if the processing is already scanning the dataset to extract the rows to update.
Launching Opal and leveraging it as our default database view on data lakes required a cross-functional, cross-team effort. Throughout this journey, we are grateful to our partner teams (Gobblin, Gobblin SRE, Oracle DBA, BDE, Espresso, Venice, Waterloo, and Dali) for their great work over the past years; without them, this would not have been possible.
We would like to thank all the individual contributors, including our LinkedIn alumni (names in no particular order): Eric Ogren, Ramana Ramakrishnan, William Lo, Nisarg Thakkar, Salil Gokhale, Felix GV, Bryan Hsueh, Raymond Zhang, Walaa Eldin Moustafa, Shardul Mahadik, Vishnupriya Reddy Naredulla, Sadheeshkumar Manavalan, Vijay Bais, Kumaraguru K, Vinoth Kumar, Ratandeep Ratti, Shirshanka Das, Jean-Francois Im, Zhixiong Chen, Pratap Kudupudi, and Mayank Tripathi.
Big thanks to our management: Sreenivas Thiruvambalam, Yuval Degani (alumni), Jayavel Sengodan, Jabir Ahmed, Sumitha Poornachandran, Azeem Ahmed (alumni), Chid K, Vasanth Rajamani, Eric Baldeschwieler, and Kapil Surlaker for leading through the challenges and consistent support for Opal’s vision.