Optimizing CTR Model Training on 1B+ Datapoints
How I took a CTR model training job from 2 days down to 15 hours.
Tanner McRae
Amazon Employee
Published Jul 31, 2024
Training on large datasets can be time intensive. When optimizing these types of jobs, the problems usually begin at data loading. You typically start with the data loader, using techniques like interleaving, prefetching, multi-GPU / multi-node training, etc. But what do you do when you’ve exhausted all of those options?
One of the more interesting problems I’ve worked on was optimizing a click-through rate (CTR) model training job. The model was written in Tensorflow 2 and trained on 3TB of tabular data (roughly 1 billion datapoints).
After a couple iterations, I ultimately got the training time down from ~2 days to 15 hours.
This blog will go through some of those optimizations and discuss what worked.
There are a number of different architectures that show up for CTR use cases. To date, there’s no single best architecture; model choice is use case and data dependent. Some common deep learning architectures I see are Deep & Cross Network (DCN), Deep Factorization Machines (DeepFM), and Wide & Deep (W&D).
One common theme across these workflows is that the models tend to be on the smaller side to keep inference costs down. It’s not uncommon to see 1MM+ requests per second (RPS) at larger companies, so smaller models save on resources.
The initial training job was taking ~2 days to complete. The model was a typical CTR architecture with ~4MM parameters. When I started the project, the ask was to reduce the training time so that an engineer could kick off a training job at the end of the day and have results the next morning.
The data loader (built on the tf.data API in Tensorflow) was already fairly well optimized using interleave(), batch(), and prefetch(). The data itself was stored in gzipped TFRecord files of around 1GB each.
Now that we’ve covered the background, let’s talk about the problems in the training job and what was done to resolve them. Ultimately, the changes below took the training time from 2 days down to 15 hours.
When optimizing a training job, problems are often masked by other problems further upstream, and there’s rarely a single root cause for a slowdown. In this case there were 4 problems, and solving one surfaced another.
The problems can be broken down into 4 categories:
- I/O
- Slow pre-processing in the data loader
- GPU communication overhead
- Kernel launch times
The initial latency issue was caused by I/O. Pulling 3TB of data from storage (regardless of sharding and parallelism) was fast, but not fast enough to keep up with the training job, which created a bottleneck.
The data loader was prefetching batches, but it couldn’t fetch the files fast enough to keep up. See the Tensorboard analysis below. Periodically, steps would stall while a new file was pulled.
The solution was simply to use a faster storage option. An Elastic Block Store (EBS) volume or Amazon FSx for Lustre both resolved the issue. Generally, I recommend FSx for Lustre if you plan on running multiple training jobs against the same data. EBS (using gp2) is cheaper for 3TB of data, so if you’re cost conscious and aren’t running multiple jobs on the same data, that’s a good option.
Using Amazon FSx for Lustre: When using FSx for Lustre, you can sync the file system with Amazon S3 using a data repository association (DRA). This links the files but doesn’t pull them down until they’re requested by a caller, which means the first request for a file pays a latency penalty. You can preload the files using hsm_restore to avoid that penalty on the first epoch.
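As a rough sketch of the preload step (the mount point and file layout here are hypothetical), you can walk the DRA-linked directory from the training instance and issue hsm_restore for each shard, assuming the Lustre lfs CLI is installed:

```python
import subprocess
from pathlib import Path

# Hypothetical FSx for Lustre mount point backed by an S3 DRA.
FSX_MOUNT = Path("/fsx/ctr-dataset")

# Ask Lustre HSM to pull every TFRecord shard down from S3 ahead of time,
# so the first epoch doesn't pay the lazy-load latency penalty.
for shard in sorted(FSX_MOUNT.rglob("*.tfrecord")):
    subprocess.run(["lfs", "hsm_restore", str(shard)], check=True)
```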
Using EBS: If using EBS, make sure to partition it with GPT rather than MBR. MBR caps out at 2TB per partition, which makes a 3TB dataset difficult to work with.
Note: Some Amazon EC2 instances (like the g5.12xlarge) have 3.7TB+ of instance storage. This storage is not persistent, and the data will be deleted if the instance stops. I wouldn’t recommend this approach, but for the extremely cost conscious, you could use this storage to run your training job and keep your data backed up somewhere else (S3, for example). An EBS volume would be safer, but using instance storage could save you some money if you’re willing to take the risk.
Once I/O was resolved, slow preprocessing in the data loader became the next bottleneck. To resolve this, I did 4 things: (1) unzipped the *.tfrecord files, (2) increased parallelism in the data loader, (3) removed unnecessary casting layers (e.g., converting float64 to float32), and (4) increased the batch size.
This is unintuitive, but there’s significant CPU overhead in decompressing files as a preprocessing step. Now that we’re using a faster storage option, the ~3GB saved per compressed file becomes negligible. The fix is to simply leave the TFRecord files uncompressed.
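On the read side, the only change is dropping the GZIP compression type when constructing the TFRecord dataset. A minimal sketch (the paths are placeholders):

```python
import tensorflow as tf

files = tf.data.Dataset.list_files("/fsx/ctr-dataset/*.tfrecord")

# Before: gzipped shards, decompressed on the CPU for every record read.
compressed_ds = files.interleave(
    lambda f: tf.data.TFRecordDataset(f, compression_type="GZIP"),
    num_parallel_calls=tf.data.AUTOTUNE,
)

# After: uncompressed shards, no decompression work in the input pipeline.
uncompressed_ds = files.interleave(
    tf.data.TFRecordDataset,
    num_parallel_calls=tf.data.AUTOTUNE,
)
```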
In Tensorflow, you typically interact with your dataset through the tf.data API, and there are a few knobs you can play with. The optimal functions and parameters are use case dependent, but generally you want to use interleave(), batch(), map(), and prefetch().
Most of these functions (interleave(), map(), and batch()) take a num_parallel_calls parameter. For this particular model I used 16, but your mileage may vary depending on your instance type.
Secondly, interleave() also has block_length and cycle_length parameters. For large sharded datasets, increasing block_length reduces the number of file opens needed to prefetch your batches, which provided a marginal decrease in preprocessing time.
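Putting those pieces together, the pipeline looked roughly like the sketch below. The feature spec, paths, and exact parameter values are illustrative, not the original code:

```python
import tensorflow as tf

# Illustrative feature spec; the real schema had far more tabular features.
FEATURE_SPEC = {
    "dense": tf.io.FixedLenFeature([13], tf.float32),
    "label": tf.io.FixedLenFeature([1], tf.float32),
}

def parse(serialized_batch):
    # Parsing whole batches at once is much cheaper than parsing per record.
    return tf.io.parse_example(serialized_batch, FEATURE_SPEC)

files = tf.data.Dataset.list_files("/fsx/ctr-dataset/*.tfrecord")

ds = (
    files.interleave(
        tf.data.TFRecordDataset,
        cycle_length=16,        # shards read concurrently
        block_length=64,        # consecutive records per shard, so fewer file opens
        num_parallel_calls=16,  # 16 worked well on this instance type
        deterministic=False,
    )
    .batch(8192, num_parallel_calls=16, drop_remainder=True)
    .map(parse, num_parallel_calls=16)
    .prefetch(tf.data.AUTOTUNE)
)
```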
It’s not uncommon to step into a code base and see Tensorflow code that looks something like this (a representative sketch of the pattern, not the original code):
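```python
import tensorflow as tf

class CastingWrapper(tf.keras.Model):
    """Hypothetical sketch of the anti-pattern: casting inside the model."""

    def __init__(self, base_model):
        super().__init__()
        self.base_model = base_model

    def call(self, inputs):
        # Every step, every feature gets converted to a brand-new tensor and
        # downcast from float64 to float32 before it ever reaches the model.
        cast_inputs = {
            name: tf.cast(tf.convert_to_tensor(value), tf.float32)
            for name, value in inputs.items()
        }
        return self.base_model(cast_inputs)
```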
A casting layer like the one above creates two issues: (1) Tensorflow has to create brand-new tensors for all of these values, which takes time, and (2) it causes problems for Python’s garbage collector. Memory consumption balloons as the batch size increases, and the training job periodically hangs while the garbage collector catches up. Similar things happen in PyTorch. See link.
When you have large tensor casting layers, you’ll often notice gaps between training steps. See the Tensorboard screenshot below.
Zooming into those gaps, you’ll see a LOT of “convert_to_tensor” calls, which are caused by the casting.
To resolve this, we just need to remove the casting layers and pre-process the data outside of the training job, so the features are already in the desired precision before training starts.
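For example, a one-off preprocessing pass can write the features as float32 when the TFRecords are produced, so nothing needs to be converted inside the training loop. A minimal sketch, with made-up feature names, paths, and a hypothetical load_raw_rows() iterator:

```python
import numpy as np
import tensorflow as tf

def to_example(dense_features: np.ndarray, label: float) -> tf.train.Example:
    # Store everything as float32 up front so the model never has to cast.
    return tf.train.Example(features=tf.train.Features(feature={
        "dense": tf.train.Feature(
            float_list=tf.train.FloatList(value=dense_features.astype(np.float32).tolist())),
        "label": tf.train.Feature(float_list=tf.train.FloatList(value=[float(label)])),
    }))

with tf.io.TFRecordWriter("/fsx/ctr-dataset/part-00000.tfrecord") as writer:
    for dense, label in load_raw_rows():  # hypothetical iterator over the raw data
        writer.write(to_example(dense, label).SerializeToString())
```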
With the casting and memory consumption issues resolved, the batch size could then be increased to reduce the number of transfers between the host and the devices.
Adding more GPUs stops paying off at a certain point. When that happens is generally correlated with the size of the model and the speed of each training step; for smaller models (like the one we’re discussing), that point arrives sooner than it does for larger ones.
When running on multiple NVIDIA GPUs, the devices communicate through NVIDIA’s Collective Communication Library (NCCL). This communication overhead increases as you add more GPUs and is also affected by your networking configuration.
For a ~5MM parameter deep network, the optimal number of GPUs ended up being 4 A10s.
Some things that didn’t work:
Using 8 GPUs caused the all_gather and all_reduce (NCCL operations) overhead to outweigh the efficiency gains of training on 8 GPUs. Running multiple 4-GPU nodes made the problem even worse, since all_gather and all_reduce were happening over a network call.
I also spent some time modifying NCCL environment variables; ultimately they had little effect on training time. I also tried linking two EC2 instances together in a placement group with Elastic Fabric Adapter (EFA) enabled to split the data loading across two machines. The overhead of multi-node communication was substantially slower than just using 4 GPUs on a single node.
Takeaway: for smaller models, a lower number of GPUs is sometimes just more efficient.
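If you want to pin training to a subset of the GPUs on a box, MirroredStrategy takes an explicit device list. A minimal sketch (the model builder and dataset here are placeholders):

```python
import tensorflow as tf

# Replicate across 4 of the GPUs; NCCL handles the cross-device reductions.
strategy = tf.distribute.MirroredStrategy(
    devices=["/gpu:0", "/gpu:1", "/gpu:2", "/gpu:3"],
    cross_device_ops=tf.distribute.NcclAllReduce(),
)

with strategy.scope():
    model = build_ctr_model()  # hypothetical model builder
    model.compile(optimizer="adam", loss="binary_crossentropy")

model.fit(train_ds, epochs=1)  # train_ds: the tf.data pipeline from earlier
```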
For Loop in MemcpyH2D
Another interesting issue I found is that Tensorflow’s MirroredStrategy uses a Python for loop to copy data from the host to the devices. Because each batch was large, the job was getting hung up transferring 8 large batches sequentially, one to each GPU.
See the Tensorboard screenshot below: MemcpyH2D happens iteratively for each GPU, slowing down the whole training job. If you scroll down to each GPU’s stream, you can see the MemcpyH2D operations running one after another.
When you have this much parallelization in the data loader, Tensorflow spins up a lot of threads. For I/O operations, these threads end up in an “uninterruptible sleep” (the “D” state in the htop screenshot below). With so many threads tied up in I/O, CPU resources run short, leading to bottlenecks in other parts of the system.
What happens is that all of these I/O threads contend with the threads that launch GPU kernels, driving kernel launch times up. The kernel launch operations get backed up.
Below is a screenshot of htop on a g5.12xlarge.
To resolve this, I just added more vCPUs by upgrading to a g5.24xlarge instance. The I/O operations are necessary for Tensorflow to get the throughput it needs, so vertical scaling was the logical choice.
The last piece of the puzzle was to simply reduce the number of kernels being launched. I compiled the model with the Accelerated Linear Algebra (XLA) deep learning compiler. Compiling with XLA produces a much more efficient model by optimizing the computation graph and fusing kernels together. Fusing kernels means there are simply fewer kernels to launch, which brought launch time down from 40–50% of each step to 12.5%.
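In recent Tensorflow 2 releases, one way to turn this on is passing jit_compile=True when compiling the Keras model; decorating a custom training step with @tf.function(jit_compile=True) works as well. A sketch, reusing the hypothetical model builder from above:

```python
import tensorflow as tf

strategy = tf.distribute.MirroredStrategy(devices=["/gpu:0", "/gpu:1", "/gpu:2", "/gpu:3"])

with strategy.scope():
    model = build_ctr_model()  # hypothetical model builder
    model.compile(
        optimizer="adam",
        loss="binary_crossentropy",
        jit_compile=True,  # compile the train step with XLA, fusing kernels
    )
```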
Optimizing a training job can be tricky. There’s lots of documentation on improving training time, but for large tabular datasets (1B+ datapoints) with small models (3–10MM parameters), you have to get a little creative.
Hopefully you found some value out of this blog and it helps you with future training optimizations.
Any opinions in this post are those of the individual author and may not reflect the opinions of AWS.