Perform “SCD Type 1” Using “MERGE” Operation on Delta Table Using “SPARK SQL” and “PySpark” in Databricks

Oindrila Chakraborty
13 min readOct 7, 2023

--

What is “Slowly Changing Dimension”?

The “Slowly Changing Dimension” is “One” of the “Very Important Concept” in “Data Warehousing Solution”.

“Slowly Changing Dimension” is the “Change” of “Attribute”, or, the “Value” of the “Entities” over a “Period” of “Time”.
Example:

  • “Consider” that there is a “Delta Table”, called “Person”, which has an “Attribute”, called as “Address”. Now, a “Person” can “Stay” in an “Address” now. But, after “One Year”, the “Same Person” can “Change” the “Address”.
    As a result, the “Latest Value” of the “Address” needs to be “Updated” in the “Address” “Attribute” of the “PersonTable.
  • This is called as the “Slowly Changing Dimension”.

“Different Methods” of “Handling” the “Slowly Changing Dimension”

There are “Various Methods” of “Handling” the “Slowly Changing Dimension” in “Data Warehousing Solution”.

The “Commonly Used” “Three Approaches” to “Handle” the “Slowly Changing Dimension” are -

  • 1. “SCD Type 1” — In “SCD Type 1”, the “Old Record” is “Overwritten” with the “New Record”.
  • 2. “SCD Type 2” — In “SCD Type 2”, a “New Record” is “Introduced” for “Each Change” of the “Attribute”.
  • 3. “SCD Type 3” — In “SCD Type 3”, a “New Column” is “Introduced” for “Each Change” of the “Attribute”.

“Considering” the “Delta Table” “Person”, if a “Person” “Stays” in an “Address” for a “Year”, and, after “One Year” is “Over”, “Changes” the “Address”, then -

  • In “SCD Type 1”, the “Address” “Attribute” will be “Overwritten” with the “Latest Value” of the “Address”.
    In this “Approach”, the “Historical Information” is “Not Preserved”. So, it will “Not” be “Possible” to “Retrieve” the “Previous Address” “After” a “Period of Time” from the “Person” “Delta Table”, as, the “Previous Address” is “Overwritten” with the “Latest Address”.
  • In “SCD Type 2”, the “Row” that is “Associated” with the “Previous Address” will become “Inactive”, and, a “New Row” will be “Inserted” into the “Person” “Delta Table” with the “Latest Value” of the “Address”.
    In this “Approach”, the “Historical Information” is “Preserved”, but, at the “Same Time” “Duplicate Records” of the “Primary Keys” are “Created”.
  • In “SCD Type 3”, in the “Same Row”, a “New Column” will be “Added” for “Latest Value” of the “Address”, and, the “Address” “Attribute” will be “Renamed”.

What is “Merge”, or, “Upsert” Operation?

“SCD Type 1” is “Performed” in “Data Warehousing Solution” using the “MergeOperation.

When a “Target Table” needs to be “Updated” with the “Latest Data” that is “Coming” from the “Source”, the “Latest Data” which is “Coming” from the “Source” is “Compared” with the “Already Existing Data” of the “Target Table”, based on the “Merge Condition”.

  • If there is a “Match Found” between the “Source”, and, the “Target”, then the “Target Table” would be “Updated” with the “Latest Data” which is “Coming” from the “Source”.
  • If there is “No Match Found” between the “Source”, and, the “Target”, then it means that the “Source” is having “New Data”. So, the “New Data” which is “Coming” from the “Source” will be “Inserted” into the “Target Table”.
    This “Operation” is called as the “MergeOperation.

The “MergeOperation is also called as the “UpsertOperation.

The “MergeOperation is “One” of the “Most Commonly Used Operations” in the “Database Development”.

“Challenges” of Using “Merge” Operation in “Data Lake”.

  • When “Data Lake” got “Introduced” in the “Big Data Development”, the “MergeOperation were “Not Allowed” to be “Performed” in the “Data Lake”.
  • This was “One” of the “Shortcomings” of the “Data Lake”.

Did the Usage of “Delta Lake” Actually Solved the Problem of “Performing” the “Merge” Operation?

  • When “Databricks” “Introduced” the “Delta Lake” in the “Big Data Development”, the “MergeOperation was “Allowed” to be “Performed” in the “Delta Lake”.

