Cache vs. Persist in Databricks
What are “Cache” and “Persist”?
“Cache” and “Persist” are the “APIs”, or, “Spark Functions”, provided by “Apache Spark”, through which it is possible to “Store” the Data either in the “Memory”, or, in the “Disk”, or, in a “Combination” of both.
What is “Cache”?
“Cache” is very common across all the Databases, and, it is available in “Apache Spark” also.
“Cache” is a “Programming Mechanism” through which it is possible to “Store” the Data in the “Memory” across the “Worker Nodes” in a “Cluster”.
When to Use “Cache”?
When it is desired to “Store” the Data only in the “Memory”, then “Cache” can be used.
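The following is a minimal sketch of using “Cache”, assuming a DataFrame named “df” already exists in the “Spark Session” -
# "cache()" is "Lazy"; it only marks the DataFrame to be "Stored" in the "Memory".
df.cache()
# An "Action" is needed to actually "Compute" and "Store" the Data in the "Memory".
df.count()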
What is “Persist”?
“Persist” is another “Programming Mechanism” through which it is possible to “Store” the Data either in the “Memory”, or, in the “Disk”, or, in the “Combination” of both across the “Worker Nodes” in a “Cluster”.
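The following is a minimal sketch of using “Persist”, assuming a DataFrame named “df” already exists in the “Spark Session” -
from pyspark.storagelevel import StorageLevel
# "persist()" accepts a "Storage Level"; here, the Data is "Stored" in the "Memory",
# and, the part that does not fit is "Stored" in the "Disk".
df.persist(StorageLevel.MEMORY_AND_DISK)
# Like "cache()", "persist()" is "Lazy"; an "Action" materializes the "Stored" Data.
df.count()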
Why to Use “Cache” and “Persist” at All?
Since, as per the Spark Architecture, “Apache Spark” already performs the “In-Memory Computation”, the question arises of why the “Cache” and “Persist” Functions/APIs should be used at all.
To answer the above question, first it is required to understand the concepts of “Transformation” and “Action”.
Since, the “Transformations” are lazily evaluated in “Spark”, when some “Transformations” are submitted in “Spark”, only the “Logical Steps” of those “Transformations” would be kept under the “Directed Acyclic Graph” (DAG). But, the execution of those “Transformations” would start only when an “Action” is called.
Example: if there are 1000s of “Transformations”, and, at the end an “Action” is called, then the “Action” will “Trigger” the actual Data Processing, i.e., starting from the “First Transformation”, all the “Transformations” will be applied one by one, and, finally, the “Action” will be applied.
That means, only when an “Action” is “Triggered”, the “In-Memory Computation” will start.
But, if it is desired to execute some “Transformations” intermittently without executing an “Action”, and, to “Store” the Data either in the “Memory”, or, in the “Disk”, then the “Cache” and “Persist” Functions/APIs can be used.
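The following is a minimal sketch illustrating the “Lazy Evaluation”, assuming a DataFrame named “df” that contains a Column “emp_status” (a hypothetical example) -
# A "Transformation"; nothing is executed here, only the "Logical Steps" are recorded in the DAG.
df_active = df.filter(df.emp_status == "active")
# An "Action"; this "Triggers" the actual execution of all the recorded "Transformations".
df_active.count()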
Understanding Why “Cache” and “Persist” is Required with a Simple Example
Suppose, the DataFrame “df” contains 100 Columns for the “Employee” Table.
1. The “First Transformation” is to select only the required 5 Columns out of the 100 Columns.
df_1 = df.select(df.emp_name, df.emp_doj, df.emp_dob, df.emp_salary, df.emp_status)
Now, this “Transformation” is a “Lazy Evaluation”, meaning that when this “Transformation” is applied, it doesn’t get executed immediately. Instead, the “Logical Steps” required to execute this “Transformation” would be kept under the “Directed Acyclic Graph” (DAG).
2. The “Second Transformation” is to apply a filter on the DataFrame “df_1”.
So, the Source DataFrame for the Output DataFrame “df_2” is “df_1”.
df_2 = df_1.filter(df_1.emp_status == "active")
3. The “Third Transformation” is to create a “New Column” using the DataFrame “df_2”.
So, the Source DataFrame for the Output DataFrame “df_3” is “df_2”.
df_3 = df_2.withColumn("bonus", df_2.emp_salary * 0.1)
4. The “Fourth Transformation” is to create another “New Column” using the DataFrame “df_2”.
So, the Source DataFrame for the Output DataFrame “df_4” is “df_2”.
from pyspark.sql.functions import current_date, datediff
df_4 = df_2.withColumn("no_of_years_of_service", datediff(current_date(), df_2.emp_doj) / 365)
5. The “Fifth Transformation” is to create the final “New Column” using the DataFrame “df_2”.
So, the Source DataFrame for the Output DataFrame “df_5” is “df_2”.
df_5 = df_2.withColumn("emp_age", datediff(current_date(), df_2.emp_dob) / 365)
After the series of “Transformations”, an “Action” is called.
df_3.count()
Now, when the DataFrame “df_3” is called for an “Action”, the control flow of the program would go to “df_3”.
Since, the DataFrame “df_3” is dependent on the DataFrame “df_2”, the control flow of the program would now go to “df_2”.
Again, the DataFrame “df_2” is dependent on the DataFrame “df_1”. So, the control flow of the program would now go to “df_1”.
The DataFrame “df_1” is dependent on the DataFrame “df”. If there is any previous step, then the control flow of the program would go to that step.
So, first the operation on the DataFrame “df” will be executed, then the operation on the DataFrame “df_1”, followed by the operation on the DataFrame “df_2”, and, finally, the operation on the DataFrame “df_3”.
Now, at a later point of time, “Actions” can also be called on the DataFrame “df_5”, or, on the DataFrame “df_4”. Calculating the DataFrame “df_5”, or, the DataFrame “df_4” is dependent on the DataFrame “df_2”.
So, it can be seen from the above dependency chain that the DataFrame “df_2” is getting “Re-Computed” again and again. This is a “Time-Consuming Process”.
But, in order to increase the performance, it is desirable to “Calculate” the DataFrame “df_2”, and, “Store” the “Intermediate Result” within the “Memory” or “Storage Disk” of the “Worker Nodes” in the “Cluster”.
Then, when the “Actions” will be called on the DataFrame “df_5”, or, on the DataFrame “df_4”, or, on the DataFrame “df_3”, the control flow of the program will directly check for the DataFrame “df_2” that is already “Calculated”, “Computed”, and, “Stored” within the “Memory” or “Storage Disk” of the “Worker Nodes” in the “Cluster”.
The DataFrame “df_2” will not be “Re-Computed” anymore.
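The following is a minimal sketch of how the DataFrame “df_2” from the above example could be “Cached”, so that it is “Computed” only once -
# Mark "df_2" to be "Stored" in the "Memory" (this is "Lazy"; nothing is "Stored" yet).
df_2.cache()
# The first "Action" computes "df" -> "df_1" -> "df_2", and, "Stores" "df_2" in the "Memory".
df_3.count()
# The subsequent "Actions" read "df_2" from the "Memory" instead of "Re-Computing" it.
df_4.count()
df_5.count()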
Why Not All the Intermediate Transformations are Cached or Persisted?
If the “Intermediate Results” of all the “Transformations” in a “Spark Application” are “Cached”, or, “Persisted”, it might seem that the performance should improve and the “Spark Application” should become super-fast, but that is not the case.
Any “Cluster” has a “Limited Memory”. Suppose, each of the “Worker Nodes” in a “Cluster” contains “128 GB” of “RAM”. That means, any “Worker Node” in that “Cluster” can hold at most “128 GB” of Data.
But, the entire “Memory” can’t be used to “Store” the Data. Some “Memory” of any “Worker Node” in that “Cluster” would be required to perform the “Calculations” as well.
So, if the “Intermediate Results” of all the “Transformations” in a “Spark Application” are “Dumped” into the “Memory” of all the “Worker Nodes” in a “Cluster”, then there will be “Shortage” of “Memory” for the “Computation” in the “Worker Nodes”. Hence, the “Spark Application” will throw “Out of Memory Error”.
That is the reason for being selective about which Data to “Cache”, or, “Persist” into the “Memory”.
So, if the “Memory” of all the “Worker Nodes” in a “Cluster” is “Exhausted” by “Storing” the “Intermediate Results” of all the “Transformations” in a “Spark Application”, then the “Spark Application” will not display a better performance because there will not be enough “Memory” to perform the “Computations”.
Different Storage Levels of “Persist”
The following are the different “Storage Levels” provided by “Apache Spark” for “Persisting” the Data -
1. MEMORY_ONLY: This “Storage Level” option is equivalent to the “Cache”.
In “Cache”, the Data would be “Stored” within the “Memory” of the “Worker Nodes” in a “Cluster”.
Similarly, when this “Storage Level” option is provided to the “Persist”, the “Intermediate Results” of the “Transformations” will be “Computed”, and, the Data would be “Stored” within the “Memory” of the “Worker Nodes” in a “Cluster”.
What Happens If the Data to Cache Does Not Fit into the Memory?
Suppose, the size of an “Intermediate Result” of a “Computed Transformation” is “150 GB”, and, the size of the “Memory” of a “Worker Node” in a “Cluster” is “128 GB”. So, the size of the Data is huge compared to the “Memory” of the “Worker Node”.
Under the circumstances, when the “Intermediate Result” of the “Transformation” is “Computed”, and, “Stored” into the “Memory” of a “Worker Node” in a “Cluster”, only the “Partial Data” would be “Stored” in the “Memory”, and, the “Rest” of the Data would not get any space in the “Memory”, and, would always be “Re-Computed” using the “Directed Acyclic Graph” (DAG).
So, when the “MEMORY_ONLY” option is used with “Persist”, or, when the “Cache” is used to “Store” the “Intermediate Results” of the “Transformations” into the “Memory”, and, the size of the Data does not fit into the “Memory”, then only the “Partial Data” gets “Stored” into the “Memory”, and, the “Rest” of the Data would always be “Re-Computed” using the “Directed Acyclic Graph” (DAG).
Using the “MEMORY_ONLY” option with “Persist” is not “Space-Efficient” because the Data is “Stored” in the “Deserialized” form, which does not reduce the space taken by the “Stored” Data.
But, using the “MEMORY_ONLY” option with “Persist” is “Time-Efficient”, because, while reading the Data from the “Memory”, it does not need to be “Deserialized” before being used further in the “Spark Application”, as the Data is already “Deserialized”. So, no overhead process on the CPU is required.
from pyspark.storagelevel import StorageLevel
df.persist(StorageLevel.MEMORY_ONLY)
2. MEMORY_ONLY_SER: This “Storage Level” option is similar to the “MEMORY_ONLY” option.
The “MEMORY_ONLY_SER” option will “Store” the “Intermediate Results” of the “Transformations” into the “Memory”, and, if the size of the Data does not fit into the “Memory”, then only the “Partial Data” gets “Stored” into the “Memory”, and, the “Rest” of the Data would always be “Re-Computed” using the “Directed Acyclic Graph” (DAG).
The only difference between the “MEMORY_ONLY” and “MEMORY_ONLY_SER” options is that the latter option “Serializes” the Data to be “Stored” into the “Memory”.
Benefit of Serializing the Data to Store in the Memory
By default, when any Data is “Stored” into the “Memory”, that would always be in the “Deserialized” form.
“Deserialized” is a “Space-Consuming Storage Type”.
But, when the “MEMORY_ONLY_SER” option is used to “Store” the “Intermediate Results” of the “Transformations” into the “Memory”, the Data is first “Serialized” into the “Binary” form and then “Stored” into the “Memory”.
The “Serialized Data” would always take “Lesser Space” compared to the “Deserialized Data”. This is an advantage as it is possible to “Store” more Data into the “Memory” of the “Worker Nodes” in a “Cluster” when the “Serialization” is applied, i.e., when the “MEMORY_ONLY_SER” option is used with “Persist”.
“Persisting” the Data in a “Serialized”, or, “Binary” form helps to reduce the size of the Data. Thus, making space for more Data to be “Persisted” in the “Memory” of the “Worker Nodes” in a “Cluster”.
Drawback of Serializing the Data to Store in the Memory
To read the Data from the “Memory” when the “MEMORY_ONLY_SER” option is used with “Persist”, the Data first needs to be “Deserialized”. Only then the “Spark” can use the “Deserialized Data” throughout the entire “Application”.
This adds an overhead process on CPU. So, it takes more time to read the Data that is “Persisted” with “MEMORY_ONLY_SER” option.
Using “MEMORY_ONLY_SER” option with “Persist” is “Space-Efficient” as the “Serialized Data” reduces the space of the Data “Stored”, but, not “Time-Efficient”, because, while reading the Data from the “Memory”, it needs to be “Deserialized” to be used in the “Spark Application” further.
from pyspark.storagelevel import StorageLevel
df.persist(StorageLevel.MEMORY_ONLY_SER)
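# Note: recent "PySpark" versions may not expose the "_SER" Storage Levels
# ("MEMORY_ONLY_SER", "MEMORY_AND_DISK_SER"); those are part of the "Scala"/"Java" API,
# and, in "PySpark", the "Stored" Data is already kept in the "Serialized" form.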
3. MEMORY_AND_DISK: In this “Storage Level” option, the Data would be “Stored” in both the “Memory”, and, the “Disk” of the “Worker Nodes” in a “Cluster”.
When the “MEMORY_AND_DISK” option is used with “Persist”, as long as the Data fits into the “Memory”, the Data will be “Stored” into the “Memory” of the “Worker Nodes” in the “Cluster”.
Once, the size of the Data exceeds the size of the “Memory”, the “Rest” of the Data would be “Stored” into the “Disk” of the “Worker Nodes” in that “Cluster”.
Advantage of Storage Level “MEMORY_AND_DISK” Over the Storage Level “MEMORY_ONLY”
Using the option “MEMORY_AND_DISK” with “Persist” is always better than using the option “MEMORY_ONLY” with “Persist” because, in the former case, there is no need to “Trigger” the “Re-Computation” of the Data that could not fit into the “Memory” of the “Worker Nodes” in a “Cluster”.
Instead, the Data that could not fit into the “Memory” gets “Stored” into the “Disk” of the “Worker Nodes” in that “Cluster”.
How Is the Data Stored in Memory and Disk in the Storage Level “MEMORY_AND_DISK”?
By default, when any Data is “Stored” into the “Disk”, that would always be in the “Serialized” form.
So, when the Data is “Stored” in the “Memory”, that would be in the “Deserialized” form, whereas, when the Data is “Stored” in the “Disk”, that would be in the “Serialized” form.
Using the “MEMORY_AND_DISK” option with “Persist” is not “Space-Efficient” because the Data that is “Stored” in the “Memory” is kept in the “Deserialized” form, which does not reduce the space taken by the “Stored” Data.
But, using the “MEMORY_AND_DISK” option with “Persist” is somewhat “Time-Efficient”, because, while reading the Data from the “Memory”, it does not need to be “Deserialized” before being used further in the “Spark Application”, as the Data is already “Deserialized”. So, no overhead process on the CPU is required. Whereas, while reading the Data from the “Disk”, it needs to be “Deserialized” before being used further in the “Spark Application”. So, an overhead process on the CPU is required.
from pyspark.storagelevel import StorageLevel
df.persist(StorageLevel.MEMORY_AND_DISK)
4. MEMORY_AND_DISK_SER: This “Storage Level” option is similar to the “MEMORY_AND_DISK” option.
The “MEMORY_AND_DISK_SER” option will “Store” the Data into the “Memory” as long as the size of the Data fits within the size of the “Memory” of the “Worker Nodes” in the “Cluster”.
Once, the size of the Data exceeds the size of the “Memory”, the “Rest” of the Data would be “Stored” into the “Disk” of the “Worker Nodes” in that “Cluster”.
The only difference between the “MEMORY_AND_DISK” and “MEMORY_AND_DISK_SER” options is that the latter option “Serializes” the Data to be “Stored” into both the “Memory”, and, into the “Disk”.
Using the “MEMORY_AND_DISK_SER” option with “Persist” is “Space-Efficient” as the Data “Stored” in both the “Memory”, and, the “Disk” is “Serialized”, which reduces the space taken by the “Stored” Data.
But, using the “MEMORY_AND_DISK_SER” option with “Persist” is not at all “Time-Efficient”, because, while reading the Data from both the “Memory”, and, the “Disk”, it needs to be “Deserialized” before being used further in the “Spark Application”. So, an overhead process on the CPU is required.
from pyspark.storagelevel import StorageLevel
df.persist(StorageLevel.MEMORY_AND_DISK_SER)
5. DISK_ONLY: The “DISK_ONLY” option will “Store” the entire Data in the “Serialized” form into only the “Disk” of the “Worker Nodes” in the “Cluster”, which would require “Disk Input/Output Operations”. Thus, using the “DISK_ONLY” option with “Persist” is “Time-Consuming”, but, it still gives better performance than the “Re-Computation” of the Data each time using the “Directed Acyclic Graph” (DAG).
Using “DISK_ONLY” option with “Persist” is “Space-Efficient” as the Data “Stored” in the “Disk” is “Serialized”, which reduces the space of the Data “Stored”.
But, using the “DISK_ONLY” option with “Persist” is not at all “Time-Efficient”, because, while reading the Data from the “Disk”, it needs to be “Deserialized” before being used further in the “Spark Application”. So, an overhead process on the CPU is required.
from pyspark.storagelevel import StorageLevel
df.persist(StorageLevel.DISK_ONLY)
The default “Storage Level” used by “Apache Spark” for “Persisting” an “RDD” is “MEMORY_ONLY”, whereas, for a “DataFrame”, the “persist ()”, and, the “cache ()” Methods default to “MEMORY_AND_DISK”.
The “persist ()” Method is used to assign a new “Storage Level” to the DataFrame.
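The following is a minimal sketch of checking the “Storage Level” assigned to a DataFrame, assuming a DataFrame named “df” -
# When no "Storage Level" is passed, "persist()" uses the default "Storage Level".
df.persist()
# The "storageLevel" property displays the "Storage Level" currently assigned to "df".
print(df.storageLevel)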
Different Storage Levels of “Persist” with Replication
The following “Storage Levels” provided by “Apache Spark” for “Persisting” the Data work in a similar way to the already mentioned “Storage Levels”, but, the only difference from their previous counterparts is that the following “Storage Levels” create a “Replication” of the “Stored Data” on another “Worker Node” in the “Cluster”.
One particular “Partition” of the Data would be “Stored” across “Multiple Different Worker Nodes” in the “Cluster”. So, in case of any failure in “One Node”, the “Stored Data” is still accessible through the “Other Nodes” in the “Cluster”.
- MEMORY_ONLY_2
- MEMORY_ONLY_SER_2
- MEMORY_AND_DISK_2
- MEMORY_AND_DISK_SER_2
- DISK_ONLY_2
In the above “Storage Levels”, the Data would be “Stored” across “Two Different Worker Nodes” in the “Cluster”. That is why the “Replication Factor”, i.e., “2” is appended with the “Storage Level” names.
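The following is a minimal sketch of “Persisting” with a “Replication Factor” of “2”, assuming the DataFrame “df” from the earlier examples -
from pyspark.storagelevel import StorageLevel
# The Data is "Stored" in the "Memory" of "Two Different Worker Nodes" in the "Cluster".
df.persist(StorageLevel.MEMORY_ONLY_2)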
How to Remove the Cached Data from Memory and Disk?
“Apache Spark” automatically monitors every “cache ()”, or, “persist ()” call being made, checks the usage on “Each Worker Node”, and, drops the “Persisted Data”, if it is not used, based on the “Least-Recently-Used” (LRU) Algorithm.
Still, it is possible to manually remove the “Persisted Data” that is no longer required using the “unpersist ()” Method. This Method marks the DataFrame as “Non-Persistent”, and, removes all the “Blocks” for it from the “Memory”, and, the “Disk” of “Each Worker Node” in the “Cluster”.
df.unpersist()
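Optionally, the “unpersist ()” Method accepts a “blocking” argument; when it is set to “True”, the call waits until all the “Blocks” are actually removed -
# Block until the "Memory", and, the "Disk" "Blocks" of "df" are removed.
df.unpersist(blocking=True)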