I have seen several posts and tutorials on Delta Lake built on “Hello World” kinds of examples, where everything works wonderfully. However, as most of you know, the performance of data processing technologies changes drastically as the amount of data they handle increases. That’s why I decided to evaluate Delta Lake in the wild, using a real-world production Spark job that processes around 100 GB of data. Here I am going to share how Delta Lake helped me fulfill the job’s new requirements, and also the disappointments I had along the way.
The job processes the kind of data that data engineers handle on a daily basis. Every day it processes over 2 years of data (~100 GB) and updates the dataset in two ways:
1- Inserts data from the previous day.
2- Updates a few rows from the last 6 months of data whose status changed.
As you might have noticed, processing at least the first 1.5 years of data is completely redundant. This was one of the main motivations for revisiting the job, since it has to process more and more data with every run. Unfortunately, this is a recurring situation that data engineers face when dealing with jobs inherited from a legacy DWH.
Now let me go through the main considerations I weighed while switching the job from plain Spark to Delta Lake.
The push to store datasets in partitions
In this particular use case, partitioning by day is the natural step for turning a job that stores full snapshots into a job that processes only the minimum amount of data that needs to be inserted or updated daily. Moreover, a modern data store should rely heavily on efficient partitioning, which, among other advantages described in my previous post, enables efficient querying of the dataset.
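To make “process only the minimum amount of data” concrete, here is a minimal sketch of which daily partitions a single run would have to write. It assumes the table is partitioned by a `date` column, that each run inserts yesterday’s data, and that roughly the last 183 days (about 6 months) may receive status updates. The helper names and the window length are hypothetical, not taken from the original job.

```python
from datetime import date, timedelta


def partitions_to_touch(run_date: date, update_window_days: int = 183) -> dict:
    """Daily partitions one run must write, assuming a `date` partition column.

    Hypothetical helper: `update_window_days` approximates "the last 6 months".
    """
    yesterday = run_date - timedelta(days=1)
    # New rows from the previous day land in exactly one partition.
    insert_partition = yesterday
    # Older partitions whose rows may have had their status changed.
    update_partitions = [yesterday - timedelta(days=d) for d in range(1, update_window_days + 1)]
    return {"insert": insert_partition, "update": update_partitions}


def append_daily_rows(df, path: str) -> None:
    # Needs a SparkSession with Delta Lake configured; `df` is a PySpark
    # DataFrame assumed to carry a `date` column.
    df.write.format("delta").partitionBy("date").mode("append").save(path)


# Example: a run on 2020-07-02 (hypothetical date).
plan = partitions_to_touch(date(2020, 7, 2))
print(plan["insert"], len(plan["update"]))  # 2020-07-01 183
```

With about 730 daily partitions covering 2 years, a run like this writes at most 184 of them instead of recomputing the whole history, and only the update partitions that actually contain changed rows need rewriting.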
Now, if you decide to use Delta Lake, you are implicitly also deciding to partition your data; otherwise, Delta Lake brings no real benefit. So, point for Delta Lake!
This job runs only once a day, so ACID transactions are not necessarily a big challenge. The main situation that could compromise the dataset, and its atomicity in particular, is the job being mistakenly triggered several times in a short period, so that concurrent applications try to write the same dataset simultaneously. Fortunately, Delta Lake already handles this for you. Another point for Delta Lake!
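The mechanism behind this protection is optimistic concurrency on Delta Lake’s transaction log. A writer commits by claiming the next version number, and if another writer already claimed it, the commit does not silently go through. Below is a toy model of that idea in plain Python. It is not Delta Lake’s implementation. Delta Lake’s real conflict detection is finer-grained: according to its concurrency-control documentation, two concurrent blind appends do not conflict. Whether a double-triggered run is rejected therefore depends on what the job writes (appends versus overwrites or merges).

```python
import threading


class ToyTransactionLog:
    """Toy append-only commit log with optimistic concurrency (not Delta's code).

    Any commit made after a writer's read is treated as a conflict, which is
    stricter than Delta Lake's real conflict checks.
    """

    def __init__(self) -> None:
        self._commits: list = []  # _commits[v] holds the actions of version v
        self._lock = threading.Lock()

    def latest_version(self) -> int:
        return len(self._commits) - 1  # -1 means "no commits yet"

    def try_commit(self, read_version: int, actions: list) -> bool:
        # The check and the append happen atomically under the lock.
        with self._lock:
            if self.latest_version() != read_version:
                return False  # someone committed since we read: reject
            self._commits.append(list(actions))
            return True


# Two runs of the job read the same table version, then both try to commit.
log = ToyTransactionLog()
seen_by_run_a = log.latest_version()
seen_by_run_b = log.latest_version()
ok_a = log.try_commit(seen_by_run_a, ["add date=2020-07-01/part-0.parquet"])
ok_b = log.try_commit(seen_by_run_b, ["add date=2020-07-01/part-1.parquet"])
print(ok_a, ok_b, log.latest_version())  # True False 0
```

In real Delta Lake, the losing writer’s commit is either re-validated against the newer version (when the changes do not logically conflict) or rejected with a concurrent-modification exception.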
As I mentioned, I would like to process only the minimum amount of data necessary each day, but still keep old snapshots of the dataset available for data quality control.
In this case, the time travel feature fits like a glove! When data changes, Delta Lake does not actually delete any Parquet files. It only updates the transaction log, which points to the files that make up each snapshot of the dataset. With time travel, we don’t need a second process just for reading the latest snapshot of the dataset and storing it in another directory. Delta Lake takes care of everything for you.
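Time travel works because each commit in the transaction log records which Parquet files were added and which were logically removed. Replaying the commits up to a given version yields that version’s snapshot. The sketch below is a toy replay in plain Python over a hypothetical commit history, not Delta Lake’s actual log format. In Spark, an older snapshot is read with `spark.read.format("delta").option("versionAsOf", 1).load(path)`, or with `timestampAsOf`.

```python
def snapshot_files(commits: list[list[tuple[str, str]]], version: int) -> set[str]:
    """Replay toy add/remove actions up to `version` (inclusive)."""
    live: set[str] = set()
    for actions in commits[: version + 1]:
        for op, path in actions:
            if op == "add":
                live.add(path)
            elif op == "remove":
                # Logical removal only: the Parquet file itself stays on storage.
                live.discard(path)
            else:
                raise ValueError(f"unknown action {op!r}")
    return live


# Hypothetical history: version 2 rewrites the 2020-06-30 partition.
commits = [
    [("add", "date=2020-06-30/part-0.parquet")],
    [("add", "date=2020-07-01/part-0.parquet")],
    [("remove", "date=2020-06-30/part-0.parquet"), ("add", "date=2020-06-30/part-1.parquet")],
]
print(sorted(snapshot_files(commits, 1)))
print(sorted(snapshot_files(commits, 2)))
```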
A second very positive point of Delta Lake is the vacuum command, which deletes files belonging to old snapshots that can be discarded. What I find particularly great about it is that it replaces separately implemented storage (retention) policies, which are often defined in places far away from the job definition. This brings a tiny bit of infrastructure as code to the job definition.
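Here is a sketch of what keeping the retention policy next to the job can look like. `vacuum_table` uses the real `DeltaTable.vacuum(retentionHours)` call. `vacuum_candidates` is a deliberately simplified toy rule of my own that captures the idea that a file becomes deletable once it has been out of the table for longer than the retention period. The 168-hour default matches Delta Lake’s default retention of 7 days. The file names and timestamps are hypothetical.

```python
from datetime import datetime, timedelta


def vacuum_candidates(removed_at: dict[str, datetime], now: datetime,
                      retention_hours: float = 168.0) -> set[str]:
    """Toy rule: files removed from the table longer ago than the retention
    period may be physically deleted (168 hours = 7 days)."""
    cutoff = now - timedelta(hours=retention_hours)
    return {path for path, ts in removed_at.items() if ts < cutoff}


def vacuum_table(spark, path: str, retention_hours: float = 168.0) -> None:
    # Real API call; requires the delta-spark package and a Delta-enabled SparkSession.
    from delta.tables import DeltaTable
    DeltaTable.forPath(spark, path).vacuum(retention_hours)


now = datetime(2020, 7, 2, 12, 0)
removed = {
    "date=2020-06-20/part-0.parquet": now - timedelta(days=10),
    "date=2020-06-30/part-0.parquet": now - timedelta(days=1),
}
print(vacuum_candidates(removed, now))  # {'date=2020-06-20/part-0.parquet'}
```

One design consequence: after vacuuming, versions that needed the deleted files can no longer be time-traveled to. The retention period therefore also bounds how far back data quality checks can look.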
Delta Lake also has the manifest command, which generates manifest files listing exactly which data files make up the latest snapshot. That snapshot can then be built and stored in the target DWH, where the data is exposed to end users.
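As a rough picture of what the manifest contains, the toy function below groups the current snapshot’s files by partition directory and emits one list of absolute file paths per partition. This roughly mirrors the per-partition layout under `_symlink_format_manifest/`. The toy function and the bucket path are hypothetical. The actual generation is a single real call, `DeltaTable.forPath(spark, path).generate("symlink_format_manifest")`.

```python
import posixpath
from collections import defaultdict


def toy_manifests(table_path: str, live_files: set[str]) -> dict[str, list[str]]:
    """Group snapshot files (relative paths) into one manifest per partition."""
    manifests: dict[str, list[str]] = defaultdict(list)
    for rel_path in sorted(live_files):
        partition_dir = posixpath.dirname(rel_path)  # e.g. "date=2020-07-01"
        manifest = posixpath.join(table_path, "_symlink_format_manifest", partition_dir, "manifest")
        # Each manifest line is the absolute path of one data file.
        manifests[manifest].append(posixpath.join(table_path, rel_path))
    return dict(manifests)


def generate_manifest(spark, table_path: str) -> None:
    # Real API call; requires the delta-spark package.
    from delta.tables import DeltaTable
    DeltaTable.forPath(spark, table_path).generate("symlink_format_manifest")


files = {"date=2020-07-01/a.parquet", "date=2020-07-01/b.parquet", "date=2020-06-30/c.parquet"}
for manifest, lines in toy_manifests("s3://bucket/events", files).items():
    print(manifest, len(lines))
```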
This is meant to be a critical review of Delta Lake, but up to this point I had not faced any situation where Delta Lake failed to satisfactorily fulfill my requirements. A loud and long applause to the Delta Lake team!
Delta Processing: Upsert
To process only the minimum amount of data necessary in my use case, I need an upsert operation that inserts the fresh data and updates the rows in the table whose attributes changed. As you may know, Delta Lake fully supports this through its merge operation!
Updating rows can be quite complicated, since the operation is ambiguous if more than one row from the source dataset tries to update the same row in the target Delta Lake table. Whenever this happens, the upsert operation fails, because Delta Lake does not know which source row it should use for the update.
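The rule is easier to see in a toy version. The plain-Python sketch below mirrors the documented merge behavior: a target row matched by exactly one source row is updated, a target row matched by several source rows makes the merge fail, and source rows matching nothing are inserted, duplicates included. It is not Delta Lake code. The `id` key and `status` column are hypothetical. The real PySpark call is along the lines of `DeltaTable.forPath(spark, path).alias("t").merge(updates.alias("s"), "t.id = s.id").whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()`.

```python
from collections import defaultdict


def toy_merge(target: list[dict], source: list[dict], key: str = "id") -> list[dict]:
    """Toy upsert: update matched rows, insert unmatched ones, fail on ambiguity."""
    source_by_key: dict = defaultdict(list)
    for row in source:
        source_by_key[row[key]].append(row)
    target_keys = {row[key] for row in target}

    merged = []
    for row in target:
        matches = source_by_key.get(row[key], [])
        if len(matches) > 1:
            # Which source row should win? The merge cannot decide, so it fails.
            raise ValueError(f"{len(matches)} source rows match target key {row[key]!r}")
        merged.append(dict(matches[0]) if matches else row)
    # Source rows with no matching target row are inserted as-is.
    merged.extend(row for row in source if row[key] not in target_keys)
    return merged


target = [{"id": 1, "status": "open"}, {"id": 2, "status": "open"}]
updates = [{"id": 2, "status": "closed"}, {"id": 3, "status": "open"}]
print(toy_merge(target, updates))

ambiguous = [{"id": 2, "status": "closed"}, {"id": 2, "status": "reopened"}]
try:
    toy_merge(target, ambiguous)
except ValueError as err:
    print("merge failed:", err)
```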
This intrinsic characteristic of the update operation prevents duplicates from being created in the dataset, which substantially increases data quality. Needless to say, this situation is completely overlooked by typical join operations, where duplicates propagate freely and often multiply. In the latter case, you can still partially prevent duplicates with Spark’s expensive dropDuplicates method, but it will consume a good chunk of the resources available for running the job. Moreover, if the job produces near-duplicates rather than exact duplicates, and those should still not reach the final dataset, dropDuplicates will not help. You then need to write your own transformation to detect and remove them.
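One common shape for such a transformation is “keep the latest row per business key”. Unlike `dropDuplicates` on a subset of columns, it does not let an arbitrary row survive. The sketch below shows the logic in plain Python. The `id` and `updated_at` column names are hypothetical. In PySpark, the same idea is usually expressed with `F.row_number()` over `Window.partitionBy("id").orderBy(F.col("updated_at").desc())`, keeping the rows where the row number is 1.

```python
def latest_per_key(rows: list[dict], key: str = "id", order_by: str = "updated_at") -> list[dict]:
    """Keep one row per `key`: the one with the greatest `order_by` value.

    Ties keep the row seen first. Column names are hypothetical.
    """
    latest: dict = {}
    for row in rows:
        k = row[key]
        if k not in latest or row[order_by] > latest[k][order_by]:
            latest[k] = row
    return list(latest.values())


# ISO-8601 date strings compare correctly as plain strings.
rows = [
    {"id": 7, "status": "open", "updated_at": "2020-06-01"},
    {"id": 7, "status": "closed", "updated_at": "2020-06-15"},  # near-duplicate, newer
    {"id": 8, "status": "open", "updated_at": "2020-06-03"},
]
print(latest_per_key(rows))
```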
However, even though upsert takes the quality of the dataset to another level, in the end I decided not to use it. Why? Unfortunately, for this particular use case, upsert in Delta Lake is terribly slow. Surprisingly, when I tried it out, the job took longer to finish than the original job that processed the full 2 years of data. In fact, when I switched to a join-like solution, which read the full last 6 months of data from the target table and reprocessed it (99% of it redundantly) to update the correct rows, the job finished 3x faster. What most probably made the upsert so slow is that the few rows being updated were spread over a large number of partitions, so Delta Lake had to load A LOT of data to perform the upsert. Upsert performs much better, though, if you can effectively restrict the number of partitions that need to be updated.
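One way to restrict those partitions, in line with Delta Lake’s advice to add known constraints to the merge condition, is to list the affected partitions explicitly in the match condition so that only those partitions are scanned. The sketch below assumes a `date` partition column and an `id` key (both hypothetical). It also assumes that a row never moves to a different date partition when its status changes. If rows can move, the condition may fail to match them and they would be inserted as duplicates. The sketch will not help much when, as in my case, the changed rows are spread across most of the 6-month window.

```python
from datetime import date


def merge_condition(changed_dates, key: str = "id", partition_col: str = "date") -> str:
    """Build a MERGE condition that limits the target scan to the given partitions."""
    days = sorted(set(changed_dates))
    if not days:
        raise ValueError("no partitions to merge into")
    in_list = ", ".join(f"'{d.isoformat()}'" for d in days)
    return f"t.{partition_col} IN ({in_list}) AND t.{key} = s.{key}"


def upsert(spark, updates_df, path: str, changed_dates) -> None:
    # Real Delta Lake merge API; requires the delta-spark package.
    from delta.tables import DeltaTable
    (DeltaTable.forPath(spark, path).alias("t")
        .merge(updates_df.alias("s"), merge_condition(changed_dates))
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute())


print(merge_condition([date(2020, 7, 1), date(2020, 6, 30), date(2020, 7, 1)]))
# t.date IN ('2020-06-30', '2020-07-01') AND t.id = s.id
```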
A second problematic aspect of upsert: it generates a lot of small files in each altered partition of your dataset, and unfortunately the developer has no real control over this. This is another big blow, since an excessively large number of files leads to poor query performance on the dataset. You could still run a subsequent job to compact the dataset, as proposed in the Delta Lake best practices (https://docs.delta.io/latest/best-practices.html#compact-files). However, in my particular use case, compaction was also surprisingly slow, which dealt another big blow to the total runtime.
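For reference, the compaction recipe in the linked documentation rewrites a partition into fewer files with `dataChange` set to `false`, which marks the commit as a rearrangement of existing data rather than new data. Below is a sketch for one daily partition. The `date` column, the 256 MiB target file size, and the helper names are my own assumptions, not values from the docs.

```python
import math


def target_file_count(partition_bytes: int, target_file_bytes: int = 256 * 1024 * 1024) -> int:
    """Number of output files so each is at most ~target_file_bytes (assumed target)."""
    return max(1, math.ceil(partition_bytes / target_file_bytes))


def compact_partition(spark, path: str, day: str, num_files: int) -> None:
    # Rewrites a single partition in place; requires Spark with Delta Lake configured.
    predicate = f"date = '{day}'"
    (spark.read.format("delta").load(path)
        .where(predicate)
        .repartition(num_files)
        .write
        .option("dataChange", "false")      # same rows, just fewer files
        .format("delta")
        .mode("overwrite")
        .option("replaceWhere", predicate)  # only this partition is replaced
        .save(path))


print(target_file_count(1024 ** 3))  # 1 GiB at 256 MiB per file -> 4
```

Compacting one partition at a time keeps each rewrite small. It still rereads every altered partition, though, which is consistent with compaction adding noticeably to the runtime when many partitions were touched.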
Delta Lake is a great technology that brings ACID transactions to ETL pipelines, along with a lot of very mature features that make the life of data engineers much simpler. This alone fully justifies a switch to Delta Lake, especially if you are dealing with streaming jobs, where keeping transactions ACID can be a lot more complicated. On the other hand, upsert performance when many partitions have to be updated is still very poor (3x slower in this particular case), and in the end you still need an extra transformation to compact the data. That’s quite a shame since, as discussed above, upsert gives you much better awareness of your table’s data quality. You can still get rid of those duplicates with good engineering, but it is tedious.
By the way, Delta Lake is still quite new, and real-world use cases are not well documented on the internet. So it would be awesome if you could also share your own experiences with Delta Lake in the comments!