Hold my Beer: Recommendation systems for beer lovers – 17th February 2021 | 6:30 PM

Do you wish there was a system that knew what your next favorite beer would be?
During the next meetup, you can figure it out and participate in creating your own beer recommendation system.

With Paolo Tomeo, Big Data Engineer with a Ph.D. in Recommendation Systems, Riccardo Fino, Data Scientist and Statistician, and Lorenzo Graziano, Data Engineer and Computer Scientist, you will be able to create your beer recommendation system.

This meetup offers a gentle introduction to recommendation systems and is well suited to those who have never had the chance to dig into the topic. More experienced participants will find extra insights and can help us and the other attendees with interesting stories and hints.

We hope to come out of this meetup having learned something from you as well.

Don’t miss the event: 17th February 2021 – 6:30 PM.

SIGN UP SOON!

Hold my Beer: Kubernetes loves Ansible – 13th January 2021 | 6:30 PM

A new Hold my Beer event: next January 13th, at 6:30 PM, we will talk about how to configure a Kubernetes cluster using Ansible.

Speakers:

Mario Vitale, a DevOps Engineer @ Flixbus who loves playing music, fitness, video games and open source. He will be the main speaker of the event.

Carlo Ventrella, SRE Engineer @ Agile Lab, will support Mario during the talk.

You: for a fruitful discussion, everyone can take part. The more, the merrier! 

Discover more and sign up soon!

Spark Remote Debugging

Hi everybody! I’m a Big Data Engineer @ Agile Lab, a remote-first Big Data engineering and R&D firm located in Italy. Our main focus is to build Big Data and AI systems, in a very challenging — yet awesome — environment.

At Agile Lab we use, among other technologies, Apache Spark as a processing engine in various projects. Spark is fast and makes it simple for a developer to write code that can run right away but, as with regular programs, it is not always easy to understand what a Spark job is doing.

This article will focus on how a developer can remotely debug a running Spark Scala/Java application (running on YARN) using IntelliJ IDEA, but all the Spark and environment configurations also hold for other IDEs.

Agent JDWP, licence to debug

To perform remote debugging of a Spark job, we leverage the JDWP agent (Java Debug Wire Protocol) that defines a communication protocol between a debugger and a running JVM. JDWP defines only the format and layout of packets exchanged by the debugger and the target JVM, while the transport protocol can be chosen by the user. Usually, the available transport mechanisms are shared memory (dt_shmem) and socket (dt_socket) but only the latter, which uses a TCP socket connection to communicate, can be used for remote debugging.

So, in order to enable remote debugging, we must configure the target JVM with the following Java property, which makes it act as a JDWP server to which our IDE can connect:

-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=4747

The property above tells the JVM to load the JDWP agent and wait for a socket connection on the specified port. In particular:

  • transport=dt_socket tells the agent to use socket as the desired transport mechanism.
  • server=y means that the JVM will act as a JDWP server: it will listen for a debugger client to attach to it.
  • suspend=y tells the JVM whether it must wait for a debugger connection before executing the main function. If this is set to n, the main function starts right away while still listening for a debugger connection.
  • address=4747 specifies the port on which the debug socket will listen. In the example, the target JVM will listen on port 4747 for incoming client connections.

We will leverage the JDWP agent for all the following remote debugging scenarios, so remember that you can always adjust the configurations listed above to fit your use case.

You must choose your Spark deployment …but choose wisely

Before delving into the debugging of your application, here’s a quick recap of how a Spark job executes on a cluster; each Spark job requires:

  • a process called driver that runs all the standard Java code
  • one or more executor processes that run all the code defined inside the transformations and actions of the RDDs/Datasets.

This means that in a realistic scenario we will have different JVMs running at the same time (often on different nodes): one for the driver and one for each executor.
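As a rough sketch of this distinction (the object name and the numbers below are purely illustrative), code outside transformations runs on the driver JVM, while the functions passed to transformations and actions run on the executor JVMs:

import org.apache.spark.sql.SparkSession

object DebugDemo {
  def main(args: Array[String]): Unit = {
    val spark   = SparkSession.builder().appName("debug-demo").getOrCreate()
    val numbers = spark.sparkContext.parallelize(1 to 100) // driver-side code
    val doubled = numbers.map(_ * 2)                       // this lambda runs on the executors
    println(doubled.reduce(_ + _))                         // action triggered from the driver
    spark.stop()
  }
}

Breakpoints set in the main body will be hit by the driver process, while breakpoints inside the map lambda will only be hit by an executor process.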

A typical Spark application will run on multiple nodes, each containing one or more processes

Spark allows the developer to run a job in different modes depending on the requirement of the desired use case. In particular, we will focus on two configurations of the spark-submit command: deploy-mode and master.

Those two configurations allow a developer to decide how the Spark application will be deployed and run:

  • master: this configuration tells Spark which is the master URL of the cluster. A complete list of all the allowed values can be found in the official Spark documentation.
  • deploy-mode: Whether to deploy your driver on one of the worker nodes (cluster) or locally as an external client (client). In client mode, the driver is started in the same node where the spark-submit is launched.

Even if one of the allowed values for master is local, which runs the application on the local machine with a configurable number of threads, we will not explore it: there is no need for remote debugging when a Spark application runs with master local, since everything runs in the same local JVM.

This article will focus only on Spark applications launched with a spark-submit run against a YARN cluster (master yarn configuration), but the same considerations also hold for other resource managers.

Your honour, my client pleads not guilty

When the spark-submit command is invoked in client deploy-mode, Spark spawns the driver inside the client process that performed the submit. Executors are spawned on cluster nodes depending on the resources assigned to them.

In client mode, the driver is spawned in the same process used to start the spark-submit command

If you are performing the spark-submit command from an edge node of your cluster, you can debug the driver code by simply passing the JDWP agent configuration as a driver extra Java option:

spark-submit \
    --class org.example.MyJob \
    --master yarn \
    --deploy-mode client \
    --conf "spark.driver.extraJavaOptions=-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=4747" \
    myJob.jar

The command above will start the Spark job with the driver running on the edge node and listening on port 4747 (this value is arbitrary: you can choose any available port). Now we can set up our IDE to start a remote debug session towards the edge node IP and port 4747:

Define a remote debug configuration with the edge node IP as host

We can start debugging our code with the configuration just defined by clicking on the debug icon:

Always check that the chosen debug configuration is the remote one

Code execution will stop at each breakpoint we define in our code; remember that since we are debugging the driver, we can set breakpoints anywhere in our code except inside the RDDs/Datasets transformations and actions, whose code is executed by the executors.

You and what nodes army?

To debug the executor code, we can focus on how a Spark job behaves in cluster deploy-mode: the driver is spawned on one of the cluster nodes, just like the executors. In this configuration, connecting to the driver or to one of the executors requires us to check where the processes are actually running.

In cluster mode, the driver will be spawned in one of the cluster nodes, as done for the executors

If we need to debug the driver, we can pass the same configuration presented above for client mode; if instead we need to debug one of the executors, we should pass the agent properties in the executor options instead of the driver options:

spark-submit \
    --class org.example.MyJob \
    --master yarn \
    --deploy-mode cluster \
    --conf "spark.executor.extraJavaOptions=-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=4747" \
    myJob.jar

As discussed, in cluster mode if we want to debug the driver or one of the executors we first need to find out where the actual process is running. To do so we can leverage the Spark UI: whenever we start the job we can access its Spark UI using the link printed by the spark-submit command.

Then we can access the Executors section of the Spark UI, where all the running processes associated to our job are listed. From here we can see the driver node and all the nodes where executors are running, so we can find the executor IP and use it in our IDE debug configuration.

In this case the driver was spawned on cluster.node2 and the single executor on cluster.node1

At this point, we need to check the Spark UI to find out the IP addresses of the desired nodes. Since we defined the port in the agent configuration, we should keep the same debug port, changing only the IP address.

If we need to debug the executor code, since in the Spark UI we saw that the executor was cluster.node1, we can set that address in the remote debug configuration

To simplify the debugging mechanism, it is advisable to start your Spark job with only one executor: debugging a Spark job with multiple executors dramatically increases the complexity. For example, if two executors are spawned on the same node, they will share the same IP address and try to use the same debug port, which can lead to inconsistencies. For debugging purposes, you can scale the job down to a single executor:

spark-submit \
    --class org.example.MyJob \
    --master yarn \
    --deploy-mode cluster \
    --num-executors 1 \
    --conf "spark.executor.extraJavaOptions=-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=4747" \
    myJob.jar

If we decide to debug one of the executors, code execution will stop at each breakpoint we define inside RDDs/Datasets transformations and actions; remember that since we are debugging the executor, all breakpoints set up outside RDDs/Datasets will not be reached since that code will be performed only by the driver process.

In a cluster far, far away…

In a real scenario, the Spark job will run inside a cluster which is not accessible from the outside world: you can only access a cluster edge node, while you cannot directly access the nodes where the driver and executor processes run. The edge node can reach those nodes since it is co-located with the cluster.

In a real scenario, the cluster is not accessible from the outside and you can only communicate with the edge nodes of the cluster

In this situation we need to take advantage of a mechanism called port forwarding: we can forward a port of the target node to a port on our local machine, passing through the edge node. In this way, we use the edge node (to which we have access) as a proxy for the desired target node (to which we don’t).

A very common way to perform the port forwarding is to use ssh:

ssh -L 4848:target.node:4747 user@edge.node

The command above, run from your local machine, will connect to the edge node (edge.node) with the specified username (user). Obviously, you must provide your identity to the remote machine (the edge node) using one of several methods, depending on the protocol version used. The -L option specifies that the given port on the local host (4848) has to be forwarded to the given host (target.node) and port (4747) on the remote side.

In summary, in order to perform remote debugging of your code for a Spark job running in a cluster like the one described above you will need to:

  1. add the desired agent configuration to the spark-submit command
  2. start the job
  3. open the Spark UI and find out where your process is running
  4. use the ssh command to forward the port specified in the agent from the target node to your local machine through the edge node
  5. start the remote debug from your IDE using as IP and port localhost and the forwarded port

Since we forwarded the remote port 4747 to our local port 4848, we can edit the remote debug configuration to connect to localhost on port 4848

If you made it this far, you may be interested in other Spark-related articles that you can find on our blog.

Stay tuned because other Spark articles are coming!

Written by Lorenzo Pirazzini – Agile Lab Big Data Engineer

Hold my Beer: Kafka – 10th December 2020 | 7 PM

We are excited to launch our first Hold my Beer: next December 10th, at 7 PM, we will explore the Kafka world while drinking a beer together.

We are all swamped with online webinars, so we have planned something different, more relaxed, where everyone can participate and liven up the discussion.

Are you proud of a problem solved using Kafka and want to share it with us (maybe letting the code talk)? Are you looking for advice? Have you got questions or just curiosities? Do you just want to get an idea of what Kafka is? This is the event for you!

P.S. We recommend you also pick a good beer (we promise that when we are able to set up a live event, we will bring beers for everyone!).

Click here to sign up!

 

Scala ‘fun’ error handling

Hi everybody! I’m Antonio Murgia, a Big Data Architect @ Agile Lab, a remote-first Big Data engineering and R&D firm located in Italy. Our main focus is to build Big Data and AI systems, in a very challenging — yet awesome — environment.

As the title suggests, this is the first post of a series about error handling in a functional way in Scala.

Dealing with failures is one of the harshest things we face in life. Coding is no different from living: at some point things will go bad, and as a software practitioner you should expect such failures and be prepared. You will never remember the times your application ran fine for years without crashing or producing bugs, but you will surely remember the problem that woke you up in the middle of the night and that could have easily been avoided by properly dealing with errors and failures in advance.

As a software engineer, I’ve seen my code fail many more times than I would like to admit; almost every piece of technology that your code interacts with, or is made of, can fail. The big difference between stable and fragile applications (when it comes to “code”) is in how they deal with failures.

In this post series (which was born from a workshop I held at my company) I will show a couple of ideas that make it possible to deal with failures in a very simple, opinionated and explicit way, all without sacrificing code readability and composability. The code snippets are written in Scala, a JVM language made (in)famous by Apache Spark, which is our favourite data processing framework at Agile Lab, and use Cats, a functional programming library for Scala.

Disclaimer

Tech stack used in the snippets:

  • Scala 2.11
  • Typelevel Cats 2.0.0 (which is cross-published among 2.11, 2.12 and 2.13)

What this article is not:

  • A guide to Typelevel Cats
  • A category theory article

My objective is to be pragmatic: I want you to read this article and then try these things (and hopefully profit from them).

Our Legacy

Scala is a JVM language with full interoperability with Java, therefore we cannot start an article about error handling in functional Scala without explaining how errors are dealt with in Java.

Checked Exceptions

If I try to write a simple program in Java that reads the first 3 lines of a txt file, I come across a very nice feature of the Java programming language: checked exceptions!

This code does not compile.

Why is that? That’s because some methods that we are using explicitly declare that they throw exceptions. In fact, if we take a look at the FileReader constructor signature, we can see that it declares throws FileNotFoundException.

If you’ve ever written any Java, you know that there is an “easy” solution to this problem:

Adding the throws clause to your method definition is enough to make someone else care about the exception.

We have a saying in Italy: Fatta la legge trovato l’inganno that in English translates into something like: Every law has a loophole.

In fact, not all Java exceptions have this useful property (being reflected in the method signature): there are also unchecked exceptions, which do not “annoy” the programmer and therefore tend to be ubiquitous.

A very bad and very common practice, in fact, is to wrap everything in a try/catch block and silently swallow the exception.

Please never do that! You are hiding the fact that the print3lines method can fail, and this is a piece of information that is really precious for whoever is using that method.

Even if checked exceptions are somehow “good” because they appear in method signatures, exceptions in general are a “bad idea” for dealing with expected errors (e.g. parsing a String into an Int, which can fail in obvious and entirely predictable ways), since every thrown exception is an unconditional jump to the first caller that handles it. And we don’t like unconditional jumps in our code, otherwise we would code in asm.

To summarize what we have seen so far: checked exceptions at least show up in method signatures (even if they are easy to circumvent), unchecked exceptions are completely invisible to the caller, and both are unconditional jumps that make the control flow harder to follow.

What about Scala?

Scala is based on the JVM and it’s interoperable with Java, but it has some fundamental differences, one of which is in how it handles exceptions. Scala has ONLY unchecked exceptions.

But Scala also has an exceptional type system (pun intended). Let’s leverage it!

In order to dive deeper into what I’m going to present, we will need to learn a couple of concepts. Don’t be scared, it’s easy stuff!

ADT — Algebraic Data Types

Quoting from Wikipedia:

In computer programming, especially functional programming and type theory, an algebraic data type is a kind of composite type, i.e., a type formed by combining other types.

Two common classes of algebraic types are:

  • Product Types (tuples and records)
  • Sum Types (tagged/disjoint unions or coproducts)

I know, the names are scary, but they actually embody a really simple concept. Let’s start from the idea that types are domains. For example, the Boolean type domain is {true, false}, while the Int domain is [-2³¹, 2³¹). The Unit domain, which is the Scala version of void, contains only (), and Nothing (the type with no instances) is an empty domain. Each of these domains is finite and therefore has a size (all types are finite domains, since computer memory is finite).

Product types in Scala are case classes or tuples, and they do exactly what you expect them to do with the domain of their inner types:
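For instance, a minimal sketch (the Pixel case class is hypothetical): a product type has as many possible values as the product of the sizes of its components.

final case class Pixel(on: Boolean, gray: Byte)
// |Boolean| = 2 and |Byte| = 256, so |Pixel| = 2 * 256 = 512 possible values.

val pair: (Boolean, Boolean) = (true, false) // tuples work the same way: 2 * 2 = 4 values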

Sum types in Scala are sealed hierarchies or enumerations, and they do exactly what you expect them to do with the domain of their inner types:
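Again a minimal sketch (the Answer hierarchy is hypothetical): a sum type has as many possible values as the sum of the sizes of its alternatives.

sealed trait Answer
case object Yes extends Answer
case object No extends Answer
final case class Other(text: String) extends Answer
// |Answer| = |Yes| + |No| + |Other| = 1 + 1 + |String|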

 

Now, enough with the theory: why is this important?

  • We keep the “domain” of every function in our head while reasoning about code
  • The smaller the domains, the easier the reasoning; small domains also work as built-in documentation
  • If you want to push this idea further, have a look at refined

. . .

So, back to “checked exceptions”, in Scala we can emulate them at the type level as follows:
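A sketch of what such an encoding could look like (the Print3LinesResult, Lines, FileNotFound and print3Lines names are invented for illustration; the original snippet may differ):

sealed trait Print3LinesResult
final case class Lines(value: List[String]) extends Print3LinesResult
final case class FileNotFound(path: String) extends Print3LinesResult

def print3Lines(path: String): Print3LinesResult = {
  val file = new java.io.File(path)
  if (!file.exists()) FileNotFound(path)
  else {
    val source = scala.io.Source.fromFile(file)
    try Lines(source.getLines().take(3).toList)
    finally source.close()
  }
}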

which is logically equivalent to its Java counterpart declaring throws FileNotFoundException.

If you configure the Scala compiler to fail on warnings (e.g. with -Xfatal-warnings) you also get the same behaviour you would have when trying not to deal with checked exceptions in Java. In fact, suppose you try to write the following code:
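For example, a sketch that handles only the happy case of the hypothetical ADT above:

def use(path: String): List[String] =
  print3Lines(path) match {
    case Lines(value) => value
    // FileNotFound is not handled here
  }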

Compilation fails because the pattern match is not exhaustive: the compiler emits a “match may not be exhaustive” warning, which -Xfatal-warnings turns into an error.

Since the error is now encoded in the type system, it’s very easy for IDEs to help you deal with it, for example by suggesting the missing match cases.

Ok, but that is a lot of boiler-plate!

That’s why Scala has built-in generic sum types that come to help:

  • Option[A]
  • Either[A, B]

Option[A]
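A simplified sketch of its shape (the real standard-library definition is an abstract class with many more methods):

sealed trait Option[+A]
final case class Some[+A](value: A) extends Option[A]
case object None extends Option[Nothing]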

Either[A, B]
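And a simplified sketch of Either (again, the standard-library version is richer):

sealed trait Either[+L, +R]
final case class Left[+L, +R](value: L) extends Either[L, R]
final case class Right[+L, +R](value: R) extends Either[L, R]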

Even if Option might not be the best option (my puns are always intended) to deal with failures, or things that “went wrong”, it can be fine when we do not need to communicate the “kind” of error that happened.

Using Either we can rewrite the previous toy program as follows:
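A possible sketch (the first3Lines name and the String error type are illustrative choices, not necessarily the original ones):

def first3Lines(path: String): Either[String, List[String]] = {
  val file = new java.io.File(path)
  if (!file.exists()) Left(s"File not found: $path")
  else {
    val source = scala.io.Source.fromFile(file)
    try Right(source.getLines().take(3).toList)
    finally source.close()
  }
}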

Now let’s have a quick look at Either and Option and what they can offer us.

A (de)tour into Either and Option

If you are already “fluent” in Either and Option usage, you can skip this chapter.

Other than pattern matching, Either and Option offer many more idiomatic ways to interact with them.

Map

Option
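A minimal sketch of Option’s map:

Some(2).map(_ * 10)             // Some(20)
(None: Option[Int]).map(_ * 10) // None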

Either

Right-biased map (default since Scala 2.12)

or Left-biased map
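A sketch of both (map on Either is right-biased since Scala 2.12; on 2.11 you can use .right.map or import cats.syntax.either._):

(Right(2): Either[String, Int]).map(_ * 10)     // Right(20)
(Left("boom"): Either[String, Int]).map(_ * 10) // Left("boom")

// Left-biased map goes through the left projection:
(Left("boom"): Either[String, Int]).left.map(_.toUpperCase) // Left("BOOM")
(Right(2): Either[String, Int]).left.map(_.toUpperCase)     // Right(2)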

The effect of mapping a “side” (or projection) of the Either or an Option is:

  • if the “side” you’re trying to map is “empty” the computation is short-circuited

  • If the side is “populated” then the transformation is performed. Remember that if you map a Right, it is going to remain a Right, and the same applies to a Left: you can change the content, but not the type of the container.

flatMap

Option
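A minimal sketch of Option’s flatMap (the half function is illustrative):

def half(x: Int): Option[Int] = if (x % 2 == 0) Some(x / 2) else None

Some(8).flatMap(half)             // Some(4)
Some(3).flatMap(half)             // None
(None: Option[Int]).flatMap(half) // None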

Either

Right-biased flatMap (default since Scala 2.12)

or Left-biased flatMap
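A sketch of both (flatMap on Either is right-biased since Scala 2.12; on 2.11 use .right.flatMap or Cats’ either syntax; the nonNegative function is illustrative):

def nonNegative(x: Int): Either[String, Int] =
  if (x >= 0) Right(x) else Left(s"$x is negative")

(Right(4): Either[String, Int]).flatMap(nonNegative)     // Right(4)
(Right(-4): Either[String, Int]).flatMap(nonNegative)    // Left("-4 is negative")
(Left("boom"): Either[String, Int]).flatMap(nonNegative) // Left("boom")

// The Left-biased version goes through the left projection: either.left.flatMap(...)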

The effect of flatMapping a “side” (or projection) of the Either or an Option is:

  • if the “side” you are trying to flatMap is empty, the computation is short-circuited:

  • If the side is “populated” then the transformation is performed. Remember that using flatMap you are able to transform a Left into a Right or vice-versa.

Either <-> Option

Let’s reason about Option and Either in terms of ADTs.

What is Option[A] domain?

  • A ∪ {None}

What’s the domain of Either[L, R]?

  • R ∪ L

They are quite similar, aren’t they? What if we pick a particular L? For example, let’s pick Unit: the domain of Either[Unit, A] is:

  • A ∪ {()}

Which carries the same quantity of information as Option[A].

If we all agree on this, we can say that it’s always possible to turn an Option into an Either by providing the missing side, and vice versa it’s always possible to go from an Either to an Option by “losing” one side of the Either. Thankfully, Scala knows this better than us, so there are idiomatic ways of doing so:
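A sketch of the Option side of the conversion:

(None: Option[Int]).toRight("it was empty") // Left("it was empty")
Some(3).toRight("it was empty")             // Right(3)
Some(3).toLeft("unused")                    // Left(3)
(None: Option[Int]).toLeft("unused")        // Right("unused")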

With toRight, if the option is empty you get a Left containing the provided value, otherwise you get a Right with the content of the Option; toLeft is its dual operation.

There are similar methods available on Either projections:
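For example (since Scala 2.12 Either also offers toOption directly):

(Right(3): Either[String, Int]).right.toOption    // Some(3)
(Left("boom"): Either[String, Int]).right.toOption // None
(Left("boom"): Either[String, Int]).left.toOption  // Some("boom")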

Getting out

When you get to the “end of the world” you will need to do something with the content of the Either and handle both cases (i.e. Left and Right).

We’ve already seen the best-known way of “getting out” of an Either, which is match:
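A minimal sketch:

val result: Either[String, Int] = Right(42)

result match {
  case Left(error)  => s"failed with: $error"
  case Right(value) => s"succeeded with: $value"
}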

Another way to extract the result from an Either is to use fold:
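With fold you pass one function per side and collapse the Either into a single value (reusing the result value from the previous sketch):

result.fold(error => s"failed with: $error", value => s"succeeded with: $value")
// "succeeded with: 42"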

or merge, if A and B are of the same type (or you only care about their common supertype), which is the same as doing:
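A sketch of the equivalence (merge is available on Either since Scala 2.12):

val both: Either[String, String] = Left("boom")

both.merge                    // "boom"
both.fold(identity, identity) // equivalent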

If, instead of using match or fold, you find yourself every time writing something like a check on isRight followed by a call to .right.get (or the symmetric check on isLeft), you are defeating the purpose of using Either. If that happens, don’t bother using Either.

The same applies to Option: calling Option.get defeats the purpose of using Option at all, because you are assuming that the Option is a Some.

“Real life” programs

Ok, so far so good, but you haven’t shown me a real program — you might think — so let’s try to build something more complex.

Given a list of integers, if the sum of all elements is higher than 100, take the head and then multiply it by 30 otherwise multiply it by -30.

A very simple direct approach would be the following:
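A sketch of that direct approach (the program name is illustrative); note that head throws a NoSuchElementException on an empty list:

def program(xs: List[Int]): Int =
  if (xs.sum > 100) xs.head * 30
  else xs.head * -30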

Let’s try to see this “program” from the caller perspective:

  • can it fail?
  • if it can fail, which are the error cases?

Looking only at the signature, the answer is: no, this program cannot fail. We have to look at the implementation to actually “spot” where errors can happen and exceptions can be thrown.

Another Approach

The error ADT
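A sketch of the ADT: EmptyList is the failure case named later in the text, while the ProgramError wrapper name is an assumption.

sealed trait ProgramError
case object EmptyList extends ProgramError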

The implementation
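A possible implementation returning Either (a sketch, not necessarily the original one):

def program(xs: List[Int]): Either[ProgramError, Int] =
  xs.headOption match {
    case None       => Left(EmptyList)
    case Some(head) => if (xs.sum > 100) Right(head * 30) else Right(head * -30)
  }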

or using some syntactic sugar:
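For example, using Option#toRight (the original sugar may be different, e.g. Cats’ .asRight/.asLeft syntax):

def program(xs: List[Int]): Either[ProgramError, Int] =
  xs.headOption
    .toRight(EmptyList) // map on Either requires Scala 2.12+ or Cats' either syntax
    .map(head => if (xs.sum > 100) head * 30 else head * -30)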

Now, let’s try to answer the same questions:

  • can it fail?
  • if it can fail, which are the error cases?

Now we can answer the questions just looking at the signature!

Yes, the program can fail (Either can be Right or Left) and there is just one failure case: the EmptyList.

Bonus — The best code is the code you don’t write

I’m not alone in thinking that the best code is the one you do not have to write or maintain. I would go further and say that the easiest error to handle is the one someone else handles for you. As in the program we’ve just written, there are cases in which you can avoid caring about errors at all, because you make them impossible by restricting the input of your program.

How can we make our program infallible? Well, we can accept only non-empty lists. Cats has what you need: NonEmptyList, a list that cannot be empty (a.k.a. Nel).
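A sketch of the program rewritten on top of NonEmptyList (keeping the illustrative program name):

import cats.data.NonEmptyList

def program(xs: NonEmptyList[Int]): Int =
  if (xs.toList.sum > 100) xs.head * 30 else xs.head * -30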

Now our program “cannot” fail, because we pushed the responsibility of obtaining a non-empty list to the boundaries of our program: we rule out this failure at compile time.

Very complicated programs

Let’s now see how we can compose a lot of functions that “return” Either, because if you are going to follow my hints, you are going to compose a lot of Eithers!

Let’s assume you have the following functions already implemented:

And you have to implement f combining all of them:
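A sketch of what such signatures could look like (all names here are invented for illustration; only the shape matters, namely that every step returns an Either):

sealed trait AppError
case object NotPositive extends AppError
case object EmptyString extends AppError

def fa(x: Int): Either[AppError, Int]    = Right(x + 1)
def fb(x: Int): Either[AppError, String] = if (x > 0) Right(x.toString) else Left(NotPositive)
def fc(s: String): Either[AppError, Int] = if (s.nonEmpty) Right(s.length) else Left(EmptyString)

// The function we have to implement by combining them:
def f(x: Int): Either[AppError, Int] = ???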

The most straightforward way to write the program would be something like the following:
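Something along these lines (a sketch that, as noted below, does not even compile, because each call returns an Either rather than the plain value the next call expects):

def f(x: Int): Either[AppError, Int] = {
  val a = fa(x) // Either[AppError, Int]
  val b = fb(a) // type error: fb expects an Int
  fc(b)         // type error: fc expects a String
}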

Apart from the fact that we are not dealing with errors and that this program does not compile at all, it surely has one big advantage: the happy path is very “easy” to understand (this is why people say that exceptions are easy: they just ignore them most of the time), and whoever reads this code can understand what’s going on. As Martin Fowler says:

Any fool can write code that a computer can understand. Good programmers write code that humans can understand.

We can use match to compose all our function calls like this:
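A sketch of the fully pattern-matched composition:

def f(x: Int): Either[AppError, Int] =
  fa(x) match {
    case Left(e) => Left(e)
    case Right(a) =>
      fb(a) match {
        case Left(e) => Left(e)
        case Right(b) =>
          fc(b) match {
            case Left(e)  => Left(e)
            case Right(c) => Right(c)
          }
      }
  }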

But that’s a lot of boilerplate to write! It’s obvious that the first (wrong) implementation is much more readable.

We can try using flatMap then:
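A sketch using nested flatMap calls (right-biased since Scala 2.12; on 2.11 use .right.flatMap or Cats’ either syntax):

def f(x: Int): Either[AppError, Int] =
  fa(x).flatMap { a =>
    fb(a).flatMap { b =>
      fc(b)
    }
  }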

This indeed looks better, but to me the nesting still looks a lot like old-fashioned callback-style code.

Let’s try with a for comprehension:
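A sketch using a for comprehension, which desugars to the flatMap/map chain above:

def f(x: Int): Either[AppError, Int] =
  for {
    a <- fa(x)
    b <- fb(a)
    c <- fc(b)
  } yield c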

Do we agree that this implementation has the readability of the first (wrong) implementation, plus the advantage of actually dealing with errors in a way that is reflected in the type system and therefore in the function signatures?

Conclusions — so far

Sticking to these practices makes it very easy to follow the business logic flow, without compromising on error handling. It is also straightforward to handle errors or simply delegate them to the caller. Furthermore, totally avoiding exceptions means we do not have to defensively try/catch every method call.

These patterns are particularly useful in Spark batch and streaming workloads, where if only one out of 10 billion rows throws an exception the whole application will fail and/or retry processing the same data indefinitely, wasting time and resources in vain.

That was enough for this chapter of the series!

In the next post, we will see how to combine functions that can fail in a way that is not fail-fast (i.e. that does not stop at the first error), and also how to deal with collections of results.

Stay tuned!

Written by Antonio Murgia – Agile Lab Big Data Architect
If you found this article useful, take a look at our blog and follow us on our Medium Publication, Agile Lab Engineering!