Hold my Beer: Kafka – 10th December 2020 | 7 PM

We are excited to launch our first Hold my Beer: on December 10th at 7 PM we will explore the Kafka world while drinking a beer together.

We are all swamped with online webinars, so we have planned something different, more relaxed,
where everyone can participate and liven up the discussion.

Are you proud of a problem you solved using Kafka and want to share it with us (maybe letting the code do the talking)? Are you looking for advice? Do you have questions or simple curiosities? Do you just want to get an idea of what Kafka is? This is the event for you!

P.S. We also recommend picking a good beer (we promise that when we are able to host a live event, we will bring beers for everyone!).

Click here to sign up!

 

Spark 3.0: First hands-on approach with Adaptive Query Execution (Part 2)

In the previous article, we started analyzing the individual features of Adaptive Query Execution introduced in Spark 3.0. In particular, the first feature analyzed was “dynamically coalescing shuffle partitions”. Let’s get on with our road test.

Dynamically switching join strategies

The second optimization implemented by AQE is the runtime switching of the DataFrame join strategy.

Let’s start with the fact that Spark supports a variety of join types (inner, outer, left, etc.). The execution engine provides several implementations to run them, each with its advantages and disadvantages in terms of performance and resource utilization (memory first and foremost). The optimizer’s job is to find the best tradeoff at execution time.

Going into more detail, the join strategies supported by Spark are:

  • Broadcast Hash Join
  • Shuffle Hash Join
  • Sort-merge Join
  • Cartesian Join

Without going into too much detail on the individual strategies (which is beyond the scope of this article), Broadcast Hash Join is the preferred strategy in all those cases where one side of the relation is small enough that it can be easily broadcast to all executors and the join performed “map-side”, avoiding the burden of shuffle operations (and the creation of a new execution stage). This technique, where applicable, provides excellent benefits in terms of reduced execution times.

Spark allows setting the spark.sql.autoBroadcastJoinThreshold configuration property to force the use of this strategy when one of the DataFrames involved in the join is smaller than the specified threshold (the default value of the property is 10 MB). Without AQE, however, the size of the DataFrame is determined statically during the optimization phase of the execution plan. In some cases the runtime size of the relation is significantly smaller than its total size: think of a join where a filter condition cuts out most of the records at runtime.
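
As a quick reference, here is a minimal sketch of how the threshold can be set when building the session (the 10 MB value below simply makes the default explicit; -1 disables broadcasting altogether):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .appName("aqe-join-test")
      // Broadcast any relation whose estimated size is below 10 MB (the default).
      .config("spark.sql.autoBroadcastJoinThreshold", "10MB")
      .getOrCreate()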

To better understand the potential of this optimization, let’s work through a practical example. We will use the public IMDb (Internet Movie Database) datasets for our purpose: in particular, the title dataset (title.akas.tsv.gz) and the cast dataset.

The cast dataset is tied to the title dataset through the tconst field. The title dataset weighs about 195 MB and the cast dataset about 325 MB (gzip compression).
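
A minimal loading sketch, assuming the titles come from title.akas.tsv.gz and the cast data from the usual IMDb principals dump (title.principals.tsv.gz), with hypothetical local paths:

    // IMDb dumps are gzip-compressed, tab-separated files with a header row.
    val titles = spark.read
      .option("sep", "\t")
      .option("header", "true")
      .csv("/data/imdb/title.akas.tsv.gz")

    val cast = spark.read
      .option("sep", "\t")
      .option("header", "true")
      .csv("/data/imdb/title.principals.tsv.gz")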

Leaving the broadcast threshold at its default value and joining the two datasets, the join strategy selected would of course be SortMerge. Without AQE, even applying a very restrictive filter (for example, keeping only the titles related to the Virgin Islands, which are very few), SortMerge would still be selected as the strategy. Try:
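
The snippet below sketches that test under a couple of assumptions: in title.akas the tconst identifier is exposed as the titleId column, and the restrictive filter is expressed on the region column.

    import org.apache.spark.sql.functions.col

    // AQE is disabled by default in Spark 3.0: the join strategy is chosen from static size estimates only.
    spark.conf.set("spark.sql.adaptive.enabled", "false")

    // Very restrictive filter: keep only the titles registered for the Virgin Islands region (code assumed here).
    val viTitles = titles.filter(col("region") === "VI")

    val joined = viTitles.join(cast, viTitles("titleId") === cast("tconst"))
    joined.explain()  // the physical plan still shows a SortMergeJoin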

See what happens instead by activating AQE:
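
Again as a sketch, reusing the same DataFrames as above:

    // Enable Adaptive Query Execution: the plan can now be revised at runtime,
    // using the real size of the shuffled data instead of static estimates.
    spark.conf.set("spark.sql.adaptive.enabled", "true")

    val joinedAqe = viTitles.join(cast, viTitles("titleId") === cast("tconst"))
    joinedAqe.collect()   // trigger execution so that the adaptive plan is finalized

    // Inspecting the plan now (explain() or the SQL tab of the Spark UI) shows a
    // BroadcastHashJoin: the filtered title relation turned out to be tiny at runtime.
    joinedAqe.explain()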

Thanks to the statistics calculated at runtime and to the adaptive execution plan, the most appropriate strategy was selected in this case.

The last optimization, dynamically optimizing skew joins, will be discussed in the final part of the article. Not to be missed!

Written by Mario Cartia – Agile Lab Big Data Specialist/Agile Skill Managing Director
 
 If you found this article useful, take a look at our blog and follow us on our Medium Publication, Agile Lab Engineering!

The secret to reducing Spark application costs

How many of you are reasonably certain, right now, that all your Spark jobs are performing at their best without wasting more computational resources than necessary? If so, what information are you basing that on? Would you be able to quantify the waste of resources in economic terms? In my seven years of field experience at Agile Lab, at most 5 to 10% of Spark users really know and master what is happening inside their processes.

Apache Spark is one of the most successful open-source projects in the world, and that’s because it made big (huge) data processing extremely easy by hiding the complexity of distributed computation inside its engine. This allowed programmers without specific knowledge to approach the Spark API and manipulate a distributed dataset as if it were a collection of objects in local memory.

Unfortunately, writing an algorithm or an ETL pipeline in Spark, leveraging its built-in APIs, doesn’t guarantee that it will scale linearly as the data grows, nor that it will always auto-optimize itself to perform at its best with the given computational resources.

Even now, many years after its mass adoption, it’s not so unusual to run into underperforming or resource-hungry solutions. This is because developers with little distributed-systems experience are left, without a second thought, to write computationally complex processes as if they were “normal” Java applications.

But what does this mean in terms of business?

Execution time multiplied by the allocated resources (CPU and RAM) is directly proportional to the economic cost and, in the Cloud world, this has an even bigger impact than on-premises.

Typically, Big Data infrastructures are very expensive and it’s not so rare to find Spark jobs allocating hundreds of vcores and gigabytes of memory.

So basically, inefficient jobs either occupy a certain amount of resources for longer than necessary, or they allocate more resources than needed and waste them during their runtime. Such waste repeats itself every time these jobs are executed, making it, to all intents and purposes, a recurring cost (OPEX).

How can we know if our jobs are efficient or not?

First rule – Don’t believe what developers declare about their jobs, and here’s why:

  • The tuning process is long and boring: it requires rerunning the job over and over, simulating different scenarios and collecting performance data in an extremely scrupulous fashion. These are repetitive, high-precision tasks that most developers simply avoid doing.
  • It’s difficult to optimize something you don’t have a reference benchmark for, so the common outcome is believing you have reached peak performance too soon, also because questioning ourselves is always a difficult path to take.
  • Usually, developers don’t know precisely which dimensions to optimize along, and they don’t have the right tools to instrument their jobs.

Let me give you an example of how I’ve usually seen Spark job tuning done.

The easiest approach is to give the job a certain amount of resources, progressively reduce them until it crashes, and then add a little back to keep an extra safety margin.

But can we really trust this process? What happens if the input data volume is not constant day after day? Are we sure we can’t optimize the code and make it more efficient? Did we verify that the crash wasn’t caused by data skewness on a specific node?

The answer is not trivial, and it requires experience and a range of debugging and instrumentation tools. For instance, let me tell you about an episode that shows just how superficial the process described above is.

I once had to troubleshoot a job allocating about 100 Spark executors with 20 GB of RAM each, for a total of 2 TB of RAM. It was an import/ingestion job scheduled to run every day, lasting about 30 minutes and costing about $200 per run. The development team had tried to reduce the executors’ memory, but faced OutOfMemory issues, so they had resigned themselves to the idea that the job was performing as well as possible. The number of executors was so high because increasing the overall parallelism had actually brought them small improvements in job duration.

The analysis revealed that only one of the executors was actually working for the first 28 minutes (out of 30), and that it needed 20 GB of RAM to complete its tasks. We then modified the source code and the file structure, getting the job to scale linearly and complete in just 12 minutes with only 20 executors of 20 GB RAM each, bringing the cost down to about $16 per run (a saving of roughly $67,000 per year!).
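
As a quick sanity check, those figures line up with the “resources multiplied by execution time” rule mentioned earlier; a back-of-the-envelope sketch using only the numbers quoted above:

    // Cost per run is roughly proportional to executor-minutes.
    val costBeforePerRun   = 200.0       // $ per run, as quoted above
    val executorMinsBefore = 100 * 30    // 100 executors x 30 minutes = 3000 executor-minutes
    val executorMinsAfter  = 20 * 12     // 20 executors  x 12 minutes =  240 executor-minutes

    val costAfterPerRun = costBeforePerRun * executorMinsAfter / executorMinsBefore  // ~ $16 per run
    val yearlySaving    = (costBeforePerRun - costAfterPerRun) * 365                 // ~ $67,000 per year for a daily job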

To conclude, my strong advice to all the IT executives managing Spark-based projects/systems/applications and, most of all, their associated costs, is to always ask for detailed reports on the performance tuning process (before going to production), such as:

  • An audit of all the test attempts, with the related configuration, input data and timings of all the main processing steps.
  • Memory and CPU profiling of all the executors’ JVMs (not the hosts!).
  • A data skewness analysis, to understand whether the data can be distributed uniformly and efficiently across the executors’ cores during all the processing steps (see the sketch after this list).
  • The time lost by executors, to verify that there are no hot-spotting executors keeping most of the others idle.
  • Worst-case scenario simulations, both in terms of data and in terms of cluster events: YARN or the cluster resource manager being unable to allocate the requested containers, input data doubling or accumulating for days due to unpredicted system stops, and so on.
  • Opportunities to optimize the code base and effort spent in this direction.
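
As an illustration of the skewness check mentioned in the list above, here is a minimal sketch where df and joinKey are hypothetical placeholders for the DataFrame and the key column under analysis:

    import org.apache.spark.sql.functions.{col, count, spark_partition_id}

    // Rows per join key: a handful of keys with a disproportionate row count is a skew red flag.
    df.groupBy(col("joinKey"))
      .agg(count("*").as("rows"))
      .orderBy(col("rows").desc)
      .show(20)

    // Rows per partition after repartitioning by the key: ideally the counts are roughly uniform.
    df.repartition(col("joinKey"))
      .groupBy(spark_partition_id().as("partition"))
      .count()
      .orderBy(col("count").desc)
      .show(20)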

I know this seems like an expensive extra activity, but these are CAPEX costs that positively impact the OPEX ones. If you are told that no benefits or improvements come from this type of task, then beware!

Would you like to go deeper? Take a look at Agile Lab Spark Radar to find out how you can easily optimize your Spark applications performance.

Written by Paolo Platter – Agile Lab CTO & Co-Founder
 
To read more about Apache Spark and related topics, take a look at our blog and follow us on our Medium Publication, Agile Lab Engineering!