“Disadvantage” of Using “Spark SQL” to “Perform” the “Merge” Operation

  • “MergeOperation can be “Performed” using “Spark SQL”, but, still it is “Not Suitable” for “All” the “Scenarios”, which involves “Complex Derivation Logic”.
  • In “Some” of the “Scenarios”, involving “Complex Derivation Logic”, the “User-Defined Functions”, i.e, the “UDFs” need to be “Created” to “Perform” the “MergeOperation.

Why It is Necessary for the “Developers” to Know the Usage of “Merge” Operation in “PySpark”?

“Not” “All” the “Scenarios” are “Suitable” to “Perform” the “MergeOperation using the “Spark SQL”, like, involving “Complex Derivation Logic”.

  • In “Some” of the “Scenarios”, involving “Complex Derivation Logic”, the “User-Defined Functions”, i.e., the “UDFs” need to be “Created” to “Perform” the “MergeOperation.

The “UDFs” are “Created” in “PySpark”. Hence, the “Developers” need to know the “Usage” of “MergeOperation in “PySpark”.

“Create” the “Target Delta Table”

Create the Database “training”

CREATE DATABASE IF NOT EXISTS training;

Create an “Empty Delta Table”

CREATE OR REPLACE TABLE training.person
(
PersonId INT,
FirstName STRING,
LastName STRING,
Country STRING
)
USING DELTA
LOCATION '/FileStore/tables/delta-table-merge/person';

“Display” the “Content” of the “Delta Table” (Currently “Empty”)

SELECT * FROM training. Person;

Output -

A. “Perform” the “Merge” Operation Using “Spark SQL”

  • When the “MergeOperation needs to be “Performed” using the “Spark SQL”, it is “Mandatory” to “Make” “Both” the “Source”, and, the “Target” as the “Table”, or, “View”.
  • Usually, the “Source” is used as the “View”, whereas, the “Target” is “Always” the “Table”.

First Load

Step 1: “Create” the “Source” for the “First Load”

Create Lists for “Person” for the “First Load”

# Create a List Containing the "Column Names" for "Person"
personColumns = ["PersonId", "FirstName", "LastName", "Country"]

# Create a List Containing the "Data" for "Person"
personList = [\
(1001, "Oindrila", "Chakraborty", "India"),
(1002, "Soumyajyoti", "Bagchi", "US"),
(1003, "Oishi", "Bhattacharyya", "UK")
]

Create a “Source DataFrame” for the “First Load”

dfPerson = spark.createDataFrame(personList, schema = personColumns)
dfPerson.printSchema()
display(dfPerson)

Output -

Create a “Source View” from the “Source DataFrame” for the “First Load”

dfPerson.createOrReplaceTempView("vw_person")

“Display” the “Content” of the “Source View”

SELECT * FROM vw_person;

Output -

Step 2: “Perform” the “Merge” Operation

In this “MergeOperation, based on the “Merge Condition”, i.e., the “Value” of the “Column” of “PersonId”, the “Target Table”, i.e., “training.person” will be “Updated” with the “Latest Data” that is “Coming” from the “Source View”, i.e., “vw_person”.

  • If there is a “Match Found” between the “Source”, and, the “Target Table”, i.e., if a “Value” of the “Column” of “PersonId” “Exists” in “Both” the “Source”, and, the “Target Table”, then the “Values” of “All” the “Other Columns” for that “Matching Row” in the “Target Table” would be “Updated” with the “Values” in the respective “Columns”, which is “Coming” from the “Source”.
  • If there is “No Match Found” between the “Source”, and, the “Target Table”, i.e., if “No Value” of the “Column” of “PersonId” from the “Source” “Exists” in the “Target Table”, then that “Unmatched Row” from the “Source” will be “Inserted” into the “Target Table”.

Since, currently, the “Target Table” is “Empty”, “No Value” of the “Column” of “PersonId” from the “Source” will “Exist” in the “Target Table”.

That means, “All” the “Rows” of the “Source” will be “Inserted” into the “Target Table”.

“Perform” the “Merge” Operation into the “Delta Table” Using “Spark SQL” for the “First Load”

MERGE INTO training.person TARGET
USING vw_person SOURCE
ON TARGET.PersonId = SOURCE.PersonId
WHEN MATCHED THEN
UPDATE SET
TARGET.FirstName = SOURCE.FirstName,
TARGET.LastName = SOURCE.LastName,
TARGET.Country = SOURCE.Country
WHEN NOT MATCHED THEN
INSERT
(
PersonId,
FirstName,
LastName,
Country
)
VALUES
(
SOURCE.PersonId,
SOURCE.FirstName,
SOURCE.LastName,
SOURCE.Country
)

