The journey to Elasticsearch – Tech Talk with Neodata Group

Tech Talk with Neodata Group: The journey to Elasticsearch

In this first episode of our first Tech Talk, we hosted Neodata Group to talk about their journey to Elasticsearch, the open-source search and analytics engine for all types of data.
We explored some interesting use cases on data & analytics.

See the full video and stay tuned to discover all Agile Lab’s events on our blog!

Spark 3.0: First hands-on approach with Adaptive Query Execution (Part 3)

Spark 3.0: First hands-on approach with Adaptive Query Execution (Part 3)

In the previous articles (1)(2), we started analyzing the individual features of Adaptive Query Execution introduced on Spark 3.0. In particular, we analyzed “dynamically coalescing shuffle partitions” and “dynamically switching join strategies”. Last but not least, let’s analyze what will probably be the most-awaited and appreciated feature:

Dynamically optimizing skew joins

To understand exactly what it is, let’s take a moment’s step back with the theory, remembering that a DataFrame is on Spark an abstraction from the concept of RDD, which is in turn a logical abstraction of the dataset that can be processed in a distributed way thanks to the concept of partition. A dataset on Spark, once “transformed” into RDD, is divided into partitions where ideally the data is distributed in a balanced way:

In practice, it is very common for RDDs generated by certain operations (such as key grouping) to be unbalanced:

The unfortunate consequence of this is that, since computation parallels are based on the concept of data partitioning, we will not be able to adequately exploit the resources of the cluster. Identifying such situations is fairly straightforward by using the Spark UI. How many of you who are regular users of Spark will certainly have found yourself in the unpleasant situation, during which the progress bar related to an operation stops for a long time on the last tasks, giving the feeling, in the most “serious” cases, that the job has gone into freeze.

Analyzing the tasks in detail, it is easy to identify the “incriminated” ones by looking at the duration, size and number of records processed that will be one or more orders of magnitude larger than all the others.

Techniques such as adding additional join keys (where possible) or key salting were used to resolve these situations without AQE. With an additional effort from developers for implementation, testing etc.

AQE mechanisms transparently discover and optimize implementation.

Let’s see how to enable the feature and set configuration parameters correctly. The core directive spark.sql.adaptive.skewJoin.enabled is set to true by default. As in the previous cases, it is sufficient to enable AQE (spark.sql.adaptive.enabled) to take advantage of the optimization in question.

To find out what policy Spark uses to identify a skewed partition, simply analyze the sources of the org.apache.spark.sql.execution.adaptive.OptimizeSkewedJoin class (class that extends Rule[SparkPlan]).

where ADAPTIVE_EXECUTION_SKEWED_PARTITION_FACTOR corresponds to the configuration property spark.sql.adaptive.skewJoin.skewedPartitionFactor and SKEW_JOIN_SKEWED_PARTITION_THRESHOLD to spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes which ideally needs to be larger than the property spark.sql.adaptive.advisoryPartitionSizeInBytes we’ve already seen about the shuffle partitions coalesce. By default, the threshold is set to 256Mb.

To show how this optimization works, we’ll borrow the excellent example of the article “The Taming of the Skew” based on two hypothetical tables in a car manufacturer’s database:

That will be created ad-hoc with an imbalance in the number of records related to one of the join keys (represented by the brand (make) and model pair).

Let’s check that the keys are really unbalanced:

Now let’s try to join with AQE disabled (we also disable broadcast joins as the tables are very small and would frustrate the result of our experiment):

and let’s analyze the results. The longest task took, as we would have expected, a good 1.2 minutes out of 1.3 minutes total having processed most of the data concentrated on a single partition.

Now let’s repeat the experiment with AQE enabled and the configuration properties set appropriately based on the size of the sample data we’re using:

First of all, we note that the execution time drops from 1.3 min to 34 seconds. And that longer tasks now only take 4 seconds. Working fine-tuning on the parameters, I am convinced that we could get even better performance but for what was our purposes we stop here.

