In this blog we are going to see about Incremental load using Auto Loader and Structured Streaming in brief manner with an example, so that you will get an overview of this topic in a quick single page read.
Auto Loader incrementally processes new data files that arrived in cloud storage. It can ingest csv,parquet,text,json,binaryfile,orc and avro file formats.
The main part of Auto Loader is Structured Streaming source and that is called cloudFiles. We need to provide the path where the files will be arrived in the cloud storage then, the cloudFiles source automatically processes new files as they arrive. It supports for both SQLand python in Delta Live Tables.
Now, we are going to see a simple example with each part such that - configuring the streaming read, Defining the streaming aggregation and write to delta table.
Configure Streaming Read:
query=(spark.readStream
.format("cloudFiles")
.option("cloudFiles.format","csv")
.option("cloudFiles.schemaLocation",locationpath)
.load("path where source files located")
.createorReplaceTempView("tempview"))
In the above example we can notice that format is given as cloudFiles. which indicates it is an incremental load. The location path is nothing but the checkpoint path. We are given the path where the source file located in the load parameter. Thus now we are read the input files. And from the above example we can see that we store the loaded data in a temporary view called "tempview"
Define a Streaming aggregation:
CREATE OR REPLACE TEMPORARY VIEW count_temp_view AS
SELECT state,
count(id)
FROM tempview
GROUP BY state
In this example, we have done the grouping and aggregation operation -count. In the above example we can notice that we used the tempview , which has the data nothing but the loaded data from the source file.
Write Aggregation data to Delta table:
query=(spark.table( "count_temp_view")
.writeStream
.format("delta")
.option("checkpointLocation",path)
.outputMode("complete")
.table("customer_count_table"))
In the above example, we can see how to write the aggregated data to delta table. which can be done by giving the format as "delta". We can notice that the outputMode is given as "complete." We can give two types of mode,
i) append
ii) complete
- In the append mode the data gets inserted in the target
- In complete mode the data is overwritten.
We can also add trigger as well. The default the processing time is 500ms
.trigger(processingTime="2 minutes")
by the above code added, we can overwrite the default processing time.
.trigger(once=True)
by the above code, it execute the single batch and it stops automatically.
.trigger(availableNow=True)
by the above code added, it executes multiple micro batch and stops on its own
Thus in this blog we saw about Incremental load using Auto Loader and Structured Streaming in crisp and clear manner along with easily understandable example code. Hope this blog is helpful.
Thank You!!!
0 Comments