Output -

  • From the “Statistics”, it can be seen that, the “Number of Affected Rows” is the “Same” as the “Number of Inserted Rows”, as, in the “First Load”, “No Row” in the “Target Table” was “Updated”, but, “All” the “Rows” of the “Source” are “Inserted” into the “Target Table”.

“Display” the “Content” of the “Delta Table” After the “First Load”

SELECT * FROM training.person;

Output -

Second Load

Step 1: “Create” the “Source” for the “Second Load”

Create Lists for “Person” for the “Second Load”

In the “Data List” -

1. The “Second Tuple” Remains With the “Same Values” as the “Data List” in the “Previous Load”.

2. In the “First Tuple”, the “Values” of the “Columns” — “LastName”, and, “Country” are “Changed”.

3. The “Third Tuple” is a “New Tuple”, which was “Not” in the “Data List” for the “Previous Load”.

# Create a List Containing the "Column Names" for "Person"
personColumns = ["PersonId", "FirstName", "LastName", "Country"]

# Create a List Containing the "Data" for "Person"
personList = [\
(1001, "Oindrila", "Bagchi", "Japan"),
(1002, "Soumyajyoti", "Bagchi", "US"),
(1004, "Rahul", "Roychowdhury", "Turkey")
]

Create a “Source DataFrame” for the “Second Load”

dfPerson = spark.createDataFrame(personList, schema = personColumns)
dfPerson.printSchema()
display(dfPerson)

Output -

Create a “Source View” from the “Source DataFrame” for the “Second Load”

dfPerson.createOrReplaceTempView("vw_person")

“Display” the “Content” of the “Source View”

SELECT * FROM vw_person;

Output -

Step 2: “Perform” the “Merge” Operation

  • In this “MergeOperation, “Two Values” of the “Column” of “PersonId”, i.e., “1001”, and, “1002”, from the “Source” will “Exist” in the “Target Table”. So, the “Values” of “All” the “Other Columns” for those “Two Matching Rows” in the “Target Table” would be “Updated” with the “Values” of the respective “Columns” of the “Source”.
  • “One Value” of the “Column” of “PersonId”, i.e., “1004”, from the “Source” will “Not Exist” in the “Target Table”. That means, that “Unmatched Row” of the “Source” will be “Inserted” into the “Target Table”.

When the “Merge Condition” is “Matched” between the “Source”, and, the “Target”, instead of “Providing” “All” the “Column Names” for “Both” the “Source”, and, the “Target” in the “UPDATE SETStatement to “Update” the “Target Delta Table”, inside the “WHEN MATCHED THENClause, the “UPDATE SET *Statement can be used.

Similarly, When the “Merge Condition” is “Not Matched” between the “Source”, and, the “Target”, instead of “Providing” “All” the “Column Names” for “Both” the “Source”, and, the “Target” in the “INSERTStatement to “Insert” into the “Target Delta Table”, inside the “WHEN NOT MATCHED THENClause, the “INSERT *Statement can be used.

“Perform” the “Merge” Operation into the “Delta Table” Using “Spark SQL” for the “Second Load”

MERGE INTO training.person TARGET
USING vw_person SOURCE
ON TARGET.PersonId = SOURCE.PersonId
WHEN MATCHED THEN
UPDATE SET *
WHEN NOT MATCHED THEN
INSERT *

Output -

  • From the “Statistics”, it can be seen that, the “Number of Affected Rows” is the “Summation” of the “Number of Updated Rows”, and, the “Number of Inserted Rows”.
  • In the “Second Load”, “Two Values” of the “Column” of “PersonId”, i.e., “1001”, and, “1002” from the “Source” are “Present” in the “Target Table”.
    So, the “Two Matched Rows” are “Updated” in the “Target Table” with the “Values” of the “Respective Columns” of the “Source”.
    Hence, the “Number of Updated Rows” is “2”.
  • In the “Second Load”, “One Value” of the “Column” of “PersonId”, i.e., “1004” from the “Source” is “Not Present” in the “Target Table”.
    So, the “Unmatched Row” of the “Source” will be “Inserted” into the “Target Table”.
    Hence, the “Number of Inserted Rows” is “1”.

“Display” the “Content” of the “Delta Table” After the “Second Load”

SELECT * FROM training.person;

Output -