If you want to read the previous parts of the article:

Part 1- Dynamically coalescing shuffle partitions

Part 2 – Dynamically switching join strategies

Written by Mario Cartia – Agile Lab Big Data Specialist/Agile Skill Managing Director
 If you found this article useful, take a look at our blog and follow us on Agile Lab Engineering, our Medium Publication.

Agile Lab ospite di InnoTech, l’Hub di The European House Ambrosetti per parlare di Machine Learning e Big Data in ambito assicurativo

Agile Lab ospite di InnoTech, l’Hub di The European House Ambrosetti per parlare di Machine Learning e Big Data in ambito assicurativo

Alberto Firpo, Co-Founder & CEO di Agile Lab, è il protagonista dell’ episodio di “InnoTechCast – Leaders’ View on Innovation” per parlare di Machine Learning e Big Data applicati all’ambito assicurativo.

The European House Ambrosetti, all’interno di InnoTech Hub, riferimento per l’ecosistema dell’innovazione e della tecnologia italiana in Europa e nel mondo, nel bel mezzo della pandemia COVID, ha lanciato InnoTechCast, un nuovo format per raccontare in digitale il punto di vista dei principali leader della community italiana del mondo dell’innovazione e della tecnologia.

In questo podcast Alberto Firpo racconta come i dati raccolti dalle “black-box” installate sulle autovetture, grazie a modelli di Machine Learning e sistemi real-time, forniscano informazioni utili alle aziende del mondo assicurativo.

In particolare, attraverso la piattaforma WASP di Agile Lab, i dati raccolti in tempo reale consentono alle compagnie assicurative di attivare servizi per i clienti, in real-time, e formulare le polizze sulla base del “driving-behaviour“, il comportamento di guida del conducente rilevato attraverso modalità innovative.

Ascolta il podcast!

Spark 3.0: First hands-on approach with Adaptive Query Execution (Part 2)

In the previous article, we started analyzing the individual features of Adaptive Query Execution introduced on Spark 3.0. In particular, the first feature analyzed was “dynamically coalescing shuffle partitions”. Let’s get on with our road test.

Dynamically switching join strategies

The second optimization implemented in AQE is the runtime switch of the dataframe join strategy.

Let’s start with the fact that Spark supports a variety of types of joins (inner, outer, left, etc.). The execution engine supports several implementations that can run them, each of which has advantages and disadvantages in terms of performance and resource utilization (memory in the first place). The optimizer’s job is to find the best tradeoff at the time of execution.

Going into more detail the join strategies supported by Spark are:

  • Broadcast Hash Join
  • Shuffle Hash Join
  • Sort-merge Join
  • Cartesian Join

Without going into too much detail of the individual strategies (which is beyond the scope of the current treatment), the Broadcast Hash Join is the preferred strategy in all those cases where the size of one of the parts of the report is such that the broadcast table can be easily transferred to all executors and the “map-side” join avoiding the burden of shuffle operations (and the creation of a new execution stage). This technique, where applicable, provides excellent benefits in terms of reducing execution times.

Spark allows setting the spark.sql.autoBroadcastJoinThreshold configuration property to force the use of this strategy where one of the dataframes involved in the join is smaller than the specified threshold (the default value of the property in question is 10 Mb). Without AQE, however, the size of the dataframe is determined statically during the optimization phase of the execution plan. In some cases, however, the runtime size of the relationship is significantly smaller than the total size. Think of a join where there is a filter condition that at runtime will cut most records.

To better understand the potential of this optimization we will go to make a practical example. We will use THE public datasets of IMDB (also known as the Internet Movie Database) for our purpose. In particular, the film dataset (title.akas.tsv.gz) and the cast dataset.

The dataset with the cast members is tied to the title dataset through the tconst field. The title dataset weighs about 195 MB and the cast dataset weighs about 325 Mb (gzip compression).

