Agile Lab has been recognised as one of Italy’s fastest-growing companies by the nation’s leading economic newspaper “Il Sole 24 Ore” and by the market and consumer data provider “Statista”.
Real-Time Analytics in Mission-Critical Applications
for Usage-Based Insurance (UBI) services
“The IoT generates large amounts of data and, above all, the expectation among those who use it that this data will be processed and transformed into useful information in real time. Therefore, we need solutions that are scalable in terms of cost and efficiency and, at the same time, guarantee the expected results. Choosing partners such as Agile Lab, always attentive to innovation and with solid expertise in new technologies, allows us to achieve ambitious results and to adopt innovative solutions for the challenges of tomorrow.”
Head of Innovation and Connected Mobility Platform, Vodafone Automotive
In order to provide insurance companies with specific risk profiles for every driver, extracted from on-board black boxes, Vodafone Automotive needed an entirely new system for acquiring and processing telemetry data, capable of collecting, managing and analyzing Big Data in real time for over 227 million weekly mileage and driving-related messages.
The new architecture, based on Cloudera with Apache Kafka, Apache Spark and a combination of HDFS and HBase, was specifically designed to run on-premises. Vodafone’s Cloud Computing branch handles the management and maintenance of the environments, while the company relies on its ongoing collaboration with Agile Lab for the development and management of the application services.
This new architecture, which introduced a more flexible and innovative platform, has enabled the company to meet the service level expected by Clients (i.e. a maximum latency of 15 minutes between data generation and its reception on the Client’s Cloud, including transit time across both mobile networks and the Internet), and it has become a solid foundation for developing services that the company would not otherwise have been able to offer.
Vodafone Automotive, part of Vodafone Group, operates in the specialized Vodafone IoT segment and focuses on providing IT platforms to the mobility world. The company supplies telecommunication services and products targeted at the automotive sector. In particular, it offers stolen vehicle assistance and recovery, theft prevention, and crash and accident management services. For insurance-related services specifically, it provides analytics on driving habits and styles and risk-management assessments, as well as a wide range of vehicle management services (e.g. maintenance and life-cycle management) for both fleets and OEMs (manufacturers of automotive on-board electronics). The main focus of our case study is Usage-Based Insurance (UBI).
UBI aims to address a key issue in the car insurance sector: the traditional trade-off between the driver’s privacy and the cost of the insurance plan. By installing an electronic device (the black box) on board the vehicle, Vodafone Automotive collects various data, such as the car’s location and acceleration, and can thus provide insurance companies with different driving-style profiles.
Using this information, Vodafone Automotive helps insurance companies create a “score” that represents, with the utmost precision, the risk associated with each type of driver and therefore with the individual insurance plan, while also providing data on the types of roads traveled, driving times, and much more.
The project was born from the need to extract the maximum value from the data generated by on-board devices (often called black boxes, like the flight recorders on airplanes), to better meet the needs of insurance companies. Insurers can use this data for policy pricing (computationally, this is organized in time intervals, with pre-established processing cycles and delivery of the processed data sets as agreed with each company, used for example at policy renewal, quarterly or even annually), but also to offer new services to their subscribers, strengthening and enhancing the customer experience: for example, by sending alerts about the level of danger of a certain area (e.g. where the client may have parked), or localized weather alerts (such as hail alerts).
In line with Vodafone Automotive’s goal of increasing road safety, the company launched this project to overhaul its systems for acquiring and processing telemetry data (generated by devices previously installed by Vodafone Automotive on insured vehicles), introducing the features and capabilities of the Cloudera platform to collect, manage and analyze in real time the Big Data sent by the installed black boxes.
The Vodafone Automotive project, started in 2017, aimed to deploy, manage and consolidate a platform able to collect and process large quantities of data, supporting insurance companies’ risk evaluation both when issuing insurance plans and when offering real-time services to their customers. The project replaced the previous architecture with a new one based on Cloudera, with Apache Kafka, Apache Spark and a combination of HDFS and HBase (a “Lambda” architectural model), and later also Apache NiFi, which can process data with a latency of a few seconds regardless of its volume or frequency. The primary feature of this platform is its ability to flexibly manage high volumes of data and to scale according to the company’s evolving needs.
Data processing occurs mainly through Apache Spark, which reads the data from Kafka queues and processes it. The platform then stores the raw data on the distributed HDFS file system, while the processed data are saved in a NoSQL database, achieving impressive performance. The collected data are organized according to the Lambda architecture, which enables both real-time processing and effective storage for future re-processing; for the latter, the architecture relies on the NoSQL database HBase. The primary processing step reconstructs the driver’s path from the position, speed and geographical data acquired through the GPS system and the accelerometer in the vehicle’s black box.
Additional operations are required to guarantee the reliability of the collected data: for instance, data cleansing and preparation are fundamental to spot device malfunctions or to tell a pothole apart from a collision (and consequently decide whether or not to provide assistance to the driver). The new architecture was specifically designed to run on-premises, and its servers are located in Vodafone Group’s Technology Hub in Milan (for Vodafone Italy), which hosts the VFIT services. A backup server cluster has also been set up in Vodafone’s twin data center as part of the disaster recovery plan. Vodafone’s Cloud Computing branch handles the management and maintenance of the environments (the GDC, Group Data Center, where the data processing resources are implemented and through which Vodafone serves the Southern European market), while the company relies on its collaboration with Agile Lab for the development and management of the application services. In fact, the architectural evolution implemented by Vodafone Automotive not only allowed the company to handle high volumes of data effectively, but was also a qualifying element in guaranteeing that insurance companies receive data analyzed and processed in real time. Thanks to the new platform, insurance companies today receive real-time information on their clients’ driving habits, with a latency of just a few minutes from the moment the event is recorded by the vehicle’s on-board black box.
The following are some figures from our project:
- over 33 million messages per day
- 227 million weekly mileage and driving-related messages (for insured clients)
- 130 terabytes of data collected in 3 years.
From a management point of view, the project required a dedicated team focused exclusively on the design and development of new architectural scenarios. This organizational choice and the collaboration with Agile Lab, which took charge of every detail of the planning, engineering and optimization of the application platform, played a key role in the success of the project. After launch, the team Vodafone Automotive had created to manage the project’s development phases joined the company’s IT department, working in Project Management, Development and Operations.
The greatest challenge the company faced was integrating numerous recent technologies into its existing information systems. The IT department had to manage a series of new tools and platforms and take all the necessary steps (including training) to maintain those technologies and use them to their full potential.
Achieved Results and Benefits
First of all, the new architecture, which introduced a more flexible and innovative platform, has enabled the company to meet the service level expected by Clients (i.e. a maximum latency of 15 minutes between data generation and its reception on the Client’s Cloud, including transit time across both mobile networks and the Internet). In addition, the new architecture has become a solid foundation for developing services that the company would not otherwise have been able to offer. It has given Vodafone Automotive a decisive competitive advantage, positioning it as one of the most innovative players on the market.
Among the potential evolutions of the platform is the addition of Machine Learning to the reliability and data quality check processes, even in streaming mode (as the data arrive). Machine learning techniques would allow the company to identify device malfunctions much more quickly and to become proactive in the maintenance and, when needed, replacement of the black boxes. This would also avoid the need for corrective measures on corrupted data ingested because of device errors or malfunctions.
This case study was published in Italian by the Management Engineering Department of Milan’s Polytechnic University as one of the 2020 Big Data & Business Analytics business cases of the Digital Innovation Observatories of the School of Management of Politecnico di Milano (Copyright © Politecnico di Milano / Dipartimento di Ingegneria Gestionale).
Do you wish there was a system that knew what your next favorite beer would be?
During the next meetup, you can figure it out and participate in creating your own beer recommendation system.
With Paolo Tomeo, Big Data Engineer with a Ph.D. in Recommendation Systems, Riccardo Fino, Data Scientist and Statistician, and Lorenzo Graziano, Data Engineer and Computer Scientist, you will be able to create your beer recommendation system.
This meetup offers a gentle introduction to recommendation systems and is well suited for those who have never had a chance to learn about the topic. More experienced participants will find extra insights and can help us and the other participants with interesting stories and tips.
We hope to come out of this meetup having learned something from you as well.
Don’t miss the event: 17th February 2021, 6:30 PM.
A new Hold my Beer event: next January 13th, at 6.30 PM, we will talk about how to configure a Kubernetes cluster using Ansible.
– Mario Vitale, a DevOps Engineer @ Flixbus who loves music, fitness, video games and open source. He will be the main speaker of the event.
– Carlo Ventrella, SRE Engineer @ AgileLab will support Mario during the talk.
– You: for a fruitful discussion, everyone can take part. The more, the merrier!
Tech Talk with Neodata Group: The journey to Elasticsearch
In the first episode of our Tech Talk series, we hosted Neodata Group to talk about their journey to Elasticsearch, the open-source search and analytics engine for all types of data.
We explored some interesting use cases on data & analytics.
See the full video and stay tuned to discover all Agile Lab’s events on our blog!
Spark Remote Debugging
Hi everybody! I’m a Big Data Engineer @ Agile Lab, a remote-first Big Data engineering and R&D firm located in Italy. Our main focus is to build Big Data and AI systems, in a very challenging — yet awesome — environment.
At Agile Lab we use Apache Spark, among other technologies, as a processing engine in various projects. Spark is fast and it is simple for a developer to write code that can be run right away, but, as with regular programs, it is not always easy to understand what a Spark job is doing.
This article focuses on how a developer can remotely debug a running Spark Scala/Java application (running on YARN) using IntelliJ IDEA, but all the Spark and environment configurations also apply to other IDEs.
Agent JDWP, licence to debug
To perform remote debugging of a Spark job, we leverage the JDWP agent (Java Debug Wire Protocol) that defines a communication protocol between a debugger and a running JVM. JDWP defines only the format and layout of packets exchanged by the debugger and the target JVM, while the transport protocol can be chosen by the user. Usually, the available transport mechanisms are shared memory (dt_shmem) and socket (dt_socket) but only the latter, which uses a TCP socket connection to communicate, can be used for remote debugging.
So, to enable remote debugging, we must start the target JVM with the following Java option, which makes it act as a JDWP server to which our IDE can connect:
-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=4747
The property above tells the JVM to load the JDWP agent and wait for a socket connection on the specified port. In particular:
- transport=dt_socket tells the agent to use a socket as the transport mechanism.
- server=y means that the JVM will act as a JDWP server: it will listen for a debugger client to attach to it.
- suspend=y tells the JVM to wait for a debugger connection before executing the main function. If this is set to n, the main function starts immediately while the agent keeps listening for a debugger connection.
- address=4747 specifies the port the debug socket listens on. In the example, the target JVM will listen on port 4747 for incoming client connections.
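Typing the agent string by hand for every run is error-prone, so the four settings above can be assembled by a small shell helper. This is a minimal sketch: the function name `jdwp_agent_opt` is hypothetical and it only prints the option string, it does not start any JVM. One caveat worth checking on your cluster: on JDK 9 and later the agent binds only to localhost when the address is just a port, so accepting remote debuggers requires `*:4747` instead of `4747`; the helper therefore passes the address through unchanged.

```bash
#!/usr/bin/env bash
# Hypothetical helper: prints the JDWP agent option described above.
#   $1 = address: a port such as 4747 (use '*:4747' on JDK 9+ to accept remote debuggers)
#   $2 = suspend flag, y or n (defaults to y, as in the example above)
jdwp_agent_opt() {
  local address="$1"
  local suspend="${2:-y}"
  if [[ -z "$address" ]]; then
    echo "usage: jdwp_agent_opt ADDRESS [y|n]" >&2
    return 1
  fi
  if [[ "$suspend" != "y" && "$suspend" != "n" ]]; then
    echo "suspend must be y or n" >&2
    return 1
  fi
  # dt_socket = TCP transport (needed for remote debugging); server=y = the JVM listens
  printf -- '-agentlib:jdwp=transport=dt_socket,server=y,suspend=%s,address=%s\n' \
    "$suspend" "$address"
}

jdwp_agent_opt 4747
# prints: -agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=4747
```

The printed string is what goes into spark.driver.extraJavaOptions or spark.executor.extraJavaOptions, depending on which process you want to debug.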
We will leverage the JDWP agent for all the following remote debugging scenarios, so remember that you can always adjust the configurations listed above to fit your use case.
You must choose your Spark deployment …but choose wisely
Before delving into debugging your application, here’s a quick recap of how a Spark job executes on a cluster. Each Spark job requires:
* a process called the driver, which executes all the standard Java/Scala code
* one or more executor processes that will perform all the code defined inside the transformations and actions of the RDDs/Datasets.
This means that in a realistic scenario we will have different JVMs running at the same time (often on different nodes): one for the driver and one for each executor.
A typical Spark application will run on multiple nodes, each containing one or more processes
Spark allows the developer to run a job in different modes depending on the requirements of the use case. In particular, we will focus on two options of the spark-submit command: deploy-mode and master.
Those two configurations allow a developer to decide how the Spark application will be deployed and run:
- master: this configuration tells Spark which is the master URL of the cluster. A complete list of all the allowed values can be found in the official Spark documentation.
- deploy-mode: Whether to deploy your driver on one of the worker nodes (cluster) or locally as an external client (client). In client mode, the driver is started in the same node where the spark-submit is launched.
Even though one of the allowed values for master is local, which runs the application on the local machine with a specified number of threads, we will not explore it: there is no need for remote debugging when the Spark application runs with master local, since everything runs in the same local JVM.
This article will focus only on Spark applications launched with a spark-submit run against a YARN cluster (master yarn configuration), but the same considerations will hold also for other resource managers.
Your honour, my client pleads not guilty
When the spark-submit command is invoked with client deploy-mode, Spark will spawn the driver in the client process that performed the submit command. Executors will spawn into nodes of the cluster depending on the resources associated with them.
In client mode, the driver is spawned in the same process used to start the spark-submit command
If you are performing the spark-submit command from an edge node of your cluster, you can debug the driver code by simply passing the JDWP agent configuration as a driver extra Java option:
spark-submit --class org.example.MyJob \
  --master yarn \
  --deploy-mode client \
  --conf "spark.driver.extraJavaOptions=-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=4747" \
  myJob.jar
The command above will start the Spark job with the driver running on the edge node and listening on port 4747 (this value is arbitrary; you can choose any available port number). Now we can set up our IDE to start a remote debug session towards the edge node IP and port 4747:
Define a remote debug configuration with the edge node IP as host
We can start debugging our code with the configuration just defined by clicking on the debug icon:
Always check that the chosen debug configuration is the remote one
Code execution will stop at each breakpoint we define in our code. Remember that, since we are debugging the driver, breakpoints can be set anywhere except in the code defined inside RDD/Dataset transformations and actions, which is executed by the executors.
You and what nodes army?
To debug the executor code, we can look at how a Spark job behaves in cluster deploy-mode: the driver is spawned on one of the cluster nodes, just like the executors. So, in this configuration, connecting to the driver or to one of the executors requires us to first check where the processes are actually running.
In cluster mode, the driver will be spawned in one of the cluster nodes, as done for the executors
If we need to debug the driver we can pass the same configurations presented above for the client mode, while if we need to debug one of the executors we should pass the agent properties in the executor options instead of the driver options:
spark-submit --class org.example.MyJob \
  --master yarn \
  --deploy-mode cluster \
  --conf "spark.executor.extraJavaOptions=-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=4747" \
  myJob.jar
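Because the only difference between debugging the driver and debugging an executor is the property that receives the agent option, the choice can be wrapped in a small helper. A sketch under assumptions: the function name `spark_debug_conf` is hypothetical, suspend=n is hard-coded as in the commands above, and the script only builds the spark-submit command in an array and prints it instead of running it.

```bash
#!/usr/bin/env bash
# Hypothetical helper: prints the --conf value that loads the JDWP agent into
# either the Spark driver or the executors.
#   $1 = driver | executor
#   $2 = debug port (defaults to 4747)
spark_debug_conf() {
  local target="$1"
  local port="${2:-4747}"
  local key
  case "$target" in
    driver)   key="spark.driver.extraJavaOptions" ;;
    executor) key="spark.executor.extraJavaOptions" ;;
    *) echo "target must be 'driver' or 'executor'" >&2; return 1 ;;
  esac
  # suspend=n: the JVM starts right away and the debugger can attach at any time
  printf '%s=-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=%s\n' \
    "$key" "$port"
}

# Build (without running) the cluster-mode command that debugs the executors.
cmd=(spark-submit --class org.example.MyJob --master yarn --deploy-mode cluster
     --conf "$(spark_debug_conf executor 4747)" myJob.jar)
echo "${cmd[@]}"
```

To debug the driver in cluster mode instead, call the helper with `driver`; everything else stays the same.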
As discussed, in cluster mode, if we want to debug the driver or one of the executors, we first need to find out where the actual process is running. To do so we can leverage the Spark UI: whenever we start the job, we can access its Spark UI using the link printed by the spark-submit command.
Then we can open the Executors section of the Spark UI, where all the running processes associated with our job are listed. From here we can see the driver node and all the nodes where executors are running, so we can find the executor IP and use it in our IDE debug configuration.
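If clicking through the UI is inconvenient, Spark's monitoring REST API exposes the same information at `/api/v1/applications/<app-id>/executors`, where each entry (driver included) has an `id` and a `hostPort` field. The sketch below assumes the JSON has already been fetched, for example with curl from the Spark UI link printed by spark-submit, and that `id` precedes `hostPort` in each entry; the function name `executor_hosts` and the sample host names are hypothetical. It uses grep and sed rather than jq so that it runs on a bare edge node.

```bash
#!/usr/bin/env bash
# Hypothetical helper: reads the JSON returned by
#   <spark-ui-url>/api/v1/applications/<app-id>/executors
# on stdin and prints one "id host" line per process (the driver and each executor).
# Assumes each entry lists "id" before "hostPort".
executor_hosts() {
  grep -oE '"(id|hostPort)"[[:space:]]*:[[:space:]]*"[^"]*"' \
    | sed -e 's/.*"\([^"]*\)"$/\1/' -e 's/:[0-9]*$//' \
    | paste -d' ' - -
}

# Illustrative input shaped like the REST API output (host names are made up).
sample='[ { "id" : "driver", "hostPort" : "cluster.node2:35121" },
          { "id" : "1", "hostPort" : "cluster.node1:40433" } ]'
printf '%s\n' "$sample" | executor_hosts
# prints:
# driver cluster.node2
# 1 cluster.node1

# Real use (URL and application id are placeholders):
# curl -s "$SPARK_UI/api/v1/applications/$APP_ID/executors" | executor_hosts
```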
In this case the driver was spawned on cluster.node2 and the single executor on cluster.node1
At this point, we need to check the Spark UI to find out the IP addresses of the desired nodes. Since we defined the port in the agent configuration, we should keep the same debug port, changing only the IP address.
If we need to debug the executor code, since the Spark UI showed that the executor was running on cluster.node1, we can set that address in the remote debug configuration
To simplify debugging, it is advisable to start your Spark job with only one executor: debugging a Spark job with multiple executors dramatically increases its complexity. For example, if two executors are spawned on the same node, they will try to use the same IP address and debug port, so only one of them can actually open the debug socket.
spark-submit --class org.example.MyJob \
  --master yarn \
  --deploy-mode cluster \
  --num-executors 1 \
  --conf "spark.executor.extraJavaOptions=-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=4747" \
  myJob.jar
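One caveat: if dynamic allocation is enabled on the cluster, `--num-executors` only sets the initial number of executors, and Spark may request more while the job runs, each one trying to open the same debug port. The following sketch collects the extra arguments for a single-executor debug run, assuming you are allowed to override the cluster defaults; the array name is hypothetical.

```bash
#!/usr/bin/env bash
# Extra spark-submit arguments for a debug run with exactly one executor.
SINGLE_EXECUTOR_CONF=(
  --num-executors 1
  --conf spark.dynamicAllocation.enabled=false   # stop Spark from adding executors later
)
# Usage (not executed here):
# spark-submit --class org.example.MyJob --master yarn --deploy-mode cluster \
#   "${SINGLE_EXECUTOR_CONF[@]}" \
#   --conf "spark.executor.extraJavaOptions=-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=4747" \
#   myJob.jar
echo "${SINGLE_EXECUTOR_CONF[@]}"
```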
If we decide to debug one of the executors, code execution will stop at each breakpoint we define inside RDDs/Datasets transformations and actions; remember that since we are debugging the executor, all breakpoints set up outside RDDs/Datasets will not be reached since that code will be performed only by the driver process.
In a cluster far, far away…
In a real scenario, the Spark job will run inside a cluster that is not accessible from the outside world: you can only access a cluster edge node, and cannot directly reach the nodes where the driver and executor processes run. The edge node, being co-located in the cluster, can reach those nodes.
In a real scenario, the cluster is not accessible from the outside and you can only communicate with the edge nodes of the cluster
In this situation we need to take advantage of a mechanism called port forwarding: we can forward the port of our target node to a port of the edge node. In this way, we can use our edge node (for which we have access) as a proxy for the desired target node (for which we don’t).
A very common way to perform the port forwarding is to use ssh:
ssh -L 4848:target.node:4747 user@edge.node
The command above, run from your local machine, will connect to the edge node (edge.node) with the specified username (user). Obviously you must provide your identity to the remote machine, the edge node, using one of several methods depending on the protocol version used. The -L specifies that the given port on the local host (4848) has to be forwarded to the given host (target.node) and port (4747) on the remote side.
In summary, in order to perform remote debugging of your code for a Spark job running in a cluster like the one described above you will need to:
- add the desired agent configuration to the spark-submit command
- start the job
- open the Spark UI and find out where your process is running
- use the ssh command to forward the port specified in the agent from the target node to your local machine through the edge node
- start the remote debug from your IDE, using localhost as the host and the forwarded local port as the port
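The tunnel step has to be repeated whenever the job is resubmitted, since the driver or executor may land on a different node. A minimal sketch of a helper for that step follows; the function name `debug_tunnel_cmd` is hypothetical, `edge.node`, `cluster.node1` and `user` are the placeholders used in the examples above, and the helper only prints the command so you can review it before running it. The `-N` flag is a standard OpenSSH option that sets up the forwarding without executing a remote command.

```bash
#!/usr/bin/env bash
# Hypothetical helper: prints the ssh command that forwards LOCAL_PORT on this
# machine to DEBUG_PORT on the node running the driver or executor, via the edge node.
#   $1 = target node (from the Executors tab of the Spark UI)
#   $2 = user@edge-node
#   $3 = local port (default 4848)   $4 = remote debug port (default 4747)
debug_tunnel_cmd() {
  local target="$1" edge="$2"
  local local_port="${3:-4848}" debug_port="${4:-4747}"
  if [[ -z "$target" || -z "$edge" ]]; then
    echo "usage: debug_tunnel_cmd TARGET_NODE USER@EDGE_NODE [LOCAL_PORT] [DEBUG_PORT]" >&2
    return 1
  fi
  # -N: open the tunnel without running a remote command
  # -L: localhost:local_port -> target:debug_port, resolved from the edge node
  echo "ssh -N -L ${local_port}:${target}:${debug_port} ${edge}"
}

debug_tunnel_cmd cluster.node1 user@edge.node
# prints: ssh -N -L 4848:cluster.node1:4747 user@edge.node
```

Keep the printed command running in a terminal while the IDE is attached to localhost:4848.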
Since we forwarded the remote port 4747 to our local port 4848, we can edit the remote debug configuration to connect to localhost on port 4848
If you made it this far, you may be interested in other Spark-related articles that you can find on our blog.
Stay tuned because other Spark articles are coming!
Written by Lorenzo Pirazzini – Agile Lab Big Data Engineer
The Vodafone Automotive case: Real-time Analytics in mission-critical applications with Agile Lab WASP and Cloudera
The 2020 edition of the Big Data & Business Analytics Observatory of the Politecnico di Milano has just come to a close. For years a point of reference for the research world and beyond, the Observatory has, as every year, outlined the evolution of the market and brought to the fore interesting use cases built in the Big Data field.
Among these was the project carried out by Vodafone Automotive, the company of the well-known group that provides telematics services, products and architectures for the mobility world. Thanks to Agile Lab’s WASP (Wide Analytics Streaming Platform) and Cloudera technology, Vodafone Automotive was able to handle the large volume of data coming from the black boxes installed in vehicles, collecting, analyzing and processing it to deliver services to its customers in near-real-time.
Spark 3.0: First hands-on approach with Adaptive Query Execution (Part 3)
In the previous articles (1)(2), we started analyzing the individual features of Adaptive Query Execution introduced on Spark 3.0. In particular, we analyzed “dynamically coalescing shuffle partitions” and “dynamically switching join strategies”. Last but not least, let’s analyze what will probably be the most-awaited and appreciated feature:
Dynamically optimizing skew joins
To understand exactly what this is, let’s take a step back to the theory for a moment. In Spark, a DataFrame is an abstraction built on top of the RDD, which in turn is a logical abstraction of a dataset that can be processed in a distributed way thanks to the concept of partition. Once “transformed” into an RDD, a dataset is divided into partitions across which, ideally, the data is evenly distributed.
In practice, it is very common for RDDs generated by certain operations (such as grouping by key) to be unbalanced.
The unfortunate consequence is that, since parallelism is based on data partitioning, we cannot adequately exploit the resources of the cluster. Such situations are fairly easy to identify in the Spark UI. Regular Spark users will almost certainly have found themselves in the unpleasant situation where the progress bar of an operation stalls for a long time on the last few tasks, giving the impression, in the most “serious” cases, that the job has frozen.
Analyzing the tasks in detail, the “culprits” are easy to identify: their duration, input size and number of records processed are one or more orders of magnitude larger than those of all the other tasks.
Without AQE, these situations were resolved with techniques such as adding extra join keys (where possible) or key salting, which required additional effort from developers for implementation, testing, etc.
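For comparison, this is the kind of manual work key salting requires. The toy awk sketch below (not Spark code, and not the article's implementation) works on plain text rows keyed by a hypothetical `make|model` string: rows on the large, skewed side get a suffix in [0, N), assigned round-robin here instead of randomly so the result is reproducible, while rows on the small side are replicated once per suffix. A join on the salted key still finds every match, but the hot key is now spread over N partitions.

```bash
#!/usr/bin/env bash
# Toy illustration of key salting on text rows (one key per line).
N=4   # number of salt values, i.e. how many partitions a hot key is spread over

# Large (skewed) side: append a salt to each key, round-robin over 0..N-1.
salt_large_side() {
  awk -v n="$N" '{ print $0 "_" (NR % n) }'
}

# Small side: emit each key once per salt value so every salted key still matches.
explode_small_side() {
  awk -v n="$N" '{ for (s = 0; s < n; s++) print $0 "_" s }'
}

# Eight rows of the same hot key become four distinct salted keys, two rows each.
printf 'fiat|panda\n%.0s' {1..8} | salt_large_side | sort | uniq -c
# The matching dimension row is replicated four times: fiat|panda_0 ... fiat|panda_3
echo 'fiat|panda' | explode_small_side
```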
AQE, by contrast, detects skewed partitions and optimizes the join transparently.
Let’s see how to enable the feature and set configuration parameters correctly. The core directive spark.sql.adaptive.skewJoin.enabled is set to true by default. As in the previous cases, it is sufficient to enable AQE (spark.sql.adaptive.enabled) to take advantage of the optimization in question.
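For reference, the properties involved can be collected as spark-submit arguments. This is a configuration sketch with a hypothetical array name; the values are the Spark 3.0 defaults as we understand them (factor 5, threshold 256MB, advisory partition size 64MB), not tuned values, so on a default installation only the first line actually changes behaviour, since AQE itself is disabled by default in Spark 3.0.

```bash
#!/usr/bin/env bash
# AQE skew-join settings as spark-submit arguments (Spark 3.0 defaults shown).
AQE_SKEW_CONF=(
  --conf spark.sql.adaptive.enabled=true   # AQE itself is off by default in Spark 3.0
  --conf spark.sql.adaptive.skewJoin.enabled=true
  --conf spark.sql.adaptive.skewJoin.skewedPartitionFactor=5
  --conf spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes=256MB
  --conf spark.sql.adaptive.advisoryPartitionSizeInBytes=64MB
)
# Usage (not executed here):
# spark-submit --class org.example.MyJob --master yarn "${AQE_SKEW_CONF[@]}" myJob.jar
echo "${AQE_SKEW_CONF[@]}"
```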
To find out which policy Spark uses to identify a skewed partition, we can look at the source code of the org.apache.spark.sql.execution.adaptive.OptimizeSkewedJoin class (which extends Rule[SparkPlan]): a partition is considered skewed when its size is larger than the median partition size multiplied by ADAPTIVE_EXECUTION_SKEWED_PARTITION_FACTOR and also larger than SKEW_JOIN_SKEWED_PARTITION_THRESHOLD.
ADAPTIVE_EXECUTION_SKEWED_PARTITION_FACTOR corresponds to the configuration property spark.sql.adaptive.skewJoin.skewedPartitionFactor (default 5), and SKEW_JOIN_SKEWED_PARTITION_THRESHOLD to spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes, which ideally should be larger than spark.sql.adaptive.advisoryPartitionSizeInBytes, the property we have already seen for coalescing shuffle partitions. By default, the threshold is set to 256 MB.
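The rule can be reproduced outside Spark to reason about when a partition will be flagged. The shell sketch below mirrors the documented condition (size greater than the factor times the median, and greater than the threshold) with the default values of 5 and 256 MB; it is not Spark's source code, the function names and partition sizes are hypothetical, and Spark itself works on the shuffle statistics of each join side.

```bash
#!/usr/bin/env bash
# Sketch of the skew test (not Spark's source code):
# a partition is skewed if size > FACTOR * median AND size > THRESHOLD.
FACTOR=5                           # spark.sql.adaptive.skewJoin.skewedPartitionFactor default
THRESHOLD=$((256 * 1024 * 1024))   # ...skewedPartitionThresholdInBytes default (256 MB)

# Median of the integer partition sizes (bytes) given as arguments.
median() {
  local -a sorted
  mapfile -t sorted < <(printf '%s\n' "$@" | sort -n)
  local n=${#sorted[@]}
  if (( n % 2 == 1 )); then
    echo "${sorted[n / 2]}"
  else
    echo $(( (sorted[n / 2 - 1] + sorted[n / 2]) / 2 ))
  fi
}

# Prints the 0-based index of every skewed partition.
skewed_partitions() {
  local med size i=0
  med=$(median "$@")
  for size in "$@"; do
    if (( size > FACTOR * med && size > THRESHOLD )); then
      echo "$i"
    fi
    i=$((i + 1))
  done
}

# Hypothetical sizes: four partitions of about 50 MB and one of 1200 MB.
MB=$((1024 * 1024))
skewed_partitions $((50 * MB)) $((48 * MB)) $((52 * MB)) $((1200 * MB)) $((49 * MB))
# prints: 3
```

With the defaults, no partition below 256 MB is ever flagged, which is why small test datasets need a lower threshold for the optimization to kick in.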
To show how this optimization works, we’ll borrow the excellent example from the article “The Taming of the Skew”, based on two hypothetical tables in a car manufacturer’s database, created ad hoc with an imbalance in the number of records for one of the join keys (the brand, or make, and model pair).
Let’s check that the keys are really unbalanced:
Now let’s run the join with AQE disabled (we also disable broadcast joins, since the tables are very small and a broadcast join would defeat the purpose of our experiment):
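In spark-submit terms, this baseline corresponds roughly to the following arguments. This is a sketch with a hypothetical array name; the experiment may well have set the same properties differently, for example with spark.conf.set in an interactive session.

```bash
#!/usr/bin/env bash
# Baseline run: AQE off and broadcast joins disabled, so Spark has to shuffle
# both sides and the skewed partition is processed by a single task.
BASELINE_CONF=(
  --conf spark.sql.adaptive.enabled=false
  --conf spark.sql.autoBroadcastJoinThreshold=-1   # -1 disables broadcast joins
)
# Usage (not executed here):
# spark-submit --class org.example.MyJob --master yarn "${BASELINE_CONF[@]}" myJob.jar
echo "${BASELINE_CONF[@]}"
```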
Let’s analyze the results. As expected, the longest task took a good 1.2 minutes out of a total of 1.3 minutes, since it processed most of the data, concentrated in a single partition.
Now let’s repeat the experiment with AQE enabled and the configuration properties set appropriately based on the size of the sample data we’re using:
First of all, we note that the execution time drops from 1.3 minutes to 34 seconds, and that the longest tasks now take only 4 seconds. With some fine-tuning of the parameters I am convinced we could get even better performance, but for our purposes we stop here.
If you want to read the previous parts of the article:
Agile Lab guest of InnoTech, The European House Ambrosetti’s Hub, to talk about Machine Learning and Big Data in insurance
Alberto Firpo, Co-Founder & CEO of Agile Lab, is the guest of an episode of “InnoTechCast – Leaders’ View on Innovation”, talking about Machine Learning and Big Data applied to the insurance sector.
In the midst of the COVID pandemic, The European House Ambrosetti launched InnoTechCast within InnoTech Hub, a point of reference for the Italian innovation and technology ecosystem in Europe and worldwide: a new digital format that presents the viewpoints of the leading figures of the Italian innovation and technology community.
In this podcast, Alberto Firpo explains how the data collected by the “black boxes” installed in cars, thanks to Machine Learning models and real-time systems, provide useful information to companies in the insurance sector.
In particular, through Agile Lab’s WASP platform, the data collected in real time allow insurance companies to activate services for their customers in real time and to price policies based on “driving behaviour”, the driver’s behaviour at the wheel detected through innovative methods.
Real-time Analytics in mission-critical applications: the Vodafone Automotive case
At the closing conference of the eighth edition of the Big Data & Analytics Observatory of the Politecnico di Milano, Alberto Firpo, CEO & Co-Founder of Agile Lab, Yari Franzini, Regional Director of Cloudera Italia, and Paolo Giuseppetti, Head of Innovation and Connected Mobility Platform at Vodafone Automotive, interviewed by Irene Di Deo, Researcher at the Digital Innovation Observatories, presented the innovative project built with Agile Lab’s WASP (Wide Analytics Streaming Platform).
Thanks to this system, Vodafone Automotive was able to take the data collected by the black boxes installed in vehicles and turn it, in real time, into useful information to raise the level of the services offered to its customers.
The 2020 research of the Big Data & Business Analytics Observatory
The goal of the 2020 Research, to which Agile Lab contributed as a Sponsor, was to capture and understand the state of the art of Analytics in Italy, and in particular to:
- quantify and analyze the Analytics market in Italy, identifying current trends;
- investigate Analytics applications across different sectors and processes;
- understand the main technological developments in the Analytics field;
- estimate the spread of skills and organizational models for managing Data Science;
- understand the role played by startups in the Analytics field.
For more information, visit the Osservatori.net website.