B. “Perform” the “Merge” Operation Using “PySpark”

  • When the “MergeOperation needs to be “Performed” using the “PySpark”, it is “Mandatory” to “Keep” the “Source” as the “DataFrame”. So, “No” “View” needs to be “Created” as the “Source”.
  • The “Entire Data” of the “Target Delta Table” needs to be “Read” as the “Delta Table” first, in order to use the “Target” in the “MergeOperation.

Third Load

Step 1: “Create” a “Delta Table Instance” Using “DeltaTableBuilder API”

What is a “Delta Table Instance”?

  • A “Delta Table Instance” is a “Replica” of the “Actual Delta Table”.
  • The “Delta Table Instance” will “Create” a “Soft Link” to the “Actual Delta Table”, but “Not” a “Copy” of the “Actual Delta Table”.
  • Since, a “Soft Link” “Exists” between the “Delta Table Instance” and the “Actual Delta Table”, any “DML Operations” “Performed” on the “Delta Table Instance” will “Reflect” in the “Actual Delta Table”.
    Similarly, any “Modification” “Performed” on the “Actual Delta Table” will “Reflect” in the “Delta Table Instance”.

What is the “Usage” of the “Delta Table Instance”?

In order to “Perform” any “DML Operation”, or, to “Execute” any “SQL Statement” on a “Delta Table”, the “Developers” usually “Depend” on the “Spark SQL”.

Using the “Spark SQL Query”, it is posible to -

  • “Select” the “Data” from a “Delta Table
  • “Update” the “Data” of a “Delta Table
  • “Delete” the “Data” from a “Delta Table”, etc.

It might happen that due to some “Project Standard”, the “Developers” only need to use “PySpark” in the “Project”.
To “Perform” any “DML Operation”, or, to “Execute” any “SQL Statement” on a “Delta Table” using “PySpark”, just “Referring” the “Delta Table” by its “Name” is “Not” going to “Work”.

To “Perform” any “DML Operation”, or, to “Execute” any “SQL Statement” on a “Delta Table” using “PySpark”, the “Delta Table Instance” of that “Delta Table” needs to be “Created”. This is the “Primary Usage” of the “Delta Table Instance”.

“Create” a “Delta Table Instance” Using “DeltaTableBuilder API”

from delta.tables import *

personDeltaTable = DeltaTable.forPath(spark, "/FileStore/tables/delta-table-merge/person")
print(type(personDeltaTable))

Output -

How to “Convert” the “Delta Table Instance” to “DataFrame”?

  • In order to “Display” the “Data” of a “Delta Table Instance”, the “Delta Table Instance” needs to be “Converted” to a “DataFrame”.
  • To “Convert” a “Delta Table Instance” to a “DataFrame”, the “.toDF()Method” of the “Delta Table Instance” is used.

“Convert” a “Delta Table Instance” to a “DataFrame”

dfFromPersonDeltaTable = personDeltaTable.toDF()
display(dfFromPersonDeltaTable)

Output -

Step 2: “Create” the “Source” for the “Third Load”

Create Lists for “Person” for the “Third Load”

In the “Data List” -

1. The “Second Tuple” Remains With the “Same Values” as the “Data List” in “Any” of the “Previous Loads”.

2. In the “First Tuple”, the “Values” of the “Columns” — “FirstName”, and, “Country” are “Changed”.

3. The “Third Tuple” is a “New Tuple”, which was “Not” in the “Data List” for the “Previous Loads”.

# Create a List Containing the "Column Names" for "Person"
personColumns = ["PersonId", "FirstName", "LastName", "Country"]

# Create a List Containing the "Data" for "Person"
personList = [\
(1004, "Ayan", "Roychowdhury", "Jordan"),
(1001, "Oindrila", "Bagchi", "Japan"),
(1005, "Kasturi", "Chakraborty", "South Korea")
]

Create a “Source DataFrame” for the “Third Load”

dfPerson = spark.createDataFrame(personList, schema = personColumns)
dfPerson.printSchema()
display(dfPerson)

Output -

Step 3: “Perform” the “Merge” Operation

  • In this “MergeOperation, “Two Values” of the “Column” of “PersonId”, i.e., “1001”, and, “1004”, from the “Source” will “Exist” in the “Target Table”. So, the “Values” of “All” the “Other Columns” for those “Two Matching Rows” in the “Target Table” would be “Updated” with the “Values” of the respective “Columns” of the “Source”.
  • “One Value” of the “Column” of “PersonId”, i.e., “1005”, from the “Source” will “Not Exist” in the “Target Table”. That means, that “Unmatched Row” of the “Source” will be “Inserted” into the “Target Table”.

