Introduction to Incremental ETL in Databricks

What is Incremental ETL?

Ingesting data in an ongoing and scalable way is complicated. When a stream of data is continuously arriving at a source, there needs to be a way to ingest that data incrementally.

Features of Incremental ETL

  • Scalable - Data that has already been processed is not re-processed. This is the key to achieving scalability;
    otherwise, the processing effort becomes unbounded as time goes on.
  • Idempotent - Data processing is idempotent when processing the same data always results in the same outcome.
    Idempotency forms the basis of incremental ETL and is tricky to achieve, since it requires very careful maintenance of state (see the sketch after this list).
  • Complicated - Traditionally, proper incremental ETL required a lot of set-up. It needed an always-on streaming platform, such as Apache Kafka, or some other mechanism to track which files have and have not been ingested, while carefully managing the case where the ETL process is interrupted in the middle of processing a file.
    Understandably, this is a lot of work, and it requires a lot of maintenance and testing to ensure that the incremental ETL framework behaves properly.
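
To make the idempotency idea concrete, here is a minimal sketch in PySpark, assuming a hypothetical Delta table training.events whose event_id column uniquely identifies each record, and a hypothetical folder of newly arrived records. Re-processing the same batch leaves the table unchanged, because the MERGE only inserts rows that are not already present.

from delta.tables import DeltaTable

# Hypothetical batch of newly arrived records.
df_newEvents = spark.read.format("json").load("dbfs:/tmp/incoming-events")

# Merging on the unique key makes the write idempotent: re-running this block
# with the same input produces exactly the same table state.
targetTable = DeltaTable.forName(spark, "training.events")
targetTable.alias("t")\
.merge(df_newEvents.alias("s"), "t.event_id = s.event_id")\
.whenNotMatchedInsertAll()\
.execute()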

Features of Incremental ETL in Databricks

COPY INTO and Auto Loader are Databricks features that abstract away the complexities of an incremental ETL framework and provide robust, easy-to-use incremental ETL constructs, i.e. they process only new or updated files.

Benefits of the COPY INTO Command in Incremental ETL in Databricks

  • No Custom Bookkeeping - COPY INTO is a SQL command that performs incremental ETL by loading data from a folder, generally located in cloud object storage, into a Delta table.
    There is no need to manage any state information yourself.
  • Accessible - COPY INTO is the best way to ingest data in Databricks SQL. It can also be used in a notebook.
  • Scalable - The COPY INTO command is well suited for scheduled or on-demand ingestion use cases in which the data source location contains a small number of files, on the order of thousands.

Usage of the COPY INTO Command in Incremental ETL in Databricks

The COPY INTO command is very simple to use. Apart from the target Delta table, it needs -

  • Source - The source specifies a path containing the source files.
  • File Format - The file format specifies the format of the input files, which must be one of the supported file types.
# Load any new CSV files from the source folder into the target Delta table.
tableName = 'training.students'
sourceFileLocation = 'dbfs:/FileStore/tables/training/csv-files'
sourceFileFormat = 'CSV'

spark.sql(f"""COPY INTO {tableName}
FROM '{sourceFileLocation}'
FILEFORMAT = {sourceFileFormat}
FORMAT_OPTIONS ('header' = 'true', 'inferSchema' = 'true')
""")

Benefits of Auto Loader in Incremental ETL in Databricks

  • Easy to Use - Auto Loader provides easy-to-use Python and Scala methods for performing incremental ETL. It is implemented on top of Apache Spark Structured Streaming, which means that Auto Loader can reliably run either continuously or on a schedule.

New File Detection Modes of Auto Loader in Incremental ETL in Databricks

Auto Loader can be configured to detect new files arriving in the data source in two different ways -

  • Directory Listing Mode - The default mode. Auto Loader identifies new files by listing the contents of the source folder, and it requires no additional set-up beyond access to the data.
  • File Notification Mode - The file notification mode relies on the cloud service to send a notification when files are added or changed in the folder, generally located in some cloud object storage.
    This approach uses the cloud storage queue service and event notifications to track the files that are added or changed, and the configuration is handled automatically by Databricks.
    While this approach does require some additional set-up, it can scale to handle millions of files as the data grows (see the sketch after this list).
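
A minimal sketch of enabling file notification mode is shown below. The paths here are hypothetical; the cloudFiles.useNotifications option is what switches Auto Loader from the default directory listing behaviour to the notification-based approach.

df_notificationMode = (
    spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "csv")
    # Switch from the default directory listing mode to file notification mode.
    .option("cloudFiles.useNotifications", "true")
    .option("header", "true")
    # Hypothetical location where Auto Loader stores the inferred schema.
    .option("cloudFiles.schemaLocation", "/mnt/example/autoloader-schema-location")
    # Hypothetical source folder in cloud object storage.
    .load("/mnt/example/autoloader-csv-files")
)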

Data Ingestion Challenges Solved by Auto Loader in Incremental ETL in Databricks

Setting up data ingestion usually requires detailed knowledge of the incoming schema, which is not always available and may even change over time, particularly with semi-structured formats. When this happens, processing errors may occur, leading to invalid states or even data loss -

  • New fields appear in the source data.
  • Data types are updated or incorrectly enforced by the source systems.
  • Some changes lead to silently dropped data.
  • Other changes completely break the ingestion scripts.

How Auto Loader Helps in Incremental ETL in Databricks

  • Identify Schema on Stream Initialization - Auto Loader automatically detects and stores the schema on initialization.
  • Auto-Detect Changes and Evolve Schema to Capture New Fields - Auto Loader automatically updates the schema when new fields are encountered in the future.
  • Add Type Hints for Enforcement When Schema is Known - To mitigate data loss due to mismatched schemas, all fields are inferred as strings by default.
    Users have the option to specify schema hints to add data type enforcement on the known fields.
  • Rescue Data That Does Not Meet Expectations - When schema hints are specified to add data type enforcement on known fields, data type mismatches do not lead to failures; instead, the records with the incorrect data types are captured automatically in the special _rescued_data column (see the sketch after this list).
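
A minimal sketch of schema hints and rescued data, assuming a hypothetical source folder and schema location. All columns are inferred as strings except Marks, which the cloudFiles.schemaHints option enforces as an integer; rows whose Marks value cannot be cast land in _rescued_data instead of failing the stream.

df_withSchemaHints = (
    spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .option("header", "true")
    # Hypothetical location where Auto Loader stores the inferred schema.
    .option("cloudFiles.schemaLocation", "/mnt/example/students-schema-location")
    # Enforce a type only on the known "Marks" field; all other fields stay strings.
    .option("cloudFiles.schemaHints", "Marks INT")
    # Hypothetical source folder.
    .load("/mnt/example/students-csv-files")
)
# Rows that did not match the hinted type are captured here instead of breaking ingestion.
df_rescued = df_withSchemaHints.filter("_rescued_data IS NOT NULL")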

Usage of Auto Loader in Incremental ETL in Databricks

Specifying cloudFiles as the format of the read stream is what actually invokes Auto Loader.

from pyspark.sql.types import *

sourceFileLocation = '/mnt/with-aad-app/databricks-training-folder/day-3/autoloader-csv-files'
sourceFileFormat = 'CSV'
# Create a "Schema" of the Data to be "Loaded" from the "Continuous Arriving CSV Files".
studentsDataSchema = StructType([
    StructField("FirstName", StringType(), False),
    StructField("MiddleName", StringType(), True),
    StructField("LastName", StringType(), False),
    StructField("Subject", StringType(), False),
    StructField("Marks", IntegerType(), False)
])
# Read the "Continuous Arriving CSV Files" from the Mounted Path "/mnt/with-aad-app/databricks-training-folder/day-3/autoloader-csv-files" Using "Auto Loader".
df_ReadStudentCsvFiles = spark.readStream.format("cloudFiles")\
.option("cloudFiles.format", sourceFileFormat)\
.option("header", "true")\
.schema(studentsDataSchema)\
.load(sourceFileLocation)
tableName = 'training.autoloader_students'
targetTablePath = '/mnt/with-aad-app/databricks-training-folder/day-3/target-delta-table/autoloader_students'
checkpointDirectoryPath = '/mnt/with-aad-app/databricks-training-folder/day-3/autoloader-checkpoint-directory'
# "Start" the "Stream".
# "Use" the "Checkpoint Directory Location" to "Keep" a "Record" of All the "Files" that have "Already" been "Uploaded" to the "sourceFileLocation" Path.
# For those "Files" that have been "Uploaded" since the "Last Check", "Write" the Data of the "Newly-Uploaded Files" to the "targetTablePath" Path.
df_ReadStudentCsvFiles.writeStream.format("delta")\
.outputMode("append")\
.option("checkpointLocation", checkpointDirectoryPath)\
.queryName(f"Running Auto Loader for Table '{tableName}'")\
.start(targetTablePath)
tableName = 'training.autoloader_students'
targetTablePath = '/mnt/with-aad-app/databricks-training-folder/day-3/target-delta-table/autoloader_students'
batchModeCheckpointDirectoryPath = '/mnt/with-aad-app/databricks-training-folder/day-3/autoloader-checkpoint-directory'
# "Start" the "Stream".
# "Use" the "Checkpoint Directory Location" to "Keep" a "Record" of All the "Files" that have "Already" been "Uploaded" to the "sourceFileLocation" Path.
# For those "Files" that have been "Uploaded" since the "Last Check", "Write" the Data of the "Newly-Uploaded Files" to the "targetTablePath" Path.
df_ReadStudentCsvFiles.writeStream.format("delta")\
.trigger(once = True)\
.outputMode("append")\
.option("checkpointLocation", batchModeCheckpointDirectoryPath)\
.queryName(f"Running Auto Loader for Table '{tableName}' In Batch Mode")\
.start(targetTablePath)
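
On recent runtimes (Spark 3.3 and above), Structured Streaming also offers trigger(availableNow=True), which, like trigger(once=True), processes everything that has arrived since the last checkpoint and then stops, but can split the work into multiple batches. A minimal sketch, reusing the variables defined above:

# Batch-style run using "availableNow"; assumes a runtime where this trigger is supported.
df_ReadStudentCsvFiles.writeStream.format("delta")\
.trigger(availableNow=True)\
.outputMode("append")\
.option("checkpointLocation", batchModeCheckpointDirectoryPath)\
.queryName(f"Running Auto Loader for Table '{tableName}' Using 'availableNow'")\
.start(targetTablePath)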

Checkpoint and Schema Location of Auto Loader in Incremental ETL in Databricks

By omitting the schema specification while reading the data from the source folder, Auto Loader is allowed to infer the schema itself. The cloudFiles.schemaLocation option tells Auto Loader where to store the inferred schema; a common choice is the same directory that is used as the checkpoint location.

from pyspark.sql.types import *

sourceFileLocation = '/mnt/with-aad-app/databricks-training-folder/day-4/autoloader-schema-inference-csv-files'
sourceFileFormat = 'CSV'
schemaLocationPath = '/mnt/with-aad-app/databricks-training-folder/day-4/target-delta-table/autoloader_customers/checkpoint-and-schema-location'
# Read the "Continuous Arriving CSV Files" from the Mounted Path "/mnt/with-aad-app/databricks-training-folder/day-4/autoloader-schema-inference-csv-files" Using "Auto Loader".
df_ReadCustomersCsvFiles = spark.readStream.format("cloudFiles")\
.option("cloudFiles.format", sourceFileFormat)\
.option("header", "true")\
.option("cloudFiles.schemaLocation", schemaLocationPath)\
.load(sourceFileLocation)
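
The inferred-schema stream can be written out just like before; a minimal sketch, assuming a hypothetical target path and reusing schemaLocationPath as the checkpoint directory. The mergeSchema option on the Delta sink lets newly detected columns be added to the target table; by default, Auto Loader stops the stream when it encounters a new column, and simply restarting the query continues processing with the evolved schema.

# Hypothetical target path for the customers Delta table.
customersTargetPath = '/mnt/with-aad-app/databricks-training-folder/day-4/target-delta-table/autoloader_customers'
df_ReadCustomersCsvFiles.writeStream.format("delta")\
.outputMode("append")\
.option("checkpointLocation", schemaLocationPath)\
.option("mergeSchema", "true")\
.start(customersTargetPath)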
