Analyzing Iceberg CDC data using graphs in Apache Spark

avichay marciano
Amazon Employee
Published Dec 29, 2024
Apache Iceberg is the cool kid on the block when it comes to big data tables. If you’re already using it, chances are you’re also doing some regular housekeeping—like expiring older snapshots to keep storage costs from ballooning like a bad cloud budget. But wait! Before you hit "delete" on those snapshots, let me show you how they’re more than just table clutter—they’re a goldmine of insights. With a little help from the create_changelog_view procedure and some graph magic in Apache Spark, you can dig deep into your data without the complexity of running your own SCD Type 2 incremental tables.
Let’s start by getting our foundation straight. If you’re using Apache Iceberg, you’ve probably encountered its change query capabilities introduced in v2. These queries are a game-changer for analyzing CDC (Change Data Capture) data. However, the approach I’m about to show you isn’t universally applicable—it works best if you’re using Iceberg in COW (Copy-On-Write) mode. Why? Because, as Jack Vanlightly points out, Iceberg’s current implementation for generating CDC data relies on comparing data and metadata files between snapshots. In COW mode, delete operations overwrite data files directly, which makes CDC extraction clean and straightforward.
On the other hand, MOR (Merge-On-Read) mode introduces delete files—external files that log what’s been removed without modifying the original data files. Unfortunately, these delete files aren’t yet processed during changelog generation, meaning your results might be incomplete or misleading. This limitation, however, is a work in progress in the Iceberg community. But for now, let’s focus on leveraging the clean simplicity of COW mode to extract some valuable insights.
In this blog, we will dive into analyzing Change Data Capture (CDC) data using the GraphFrames library. GraphFrames, open-sourced by Databricks, is an advanced tool that allows us to perform stateful traversals and graph-based computations on structured data. We will use it later to analyze the changes in our dataset. But first, we’ll start by setting up our base table and applying a series of updates to simulate real-world scenarios.

Setting up our scenario

We begin by creating a table that represents our product catalog. This table contains information about various products, including their ID, name, category, price, status, and version. Below is the code to create the table and insert some initial test data:
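A minimal sketch of that setup is below; the catalog and table names (glue_catalog.db.product_catalog), the COW table properties, and the sample rows are illustrative assumptions rather than the exact values from the original post:

```python
# Create an Iceberg v2 table in Copy-On-Write mode (table name and properties are assumptions).
spark.sql("""
    CREATE TABLE glue_catalog.db.product_catalog (
        id INT, name STRING, category STRING,
        price DOUBLE, status STRING, version INT
    ) USING iceberg
    TBLPROPERTIES (
        'format-version' = '2',
        'write.update.mode' = 'copy-on-write',
        'write.delete.mode' = 'copy-on-write',
        'write.merge.mode' = 'copy-on-write'
    )
""")

# Illustrative seed data: a couple of laptops plus related accessories.
spark.sql("""
    INSERT INTO glue_catalog.db.product_catalog VALUES
        (1, 'Laptop Pro',    'Electronics', 1500.00, 'active', 1),
        (2, 'Laptop Air',    'Electronics', 1200.00, 'active', 1),
        (3, 'Laptop Stand',  'Accessories',   80.00, 'active', 1),
        (4, 'Laptop Sleeve', 'Accessories',   40.00, 'active', 1),
        (5, 'USB-C Hub',     'Accessories',   60.00, 'active', 1)
""")
```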
And this is our resulting base table:
Now, for some updates:
Competitor Launches New Product: Price Adjustment for Laptops
Due to competitive market pressure, we reduce the price of laptops by 10%. The version of each affected row is incremented to track the change.
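A sketch of this update against the table defined above (the WHERE clause identifying laptops is an assumption):

```python
# 10% price cut for laptops, bumping each affected row's version.
spark.sql("""
    UPDATE glue_catalog.db.product_catalog
    SET price = price * 0.90, version = version + 1
    WHERE category = 'Electronics' AND name LIKE 'Laptop%'
""")
```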
Related Accessories Price Adjustment
Accessories related to the laptops experience a price drop of 15%. This change reflects a bundled marketing offer.
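Sketched the same way, treating the whole Accessories category as laptop-related for illustration:

```python
# 15% price cut for laptop-related accessories.
spark.sql("""
    UPDATE glue_catalog.db.product_catalog
    SET price = price * 0.85, version = version + 1
    WHERE category = 'Accessories'
""")
```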
Supply Chain Issue Affects Premium Products
Due to supply chain disruptions, premium products are marked as having limited availability.
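Here, “premium” is assumed to mean products above a price threshold:

```python
# The 1000 threshold is an illustrative definition of 'premium'.
spark.sql("""
    UPDATE glue_catalog.db.product_catalog
    SET status = 'limited', version = version + 1
    WHERE price > 1000
""")
```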
Price Increase for Limited Availability Items
Items with limited availability see a price increase of 10% to reflect higher demand and lower supply.
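And the final update in this sketch:

```python
# 10% price increase for limited-availability items.
spark.sql("""
    UPDATE glue_catalog.db.product_catalog
    SET price = price * 1.10, version = version + 1
    WHERE status = 'limited'
""")
```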
Generating our changelog
We can now generate our changelog and observe the changes:
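With Iceberg’s Spark procedures this is a single CALL; the view name below is an assumption:

```python
# Build a changelog view over the table's snapshots using Iceberg's Spark procedure.
spark.sql("""
    CALL glue_catalog.system.create_changelog_view(
        table => 'db.product_catalog',
        changelog_view => 'product_catalog_changes'
    )
""")

spark.sql("SELECT * FROM product_catalog_changes ORDER BY _change_ordinal").show(truncate=False)
```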
We can also call this procedure incrementally, by providing a map with specific snapshot IDs or Unix timestamps:
options => map('start-snapshot-id', '1', 'end-snapshot-id', '2') OR map('start-timestamp', '1678335750489', 'end-timestamp', '1678992105265'),
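For example, restricting the changelog to a specific snapshot range could look like this (the snapshot IDs are placeholders):

```python
# Incremental changelog between two snapshots.
spark.sql("""
    CALL glue_catalog.system.create_changelog_view(
        table => 'db.product_catalog',
        options => map('start-snapshot-id', '1', 'end-snapshot-id', '2'),
        changelog_view => 'product_catalog_changes'
    )
""")
```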
Let’s observe the output for product id = 1:
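A simple filter on the changelog view is enough here; _change_type, _change_ordinal, and _commit_snapshot_id are the metadata columns the view exposes:

```python
# Inspect the change history of a single product.
spark.sql("""
    SELECT id, name, price, status, version,
           _change_type, _change_ordinal, _commit_snapshot_id
    FROM product_catalog_changes
    WHERE id = 1
    ORDER BY _change_ordinal
""").show(truncate=False)
```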
Already, we can see the value in extracting insights from our changelog before deleting our snapshots. The changelog provides a detailed historical record of changes to our data, making it a powerful resource for downstream analysis and decision-making.
But let’s take it up a notch. Imagine the analysis we wish to perform is to introduce another column for each product ID indicating how many updates that specific record went through in the last 5 minutes. This type of analysis could uncover patterns such as frequent updates to specific products, potentially highlighting operational issues or volatile demand trends.
This task is inherently complex. Using SQL alone to achieve this would require extremely complex recursive Common Table Expressions (CTEs) and multiple self-joins to trace relationships between rows and calculate the desired metrics. Moreover, to account for the time dimension, we need to enrich our changelog with timestamp information to track when each change occurred.

Leveraging the Snapshots Table

To introduce the time dimension, we will join the changelog data with the Iceberg snapshots table, which contains metadata about the commit times of each snapshot. By joining the changelog with the snapshots table, we can add a committed_at column to track when each change occurred:
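A sketch of that join, using the product_catalog_changes view and the table’s snapshots metadata table (the enriched_changelog variable name is an assumption carried through the rest of the post):

```python
from pyspark.sql import functions as F

changelog = spark.table("product_catalog_changes")

# The snapshots metadata table exposes snapshot_id and committed_at, among other columns.
snapshots = spark.sql("""
    SELECT snapshot_id, committed_at
    FROM glue_catalog.db.product_catalog.snapshots
""")

# Attach the commit timestamp of each change to the changelog rows.
enriched_changelog = (
    changelog.join(
        snapshots,
        changelog["_commit_snapshot_id"] == snapshots["snapshot_id"],
        "left"
    ).drop("snapshot_id")
)
```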

Enter Graph Analysis

This is where graph analysis truly shines. By transforming our enriched changelog into a graph structure, we can efficiently represent relationships and dependencies, making it ideal for tasks like tracking updates over time. Using the GraphFrames library (note, however, that the version referenced in its documentation and on Maven, 0.8.0, is not the latest one available in the spark-packages repository, which is 0.8.4), we can model our data as a graph and perform powerful analyses, leveraging its built-in capabilities for traversals, aggregations, and more.
To achieve this, we will create a graph representation of our enriched changelog:
  • Vertices: Represent individual product states (e.g., versions of the same product) as nodes.
  • Edges: Represent transitions or updates between states over time, connecting the vertices.
Let’s walk through the code step-by-step to build this graph and prepare it for analysis.

Step 1: Create Vertices

We start by extracting the product states from the changelog to form our vertices. Each vertex will have:
  • A unique ID (internal_id).
  • Metadata such as category, status, version, and the timestamp (event_timestamp) of the change.
  • A chain_length initialized to 1, which we can use later for traversals or aggregations.
Here’s how we define the vertices:
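A sketch of the vertex DataFrame; GraphFrames requires the vertex key column to be called id, so the surrogate internal_id is exposed under that name, while the business key is kept as product_id (both names, and the use of row_number to generate the surrogate key, are assumptions):

```python
from pyspark.sql import functions as F
from pyspark.sql.window import Window

vertices = enriched_changelog.select(
    # Surrogate key per product state (the post's internal_id).
    F.row_number().over(Window.orderBy("committed_at", "id")).alias("id"),
    F.col("id").alias("product_id"),                   # original product id from the table
    F.col("category"),
    F.col("status"),
    F.col("version"),
    F.col("committed_at").alias("event_timestamp"),
    F.lit(1).alias("chain_length")                     # starting value for the traversal
)
```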

Step 2: Create Edges

Next, we define edges that connect the vertices. Edges represent the transitions between product states over time. We use a self-join on the vertices to identify transitions, with conditions such as:
  • Vertices must share the same category.
  • The timestamp (event_timestamp) of the source vertex must precede the destination vertex.
  • Optionally, filter transitions based on a maximum time difference (e.g., updates within 5 minutes).
Here’s how we define the edges:
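A sketch of the edge DataFrame built with a self-join; GraphFrames expects src and dst columns, and the 300-second window corresponds to the 5-minute example above:

```python
from pyspark.sql import functions as F

src = vertices.alias("s")
dst = vertices.alias("d")

edges = (
    src.join(
        dst,
        (F.col("s.category") == F.col("d.category")) &
        (F.col("s.event_timestamp") < F.col("d.event_timestamp")) &
        ((F.col("d.event_timestamp").cast("long")
          - F.col("s.event_timestamp").cast("long")) <= 300)
    )
    .select(F.col("s.id").alias("src"), F.col("d.id").alias("dst"))
)
```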

Step 3: Build the Graph

Using the GraphFrame API, we combine the vertices and edges into a graph structure:
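Assuming the vertices and edges DataFrames from the previous steps:

```python
from graphframes import GraphFrame

# Combine vertices and edges into a graph.
graph = GraphFrame(vertices, edges)
```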

The Punchline: Using Pregel for Stateful Graph Analysis

Graph analysis becomes truly powerful when combined with iterative algorithms like Pregel, which allows us to propagate information across the graph over multiple iterations. Pregel operates by sending messages along edges, updating vertex values based on the incoming messages, and repeating this process until convergence or a set number of iterations is reached.
In this example, we use Pregel to calculate the maximum chain length of updates for each product, leveraging the structure of our graph. Let’s break this down step by step:

Pregel Basics

Pregel follows a simple but effective three-step model:
  1. Vertex Update: Each vertex updates its state based on its current value and incoming messages.
  2. Message Passing: Messages are sent along edges to neighboring vertices.
  3. Aggregation: Messages are aggregated (e.g., summed, maxed) to compute the next state for each vertex.
This iterative process continues until:
  • A fixed number of iterations is reached, or
  • No further updates are happening (i.e., the graph converges).

Pregel Implementation in Our Use Case

Here’s how Pregel is used in our graph to compute the chain length of updates:
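Below is a sketch of the Pregel pipeline using the GraphFrames Pregel API; the column names mirror the post, but the exact expressions are an interpretation of the description that follows:

```python
from graphframes.lib import Pregel
from pyspark.sql import functions as F

# GraphFrames' Pregel checkpoints intermediate results, so a checkpoint dir must be set.
spark.sparkContext.setCheckpointDir("/tmp/pregel-checkpoints")

result = (
    graph.pregel
        .setMaxIter(7)
        # Vertex column: start from chain_length, then keep the max of the current value
        # and any incoming message (Pregel.msg() resolves to the _pregel_msg_ column).
        .withVertexColumn(
            "updated_chain_length",
            F.col("chain_length"),
            F.greatest(F.col("chain_length"),
                       F.coalesce(Pregel.msg(), F.col("chain_length")))
        )
        # Message: only sent when the source precedes the destination in time and
        # both share the same category; a null message is simply not delivered.
        .sendMsgToDst(
            F.when(
                (Pregel.src("event_timestamp") < Pregel.dst("event_timestamp")) &
                (Pregel.src("category") == Pregel.dst("category")),
                Pregel.src("updated_chain_length") + 1
            )
        )
        # Aggregation: keep the largest chain length that reached each vertex.
        .aggMsgs(F.max(Pregel.msg()))
        .run()
)

result.orderBy("id").show(truncate=False)
```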
Let’s explain each step in detail:
Step 1: Define Vertex Updates with withVertexColumn
Update our vertex value (updated_chain_length) during each Pregel iteration:
  • Take the maximum between:
    • The vertex’s current chain_length.
    • The incoming message (_pregel_msg_) if it exists; otherwise, fall back to the vertex’s chain_length.
Step 2: Send Messages with sendMsgToDst
Send messages along edges to propagate chain lengths to downstream vertices, building up the longest chain for each product.
  • Check if:
    • The source vertex’s timestamp (src.event_timestamp) is earlier than the destination vertex’s timestamp (dst.event_timestamp).
    • The source and destination belong to the same category (src.category == dst.category).
  • If conditions are met:
    • Send the source vertex’s updated_chain_length incremented by 1.
Step 3: Aggregate Messages with aggMsgs
Aggregate the incoming messages at each vertex to ensure that each vertex receives the largest chain length propagated to it during the current iteration:
  • Take the maximum of all incoming messages (_pregel_msg_).
Step 4: Set the Number of Iterations with setMaxIter (.setMaxIter(7))
Limit the Pregel algorithm to a maximum of 7 iterations, to prevent infinite looping and ensure the algorithm terminates within a reasonable number of steps.

Examining the results

The output of our Pregel run is:

The output represents the final result of the Pregel graph processing applied to our changelog data. Let’s break down each column and how the updated_chain_length provides valuable insights into the update history for each product:

Example Rows

  • Row 1 (id = 1):
    • Product in the “Electronics” category started as active with version = 1 at 10:08.
    • It had no prior updates, so updated_chain_length = 1.
  • Row 4 (id = 4):
    • The same product has evolved to limited with version = 4 at 10:09.
    • The updated_chain_length = 4 indicates that this product went through 4 sequential updates.
  • Row 12 (id = 12):
    • Product in the “Accessories” category with id = 4 reached limited status at version 4.
    • Its updated_chain_length = 4 reflects 4 total updates for this product.
  • Row 13 (id = 13):
    • A product in the “Accessories” category with id = 5 remained at version 1 with updated_chain_length = 1.
    • It shows that this product did not undergo any updates.

The Pregel Advantage in a Nutshell

By using Pregel, we:
  • Transformed sequential updates into stateful chains without repetitive joins or complex logic. The same methodology can be applied to perform very complex analyses on any dataset.
  • Propagated the chain length across versions, ensuring each product’s update history is accurately reflected.
  • Reduced the computational complexity and improved scalability compared to SQL-based solutions.
This approach unlocks insights from our data, enabling us to focus on analysis rather than wrestling with complex query logic. This makes Pregel and graph processing an invaluable tool for scenarios like tracking update histories, identifying anomalies, and analyzing time-dependent relationships at scale.
 

Any opinions in this post are those of the individual author and may not reflect the opinions of AWS.
