Spark Remote Debugging

Hi everybody! I’m a Big Data Engineer @ Agile Lab, a remote-first Big Data engineering and R&D firm located in Italy. Our main focus is to build Big Data and AI systems, in a very challenging — yet awesome — environment.

At Agile Lab we use, among other technologies, Apache Spark as a processing engine in various projects. Spark is fast and it is simple for a developer to write code that can be run right away, but as with regular programs, it is not always easy to understand what a Spark job is doing.

This article will focus on how a developer can remotely debug a running Spark Scala/Java application (running on YARN) using IntelliJ IDEA, but all the Spark and environment configurations also hold for other IDEs.

Agent JDWP, licence to debug

To perform remote debugging of a Spark job, we leverage the JDWP agent (Java Debug Wire Protocol) that defines a communication protocol between a debugger and a running JVM. JDWP defines only the format and layout of packets exchanged by the debugger and the target JVM, while the transport protocol can be chosen by the user. Usually, the available transport mechanisms are shared memory (dt_shmem) and socket (dt_socket) but only the latter, which uses a TCP socket connection to communicate, can be used for remote debugging.

So, in order to enable remote debugging, we must configure the target JVM with the following Java property to make it act as a JDWP server to which our IDE can connect:

-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=4747

The property above tells the JVM to load the JDWP agent and wait for a socket connection on the specified port. In particular:

  • transport=dt_socket tells the agent to use socket as the desired transport mechanism.
  • server=y means that the JVM will act as a JDWP server: it will listen for a debugger client to attach to it.
  • suspend=y tells the JVM to wait for a debugger connection before executing the main function. If this is set to n, the main function starts right away while the agent still listens for a debugger connection.
  • address=4747 specifies the port on which the debug socket will listen. In the example, the target JVM will listen on port 4747 for incoming client connections.

We will leverage the JDWP agent for all the following remote debugging scenarios, so remember that you can always adjust the configurations listed above to fit your use case.

You must choose your Spark deployment …but choose wisely

Before delving into debugging your application, here's a quick recap of how a Spark job executes on a cluster; each Spark job requires:
* a process called driver that runs all the standard program code
* one or more executor processes that run all the code defined inside the transformations and actions of the RDDs/Datasets.

This means that in a realistic scenario we will have different JVMs running at the same time (often on different nodes): one for the driver and one for each executor.

A typical Spark application will run on multiple nodes, each containing one or more processes

Spark allows the developer to run a job in different modes depending on the requirements of the use case. In particular, we will focus on two configurations of the spark-submit command: deploy-mode and master.

Those two configurations allow a developer to decide how the Spark application will be deployed and run:

  • master: this configuration tells Spark which is the master URL of the cluster. A complete list of all the allowed values can be found in the official Spark documentation.
  • deploy-mode: Whether to deploy your driver on one of the worker nodes (cluster) or locally as an external client (client). In client mode, the driver is started in the same node where the spark-submit is launched.

Even though one of the allowed values for master is local, which runs the application on the local machine with a configurable number of threads, we will not explore it: there is no need for remote debugging when the Spark application runs with master local, since everything runs in the same local JVM.

This article will focus only on Spark applications launched with a spark-submit run against a YARN cluster (master yarn configuration), but the same considerations also hold for other resource managers.

Your honour, my client pleads not guilty

When the spark-submit command is invoked with client deploy-mode, Spark will spawn the driver in the client process that performed the submit command. Executors will be spawned on cluster nodes depending on the resources available to them.

In client mode, the driver is spawned in the same process used to start the spark-submit command

If you are performing the spark-submit command from an edge node of your cluster, you can debug the driver code by simply passing the JDWP agent configuration as a driver extra Java option:

spark-submit \
    --class org.example.MyJob \
    --master yarn \
    --deploy-mode client \
    --conf "spark.driver.extraJavaOptions=-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=4747" \
    myJob.jar

The command above will start the Spark job with the driver running on the edge node and listening on port 4747 (this value is arbitrary, you can choose any available port number here). Now we can set up our IDE to start a remote debug session towards the edge node IP and port 4747:

Define a remote debug configuration with the edge node IP as host

We can start debugging our code with the configuration just defined by clicking on the debug icon:

Always check that the chosen debug configuration is the remote one

Code execution will stop at each breakpoint we define in our code; remember that since we are debugging the driver, we can set breakpoints anywhere in our code except inside the code defined in RDDs/Datasets, which will be executed by the executors.

You and what nodes army?

To debug the executor code we can focus on how a Spark job behaves in cluster deploy-mode: the driver is spawned on one of the cluster nodes, just like the executors. So in this configuration, connecting to the driver or to one of the executors requires us to check where the processes are actually running.

In cluster mode, the driver will be spawned in one of the cluster nodes, as done for the executors

If we need to debug the driver we can pass the same configurations presented above for the client mode, while if we need to debug one of the executors we should pass the agent properties in the executor options instead of the driver options:

spark-submit \
    --class org.example.MyJob \
    --master yarn \
    --deploy-mode cluster \
    --conf "spark.executor.extraJavaOptions=-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=4747" \
    myJob.jar

As discussed, in cluster mode if we want to debug the driver or one of the executors we first need to find out where the actual process is running. To do so we can leverage the Spark UI: whenever we start the job we can access its Spark UI using the link printed by the spark-submit command.

Then we can access the Executors section of the Spark UI, where all the running processes associated to our job are listed. From here we can see the driver node and all the nodes where executors are running, so we can find the executor IP and use it in our IDE debug configuration.

In this case the driver was spawned on cluster.node2 and the single executor on cluster.node1

At this point, we need to check the Spark UI to find out the IP addresses of the desired nodes. Since we defined the port in the agent configuration, we should keep the same debug port, changing only the IP address.

If we need to debug the executor code, since in the Spark UI we saw that the executor was cluster.node1, we can set that address in the remote debug configuration

To simplify the debugging process, it is advisable to start your Spark job with only one executor: debugging a Spark job with multiple executors dramatically increases the complexity. For example, if two executors are spawned on the same node, they will have the same IP address and try to use the same debug port, which leads to conflicts. For debugging purposes, you can scale the job down to a single executor:

spark-submit \
    --class org.example.MyJob \
    --master yarn \
    --deploy-mode cluster \
    --num-executors 1 \
    --conf "spark.executor.extraJavaOptions=-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=4747" \
    myJob.jar

If we decide to debug one of the executors, code execution will stop at each breakpoint we define inside RDDs/Datasets transformations and actions; remember that since we are debugging the executor, all breakpoints set up outside RDDs/Datasets will not be reached since that code will be performed only by the driver process.

In a cluster far, far away…

In a real scenario, the Spark job will run inside a cluster which is not accessible from the outside world: you can only access a cluster edge node, but you cannot directly access the nodes where the driver and executor processes run. The edge node can reach those nodes since it is co-located with the cluster.

In a real scenario, the cluster is not accessible from the outside and you can only communicate with the edge nodes of the cluster

In this situation we need to take advantage of a mechanism called port forwarding: we can forward a port on our local machine, through the edge node, to the debug port of the target node. In this way, we can use the edge node (to which we have access) as a proxy for the desired target node (to which we don't).

A very common way to perform the port forwarding is to use ssh:

ssh -L 4848:target.node:4747 user@edge.node

The command above, run from your local machine, will connect to the edge node (edge.node) with the specified username (user). Obviously you must provide your identity to the remote machine, the edge node, using one of several methods depending on the protocol version used. The -L specifies that the given port on the local host (4848) has to be forwarded to the given host (target.node) and port (4747) on the remote side.

In summary, in order to perform remote debugging of your code for a Spark job running in a cluster like the one described above you will need to:

  1. add the desired agent configuration to the spark-submit command
  2. start the job
  3. open the Spark UI and find out where your process is running
  4. use the ssh command to forward the port specified in the agent from the target node to your local machine through the edge node
  5. start the remote debug from your IDE using as IP and port localhost and the forwarded port

Since we forwarded the remote port 4747 to our local port 4848, we can edit the remote debug configuration to point at localhost on port 4848

If you made it this far, you may be interested in other Spark-related articles that you can find on our blog.

Stay tuned because other Spark articles are coming!

Written by Lorenzo Pirazzini – Agile Lab Big Data Engineer

How to create an Apache Spark 3.0 development cluster on a single machine using Docker

Apache Spark is the most widely used in-memory parallel distributed processing framework in the field of Big Data advanced analytics. The main reasons for its success are the simplicity of its API and its rich set of features, ranging from querying the data lake using SQL to the distributed training of complex Machine Learning models with the most popular algorithms.

Given this simplicity of its API, however, one of the problems most frequently encountered by developers, as with most distributed systems, is the creation of a development environment where you can test your applications by simulating the execution of code on multiple nodes of a cluster.

Although Spark provides a local execution mode, it may hide a number of issues due to the distributed mechanism of code execution, making testing ineffective. That’s why the most effective way to create a test environment that is more like a production cluster environment is to use Docker containers.

An additional benefit of using this approach is that you can test your applications on different versions of the framework by simply changing a few parameters in the configuration file.

In our example we will use version 3.0; such a test cluster could be useful for checking the compatibility of our applications with the recently released major version, as well as for trying out the new features it introduces.

Spark is designed so that it can run on different types of clusters. This is done by supporting several cluster managers such as YARN (the Hadoop platform resource manager), Mesos or Kubernetes. If an existing cluster infrastructure is not available, Spark can run on an integrated resource manager/scheduler. This is commonly referred to as "standalone" cluster mode.

In our example, we’ll create a cluster consisting of a master node and 3 worker nodes like the one in the image below.

To setup our cluster, we will use the images created by the developers of the open source project “Big Data Europe”, whose sources are available on GitHub: https://github.com/big-data-europe/docker-spark.

We’ll make small changes to the docker-compose.yml configuration file to size the number of nodes and most importantly to add a persistent volume for reading/writing data during our experiments.

This is the configuration file that we’re going to use.

You can then start the cluster and run a shell on the master node once it starts.

And finally launch our spark-shell.

At the time of testing, I am using a laptop with an 8-core CPU and 16 GB of RAM. That's why I allocated 2 cores for each executor (6 in total) and 2.5 GB of RAM.

Our development cluster is ready, have fun!

Written by Mario Cartia – Agile Lab Big Data Specialist/Agile Skill Managing Director
 
 If you found this article useful, take a look at our blog and follow us on our Medium Publication, Agile Lab Engineering!
 

Darwin, Avro schema evolution made easy!

Hi everybody! I’m a Big Data Engineer @ Agile Lab, a remote-first Big Data engineering and R&D firm located in Italy. Our main focus is to build Big Data and AI systems, in a very challenging — yet awesome — environment.

At Agile Lab we work on different projects where we chose Avro as the encoding format for our data, given its good performance: Avro allows us to store data efficiently in terms of size, but it is not easy to set it up in order to enable data model evolution over time.

Meet Avro!

Avro is a row-based data serialization format. Widely used in Big Data projects, it supports schema evolution in a size-efficient fashion, along with compression and splitting. The size reduction is achieved by not storing the schema along with the data: since the schema is not stored with each element (as it would be with a format like JSON), the serialized elements contain only the actual binary data and not their structure.

Let’s see how this is achieved by looking at an example of serialization using Avro. First of all, the schema of the data that should be serialized can be defined using a specific IDL or using JSON. Here we define the hypothetical structure of a “song” in our model with the Avro IDL and the equivalent JSON representation:

IDL and JSON schemas for Song elements
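
As a rough sketch of what such a schema looks like, the JSON form can be parsed directly from Scala (the field names here are illustrative):

    import org.apache.avro.Schema

    // A hypothetical "Song" schema; the field names are illustrative.
    val songSchema: Schema = new Schema.Parser().parse(
      """{
        |  "type": "record",
        |  "name": "Song",
        |  "fields": [
        |    { "name": "title",    "type": "string" },
        |    { "name": "duration", "type": "long" },
        |    { "name": "artists",  "type": { "type": "array", "items": "string" } }
        |  ]
        |}""".stripMargin)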

 

Since the schema is the same for all elements, each serialized element can contain only the actual data, and the schema tells a reader how it should be interpreted: the decoder goes through the fields in the order they appear in the schema and uses the schema to tell the datatype of each field. In our example, the decoder would be able to decode a binary sequence like the following:

The binary values are interpreted by the decoder using the schema

 

Change is inevitable

One of the problems with the above implementation is that in real scenarios data evolve: new fields are introduced, old ones are removed, fields can be reordered and types can change; e.g. we could add the “album” field to our “song”, change its “duration” field from long to int and change the order of the “duration” and “artists” fields.

To ensure that we don't lose the ability to read previously written data in the process, a decoder must be able to read both new and old "songs". Luckily, if the evolution is compliant with certain rules (e.g. the new "album" field must have a default value), then Avro is able to read data written with a certain schema as elements compliant with the newest one, as long as both schemas are provided to the decoder. So, after we introduce the new song schema, we can tell Avro to read a binary element written with the old schema using both the old and the new one. The decoder will automatically be capable of understanding which field of the old schema maps to which field of the new one.

Elements written with an older version of the schema can be read with the new one by performing a mapping between the old and the new fields
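
In code, this resolution is exactly what Avro's GenericDatumReader performs when it is given both schemas; a minimal sketch:

    import org.apache.avro.Schema
    import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
    import org.apache.avro.io.DecoderFactory

    // Decode bytes written with the old schema into a record shaped by the new one:
    // Avro resolves the two schemas and fills the new "album" field with its default value.
    def readWithEvolution(bytes: Array[Byte],
                          writerSchema: Schema,
                          readerSchema: Schema): GenericRecord = {
      val datumReader = new GenericDatumReader[GenericRecord](writerSchema, readerSchema)
      val decoder     = DecoderFactory.get().binaryDecoder(bytes, null)
      datumReader.read(null, decoder)
    }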

 

Note that if the decoder is not aware of the older schema and tries to read some data written with the old schema, it would throw an error since it would read the “album” field value thinking it represents the “song” duration.

Keep your data close and your schemas closer

So it seems that everything works fine with Avro and schema evolution, why should I worry?

Yes, everything is managed transparently by Avro, but only as long as you provide the decoder not only with the schema your application expects to read, but also with the one used to encode every single record: how can you know which schema was used to write a very old record? A solution could be to store each element along with its schema, but this way we go back to storing data and schema together (as with JSON), losing all the size efficiency!

Another solution could be to write the schema only once into the header of an Avro file containing multiple elements that share the same schema, in order to reduce the number of schemas per element; but this solution cannot always be applied, and it is tightly coupled to the number of elements that you can write together in a single file.

Luckily, Avro comes to the rescue again with a particular encoding format called Single-Object Encoding. With this encoding format each element is stored along with an identifier of the schema used for the serialization itself; the identifier is a fingerprint of the schema, i.e. a hash (64-bit fingerprints are enough to avoid collisions for caches of up to a million entries).
From the Avro documentation:

Single Avro objects are encoded as follows:

* A two-byte marker, C3 01, to show that the message is Avro and uses this single-record format (version 1).

* The 8-byte little-endian CRC-64-AVRO fingerprint of the object’s schema

* The Avro object encoded using Avro’s binary encoding
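
As a sketch of that layout (not Darwin's actual implementation), the 10-byte header that precedes the Avro payload can be built like this, using Avro's own CRC-64-AVRO fingerprint utility:

    import java.nio.{ByteBuffer, ByteOrder}
    import org.apache.avro.{Schema, SchemaNormalization}

    // 2-byte marker C3 01 + 8-byte little-endian CRC-64-AVRO fingerprint of the schema.
    def singleObjectHeader(schema: Schema): Array[Byte] =
      ByteBuffer.allocate(10)
        .order(ByteOrder.LITTLE_ENDIAN)
        .put(0xC3.toByte)
        .put(0x01.toByte)
        .putLong(SchemaNormalization.parsingFingerprint64(schema))
        .array()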

Darwin: how I stopped worrying and learned to love the evolution

OK, now we don’t have the actual schema, but only a fingerprint… how can we pass the schema to the decoder if we only have its fingerprint?

Here is where Darwin comes to our help!

Darwin is a lightweight repository written in Scala that allows you to maintain different versions of the Avro schemas used in your applications: it is an open-source, portable library that does not require any application server!
To store all the data related to your schemas, you can choose from multiple, easily pluggable storage managers (HBase, MongoDB, etc.).

Darwin keeps all the schemas used to write elements stored together with their fingerprints: this way, each time a decoder tries to read an element knowing only the reader schema, it can simply ask Darwin to get the writer schema given its fingerprint.

To plug a Connector for a specific storage, simply include the desired connector among the dependencies of your project. Darwin automatically loads all the Connectors it finds on its classpath: if multiple Connectors are found, the first one is used, or you can specify which one to use via configuration.

Even if it is not required when Darwin is used as a library, Darwin also provides a web service that can be queried using REST APIs to retrieve a schema by its fingerprint and to register new schemas. At the time of writing, Darwin provides connectors for HBase, MongoDB and a REST connector that can be pointed at a running instance of the Darwin REST server.

Each Connector requires its own set of configurations, e.g. for the HBaseConnector:

Darwin can be configured to run in three different modes:

  1. Lazy mode: Darwin accesses the connector (and the underlying storage) each time it is asked for a schema. This allows the application to always fetch the schemas up to date, but it could increase the number of accesses to the storage. This mode is suitable for applications that always require the latest values found on the storage and cannot afford to load all values at startup (like Web Services).
  2. Eager mode: Darwin loads all the schemas from the storage only once at startup into an internal cache. Then all the schemas are searched only inside the cache without querying the storage again. This reduces to a minimum the accesses to the storage, but won’t retrieve schemas registered after the startup (suitable for batch applications but not for streaming ones).
  3. Lazy cached mode: a hybrid of the two modes presented above; all schemas are loaded at startup and all requests perform a lookup in the internal cache. If a miss occurs, the storage is queried for that schema and, if it is found, the cache is updated accordingly. This mode is suitable for streaming applications that can load all the known values at startup but need to be aware of possible additions during their lifetime.

Darwin in a nutshell

Using Darwin is very easy! Here is a simple Scala example: first of all we must obtain an instance of AvroSchemaManager, passing the desired configuration:
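
A sketch of how this looks; the class and method names in this and the following snippets are taken from Darwin's documentation as I recall it, so treat them as assumptions and double-check them against the project README for your version:

    import com.typesafe.config.ConfigFactory
    import it.agilelab.darwin.manager.AvroSchemaManagerFactory

    // The configuration (storage connector, mode, etc.) is read via Typesafe Config.
    val config        = ConfigFactory.load()
    val schemaManager = AvroSchemaManagerFactory.initialize(config)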

Then every application can register the Avro schemas it needs by simply invoking:
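
For example, with a couple of org.apache.avro.Schema values (songSchema, albumSchema) assumed to be defined elsewhere:

    schemaManager.registerAll(Seq(songSchema, albumSchema))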

The AvroSchemaManager can then be used to retrieve the fingerprint of a schema and, vice versa, a schema given its fingerprint:
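
Something along these lines (getSchema may return the schema wrapped in an Option, depending on the Darwin version):

    val fingerprint: Long = schemaManager.getId(songSchema)
    val maybeSchema       = schemaManager.getSchema(fingerprint)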

Since the data will be stored and retrieved as single-object encoded, Darwin also exposes APIs to create a single-object encoded byte array starting from an Avro-encoded byte array and its schema:
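
Roughly:

    // avroPayload is an Array[Byte] produced by a plain Avro binary encoder for songSchema.
    val singleObjectEncoded: Array[Byte] =
      schemaManager.generateAvroSingleObjectEncoded(avroPayload, songSchema)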

It also provides APIs to retrieve schema and Avro encoded byte array from a single-object encoded element:
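
Again, as a sketch:

    val (writerSchema, avroPayloadOnly) = schemaManager.retrieveSchemaAndAvroPayload(singleObjectEncoded)
    // writerSchema can now be handed to the Avro decoder together with the reader schema.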

The same calls are also available from Java.

TL;DR — what was that all about?

In Agile Lab we use Avro in different projects as a serialization system that allows us to store data in a binary, fast, and compact data format. Avro achieves very small serialization sizes of the records by storing them without their schema.

Since data evolves (and their schemas accordingly), in order to read records written with different schemas each decoder must know all the schemas used to write records throughout the history of your application. This will cause each application that needs to read the data to worry about the schemas: where to store them, how to fetch them and how to tell which schema was associated to each record.

To overcome said evolution problems in our projects, we created Darwin!

Darwin is a schema repository and utility library that simplifies the whole process of Avro encoding/decoding with schema evolution. We are currently using Darwin in multiple Big Data projects in production at Terabyte scale to solve Avro data evolution problems.

Darwin is available on Maven Central and it is fully open source: code and documentation are available on GitHub.

So give it a try and feel free to give us feedback and contribute to the project!

Written by Lorenzo Pirazzini – Agile Lab Big Data Engineer

 

If you found this article useful, take a look at our blog and follow us on our Medium Publication, Agile Lab Engineering!

Scala ‘fun’ error handling

Hi everybody! I’m Antonio Murgia, a Big Data Architect @ Agile Lab, a remote-first Big Data engineering and R&D firm located in Italy. Our main focus is to build Big Data and AI systems, in a very challenging — yet awesome — environment.

As the title suggests, this is the first post of a series about error handling in a functional way in Scala.

Dealing with failures is one of the harshest things we face in life. Coding is not different than living: at some point, things will go bad and as a software practitioner, you should expect such failures and be prepared. You will never remember when your application ran fine for years without crashing or producing bugs, but you will surely remember the problem that woke you up in the middle of the night and that could’ve been easily avoided by properly dealing with errors and failures in advance.

As a software engineer, I've seen my code fail many more times than I would like to admit: almost every piece of technology that your code interacts with, or is made of, can fail. The big difference between stable and fragile applications (when it comes to "code") is in how they deal with failures.

In this post series (born from a workshop I held at my company) I will show a couple of ideas that make it possible to deal with failures in a very simple, opinionated and explicit way. All of this without sacrificing code readability and composability. The code snippets are written in Scala, a JVM language made (in)famous by Apache Spark, which is our favourite data processing framework at Agile Lab, and use Cats, a functional programming library for Scala.

Disclaimer

Tech stack used in the snippets:

  • Scala 2.11
  • Typelevel Cats 2.0.0 (which is cross-published among 2.11, 2.12 and 2.13)

What this article is not:

  • A guide to Typelevel Cats
  • A category theory article

My objective is to be pragmatic, I want you to read this article and then try these things (and hopefully profit from them).

Our Legacy

Scala is a JVM language with full-interop with Java, therefore we cannot start an article about error handling in functional Scala without explaining how errors are dealt with in Java.

Checked Exceptions

If I try to write a simple program in Java that reads the first 3 lines of a txt file, I come across a very nice feature of the Java programming language: checked exceptions!

This code does not compile.

Why is that? That's because some methods that we are using explicitly declare that they throw exceptions. In fact, if we take a look at the FileReader constructor signature, we see that it declares throws FileNotFoundException.

If you’ve ever written any Java, you know that there is an “easy” solution to this problem:

Adding the throws clause to your method definition is enough to make someone else care about the exception.

We have a saying in Italy: Fatta la legge trovato l’inganno that in English translates into something like: Every law has a loophole.

In fact, not all Java exceptions have this useful property (that reflects in the method signature): there are also unchecked exceptions that do not “annoy” the programmer and therefore tend to be ubiquitous.

A very bad and very common practice, in fact, is to catch the exception and silently swallow it in an empty catch block.

Please never do that! You are hiding the fact that the print3lines method can fail, and this is a piece of information that is really precious for whoever is using that method.

Even if checked exceptions are somehow "good" because they appear in the method signatures, exceptions in general are a "bad idea" for dealing with expected errors (e.g. parsing a String into an Int, which can fail in obvious and totally expected ways), since every thrown exception is an unconditional jump to the first caller that handles it. And we don't like unconditional jumps in our code, otherwise we would code in assembly.

To summarize what we have seen so far: checked exceptions at least show up in method signatures, but they are easy to propagate blindly with throws or to swallow; unchecked exceptions don't show up in signatures at all; and both kinds are unconditional jumps in the control flow.

What about Scala?

Scala is based on the JVM and it’s interoperable with Java, but it has some fundamental differences, one of which is in how it handles exceptions. Scala has ONLY unchecked exceptions.

But Scala also has an exceptional type system (pun intended). Let’s leverage it!

In order to dive deeper in what I’m going to present you, we will need to learn a couple of concepts. Don’t be scared, that’s easy stuff!

ADT — Algebraic Data Types

Quoting from Wikipedia:

In computer programming, especially functional programming and type theory, an algebraic data type is a kind of composite type, i.e., a type formed by combining other types.

Two common classes of algebraic types are:

  • Product Types (tuples and records)
  • Sum Types (tagged/disjoint unions or coproducts)

I know, the names are scary, but in fact they embody a really simple concept. Let's start from the idea that types are domains. For example, the Boolean type's domain is {true, false}, while the Int domain is [-2³¹, 2³¹). The Unit domain, which is the Scala version of void, contains only (), and Nothing (the type with no instances) has an empty domain. Each of these domains is finite and therefore has a size (all types are finite domains, since computer memory is finite).

Product types in Scala are case classes or tuples, and they do exactly what you expect them to do with the domain of their inner types:
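
A tiny made-up example:

    // A product type: its domain is the cartesian product of the inner domains.
    // Boolean has 2 values and Byte has 256, so Flagged has 2 * 256 = 512 possible values.
    final case class Flagged(flag: Boolean, value: Byte)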

Sum types in Scala are sealed hierarchies or enumerations, and they do exactly what you expect them to do with the domain of their inner types:
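
And its counterpart, again with made-up names:

    // A sum type: its domain is the sum (disjoint union) of the alternatives' domains.
    // BoolOrByte has 2 + 256 = 258 possible values.
    sealed trait BoolOrByte
    final case class AsBool(b: Boolean) extends BoolOrByte
    final case class AsByte(v: Byte)    extends BoolOrByte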

 

Now, it’s enough with the theory, why is this important?

  • Keep the "domain" of every function in your head while reasoning about code
  • The smaller the domains, the easier the reasoning; small domains also act as built-in documentation
  • If you want to stress this, have a look at refined

. . .

So, back to “checked exceptions”, in Scala we can emulate them at the type level as follows:
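
A sketch of the idea, using the "read the first 3 lines" example from before (the type and field names are my own):

    import java.io.FileNotFoundException
    import scala.io.Source

    // The possible outcomes are encoded in a sealed hierarchy: failure is now part of the type.
    sealed trait Print3LinesResult
    final case class Printed(lines: List[String]) extends Print3LinesResult
    final case class FileNotFound(path: String)   extends Print3LinesResult

    def print3Lines(path: String): Print3LinesResult =
      try {
        val source = Source.fromFile(path)
        try Printed(source.getLines().take(3).toList)
        finally source.close()
      } catch {
        case _: FileNotFoundException => FileNotFound(path)
      }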

which is logically equivalent to its Java counterpart declared with a checked exception.

If you configure the Scala compiler to fail on warnings (i.e. -Xfatal-warnings) you will also get the same behaviour you would have when trying not to deal with checked exceptions in Java. In fact, if you try to write the following code:
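
For instance, handling only the "happy" case of the sealed hierarchy defined above:

    print3Lines("songs.txt") match {
      case Printed(lines) => lines.foreach(println)
    }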

Compilation fails because scalac warns that the match is not exhaustive (it would fail on a FileNotFound input), and -Xfatal-warnings promotes that warning to an error.

Since the error is now encoded in the type system, it’s very easy for IDEs to help you deal with it:

Ok, but that is a lot of boiler-plate!

That’s why Scala has built-in generic sum types that come to help:

  • Option[A]
  • Either[A, B]

Option[A]
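
Slightly simplified, the standard library encodes it as:

    sealed abstract class Option[+A]
    final case class Some[+A](value: A) extends Option[A]
    case object None extends Option[Nothing]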

Either[A, B]
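
And, again slightly simplified:

    sealed abstract class Either[+A, +B]
    final case class Left[+A, +B](value: A)  extends Either[A, B]
    final case class Right[+A, +B](value: B) extends Either[A, B]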

Even if Option might not be the best option (my puns are always intended) to deal with failures, or things that "went wrong", it can be fine when we do not need to communicate the "kind" of error that happened in any way.

Using Either we can rewrite the previous toy program as follows:
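
One possible version (the error type and the names are mine):

    import java.io.FileNotFoundException
    import scala.io.Source

    sealed trait ReadError
    final case class MissingFile(path: String) extends ReadError

    def first3Lines(path: String): Either[ReadError, List[String]] =
      try {
        val source = Source.fromFile(path)
        try Right(source.getLines().take(3).toList)
        finally source.close()
      } catch {
        case _: FileNotFoundException => Left(MissingFile(path))
      }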

Now let’s have a quick look at Either and Option and what they can offer us.

A (de)tour into Either and Option

If you are already "fluent" in Either and Option usage, you can skip this chapter.

Other than pattern matching, Either and Option offer many more idiomatic ways to interact with them.

Map

Option

Either

Right-biased map (default since Scala 2.12)

or Left-biased map

The effect of mapping a “side” (or projection) of the Either or an Option is:

  • if the “side” you’re trying to map is “empty” the computation is short-circuited

  • If the side is "populated" then the transformation is performed. Remember that if you map a Right, it is going to remain a Right, and the same applies to a Left: you can change the content, but not the type of the container. The examples below illustrate both cases.
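
A few REPL-style examples (the values are mine):

    Some(2).map(_ * 3)                 // Some(6)
    (None: Option[Int]).map(_ * 3)     // None: short-circuited

    val ok: Either[String, Int] = Right(2)
    val ko: Either[String, Int] = Left("boom")

    ok.right.map(_ * 3)                // Right(6)    (on Scala 2.12+ simply ok.map(_ * 3))
    ko.right.map(_ * 3)                // Left("boom"): short-circuited

    ko.left.map(_.toUpperCase)         // Left("BOOM")
    ok.left.map(_.toUpperCase)         // Right(2): the left side is "empty", nothing happens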

flatMap

Option

Either

Right biased flatMap (default since Scala 2.12)

or Left biased flatMap

The effect of flatMapping a “side” (or projection) of the Either or an Option is:

  • if the "side" you are trying to flatMap is empty, the computation is short-circuited

  • If the side is "populated" then the transformation is performed. Remember that using flatMap you are able to transform a Left into a Right or vice versa, as the examples below show.
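
A couple of examples, again with made-up values:

    def parseInt(s: String): Either[String, Int] =
      try Right(s.toInt)
      catch { case _: NumberFormatException => Left(s"'$s' is not a number") }

    Some("42").flatMap(s => Some(s.length))              // Some(2)
    (None: Option[String]).flatMap(s => Some(s.length))  // None: short-circuited

    val input: Either[String, String]  = Right("nan")
    val failed: Either[String, String] = Left("earlier error")

    input.right.flatMap(parseInt)      // Left("'nan' is not a number"): a Right became a Left
    failed.right.flatMap(parseInt)     // Left("earlier error"): short-circuited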

Either <-> Option

Let's reason about Option and Either in terms of ADTs.

What is Option[A] domain?

  • A ∪ {None}

What’s the domain of Either[L, R]?

  • R ∪ L

They are quite similar, aren’t they? What if we pick a particular L ? For example let’s pick Unit, the domain of Either[Unit, A] is:

  • A ∪ {()}

Which carries the same quantity of information as Option[A].

If we all agree on this, we can say that it's always possible to turn an Option into an Either by providing the missing side's value and, vice versa, it's always possible to go from an Either to an Option by "losing" one side of the Either. Thankfully Scala knows this better than we do, therefore there are idiomatic ways of doing so:
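
For example, toRight turns an Option into an Either, given the value to use for the Left side (the values below are mine):

    val maybeTitle: Option[String] = Some("Bohemian Rhapsody")
    maybeTitle.toRight("no title found")              // Right("Bohemian Rhapsody")
    (None: Option[String]).toRight("no title found")  // Left("no title found")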

If the Option is empty you get a Left with the provided value, otherwise you get a Right with the content of the Option. Its dual operation is toLeft.

There are similar methods available on Either projections:
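
For instance, toOption keeps the projected side and discards the other:

    val e: Either[String, Int] = Right(42)
    e.right.toOption   // Some(42)
    e.left.toOption    // None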

Getting out

When you get to the “end of the world” you will need to do something with the content of the Either and handle both cases (i.e. Left and Right).

We've already seen the best-known way of "getting out" of an Either, which is match:
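
Using a value of my own as a running example:

    val outcome: Either[String, Int] = Right(42)

    outcome match {
      case Right(n)    => println(s"result: $n")
      case Left(error) => println(s"failed: $error")
    }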

Another way to extract the result from an Either is to use fold:
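
With the same outcome value as above:

    val message: String = outcome.fold(error => s"failed: $error", n => s"result: $n")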

or merge if A and B are of the same type (or you only care about their first common supertype), which is the same as doing fold(identity, identity):
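
A quick example with a value of my own:

    // merge requires an Either[A, A]; on older Scala versions an equivalent syntax helper may be needed.
    val widened: Either[String, String] = Right("all good")
    widened.merge                        // "all good"
    widened.fold(identity, identity)     // same result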

If, instead of using match or fold, you find yourself every time writing checks like the ones sketched below:
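
    // Anti-pattern: checking one side and then extracting it.
    if (outcome.isRight) outcome.right.get else 0
    // or, just as bad:
    outcome.right.getOrElse(throw new RuntimeException("it was a Left!"))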

then you are defeating the purpose of using Either. If that happens, don't bother using Either at all.

The same applies to Option: calling Option.get defeats the purpose of using Option at all, because you are assuming that the Option is a Some.

“Real life” programs

Ok, so far so good, but you haven’t shown me a real program — you might think — so let’s try to build something more complex.

Given a list of integers, if the sum of all elements is higher than 100, take the head and then multiply it by 30 otherwise multiply it by -30.

A very simple direct approach would be the following:
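
A sketch of it:

    // The signature promises an Int and says nothing about failure...
    def program(xs: List[Int]): Int =
      if (xs.sum > 100) xs.head * 30
      else xs.head * -30                // ...but head blows up at runtime on an empty list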

Let’s try to see this “program” from the caller perspective:

  • can it fail?
  • if it can fail, which are the error cases?

Looking only at the signature, the answer is: no, this program cannot fail. We have to look at the implementation to actually "spot" where errors can happen and exceptions can be thrown.

Another Approach

The error ADT
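
The only failure case is the empty list, so the ADT can be as small as this (the trait name is mine):

    sealed trait ProgramError
    case object EmptyList extends ProgramError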

The implementation
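
A possible version:

    def program(xs: List[Int]): Either[ProgramError, Int] =
      xs.headOption match {
        case None       => Left(EmptyList)
        case Some(head) => Right(if (xs.sum > 100) head * 30 else head * -30)
      }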

or using some syntactic sugar:
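
One possible "sugared" version (what the author originally used may differ):

    // On Scala 2.11 the right-biased map below needs: import cats.syntax.either._
    def program(xs: List[Int]): Either[ProgramError, Int] =
      xs.headOption
        .toRight(EmptyList)
        .map(head => if (xs.sum > 100) head * 30 else head * -30)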

Now, let’s try to answer the same questions:

  • can it fail?
  • if it can fail, which are the error cases?

Now we can answer the questions just looking at the signature!

Yes, the program can fail (Either can be Right or Left) and there is just one failure case, the EmptyList.

Bonus — The best code is the code you don’t write

I'm not alone in thinking that the best code is the code you do not have to write or maintain. I would go further and say that the easiest error to handle is the one someone else handles for you. As in the program we've just written, there are cases in which you can avoid caring about errors at all, because you make them impossible by restricting the input of your program.

How can we make our program infallible? Well, we can accept only non-empty lists. Cats has what you need: NonEmptyList, a list that cannot be empty (a.k.a. Nel):
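
A sketch of the infallible version:

    import cats.data.NonEmptyList

    // The caller must hand us a non-empty list, so the EmptyList failure simply disappears.
    def program(xs: NonEmptyList[Int]): Int =
      if (xs.toList.sum > 100) xs.head * 30 else xs.head * -30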

Now our program "cannot" fail because we pushed the responsibility of obtaining a non-empty list to the boundaries of our program, ruling out the failure case at compile time.

Very complicated programs

Let's see now how we can compose a lot of functions that "return" Either, because if you are going to follow my hints, you're going to compose a lot of Eithers!

Let’s assume you have the following functions already implemented:
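
For the sake of the example, let's use three hypothetical functions of my own, each of which can fail and says so in its return type:

    sealed trait Error
    case object ErrorA extends Error
    case object ErrorB extends Error
    case object ErrorC extends Error

    def fa(i: Int): Either[Error, String]     = if (i > 0) Right(i.toString) else Left(ErrorA)
    def fb(s: String): Either[Error, Double]  = if (s.nonEmpty) Right(s.length.toDouble) else Left(ErrorB)
    def fc(d: Double): Either[Error, Boolean] = if (d < 100) Right(d > 1) else Left(ErrorC)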

And you have to implement f combining all of them:
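
Sticking to the hypothetical functions above, the target is something like:

    def f(i: Int): Either[Error, Boolean] = ???   // to be implemented by composing fa, fb and fc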

The most straightforward way to write the program would be something like the following:
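
Pretending the Eithers are not there (this is a sketch, and it does not compile):

    def f(i: Int): Either[Error, Boolean] = {
      val s = fa(i)   // this is an Either[Error, String], not the String that fb wants
      val d = fb(s)   // type mismatch
      fc(d)
    }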

Apart from the fact that we are not dealing with errors and that this program does not compile at all, it surely has one big advantage: the happy path is very "easy" to follow (this is why people say that exceptions are easy: they just ignore them most of the time), and whoever reads this code can understand what's going on. As Martin Fowler says:

Any fool can write code that a computer can understand. Good programmers write code that humans can understand.

We can use match to compose all our function calls like this:
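
Still with the hypothetical fa, fb and fc:

    def f(i: Int): Either[Error, Boolean] =
      fa(i) match {
        case Left(e)  => Left(e)
        case Right(s) => fb(s) match {
          case Left(e)  => Left(e)
          case Right(d) => fc(d)
        }
      }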

But that's a lot of boilerplate to write! It's obvious that the first (wrong) implementation is much more readable.

We can try using flatMap then:
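
Still with the same hypothetical functions:

    // On Scala 2.11: import cats.syntax.either._ for the right-biased flatMap.
    def f(i: Int): Either[Error, Boolean] =
      fa(i).flatMap(s => fb(s).flatMap(d => fc(d)))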

This indeed looks better, but the nested lambdas quickly start to pile up.

Let’s try with a for comprehension:
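
Which, with the same assumptions, could look like:

    // Desugars to the flatMap/map calls seen above.
    def f(i: Int): Either[Error, Boolean] =
      for {
        s <- fa(i)
        d <- fb(s)
        b <- fc(d)
      } yield b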

Do we agree that this implementation has the readability of the first (wrong) implementation, plus the advantage of actually dealing with errors in a way that is reflected in the type system and therefore in the function signatures?

Conclusions — so far

Sticking to these practices makes it very easy to follow the business logic flow without compromising on error handling. It is also straightforward to handle errors or simply delegate them to the caller. Furthermore, totally avoiding exceptions means we don't have to defensively try/catch every method call.

These patterns are particularly useful in Spark batch and streaming workloads, where if only one row out of 10 billion throws an exception the whole application will fail and/or retry processing the same data indefinitely, wasting time and resources in vain.

That was enough for this chapter of the series (even if there weren't many cats around)!

In the next post, we will see how we can combine functions which can fail in a way that is not fail-fast (i.e. stop at the first error) and also how to deal with collections of results.

Stay tuned!

Written by Antonio Murgia – Agile Lab Big Data Architect
If you found this article useful, take a look at our blog and follow us on our Medium Publication, Agile Lab Engineering!