Working with Shuffle
What is Shuffle?
Shuffle is the exchange of data across the executors in a cluster.
Why Does Data Need to be Shuffled at All?
- Shuffle is caused by Wide Transformations in Apache Spark.
- Any Wide Transformation makes data move from one executor to another, which is what causes a Shuffle.
- To produce the result of a Wide Transformation, such as a group by, an aggregate function, or a join operation, the data in one partition on one executor depends on the data in a partition on another executor. So, to produce the desired output, it is necessary to Sort and Shuffle the data.
- As an example, suppose the task is to count the balls with the same number, i.e., count the balls displaying 1, count the balls displaying 2, and so on. In this case, the balls with the same number, spread across all the executors, need to be Sorted first and then brought into one executor to perform the calculation.
This is how Shuffle happens across the executors.
Significance of Shuffle
Shuffle is one of the costliest operations in Apache Spark, so it is very important to pay attention to the Shuffling Parameter.
- Disk I/O - Shuffle involves a lot of Disk Input/Output Operations.
- Network I/O - Shuffle involves a lot of Network Input/Output Operations.
- Data Serialization / Deserialization - Shuffle leads to Data Serialization / Deserialization.
These factors hurt the performance of any Apache Spark application, and they are the reasons for choosing the right value of the Shuffling Parameter.
What is the Shuffling Parameter?
The Shuffling Parameter is the desired number of partitions to be created at the end of each Stage when data is exchanged across the executors in a cluster between the Stages of a Spark Job.
By default, the value of the Shuffling Parameter is 200, which means that if any Wide Transformation requiring a Shuffle is performed between the Stages of a Spark Job, then 200 partitions are created at the end of each Stage.
- A. To find out the default value of the Shuffling Parameter for the current Spark Session, the Apache Spark configuration property spark.sql.shuffle.partitions is read in the following way -
spark.conf.get("spark.sql.shuffle.partitions")
- B. To set a new value of the Shuffling Parameter for the current Spark Session, the desired number of partitions is passed to the Apache Spark configuration property spark.sql.shuffle.partitions in the following way -
spark.conf.set("spark.sql.shuffle.partitions", <desired-number-of-partitions>)
The desired number of partitions can be less than or more than 200, depending on the use case.
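For illustration, a minimal PySpark sketch (assuming an active SparkSession named spark and a DataFrame df with a hypothetical column named category) that shows the Shuffling Parameter taking effect after a Wide Transformation -

spark.conf.set("spark.sql.shuffle.partitions", "64")   # choose a value suited to the data and cluster
result = df.groupBy("category").count()                # a Wide Transformation, so a Shuffle occurs
print(result.rdd.getNumPartitions())                   # 64, unless Adaptive Query Execution coalesces the partitions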
How to Decide the Value of the Shuffling Parameter?
- There is no magic formula for setting the number of partitions to be created at the end of each Stage, i.e., the Shuffling Parameter.
- The value of the Shuffling Parameter may vary depending on many factors, such as the use case, the dataset, the number of cores, and the amount of executor memory available.
- Selecting a value for the Shuffling Parameter is largely a trial-and-error process.
What are the Factors to Consider While Choosing the Value of the Shuffling Parameter?
A. For a Smaller Dataset - Assuming there is a smaller dataset in an Apache Spark application, e.g., 1 GB, the data would be split into many very small chunks to fill the default 200 partitions, which adds overhead to the disk and network operations.
B. For a Large Dataset - Assuming there is a large dataset in an Apache Spark application, e.g., 1 TB, the data would be split into very big chunks to fit into the default 200 partitions, which hinders the benefit of parallelism.
So, the following factors need to be considered while choosing the value of the Shuffling Parameter -
- Make sure that the size of each partition block is roughly 128 MB to 200 MB.
- Make sure that the number of partitions created at the end of each Stage of a Spark Job is a multiple of the number of executor cores in the Cluster.
Example: if the executors in a cluster have 16 cores, then the value of the Shuffling Parameter can be 16, or a multiple of 16, such as 32, 64, or 128. A rough sizing sketch follows below.
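The following rough sizing sketch is based on the two factors above and uses assumed values (100 GB of Shuffled data, 8 executors with 16 cores each) purely for illustration -

shuffle_data_gb = 100                                          # assumed size of the data being Shuffled
target_partition_mb = 150                                      # keep each partition between 128 MB and 200 MB
total_cores = 8 * 16                                           # assumed 8 executors with 16 cores each
by_size = (shuffle_data_gb * 1024) // target_partition_mb      # about 682 partitions to hit the target size
shuffle_partitions = ((by_size + total_cores - 1) // total_cores) * total_cores   # round up to 768, a multiple of 128
spark.conf.set("spark.sql.shuffle.partitions", str(shuffle_partitions))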
How to Identify If Shuffle Has Occurred?
A. If the Developer already knows that a Wide Transformation is used in the code, then a Shuffle will happen.
B. If the Developer does not know whether a Wide Transformation is used in the code, then -
- In the Spark UI, the DAG of a Stage of the Apache Spark application will show an Exchange node if a Shuffle has occurred.
- If the EXPLAIN command is run on the code of the Apache Spark application, the plan will mention an Exchange (Shuffle) if a Shuffle has occurred.
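As a small illustration, assuming a DataFrame df with a hypothetical column named category, the physical plan printed by explain() contains an Exchange step when a Shuffle is involved -

agg = df.groupBy("category").count()    # a Wide Transformation
agg.explain()                           # the printed plan contains an "Exchange hashpartitioning(category, ...)" node

The same check can be done in SQL, e.g., spark.sql("EXPLAIN SELECT category, COUNT(*) FROM some_table GROUP BY category").show(truncate=False), where some_table is a hypothetical table name.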
What is Shuffle Read and Shuffle Write?
Shuffle Read - At the beginning of a Stage in a Spark Job, the Shuffle Read is the total number of bytes of Serialized data read from the disks of all the executors in the cluster.
Shuffle Write - At the end of a Stage in a Spark Job, after the Shuffle operation is executed, the Shuffle Write is the total number of bytes of Serialized data written to the disks of all the executors in the cluster.
The data written by the Shuffle Write on all the executors is read back by a Shuffle Read in a later Stage of that Spark Job.
- If a Spark Job in a Spark Application has only one Stage, then no Shuffling is required.
How Shuffle Can be Reduced?
Since it is not possible to stop using Wide Transformations in the code of an Apache Spark application, the Shuffle operation cannot be eliminated altogether, but it is possible to reduce its effect.
A. Whenever there is a high Shuffle, instead of creating a cluster with many smaller executors, it is best to create a cluster with fewer larger executors, as sketched below.
With fewer larger executors, more of the data resides within a single executor, and hence less data needs to be Shuffled across the other executors in the cluster to perform a Wide Transformation.
So, the need to Shuffle the data across the executors in a cluster is reduced.
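As a hedged sketch, fewer larger executors can be requested when the Spark Session is built (the values here are illustrative, not recommendations; on YARN or Kubernetes these properties are usually passed through spark-submit instead) -

from pyspark.sql import SparkSession
spark = (
    SparkSession.builder
    .appName("fewer-larger-executors")
    .config("spark.executor.instances", "4")    # fewer executors ...
    .config("spark.executor.cores", "8")        # ... each with more cores
    .config("spark.executor.memory", "32g")     # ... and more memory
    .getOrCreate()
)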
B. It is also possible to reduce the effect of the Shuffle operation by reducing the amount of data that is Shuffled across the executors in a cluster, in the following ways (see the sketch after this list) -
- Filter the data before applying any Wide Transformation.
- Instead of selecting all the columns from a dataset, select only the required columns.
Instead of using SELECT *, it is better to list the required column names in the SELECT clause.
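A minimal sketch of both points, using hypothetical table and column names - the rows are filtered and only the required columns are selected before the Wide Transformation, so much less data is Shuffled -

orders = (
    spark.table("orders")                       # hypothetical table
    .filter("order_date >= '2024-01-01'")       # trim the rows before the Shuffle
    .select("customer_id", "amount")            # keep only the columns needed downstream
)
totals = orders.groupBy("customer_id").sum("amount")    # only the trimmed data is Shuffled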
C. To reduce the effect of the Shuffle operation, it is possible to Broadcast the smaller tables, i.e., those less than 2 GB in size, to all the executors in a cluster.
Broadcasting means that, in a join operation where a smaller dataset is joined with a larger dataset, the smaller dataset is sent to each of the executors in the cluster. So, each of the executors in the cluster holds a copy of the smaller dataset.
This kind of join operation, where a smaller dataset is joined with a larger dataset in this way, is called a Broadcast Join operation.
Hence, no Shuffling occurs in a Broadcast Join operation.
- One thing to remember is that only smaller datasets can be Broadcast. If a larger dataset is Broadcast, it will exhaust the memory of all the executors in the cluster, leading to Out of Memory errors.
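A minimal Broadcast Join sketch, with hypothetical table and column names; Spark also broadcasts a table automatically when it is smaller than spark.sql.autoBroadcastJoinThreshold (10 MB by default) -

from pyspark.sql.functions import broadcast
large_df = spark.table("transactions")      # large fact table
small_df = spark.table("country_codes")     # small lookup table
joined = large_df.join(broadcast(small_df), on="country_code", how="left")   # the small table is copied to every executor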
D. It is better to use De-Normalized datasets for the frequently-used join operations, to reduce the effect of the Shuffle operation.
E. Whenever there is a high Shuffle, it is preferable to use Solid State Drives, i.e., SSDs, to make the Shuffle Read and the Shuffle Write faster.
However, Solid State Drives, i.e., SSDs, are much costlier.
F. To reduce the effect of the Shuffle operation, it is better to always enable the Cost-Based Optimizer, i.e., the CBO, and keep up-to-date Column Statistics and Table Statistics for the tables on which the Wide Transformations are used, as sketched below.
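A minimal sketch of enabling the CBO and refreshing statistics, using a hypothetical table named sales with hypothetical columns -

spark.conf.set("spark.sql.cbo.enabled", "true")
spark.sql("ANALYZE TABLE sales COMPUTE STATISTICS")                                  # table-level statistics
spark.sql("ANALYZE TABLE sales COMPUTE STATISTICS FOR COLUMNS customer_id, amount")  # column-level statistics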