Hold my Beer: Kubernetes loves Ansible – 13th January 2021 | 6:30 PM

A new Hold my Beer event: on January 13th at 6:30 PM, we will talk about how to configure a Kubernetes cluster using Ansible.

Speakers:

Mario Vitale, a DevOps Engineer @ Flixbus who loves playing music, fitness, videogames and open source. He will be the main speaker of the event.

Carlo Ventrella, SRE Engineer @ Agile Lab, will support Mario during the talk.

You: everyone can take part and contribute to a fruitful discussion. The more, the merrier!

Discover more and sign up soon!

The journey to Elasticsearch – Tech Talk with Neodata Group


In the first episode of our Tech Talk series, we hosted Neodata Group to talk about their journey to Elasticsearch, the open-source search and analytics engine for all types of data.
We explored some interesting use cases on data & analytics.

See the full video and stay tuned to discover all Agile Lab’s events on our blog!

Spark Remote Debugging

 


Hi everybody! I’m a Big Data Engineer @ Agile Lab, a remote-first Big Data engineering and R&D firm located in Italy. Our main focus is to build Big Data and AI systems, in a very challenging — yet awesome — environment.

At Agile Lab we use Apache Spark as a processing engine in various projects, among other technologies. Spark is fast and it is simple for a developer to write code that can be run right away, but, as with regular programs, it is not always easy to understand what a Spark job is doing.

This article will focus on how a developer can remotely debug a running Spark Scala/Java application (running on YARN) using IntelliJ IDEA, but all the Spark and environment configurations hold for other IDEs as well.

Agent JDWP, licence to debug

To perform remote debugging of a Spark job, we leverage the JDWP agent (Java Debug Wire Protocol), which defines a communication protocol between a debugger and a running JVM. JDWP defines only the format and layout of the packets exchanged by the debugger and the target JVM, while the transport protocol can be chosen by the user. Usually, the available transport mechanisms are shared memory (dt_shmem) and socket (dt_socket), but only the latter, which uses a TCP socket connection to communicate, can be used for remote debugging.

So, in order to enable remote debugging, we must configure the target JVM with the following Java property, making it act as a JDWP server to which our IDE can connect:

-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=4747

The property above tells the JVM to load the JDWP agent and wait for a socket connection on the specified port. In particular:

  • transport=dt_socket tells the agent to use socket as the desired transport mechanism.
  • server=y means that the JVM will act as a JDWP server: it will listen for a debugger client to attach to it.
  • suspend=y tells the JVM whether it must wait for a debugger connection before executing the main function. If this is set to n, the main function starts right away while the agent still listens for a debugger connection.
  • address=4747 specifies the port on which the debug socket will listen. In the example, the target JVM will listen on port 4747 for incoming client connections.

We will leverage the JDWP agent for all the following remote debugging scenarios, so remember that you can always adjust the configurations listed above to fit your use case.

You must choose your Spark deployment… but choose wisely

Before delving into the debugging of your application, here’s a quick recap of how a Spark job executes on a cluster. Each Spark job requires:
  • a process called driver, which executes all the standard Java code;
  • one or more executor processes, which execute all the code defined inside the transformations and actions of the RDDs/Datasets.

This means that in a realistic scenario we will have different JVMs running at the same time (often on different nodes): one for the driver and one for each executor.

A typical Spark application will run on multiple nodes, each containing one or more processes
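
To make the distinction concrete, here is a minimal sketch of a hypothetical job (written in the spirit of the org.example.MyJob class used in the spark-submit examples below, not actual project code) showing which parts run on the driver and which run on the executors:

import org.apache.spark.sql.SparkSession

object MyJob {
  def main(args: Array[String]): Unit = {
    // Everything at this level runs in the driver JVM:
    // breakpoints placed here are hit when debugging the driver.
    val spark = SparkSession.builder().appName("remote-debug-example").getOrCreate()

    val numbers = spark.sparkContext.parallelize(1 to 1000)

    val doubled = numbers.map { n =>
      // Code inside RDD/Dataset transformations runs on the executors:
      // breakpoints placed here are hit only when debugging an executor.
      n * 2
    }

    // The action below is triggered from the driver, but the map above executes remotely.
    println(s"Sum of doubled values: ${doubled.sum()}")

    spark.stop()
  }
}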

Spark allows the developer to run a job in different modes depending on the requirement of the desired use case. In particular, we will focus on two configurations of the spark-submit command: deploy-mode and master.

Those two configurations allow a developer to decide how the Spark application will be deployed and run:

  • master: this configuration tells Spark the master URL of the cluster. A complete list of all the allowed values can be found in the official Spark documentation.
  • deploy-mode: Whether to deploy your driver on one of the worker nodes (cluster) or locally as an external client (client). In client mode, the driver is started in the same node where the spark-submit is launched.

Even though one of the allowed values for master is local, which runs the application on the local machine with a configurable number of threads, we will not explore it: there is no need for remote debugging if the Spark application runs with master local, since everything runs in the same local JVM.

This article will focus only on Spark applications launched with a spark-submit run against a YARN cluster (master yarn configuration), but the same considerations also hold for other resource managers.

Your honour, my client pleads not guilty

When the spark-submit command is invoked with client deploy-mode, Spark will spawn the driver in the client process that performed the submit command. Executors will be spawned on cluster nodes depending on the resources assigned to them.

In client mode, the driver is spawned in the same process used to start the spark-submit command

If you are performing the spark-submit command from an edge node of your cluster, you can debug the driver code by simply passing the JDWP agent configuration as a driver extra Java option:

spark-submit \
    --class org.example.MyJob \
    --master yarn \
    --deploy-mode client \
    --conf "spark.driver.extraJavaOptions=-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=4747" \
    myJob.jar

The command above will start the Spark job with the driver running on the edge node and listening on port 4747 (this value is arbitrary; you can choose any available port). Now we can set up our IDE to start a remote debug session towards the edge node IP and port 4747:

Define a remote debug configuration with the edge node IP as host

We can start debugging our code with the configuration just defined by clicking on the debug icon:

Always check that the chosen debug configuration is the remote one

Code execution will stop at each breakpoint we define in our code; remember that, since we are debugging the driver, we can set breakpoints anywhere in our code except inside RDD/Dataset transformations and actions, which will be executed by the executors.

You and what nodes army?

To debug the executor code, let’s focus on how a Spark job behaves in cluster deploy-mode: the driver is spawned on one of the cluster nodes, just like the executors. So, in this configuration, connecting to the driver or to one of the executors requires us to check where the processes are actually running.

In cluster mode, the driver will be spawned in one of the cluster nodes, as done for the executors

If we need to debug the driver we can pass the same configurations presented above for the client mode, while if we need to debug one of the executors we should pass the agent properties in the executor options instead of the driver options:

spark-submit \
    --class org.example.MyJob \
    --master yarn \
    --deploy-mode cluster \
    --conf "spark.executor.extraJavaOptions=-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=4747" \
    myJob.jar

As discussed, in cluster mode if we want to debug the driver or one of the executors we first need to find out where the actual process is running. To do so we can leverage the Spark UI: whenever we start the job we can access its Spark UI using the link printed by the spark-submit command.

Then we can access the Executors section of the Spark UI, where all the running processes associated with our job are listed. From there we can see the driver node and all the nodes where executors are running, so we can find the executor IP and use it in our IDE debug configuration.

In this case the driver was spawned on cluster.node2 and the single executor on cluster.node1

At this point, we need to check the Spark UI to find out the IP addresses of the desired nodes. Since we defined the port in the agent configuration, we should keep the same debug port, changing only the IP address.

If we need to debug the executor code, since in the Spark UI we saw that the executor was cluster.node1, we can set that address in the remote debug configuration

To simplify the debugging process, it is advisable to start your Spark job with only one executor: debugging a Spark job with multiple executors dramatically increases complexity. For example, if two executors are spawned on the same node, they will have the same IP address and debug port, which can lead to inconsistencies. For debugging purposes, you can scale the job down to a single executor:

spark-submit \
    --class org.example.MyJob \
    --master yarn \
    --deploy-mode cluster \
    --num-executors 1 \
    --conf "spark.executor.extraJavaOptions=-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=4747" \
    myJob.jar

If we decide to debug one of the executors, code execution will stop at each breakpoint we define inside RDD/Dataset transformations and actions; remember that, since we are debugging an executor, all breakpoints set outside RDDs/Datasets will not be reached, since that code is executed only by the driver process.

In a cluster far, far away…

In a real scenario, the Spark job will run inside a cluster which is not accessible from the outside world: you can only access a cluster edge node, but you cannot directly access the nodes where the driver and executor processes will run. The edge node can reach those nodes since it is co-located within the cluster.

In a real scenario, the cluster is not accessible from the outside and you can only communicate with the edge nodes of the cluster

In this situation we need to take advantage of a mechanism called port forwarding: we can forward a port of our target node to a port of the edge node. In this way, we can use the edge node (to which we have access) as a proxy for the desired target node (to which we don’t).

A very common way to perform the port forwarding is to use ssh:

ssh -L 4848:target.node:4747 user@edge.node

The command above, run from your local machine, will connect to the edge node (edge.node) with the specified username (user). Obviously, you must authenticate to the remote machine, the edge node, using one of several methods depending on the protocol version used. The -L option specifies that the given port on the local host (4848) has to be forwarded to the given host (target.node) and port (4747) on the remote side.

In summary, in order to perform remote debugging of your code for a Spark job running in a cluster like the one described above you will need to:

  1. add the desired agent configuration to the spark-submit command
  2. start the job
  3. open the Spark UI and find out where your process is running
  4. use the ssh command to forward the port specified in the agent from the target node to your local machine through the edge node
  5. start the remote debug from your IDE, using localhost as the host and the forwarded local port as the port

Since we forwarded the remote port 4747 to our local port 4848, we can edit the remote debug configuration to point to localhost on port 4848

If you made it this far, you may be interested in other Spark-related articles that you can find on our blog.

Stay tuned because other Spark articles are coming!

Written by Lorenzo Pirazzini – Agile Lab Big Data Engineer

The Vodafone Automotive case: Real-time Analytics in mission-critical applications with Agile Lab WASP and Cloudera


The 2020 edition of the Osservatorio Big Data & Business Analytics of Politecnico di Milano, for years a point of reference for the research world and beyond, has recently come to a close; as every year, it outlined the evolution of the market and brought to the fore interesting use cases carried out in the Big Data field.

Among them was the project carried out by Vodafone Automotive, the company of the well-known group that provides telematics services, products and architectures for the mobility world. Thanks to Agile Lab’s WASP platform (Wide Analytics Streaming Platform) and to Cloudera technology, Vodafone Automotive was able to manage the large volume of data coming from the black boxes installed on vehicles, collecting, analyzing and processing a huge amount of data and delivering services to its customers in near real time.

Spark 3.0: First hands-on approach with Adaptive Query Execution (Part 3)


In the previous articles (1)(2), we started analyzing the individual features of Adaptive Query Execution introduced in Spark 3.0. In particular, we analyzed “dynamically coalescing shuffle partitions” and “dynamically switching join strategies”. Last but not least, let’s analyze what will probably be the most awaited and appreciated feature:

Dynamically optimizing skew joins

To understand exactly what it is, let’s take a brief step back to the theory, remembering that in Spark a DataFrame is an abstraction over the concept of RDD, which is in turn a logical abstraction of the dataset that can be processed in a distributed way thanks to the concept of partitions. A dataset on Spark, once “transformed” into an RDD, is divided into partitions where, ideally, the data is distributed in a balanced way:

In practice, it is very common for RDDs generated by certain operations (such as key grouping) to be unbalanced:

The unfortunate consequence is that, since the parallelism of the computation is based on the concept of data partitioning, we will not be able to adequately exploit the resources of the cluster. Identifying such situations is fairly straightforward using the Spark UI. Those of you who are regular Spark users will certainly have found yourselves in the unpleasant situation where the progress bar of an operation stalls for a long time on the last few tasks, giving the feeling, in the most “serious” cases, that the job has frozen.

Analyzing the tasks in detail, it is easy to identify the offending ones by looking at the duration, size and number of records processed, which will be one or more orders of magnitude larger than for all the others.
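
Besides the Spark UI, a quick programmatic check can reveal the same imbalance. A minimal sketch, assuming an existing SparkSession named spark (the DataFrame here is just a placeholder for the result of your own shuffle):

import org.apache.spark.sql.functions.{desc, spark_partition_id}

// Placeholder: replace with the DataFrame produced by your own grouping/shuffle operation.
val shuffled = spark.range(0, 1000000).repartition(8)

// Count how many rows end up in each partition: a heavily unbalanced
// distribution here is the typical symptom of data skew.
shuffled
  .groupBy(spark_partition_id().as("partition"))
  .count()
  .orderBy(desc("count"))
  .show()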

Before AQE, techniques such as adding extra join keys (where possible) or key salting were used to resolve these situations, at the cost of additional effort from developers for implementation, testing and so on.
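
For reference, a manual key-salting workaround typically looks like the following sketch, where largeDf and smallDf are hypothetical DataFrames joined on the make and model columns:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{array, explode, lit, rand}

// Hypothetical manual skew mitigation via key salting (the kind of code AQE lets you avoid).
def saltedJoin(largeDf: DataFrame, smallDf: DataFrame, buckets: Int = 8): DataFrame = {
  // Spread the hot keys of the large table across `buckets` artificial sub-keys.
  val saltedLarge = largeDf.withColumn("salt", (rand() * buckets).cast("int"))
  // Replicate each row of the small table once per salt value so every sub-key finds a match.
  val saltedSmall = smallDf.withColumn("salt", explode(array((0 until buckets).map(lit): _*)))
  saltedLarge.join(saltedSmall, Seq("make", "model", "salt")).drop("salt")
}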

AQE mechanisms, instead, discover such skewed joins and optimize them transparently.

Let’s see how to enable the feature and set configuration parameters correctly. The core directive spark.sql.adaptive.skewJoin.enabled is set to true by default. As in the previous cases, it is sufficient to enable AQE (spark.sql.adaptive.enabled) to take advantage of the optimization in question.
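
As a minimal sketch, assuming a SparkSession named spark, that means (the same keys can also be passed as --conf options to spark-submit):

// Turning on AQE is enough: skew-join optimization is already enabled by default.
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true") // default value, shown for clarity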

To find out what policy Spark uses to identify a skewed partition, simply analyze the sources of the org.apache.spark.sql.execution.adaptive.OptimizeSkewedJoin class (class that extends Rule[SparkPlan]).
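
The rule implemented there boils down to a check along these lines (a simplified sketch, not the exact Spark source; the constants stand in for the values read from the corresponding configuration properties):

// A partition is considered skewed when it is both much larger than the median
// partition size and larger than an absolute threshold.
val ADAPTIVE_EXECUTION_SKEWED_PARTITION_FACTOR = 5L           // spark.sql.adaptive.skewJoin.skewedPartitionFactor
val SKEW_JOIN_SKEWED_PARTITION_THRESHOLD = 256L * 1024 * 1024 // spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes

def isSkewed(partitionSize: Long, medianPartitionSize: Long): Boolean =
  partitionSize > medianPartitionSize * ADAPTIVE_EXECUTION_SKEWED_PARTITION_FACTOR &&
    partitionSize > SKEW_JOIN_SKEWED_PARTITION_THRESHOLD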

where ADAPTIVE_EXECUTION_SKEWED_PARTITION_FACTOR corresponds to the configuration property spark.sql.adaptive.skewJoin.skewedPartitionFactor and SKEW_JOIN_SKEWED_PARTITION_THRESHOLD to spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes, which ideally should be larger than spark.sql.adaptive.advisoryPartitionSizeInBytes, the property we already saw when discussing shuffle partition coalescing. By default, the threshold is set to 256MB.

To show how this optimization works, we’ll borrow the excellent example from the article “The Taming of the Skew”, based on two hypothetical tables in a car manufacturer’s database:

These tables are created ad hoc with an imbalance in the number of records for one of the join keys (represented by the brand (make) and model pair).
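
A possible way to reproduce that setup is the following sketch, assuming an existing SparkSession named spark (table and column names are made up for illustration; only the skew on the (make, model) key matters):

// Assumes an existing SparkSession named spark.
import spark.implicits._
import org.apache.spark.sql.functions.{col, lit, when}

// Small dimension table: one row per (make, model) pair.
val models = Seq(
  ("Ford",   "Fiesta", 2017),
  ("Ford",   "Focus",  2018),
  ("Toyota", "Yaris",  2019),
  ("Fiat",   "Panda",  2018)
).toDF("make", "model", "year")

// Large, heavily skewed fact table: about 95% of the rows share a single (make, model) pair.
val registrations = spark.range(0, 5000000)
  .withColumn("make",  when(col("id") % 100 < 95, lit("Ford")).otherwise(lit("Toyota")))
  .withColumn("model", when(col("id") % 100 < 95, lit("Fiesta")).otherwise(lit("Yaris")))
  .withColumnRenamed("id", "registration_id")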

Let’s check that the keys are really unbalanced:
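
Continuing the illustrative sketch above, a simple group-by count on the join key makes the imbalance evident:

import org.apache.spark.sql.functions.desc

// Count rows per join key: one (make, model) pair dominates the distribution.
registrations
  .groupBy("make", "model")
  .count()
  .orderBy(desc("count"))
  .show()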

Now let’s try the join with AQE disabled (we also disable broadcast joins, as the tables are very small and a broadcast would defeat the purpose of our experiment).
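
A minimal sketch of that run, continuing from the DataFrames defined above (the noop data source is used only to force full execution of the join without writing any output):

// Disable AQE and broadcast joins so the skew is not hidden by other optimizations.
spark.conf.set("spark.sql.adaptive.enabled", "false")
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

val joinedNoAqe = registrations.join(models, Seq("make", "model"))
joinedNoAqe.write.format("noop").mode("overwrite").save()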

Let’s now analyze the results. The longest task took, as we would have expected, a good 1.2 minutes out of 1.3 minutes total, having processed most of the data, which was concentrated in a single partition.

Now let’s repeat the experiment with AQE enabled and the configuration properties set appropriately based on the size of the sample data we’re using:
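
A possible set of values for this small sample (illustrative, not tuned for real workloads):

// Re-enable AQE and make skew detection sensitive enough for the small test dataset.
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "2")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "8m")
spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "4m")

val joinedWithAqe = registrations.join(models, Seq("make", "model"))
joinedWithAqe.write.format("noop").mode("overwrite").save()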

First of all, we note that the execution time drops from 1.3 minutes to 34 seconds, and that the longest tasks now take only 4 seconds. By fine-tuning the parameters, I am convinced that we could get even better performance, but for our purposes we can stop here.

If you want to read the previous parts of the article:

Part 1 – Dynamically coalescing shuffle partitions

Part 2 – Dynamically switching join strategies

Written by Mario Cartia – Agile Lab Big Data Specialist/Agile Skill Managing Director
 
If you found this article useful, take a look at our blog and follow us on Agile Lab Engineering, our Medium publication.

Agile Lab guest of InnoTech, The European House Ambrosetti’s Hub, to talk about Machine Learning and Big Data in the insurance sector


Alberto Firpo, Co-Founder & CEO of Agile Lab, is the protagonist of an episode of “InnoTechCast – Leaders’ View on Innovation”, talking about Machine Learning and Big Data applied to the insurance sector.

The European House Ambrosetti, within the InnoTech Hub, a reference point for the Italian innovation and technology ecosystem in Europe and worldwide, launched InnoTechCast in the middle of the COVID pandemic: a new digital format that presents the point of view of the main leaders of the Italian innovation and technology community.

In this podcast, Alberto Firpo explains how the data collected by the black boxes installed on vehicles, thanks to Machine Learning models and real-time systems, provide useful information to companies in the insurance industry.

In particular, through Agile Lab’s WASP platform, the data collected in real time allow insurance companies to activate services for their customers, in real time, and to formulate policies based on the “driving behaviour” of the driver, detected through innovative methods.

Listen to the podcast!

Real-time Analytics in mission-critical applications: the Vodafone Automotive case

 


On the occasion of the closing conference of the eighth edition of the Osservatorio Big Data & Analytics of Politecnico di Milano, Alberto Firpo, CEO & Co-Founder of Agile Lab, Yari Franzini, Regional Director of Cloudera Italia, and Paolo Giuseppetti, Head of Innovation and Connected Mobility Platform at Vodafone Automotive, interviewed by Irene Di Deo, researcher for the Osservatori Digital Innovation, presented the innovative project built with WASP, Agile Lab’s Wide Analytics Streaming Platform.

Thanks to this system, Vodafone Automotive was able to take the data collected by the black boxes installed on vehicles and turn it, in real time, into useful information to raise the level of the services offered to its customers.


The 2020 research of the Osservatorio Big Data & Business Analytics

The goal of the 2020 research, to which Agile Lab contributed as a Sponsor, was to capture and understand the state of the art of Analytics in Italy, and in particular to:

  • quantify and analyze the Analytics market in Italy, identifying ongoing trends;
  • investigate the applications of Analytics across different industries and processes;
  • understand the main technological evolutions in the Analytics field;
  • estimate the diffusion of Data Science skills and organizational models;
  • understand the role played by startups in the Analytics space.

For more information, visit the Osservatori.net website.