Developing a Simple “Streaming Application” Using “Spark” in Databricks

Oindrila Chakraborty
11 min read · Nov 1, 2023


Why is a “Streaming Application” Required to be “Developed”?

  • Sensors, IoT devices, social networks, and online transactions all generate data that needs to be monitored constantly and acted upon quickly.
  • As a result, the need for large-scale, real-time stream processing is more evident than ever before.
  • In today’s big-data world, streaming applications are required to deal with real-time data.
  • The main model for handling streaming datasets in Apache Spark is the “Structured Streaming” module.

Important Streaming Terminology

What is “ReadStream”?

  • “ReadStream” is the entry point of a streaming application.
  • “ReadStream” is a function, or API, through which data can be consumed from an external source system.
  • The external source system can be a database, a file system, or a popular streaming service, like Apache Kafka or Azure Event Hub (a minimal sketch follows this list).
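
Below is a minimal sketch of what a readStream call can look like when consuming from a streaming service such as Apache Kafka. The broker address and topic name are placeholders introduced purely for illustration, not values from this article.

# Illustrative sketch only: consume from an assumed Kafka topic.
kafka_stream_df = (
    spark.readStream
         .format("kafka")
         .option("kafka.bootstrap.servers", "broker-host:9092")  # placeholder broker address
         .option("subscribe", "person-topic")                    # placeholder topic name
         .option("startingOffsets", "earliest")
         .load()
)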

What is “WriteStream”?

  • “WriteStream” writes the output of a streaming application.
  • “WriteStream” is a function, or API.
    Once the data is consumed from the source system and processed, i.e., transformed and the required business aggregations are applied, the processed data can be written to the target using the “WriteStream” API (a minimal sketch follows this list).
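
Continuing the sketch above, the consumed stream could be written out through writeStream. The console sink and the checkpoint path below are placeholders used only to illustrate the API.

# Illustrative sketch only: write the stream consumed above to the console sink.
query = (
    kafka_stream_df.writeStream
                   .format("console")
                   .option("checkpointLocation", "/tmp/checkpoints/console-demo")  # placeholder path
                   .start()
)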

What is “Checkpoint”?

“Checkpoint” is used for incremental data loading.
To handle incremental data loading, the checkpoint keeps track of the offset value of the data that gets processed in each batch, so that, when the streaming application is executed the next time, only the unprocessed data gets processed.

  • When a Spark Streaming application is executed the next time, it first refers to the checkpoint location to retrieve the offset value.
  • Based on the offset value, Spark understands which data has already been processed and which data is yet to be processed. Spark picks only the unprocessed data and processes it incrementally.

“Checkpoint” is also used for fault tolerance in Spark Streaming applications.

  • In case a Spark Streaming application fails in the middle of processing, it can be re-run from the failure point by referring to the checkpoint location.
  • The checkpoint provides the list of already processed data versus unprocessed data. Spark picks only the unprocessed data and processes it incrementally again.

What is a “Trigger”?

A “Trigger” is a special event that determines when the Spark Streaming application processes the incoming data.

There are three types of “Trigger” (see the sketch after this list) -

  • 1. Default: With the default trigger, the incoming streaming data is segregated into micro-batches. The Spark Streaming application processes one micro-batch at a time.
    Only once all the data of one micro-batch has been processed does Spark start processing the data of the next micro-batch.
  • 2. Fixed interval: To process all the streaming data accumulated during a fixed time interval, e.g., every 1 hour or every 5 minutes, the fixed-interval trigger is used.
    With the fixed-interval trigger, the Spark Streaming application processes all the streaming data accumulated during that fixed time interval at once.
    After that, once the fixed time interval has passed again, it processes all the streaming data accumulated during the last cycle of the interval at once, and so on.
  • 3. One time: To process all the accumulated streaming data in one shot, the one-time trigger is used.
    With the one-time trigger, the Spark Streaming application processes all the accumulated streaming data in one shot and then stops.
    The one-time trigger is equivalent to batch mode.
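
A minimal sketch of how these trigger types can be configured on writeStream, assuming df_stream is a streaming DataFrame like the one created in the hands-on section below, and using the console sink purely for illustration:

# 1. Default trigger: process the next micro-batch as soon as the previous one finishes.
q_default = df_stream.writeStream.format("console").start()

# 2. Fixed-interval trigger: start a micro-batch every 5 minutes.
q_fixed = df_stream.writeStream.format("console").trigger(processingTime="5 minutes").start()

# 3. One-time trigger: process everything accumulated so far in one shot, then stop.
q_once = df_stream.writeStream.format("console").trigger(once=True).start()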

What are the Different “Output Modes” of a “Streaming Application”?

The “Output Mode” of a Spark Streaming application can be configured in the following ways (see the sketch after this list) -

  • 1. Append: To add, or accumulate, only the incremental data of each trigger run to the target of a Spark Streaming application, the append output mode can be used.
    The append output mode is only applicable where the existing rows in the target are not expected to change.
  • 2. Complete: In certain cases, aggregation operations need to be performed on the data across trigger runs.
    Then, for the output in the target to make sense, the complete result needs to be written on each trigger run, because if only the incremental data of each trigger run were considered, the correct aggregated values might not be produced.
    In such use cases, the complete output mode can be used.
  • 3. Update: To update the existing rows in the target with the new data coming in from each trigger run, the update output mode can be used.
Standard Architecture of a Streaming Application

In the standard architecture of a streaming application, there are the following four processes (a sketch mapping them onto code follows the list) -

  • 1. Ingestion: The source data can reside in external data sources, like relational databases, non-relational databases, or file systems.
    Databricks can consume the source data directly from those external data sources.
    Databricks can also consume the source data incrementally from a mediator, i.e., a streaming service, like Azure Event Hub or Apache Kafka.
    In this case, the external data sources push the source data incrementally to the topics of the streaming service, and the Databricks engine consumes the source data from those topics.
    If the raw source data is needed for tracking or debugging purposes, it can be persisted into a staging storage layer.
    The staging storage layer can be used for checkpointing as well; the checkpoint location keeps track of the processed data in the form of offset values.
  • 2. Processing: Once the source data is consumed from the topics of the streaming service, the Databricks engine can apply the business logic on it to cleanse, transform, or aggregate the data.
  • 3. Storing: Once the transformation of the source data is done, the transformed or aggregated data can be moved to a data warehouse, which can be Azure SQL Server, Azure Synapse Analytics, or another storage layer.
  • 4. Reporting: From the data warehouse, the data can flow into reporting.
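
A minimal sketch mapping these stages onto code, reusing the kafka_stream_df from the earlier readStream sketch; the paths below are placeholders, not locations from this article.

# Processing: Kafka delivers the payload as bytes, so cast it to a readable string first.
processed_df = kafka_stream_df.selectExpr("CAST(value AS STRING) AS json_value")

# Storing: persist the processed stream as Parquet; a warehouse or reporting layer reads from here.
(processed_df.writeStream
             .format("parquet")
             .option("path", "/tmp/demo/storing-layer")             # placeholder target path
             .option("checkpointLocation", "/tmp/demo/checkpoint")  # placeholder checkpoint path
             .start())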

Hands-On Example of a Spark Streaming Application

from pyspark.sql.functions import *
from pyspark.sql.types import *

Define the “Schema” for the “Incoming Streaming Data”

person_schema = StructType([
    StructField("first_name", StringType(), False),
    StructField("last_name", StringType(), False),
    StructField("current_company", StringType(), False),
    StructField("exp_in_current_company", DoubleType(), False)
])

Create the Required Folders in DBFS

1. Create the Input Folder in DBFS

dbutils.fs.mkdirs("/FileStore/tables/streaming-data/incoming-streaming-data/json-files")

2. Create the Output Folder in DBFS

dbutils.fs.mkdirs("/FileStore/tables/streaming-data/output-of-streaming-data/")

3. Create the Folder for the Checkpoint Location in DBFS

dbutils.fs.mkdirs("/FileStore/tables/streaming-data/checkpoint-location")

Create a “Streaming DataFrame” by “Reading” the “Streaming Data” from the “JSON Files”

df_stream = spark.readStream.format("json")\
    .schema(person_schema)\
    .option("multiLine", "true")\
    .load("/FileStore/tables/streaming-data/incoming-streaming-data/json-files")

display(df_stream)
  • When the first JSON file is uploaded into the input folder in DBFS, the content of the input DataFrame is -
  • When the second JSON file is uploaded into the input folder in DBFS, the content of the input DataFrame is -
  • When the third JSON file is uploaded into the input folder in DBFS, the content of the input DataFrame is -
  • When the fourth JSON file is uploaded into the input folder in DBFS, the content of the input DataFrame is -

“Write” the Content of the “Streaming DataFrame”

  • In this example, the Spark Streaming application starts reading the JSON files one by one and processes the incoming streaming data from those JSON files.
  • Finally, the Spark Streaming application writes the output in the form of Parquet files to the target location.

When is the “Checkpoint Location” Used in a “Spark Streaming Application”?

  • The checkpoint location is provided only at the time of writing the content of the streaming DataFrame.
  • When the content of the streaming DataFrame is written to the target location, metadata is also written to the checkpoint location in order to keep track of the processed files.

“Metadata” of a “Spark Streaming Application”

  • In the target folder of the Spark Streaming application, a folder named _spark_metadata is created as the metadata of the Spark Streaming application.
  • In the checkpoint location as well, the metadata of the Spark Streaming application is created.
  • So, in order to make any changes to the Spark Streaming application and restart it from the beginning, all the data present inside both the checkpoint location and the _spark_metadata folder of the target folder needs to be deleted (a sketch of such a reset follows the write example below).
final_df = df_stream.writeStream.format("parquet")\
    .outputMode("append")\
    .option("path", "/FileStore/tables/streaming-data/output-of-streaming-data/")\
    .option("checkpointLocation", "/FileStore/tables/streaming-data/checkpoint-location")\
    .start()

# Block this cell until the streaming query is stopped or fails.
final_df.awaitTermination()
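
If the Spark Streaming application has to be restarted from the beginning, as described above, both metadata locations can be cleared first. A minimal sketch, using the paths from this example:

# Remove the checkpoint metadata and the sink metadata so the next run starts from scratch.
dbutils.fs.rm("/FileStore/tables/streaming-data/checkpoint-location", True)
dbutils.fs.rm("/FileStore/tables/streaming-data/output-of-streaming-data/_spark_metadata", True)
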
  • Verify that a Parquet file is created after the first JSON file is read into the input DataFrame and its content is written to the target folder -
dbutils.fs.ls("/FileStore/tables/streaming-data/output-of-streaming-data/")

Verify the data written as the output of the streaming application after the first JSON file is read into the input DataFrame and its content is written to the target folder -

display(spark.read.format("parquet").load("/FileStore/tables/streaming-data/output-of-streaming-data/*.parquet"))

1. Verify the metadata of the target folder of the streaming application after the first JSON file is read into the input DataFrame and its content is written to the target folder -

dbutils.fs.ls("/FileStore/tables/streaming-data/output-of-streaming-data/_spark_metadata/")

The content of the metadata file, i.e., 0, is as follows -

2. Verify the metadata of the checkpoint location of the streaming application after the first JSON file is read into the input DataFrame and its content is written to the target folder -

The content of the metadata file, i.e., “metadata”, is as follows -
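
To peek inside these small metadata files, dbutils.fs.head can be used. A minimal sketch, with the file names taken from the listings above:

# Print the contents of the sink metadata file and the checkpoint metadata file.
print(dbutils.fs.head("/FileStore/tables/streaming-data/output-of-streaming-data/_spark_metadata/0"))
print(dbutils.fs.head("/FileStore/tables/streaming-data/checkpoint-location/metadata"))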

  • Verify that a new Parquet file is created after the second JSON file is read into the input DataFrame and its content is written to the target folder -
dbutils.fs.ls("/FileStore/tables/streaming-data/output-of-streaming-data/")

Verify the data written as the output of the streaming application after the second JSON file is read into the input DataFrame and its content is written to the target folder -

display(spark.read.format("parquet").load("/FileStore/tables/streaming-data/output-of-streaming-data/*.parquet"))

1. Verify the metadata of the target folder of the streaming application after the second JSON file is read into the input DataFrame and its content is written to the target folder -

dbutils.fs.ls("/FileStore/tables/streaming-data/output-of-streaming-data/_spark_metadata/")

The content of the metadata file, i.e., 1, is as follows -

2. Verify the metadata of the checkpoint location of the streaming application after the second JSON file is read into the input DataFrame and its content is written to the target folder -

The content of the metadata file, i.e., “metadata”, is as follows -

  • Verify that a new Parquet file is created after the third JSON file is read into the input DataFrame and its content is written to the target folder -
dbutils.fs.ls("/FileStore/tables/streaming-data/output-of-streaming-data/")

Verify the data written as the output of the streaming application after the third JSON file is read into the input DataFrame and its content is written to the target folder -

display(spark.read.format("parquet").load("/FileStore/tables/streaming-data/output-of-streaming-data/*.parquet"))

1. Verify the metadata of the target folder of the streaming application after the third JSON file is read into the input DataFrame and its content is written to the target folder -

dbutils.fs.ls("/FileStore/tables/streaming-data/output-of-streaming-data/_spark_metadata/")

The content of the metadata file, i.e., 2, is as follows -

2. Verify the metadata of the checkpoint location of the streaming application after the third JSON file is read into the input DataFrame and its content is written to the target folder -

The content of the metadata file, i.e., “metadata”, is as follows -

  • Verify that a new Parquet file is created after the fourth JSON file is read into the input DataFrame and its content is written to the target folder -
dbutils.fs.ls("/FileStore/tables/streaming-data/output-of-streaming-data/")

Verify the data written as the output of the streaming application after the fourth JSON file is read into the input DataFrame and its content is written to the target folder -

display(spark.read.format("parquet").load("/FileStore/tables/streaming-data/output-of-streaming-data/*.parquet"))

1. Verify the metadata of the target folder of the streaming application after the fourth JSON file is read into the input DataFrame and its content is written to the target folder -

dbutils.fs.ls("/FileStore/tables/streaming-data/output-of-streaming-data/_spark_metadata/")

The content of the metadata file, i.e., 3, is as follows -

2. Verify the metadata of the checkpoint location of the streaming application after the fourth JSON file is read into the input DataFrame and its content is written to the target folder -

The content of the metadata file, i.e., “metadata”, is as follows -

