Introduction to Incremental ETL in Databricks

Oindrila Chakraborty
Oct 25, 2022


What is “Incremental ETL”?

Ingesting data in an ongoing and scalable way is complicated. When a stream of data keeps arriving at a source, there needs to be a way to ingest that data incrementally.

The ability to process only the new data, or the data that has changed since the last ingestion, is referred to as incremental ETL.

Features of “Incremental ETL”

  • Scalable - Data that has already been processed is never re-processed. This is the key to achieving scalability.
    Otherwise, the processing effort grows without bound as time goes on.
  • Idempotent - Data processing with the idempotency property means that processing the same data always produces the same outcome.
    Idempotency forms the basis of incremental ETL and is tricky to achieve, since it requires very careful maintenance of state.
  • Complicated - Traditionally, proper incremental ETL required a lot of setup. It needed an always-on streaming platform, such as Apache Kafka, or some other mechanism to track which files have and have not been ingested, while carefully handling the case where the ETL process is interrupted in the middle of processing a file.
    Understandably, this is a lot of work and requires a lot of maintenance and testing to ensure that the incremental ETL framework behaves properly.

Features of “Incremental ETL” in Databricks

“COPY INTO” and “Auto Loader” are Databricks features that abstract away the complexities of the incremental ETL framework and provide robust, easy-to-use incremental ETL constructs: they process only new or updated files.

“COPY INTO” and “Auto Loader” move data from external storage into a Delta table.

Benefits of the “COPY INTO” Command in “Incremental ETL” in Databricks

  • No Custom Bookkeeping - “COPY INTO” is a SQL command that performs incremental ETL by loading data from a folder, generally in some cloud object storage, into a Delta table.
    There is no need to manage any state information.
  • Accessible - “COPY INTO” is the best way to ingest data in Databricks SQL. It can also be used in a notebook.
  • Scalable - The “COPY INTO” command is well suited for scheduled or on-demand ingestion use cases in which the data source location contains a small number of files, on the order of thousands of files.

Usage of the “COPY INTO” Command in “Incremental ETL” in Databricks

The “COPY INTO” command is very simple to use.

  • Destination - The destination can be a table name or a location.
  • Source - The source specifies a path containing the source files.
  • File Format - The file format specifies the format of the input files, chosen from one of the supported file types.
tableName = 'training.students'
sourceFileLocation = 'dbfs:/FileStore/tables/training/csv-files'
sourceFileFormat = 'CSV'

# Run the "COPY INTO" command to load only the new files from the source folder into the Delta table.
spark.sql(f"""COPY INTO {tableName}
    FROM '{sourceFileLocation}'
    FILEFORMAT = {sourceFileFormat}
    FORMAT_OPTIONS('header' = 'true', 'inferSchema' = 'true')
""")

Benefits of “Auto Loader” in “Incremental ETL” in Databricks

  • Easy to Use - “Auto Loader” provides easy-to-use Python and Scala methods for performing incremental ETL. It is implemented on top of Apache Spark Structured Streaming, which means “Auto Loader” can reliably run either continuously or on a schedule.

“New File Detection Modes” of “Auto Loader” in “Incremental ETL” in Databricks

To detect new or changed files in the data source, “Auto Loader” can be configured in two different ways -

  • Directory Listing Mode - “Directory Listing Mode” is the default mode used to identify new or changed files.
    It is the preferred method when the source folder contains a small number of files, on the order of thousands of files.
    This method easily streams the files from the object storage without any additional configuration.
  • File Notification Mode - “File Notification Mode” relies on the cloud service to send a notification when files are added to or changed in the folder, generally located in some cloud object storage.
    This approach uses the cloud storage queue service and event notifications to track the files that are added or changed, and the required configurations are handled automatically by Databricks.
    While this approach does require some additional setup, it is the more universal approach and can scale to handle millions of files as the data grows. A configuration sketch follows this list.
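
As a minimal sketch (the paths below are hypothetical and the source format is assumed to be CSV), file notification mode is enabled with the “cloudFiles.useNotifications” option; leaving that option out keeps “Auto Loader” in the default directory listing mode.

# A minimal sketch; the paths here are hypothetical.
# Setting "cloudFiles.useNotifications" to "true" switches Auto Loader from the default
# directory listing mode to file notification mode, where Databricks sets up the cloud
# storage queue and event notifications needed to track newly arriving files.
df_NotificationModeStream = spark.readStream.format("cloudFiles")\
    .option("cloudFiles.format", "CSV")\
    .option("cloudFiles.useNotifications", "true")\
    .option("cloudFiles.schemaLocation", "/mnt/example/autoloader-schema-location")\
    .option("header", "true")\
    .load("/mnt/example/autoloader-source-csv-files")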

Data Ingestion Challenges Solved by “Auto Loader” in “Incremental ETL” in Databricks

Setting up data ingestion usually requires detailed knowledge of the incoming schema, which is not always available and may even change over time, particularly with semi-structured formats. When this happens, processing errors may occur, leading to invalid states or possible data loss.

“Auto Loader” helps with the following more advanced data ingestion challenges -

  • The schema is unknown when initially configuring a pipeline.
  • New fields appear in the source data.
  • Data types are updated or incorrectly enforced by the source systems.
  • Some changes lead to silently dropping data.
  • Other changes completely break the ingestion scripts.

How “Auto Loader” Helps in “Incremental ETL” in Databricks

  • Identify Schema on Stream Initialization - “Auto Loader” automatically detects and stores the schema on initialization.
  • Auto-Detect Changes and Evolve Schema to Capture New Fields - “Auto Loader” automatically updates the stored schema when new fields are encountered in the future.
  • Add Type Hints for Enforcement When Schema is Known - To mitigate data loss due to mismatched schemas, all fields are inferred as strings by default.
    Users have the option to specify schema hints to add data type enforcement on the known fields.
  • Rescue Data That Does Not Meet Expectations - When schema hints are specified to add data type enforcement on the known fields, data type mismatches do not lead to failures; instead, records with incorrect data types are captured automatically in the special “_rescued_data” column. A short sketch of these options follows this list.
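
As a minimal sketch (the paths below are hypothetical, and the “Marks” field is borrowed from the earlier students example), the inferred schema is stored at “cloudFiles.schemaLocation”, a schema hint enforces an integer type on the known “Marks” field, and values that do not match the enforced type are captured in the “_rescued_data” column.

# A minimal sketch; the paths here are hypothetical.
# All fields are inferred as strings by default; "cloudFiles.schemaHints" enforces the
# data type of the known "Marks" field, and any value that cannot be cast to it is
# captured in the special "_rescued_data" column instead of failing the stream.
df_SchemaHintsStream = spark.readStream.format("cloudFiles")\
    .option("cloudFiles.format", "CSV")\
    .option("cloudFiles.schemaLocation", "/mnt/example/schema-and-checkpoint-location")\
    .option("cloudFiles.schemaHints", "Marks INT")\
    .option("header", "true")\
    .load("/mnt/example/autoloader-source-csv-files")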

Usage of “Auto Loader” in “Incremental ETL” in Databricks

Specifying “cloudFiles” as the format is what actually invokes “Auto Loader”.

The “cloudFiles.format” option specifies the underlying format of the data files being ingested by “Auto Loader”.

  • Streaming Loads with Auto Loader - Read the data from the continuously arriving CSV files in a directory in ADLS Gen2 -
from pyspark.sql.types import *

sourceFileLocation = '/mnt/with-aad-app/databricks-training-folder/day-3/autoloader-csv-files'
sourceFileFormat = 'CSV'

# Create the schema of the data to be loaded from the continuously arriving CSV files.
studentsDataSchema = StructType([
    StructField("FirstName", StringType(), False),
    StructField("MiddleName", StringType(), True),
    StructField("LastName", StringType(), False),
    StructField("Subject", StringType(), False),
    StructField("Marks", IntegerType(), False)
])

# Read the continuously arriving CSV files from the mounted path
# "/mnt/with-aad-app/databricks-training-folder/day-3/autoloader-csv-files" using "Auto Loader".
df_ReadStudentCsvFiles = spark.readStream.format("cloudFiles")\
    .option("cloudFiles.format", sourceFileFormat)\
    .option("header", "true")\
    .schema(studentsDataSchema)\
    .load(sourceFileLocation)

Store the data from the streaming DataFrame to the Delta table -

tableName = 'training.autoloader_students'
targetTablePath = '/mnt/with-aad-app/databricks-training-folder/day-3/target-delta-table/autoloader_students'
checkpointDirectoryPath = '/mnt/with-aad-app/databricks-training-folder/day-3/autoloader-checkpoint-directory'

# Start the stream.
# The checkpoint directory keeps a record of all the files that have already been ingested
# from the "sourceFileLocation" path. The data of the files uploaded since the last check is
# written to the "targetTablePath" path.
df_ReadStudentCsvFiles.writeStream.format("delta")\
    .outputMode("append")\
    .option("checkpointLocation", checkpointDirectoryPath)\
    .queryName(f"Running Auto Loader for Table '{tableName}'")\
    .start(targetTablePath)
  • Batch Loads with Auto Loader - The above code can easily be changed to run “Auto Loader” in batch mode on a schedule. This is achieved with the “trigger(once=True)” option.
tableName = 'training.autoloader_students'
targetTablePath = '/mnt/with-aad-app/databricks-training-folder/day-3/target-delta-table/autoloader_students'
batchModeCheckpointDirectoryPath = '/mnt/with-aad-app/databricks-training-folder/day-3/autoloader-checkpoint-directory'

# Start the stream with "trigger(once = True)" so that it processes only the files uploaded
# to the "sourceFileLocation" path since the last check (as recorded in the checkpoint
# directory), writes their data to the "targetTablePath" path, and then stops.
df_ReadStudentCsvFiles.writeStream.format("delta")\
    .trigger(once = True)\
    .outputMode("append")\
    .option("checkpointLocation", batchModeCheckpointDirectoryPath)\
    .queryName(f"Running Auto Loader for Table '{tableName}' In Batch Mode")\
    .start(targetTablePath)
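
This batch-mode query can then be run on a schedule, for example as a Databricks job; each run ingests only the files that arrived since the previous run and then stops, so no cluster needs to stay up between runs.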

Checkpoint Location and Schema Location of “Auto Loader” in “Incremental ETL” in Databricks

By omitting the schema specification while reading the data from the source folder, “Auto Loader” is allowed to infer the schema; the “cloudFiles.schemaLocation” option tells it where to store the inferred schema.

from pyspark.sql.types import *

sourceFileLocation = '/mnt/with-aad-app/databricks-training-folder/day-4/autoloader-schema-inference-csv-files'
sourceFileFormat = 'CSV'
schemaLocationPath = '/mnt/with-aad-app/databricks-training-folder/day-4/target-delta-table/autoloader_customers/checkpoint-and-schema-location'

# Read the continuously arriving CSV files from the mounted path
# "/mnt/with-aad-app/databricks-training-folder/day-4/autoloader-schema-inference-csv-files" using "Auto Loader".
# No schema is supplied; the "cloudFiles.schemaLocation" option points to where the inferred schema is stored.
df_ReadCustomersCsvFiles = spark.readStream.format("cloudFiles")\
    .option("cloudFiles.format", sourceFileFormat)\
    .option("header", "true")\
    .option("cloudFiles.schemaLocation", schemaLocationPath)\
    .load(sourceFileLocation)
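
To complete the example, the stream with the inferred schema can be written to a Delta table in the usual way. This is only a sketch: the target table path below is assumed, and the schema location above is reused as the checkpoint location, which is a common pattern.

# A sketch only; the target table path is assumed, not taken from the original example.
# Reusing the same directory as both the checkpoint location and the schema location keeps
# Auto Loader's file-tracking state and the inferred schema in one place.
# "mergeSchema" lets the target Delta table pick up new columns when the source schema evolves.
df_ReadCustomersCsvFiles.writeStream.format("delta")\
    .outputMode("append")\
    .option("checkpointLocation", schemaLocationPath)\
    .option("mergeSchema", "true")\
    .queryName("Running Auto Loader with Schema Inference")\
    .start('/mnt/with-aad-app/databricks-training-folder/day-4/target-delta-table/autoloader_customers')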
