Apache Spark is a distributed data processing framework whose features make it suitable for any Big Data context. Despite being a relatively recent product (the first open-source release under a BSD license dates back to 2010, before the project was donated to the Apache Foundation), on June 18th its third major version was released, introducing several new features including Adaptive Query Execution (AQE), the subject of this article.
A bit of history
Spark was born in 2009, before being donated to the community, within the academic context of AMPLab (a curiosity: AMP is an acronym for Algorithms, Machines, People) at the University of California, Berkeley. The winning idea behind the product is the concept of the RDD, described in the paper “Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing”, whose lead author is Matei Zaharia, Spark’s “father”.
The idea is a solution to the main problem of the distributed processing models available at the time (MapReduce first and foremost): the lack of an abstraction layer over the memory of the distributed system. Some complex algorithms widely used in Big Data, such as many used for training machine learning models or for manipulating graph data structures, reuse intermediate results multiple times during computation. The “single-stage” architecture of engines such as MapReduce is heavily penalized in these circumstances, since the intermediate results of the computation must be written to (and then re-read from) persistent storage. I/O operations on persistent storage are notoriously expensive on any type of system, even more so on a distributed one because of the additional overhead introduced by network communications. The RDD concept implemented in Spark brilliantly solves this problem by keeping intermediate results in memory across the steps of a “multi-stage” DAG engine.
The other milestone (I skip ahead here, because the details of RDD programming and of Spark’s history, although very interesting, fall outside the scope of this article) is the introduction of the Spark SQL module in the first stable version of Spark released after the project was donated to the Apache community.
One of the reasons for the success of the Hadoop framework before Spark’s birth was the proliferation of products that added functionality to the core modules. Among the most used we certainly have to mention Hive, a SQL abstraction layer over Hadoop. Despite MapReduce’s limitations, which make it underperform when running complex SQL queries “translated” by Hive onto this engine, Hive is still widespread today, mainly because of its ease of use.
The best way to retrace the history of the SQL layer on Spark is, again, to start from the reference papers: the one on Shark (Spark SQL’s ancestor), dating back to 2013, and the one titled “Spark SQL: Relational Data Processing in Spark”, which introduces Catalyst, the optimizer that is the heart of today’s architecture.
Spark SQL features are made available to developers through objects called DataFrames (or Datasets, their type-safe Java/Scala counterpart) that represent RDDs at a higher level of abstraction. You can use the DataFrame API either through a specific DSL or through SQL.
Regardless of which method you choose, DataFrame operations will be processed, translated, and optimized by Catalyst (from Spark 2.0 onwards) according to the following workflow:
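As a quick illustration, here is a minimal sketch (the input file and column names are hypothetical) of the same aggregation expressed first through the DSL and then through SQL; in both cases Catalyst produces the same optimized plan:

```scala
// Hypothetical input: a CSV file of transactions with a "company" column
val df = spark.read.option("header", "true").csv("/data/transactions.csv")

// DataFrame DSL version
val dslResult = df.groupBy("company").count()

// SQL version on a temporary view: same logical plan, same Catalyst optimizations
df.createOrReplaceTempView("transactions")
val sqlResult = spark.sql("SELECT company, COUNT(*) AS cnt FROM transactions GROUP BY company")
```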
What’s new
We finally get into the merits of Adaptive Query Execution, a feature that, at the architectural level, sits exactly at this point of the pipeline. More precisely, it is an optimization that dynamically intervenes between the logical plan and the physical plan, taking advantage of runtime statistics captured during the execution of the various stages, according to the flow shown in the following image:
The Spark SQL execution flow in version 3.0 then becomes:
Optimizations in detail
Because the AQE framework relies on an extensible architecture built around a set of logical and physical plan optimization rules, it is easy to assume that developers plan to implement additional functionality over time. At present, version 3.0 implements the following optimizations: dynamically coalescing shuffle partitions, dynamically switching join strategies, and dynamically optimizing skew joins.
Let’s look at them one by one, getting hands-on through code examples.
Regarding the creation of the test cluster, we recommend that you refer to the previously published article: “How to create an Apache Spark 3.0 development cluster on a single machine using Docker”.
Dynamically coalescing shuffle partitions
Shuffle operations are notoriously the most expensive in Spark (as in any other distributed processing framework) because of the time required to move data between cluster nodes across the network. Unfortunately, in most cases they are unavoidable.
Transformations on a distributed dataset in Spark, regardless of whether you use the RDD or the DataFrame API, can be of two types: narrow or wide. Wide transformations need partition data to be redistributed between executors in order to complete, which triggers the infamous shuffle operation (and the creation of a new execution stage).
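As an illustrative sketch (the input file is hypothetical), a filter is a narrow transformation, while a grouped aggregation is a wide one:

```scala
import org.apache.spark.sql.functions._

// Hypothetical DataFrame of transactions with "company" and "amount" columns
val transactionsDf = spark.read.option("header", "true").csv("/data/transactions.csv")

// Narrow transformation: each output partition depends on a single input partition,
// so no data needs to move between executors
val filtered = transactionsDf.filter(col("amount") > 100)

// Wide transformation: rows must be regrouped by key across executors,
// triggering a shuffle and the creation of a new execution stage
val aggregated = transactionsDf.groupBy("company").agg(sum("amount"))
```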
Without AQE, determining the optimal number of partitions of the DataFrame resulting from a wide transformation (e.g. a join or an aggregation) was up to the developer, via the spark.sql.shuffle.partitions configuration property (default value: 200). However, without digging into the data it is very difficult to establish an optimal value, with the risk of generating partitions that are too large or too small and, as a result, performance problems.
Let’s say you want to run an aggregation query on data whose groups are unbalanced. Without the intervention of AQE, the number of resulting partitions will be the one we configured (e.g. 5) and the final result could be something similar to what is shown in the image:
Enabling AQE, instead, would merge the data from the smaller partitions into a larger partition of a size comparable to the others, with a result similar to the one shown in the figure.
This optimization is triggered when the two configuration properties spark.sql.adaptive.enabled and spark.sql.adaptive.coalescePartitions.enabled are both set to true. Since the second is true by default, in practice you only need to enable the global property that activates AQE.
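In a Spark 3.0 shell this boils down to something like the following sketch (setting the second property is shown only for completeness, since it already defaults to true):

```scala
// Globally enable Adaptive Query Execution
spark.conf.set("spark.sql.adaptive.enabled", "true")

// Already true by default in 3.0, shown here only for completeness
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
```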
Actually, going through the source code, you will find that AQE kicks in only if the query needs shuffle operations or contains sub-queries,
and that there is a configuration property that can be used to force AQE even when neither of the two conditions above holds.
The number of partitions after optimization will depend instead on the setting of the following configuration options:
where the first represents the starting number of partitions (default: the value of spark.sql.shuffle.partitions), the second the minimum number of partitions after optimization (default: the value of spark.default.parallelism), and the third the “suggested” size of the partitions after optimization (default: 64 MB).
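For reference, in Spark 3.0 these should be the properties shown in the sketch below (the values are only illustrative; check the official configuration documentation for the exact semantics and defaults):

```scala
// Starting number of shuffle partitions before coalescing
// (when not set, falls back to spark.sql.shuffle.partitions)
spark.conf.set("spark.sql.adaptive.coalescePartitions.initialPartitionNum", "200")

// Minimum number of partitions after coalescing
// (when not set, falls back to spark.default.parallelism)
spark.conf.set("spark.sql.adaptive.coalescePartitions.minPartitionNum", "6")

// Advisory ("suggested") size of the partitions after coalescing
spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "64m")
```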
To test the behaviour of AQE’s dynamic coalescing of shuffle partitions, we are going to create two simple datasets (one is to be understood as a lookup table to join with the second dataset).
The sample dataset is deliberately unbalanced: the transactions of our hypothetical “Very Big Company” are about 10% of the total, while those of each of the remaining companies are about 1%:
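The generation code is not reproduced in this extract; a minimal sketch of it, with illustrative names and the proportions described above, could look like this:

```scala
import scala.util.Random
import spark.implicits._

// Lookup table: "Very Big Company" plus 90 smaller companies
val companies = (Seq("Very Big Company") ++ (1 to 90).map(i => s"Company $i"))
  .toDF("company")

// Deliberately skewed transactions: ~10% of the rows belong to "Very Big Company",
// the remaining rows are spread evenly (~1% each) over the other 90 companies
val transactions = (1 to 1000000).map { i =>
  val company =
    if (Random.nextDouble() < 0.1) "Very Big Company"
    else s"Company ${Random.nextInt(90) + 1}"
  (i, company, Random.nextDouble() * 1000)
}.toDF("id", "company", "amount")
```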
Let’s first test what would happen without AQE.
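A sketch of the test, using the DataFrames defined above and reading the partition count from the result of the join/aggregation:

```scala
// AQE disabled: the number of post-shuffle partitions is fixed by configuration
spark.conf.set("spark.sql.adaptive.enabled", "false")
spark.conf.set("spark.sql.shuffle.partitions", "50")

val resultNoAqe = transactions
  .join(companies, "company")
  .groupBy("company")
  .count()

println(s"Number of partitions without AQE: ${resultNoAqe.rdd.getNumPartitions}")
```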
We will receive output:
Number of partitions without AQE: 50
The value is exactly what we have indicated ourselves by setting the configuration property spark.sql.shuffle.partitions.
We repeat the experiment by enabling AQE.
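The sketch is identical, except that AQE is now turned on:

```scala
// Same query as before, this time with AQE enabled
spark.conf.set("spark.sql.adaptive.enabled", "true")

val resultAqe = transactions
  .join(companies, "company")
  .groupBy("company")
  .count()

println(s"Number of partitions with AQE: ${resultAqe.rdd.getNumPartitions}")
```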
The new output will be:
Number of partitions with AQE: 7
The value, in this case, was determined based on the default level of parallelism (number of allocated cores), that is, by the value of the spark.sql.adaptive.coalescePartitions.minPartitionNum configuration property.
Now let’s see what happens when we “suggest” the target size of the partitions (in terms of storage). Let’s set it to 30 KB, a value compatible with our sample data.
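In the sketch, this means setting the advisory partition size before re-running the same query:

```scala
// Suggest a target post-shuffle partition size of 30 KB
spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "30k")

val resultAqeAdvisory = transactions
  .join(companies, "company")
  .groupBy("company")
  .count()

println(s"Number of partitions with AQE (advisory partition size 30Kb): " +
  s"${resultAqeAdvisory.rdd.getNumPartitions}")
```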
This time the output will be:
Number of partitions with AQE (advisory partition size 30Kb): 15
regardless of the number of cores allocated on the cluster for our job.
Apart from having a positive impact on performance, this feature is very useful for producing output files of optimal size (try analyzing the contents of the job output directories: I wrote them in CSV format which, although less efficient, makes the files easy to inspect).
In the second and third parts of the article we will try the other two new features: dynamically switching join strategies and dynamically optimizing skew joins.
Stay tuned!
Apache Spark is the most widely used in-memory parallel distributed processing framework in the field of Big Data advanced analytics. The main reasons for its success are the simplicity of its API and the rich set of features, ranging from querying the data lake using SQL to the distributed training of complex Machine Learning models using the most popular algorithms.
Despite this simplicity of the API, one of the problems most frequently encountered by developers, as with most distributed systems, is the creation of a development environment where you can test your applications while simulating the execution of code on multiple nodes of a cluster.
Although Spark provides a local execution mode, it may hide a number of issues tied to the distributed mechanism of code execution, making testing ineffective. That’s why the most effective way to create a test environment that resembles a production cluster is to use Docker containers.
An additional benefit of using this approach is that you can test your applications on different versions of the framework by simply changing a few parameters in the configuration file.
In our example we will use version 3.0; the test cluster could hypothetically be useful for testing the compatibility of our applications with the recently released major version, as well as for trying out the new features it introduces.
Spark is designed so that it can run on different types of clusters, by supporting several cluster managers such as YARN (the Hadoop platform resource manager), Mesos or Kubernetes. If an existing cluster infrastructure is not available, Spark can run on an integrated resource manager/scheduler, commonly referred to as “standalone” cluster mode.
In our example, we’ll create a cluster consisting of a master node and 3 worker nodes like the one in the image below.
To set up our cluster, we will use the images created by the developers of the open source project “Big Data Europe”, whose sources are available on GitHub: https://github.com/big-data-europe/docker-spark.
We’ll make small changes to the docker-compose.yml configuration file to size the number of nodes and most importantly to add a persistent volume for reading/writing data during our experiments.
This is the configuration file that we’re going to use.
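The exact file is not reproduced in this extract; the following is a sketch of what it might look like, based on the examples in the Big Data Europe repository (image tags, environment variables and paths are assumptions and should be checked against that repository’s README):

```yaml
version: "3"
services:
  spark-master:
    image: bde2020/spark-master:3.0.0-hadoop3.2
    container_name: spark-master
    ports:
      - "8080:8080"
      - "7077:7077"
    volumes:
      - ./data:/data
  spark-worker-1:
    image: bde2020/spark-worker:3.0.0-hadoop3.2
    container_name: spark-worker-1
    depends_on:
      - spark-master
    environment:
      - "SPARK_MASTER=spark://spark-master:7077"
    volumes:
      - ./data:/data
  spark-worker-2:
    image: bde2020/spark-worker:3.0.0-hadoop3.2
    container_name: spark-worker-2
    depends_on:
      - spark-master
    environment:
      - "SPARK_MASTER=spark://spark-master:7077"
    volumes:
      - ./data:/data
  spark-worker-3:
    image: bde2020/spark-worker:3.0.0-hadoop3.2
    container_name: spark-worker-3
    depends_on:
      - spark-master
    environment:
      - "SPARK_MASTER=spark://spark-master:7077"
    volumes:
      - ./data:/data
```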
You can then start the cluster and run a shell on the master node once it starts.
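For example (the container name matches the one defined in the compose sketch above):

```bash
# Start the cluster in the background
docker-compose up -d

# Open a shell on the master node
docker exec -it spark-master bash
```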
And finally launch our spark-shell.
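A possible launch command, matching the resources described below (the Spark installation path inside the Big Data Europe images is assumed to be /spark):

```bash
# Connect to the standalone master, allocating 2 cores and 2.5 GB of RAM per executor
/spark/bin/spark-shell \
  --master spark://spark-master:7077 \
  --executor-cores 2 \
  --executor-memory 2500m
```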
At the time of testing, I am using a laptop with an 8-core CPU and 16 GB of RAM. That’s why I allocated 2 cores for each executor (6 in total) and 2.5 GB of RAM.
Our development cluster is ready, have fun!