Introduction to Partitions in Apache Spark
What is the Importance of Partitioning?
- Apache Spark is known for its speed, and that speed comes from parallel processing.
- Partitions are the key to parallel processing.
- If the data being worked with is partitioned properly, query performance on that data improves, because parallel processing is triggered effectively.
- If the data is not partitioned properly, the distributed framework of Apache Spark is not being used effectively.
- So, partitioning plays an important role in the following:
1. Performance Improvement
2. Error Handling
3. Debugging
Why is a Partition Strategy Needed?
Partitions are the key to parallel processing in Apache Spark, and they are what let a Spark application use the distributed framework effectively.
Hence, the best partition strategy needs to be adopted to achieve the best performance from a Spark application.
There are two important concerns when choosing the correct partition strategy:
1. Choosing the right number of partitions:
- If the data is distributed across the right number of partitions, based on the number of cores available in the cluster, the performance of the Spark application is boosted.
- If the data is distributed across the wrong number of partitions, the performance of the Spark application decreases.
2. Choosing the right size of partitions:
- If the data is distributed across partitions of the same size, i.e., evenly distributed partitions, the performance of the Spark application improves.
- If the data is distributed across partitions of different sizes, i.e., unevenly distributed partitions, the performance of the Spark application decreases.
How to Choose the Right Number of Partitions?
The right number of partitions can be calculated in either of the following ways:
1. The total number of cores available in the cluster
2. A multiple of the total number of cores available in the cluster
Example: Suppose a cluster has 8 executors, and each executor has 8 cores. So, overall, there are 64 cores available in the cluster in each iteration of the data processing.
- Wrong partition number: Suppose the data to work with has only 40 partitions.
Now, when an action is triggered on the data, only 40 of the available 64 cores will each pick up one partition of the data and start processing in the first iteration, while the remaining 24 cores sit idle in that very first iteration.
So, not all of the available cores are used effectively for data processing.
- Right partition number: In this case, the right number of partitions would be 64, or a multiple of 64, like 128, 192, and so on.
Suppose the data to work with has 128 partitions. In the first iteration, all 64 cores pick up 64 of the partitions and start processing. Once the first iteration is completed, in the second iteration all 64 cores again pick up the next 64 partitions and start processing.
In this way, no core in the cluster is idle in any iteration of the data processing.
So, choosing the right number of partitions is important so that no core available in the cluster sits idle in any iteration of the data processing, and the performance of the Spark application increases.
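As a minimal sketch of this idea (assuming an already-running SparkSession named spark, and a placeholder DataFrame named df), the partition count can be aligned with the cores available in the cluster as follows:
# Total number of cores Spark can schedule tasks on in parallel
totalCores = spark.sparkContext.defaultParallelism
# Current number of partitions of the placeholder DataFrame "df"
print(df.rdd.getNumPartitions())
# Repartition the data to a multiple of the core count, e.g., 2x,
# so that no core sits idle in any iteration of the data processing
df = df.repartition(totalCores * 2)
print(df.rdd.getNumPartitions())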
How Do Unevenly Distributed Partitions Decrease the Performance of a Spark Application?
- Suppose a cluster has 8 executors, and each executor has 8 cores. So, overall, there are 64 cores available in the cluster in each iteration of the data processing, and the data to work with has 64 partitions.
- But out of those 64 partitions, 32 partitions are about 1 GB in size, and the remaining 32 partitions are about 100 MB in size, i.e., the partitions of the data are unevenly distributed.
- Now, when an action is triggered on the data, all 64 cores pick up the 64 partitions and start processing. But 32 of the cores each process 1 GB of data, whereas the other 32 cores each process only 100 MB.
- So, the 32 cores processing 100 MB will complete their work very quickly and start sitting idle within the first iteration, whereas the other 32 cores, each processing 1 GB, will take much more time to finish.
- In this way, unevenly distributed partitions can leave available cores in the cluster sitting idle during an iteration of the data processing, thereby decreasing the performance of the Spark application.
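Such unevenly distributed partitions can be detected by counting how many rows land in each partition, for example with the built-in spark_partition_id() function, where the row counts serve as a proxy for partition size. The sketch below assumes a placeholder DataFrame named df:
from pyspark.sql.functions import spark_partition_id
# Count the number of rows in each partition of the placeholder DataFrame "df";
# a few partitions with far higher counts than the rest indicate an uneven distribution
df.withColumn("partition_id", spark_partition_id()) \
  .groupBy("partition_id") \
  .count() \
  .orderBy("count", ascending=False) \
  .show()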
The Default Number of Partitions for Data Generated in the Spark Environment
- When the data to work with is generated inside the Spark environment, the default number of partitions comes from the Spark configuration value "sc.defaultParallelism".
- "sc.defaultParallelism" normally equals the total number of cores available to the Spark application; in the cluster used here it is 8. So, by default, data generated in this Spark environment is distributed into 8 partitions.
# Create the List of the Column Names for the DataFrame
employeeColumns = ["Employee_Id", "First_Name", "Last_Name", "House_No", "Address", "City", "Pincode"]
# Create the List for the Data of the DataFrame
employeeList = [
    (1001, "Oindrila", "Chakraborty", "118/H", "Narikel Danga North Road", "Kolkata", 700011),
    (1002, "Soumyajyoti", "Bagchi", "38", "Dhakuria East Road", "Kolkata", 700078),
    (1003, "Oishi", "Bhattacharyya", "28B", "M.G Road", "Pune", 411009),
    (1004, "Sabarni", "Chakraborty", "109A", "Ramkrishna Road", "Kolkata", 700105)
]
# Create the DataFrame from the List
employeeColumnNamesDf = spark.createDataFrame(employeeList, schema = employeeColumns)
# Display the "Default Number of Partitions" of the DataFrame
print(sc.defaultParallelism)
print(employeeColumnNamesDf.rdd.getNumPartitions())
Output -
Since there is not enough data (there are only 4 rows), 4 of the default 8 partitions are created empty.
employeeColumnNamesDf.rdd.glom().collect()
Output -
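Instead of collecting the full content of every partition with glom().collect(), the row count per partition can be checked more cheaply, since only the counts are returned to the driver:
# Number of rows in each of the 8 default partitions;
# the entries with 0 are the empty partitions
print(employeeColumnNamesDf.rdd.glom().map(len).collect())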
The Default Number and Size of Partitions for Data Read from External Storage
- When the data to work with is read from external storage into the Spark cluster, the number of partitions and the maximum size of each partition depend on the Spark configuration value "spark.sql.files.maxPartitionBytes".
- The default value of "spark.sql.files.maxPartitionBytes" is 128 MB.
- So, if a 1 GB file is read from external storage, Spark packs at most 128 MB of data into each partition, which distributes the 1 GB of data into 8 partitions (1024 MB / 128 MB) on the Spark cluster.
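As a quick sanity check of that arithmetic in plain Python (using the figures from the example above):
import math
fileSizeInMB = 1024         # Size of the file read from the external storage
maxPartitionSizeInMB = 128  # Default value of "spark.sql.files.maxPartitionBytes"
# At most 128 MB is packed into each partition, so 1024 MB / 128 MB = 8 partitions
print(math.ceil(fileSizeInMB / maxPartitionSizeInMB))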
df_ReadCustomerFileWithHeader = spark.read\
.option("header", "true")\
.csv("dbfs:/FileStore/tables/retailer/data/customer.csv")
# Display the "Default Value" of the "Spark Configuration", i.e, "spark.sql.files.maxPartitionBytes"
print(spark.conf.get("spark.sql.files.maxPartitionBytes"))
# Display the "Size" of the "CSV File"
print(dbutils.fs.ls("dbfs:/FileStore/tables/retailer/data/customer.csv")) #19900792
# Display the "Default Number of Partitions" of the DataFrame
print(df_ReadCustomerFileWithHeader.rdd.getNumPartitions())
Output -
df_ReadCustomerFileWithHeader.rdd.glom().collect()
Output -
Can the Default Number of Partitions for Data Read from External Storage Be Changed?
Yes. The Spark configuration "spark.sql.files.maxPartitionBytes" is configurable, meaning the maximum size of each partition can be changed by changing the value of "spark.sql.files.maxPartitionBytes", depending on the use cases of the Spark application.
Example:
- Some developers may build a Spark application that has to handle terabytes of data. In such cases, the maximum size of each partition may need to be larger, or in other cases smaller, than the default value of "spark.sql.files.maxPartitionBytes".
A new value for "spark.sql.files.maxPartitionBytes" can then be set as follows (the demonstration below uses a deliberately small value, 10000 bytes, purely to make the effect on the partition count visible):
spark.conf.set("spark.sql.files.maxPartitionBytes", "10000b")
# Change the "Value" of the "Spark Configuration", i.e, "spark.sql.files.maxPartitionBytes" to "10 KB", i.e., "10000b"
spark.conf.set ("spark.sql.files.maxPartitionBytes", "10000b")
# Display the "Changed Value" of the "Spark Configuration", i.e, "spark.sql.files.maxPartitionBytes"
print(spark.conf.get("spark.sql.files.maxPartitionBytes"))
Output -
df_ReadCustomerFileWithHeader = spark.read\
.option("header", "true")\
.csv("dbfs:/FileStore/tables/retailer/data/customer.csv")
# Display the "Changed Value" of the "Spark Configuration", i.e, "spark.sql.files.maxPartitionBytes"
print(spark.conf.get("spark.sql.files.maxPartitionBytes"))
# Display the "Size" of the "CSV File"
print(dbutils.fs.ls("dbfs:/FileStore/tables/retailer/data/customer.csv")) #19900792
# Display the "Current Number of Partitions" of the DataFrame
print(df_ReadCustomerFileWithHeader.rdd.getNumPartitions())
Output -
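A rough estimate of the partition count to expect after the change can be made from the file size and the new setting (a sketch; the exact number can differ slightly, because Spark also accounts for a per-file opening cost when planning the splits):
import math
fileSizeInBytes = 19900792  # Size of "customer.csv" as reported by dbutils.fs.ls
maxPartitionBytes = 10000   # The new value of "spark.sql.files.maxPartitionBytes"
# At most 10000 bytes per partition -> roughly 1991 partitions for the ~19 MB file
print(math.ceil(fileSizeInBytes / maxPartitionBytes))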
For Which Type of File Read from External Storage Does the Default Partitioning Not Apply?
- If the file read from external storage is not in a splittable form, i.e., if the file is compressed with a non-splittable codec (for example, gzip), then the data cannot be distributed into partitions based on the value of "spark.sql.files.maxPartitionBytes".
- In such cases, even if the size of the file is 1 TB, the data ends up in just 1 partition.
- Creating a single partition holding all the data of a file is bad for performance, as only one core processes the entire data of that file while all the other cores sit idle.
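When such a non-splittable file has to be read, one common workaround is to repartition the data immediately after reading it, so that the downstream processing can again use all the cores. A sketch, assuming a gzip-compressed copy of the customer file (the ".csv.gz" path is hypothetical):
# A gzip-compressed CSV is not splittable, so Spark reads it into a single partition
df_compressed = spark.read \
    .option("header", "true") \
    .csv("dbfs:/FileStore/tables/retailer/data/customer.csv.gz")
print(df_compressed.rdd.getNumPartitions())
# Redistribute the data so that all the available cores can take part in the processing
df_repartitioned = df_compressed.repartition(spark.sparkContext.defaultParallelism * 2)
print(df_repartitioned.rdd.getNumPartitions())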