Partitioning in Apache Spark

The following post should serve as a guide for those trying to understand the inner workings of Apache Spark. I initially created it to organize my own knowledge and extended it later on. It assumes, however, that you possess some basic knowledge of Spark.

All examples are written in Python 2.7 running on PySpark 2.1, but the rules are very similar for other APIs.


First, some words about the most basic concept - a partition:

Partition - a logical chunk of a large data set.

Very often the data we are processing can be separated into logical partitions (e.g. payments from the same country, ads displayed for a given cookie, etc.). In Spark, they are distributed among nodes when shuffling occurs.

Spark can run one concurrent task for every partition of an RDD (up to the number of cores in the cluster). If your cluster has 20 cores, you should have at least 20 partitions (in practice 2-3x more). On the other hand, a single partition typically shouldn't contain more than 128 MB, and a single shuffle block cannot be larger than 2 GB (see SPARK-6235).

In general, more partitions allow work to be distributed among more workers, while fewer partitions allow work to be done in larger chunks (and often faster).

Spark's partitioning feature is available on all RDDs of key/value pairs.

Why care?

For one quite important reason - performance. By having all relevant data in one place (node) we reduce the overhead of shuffling (the need for serialization and network traffic).

Also, understanding how Spark deals with partitions allows us to control the application's parallelism (which leads to better cluster utilization - and lower costs).

But keep in mind that partitioning will not be helpful in all applications. For example, if a given RDD is scanned only once, there is no point in partitioning it in advance. It's useful only when a dataset is reused multiple times (in key-oriented situations using functions like join()).

We will use the following list of numbers for investigating the behavior.
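A minimal sketch of such a list (the exact values are an assumption made here - any small collection will do):

    # hypothetical input data - a plain Python 2.7 list of numbers
    nums = range(0, 10)   # [0, 1, 2, ..., 9]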

Playing with partitions

Let's start by creating a local context with only one thread allocated and parallelizing a collection using all the defaults. We are going to use the glom() function, which exposes the structure of the created partitions.

From API: glom() - return an RDD created by coalescing all elements within each partition into a list.

Each RDD also carries information about its partitioning scheme (you will see later that it can be set explicitly or derived via some transformations).

From API: partitioner - inspect partitioner information used for the RDD.
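A minimal sketch of the above, assuming the nums list defined earlier (the application name is arbitrary):

    from pyspark import SparkContext

    # a local context with one worker thread only
    sc = SparkContext(master="local[1]", appName="partitioning-demo")

    rdd = sc.parallelize(nums)        # all defaults
    print(rdd.getNumPartitions())     # 1 - one local core gives one partition by default
    print(rdd.glom().collect())       # e.g. [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]]
    print(rdd.partitioner)            # None - no partitioning scheme is used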

Ok, so what happened under the hood?

Spark uses different partitioning schemes for various types of RDDs and operations. In the case of parallelize(), data is evenly distributed between partitions using element indices (no partitioning scheme is used).

If there is no partitioner, the partitioning is not based on any characteristic of the data; the distribution is random but uniform across nodes.

Different rules apply for various data sources and structures (e.g. when loading data using textFile() or using tuple objects). A good summary is provided here.

If you look inside the parallelize() source code you will see that the number of partitions is determined either by the numSlices argument or by the spark.defaultParallelism property (which reads context information).

Now let's try to allow our driver to use two local cores.
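A sketch of the same experiment with two threads (remember to stop the previous context first):

    sc.stop()
    sc = SparkContext(master="local[2]", appName="partitioning-demo")

    rdd = sc.parallelize(nums)
    print(rdd.getNumPartitions())     # 2
    print(rdd.glom().collect())       # e.g. [[0, 1, 2, 3, 4], [5, 6, 7, 8, 9]]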

Ok, that worked as expected - the data was distributed across two partitions and each will be executed in a separate thread.

But what will happen when the number of partitions exceeds the number of data records?
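A sketch requesting far more partitions (via the numSlices argument) than there are elements:

    rdd = sc.parallelize(nums, 15)    # 10 elements spread over 15 partitions
    print(rdd.glom().collect())       # most of the partitions come back as empty lists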

You can see that Spark created the requested number of partitions, but most of them are empty. This is bad because the time needed to prepare a new thread for processing the data (one element) is significantly greater than the processing time itself (you can analyze it in the Spark UI).

Custom partitions with partitionBy()

The partitionBy() transformation allows applying custom partitioning logic over the RDD.

Let's try to partition the data further by taking advantage of domain-specific knowledge.

Warning - to use partitionBy() the RDD must consist of tuple (pair) objects. It's a transformation, so a new RDD will be returned. It's highly advisable to persist it for more optimal later usage.

Because partitionBy() requires the data to be in key/value format, we will need to transform it.

In PySpark an object is considered a valid pair for PairRDD operations if it can be unpacked as follows: k, v = kv. You can read more about the requirements here.
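A minimal sketch under those assumptions (persisting the result as advised above):

    pairs = sc.parallelize(nums) \
              .map(lambda el: (el, el)) \
              .partitionBy(2) \
              .persist()

    print(pairs.glom().collect())
    # e.g. [[(0, 0), (2, 2), (4, 4), (6, 6), (8, 8)],
    #       [(1, 1), (3, 3), (5, 5), (7, 7), (9, 9)]]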

You can see that now the elements are distributed differently. A few interesting things happened:

  1. parallelize(nums) - we transform a Python list into an RDD with no partitioning scheme,
  2. map(lambda el: (el, el)) - we transform the data into tuples,
  3. partitionBy(2) - we split the data into 2 chunks using the default hash partitioner.

Spark used a partitioner function to determine to which partition each record should be assigned. The partitioner can be specified as the second argument to partitionBy(). The partition number is then evaluated as follows: partition = partitionFunc(key) % num_partitions.

By default, PySpark uses hash partitioning as the partitioning function.

Let's perform an additional sanity check.
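A sketch of such a check - recomputing the partition assignment by hand with PySpark's default hash function:

    from pyspark.rdd import portable_hash

    num_partitions = 2
    for el in nums:
        print("Element [%d] -> partition %d" % (el, portable_hash(el) % num_partitions))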

But let's move on to a more realistic example. Imagine that our data consists of various dummy transactions made across different countries.
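The records below are hypothetical - invented purely for illustration:

    transactions = [
        {"name": "Bob",      "amount": 100, "country": "United Kingdom"},
        {"name": "James",    "amount": 15,  "country": "United Kingdom"},
        {"name": "Marek",    "amount": 51,  "country": "Poland"},
        {"name": "Johannes", "amount": 200, "country": "Germany"},
        {"name": "Paul",     "amount": 75,  "country": "Poland"},
    ]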

We know that further analysis will involve processing many similar records from the same country. To reduce network traffic, it seems like a good idea to put the records from one country on one node.

To meet this requirement, we will need a custom partitioner:

Custom partitioner - a function returning an integer for a given object (the tuple key).
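A sketch of such a partitioner - here simply hashing the country name and using 4 partitions (the helper name country_partitioner is my own):

    def country_partitioner(country):
        return hash(country)

    # key the records by country and plug the partitioner in as the second argument
    by_country = sc.parallelize(transactions) \
                   .map(lambda t: (t["country"], t)) \
                   .partitionBy(4, country_partitioner) \
                   .persist()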

By validating our partitioner we can see which partition is assigned to each country.
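A sketch of that validation (the modulo mirrors what partitionBy() does internally):

    for country in ["United Kingdom", "Poland", "Germany"]:
        print("Country [%s] -> partition %d" % (country, country_partitioner(country) % 4))

    # and confirm it on the actual RDD
    print(by_country.glom().collect())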

Pay attention to potential data skew. If some keys are overrepresented in the dataset, it can result in suboptimal resource usage and potential failures.

It worked as expected - all records from a single country are within one partition. We can do some work directly on those partitions without worrying about shuffling by using the mapPartitions() function.

From API: mapPartitions() converts each partition of the source RDD into multiple elements of the result (possibly none). One important usage can be some heavyweight initialization (that should be done once for many elements). Using mapPartitions() it can be done once per worker task/thread/partition instead of running map() for each RDD data element.

In the example below, we will calculate the sum of sales in each partition (in this case such an operation makes little sense, but the point is to show how to pass data into the mapPartitions() function).
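A sketch of that calculation, assuming the by_country pair RDD built above:

    def sum_sales(iterator):
        # the iterator yields the (country, transaction) pairs of a single partition
        yield sum(transaction["amount"] for _, transaction in iterator)

    print(by_country.mapPartitions(sum_sales).collect())   # one sum per partition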

Working with DataFrames

Nowadays we are all advised to abandon operations on raw RDDs and use structured DataFrames (or Datasets if using Java or Scala) from the Spark SQL module. The creators made it very easy to partition data by a chosen column in this case.
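A minimal sketch, assuming the hypothetical transactions list from before:

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.master("local[2]").appName("partitioning-demo").getOrCreate()
    df = spark.createDataFrame(transactions)

    # rows sharing the same country end up in the same partition
    by_country_df = df.repartition("country")
    print(by_country_df.rdd.getNumPartitions())   # spark.sql.shuffle.partitions (200 by default)

    # the target number of partitions can also be given explicitly
    by_country_df = df.repartition(2, "country")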

You can see that DataFrames expose a modified repartition() method that takes a column name as an argument. When the number of partitions is not specified, a default value is used (taken from the config parameter spark.sql.shuffle.partitions).

Let's take a closer look at this method in general.

coalesce() and repartition()

The coalesce() and repartition() transformations are used for changing the number of partitions of an RDD.

repartition() simply calls coalesce() with shuffling explicitly enabled.

The rules of thumb are as follows:

  • if you are increasing the number of partitions, use repartition() (it performs a full shuffle),
  • if you are decreasing the number of partitions, use coalesce() (it minimizes shuffling).

The code below shows how repartitioning works (the data is represented using DataFrames).
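A sketch of both transformations on the DataFrame created earlier:

    print(df.rdd.getNumPartitions())        # e.g. 2 with a local[2] master

    bigger = df.repartition(8)              # full shuffle into 8 partitions
    print(bigger.rdd.getNumPartitions())    # 8

    smaller = bigger.coalesce(2)            # merges partitions, avoiding a full shuffle
    print(smaller.rdd.getNumPartitions())   # 2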

Vanishing partitioning schema

Many available RDD operations will take advantage of underlying partitioning. On the other hand operations like map() cause the new RDD to forget the parent's partitioning information.

Operations that benefit from partitioning

All operations that shuffle data by key will benefit from partitioning. Some examples are cogroup(), groupWith(), join(), leftOuterJoin(), rightOuterJoin(), groupByKey(), reduceByKey(), combineByKey() and lookup().

Operations that affect partitioning

Spark knows internally how each of its operations affects partitioning, and automatically sets the partitioner on RDDs created by operations that partition the data.

But there are some transformations that cannot guarantee a known partitioning - for example, calling map() could theoretically modify the key of each element.

Spark does not analyze your functions to check whether they retain the key.

Instead, there are functions provided that guarantee that each tuple's key remains the same - mapValues(), flatMapValues() and filter() (provided the parent has a partitioner).
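A small sketch of the difference, assuming an active SparkContext sc:

    pairs = sc.parallelize([(1, 1), (2, 2), (3, 3)]).partitionBy(2)
    print(pairs.partitioner)                                        # a Partitioner object

    print(pairs.mapValues(lambda v: v * 10).partitioner)            # partitioner preserved
    print(pairs.map(lambda kv: (kv[0], kv[1] * 10)).partitioner)    # None - partitioner lost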

Memory issues

Have you ever seen this mysterious piece of text - java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE ?

Looking into the stack trace, you can spot that it's not coming from within your app but from Spark internals. The reason is that in Spark you cannot have a shuffle block larger than 2 GB.

Shuffle block - data transferred across stages between executors.

This happens because Spark uses ByteBuffer as an abstraction for storing blocks, and it is limited to Integer.MAX_VALUE bytes (about 2 GB).

It's especially problematic for Spark SQL (various aggregation functions) because the default number of partitions to use when shuffling is 200, which can lead to shuffle block sizes that sometimes exceed 2 GB.

So what can be done:

  1. Increase the number of partitions (thereby reducing the average partition size) by increasing the value of spark.sql.shuffle.partitions for Spark SQL or by calling repartition() or coalesce() on RDDs (see the sketch after this list),
  2. Get rid of the skew in your data.
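A sketch of the first option, assuming a SparkSession named spark and an RDD named rdd:

    # for DataFrame / Spark SQL shuffles
    spark.conf.set("spark.sql.shuffle.partitions", 400)

    # for raw RDDs - repartition explicitly before the heavy shuffle stage
    rdd = rdd.repartition(400)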

It's good to know that Spark uses different logic for memory management when the number of partitions is greater than 2000 (it switches to a highly compressed representation of shuffle block statistics). So if you have close to 2000 partitions, it's worth bumping the number up to 2001, which will result in a smaller memory footprint.

Take-aways

Spark partitioning is available on all RDDs of key/value pairs and causes the system to group elements based on a function of each key.

Features

  • tuples in the same partition are guaranteed to be on the same machine,
  • each node in the cluster can contain more than one partition,
  • the total number of partitions is configurable (by default it is set to the total number of cores on all executor nodes)

Performance tuning checklist

  • have the correct number of partitions (according to cluster specification) - check this and that for guidance,
  • consider using custom partitioners,
  • check if your transformations preserve partition schema,
  • check if memory usage could be optimized by bumping the number of partitions to 2001

Settings

  • spark.default.parallelism - sets up the number of partitions to use for HashPartitioner (can be overridden when creating SparkContext object),
  • spark.sql.shuffle.partitions - controls the number of partitions for operations on DataFrames (default is 200)

As a final thought, note that the number of partitions also determines how many files will be generated by actions that save an RDD to files.


Reproducible research and explaining predictions of any classifier

Recently I had the pleasure of giving two talks at the PyData Wrocław meetup group - about reproducible data science and about explaining the predictions of any classifier using the LIME project. The meetup takes place each month, enabling attendees to discuss issues they encounter in their projects or simply share knowledge.

 

Reproducible data science

Practical approach

Assuring reproducibility is one of the most important issues in any scientific project. See what techniques and tools you can use on a daily basis.

Why have you done this to me?

Explaining predictions of any classifier

Very often it's nearly impossible to explain a decision made by a black-box classifier. But there is a new open-source library solving this problem (LIME). Learn what's possible by seeing it in action.

Video

You can watch the whole presentation below (28:12):

You can download notebook and data files used in the examples here.

 

10.5 Python Libraries for Data Analysis Nobody Told You About

UPDATED: 23 Apr. 2017

Below is a list of slightly "less popular" Python libraries that can add tremendous value to your data projects.


LIME

Github stats: 84 watchers, 1004 stars, 155 forks

The LIME project (Local Interpretable Model-agnostic Explanations) aims to reveal the motivations behind any black-box classifier picking a certain action.

At the moment the reasoning is possible for text and tabular data (with continuous features, discrete features, or a mix of both). The project is constantly evolving and you can expect many more improvements over time.

 

All you need to provide is an algorithm that outputs a probability for each class.
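A minimal sketch of what that looks like for tabular data (the dataset and model choices here are illustrative, not taken from the talk):

    from sklearn.datasets import load_iris
    from sklearn.ensemble import RandomForestClassifier
    from lime.lime_tabular import LimeTabularExplainer

    iris = load_iris()
    clf = RandomForestClassifier(n_estimators=100).fit(iris.data, iris.target)

    explainer = LimeTabularExplainer(iris.data,
                                     feature_names=iris.feature_names,
                                     class_names=iris.target_names)

    # all LIME needs from the model is a function returning class probabilities
    explanation = explainer.explain_instance(iris.data[0], clf.predict_proba, num_features=2)
    print(explanation.as_list())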

Just watch the promo-video for the project (2:55 min):


Yellowbrick

Github stats: 19 watchers, 140 stars, 43 forks

Yellowbrick is a collection of tools that are super handy for visualizing machine learning issues related to feature or model selection/evaluation and parameter tuning.

There are about 19 distinct tools available, ranging from simple boxplots to grid-search heat maps.

It's of course designed to play nicely with Scikit-learn package.


Traces

Github stats: 10 watchers, 176 stars, 18 forks

As written in the docs, Traces aims to make it simple to manipulate, transform, and analyze unevenly spaced time series.

It offers some very handy helper functions for simplifying analysis, like getting distributions by day of the week or transforming to evenly spaced events (e.g. for forecasting).


Quiver

Github stats: 41 watchers, 878 stars, 63 forks

Quiver is a kick-ass tool for doing interactive visualization of Keras convolutional network features.

The way it works is that you build a Keras model and feed it into Quiver. Then, with just one line of code, you start an embedded web server with the app (built with React and Redux) and open it in your browser.

Watch the video showing how to explore layer activations on all the different images (1:47 min):


Dplython

Github stats: 30 watchers, 526 stars, 36 forks

If you have done some data analysis in R using the dplyr package and later switched to Python, you probably know the pain of missing its convenient piping syntax.

Dplython aims to provide the same functionality for pandas DataFrames as dplyr does for R data frames.

Just see what's possible:
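A short, hypothetical example of the piping syntax (modeled on the library's README usage; details may differ between versions):

    from dplython import DplyFrame, X, diamonds, select, sift, arrange, head

    # diamonds is a sample DplyFrame shipped with the library
    result = (diamonds >>
              sift(X.carat > 4) >>
              select(X.carat, X.cut, X.price) >>
              arrange(X.price) >>
              head(5))
    print(result)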

The library makes it possible to perform "pipeline-able" operations by creating special function decorators. You can read more about this here.


TSFRESH

Github stats: 70 watchers, 1745 stars, 118 forks

TSFRESH stands for "Time Series Feature extraction based on scalable hypothesis tests".

The beauty of this project is that it can help you automatically extract around 100 (!) different features from a signal.

To avoid duplicated or irrelevant features, TSFRESH uses a filtering procedure that evaluates the explanatory power and importance of each characteristic for the regression or classification task at hand.


Arrow

Github stats: 117 watchers, 3853 stars, 314 forks

Arrow is a library providing an impressive user experience for working with dates and times.

Even though Python is fully equipped with modules for the same purpose, you can probably do the job with Arrow faster, cleaner and more simply.

The library is inspired by famous moment.js.

To learn more about it read the docs.


TPOT

Github stats: 136 watchers, 1728 stars, 251 forks

TPOT uses genetic algorithms to automatically create and optimize machine learning pipelines. It will explore thousands of possibilities and get back to you with the best one.

To show you this magic I have prepared a short (3:50 min) video (loading a Kaggle dataset, configuring and training app for 60 minutes). Click if you're curious to see what will happen.

It can be used either as a CLI or within Python code. All you need to do is prepare some good-quality data and write a small script to start the computations (see the examples). After some time (or number of iterations) the script stops, providing you with a Python snippet (based on scikit-learn) containing the best configuration found.
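A minimal sketch of the Python usage (the dataset and parameters are illustrative):

    from sklearn.datasets import load_digits
    from sklearn.model_selection import train_test_split
    from tpot import TPOTClassifier

    digits = load_digits()
    X_train, X_test, y_train, y_test = train_test_split(digits.data, digits.target)

    # a tiny budget just for demonstration - real runs usually take much longer
    tpot = TPOTClassifier(generations=5, population_size=20, verbosity=2)
    tpot.fit(X_train, y_train)
    print(tpot.score(X_test, y_test))

    # writes a standalone scikit-learn script with the best pipeline found
    tpot.export("best_pipeline.py")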


PandaSQL

Github stats: 30 watchers, 381 stars, 50 forks

PandaSQL allows you to query Pandas DataFrames using SQL syntax.

First, you load the DataFrame you are interested in into the PandaSQL engine. Then you enter an SQL query and obtain the results. You can use features like grouping, sub-queries, various kinds of joins, etc.
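A small sketch of how that looks, reusing a few of the hypothetical transactions from the Spark post as a pandas DataFrame:

    import pandas as pd
    from pandasql import sqldf

    df = pd.DataFrame([
        {"name": "Bob",   "amount": 100, "country": "United Kingdom"},
        {"name": "Marek", "amount": 51,  "country": "Poland"},
        {"name": "Paul",  "amount": 75,  "country": "Poland"},
    ])

    query = "SELECT country, SUM(amount) AS total FROM df GROUP BY country"
    print(sqldf(query, locals()))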

For more examples see this demo and blog post.


Auto-sklearn

Github stats: 50 watchers, 707 stars, 119 forks

Auto-sklearn is an automated machine learning toolkit.

It works similarly to TPOT, but instead of using genetic algorithms, Auto-sklearn leverages recent advances in Bayesian optimization, meta-learning and ensemble construction.

Caution: the author warns that the package probably won't work on the Windows and macOS operating systems.


Scikit-plot

Github stats: 26 watcher, 688 stars, 60 forks

An intuitive library to add plotting functionality to scikit-learn objects.

Scikit-plot is the result of an unartistic data scientist's dreadful realization that visualization is one of the most crucial components in the data science process, not just a mere afterthought.

Although its name suggests tight coupling with the Scikit-learn library, it's flexible enough to work with different APIs as well.


To help you experiment and play with some of these libraries I have prepared a Docker image (see the Dockerfile to know what's included).

It can be used for running scripts or as a remote interpreter.

To download and get into the console just type:

That's all.


If you know about other hidden gems for doing data analysis in Python, post them in the comments - I will be happy to review them and add them to the list.

"Call for backup" ... with Elasticsearch

Introduction

I bet you all heard this ancient adage:

There are two types of people - those who backup, and those who will backup.

This post is dedicated to the second group of users - those who have just started using Elasticsearch with their production data and subconsciously feel that something is wrong.

 

Tools

No doubt - I'm a big Docker fan. It saves tons of time and is super easy to use. If you haven't done it before I strongly encourage you to learn it now.

To perform the backup we will use the official elasticsearch-dump tool within a Docker container. In the example below I'm running Elasticsearch 2.4.

We will create two bash scripts - one for backing up the data and the other, more importantly, for restoring it. Optionally you can also configure the Cron scheduler to do this for you automatically.

Feel free to adjust the directories and file names to your needs - there are no strict rules here.

Backup

Create a file called perform_backup.sh with the following content:

Make sure to provide a valid path to the ES server and replace <INDEX> with the name of the index you want to back up.

That's it.

Restore

Create a file revert_backup.sh and paste the code below:

As in the previous example, make sure that the --output argument points to the destination you want.

Also note that you need to pass a backup file as an argument for the script to run correctly, e.g.:

Cron

If you want to become a pro you can even go one step further - schedule automatic backups.

With only one command:

The following command adds an entry to crontab to execute the script every day at 5 AM.

Summary

Sooner or later everybody will be backing up their data. The feeling that we can deal with unexpected situations reassures our minds and is worth pursuing.

Remember to thoroughly test the whole process a couple of times and check whether you can fully rely on it. The scripts provided above are examples and you should adjust them to your needs.

Integrating Apache Spark 2.0 with PyCharm CE

The following post presents how to configure JetBrains PyCharm CE IDE to develop applications with Apache Spark 2.0+ framework.

  1. Download Apache Spark distribution pre-built for Hadoop (link).
  2. Unpack the archive. This directory will later be referred to as $SPARK_HOME.
  3. Start PyCharm and create a new project File → New Project. Call it "spark-demo".
  4. Inside project create a new Python file - New → Python File. Call it run.py.
  5. Write a simple script counting the occurrences of A's and B's inside Spark's README.md file (a minimal sketch appears after this list). Don't worry about the errors; we will fix them in the next steps.
  6. Add the required libraries - PyCharm → Preferences ... → Project spark-demo → Project Structure → Add Content Root. Select all ZIP files from $SPARK_HOME/python/lib. Apply the changes.
  7. Create a new run configuration. Go into Run → Edit Configurations → + → Python. Name it "Run with Spark" and select the previously created file as the script to be executed.
  8. Add environment variables. Inside created configuration add the corresponding environment variables. Save all changes.
  9. Run the script - Run → Run 'Run with Spark'. You should see that the script is executed properly within Spark context.
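A minimal sketch of such a run.py (the README.md path is an assumption - point it at the file inside your own $SPARK_HOME):

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("spark-demo").getOrCreate()

    # adjust this path to the README.md inside your Spark distribution
    readme = spark.read.text("/path/to/spark/README.md").cache()

    num_a = readme.filter(readme.value.contains("a")).count()
    num_b = readme.filter(readme.value.contains("b")).count()

    print("Lines with a: %i, lines with b: %i" % (num_a, num_b))
    spark.stop()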

Now you can improve your working experience with the IDE's advanced features like debugging or code completion.

Happy coding.