How not to collide with an Apache Iceberg
Lessons learned facing a challenging use case
Published Sep 15, 2024
Apache Iceberg is an open source table format designed to store large data tables. It is based on simple files that can be stored anywhere, and it works well with data processing and analytics engines such as Apache Spark, Hive, Trino and similar tools.
Iceberg tables are designed to be scalable, durable, and high-performing.
The format supports several file formats for its data files, namely Parquet, ORC, and Avro. Tables can be partitioned, which allows more efficient storage and retrieval of data, with parallel processing where needed.
The basic concept on which Iceberg is based is dividing the dataset into smaller chunks, each of which is represented as a separate data file (the lower layer in the diagram below).
Each data file is assigned a unique ID, and each file can be linked by one or more manifest files, each of which contains a row for each underlying data file, together with its metrics.
Groups of manifest files are then stored into manifest list files, each one containing references to multiple manifest files, together with stats and data file counts.
These layers of metadata are used like a sort of index to avoid reading manifests and data that are not required for an operation.
When a user queries the dataset with a filter, for example on a record's ID, Iceberg uses this metadata to read only the chunks that can contain matching records. This reduces both the amount of data scanned and retrieval times, because only the necessary chunks need to be accessed.
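To make the "metadata as an index" idea concrete, here is a minimal sketch in plain Python. It is a deliberately simplified model, not Iceberg's actual planning code: the class names, the single `id` column, and the file names are all assumptions made for illustration. It shows how min/max statistics let a reader skip whole manifests and data files without opening them.

```python
from dataclasses import dataclass


@dataclass
class DataFile:
    path: str
    min_id: int  # lower bound of the hypothetical "id" column in this file
    max_id: int  # upper bound of the hypothetical "id" column in this file


@dataclass
class Manifest:
    path: str
    files: list

    @property
    def min_id(self):
        return min(f.min_id for f in self.files)

    @property
    def max_id(self):
        return max(f.max_id for f in self.files)


def plan_scan(manifests, record_id):
    """Return the paths of data files whose [min, max] range may contain record_id."""
    selected = []
    for manifest in manifests:
        if not (manifest.min_id <= record_id <= manifest.max_id):
            continue  # the whole manifest is skipped without reading its entries
        for data_file in manifest.files:
            if data_file.min_id <= record_id <= data_file.max_id:
                selected.append(data_file.path)
    return selected


manifests = [
    Manifest("m1.avro", [DataFile("a.parquet", 1, 100), DataFile("b.parquet", 101, 200)]),
    Manifest("m2.avro", [DataFile("c.parquet", 201, 300)]),
]
print(plan_scan(manifests, 150))  # ['b.parquet']
```

Only one of the three data files is selected, and the second manifest is never inspected entry by entry.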
Apache Iceberg offers many advantages that make it a great solution for data warehouses and data lakes.
When the schema of the data is updated, these changes are just metadata changes, and data files don't need to be rewritten to consolidate the update.
Hidden partitioning
Iceberg supports a variety of partitioning strategies, and tables can be partitioned on multiple columns. Partitioning offers the benefit of more efficient querying and updating of the data, because it can quickly locate the subset of data files that is relevant.
In addition, Iceberg handles partitioning in a hidden way: the client doesn't need to be aware of it and can query the data normally, without explicitly referencing partition-dedicated columns.
Also, if for any reason you need to change the partitioning, you can fix your table without having to plan an expensive migration, because the partition scheme can change over time, and Iceberg can handle this without ever touching the existing data.
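As an illustration of hidden partitioning, the sketch below mimics a day-based partition transform in plain Python. Iceberg's day transform is defined as the number of days since 1970-01-01; everything else here (function names, the company identifier, the sample timestamps) is hypothetical. The point is that the partition value is derived from the timestamp, so a client filters on the timestamp and never sees a separate "day" column.

```python
from datetime import datetime, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def day_transform(ts: datetime) -> int:
    """Days since 1970-01-01 (UTC), derived from the record timestamp."""
    return (ts - EPOCH).days


def partition_key(company_id: str, ts: datetime) -> tuple:
    """Partition value a writer would compute for a record (hypothetical helper)."""
    return (company_id, day_transform(ts))


def days_to_scan(start: datetime, end: datetime) -> list:
    """Partition days that a timestamp range filter needs to touch."""
    return list(range(day_transform(start), day_transform(end) + 1))


morning = datetime(2024, 9, 15, 8, 0, tzinfo=timezone.utc)
evening = datetime(2024, 9, 15, 21, 30, tzinfo=timezone.utc)

# Two records of the same company on the same day land in the same partition.
print(partition_key("acme", morning) == partition_key("acme", evening))
# A filter on the timestamp alone is enough to find the single day to scan.
print(len(days_to_scan(morning, evening)))
```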
Iceberg stores data as a series of snapshots. Each one is a point-in-time snapshot of the data, which can be used to restore the state of the data at that specific instant. This pattern makes time travel possible, which means that you can query the data as it was at any point in time, not just its current content at query time.
It's up to you to decide how many snapshots you want to keep, configuring Iceberg according to your needs.
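A toy model of time travel, assuming snapshots are kept as a list sorted by commit time: the snapshot identifiers and timestamps below are made up, and the two helpers are illustrative, not part of any Iceberg API. It also shows the trade-off behind snapshot retention: once old snapshots are dropped, the points in time they covered can no longer be queried.

```python
from bisect import bisect_right

# (commit timestamp in ms, snapshot id), sorted by timestamp; values are made up
snapshots = [(1000, "s1"), (2000, "s2"), (3000, "s3")]


def snapshot_as_of(snapshots, ts_ms):
    """Return the id of the latest snapshot committed at or before ts_ms."""
    timestamps = [t for t, _ in snapshots]
    i = bisect_right(timestamps, ts_ms)
    if i == 0:
        raise ValueError("no snapshot exists at or before the requested time")
    return snapshots[i - 1][1]


def expire_snapshots(snapshots, keep_last):
    """Keep only the most recent keep_last snapshots."""
    if keep_last < 1:
        raise ValueError("keep_last must be at least 1")
    return snapshots[-keep_last:]


print(snapshot_as_of(snapshots, 2500))  # s2

# After expiring all but the last two snapshots, time 1500 is out of reach.
kept = expire_snapshots(snapshots, keep_last=2)
try:
    snapshot_as_of(kept, 1500)
except ValueError as err:
    print(err)
```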
In the context of a web application for monitoring a fleet of vehicles, our need was to store about 1 million records every day, and to make them available for at least 10 years. Each record is a medium-length data structure whose size can vary from a few hundred bytes to a few kilobytes.
Some clients own tens of records per day, other clients can have tens of thousands of records per day, and admin clients should have visibility of the whole dataset.
The goal was to let clients get their data on a per-day basis, and also in near real time, meaning that new data becomes available just a few minutes after it is produced in the main platform.
As usually happens in the IT world, cost and complexity should ideally be as low as possible, while performance and scalability should inexplicably tend to infinity.
Our plan was to find a compromise for those requirements using Iceberg on S3 as a data store, and then exposing the data through an API with mandatory parameters to force the clients to trigger data queries that leverage the partitioning columns at every request.
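The "mandatory parameters" idea can be sketched as a small query builder that refuses to run without both partitioning inputs. This is only an illustration under assumptions: the table name `fleet_records` and the columns `company_id` and `event_ts` are hypothetical, and the `?` placeholders stand for whatever parameter binding the query engine provides. Note that the filter is expressed on the timestamp itself, leaving it to hidden partitioning to map the range to a single day partition.

```python
from datetime import date, timedelta


def build_query(company_id: str, day: date, table: str = "fleet_records"):
    """Build a parameterized query that always filters on both partition inputs."""
    if not company_id:
        raise ValueError("company_id is mandatory")
    if not isinstance(day, date):
        raise ValueError("day is mandatory and must be a date")
    day = date(day.year, day.month, day.day)  # drop any time component
    start = f"{day.isoformat()} 00:00:00"
    end = f"{(day + timedelta(days=1)).isoformat()} 00:00:00"
    sql = (
        f"SELECT * FROM {table} "
        "WHERE company_id = ? "
        "AND event_ts >= CAST(? AS TIMESTAMP) "
        "AND event_ts < CAST(? AS TIMESTAMP)"
    )
    return sql, [company_id, start, end]


sql, params = build_query("acme", date(2024, 9, 15))
print(sql)
print(params)  # ['acme', '2024-09-15 00:00:00', '2024-09-16 00:00:00']
```

Because every request carries a company and a day, no client can accidentally trigger a full-table scan.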
To validate our plan, we started a POC that could then be easily converted into a production feature.
It was our first experience with Iceberg, so we needed to learn by doing.
We created a new AWS Glue table on an S3 bucket, configured to use Apache Iceberg as Table Format, and Parquet for File Format.
We configured the partitioning on 2 data columns:
- the company that owns the entity
- the calendar day from the record timestamp
We implemented a writing process this way:
- Data is produced in the platform, through many decoupled different pipelines
- A Flink job produces the aggregate that represents our final denormalized record
- Records are saved into the S3 Iceberg table defined in AWS Glue
We built and deployed the artifacts, and at every data flush performed by Flink we started seeing new metadata files like these:
and a lot of new data files:
Everything seemed to be working as expected.
The query pattern used by our clients, based on company and date, could leverage the partitioning and gave extraordinarily good performance while being very cost-effective.
For example, launching a query with Athena we could get very good read performance, and Athena scanned only a tiny amount of data in proportion to the whole table size.
We expected this first release to have a problem due to our feeding strategy and, in fact, after a few days, as the table grew, we started seeing a slight degradation in performance.
Even if the size of the scanned data was still very low, the performance was getting worse.
Our writing process was writing the data on average every 5 minutes, and the partitioning was per day. As a consequence we had a lot of small files, and a lot of metadata references to them.
In fact, after a few days of feeding the table, the size of the metadata was about 90% of the data folder size.
Analyzing the metadata had become far more complex and costly, even though the actual data that needed to be read was very small.
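A back-of-the-envelope calculation shows why this write pattern fragments the table. The 5-minute interval and the 1 million records per day come from the figures above; the number of companies active in each flush is a purely hypothetical input, and the "three metadata files per commit" count (a table metadata file, a manifest list, and at least one manifest) is a simplified lower bound.

```python
FLUSH_INTERVAL_MIN = 5        # one write roughly every 5 minutes
RECORDS_PER_DAY = 1_000_000   # daily volume of the use case

flushes_per_day = 24 * 60 // FLUSH_INTERVAL_MIN
records_per_flush = RECORDS_PER_DAY / flushes_per_day


def min_data_files_per_day(active_companies_per_flush: int) -> int:
    """Lower bound: every flush writes at least one file per (company, day) partition it touches."""
    return flushes_per_day * active_companies_per_flush


def min_metadata_files_per_day() -> int:
    """Lower bound: each commit adds a metadata file, a manifest list and at least one manifest."""
    return flushes_per_day * 3


print(flushes_per_day)               # 288
print(round(records_per_flush))      # 3472
print(min_data_files_per_day(100))   # 28800, with a hypothetical 100 active companies
print(min_metadata_files_per_day())  # 864
```

With these assumptions each data file holds only a few dozen records on average, so the per-file bookkeeping quickly becomes comparable in weight to the data itself.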
We replaced the table with a new one, then enabled the managed compaction feature on the Glue table, confident that this would resolve the problem.
After a few days we queried Athena about the size of the metadata, calling this query:
The number of manifests was actually smaller than before, and the performance was good and not giving any sign of degradation.
So far so good. But then we double-checked and also ran a "Calculate total size" on the metadata and data folders.
Interestingly, the S3 folders were not smaller than before but were far bigger. The increase was a scary 500% in number of files, and 150% in size.
Clearly we were missing something, and the cost of the S3 storage would have been unsustainable.
The problem was an oversight in reading the documentation: by design, managed compaction is only responsible for rewriting data files into a more optimized layout, based on their size and number. In other words, if a partition contains a high number of small files (the exact number and size depend on configuration parameters), the job rewrites them as a single data file, which also simplifies the metadata references to that chunk of data, now actually "compacted".
What we missed was that compacting the data doesn't imply that the orphan files are deleted, just as in some RDBMSs, where deleting records does not mean that disk space is freed.
So, even though the metadata complexity was low, the performance was good, and the compaction was optimizing the data by reducing its fragmentation, the system was leaving behind a huge quantity of junk files on S3.
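The behaviour described above can be sketched as a simple grouping of small files. This is not the algorithm used by the managed compaction job; the 128 MB target and the minimum of 5 input files are assumptions chosen only for the example. The last lines show why storage grows: the rewritten files are added next to the originals, which stay on disk until something deletes them.

```python
def plan_compaction(file_sizes_mb, target_mb=128, min_input_files=5):
    """Group the small files of one partition into rewrite groups of about target_mb each."""
    small = sorted(size for size in file_sizes_mb if size < target_mb)
    if len(small) < min_input_files:
        return []  # not enough fragmentation to justify a rewrite
    groups, current, current_size = [], [], 0
    for size in small:
        if current and current_size + size > target_mb:
            groups.append(current)
            current, current_size = [], 0
        current.append(size)
        current_size += size
    if current:
        groups.append(current)
    return groups


# One day of a partition written every 5 minutes: 288 files of about 1 MB each.
before = [1] * 288
groups = plan_compaction(before)
after = [sum(group) for group in groups]

print(len(before), "->", len(after))  # 288 -> 3
print(after)                          # [128, 128, 32]

# The current snapshot references 3 files, but storage holds old and new files.
print(len(before) + len(after))       # 291
```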
We ended up creating a Lambda function that continuously executes a VACUUM command, so that all expired snapshots and all orphan files (both data and metadata) are physically removed from S3.
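A minimal sketch of such a Lambda function, assuming the VACUUM statement is submitted through Athena with boto3's `start_query_execution`. The environment variable names and the `run_vacuum` helper are hypothetical, the function only submits the statement without waiting for it to finish, and the schedule that triggers it is not shown. As far as we understand, how aggressively VACUUM removes snapshots is governed by table properties such as `vacuum_max_snapshot_age_seconds` and `vacuum_min_snapshots_to_keep`; check the Athena documentation for the exact names and defaults.

```python
import os


def run_vacuum(athena, database, table, output_location, workgroup="primary"):
    """Submit an Athena VACUUM statement and return the query execution id."""
    response = athena.start_query_execution(
        QueryString=f"VACUUM {table}",
        QueryExecutionContext={"Database": database},
        ResultConfiguration={"OutputLocation": output_location},
        WorkGroup=workgroup,
    )
    return response["QueryExecutionId"]


def lambda_handler(event, context):
    # Imported here so that run_vacuum can be exercised without AWS dependencies.
    import boto3

    athena = boto3.client("athena")
    query_id = run_vacuum(
        athena,
        database=os.environ["GLUE_DATABASE"],       # hypothetical variable name
        table=os.environ["ICEBERG_TABLE"],          # hypothetical variable name
        output_location=os.environ["ATHENA_OUTPUT"],  # hypothetical variable name
    )
    return {"query_execution_id": query_id}
```

Passing the client as an argument keeps the helper easy to test with a stub in place of the real Athena client.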
Comparing the first implementation ("just write data") with the final solution ("optimize + vacuum"), this is the improvement measured on S3:
- metadata files/day dropped to 1/250
- metadata size/day dropped to 1/140
- data files/day and size/day dropped to 1/3
The downside is that compaction and vacuum operations perform a lot of API calls on S3, so the parameters need to be tuned for each use case, balancing the cost of storage, the cost of API calls, and the file size that lets Iceberg work well with your access pattern.
The road to mastering Apache Iceberg is long and hard for newcomers, but it is definitely a game-changing technology.