Leaving the default value for the broadcast limit threshold unmodified by trying to join the two datasets, the join strategy selected would of course be SortMerge. Without AQE even applying a very restrictive filter (for example, filtering the dataframe of the titles leaving only those related to the Virgin Islands that are very few) SortMerge would also be selected as a strategy. Try:

See what happens instead by activating AQE:

Thanks to the statistics calculated at runtime and the adaptive execution plan, the most correct strategy has been selected in this case.

The latest optimization, concerning dynamically optimizing skew joins, will be discussed in the last part of the article. Not to be missed!

Written by Mario Cartia – Agile Lab Big Data Specialist/Agile Skill Managing Director
 If you found this article useful, take a look at our blog and follow us on our Medium Publication, Agile Lab Engineering!

Spark 3.0: First hands-on approach with Adaptive Query Execution (Part 1)

Apache Spark is a distributed data processing framework that is suitable for any Big Data context thanks to its features. Despite being a relatively recent product (the first open-source BSD license was released in 2010, it was donated to the Apache Foundation) on June 18th the third major revision was released that introduces several new features including adaptive Query Execution (AQE) that we are about to talk about in this article.

A bit of history

Spark was born, before being donated to the community, in 2009 within the academic context of ampLab (curiosity: AMP is the acronym for Algorithms Machine People) of the University of California, Berkeley. The winning idea behind the product is the concept of RDD, described in the paper “Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing” whose lead author is Spark Matei Zaharia’s “father”.

The idea is for a solution that solves the main problem of the distributed processing models available at the time (MapReduce in the first place): the lack of an abstraction layer for the memory usage of the distributed system. Some complex algorithms that are widely used in big data, such as many for training machine learning models, or manipulating graph data structures, reuse intermediate processing results multiple times during computation. The “single-stage” architecture of algorithms such as MapReduce is greatly penalized in such circumstances since it is necessary to write (and then re-read) the intermediate results of computation on persistent storage. I/O operations on persistent storage are notoriously onerous on any type of system, even more so on one deployed due to the additional overhead introduced by network communications. The concept of RDD implemented on Spark brilliantly solves this problem by using memory during intermediate computation steps on a “multi-stage” DAG engine.

The other milestone (I leap because I enter into the merits of RDD programming and Spark’s detailed history, although very interesting, outside the objectives of the article) is the introduction on the first stable version of Spark (which had been donated to the Apache community) of the Spark SQL module.

One of the reasons for the success of the Hadoop framework before Spark’s birth was the proliferation of products that added functionality to the core modules. Among the most used surely we have to mention Hive, SQL abstraction layer over Hadoop. Despite MapReduce’s limitations that make it underperforming to run more complex SQL queries on this engine after “translation” by Hive, the same is still widespread today mainly because of its ease of use.

The best way to retrace the history of the SQL layer on Spark is again to start with the reference papers. Shark (spark SQL’s ancestor) dating back to 2013 and the one titled “Spark SQL: Relational Data Processing in Spark” where Catalyst, the optimizer that represents the heart of today’s architecture, is introduced.

Spark SQL features are made available to developers through objects called DataFrame (or Java/Scale Datasets in type-safe) that represent RDDs at a higher level of abstraction. You can use the DataFrame API through a specific DSL or through SQL.

Regardless of which method you choose to use, DataFrame operations will be processed, translated, and optimized by Catalyst (Spark from v2.0 onwards) according to the following workflow:

What’s new

We finally get to get into the merits of Adaptive Query Execution, a feature that at the architectural level is implemented at this level. More precisely, this is an optimization that dynamically intervenes between the logical plan and the physical plan by taking advantage of the runtime statistics captured during the execution of the various stages according to the stream shown in the following image:

The Spark SQL execution stream in version 3.0 then becomes:

Optimizations in detail

Because the AQE framework is based on an extensible architecture based on a set of logical and physical plan optimization rules, it can easily be assumed that developers plan to implement additional functionality over time. At present, the following optimizations have been implemented in version 3.0:

  • Dynamically coalescing shuffle partitions
  • Dynamically switching join strategies
  • Dynamically optimizing skew joins

