Usage of “Broadcast Variable” in “Joins” in “Apache Spark”

Oindrila Chakraborty
Oct 6, 2023 · 7 min read

Why is the “Broadcast Variable” Used?

  • The “Broadcast Variable” is used mainly for “Performance Tuning”.

What is a “Broadcast Variable”?

  • The “Broadcast Variable” is a “Programming Mechanism” in “Apache Spark” through which it is possible to “Keep” a “Read-Only Copy” of the “Data” of a “Table”, or, a “DataFrame” in “Each Node” of the “Cluster”, instead of “Sending” the “Data” of that “Table”, or, that “DataFrame” in the “Form” of “Multiple Partitions” to “Each Core” of “Each Worker Node” of the “Cluster” every time a “Task” needs the “Data”. A minimal sketch of this mechanism is shown below.
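For illustration, this is the low-level “Broadcast Variable” API, as a minimal sketch assuming an already-created “SparkSession” named spark and a small, hypothetical lookup:

# A small (hypothetical) lookup to be shared with every Worker Node
lookup = {"A001": "India", "A002": "US"}

# Ship one read-only copy of "lookup" to each Worker Node
broadcastLookup = spark.sparkContext.broadcast(lookup)

# Tasks running on the Worker Nodes read the local copy via ".value"
print(broadcastLookup.value["A001"])   # -> India

# Release the cached copies on the Worker Nodes when no longer needed
broadcastLookup.unpersist()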

When is the “Usage” of the “Broadcast Variable” “Most Suitable”?

  • The “Usage” of the “Broadcast Variable” is “Ideal” for the “Join” Operations, where “One Side” of the “Join” Operation is a “Small DataFrame”, or, a “Small Table”.

What “Type” of “Table” Can be “Sent” Using the “Broadcast Variable”?

  • The “Broadcast Variable” is “Only Suitable” for the “Small Tables”, or, the “Small DataFrames”, as a “Read-Only Copy” of the “Small Table”, or, the “Small DataFrame” is “Sent” to “All” the “Worker Nodes” of a “Cluster”, and, the “Local Read-Only Copy” gets “Cached” in “Each” of the “Worker Nodes” (see the sketch below).
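Note that, in addition to the explicit broadcast() hint used later in this article, “Apache Spark” also “Broadcasts” the smaller side of a “Join” automatically when its estimated size is below the spark.sql.autoBroadcastJoinThreshold configuration (10 MB by default). A quick sketch:

# Check the size limit below which Spark automatically broadcasts the
# smaller side of a join (the default is 10 MB)
print(spark.conf.get("spark.sql.autoBroadcastJoinThreshold"))

# Raise the limit, e.g., to 50 MB for slightly larger dimension tables
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 50 * 1024 * 1024)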

Can a “Large Table” be “Sent” Using the “Broadcast Variable”?

  • If a “Large Table” is “Sent” using the “Broadcast Variable”, then “One Read-Only Copy” of the “Entire Large Table” will be “Sent” to “Each Worker Node” in a “Cluster”.
  • This might “Consume” the “Entire Memory” of “Each Worker Node”, which will “Not Leave Enough Memory” for the “Processing” of the “Data”.
  • Therefore, “Sending” a “Large Table” using the “Broadcast Variable” will “Hurt” the “Performance” (a guard against this is sketched below).
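If there is a risk of “Apache Spark” automatically “Broadcasting” a “Table” that is too large, the automatic behaviour can be switched off entirely (explicit broadcast() hints still work). A minimal sketch:

# Setting the threshold to -1 disables automatic broadcasting, so Spark
# never broadcasts a table unless it is explicitly hinted
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)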

Can the “Size” of the “Table” that is “Sent” Using the “Broadcast Variable” be “Larger” than the “Size” of the “Memory” of the “Driver Node”?

  • The “Broadcast Variable” is “Sent” to the “Worker Nodes” of a “Cluster” through its “Driver Node”.
  • So, the “Size” of the “Table”, or, the “DataFrame” that is “Sent” using the “Broadcast Variable”, should be “Smaller” than the “Size” of the “Memory” of the “Driver Node”.
  • Example: If the “Size” of the “Memory” of the “Driver Node” is “14 GB”, and, the “Size” of the “Table”, or, the “DataFrame” that is “Sent” using the “Broadcast Variable” is “16 GB”, then the “Driver Node” would “Throw” the “Out of Memory” Exception.
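Since the “Broadcast” data passes through the “Driver Node”, it helps to size the “Driver Memory” accordingly. Below is a minimal sketch, assuming a locally launched session where the “Driver” is started by the SparkSession builder; on a managed cluster (e.g., Databricks) the “Driver Memory” is instead chosen through the cluster configuration, and the application name here is purely illustrative:

from pyspark.sql import SparkSession

# "spark.driver.memory" must be set before the Driver JVM starts,
# so it is passed at session-creation time (illustrative value)
spark = (
    SparkSession.builder
    .appName("broadcast-join-demo")
    .config("spark.driver.memory", "14g")
    .getOrCreate()
)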

“Internal Spark Architecture” of a “Normal Parallel Processing”

  • It is possible to “Join” a “Large Fact Table”, containing “Billions of Records”, with a “Small Dimension Table”, containing “Hundreds”, or, a “Few Thousands” of “Records”.
  • When “Apache Spark” “Reads” the “Fact Table”, and, the “Dimension Table”, the “Driver Node” would “Split” the “Data” of the “Fact Table”, and, the “Dimension Table” into “Multiple Smaller Partitions”.
  • “Each Partition” is “Created” for “Each Core” of “All” of the “Worker Nodes”.
  • When the “Driver Node” sends some “Piece of Code” in the “Form” of “Tasks” to the “Worker Nodes”, “Each Task” will “Pick Up” “One Partition” “At a Time”, and, that particular “Task” will “Work On” the “Data” that is “Present” in that particular “Partition”.
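To see this partitioning, the number of “Partitions” that “Apache Spark” created for a “DataFrame” can be inspected directly; the file path below is hypothetical:

# Read a (hypothetical) large fact table and inspect its partition count;
# each partition is processed by one task running on one core
dfFact = spark.read.parquet("/mnt/data/fact_sales")
print(dfFact.rdd.getNumPartitions())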

“Downside” of a “Normal Parallel Processing”

  • When a “Join”, or, a “Grouping” Operation is “Performed” on “One”, or, “Multiple DataFrames”, the “Related Data” should be “Grouped Together”.
  • This is done using a “Shuffle”, which is a “Very Costly Operation”.
  • The “Performance” of that “Join”, or, “Grouping” Operation will be “Decreased”, because, “Data Movement” will “Happen” across “All” of the “Worker Nodes” in the “Cluster” to “Group” the “Related Data” “Together”, as the sketch below shows.
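The following sketch forces the “Shuffle”-based behaviour so that it can be observed in the “Execution Plan”; the two “DataFrames”, and the column AddressKey, are purely illustrative:

from pyspark.sql.functions import col

# Disable automatic broadcasting to force a shuffle-based join
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

# Two illustrative DataFrames stand in for the fact and the dimension table
dfFact = spark.range(0, 1000000).withColumn("AddressKey", col("id") % 4)
dfDim = spark.range(0, 4).withColumnRenamed("id", "AddressKey")

dfJoined = dfFact.join(dfDim, "AddressKey", "inner")

# The physical plan now typically shows "Exchange" (shuffle) operators on
# both sides feeding a "SortMergeJoin", i.e. data moving across the network
dfJoined.explain()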

“Internal Spark Architecture” of a “Parallel Processing” With “Broadcast Variable”

  • It is possible to “Join” a “Large Fact Table”, containing “Billions of Records”, with a “Small Dimension Table”, containing “Hundreds”, or, a “Few Thousands” of “Records”.
  • When “Apache Spark” “Reads” the “Small Dimension Table”, instead of “Splitting” the “Data” of that “Dimension Table” into “Multiple Smaller Partitions”, the “Driver Node” “Sends” the “Read-Only Copy” of the “Entire Dimension Table” to “All” of the “Worker Nodes”. This is called “Broadcasting”.
  • Since the “Local Read-Only Copy” of a “Small Dimension Table” is “Made Available” to “Each” of the “Worker Nodes”, “Each Core” within the respective “Worker Nodes” can “Refer” to the “Local Read-Only Copy” of the “Small Dimension Table”, making the “Join” Operation “Faster”, as the “Small Dimension Table” is “Not Shuffled” across “All” of the “Worker Nodes” in the “Cluster” to “Group” the “Related Data” “Together” (the equivalent “Spark SQL” “Hint” is sketched below).
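The same behaviour can also be requested with a “Hint” in “Spark SQL”. This is a sketch reusing the “Person”, and, the “Address” “DataFrames” that are created in the example later in this article; the temporary view names are hypothetical:

# Register the views once, then ask Spark SQL to broadcast the small
# "address" side of the join via the BROADCAST hint
dfPerson.createOrReplaceTempView("person")
dfAddress.createOrReplaceTempView("address")

dfPersonWithAddressSql = spark.sql("""
    SELECT /*+ BROADCAST(a) */ p.*, a.Address
    FROM person p
    INNER JOIN address a
        ON p.AddressId = a.AddressId
""")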

Does the “Data” Sent Through the “Broadcast Variable” Get “Cached” in the “Worker Nodes” of a “Cluster”?

  • When “Apache Spark” “Reads” a “Small DataFrame”, or, a “Small Table” for the “First Time”, the “Driver Node” sends the “Read-Only Copy” of the “Entire DataFrame”, or, the “Entire Table”, using the “Broadcast Variable”, across “All” the “Worker Nodes” of the “Cluster”.
  • Now, when “Any Core” within “Any Worker Node” “Reads” the “Local Read-Only Copy” of the “Entire DataFrame”, or, the “Entire Table” for the “First Time”, the “Worker Node” hosting that “Core” would “Cache” the “Local Read-Only Copy”.
  • So, there will be “No Need” for the “Driver Node” to send the “Read-Only Copy” of the “Entire DataFrame”, or, the “Entire Table”, using the “Broadcast Variable”, to a “Worker Node” again and again when “Different Tasks” running on the “Same Worker Node” require the “Data” (see the sketch below).
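The same caching behaviour can be observed with an explicit “Broadcast Variable”: the copy is shipped to a “Worker Node” once, and is then reused by every “Task” on that node. A minimal sketch, using a hypothetical country lookup:

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Ship one read-only copy of this (hypothetical) lookup to the Worker Nodes
countryByAddressId = spark.sparkContext.broadcast(
    {"A001": "India", "A002": "US", "A003": "UK", "A004": "UAE"}
)

# Every task on a Worker Node reads the locally cached copy via ".value";
# the Driver Node does not re-send the dictionary for each task
@udf(returnType=StringType())
def lookupCountry(addressId):
    return countryByAddressId.value.get(addressId)

# Usage (illustrative): dfPerson.withColumn("Country", lookupCountry("AddressId"))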

How Does Using the “Broadcast Variable” in “Joins” “Improve” the “Performance”?

  • When a “Large Fact Table”, containing “Billions of Records”, is “Joined” with a “Small Dimension Table”, containing “Hundreds”, or, a “Few Thousands” of “Records”, the “Driver Node” “Splits”, and, “Sends” the “Partitions” of the “Data” of the “Fact Table” across “All” the “Worker Nodes” in the “Cluster”, but, the “Driver Node” sends the “Read-Only Copy” of the “Entire Dimension Table”, using the “Broadcast Variable”, to “Each” of the “Worker Nodes” in the “Cluster”.
  • The “Local Read-Only Copy” of the “Entire Dimension Table” would then be “Stored” in the “Form” of “Cache”, when “Any Worker Node” “Reads” the “Read-Only Copy” of the “Entire Dimension Table” for the “First Time”.
  • Using the “Broadcast Variable”, the “Performance” of the “Join” Operation is “Improved” in the following ways -

1. During the “Join” Operation, “Each Worker Node” might need to “Shuffle” the “Data” of the “Fact Table”, but “Not” the “Data” of the “Dimension Table”.
So, by “Avoiding” the “Shuffle” of the “Dimension Table”, it “Improves” the “Performance”.

2. Since “Each Worker Node” “Caches” the “Read-Only Copy” of the “Entire Dimension Table”, whenever any “Task” on a “Worker Node” requires the “Data” of the “Dimension Table” that was “Already Used” by a “Previous Task” on that node, the “Driver Node” does “Not” need to send the “Read-Only Copy” of the “Entire Dimension Table” again.
So, by “Reducing” the “Network Input/Output” Operations, it “Improves” the “Performance”.

Example -

Create Lists for “Person”.

# Create a List Containing the "Column Names" for "Person"
personColumns = ["Id", "First_Name", "Last_Name", "AddressId"]

# Create a List Containing the "Data" for "Person"
personList = [
(1001, "Oindrila", "Chakraborty", "A001"),
(1002, "Soumyajyoti", "Bagchi", "A002"),
(1003, "Oishi", "Bhattacharyya", "A004"),
(1004, "Sabarni", "Chakraborty", "A003"),
(1005, "Ayan", "Dutta", "A002"),
(1006, "Dhrubajyoti", "Das", "A004"),
(1007, "Sayantan", "Chatterjee", "A004"),
(1008, "Ritwik", "Ghosh", "A001"),
(1009, "Puja", "Bhatt", "A001"),
(1010, "Souvik", "Roy", "A002"),
(1011, "Souvik", "Roy", "A003"),
(1012, "Ria", "Ghosh", "A003"),
(1013, "Soumyajit", "Pal", "A002"),
(1014, "Abhirup", "Chakraborty", "A004"),
(1015, "Sagarneel", "Sarkar", "A003"),
(1016, "Anamika", "Pal", "A002"),
(1017, "Swaralipi", "Roy", "A002"),
(1018, "Rahul", "Roychowdhury", "A003"),
(1019, "Paulomi", "Mondal", "A004"),
(1020, "Avishek", "Basu", "A002"),
(1021, "Avirupa", "Ghosh", "A004"),
(1022, "Ameer", "Sengupta", "A003"),
(1023, "Megha", "Kargupta", "A002"),
(1024, "Madhura", "Chakraborty", "A002"),
(1025, "Debankur", "Dutta", "A002"),
(1026, "Bidisha", "Das", "A001"),
(1027, "Rohan", "Ghosh", "A004"),
(1028, "Tathagata", "Acharyya", "A003")
]

Create Lists for “Address”.

# Create a List Containing the "Column Names" for "Address"
addressColumns = ["AddressId", "Address"]

# Create a List Containing the "Data" for "Address"
addressList = [
("A001", "India"),
("A002", "US"),
("A003", "UK"),
("A004", "UAE")
]

Create a DataFrame for “Person”.

dfPerson = spark.createDataFrame(personList, schema = personColumns)
dfPerson.printSchema()
display(dfPerson)

Output -

Create Another DataFrame for “Address”.

dfAddress = spark.createDataFrame(addressList, schema = addressColumns)
dfAddress.printSchema()
display(dfAddress)

Output -

“Join” the “Person DataFrame” With the “Address DataFrame” on “AddressId” Using the “Broadcast Variable”.

from pyspark.sql.functions import broadcast

# "broadcast()" marks "dfAddress" so that its read-only copy is sent to every Worker Node
dfPersonWithAddress = dfPerson.join(broadcast(dfAddress), dfPerson.AddressId == dfAddress.AddressId, "inner")
dfPersonWithAddress.printSchema()
display(dfPersonWithAddress)

Output -

View the “Execution Plan” of the “DataFrame” Created by the “Join” Operation.

dfPersonWithAddress.explain()

In the “Physical Plan”, it can be seen that the “Join” performed is indeed a “Broadcast Join”: the plan contains a “BroadcastHashJoin” operator, fed by a “BroadcastExchange” that ships the “Address DataFrame” to the “Worker Nodes”.
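On “Spark 3.0”, or, later, a more readable, sectioned version of the same “Physical Plan” can be printed as well:

# Prints the plan grouped into numbered operators with their details
dfPersonWithAddress.explain(mode="formatted")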
