Different Types of “Join Strategies” in “Apache Spark”

Oindrila Chakraborty
17 min read · Oct 6, 2023


What is “Join Selection Strategy”?

  • When “Any Type” of “Join”, like the “Left Join”, or, the “Inner Join” is “Performed” between “Two DataFrames”, “Apache Spark” “Internally” decides which “Algorithm” will be used to “Perform” the “Join” Operation between the “Two DataFrames”.
  • That particular “Algorithm” that is “Responsible” for “Planning” the “Join” Operation between the “Two DataFrames” is called the “Join Selection Strategy”.

Why is Learning About “Join Selection Strategies” Important?

  • To “Optimize” a “Spark Job” that “Involves” a “Lot of Joins”, the “Developers” need to be well aware of the “Internal Algorithm” that “Apache Spark” will “Choose” to “Perform” “Any” of the “Join” Operations between “Two DataFrames”.
  • The “Developers” need to know about the “Join Selection Strategies” so that the “Wrong Join Selection Strategy” is “Not Used” in a “Join” Operation between “Two DataFrames”.
  • An “Incorrect Join Selection Strategy” will “Increase” the “Execution Time” of the “Join” Operation, and, the “Join” Operation becomes a “Heavy Operation” on the “Executors” as well.

In Which Phase is the “Join Selection Strategy” “Selected”?

  • “Apache Spark” decides which “Algorithm” will be used to “Perform” the “Join” Operation between “Two DataFrames” in the “Physical Planning” Phase, where “Each Node” in the “Logical Plan” has to be “Converted” to “One”, or, “More” “Operators” in the “Physical Plan” using the so-called “Join Selection Strategies”.
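
The chosen “Join Selection Strategy” can be “Seen” in the “Physical Plan”. The following is a minimal sketch (assuming two already-created DataFrames, like the ones built later in this article) of viewing the “Physical Plan” with the “explain” Function -

# The chosen "Join Selection Strategy" shows up in the "Physical Plan",
# e.g., as "SortMergeJoin", "ShuffledHashJoin", or "BroadcastHashJoin".
dfJoined = dfPerson.join(dfAddress, dfPerson.AddressId == dfAddress.AddressId, "inner")
dfJoined.explain()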

How “Many Types” of “Join Selection Strategies” are There in “Apache Spark”?

  • Following are the “Join Selection Strategies” that “Apache Spark” can “Choose” from at the time of a “Join” Operation between “Two DataFrames” -
    1. Sort Merge Join
    2. Shuffle Hash Join
    3. Broadcast Hash Join
    4. Cartesian Join
    5. Broadcast Nested Loop Join
  • In this article, I will explain the first three types of “Join Selection Strategies”.

How Does “Apache Spark” Decide Which “Join Selection Strategy” to “Choose”?

  • “Apache Spark” decides the “Join Selection Strategy” to use in a “Join” Operation based on the following criteria (a small sketch of inspecting the relevant configurations follows below) -
  • “Size” of the “Data” of “Each” of the “DataFrames” involved in a “Join” Operation
  • The “Amount” of “Memory” available to the “Driver Node”, and, the “Worker Nodes”
  • “Presence” of “Skewness” in the “Data” of “Each” of the “DataFrames” involved in a “Join” Operation
  • “Characteristics” of the “Join Key”, like — whether the “DataFrames” involved in a “Join” Operation are “Partitioned” using the “Join Key Column”, or, whether the “Join Key Columns” that are “Present” in “Both” of the “DataFrames” involved in a “Join” Operation are “Already Sorted”, etc.
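
The following is a minimal sketch (assuming a “SparkSession” named “spark”, as in the rest of this article) of “Inspecting” the “Spark Configuration Options” that influence this decision; the same options are used again later in this article.

# "Maximum Size" of a "DataFrame" that is allowed to be "Broadcasted" (defaults to 10 MB)
print(spark.conf.get("spark.sql.autoBroadcastJoinThreshold"))

# Whether the "Sort Merge Join" is preferred over the "Shuffle Hash Join" (defaults to "true")
print(spark.conf.get("spark.sql.join.preferSortMergeJoin"))

# "Number of Partitions" used while "Shuffling" the "Data" for a "Join" (defaults to 200)
print(spark.conf.get("spark.sql.shuffle.partitions"))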

Dataset Used -

Create Lists for “Person”.

# Create a List Containing the "Column Names" for "Person"
personColumns = ["Id", "First_Name", "Last_Name", "AddressId"]

# Create a List Containing the "Data" for "Person"
personList = [
(1001, "Oindrila", "Chakraborty", "A001"),
(1002, "Soumyajyoti", "Bagchi", "A002"),
(1003, "Oishi", "Bhattacharyya", "A004"),
(1004, "Sabarni", "Chakraborty", "A003"),
(1005, "Ayan", "Dutta", "A002"),
(1006, "Dhrubajyoti", "Das", "A004"),
(1007, "Sayantan", "Chatterjee", "A004"),
(1008, "Ritwik", "Ghosh", "A001"),
(1009, "Puja", "Bhatt", "A001"),
(1010, "Souvik", "Roy", "A002"),
(1011, "Souvik", "Roy", "A003"),
(1012, "Ria", "Ghosh", "A003"),
(1013, "Soumyajit", "Pal", "A002"),
(1014, "Abhirup", "Chakraborty", "A004"),
(1015, "Sagarneel", "Sarkar", "A003"),
(1016, "Anamika", "Pal", "A002"),
(1017, "Swaralipi", "Roy", "A002"),
(1018, "Rahul", "Roychowdhury", "A003"),
(1019, "Paulomi", "Mondal", "A004"),
(1020, "Avishek", "Basu", "A002"),
(1021, "Avirupa", "Ghosh", "A004"),
(1022, "Ameer", "Sengupta", "A003"),
(1023, "Megha", "Kargupta", "A002"),
(1024, "Madhura", "Chakraborty", "A002"),
(1025, "Debankur", "Dutta", "A002"),
(1026, "Bidisha", "Das", "A001"),
(1027, "Rohan", "Ghosh", "A004"),
(1028, "Tathagata", "Acharyya", "A003")
]

Create Lists for “Address”.

# Create a List Containing the "Column Names" for "Address"
addressColumns = ["AddressId", "Address"]

# Create a List Containing the "Data" for "Address"
addressList = [
("A001", "India"),
("A002", "US"),
("A003", "UK"),
("A004", "UAE")
]

Create a DataFrame for “Person”.

dfPerson = spark.createDataFrame(personList, schema = personColumns)
dfPerson.printSchema()
display(dfPerson)

Output -

Create Another DataFrame for “Address”.

dfAddress = spark.createDataFrame(addressList, schema = addressColumns)
dfAddress.printSchema()
display(dfAddress)

Output -

1. Sort Merge Join

  • The “Sort Merge Join” is the “Default Join Selection Strategy” when a “Join” Operation is “Performed” between “Two DataFrames”.
  • The “Sort Merge Join” goes through the following “Three Phases” -
    1. Shuffle
    2. Sort
    3. Merge
  • Consider that, in the “Cluster”, there are “Two Worker Nodes”, and, “Both” of the “DataFrames” are “Initially Distributed” into “4 Partitions”. Then, the “Sort Merge Join” is “Applied” to “Join” the “DataFrame” of “Person” with the “DataFrame” of “Address”.

Phase 1 : “Shuffle”

A “DataFrame” is nothing but a “Collection” of “Partitions” that are “Distributed” across “All” the “Cores” of “Every Worker Node” in a “Cluster”.

When a “DataFrame” is “Created”, the “Number of Partitions” for that “DataFrame” is decided by various “Parameters”, like — the “Default Block Size”, i.e., “128 MB”.

When a “Join” Operation is “Performed” between “Two DataFrames”, the “Driver Node” sends some “Piece of Code”, involving the “Join” Operation, in the “Form” of “Tasks” to the “Worker Nodes”.

“Each Task” will “Pick Up” “One Partition” of “Both” of the “DataFrames” “At a Time”, and, that particular “Task” will “Work On” the “Data” that is “Present” in those particular “Partitions”.

The “Join” Operation is “Performed” based on a “Key Column”. It might happen that “All” the “Data”, pertaining to a particular “Value” of the “Join Key Column”, is “Not Present” in a “Single Partition”, but is “Scattered Around” the “Different Worker Nodes” in the “Cluster”.
Example -

  • “All” the “Data”, pertaining to the “Join Key Column” with the “Value” as “A001”, is “Not Present” in a “Single Partition” of the “DataFrame” of “Person”, but is “Present” in “Multiple Partitions”.
    “Each” of these “Partitions” might be “Present” in “Different Worker Nodes”.
  • The “Data”, pertaining to the “Join Key Column” with the “Value” as “A001”, is “Present Only” in the “Partition No: 2” of the “DataFrame” of “Address”.
    “Partition No: 2” of the “DataFrame” of “Address” lives in “Only One Worker Node”. Suppose that “Worker Node” does “Not” hold the “A001” rows of the “DataFrame” of “Person”. Hence, the “Join” Operation can “Not Occur” without “Moving” the “Data”.

The “Data” in “Each Partition”, for “Both” of the “DataFrames”, needs to be “Shuffled”, so that “All” the “Data”, having a particular “Value” of the “Join Key Column”, is “Stored” in the “Same Partition” for the respective “DataFrames”, and, the “Resultant Partitions” of “Both” of the “DataFrames” are “Stored” in the “Same Worker Node”, depending on the “Values” of the “Join Key Column” on which the “Shuffling” happened.
Example -

  • “All” the “Data”, pertaining to the “Join Key Column” with the “Value” as “A001” should be “Present” in the “Partition No: 1” for “Both” of the “DataFrames” in the “Worker Node : 1”.
  • “All” the “Data”, pertaining to the “Join Key Column” with the “Value” as “A002” should be “Present” in the “Partition No: 3” for “Both” of the “DataFrames” in the “Worker Node : 2”.
  • “All” the “Data”, pertaining to the “Join Key Column” with the “Value” as “A003” should be “Present” in the “Partition No: 1” for “Both” of the “DataFrames” in the “Worker Node : 1”.
  • “All” the “Data”, pertaining to the “Join Key Column” with the “Value” as “A004” should be “Present” in the “Partition No: 3” for “Both” of the “DataFrames” in the “Worker Node : 2”.
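
The “Shuffle” Phase can be illustrated in isolation. The following is a minimal sketch (assuming the “dfPerson” and “dfAddress” DataFrames created above, and a partition count of “4” only to mirror the example) of “Repartitioning” “Both” of the “DataFrames” on the “Join Key Column”, which is essentially what the “Shuffle” Phase does -

# Repartition both DataFrames on the "Join Key Column", so that all the rows
# having the same "AddressId" land in the same partition number on both sides.
dfPersonShuffled = dfPerson.repartition(4, "AddressId")
dfAddressShuffled = dfAddress.repartition(4, "AddressId")

# Verify in which partition each row has landed after the shuffle.
from pyspark.sql.functions import spark_partition_id
display(dfPersonShuffled.withColumn("Partition_No", spark_partition_id()))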

Phase 2 : “Sort”

Once the “Shuffle” Phase is “Over”, the “Data” in “All” of the “Resultant Partitions” of “Both” of the “DataFrames” is still “Not Sorted” based on the “Values” of the “Join Key Column”.

In the Second Phase, i.e., the “Sort” Phase, “All” the “Data” in “All” of the “Resultant Partitions” of “Both” of the “DataFrames” is “Sorted” based on the “Values” of the “Join Key Column”.

The “Resultant Sorted Partitions” of “Both” of the “DataFrames”, after the Second Phase is “Over”, are still “Present” in the “Same Worker Nodes” respectively.
Example -

“Partition No: 1” for the “DataFrame” of “Person” has “Data”, having “A001”, and, “A003” as the “Value” of the “Join Key Column”.
In the “Sort” Phase, the “Data” of the “Partition No: 1” is “Sorted” -

  • “All” the “Data”, pertaining to the “Join Key Column” with the “Value” as “A001” is “Kept Together” in the “Worker Node : 1”.
  • “All” the “Data”, pertaining to the “Join Key Column” with the “Value” as “A003” is “Kept Together” in the “Worker Node : 1”.

“Partition No: 3” for the “DataFrame” of “Person” has “Data”, having “A002”, and, “A004” as the “Value” of the “Join Key Column”.
In the “Sort” Phase, the “Data” of the “Partition No: 3” is “Sorted” -

  • “All” the “Data”, pertaining to the “Join Key Column” with the “Value” as “A002” is “Kept Together”.
  • “All” the “Data”, pertaining to the “Join Key Column” with the “Value” as “A004” is “Kept Together”.
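
The “Sort” Phase can be approximated with the “sortWithinPartitions” Function, which “Sorts” the rows inside “Each Partition” without “Shuffling” them again. The following is a minimal sketch, assuming the repartitioned DataFrames from the previous sketch -

# Sort the rows of each partition by the "Join Key Column",
# without moving any row to a different partition.
dfPersonSorted = dfPersonShuffled.sortWithinPartitions("AddressId")
dfAddressSorted = dfAddressShuffled.sortWithinPartitions("AddressId")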

Phase 3 : “Merge”

Once the “Sort” Phase is “Over”, “All” of the “Resultant Partitions” of “Both” of the “DataFrames” are “Sorted” based on the “Values” of the “Join Key Column”.

In the Third Phase, i.e., the “Merge” Phase, a “Partition” from the “First DataFrame” is “Merged” with a “Partition” from the “Second DataFrame”, based on the “Same Value” of the “Join Key Column”, in the “Same Worker Node”.

Example -

  • “All” the “Data” of the “Partition No: 1” for the “DataFrame” of “Person” having “A001” as the “Value” of the “Join Key Column” needs to be “Merged” with “All” the “Data” of the “Partition No: 1” for the “DataFrame” of “Address” having “A001” as the “Value” of the “Join Key Column” in the “Worker Node : 1”.
  • “All” the “Data” of the “Partition No: 3” for the “DataFrame” of “Person” having “A002” as the “Value” of the “Join Key Column” needs to be “Merged” with “All” the “Data” of the “Partition No: 3” for the “DataFrame” of “Address” having “A002” as the “Value” of the “Join Key Column” in the “Worker Node : 2”.
  • “All” the “Data” of the “Partition No: 1” for the “DataFrame” of “Person” having “A003” as the “Value” of the “Join Key Column” needs to be “Merged” with “All” the “Data” of the “Partition No: 1” for the “DataFrame” of “Address” having “A003” as the “Value” of the “Join Key Column” in the “Worker Node : 1”.
  • “All” the “Data” of the “Partition No: 3” for the “DataFrame” of “Person” having “A004” as the “Value” of the “Join Key Column” needs to be “Merged” with “All” the “Data” of the “Partition No: 3” for the “DataFrame” of “Address” having “A004” as the “Value” of the “Join Key Column” in the “Worker Node : 2”.
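
Conceptually, the “Merge” Phase walks through the “Two Sorted Partitions” in lock-step, and “Emits” a “Joined Row” whenever the “Values” of the “Join Key Column” match. The following plain-Python sketch (hypothetical, not actual “Apache Spark” code, and assuming the “Address” side has “Unique” keys) only illustrates the idea -

# Both sides are already sorted by the join key, so a single forward pass suffices.
person_rows = [("A001", "Oindrila"), ("A001", "Ritwik"), ("A003", "Sabarni")]
address_rows = [("A001", "India"), ("A003", "UK")]

merged = []
i = j = 0
while i < len(person_rows) and j < len(address_rows):
    person_key, address_key = person_rows[i][0], address_rows[j][0]
    if person_key == address_key:
        # Emit one joined row; advance the "Person" side, because the "Address" side
        # is assumed to have unique keys and might still match the next "Person" row.
        merged.append(person_rows[i] + address_rows[j][1:])
        i += 1
    elif person_key < address_key:
        i += 1
    else:
        j += 1

print(merged)  # [('A001', 'Oindrila', 'India'), ('A001', 'Ritwik', 'India'), ('A003', 'Sabarni', 'UK')]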

For What “Types” of “Joins” can the “Sort Merge Join” be Used as the “Join Selection Strategy”?

  • The “Sort Merge Join” can be used “Only” for the “Equi Joins”, i.e., those “Join Operations”, where “Equals Operator” (“==”) is used.
  • The “Sort Merge Join” “Works” for “All Types” of “Joins”, including “Full Outer Join”.

“Join” the “Person DataFrame” With the “Address DataFrame” on “AddressId”.

dfPersonWithAddress = dfPerson.join(dfAddress, dfPerson.AddressId == dfAddress.AddressId, "inner")
dfPersonWithAddress.printSchema()
display(dfPersonWithAddress)

Output -

View the “Execution Plan” of the “DataFrame” Created by the “Join” Operation.

dfPersonWithAddress.explain()

Output -

In the “Physical Plan”, it can be seen that the “Join” performed is “Sort Merge Join” indeed.

Even though “Both” of the “Person DataFrame”, and, the “Address DataFrame” are “Small”, “Apache Spark” still selected the “Sort Merge Join” as the “Join Selection Strategy” in the “Physical Plan” Phase during the “Join” Operation, because the “Sort Merge Join” is the “Default Join Selection Strategy” when a “Join” Operation is “Performed” between “Two DataFrames”.

How to “Disable” the “Sort Merge Join” as the “Default Join Selection Strategy”?

  • The “Sort Merge Join” is the “Default Join Selection Strategy” when a “Join” Operation is “Performed” between “Two DataFrames”.
  • To “Disable” the “Sort Merge Join” as the “Default Join Selection Strategy”, the “Spark Configuration Option”, i.e., “spark.sql.join.preferSortMergeJoin”, is “Set” to “false”.

spark.conf.set("spark.sql.join.preferSortMergeJoin", "false")

2. Shuffle Hash Join

  • The “Shuffle Hash Join” goes through the following “Three Phases” -
    1. Shuffle
    2. Hash Table Creation
    3. Hash Join
  • Consider that, in the “Cluster”, there are “Two Worker Nodes”, and, “Both” of the “DataFrames” are “Initially Split” into “4 Partitions”. Then, the “Shuffle Hash Join” is “Applied” to “Join” the “DataFrame” of “Person” with the “DataFrame” of “Address”.

Phase 1 : “Shuffle”

The “Shuffle” Phase of the “Shuffle Hash Join” works exactly like the “Shuffle” Phase of the “Sort Merge Join”, described above: the “Data” in “Each Partition” of “Both” of the “DataFrames” is “Shuffled”, so that “All” the “Data” having a particular “Value” of the “Join Key Column” ends up in the “Same Partition” for the respective “DataFrames”, and, the “Resultant Partitions” of “Both” of the “DataFrames” end up in the “Same Worker Node”.

Phase 2 : “Hash Table Creation”

Once the “Shuffle” Phase is “Over”, a “Hash Table” will be “Created” based on the “Smaller DataFrame” of the “Two DataFrames” that are involved in the “Join” Operation.

In “Each Worker Node”, a “Hash Table” will be “Created” for “Each” of the “Partitions” of the “Smaller DataFrame” that are “Present” in that particular “Worker Node”.
Example -

In the “Worker Node : 1”, for the “Smaller DataFrame”, i.e., the “DataFrame” of “Address”, the following “Two Hash Tables” will be “Created” -

  • A “Hash Table” will be “Created” for the “Partition No : 1” of the “DataFrame” of “Address”
  • Another “Hash Table” will be “Created” for the “Partition No : 3” of the “DataFrame” of “Address”

In the “Worker Node : 2”, for the “Smaller DataFrame”, i.e., the “DataFrame” of “Address”, the following “Two Hash Tables” will be “Created” -

  • A “Hash Table” will be “Created” for the “Partition No : 2” of the “DataFrame” of “Address”
  • Another “Hash Table” will be “Created” for the “Partition No : 4” of the “DataFrame” of “Address”

Phase 3 : “Hash Join”

Once the “Hash Table” is “Created” for “Each” of the “Partitions” of the “Smaller DataFrame” that are “Present” in a particular “Worker Node”, “Each” of the “Hash Tables” will be “Joined” with the “Respective Partition” of the “Larger DataFrame” in the “Same Worker Node”.
Example -

In the “Worker Node : 1” -

  • The “Hash Table” that is “Created” for the “Partition No : 1” of the “DataFrame” of “Address” will be “Joined” with the “Partition No : 1” of the “DataFrame” of “Person”
  • The “Hash Table” that is “Created” for the “Partition No : 3” of the “DataFrame” of “Address” will be “Joined” with the “Partition No : 3” of the “DataFrame” of “Person”

In the “Worker Node : 2” -

  • The “Hash Table” that is “Created” for the “Partition No : 2” of the “DataFrame” of “Address” will be “Joined” with the “Partition No : 2” of the “DataFrame” of “Person”
  • The “Hash Table” that is “Created” for the “Partition No : 4” of the “DataFrame” of “Address” will be “Joined” with the “Partition No : 4” of the “DataFrame” of “Person”
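
Conceptually, the “Hash Table Creation” Phase, and, the “Hash Join” Phase amount to “Building” a dictionary keyed on the “Join Key Column” from a “Partition” of the “Smaller DataFrame”, and then “Probing” that dictionary with every row of the corresponding “Partition” of the “Larger DataFrame”. The following plain-Python sketch (hypothetical, not actual “Apache Spark” code) only illustrates the idea on one matching pair of “Partitions” -

# One partition of the smaller side, and the matching partition of the larger side.
address_partition = [("A001", "India"), ("A003", "UK")]
person_partition = [(1001, "Oindrila", "A001"), (1004, "Sabarni", "A003")]

# Phase 2: build a hash table on the join key from the smaller side.
hash_table = {address_id: address for address_id, address in address_partition}

# Phase 3: probe the hash table with every row of the larger side.
joined = [
    person + (hash_table[person[-1]],)
    for person in person_partition
    if person[-1] in hash_table
]

print(joined)  # [(1001, 'Oindrila', 'A001', 'India'), (1004, 'Sabarni', 'A003', 'UK')]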

Why is the “Shuffle Hash Join” “Termed” an “Expensive Operation”?

  • The “Shuffle Hash Join” is an “Expensive Operation”, because, it uses both the “Shuffling”, and, the “Hashing”, which are “Individually” “Expensive Operations”.
    Also, “Maintaining” a “Hash Table” requires “Memory”, and, “Computation”.

When Can the “Shuffle Hash Join” “Work” as the “Join Selection Strategy”?

“Apache Spark” will “Not Choose” the “Shuffle Hash Join” as the “Join Selection Strategy” by default, because, the “Shuffle Hash Join” is an “Expensive Operation”.

The “Shuffle Hash Join” can be used as the “Join Selection Strategy” if the following criteria are met -

  • When the “Sort Merge Join” is “Disabled” as the “Default Join Selection Strategy”.
    To “Disable” the “Sort Merge Join” as the “Default Join Selection Strategy”, the “Spark Configuration Option”, i.e., “spark.sql.join.preferSortMergeJoin”, is “Set” to “false”.
  • When the “Broadcast Hash Join” can “Not” be used as the “Join Selection Strategy”, because “Both” of the “DataFrames” involved in the “Join” Operation are “Above” the “Default Broadcast Threshold Limit”, i.e., “10 MB”.
    It is also possible to “Forcibly Disable” the “Broadcast Hash Join” by “Setting” the “Value” of the “Broadcast Threshold Limit” to “-1”.
  • If the “Sizes” of “Both” of the “DataFrames” are “Already Known”, then “Explicitly Pass” the “Join Hint” as “SHUFFLE_HASH” for the “Smaller DataFrame” during the “Join” Operation.

“Forcibly Disable” the “Broadcast Hash Join” by Setting “Broadcast Threshold Limit” to “-1”.

spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

“Disable” the “Sort Merge Join” as the “Default Join Selection Strategy”.

spark.conf.set("spark.sql.join.preferSortMergeJoin", "false")

“Explicitly Pass” the “Join Hint” as “SHUFFLE_HASH” for the “Smaller DataFrame” While “Joining” the “Person DataFrame” With the “Address DataFrame” on “AddressId”.

dfPersonWithAddress = dfPerson.join(dfAddress.hint("SHUFFLE_HASH"), dfPerson.AddressId == dfAddress.AddressId, "inner")
dfPersonWithAddress.printSchema()
display(dfPersonWithAddress)

Output -

View the “Execution Plan” of the “DataFrame” Created by the “Join” Operation.

dfPersonWithAddress.explain()

Output -

In the “Physical Plan”, it can be seen that the “Join” performed is “Shuffle Hash Join” indeed.

Even though “Both” of the “Person DataFrame”, and, the “Address DataFrame” are “Small”, “Apache Spark” did not select the “Broadcast Hash Join” as the “Join Selection Strategy” in the “Physical Plan” Phase during the “Join” Operation, because the “Broadcast Hash Join” is “Forcibly Disabled” by Setting the “Value” of the “Broadcast Threshold Limit” to “-1”.

“Apache Spark” also did not select the “Sort Merge Join” as the “Join Selection Strategy” in the “Physical Plan” Phase during the “Join” Operation, because the “Sort Merge Join” is “Forcibly Disabled” as the “Default Join Selection Strategy”.

“Apache Spark” selected the “Shuffle Hash Join” as the “Join Selection Strategy” in the “Physical Plan” Phase during the “Join” Operation, because the “SHUFFLE_HASH” Hint was “Passed” for the “Smaller DataFrame”, i.e., the “DataFrame” of “Address”, during the “Join” Operation.

For What “Types” of “Joins” can the “Shuffle Hash Join” be Used as the “Join Selection Strategy”?

  • The “Shuffle Hash Join” can be used “Only” for the “Equi Joins”, i.e., those “Join Operations”, where “Equals Operator” (“==”) is used.
  • The “Shuffle Hash Join” can be used for “All Types” of “Joins”, “Except” for the “Full Outer Join”.

3. Broadcast Hash Join

  • When a “Join” Operation is “Performed” between “Two DataFrames”, if the “Size” of “Any One”, or, “Both” of the “DataFrames” lies “Within” the “Range” of the “Broadcast Threshold Limit”, then a “Read-Only Copy” of the “DataFrame” whose “Size” is “Within” the “Range” of the “Broadcast Threshold Limit” is “Made Available” in the “Memory” of “All” the “Worker Nodes” in the “Cluster”.
  • Then, a “Hash Join” will be “Performed” between the “Read-Only Copy” of the “DataFrame” that is “Made Available” in “Each” of the “Worker Nodes”, and, the “Partition” of the “Large DataFrame” that is “Present” in the “Respective Worker Nodes”.

“Default Size” of the “Broadcast Threshold Limit”

  • Using the “Spark Configuration Option”, i.e., “spark.sql.autoBroadcastJoinThreshold”, it is possible to “Find Out” the “Maximum Size” of a “DataFrame” that is “Allowed” to be “Broadcasted”.
  • The “Default Value” of the “Broadcast Threshold Limit” is “10 MB”, which means that “Any DataFrame”, which is “Below 10 MB” in “Size” is “Allowed” to be “Broadcasted”.

Display the “Default Size” of the “Broadcast Threshold Limit”.

print(spark.conf.get("spark.sql.autoBroadcastJoinThreshold"))

Output -

Can the “Default Size” of the “Broadcast Threshold Limit” be “Changed” to “Any Other Value”?

  • Using the “Spark Configuration Option”, i.e., “spark.sql.autoBroadcastJoinThreshold”, it is possible to “Change” the “Size” of the “Broadcast Threshold Limit” from its “Default Size” to “Any Desired Size” as per the “Requirement” as well.

Change “Size” of the “Broadcast Threshold Limit” from its “Default Value” to “Any Desired Value”.

spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 99999999)

How to “Disable” the “Broadcast Threshold Limit”?

  • By default, the “Broadcast Threshold Limit” is “Enabled”.
  • To “Disable” the “Broadcast Threshold Limit”, the “Spark Configuration Option”, i.e., “spark.sql.autoBroadcastJoinThreshold” is “Set” to “-1”.

“Disable” the “Broadcast Threshold Limit”.

spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

Why Does “Less Shuffling” Occur in a “Broadcast Hash Join”?

  • When a “Broadcast Hash Join” Operation is “Performed” between “Two DataFrames”, “Less Shuffling” Occurs, because the “Partitions” of the “Large DataFrame” do “Not” have to be “Moved” across the “Cluster”; a “Read-Only Copy” of the “Small DataFrame” is “Made Available” in “Each” of the “Worker Nodes”, where the “Respective Partitions” of the “Large DataFrame” are already “Present”.
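
Besides the “broadcast” Function shown later in this article, the “Broadcast Hash Join” can also be requested with a “Join Hint” on the “Smaller DataFrame”, analogous to the “SHUFFLE_HASH” hint used earlier. The following is a minimal sketch -

# Request the "Broadcast Hash Join" via a "Join Hint" on the smaller DataFrame.
dfPersonWithAddressHinted = dfPerson.join(dfAddress.hint("broadcast"), dfPerson.AddressId == dfAddress.AddressId, "inner")
dfPersonWithAddressHinted.explain()  # the "Physical Plan" should show "BroadcastHashJoin"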

What Happens When Both of the “DataFrames” in a “Join” are “Larger” Than the “Broadcast Threshold Limit” and “Broadcast Hash Join” is Used?

  • When a “Broadcast Hash Join” Operation is “Performed” between “Two DataFrames”, where the “Sizes” of “Both” of the “DataFrames” are “Larger” than the “Broadcast Threshold Limit”, and, a “Read-Only Copy” of “Any One” of the “DataFrames” is still “Made Available” in the “Memory” of “All” the “Worker Nodes” in the “Cluster”, then an “Out of Memory” Error will be “Thrown” by “Apache Spark”.

How to “Explicitly Define” the “Broadcast Hash Join” as the “Join Selection Strategy” During the “Join” Operation?

  • It is also possible to “Explicitly Define” the “Broadcast Hash Join” as the “Join Selection Strategy” during the “Join” Operation by using the “Broadcast Variable” to “Send” the “Smaller DataFrame” to “Each Worker Node” of the “Cluster”.
  • To do so, the “Smaller DataFrame” is “Passed” to the “broadcast” Function during the “Join” Operation.

“Join” the “Person DataFrame” With the “Address DataFrame” on “AddressId” Using the “Broadcast Variable”.

from pyspark.sql.functions import broadcast

dfPersonWithAddress = dfPerson.join(broadcast(dfAddress), dfPerson.AddressId == dfAddress.AddressId, "inner")
dfPersonWithAddress.printSchema()
display(dfPersonWithAddress)

Output -

View the “Execution Plan” of the “DataFrame” Created by the “Join” Operation.

dfPersonWithAddress.explain()

Output -

In the “Physical Plan”, it can be seen that the “Join” performed is “Broadcast Hash Join” indeed.

Even though “Both” of the “Person DataFrame”, and, the “Address DataFrame” are “Small”, “Apache Spark” would not have selected the “Broadcast Hash Join” as the “Join Selection Strategy” in the “Physical Plan” Phase during the “Join” Operation, because the “Sort Merge Join” is the “Default Join Selection Strategy”.

To make “Apache Spark” select the “Broadcast Hash Join” as the “Join Selection Strategy” in the “Physical Plan” Phase during the “Join” Operation, the “Smaller DataFrame”, i.e., the “DataFrame” of “Address”, is “Sent” using the “Broadcast Variable” “Explicitly”.