let’s go and see them one by one by touching them with our hands through code examples.

Regarding the creation of the test cluster, we recommend that you refer to the previously published article: “How to create an Apache Spark 3.0 development cluster on a single machine using Docker”.

Dynamically coalescing shuffle partitions

Shuffle operations are notoriously the most expensive on Spark (as well as any other distributed processing framework) due to the transfer time required to move data between cluster nodes across the network. Unfortunately, however, in most cases they are unavoidable.

Transformations on a dataset deployed on Spark, regardless of whether you use RDD or DataFrame API, can be of two types: narrow or wide. Wide-type data needs partition data to be redistributed differently between executors to be completed. The infamous shuffle operation (and creating a new execution stage).

Without AQE, determining the optimal number of DataFrame partitions resulting from performing a wide transformation (e.g. joins or aggregations) was assigned to the developer by setting the spark.sql.shuffle.partitions configuration property (default value: 200). However, without going into the merits of the data it is very difficult to establish an optimal value, with the risk of generating partitions that are too large or too small and resulting in performance problems.

Let’s say you want to run an aggregation query on data whose groups are unbalanced. Without the intervention of AQE, the number of partitions resulting will be the one we have expressed (e.g. 5) and the final result could be something similar to what is shown in the image:

Enabling AQE instead would put data from smaller partitions together in a larger partition of comparable size to the others. With a result similar to the one shown in the figure.

This optimization is triggered when the two configuration properties spark.sql.adaptive.enabled and spark.sql.adaptive.coalescePartitions.enabled are both set to true. Since the second is set true by default, practically to take advantage of this feature you only need to enable the global property for AQE activation.

Actually going to parse the source code you find that AQE is actually enabled only if the query needs shuffle operations or is composed of sub-queries:

and that there is a configuration property that you can use to force AQE even in the absence of one of the two conditions above.

The number of partitions after optimization will depend instead on the setting of the following configuration options:

  • spark.sql.adaptive.coalescePartitions.initialPartitionNum
  • spark.sql.adaptive.coalescePartitions.minPartitionNum
  • spark.sql.adaptive.advisoryPartitionSizeInBytes

where the first represents the starting number of partitions (default: spark.sql.shuffle.partitions), the second represents the minimum number of partitions after optimization (default: spark.default.parallelism), and the third represents the “suggested” size of the partitions after optimization (default: 64 Mb).

To test the behaviour of the dynamic coalition feature of AQE’s shuffle partitions, we’re going to create two simple datasets (one is to be understood as a lookup table that we need to have a second dataset to join).

The sample dataset is deliberately unbalanced, the transactions of our hypothetical “Very Big Company” are about 10% of the total. Those of the remaining companies about 1%:

Let’s first test what would happen without AQE.

We will receive output:

Number of partitions without AQE50

The value is exactly what we have indicated ourselves by setting the configuration property spark.sql.shuffle.partitions.

We repeat the experiment by enabling AQE.

The new output will be:

Number of partitions with AQE7

The value, in this case, was determined based on the default level of parallelism (number of allocated cores), that is, by the value of the spark.sql.adaptive.coalescePartitions.minPartitionNum configuration property.

Now let’s try what happens by “suggesting” the target size of the partitions (in terms of storage). Let’s set it to 30 Kb which is a value compatible with our sample data.

This time the output will be:

Number of partitions with AQE (advisory partition size 30Kb): 15

regardless of the number of cores allocated on the cluster for our job.

Apart from having a positive impact on performance, this feature is very useful in creating optimally sized output files (try analyzing the contents of the job output directories that I created in CSV format while being less efficient so that you can easily inspect the files).

In the second and third part of the article we will try the other two new features:

  • Dynamically switching join strategies
  • Dynamically optimizing skew joins.

Stay tuned!

Written by Mario Cartia – Agile Lab Big Data Specialist/Agile Skill Managing Director
 If you found this article useful, take a look at our blog and follow us on our Medium Publication, Agile Lab Engineering!