Spark 3.0: First hands-on approach with Adaptive Query Execution (Part 1)

Apache Spark is a distributed data processing framework whose features make it suitable for virtually any Big Data context. Although it is a relatively recent product (it was first released open source under a BSD license in 2010 and later donated to the Apache Foundation), on June 18th, 2020 its third major revision was released, introducing several new features, including Adaptive Query Execution (AQE), which we are going to talk about in this article.

A bit of history

Spark was born in 2009, before being donated to the community, within the academic context of the AMPLab (fun fact: AMP stands for Algorithms, Machines, People) at the University of California, Berkeley. The winning idea behind the product is the concept of the RDD, described in the paper “Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing”, whose lead author is Matei Zaharia, the “father” of Spark.

The idea is a solution that solves the main problem of the distributed processing models available at the time (MapReduce first and foremost): the lack of an abstraction layer for the memory of the distributed system. Some complex algorithms that are widely used in Big Data, such as many of those for training machine learning models or manipulating graph data structures, reuse intermediate results multiple times during computation. The “single-stage” architecture of engines such as MapReduce is heavily penalized in such circumstances, since it has to write (and then re-read) the intermediate results of the computation to persistent storage. I/O operations on persistent storage are notoriously expensive on any type of system, even more so on a distributed one because of the additional overhead introduced by network communications. The RDD concept implemented in Spark brilliantly solves this problem by keeping intermediate computation steps in memory on a “multi-stage” DAG engine.

The other milestone (I am skipping ahead here, because going into the details of RDD programming and Spark’s history, although very interesting, is outside the scope of this article) is the introduction of the Spark SQL module in the first stable version of Spark (which by then had been donated to the Apache community).

One of the reasons for the success of the Hadoop framework before Spark’s birth was the proliferation of products that added functionality to the core modules. Among the most widely used we certainly have to mention Hive, a SQL abstraction layer over Hadoop. Despite the limitations of MapReduce, which make it underperform when the more complex SQL queries “translated” by Hive run on that engine, Hive is still widespread today, mainly because of its ease of use.

The best way to retrace the history of the SQL layer on Spark is, again, to start from the reference papers: the one on Shark (Spark SQL’s ancestor), dating back to 2013, and the one titled “Spark SQL: Relational Data Processing in Spark”, which introduces Catalyst, the optimizer that represents the heart of today’s architecture.

Spark SQL features are made available to developers through objects called DataFrames (or Datasets, their type-safe Java/Scala counterpart) that represent RDDs at a higher level of abstraction. You can use the DataFrame API either through a specific DSL or through SQL.
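As a quick illustration, here is a minimal sketch of the two styles; the sample data and column names are made up for the example:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("dataframe-dsl-vs-sql")
  .getOrCreate()
import spark.implicits._

// Hypothetical sample data
val people = Seq(("Alice", 34), ("Bob", 45), ("Carol", 29)).toDF("name", "age")

// DataFrame DSL
val adultsDsl = people.filter($"age" >= 35).select($"name")

// The same query expressed in SQL
people.createOrReplaceTempView("people")
val adultsSql = spark.sql("SELECT name FROM people WHERE age >= 35")

adultsDsl.show()
adultsSql.show()
```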

Regardless of which method you choose, DataFrame operations will be processed, translated, and optimized by Catalyst (from Spark 2.0 onwards) according to the following workflow:

What’s new

We finally get to the merits of Adaptive Query Execution, a feature that, at the architectural level, is implemented exactly at this point of the pipeline. More precisely, it is an optimization that dynamically intervenes between the logical plan and the physical plan, taking advantage of the runtime statistics collected during the execution of the various stages, according to the flow shown in the following image:

The Spark SQL execution flow in version 3.0 therefore becomes:

Optimizations in detail

Since the AQE framework relies on an extensible architecture built around a set of logical and physical plan optimization rules, it is easy to assume that the developers plan to implement additional functionality over time. At present, the following optimizations have been implemented in version 3.0:

  • Dynamically coalescing shuffle partitions
  • Dynamically switching join strategies
  • Dynamically optimizing skew joins

Let’s go through them one by one, getting hands-on with code examples.

Regarding the creation of the test cluster, we recommend that you refer to the previously published article: “How to create an Apache Spark 3.0 development cluster on a single machine using Docker”.

Dynamically coalescing shuffle partitions

Shuffle operations are notoriously the most expensive in Spark (as in any other distributed processing framework) because of the transfer time required to move data between cluster nodes across the network. Unfortunately, however, in most cases they are unavoidable.

Transformations on a distributed dataset in Spark, regardless of whether you use the RDD or the DataFrame API, can be of two types: narrow or wide. Wide transformations need partition data to be redistributed across executors in order to complete: the infamous shuffle operation (which also creates a new execution stage).
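As a minimal sketch of the difference (the data here is synthetic and only meant for illustration, assuming a spark-shell session where spark is in scope):

```scala
import spark.implicits._

val df = spark.range(0, 1000000).toDF("id")

// Narrow: each output partition depends on a single input partition, no shuffle needed
val narrow = df.filter($"id" % 2 === 0)

// Wide: rows with the same key must end up in the same partition,
// so Spark shuffles the data and starts a new execution stage
val wide = df.groupBy($"id" % 10).count()

wide.explain() // the physical plan contains an Exchange (shuffle) node
```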

Without AQE, determining the optimal number of partitions of the DataFrame resulting from a wide transformation (e.g. a join or an aggregation) was left to the developer, through the spark.sql.shuffle.partitions configuration property (default value: 200). However, without digging into the data it is very difficult to establish an optimal value, with the risk of generating partitions that are too large or too small and causing performance problems.

Let’s say you want to run an aggregation query on data whose groups are unbalanced. Without the intervention of AQE, the number of resulting partitions will be the one we specified (e.g. 5) and the final result could be something similar to what is shown in the image:

Enabling AQE, instead, would merge the data from the smaller partitions into a larger partition of a size comparable to the others, with a result similar to the one shown in the figure.

This optimization is triggered when the two configuration properties spark.sql.adaptive.enabled and spark.sql.adaptive.coalescePartitions.enabled are both set to true. Since the second is true by default, in practice you only need to enable the global property that activates AQE to take advantage of this feature.

Actually, parsing the source code, you will find that AQE is applied only if the query needs shuffle operations or contains sub-queries, and that there is a configuration property you can use to force AQE even when neither of the two conditions above holds.

The number of partitions after optimization will depend instead on the setting of the following configuration options:

  • spark.sql.adaptive.coalescePartitions.initialPartitionNum
  • spark.sql.adaptive.coalescePartitions.minPartitionNum
  • spark.sql.adaptive.advisoryPartitionSizeInBytes

where the first represents the starting number of partitions (default: spark.sql.shuffle.partitions), the second represents the minimum number of partitions after optimization (default: spark.default.parallelism), and the third represents the “suggested” size of the partitions after optimization (default: 64 MB).
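For reference, these knobs can be set directly on the Spark session; the values below are purely illustrative and should be tuned to your data and cluster:

```scala
// Illustrative values only
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.initialPartitionNum", "200")
spark.conf.set("spark.sql.adaptive.coalescePartitions.minPartitionNum", "1")
spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "64MB")
```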

To test the behaviour of AQE’s dynamic shuffle partition coalescing, we’re going to create two simple datasets (one of them is to be understood as a lookup table, so that we have a second dataset to join with).

The sample dataset is deliberately unbalanced: the transactions of our hypothetical “Very Big Company” are about 10% of the total, while those of each of the remaining companies are about 1%.
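As a sketch, something along these lines builds such a skewed sample (company names, row counts and schema are made up), assuming a spark-shell session:

```scala
import spark.implicits._
import scala.util.Random

// Hypothetical lookup table: company id -> company name
val companies = (1 to 100)
  .map(i => (i, if (i == 1) "Very Big Company" else s"Company $i"))
  .toDF("company_id", "company_name")

// Skewed transactions: company 1 gets ~10% of the rows, each of the others ~1%
val rnd = new Random(42)
val transactions = (1 to 1000000).map { i =>
  val companyId = if (rnd.nextDouble() < 0.10) 1 else 2 + rnd.nextInt(99)
  (i, companyId, rnd.nextDouble() * 100)
}.toDF("tx_id", "company_id", "amount")
```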

Let’s first test what would happen without AQE.
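A minimal sketch of this experiment, reusing the DataFrames created above, could be:

```scala
import org.apache.spark.sql.functions.sum

// AQE disabled, shuffle partitions fixed at 50
spark.conf.set("spark.sql.adaptive.enabled", "false")
spark.conf.set("spark.sql.shuffle.partitions", "50")

val joined = transactions
  .join(companies, "company_id")
  .groupBy($"company_name")
  .agg(sum($"amount").as("total_amount"))

println("Number of partitions without AQE: " + joined.rdd.getNumPartitions)
```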

We will receive output:

Number of partitions without AQE: 50

The value is exactly the one we indicated ourselves by setting the spark.sql.shuffle.partitions configuration property.

We repeat the experiment by enabling AQE.
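Again as a sketch, the same query with AQE turned on:

```scala
import org.apache.spark.sql.functions.sum

// Same query as before, this time with AQE enabled
spark.conf.set("spark.sql.adaptive.enabled", "true")

val joinedAqe = transactions
  .join(companies, "company_id")
  .groupBy($"company_name")
  .agg(sum($"amount").as("total_amount"))

println("Number of partitions with AQE: " + joinedAqe.rdd.getNumPartitions)
```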

The new output will be:

Number of partitions with AQE: 7

The value, in this case, was determined based on the default level of parallelism (number of allocated cores), that is, by the value of the spark.sql.adaptive.coalescePartitions.minPartitionNum configuration property.

Now let’s see what happens when we “suggest” the target size of the partitions (in terms of storage). Let’s set it to 30 KB, a value compatible with our sample data.
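A sketch of this last variant, still on the same session and DataFrames:

```scala
import org.apache.spark.sql.functions.sum

spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "30KB")

val joinedAdvisory = transactions
  .join(companies, "company_id")
  .groupBy($"company_name")
  .agg(sum($"amount").as("total_amount"))

println("Number of partitions with AQE (advisory partition size 30KB): " +
  joinedAdvisory.rdd.getNumPartitions)
```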

This time the output will be:

Number of partitions with AQE (advisory partition size 30KB): 15

regardless of the number of cores allocated on the cluster for our job.

Apart from having a positive impact on performance, this feature is very useful for producing optimally sized output files (try analyzing the contents of the job output directories: I wrote them in CSV format, which is less efficient, so that you can easily inspect the files).
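Since each shuffle partition of the result becomes one output file, a write along these lines shows the effect directly (the output path is an assumption, pointing to the shared volume of the Docker setup described below):

```scala
// One CSV file per (coalesced) partition
joinedAqe.write
  .option("header", "true")
  .mode("overwrite")
  .csv("/data/output/aqe_coalesced_csv")
```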

In the second and third parts of this article we will try out the other two new features:

  • Dynamically switching join strategies
  • Dynamically optimizing skew joins.

Stay tuned!

 
Written by Mario Cartia – Agile Lab Big Data Specialist/Agile Skill Managing Director
 
 If you found this article useful, take a look at our blog and follow us on our Medium Publication, Agile Lab Engineering!

The world is real-time, not batch – White Paper

An overview of the Data Streaming scenario, its stages of evolution and its benefits.

Are you getting your data fast enough? Why is streaming data so hot at the moment?

Time is a key factor for every business: an efficient decision-making process needs up-to-date information.
For this reason, today’s leading organizations are looking for new and innovative ways to leverage data and improve their business agility. Running a business in real time, gaining insights from data in motion as it is generated, allows companies to act on information coming in by the second, reducing the risk of bad decisions.

By reading the document, you will go through the main evolution stages of the real-time data journey, from batch processing up to Continuous Intelligence.

 

Download now!

 

Would you like to discover more about Streaming Platforms?

AWS Partnership

We are proud to announce that we have been recognized as an “AWS Select Consulting Partner” within the AWS Partner Network (APN).

The Select status achievement demonstrates our commitment to delivering top technology solutions and confirms our expertise and cloud-first approach.

Discover more about our certifications and skills at https://www.agilelab.it/technologies/

How to create an Apache Spark 3.0 development cluster on a single machine using Docker

Apache Spark is the most widely used in-memory distributed parallel processing framework in the field of Big Data advanced analytics. The main reasons for its success are the simplicity of its API and its rich set of features, ranging from querying the data lake using SQL to the distributed training of complex Machine Learning models using the most popular algorithms.

Despite this simplicity of use of its API, however, one of the problems most frequently encountered by developers, as with most distributed systems, is creating a development environment where you can test your applications by simulating the execution of code on multiple nodes of a cluster.

Although Spark provides a local execution mode, it may hide a number of issues related to the distributed mechanism of code execution, making testing ineffective. That’s why the most effective way to create a test environment that closely resembles a production cluster is to use Docker containers.

An additional benefit of using this approach is that you can test your applications on different versions of the framework by simply changing a few parameters in the configuration file.

In our example we will use version 3.0; the test cluster could be useful for testing the compatibility of our applications with the recently released major version, as well as for trying out the new features it introduces.

Spark is designed so that it can run on different types of clusters. This is achieved by supporting several cluster managers such as YARN (the Hadoop platform resource manager), Mesos or Kubernetes. If an existing cluster infrastructure is not available, Spark can run on an integrated resource manager/scheduler. This is commonly referred to as “standalone” cluster mode.

In our example, we’ll create a cluster consisting of a master node and 3 worker nodes like the one in the image below.

To setup our cluster, we will use the images created by the developers of the open source project “Big Data Europe”, whose sources are available on GitHub: https://github.com/big-data-europe/docker-spark.

We’ll make small changes to the docker-compose.yml configuration file to size the number of nodes and most importantly to add a persistent volume for reading/writing data during our experiments.

This is the configuration file that we’re going to use.
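A configuration along these lines matches that setup; image tags, environment variables and port mappings below are assumptions to be checked against the upstream repository, and only one of the three workers is shown for brevity:

```yaml
version: "3"
services:
  spark-master:
    image: bde2020/spark-master:3.0.0-hadoop3.2
    container_name: spark-master
    ports:
      - "8080:8080"   # master web UI
      - "7077:7077"   # master endpoint for submitting applications
    volumes:
      - ./data:/data  # shared persistent volume for our experiments
  spark-worker-1:
    image: bde2020/spark-worker:3.0.0-hadoop3.2
    container_name: spark-worker-1
    depends_on:
      - spark-master
    environment:
      - "SPARK_MASTER=spark://spark-master:7077"
    volumes:
      - ./data:/data
  # spark-worker-2 and spark-worker-3 are defined in the same way
```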

You can then start the cluster and run a shell on the master node once it starts.
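For example (the container name is an assumption; check it with docker ps):

```bash
# Start master and workers in the background
docker-compose up -d

# Open a shell on the master node
docker exec -it spark-master bash
```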

And finally launch our spark-shell.
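A launch command along these lines reflects the resource allocation described below (the Spark installation path inside the container is an assumption):

```bash
/spark/bin/spark-shell \
  --master spark://spark-master:7077 \
  --executor-cores 2 \
  --total-executor-cores 6 \
  --executor-memory 2500m
```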

At the time of testing, I am using a laptop with an 8-core CPU and 16 GB of RAM. That’s why I allocated 2 cores per executor (6 cores in total) and 2.5 GB of RAM per executor.

Our development cluster is ready, have fun!

Written by Mario Cartia – Agile Lab Big Data Specialist/Agile Skill Managing Director
 
 If you found this article useful, take a look at our blog and follow us on our Medium Publication, Agile Lab Engineering!