Introduction to Skewness in Databricks

Oindrila Chakraborty
6 min readNov 18, 2024

--

How the Size of Each Partition, of the Data to Process, is Estimated Initially in a Spark Application?

Initially, Apache Spark typically reads the data to process, from the underlying files, and stores into the Memory in the form of partitions of size 128 MB each, i.e., each of the partitions would be evenly distributed.

Initially, when Apache Spark reads the data to process, from the underlying files, the size of each partition is controlled by the following two Spark configurations -

1. spark.sql.files.maxPartitionBytes - Apache Spark configuration property spark.sql.files.maxPartitionBytes is used to specify the maximum number of bytes to pack into a single partition when reading from the file sources like — Parquet, JSON, ORC, CSV etc.
The default value for this property is 134217728 bytes, i.e., 128 MB. If the size of the input file is bigger than 128 MB, Apache Spark will read the data from that file into multiple partitions. Example -

  • Suppose, the size of an input file is 461966527 bytes, i.e., 441 MB. This is bigger than the default partition size. So, when the data of that input file is read into a DataFrame, it got created with 4 partitions because the size of the input file is 441 MB = 3 * 128 MB + 57 MB.

2. spark.sql.files.openCostInBytes - Apache Spark configuration property spark.sql.files.openCostInBytes is used to find out the estimated cost to open a file, i.e., number of bytes that could be scanned in, when reading data from multiple files like — Parquet, JSON, ORC, CSV etc.
This is used when putting multiple files into one partition. It is better to over-estimate, then the partitions with small files will be faster than partitions with bigger files.
Example -

  • E.g. 1 - Suppose there is a folder containing the following three files: -
    File1.csv of size 50 MB
    File2.csv of size 60 MB
    File3.csv of size 55 MB
    So, in this case, since the summation of the size of the two files, i.e., File1.csv and File2.csv, is lesser than 128 MB, then the data from these two files, File1.csv and File2.csv, will go to a single partition. Hence, the initial two partitions will have the following size -
    partition 1: 4 + 50 + 4 + 60 (File1.csv + File2.csv)
    partition 2: 4 + 55 (File3.csv)
  • E.g. 2 - Suppose there is another folder containing the following three files: -
    File1.csv of size 95 MB
    File2.csv of size 91 MB
    File3.csv of size 105 MB
    So, in this case, since the size of each of the files is very much close to 128 MB, no data from multiple files will go to a single partition. Hence, the initial three partitions will have the following size -
    partition 1: 4 + 95 (File1.csv)
    partition 2: 4 + 91 (File2.csv)
    partition 3: 4 + 105 (File3.csv)

What is Skewness?

Data Skewness means one partition will have more data than the other partitions in a dataset. Example -

There are 100 rows of data in a dataset called Location, which is partitioned on Country column. Out of those 100 rows, suppose, 99 rows have USA as the Country column and only 1 row has India as the Country column.

So, the data is Skewed as 99 rows went to the partition with USA as the Country, and, only 1 row went to the partition with India as the Country.

So, now when the data will be processed in parallel based on the partition, the partition with 99 rows will go to one Worker Node for processing and the partition with only 1 row will go to another Worker Node for processing. Hence, the Worker Node with only 1 row to process will complete the work very fast and will sit idle afterwards until the other Worker Node is also finished processing.

But, while processing the partition with 99 rows, this Worker Node might have the following performance issues -

  • Skewness Issue - The Worker Node will take a lot of time to complete the work due to the Skewness of the data in this partition.
    So, the Task to process the partition with 99 rows is taking much longer to finish as this Task needs to process much more data due to the Skewness in the partition it is processing.
  • Spilling Issue - It might also happen that this Worker Node does not even have enough Memory to process the data, which causes the data in the partition to process to Spill to the Disk.

Does Data Skewness Occur Only When Partitioning a Dataset?

  • The Data Skewness can happen at any time, not just because of partitioning the dataset. Data Skewness can occur in the following cases as well -
  • If the dataset is being Filtered, then it is possible to have significantly more records in one partition of the output dataset than the other partitions.
  • If one dataset is being Joined with another dataset, at that time also, it is possible to have significantly more records in one partition of the joined dataset than the other partitions.

How to Identify Data Skewness in a Spark Application?

  • To identify the Data Skewness in a Spark application, first the data to process needs to be analyzed. At every step of the development, when executing the Spark application, it must be checked consistently to see if the data to process is getting Skewed.
  • The Data Skewness in a Spark application can also be identified by checking the Task Duration at any Stage. When executing the Spark application, if a particular Task at a Stage is taking too much time to finish, at that time, it must be analysed to find out the reason behind that particular Task not getting completed.

Data Skew Code

Suppose, there are two datasets -

  1. Customer Dataset - A dataset, called Customer, containing the two columns customer_id and customer_name with data like the following -

2. Order Dataset - A dataset, called Order, containing the three columns order_id customer_id and country with data like the following -

  • Read the Customer dataset using the following code -
    df_customer = spark.read.format(“csv”).load(“<path-to-customer-dataset-file>")
  • Similarly, read the Order dataset using the following code -
    df_order = spark.read.format(“csv”).load(“<path-to-order-dataset-file>")
  • The target of the first query is to find out how many orders have been created per country (using group by) -
    df_group_by_country = df_order
    .groupBy(“country”).count()

    Since, most of the rows are present in a particular partition in the Order dataset, that partition is Skewed. So, performing the Wide Transformation, i.e., group by on the already Skewed dataset will lead to Long-Runing Task, or, in worst case will lead to Out-of-Memory, or Spill error.
  • The target of the second query is to find out the name of the customers who have created an order (using join) -
    df_order_with_customer_name = df_order.join(df_customer, df_order[“customer_id”] == df_customer[“customer_id”])
    .select(df_customer[“*”], df_order.order_id, df_order.country)

    Since, most of the rows are present in a particular partition in the Order dataset, that partition is Skewed. So, performing the Wide Transformation, i.e., join of the already Skewed dataset with another dataset, i.e., Customer dataset, will lead to Long-Runing Task, or, in worst case will lead to Out-of-Memory, or Spill error.

How to Solve the Data Skewness in a Spark Application?

  1. Using Skew Hint - While executing an Apache Spark application, if it is found that the query is stuck on finishing a small number of Tasks due to Data Skew present in the DataFrame on which the operations are being executed, it is possible to specify the Skew Hint with the hint(“skew”) method on the Skewed DataFrame as following -
    df.hint(“skew”).
    In addition to the basic Skew Hint, it is also possible to specify the hint method with the following combinations of parameters:
  • A. DataFrame and Skewed Column Name - The Skewed Data of a particular column in a DataFrame can be optimized by specifying that particular Skewed column in the hint method as following -
    df.hint(“skew”, “col1”)
  • B. DataFrame and Multiple Skewed Column Names - The Skewed Data of multiple columns in a DataFrame can be optimized by specifying the names of those particular Skewed columns as a List in the hint method as following -
    df.hint(“skew”, [“col1”, “col2”, “col3”])
  • C. DataFrame, Skewed Column Name, and Skew Value - If it is already known that for what particular value a particular column in a DataFrame is getting Skewed, then that particular column in that DataFrame can be optimized for that particular Skewed Value by specifying that particular Skewed column, and, the Skewed Value in the hint method as following -
    df.hint(“skew”, “col1”, “value”)

--

--

Oindrila Chakraborty
Oindrila Chakraborty

Written by Oindrila Chakraborty

I have 12+ experience in IT industry. I love to learn about the data and work with data. I am happy to share my knowledge with all. Hope this will be of help.

Responses (2)