
Adaptive Query Execution (AQE)

In this blog you will find a crisp and clear explanation of Adaptive Query Execution (AQE). It is a very important topic for the Databricks Certified Associate Developer for Apache Spark 3 certification exam, and it comes up often in Data Engineering job interviews as well, so give it special attention. Let's get started.

  • Adaptive Query Execution comes into the picture in Spark version 3.0. It is not enabled by default.
  • If Adaptive Query Execution is disabled, spark.sql.shuffle.partitions decides the number of shuffle partitions.

Enable

spark.conf.set('spark.sql.adaptive.enabled', True)

Disable

spark.conf.set('spark.sql.adaptive.enabled', False)

  • Let us assume we disabled the feature. After performing a groupBy or any other aggregation, the Spark UI shows that the number of shuffle tasks equals the value set in spark.sql.shuffle.partitions.
  • The default value is 200, that is, spark.conf.set('spark.sql.shuffle.partitions', 200).
  • Because of this, even if the data really needs only 10 partitions, Spark still creates 200 shuffle partitions by default, so 200 tasks are launched unnecessarily (the remaining 190 are empty, yet they still take some time). Adaptive Query Execution rectifies this.
  • After enabling Adaptive Query Execution and setting spark.sql.adaptive.coalescePartitions.enabled to True, we can see that far fewer tasks are used to complete the shuffle (see the sketch after this list).
  • That means the number of shuffle partitions is chosen dynamically by Spark itself.
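
To see this on your own machine, here is a minimal PySpark sketch, assuming a local SparkSession and Spark 3.x with the built-in "noop" writer; the data, names and numbers are made up purely for illustration.

    from pyspark.sql import SparkSession, functions as F

    spark = SparkSession.builder.master("local[*]").appName("aqe-coalesce-demo").getOrCreate()

    # Without AQE: the groupBy shuffle runs spark.sql.shuffle.partitions tasks (200 by default),
    # even though 10 distinct keys need nowhere near that many partitions.
    spark.conf.set("spark.sql.adaptive.enabled", False)
    df = spark.range(0, 1_000_000).withColumn("key", F.col("id") % 10)
    df.groupBy("key").count().write.format("noop").mode("overwrite").save()

    # With AQE and partition coalescing: Spark merges the mostly empty shuffle
    # partitions at runtime, so far fewer tasks appear in the Spark UI.
    spark.conf.set("spark.sql.adaptive.enabled", True)
    spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", True)
    df.groupBy("key").count().write.format("noop").mode("overwrite").save()
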
Now let us see some of the properties of Adaptive Query Execution,

Few Properties of Adaptive Query Execution:

  1. Dynamically Coalescing Shuffle Partitions
  2. Dynamically Switching Join Strategies
  3. Dynamically Optimizing Skew Joins

  • Coalescing shuffle partitions means that Spark dynamically chooses the best number of partitions to coalesce into.
  • Switching the join strategy means that Spark dynamically chooses the best join type.

    For example,
          If both tables in the join are large, Spark will choose a Sort Merge Join.

          If one table is small, Spark will choose a Broadcast Join.
  • From the point above you may wonder why Spark chooses a broadcast join when one table is small, and what a broadcast join actually is. A broadcast join simply gives a copy of the small table to every node, so Spark can perform the join without shuffling. It is therefore an optimization technique.

  • Then what is a Sort Merge Join? It involves shuffling the data so that rows with the same join key end up on the same worker, and then performing the join. It is the default join technique. When joining two large tables we can only go for a sort merge join, because copying a large table to every node makes no sense, and the memory needed to store that copy on each node would be far too high.
  • When a skew problem is involved, the skewed task is broken up and handed to other cores. The number of tasks increases, but the skew is resolved. Spark does this dynamically when spark.sql.adaptive.skewJoin.enabled is set to True. A small sketch for checking which join strategy Spark actually picked follows this list.
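
Reusing the spark session from the earlier sketch, here is a small, hypothetical example of how you could check which join strategy Spark picked; the table names and sizes are assumptions, not part of the original example.

    # Look for BroadcastHashJoin vs SortMergeJoin in the printed physical plan.
    large_df = spark.range(0, 1_000_000).withColumnRenamed("id", "k")
    small_df = spark.range(0, 100).withColumnRenamed("id", "k")

    large_df.join(small_df, "k").explain()
    # A small table like small_df typically shows up as a BroadcastHashJoin,
    # while two large tables typically fall back to a SortMergeJoin.
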
In the points above we saw the concept in a crisp and clear manner. To understand it better, let us now look at the topic in detail. If you do not wish to go deeper, you can skip the explanation below; from a certification point of view it is absolutely fine to skip it.

I) Dynamically Coalescing Shuffle Partitions:
    In a shuffle exchange, for example after a groupBy, let us assume the data really needs only 5 partitions. Still, 10 tasks will run, because spark.sql.shuffle.partitions=10. Adaptive Query Execution takes care of this situation.
    After enabling Adaptive Query Execution there may be, say, only 4 partitions, that is, it combines multiple small partitions into a single partition.

    Configuration :

        1) spark.sql.adaptive.enabled
                If this is True, the four properties below come into the picture.

        2) spark.sql.adaptive.coalescePartitions.initialPartitionNum
                This is effectively the maximum number of shuffle partitions. By default it is set to spark.sql.shuffle.partitions.

        3) spark.sql.adaptive.coalescePartitions.minPartitionNum
                This is the minimum number of shuffle partitions. By default it is set to the value of spark.default.parallelism.

        4) spark.sql.adaptive.advisoryPartitionSizeInBytes
                This is an advisory to Adaptive Query Execution for the target partition size. It is used when partitions are split or combined. By default it is 64 MB.

        5) spark.sql.adaptive.coalescePartitions.enabled
                By default it is True. If it is False, Adaptive Query Execution will not combine small partitions.
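
Putting these together, a minimal configuration sketch might look like the following; the values shown are only illustrative, not recommendations.

    spark.conf.set("spark.sql.adaptive.enabled", True)
    spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", True)
    spark.conf.set("spark.sql.adaptive.coalescePartitions.initialPartitionNum", 200)
    spark.conf.set("spark.sql.adaptive.coalescePartitions.minPartitionNum", 1)
    spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "64MB")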

II) Dynamically Switching Join Strategies:

        Consider this scenario: we join two big tables, and a "where" filter is applied to the second table. Assume both are 100 GB, but after the filter the second table is only 7 MB.
        Here Spark would normally stick with the sort merge join, because the execution plan is created before Spark starts the job execution.
        Adaptive Query Execution, however, checks the shuffle exchange at runtime, sees that only 7 MB is involved, and applies a Broadcast Join instead.
        Note: the exchange is still done, that is, the shuffle happens and only the sort is avoided, because Adaptive Query Execution makes its check at the shuffle stage.
        Config: spark.sql.adaptive.localShuffleReader.enabled=True. By default it is True. The "CustomShuffleReader" (also called "LocalShuffleReader") comes into the picture here; it reduces network traffic and optimizes the broadcast join chosen by Adaptive Query Execution. A hypothetical sketch of this scenario follows.
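
The following sketch mimics the scenario above; the table names, sizes, and the filter are assumptions made for illustration, and whether the switch actually happens depends on the size estimates Spark sees at runtime.

    spark.conf.set("spark.sql.adaptive.enabled", True)
    spark.conf.set("spark.sql.adaptive.localShuffleReader.enabled", True)

    # Both sides look large when the plan is built ...
    orders = spark.range(0, 5_000_000).withColumnRenamed("id", "order_id")
    events = spark.range(0, 5_000_000).withColumnRenamed("id", "order_id")

    # ... but the filter makes the right side tiny at runtime.
    recent = events.filter("order_id < 1000")

    orders.join(recent, "order_id").write.format("noop").mode("overwrite").save()
    # In the Spark UI the planned sort merge join may be replaced by a broadcast join.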

III) Dynamically Optimizing Skew Joins:

         Consider joining two large tables. Each table has its own partitions, and a skew has happened on one partition of one table. Skew simply means that the size of one partition is much higher than the others. Adaptive Query Execution will split the skewed partition into two partitions, and the corresponding partition of the second table is duplicated so the join can still be performed.

        Config: 
            1) spark.sql.adaptive.enabled=True
            2) spark.sql.adaptive.skewJoin.enabled=True
            3) spark.sql.adaptive.skewJoin.skewedPartitionFactor=5
            4) spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes=256MB

The four configs above are mostly self explanatory, but in point 3) the value 5 means 5 times the median partition size. The skew split happens only when both the 3rd and 4th thresholds are exceeded. A small configuration sketch follows.
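
Here is a minimal sketch of these settings in PySpark; the values shown are simply the defaults mentioned above, not tuned recommendations.

    spark.conf.set("spark.sql.adaptive.enabled", True)
    spark.conf.set("spark.sql.adaptive.skewJoin.enabled", True)
    spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionFactor", 5)
    spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256MB")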


You will definitely get a question on this topic in the Databricks Certified Associate Developer for Apache Spark 3 exam, and now you are ready to answer it. If you like this blog, you can follow it by pressing the follow button on the right side of this window. You can also follow me on LinkedIn.


All the Best !!!