“Perform” the “Merge” Operation into the “Delta Table” Using “PySpark” for the “Third Load”

personDeltaTable.alias("Target")\
.merge(
source = dfPerson.alias("Source"),
condition = "Target.PersonId = Source.PersonId"
)\
.whenMatchedUpdate(
set =
{
"Target.FirstName": "Source.FirstName",
"Target.LastName": "Source.LastName",
"Target.Country": "Source.Country"
}
)\
.whenNotMatchedInsert(
values =
{
"Target.PersonId": "Source.PersonId",
"Target.FirstName": "Source.FirstName",
"Target.LastName": "Source.LastName",
"Target.Country": "Source.Country"
}
)\
.execute()
  • When the “MergeOperation is “Performed” using “PySpark”, by default, “Statistics” is “Not Displayed”.

“Display” the “Content” of the “Delta Table” After the “Third Load”

SELECT * FROM training.person;

Output -

Fourth Load

Step 1: “Create” a “Delta Table Instance” Using “DeltaTableBuilder API”

“Create” a “Delta Table Instance” Using “DeltaTableBuilder API”

from delta.tables import *

personDeltaTable = DeltaTable.forName(spark, "training.person")
print(type(personDeltaTable))

Output -

Step 2: “Create” the “Source” for the “Fourth Load”

In the “Data List” -

1. The “First Tuple” Remains With the “Same Values” as the “Data List” in “Any” of the “Previous Loads”.

2. In the “Second Tuple”, the “Values” of the “Columns” — “FirstName”, and, “Country” are “Changed”.

3. The “Third Tuple” is a “New Tuple”, which was “Not” in the “Data List” for the “Previous Loads”.

# Create a List Containing the "Column Names" for "Person"
personColumns = ["PersonId", "FirstName", "LastName", "Country"]

# Create a List Containing the "Data" for "Person"
personList = [\
(1001, "Oindrila", "Bagchi", "Japan"),
(1005, "Sabarni", "Chakraborty", "China"),
(1006, "Chandan", "Roy", "Saudi Arabia")
]

Create a “Source DataFrame” for the “Fourth Load”

dfPerson = spark.createDataFrame(personList, schema = personColumns)
dfPerson.printSchema()
display(dfPerson)

Output -

Step 3: “Perform” the “Merge” Operation

  • In this “MergeOperation, “Two Values” of the “Column” of “PersonId”, i.e., “1001”, and, “1005”, from the “Source” will “Exist” in the “Target Table”. So, the “Values” of “All” the “Other Columns” for those “Two Matching Rows” in the “Target Table” would be “Updated” with the “Values” of the respective “Columns” of the “Source”.
  • “One Value” of the “Column” of “PersonId”, i.e., “1006”, from the “Source” will “Not Exist” in the “Target Table”. That means, that “Unmatched Row” of the “Source” will be “Inserted” into the “Target Table”.

When the “Merge Condition” is “Matched” between the “Source”, and, the “Target”, instead of “Providing” “All” the “Column Names” for “Both” the “Source”, and, the “Target” as the “Dictionary” to “Update” the “Target Delta Table”, i.e., “set”, inside the “whenMatchedUpdateFunction, the “whenMatchedUpdateAll()Function can be used.

Similarly, When the “Merge Condition” is “Not Matched” between the “Source”, and, the “Target”, instead of “Providing” “All” the “Column Names” for “Both” the “Source”, and, the “Target” as the “Dictionary” to “Insert” into the “Target Delta Table”, i.e., “value”, inside the “whenNotMatchedInsertFunction, the “whenNotMatchedInsertAll()Function can be used.

“Perform” the “Merge” Operation into the “Delta Table” Using “PySpark” for the “Fourth Load”

personDeltaTable.alias("Target")\
.merge(
source = dfPerson.alias("Source"),
condition = "Target.PersonId = Source.PersonId"
)\
.whenMatchedUpdateAll()\
.whenNotMatchedInsertAll()\
.execute()

“Display” the “Content” of the “Delta Table” After the “Fourth Load”

SELECT * FROM training.person;

Output -

--

--

Oindrila Chakraborty

I have 11+ experience in IT industry. I love to learn about the data and work with data. I am happy to share my knowledge with all. Hope this will be of help.