Working with Shuffle

Oindrila Chakraborty
3 min read · May 6, 2024


What is Shuffle?

“Shuffle” is nothing but the exchange of data across the “Executors” in a “Cluster”.

Why Does Data Need to be Shuffled at All?

In order to compute the results of certain operations, the data must be “Shuffled” across the “Executors” in a “Cluster”.

“Shuffle” mainly occurs in “Wide Spark Transformations”, like “Group By”, any “Aggregate Function”, or a “Join Operation”. To produce the output of these operations, the data in “One Particular Partition” on “One Particular Executor” depends on the data in “Another Partition” on “Another Executor”.
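As an illustration, below is a minimal PySpark sketch (the DataFrames and the column names dept, salary and location are made up for this example) in which a “Group By” with an aggregate function and a “Join Operation” each force a shuffle -

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("shuffle-demo").getOrCreate()

employees = spark.createDataFrame(
    [(1, "IT", 1000), (2, "HR", 1500), (3, "IT", 2000)],
    ["id", "dept", "salary"],
)
departments = spark.createDataFrame(
    [("IT", "Building A"), ("HR", "Building B")],
    ["dept", "location"],
)

# "Group By" + aggregate: all rows with the same "dept" must land in the
# same partition before the sum can be computed, so the data is shuffled.
salary_by_dept = employees.groupBy("dept").agg(F.sum("salary").alias("total_salary"))

# "Join": matching keys from both sides must end up on the same executor,
# which again requires a shuffle by the join key "dept".
salary_by_dept.join(departments, on="dept", how="inner").show()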

So, in order to produce the desired output, it is very important to “Sort” and “Shuffle” the Data.

Imagine a set of numbered balls spread across the “Executors”, where the task is to count the balls with the “Same Number”, i.e., count the balls displaying “Number 1”, count the balls displaying “Number 2”, and so on. In this case, the balls with the “Same Number” need to be “Sorted” first and then brought from “All” the “Executors” into one “Executor” to perform the calculation.

This is how “Shuffle” happens across the “Executors”.

Significance of Shuffle

“Shuffle” is one of the costliest operations in Spark development, so it is very important to pay attention to the “Shuffling Parameter”. The cost comes from the following factors -

  1. Disk I/O - “Shuffle” writes intermediate files to the local disks of the “Executors” and reads them back, which means a lot of “Disk Input/Output Operations”.
  2. Network I/O - “Shuffle” moves data between “Executors”, which means a lot of “Network Input/Output Operations”.
  3. Data Serialization / Deserialization - the shuffled records must be “Serialized” before being sent over the network and “Deserialized” on the receiving “Executor”.

These factors directly affect the performance of any Spark application.
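One easy way to see where these costs appear is to print the physical plan of a wide transformation: the plan contains an “Exchange” operator, which is the step that performs the disk, network and serialization work. A small sketch, reusing the employees DataFrame from the example above -

# The physical plan of a wide transformation contains an "Exchange" node.
employees.groupBy("dept").count().explain()

# The printed plan includes a line similar to -
#   +- Exchange hashpartitioning(dept#..., 200), ...
# (the exact output depends on the Spark version and whether
# Adaptive Query Execution is enabled)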

This is why it is important to choose the “Right Shuffling Parameter”.

What is the Shuffling Parameter?

The “Desired Number of Partitions” to be created at the “End” of “Each Stage”, when data is exchanged across the “Executors” in a “Cluster” between the “Stages” of a “Spark Job”, is called the “Shuffling Parameter” (the spark.sql.shuffle.partitions configuration).

By default, the value of the “Shuffling Parameter” is “200”. This means that if any “Wide Spark Transformation” that requires “Data Shuffling” is performed, then “200 Partitions” are created at the “End” of “Each Stage” of that “Spark Job”.
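A quick sketch to verify this (assuming an existing SparkSession named spark; note that in Spark 3.x Adaptive Query Execution may coalesce the shuffle partitions, so it is disabled here only for the demonstration) -

# With AQE disabled, a shuffled DataFrame ends up with exactly
# spark.sql.shuffle.partitions partitions - 200 by default.
spark.conf.set("spark.sql.adaptive.enabled", "false")

df = spark.range(1_000_000)
shuffled = df.groupBy((df.id % 10).alias("key")).count()

print(shuffled.rdd.getNumPartitions())  # 200 with the default setting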

A. To find out the “Value” of the “Shuffling Parameter” that is currently set for the “Spark Environment”, the following “Spark Configuration Command” is used -

spark.conf.get("spark.sql.shuffle.partitions")

B. To set a “New Value” for the “Shuffling Parameter” in the current “Spark Environment”, the “Desired Number of Partitions” needs to be passed to the following “Spark Configuration Command” -

spark.conf.set("spark.sql.shuffle.partitions", <desired-number-of-partition>)

The value of the “Desired Number of Partitions” can be less than “200” or more than “200”, depending on the use case.
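For example, a smaller data set can use far fewer partitions. A short sketch (same assumptions as above: an existing SparkSession named spark and AQE disabled) -

# Lower the shuffle partition count and confirm that subsequent
# wide transformations pick up the new value.
spark.conf.set("spark.sql.shuffle.partitions", 64)

df = spark.range(1_000_000)
result = df.groupBy((df.id % 10).alias("key")).count()

print(spark.conf.get("spark.sql.shuffle.partitions"))  # "64"
print(result.rdd.getNumPartitions())                   # 64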

How to Decide the Value of the Shuffling Parameter?

There is “No Magic Formula” for deciding the “Desired Number of Partitions” to be created at the “End” of “Each Stage”, i.e., the value of the “Shuffling Parameter”.

The value of the “Shuffling Parameter” may vary depending on many factors, such as the use case, the data set, the “Number of Cores”, and the “Amount” of “Executor Memory” available.

Selecting the value of the “Shuffling Parameter” is largely a “Trial and Error” approach.

What are the Factors to Consider While Choosing the Value of the Shuffling Parameter?

A. For a Smaller Data Set - Assuming a “Spark Application” processes a “Smaller Data Set”, e.g., “1 Gigabyte”, the default “200 Partitions” would split the data into “Much Smaller Chunks” of roughly “5 MB” each, which adds overhead to the “Disk” and “Network” operations.

B. For a Large Data Set - Assuming a “Spark Application” processes a “Large Data Set”, e.g., “1 Terabyte”, the default “200 Partitions” would split the data into “Much Bigger Chunks” of roughly “5 GB” each, which hinders the benefit of “Parallelism”.

So, the following “Factors” need to be considered while choosing the value of the “Shuffling Parameter” (a rough heuristic combining them is sketched after the list) -

  1. Make sure that the “Size” of each “Partition Block” stays roughly between “128 MB” and “200 MB”.
  2. Make sure that the “Number of Partitions” created at the “End” of “Each Stage” of a “Spark Job” is a “Multiple” of the “Number of Cores” of the “Executors” in the “Cluster”.
    Example: if the “Executors” in the “Cluster” have “16 Cores”, then the value of the “Shuffling Parameter” can be “At Least 16”, or a “Multiple of 16”, such as “32”, “64”, “128”, etc.
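These two factors can be combined into a rough starting point. The helper below is purely illustrative (suggest_shuffle_partitions is not a Spark API, and the “128 MB” target is just the lower bound mentioned above); the result should still be validated by trial and error -

import math

def suggest_shuffle_partitions(shuffle_size_mb, cores_per_executor, num_executors,
                               target_partition_mb=128):
    # Aim for partitions of roughly 128-200 MB, rounded up to a
    # multiple of the total number of cores in the cluster.
    total_cores = cores_per_executor * num_executors
    by_size = math.ceil(shuffle_size_mb / target_partition_mb)
    multiples = max(1, math.ceil(by_size / total_cores))
    return total_cores * multiples

# Example: ~50 GB of shuffled data on 4 executors with 16 cores each
print(suggest_shuffle_partitions(50 * 1024, 16, 4))  # 448, a multiple of 64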


Oindrila Chakraborty

I have 11+ years of experience in the IT industry. I love to learn about data and to work with it, and I am happy to share my knowledge with all. I hope this will be of help.