Introduction to Databricks Optimization and Performance Tuning
Recap of Parallel Processing Concept in Apache Spark
- The main concept behind Spark in Parallel Processing.
- If a Dataset is given to Spark, it will divide the Dataset into multiple Partitions.
- Each of these Partitions will be assigned to each Executor in the Cluster.
- This way each of the Executors can work in Parallel.
- Example — If there are 1000 rows of data to process, then those 1000 rows can be divided into 10 Partitions, where each of the Partitions would contain 100 rows.
Then each of these Partitions will be assigned to each of the Executor in the Cluster to process in Parallel.
Recap of Spark Architecture
- When a Developer submits a program to Apache Spark for execution, actually the Developer submits the program to the Driver Node, which is the Driver Machine of the Cluster.
- Then, from the Driver Machine, the submitted program goes to the Cluster Manager.
The Cluster Manager takes the whole piece of code of the submitted program, and, divides into the natural hierarchy of — Jobs, Stages and Tasks respectively, and, sends each of these Tasks to each of the Executors of the Cluster for processing.
Each Task is assigned a Partition of data to process.
Recap of Internal Optimization Steps by Apache Spark
- Before sending the Tasks to the Worker Node to process the assigned Partition of data, Apache Spark internally optimizes the piece of code by creating a Directed Acyclic Graph, i.e., DAG that represents how the submitted piece of code will be executed.
- Apache Spark first creates a Logical Plan of how the submitted piece of code is going to get executed and tries to optimize it.
- Then, Apache Spark creates a Physical Plan of how the submitted piece of code is going to get executed and tries to optimize it.
- After that, Apache Spark creates the Stages out of the Optimized Physical Plan, and, within each of the Stages, Apache Spark creates one or multiple Tasks.
- Each of these Tasks are then sent to the Executor / Worker Node to execute, and, assigned a Partition of data to process.
- Finally, the outputs are collected from each of the Executors / Worker Nodes to the Driver Node.
Levels of Optimization or Performance Tuning in Databricks
The Optimization or Performance Tuning can be done in Databricks at two levels.
1. Application Level: The first level is at the Application Level.
The following types of Performance Issues are taken care of, by Optimizing, at the Application Level -
- A. Skew
- B. Spill
- C. Storage
- D. Shuffle
- E. Serialization
2. General Level: The second level is at the General Level that are provided by Databricks.
The General Optimizations can be done at two levels -
i. Table Level
ii. Cluster Level
The following General Optimizations are done at the Table Level -
- A. Z Order
- B. Data Skipping
- C. Vacuum
- D. Partitioning
- E. Deletion Vectors
- F. Liquid Clustering
- G. Predictive Optimization
Five Most Common Performance Issues of an Application in Databricks
The five most common Performance Issues faced in any application, or, the five most common areas in which the Performance of an application can be improved are the following -
1. Skew - The data that is sent for processing to an application in Apache Spark is partitioned. Whenever there is any imbalance in the size of Partitions, the Data Skewness will occur.
Data Skewness means one partition will have more data than the other partitions. Example -
- There are 100 rows of data in a dataset called Location, which is partitioned on Country column. Out of those 100 rows, suppose, 99 rows have USA as the Country column and only 1 row has India as the Country column.
- So, the data is Skewed as 99 rows went to the partition with USA as the Country, and, only 1 row went to the partition with India as the Country.
- So, now when the data will be processed in parallel based on the partition, the partition with 99 rows will go to one Worker Node for processing and the partition with only 1 row will go to another Worker Node for processing. Hence, the Worker Node with only 1 row to process will complete the work very fast. But, the Worker Node with 99 rows to process will take a lot of time to complete the work. This Worker Node might not even have enough Memory to process the data, which causes the Performance Issue due to the Skewness of the data in the partition to process.
2. Spill - Spill refers to the event of writing the temporary files to the Disk due to lack of Memory.
Data Skewness can cause Spill. Example -
- Since, the partition with 99 rows has gone to one Worker Node for processing. That Worker Node might not even have enough Memory to process the data. So, the data will be Spilled to the Disk. Hence, the Spill error occurs due to the lack of Memory to process the data.
3. Storage - The Storage problems are a set of problems that are directly related to how the data is stored in the Disk.
Although Apache Spark usually performs In-Memory computation, but, since the Worker Nodes also have Disks, the data can be stored in the Disks as well.
Hence, in a Spark application, the data can be processed in the Memory of a Worker Nodes, as well as, the data can be processed in the Disk of that Worker Nodes, because, the data can be read from the Disk and written to the Disk of a Worker Nodes too.
4. Shuffle - In a Spark application, data gets processed in parallel in different Worker Nodes. Now, when any Wide Transformation, like — Join, or, Group By is used, the data from other Worker Nodes would be needed to perform that Wide Transformation in each of the Worker Nodes.
So, each Worker Node would be taking the data from other Worker Nodes to perform the Wide Transformation operation. This is a Shuffle., i.e. data from one Worker Node is moving to another Worker Node, i.e., the data is Shuffling between the Worker Nodes.
The more the data is being Shuffled, the more time will be taken to complete the work. Hence, the Shuffle needs to be tuned to reduce the time taken to complete the work.
5. Serialization - The actual code in a Spark application runs on a Java Virtual Machine. So, the actual code needs to be Serialized and De-Serialized, which can cause error.
How One Performance Issue Can Cause Another Performance Issue?
1. Skew Can Induce Spill - While processing data using a Spark application, if one partition much more data in it that the other partitions, then the Worker Node that is processing that partition will take a lot of time to complete the work.
If that Worker Node does not even have enough Memory to process the data, then the data will be Spilled to the Disk due to the lack of Memory.
In this way, Skew can induce Spill.
2. Storage Issues Can Induce Excess Shuffle - In a Spark application, the data is processed in multiple Worker Nodes in parallel.
Suppose, there are 10 Worker Nodes in a Cluster, and, the size of each of the Worker Nodes is very small. Hence, each of the Worker Nodes can’t hold much data. So, in case of any Wide Transformation operation, whenever each of the Worker Nodes need data from the other Worker Nodes, a lot of Shuffling occurs.
In this way, storage issues can induce excess Shuffle.
But, if the Worker Nodes in a Cluster are of balanced size, then each of the Worker Nodes can hold significantly much more data. So, in case of any Wide Transformation operation, only when each of the Worker Nodes need data from the other Worker Nodes, only then Shuffling occurs. Hence, the Shuffling would be reduced.
So, between a Cluster of 10 small Worker Nodes, and, another Cluster of 5 significantly larger Worker Nodes, the Shuffle will be less in the Cluster of 5 significantly larger Worker Nodes.
3. Incorrectly Addressed Shuffle Can Lead to Skew - Shuffle can be reduced by creating a Cluster of significantly larger Worker Nodes. Even after that Skew may be induced in any of the Worker Nodes, i.e., most of the partitions, with data to process, are coming to a particular Worker Node than the others.
Many of the above three problems may be present at the same time.