Partitioning in Apache Spark

The following post should serve as a guide for those trying to understand of inner-workings of Apache Spark. I have created it initially for organizing my knowledge and extended later on. It assumes that you, however, possess some basic knowledge of Spark.

All examples are written in Python 2.7 running with PySpark 2.1 but the rules are very similar for other APIs.


First of some words about the most basic concept - a partition:

Partition - a logical chunk of a large data set.

Very often data we are processing can be separated into logical partitions (ie. payments from the same country, ads displayed for given cookie, etc). In Spark, they are distributed among nodes when shuffling occurs.

Spark can run 1 concurrent task for every partition of an RDD (up to the number of cores in the cluster). If you're cluster has 20 cores, you should have at least 20 partitions (in practice 2-3x times more). From the other hand a single partition typically shouldn't contain more than 128MB and a single shuffle block cannot be larger than 2GB (see SPARK-6235).

In general, more numerous partitions allow work to be distributed among more workers, but fewer partitions allow work to be done in larger chunks (and often quicker).

Spark's partitioning feature is available on all RDDs of key/value pairs.

Why care?

For one, quite important reason - performance. By having all relevant data in one place (node) we reduce the overhead of shuffling (need for serialization and network traffic).

Also understanding how Spark deals with partitions allow us to control the application parallelism (which leads to better cluster utilization - fewer costs).

But keep in mind that partitioning will not be helpful in all applications. For example, if a given RDD is scanned only once, there is no point in partitioning it in advance. It's useful only when a dataset is reused multiple times (in key-oriented situations using functions like join()).

We will use the following list of numbers for investigating the behavior.

Playing with partitions

Let's start with creating a local context with allocated one thread only and parallelizing a collection with using all defaults. We are going to use glom() function that will expose the structure of created partitions.

From API: glom()- return an RDD created by coalescing all elements within each partition into a list.

Each RDD also possesses information about partitioning schema (you will see later that it can be invoked explicitly or derived via some transformations).

From API: partitioner - inspect partitioner information used for the RDD.

Ok, so what happened under the hood?

Spark uses different partitioning schemes for various types of RDDs and operations. In a case of using parallelize() data is evenly distributed between partitions using their indices (no partitioning scheme is used).

If there is no partitioner the partitioning is not based upon characteristic of data but distribution is random and uniformed across nodes.

Different rules apply for various data sources and structures (ie. when loading data using textFile() or using tuple objects). Good sumary is provided here.

If you look inside parallelize() source code you will see that the number of partitions can be distinguished either by setting numSlice argument or by using spark.defaultParallelism property (which is reading context information).

Now let's try to allow our driver to use two local cores.

Ok, that worked as expected - the data was distributed across two partitions and each will be executed in a separate thread.

But what will happen when the number of partitions exceeds the number of data records?

You can see that Spark created requested a number of partitions but most of them are empty. This is bad because the time needed to prepare a new thread for processing data (one element) is significantly greater than processing time itself (you can analyze it in Spark UI).

Custom partitions with partitionBy()

partitionBy() transformation allows applying custom partitioning logic over the RDD.

Let's try to partition the data further by taking advantage of domain-specific knowledge.

Warning - to use partitionBy() RDD must consist of tuple (pair) objects. It's a transformation, so a new RDD will be returned. It's highly adviseable to persist it for more optimal later usage.

Because partitionBy() requires data to be in key/value format we will need to transform the data.

In PySpark an object is considered valid for PairRDD operations if it can be unpacked as follows k, v = kv. You can read more about the requirements here.

You can see that now the elements are distributed differently. A few interesting things happened:

  1. parallelize(nums) - we are transforming Python array into RDD with no partitioning scheme,
  2. map(lambda el: (el, el)) - transforming data into the form of a tuple,
  3. partitionBy(2) - splitting data into 2 chunks using default hash partitioner,

Spark used a partitioner function to distinguish which to which partition assign each record. It can be specified as the second argument to the partitionBy(). The partition number is then evaluated as follows partition = partitionFunc(key) % num_partitions.

By default PySpark implementation uses hash partitioning as the partitioning function.

Let's perform an additional sanity check.

But let's get into a more realistic example. Imagine that our data consist of various dummy transactions made across different countries.

We know that further analysis will be performed analyzing many similar records within the same country. To optimize network traffic it seems to be a good idea to put records from one country in one node.

To meet this requirement, we will need a custom partitioner:

Custom partitioner - function returning an integer for given object (tuple key).

By validating our partitioner we can see what partitions are assigned for each country.

Pay attention for potential data skews. If some keys are overrepresented in the dataset it can result in suboptimal resource usage and potential failure.

It worked as expected all records from a single country is within one partition. We can do some work directly on them without worrying about shuffling by using the mapPartitions() function.

From API: mapPartitions() converts each partition of the source RDD into multiple elements of the result (possibly none). One important usage can be some heavyweight initialization (that should be done once for many elements). Using mapPartitions() it can be done once per worker task/thread/partition instead of running map() for each RDD data element.

In the example below, we will calculate the sum of sales in each partition (in this case such operations make no sense, but the point is to show how to pass data into mapPartitions() function).

Working with DataFrames

Nowadays we are all advised to abandon operations on raw RDDs and use structured DataFrames (or Datasets if using Java or Scala) from Spark SQL module. Creators made it very easy to create custom partitioners in this case.

You can see that DataFrames expose a modified repartition() method taking as an argument a column name. When not specifying number of partitions a default value is used (taken from the config parameter spark.sql.shuffle.partitions).

Let's take a closer look at this method at the general.

coalesce() and repartition()

coalesce() and repartition() transformations are used for changing the number of partitions in the RDD.

repartition() is calling coalesce() with explicit shuffling.

The rules for using are as follows:

  • if you are increasing the number of partitions use repartition() (performing full shuffle),
  • if you are decreasing the number of partitions use coalesce() (minimizes shuffles)

Code below shows how repartitioning works (data is represented using DataFrames).

Vanishing partitioning schema

Many available RDD operations will take advantage of underlying partitioning. On the other hand operations like map() cause the new RDD to forget the parent's partitioning information.

Operations that benefit from partitioning

All operations performing shuffling data by key will benefit from partitioning. Some examples are cogroup(), groupWith(), join(), leftOuterJoin(), rightOuterJoin(), groupByKey(), reduceByKey(), combineByKey() or lookup().

Operations that affect partitioning

Spark knows internally how each of it's operations affects partitioning, and automatically sets the partitioner on RDDs created by operations that partition that data.

But the are some transformations that cannot guarantee to produce known partitioning - for example calling map() could theoretically modify the key of each element.

Spark does not analyze your functions to check whether they retain the key.

Instead, there are some functions provided that guarantee that each tuple's key remains the same - mapValues(), flatMapValues() or filter() (if the parent has a partitioner).

Memory issues

Have you ever seen this mysterious piece of text - java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE ?

Looking into stack trace it can be spotted that it's not coming from within you app but from Spark internals. The reason is that in Spark you cannot have shuffle block greater than 2GB.

Shuffle block - data transferred across stages between executors.

This happens because Spark uses ByteBuffer as abstraction for storing block and it's limited by Integer.MAX_SIZE (2 GB).

It's especially problematic for Spark SQL (various aggregation functions) because the default number of partitions to use when doing shuffle is set to 200 (it can lead to high shuffle block sizes that can sometimes exceed 2GB).

So what can be done:

  1. Increase the number of partitions (thereby, reducing the average partition size) by increasing the value of spark.sql.shuffle.partitions for Spark SQL or by calling repartition() or coalesce() on RDDs,
  2. Get rid of skew in data

It's good to know that Spark uses different logic for memory management when the number of partitions is greater than 2000 (uses high compression algorithm). So if you have ~2000 partitions it's worth bumping it up to 2001 which will result in smaller memory footprint.

Take-aways

Spark partitioning is available on all RDDs of key/value pairs and causes the system to group elements based on a function of each key.

Features

  • tuples in the same partition are guaranteed to be on the same machine,
  • each node in the cluster can contain more than one partition,
  • the total number of partitions are configurable (by default set to the total number of cores on all executor nodes)

Performance tuning checklist

  • have the correct number of partitions (according to cluster specification) - check this and that for guidance,
  • consider using custom partitioners,
  • check if your transformations preserve partition schema,
  •  check if memory could be optimized by bumping number of partitions to 2001

Settings

  • spark.default.parallelism - sets up the number of partitions to use for HashPartitioner (can be overridden when creating SparkContext object),
  • spark.sql.shuffle.partitions - controls the number of partitions for operations on DataFrames (default is 200)

As the final thought note that the number of partitions also determine how many files will be generated by actions saving an RDD to files.

Sources

Reproducible research and explaining predictions of any classifier

Recently I had a pleasure to give two talks at PyData Wrocław meetup group - about reproducible data science and explaining predictions of any classifier using LIME project. The meeting is taking place each month enabling others to discuss potential issues they encounter in their projects or simply share knowledge.

 

Reproducible data science

Practical approach

Assuring reproducibility is one of the most important issues in any scientific projects. See what techniques and tools you can use in your daily basis

Why have you done this to me?

Explaining predictions of any classifier

Very often it's nearly impossible to explain the decision made by a black-box classifier. But there is a new open source library solving this problem (LIME). Learn what's possible by seeing it in action.

Video

You can watch the whole presentation below (28:12):

You can download notebook and data files used in the examples here.

 

10,5 Python Libraries for Data Analysis Nobody Told You About

UPDATED: 23 Apr. 2017

Below is a list of little "less popular" Python libraries that can add tremendous value to your data projects.


LIME

Github stats: 84 watchers, 1004 stars, 155 forks

LIME project (local interpretable model-agnostic explanations) is about to reveal what were the motivations for any black-box classifier that picked certain action.

At the moment the reasoning is possible for text and tabular data (with continuous, discrete and both features). The project is constantly evolving and you can expect much, much more improvements over time.

 

All you need to provide is an algorithm that outputs probability for each class.

Just watch the promo-video for the project (2:55 min):


Yellowbrick

Github stats: 19 watchers, 140 stars, 43 forks

Yellowbrick is a collection of tools that are super-handy for visualization of machine learning issues related to feature or model selection/evaluation and parameter's tuning.

There are about 19 distinct tools available, ranging from simple boxplots to grid-search heat maps.

It's of course designed to play nicely with Scikit-learn package.


Traces

Github stats: 10 watchers, 176 stars, 18 forks

As written in the docs Traces aims for making it simple to manipulate, transform and perform analysis of unevenly spaced time series.

It offers some very handy helper functions for simplifying analysis like getting distributions by each day of a week or transforming to evenly spaced events (ie. for doing forecasting).


Quiver

Github stats: 41 watchers, 878 stars, 63 forks

Quiver is a kick-ass tool for doing interactive visualization of Keras convolutional network features.

The way it works it that you need to build and feed a Keras model into Quiver. Then with just one line of code start an embedded web-server with the app (built with React and Redux) and open it in your browser.

Watch the video how to explore layer activations on all the different images (1:47 min):


Dplython

Github stats: 30 watchers, 526 stars, 36 forks

If you have done some data analysis in R using dplyr package and later on switched to Python you probably know the pain of no such convenient piping possibility.

Dplython aims for providing the same functionality for pandas data-frames as dplyr in R.

Just see what's possible:

The library makes it possible to perform "pipeline-able" operations by creating special function decorators. You can read more about this here.


TSFRESH

Github stats: 70 watchers, 1745 stars, 118 forks

TSFRESH stands for Time Series Feature extraction based on scalable hypothesis tests".

The beauty of this project is that it can help you to automatically extract about various 100 (!) features from a signal.

To avoid duplicated or irrelevant features TSFRESH utilizes a filtering procedure evaluating the explaining power and importance of each characteristic for the regression or classification tasks.


Arrow

Github stats: 117 watchers, 3853 stars, 314 forks

Arrow is the library providing an impressive user experience for working with dates and time.

Even though Python is fully equipped with many modules for the same purpose, you probably can do this with Arrow faster, cleaner and simpler.

The library is inspired by famous moment.js.

To learn more about it read the docs.


TPOT

Github stats: 136 watchers, 1728 stars, 251 forks

TPOT utilizes genetic algorithms to automatically create and optimize machine learning pipelines. it will explore thousands of possibilities and get's back to you with the best one.

To show you this magic I have prepared a short (3:50 min) video (loading a Kaggle dataset, configuring and training app for 60 minutes). Click if you're curious to see what will happen.

It can be used both as CLI or within Python code. All you need to do is to prepare some good quality data and write a little script for starting computations (see examples). After some time (or iterations) script stops, providing you Python snippet (based on Sklearn) with the best configuration found.


PandaSQL

Github stats: 30 watchers, 381 stars, 50 forks

PandaSQL allows you to query Pandas DataFrames using SQL syntax.

First, you need to load interesting DataFrame into PandaSQL engine. Then enter SQL query and obtain results. You can use features like grouping, sub-queries, various kind of joins etc.

For more examples see this demo and blog post.


Auto-sklearn

Github stats: 50 watchers, 707 stars, 119 forks

Auto-sklearn is an automated machine learning toolkit.

It works similar to TPOT but instead of using genetic algorithms, Auto-sklearn leverages recent advantages in Bayesian optimization, meta-learning and ensemble construction.

Caution: author warns that the package probably won't work with Windows and mac OS operating systems.


Scikit-plot

Github stats: 26 watcher, 688 stars, 60 forks

An intuitive library to add plotting functionality to scikit-learn objects.

Scikit-plot is the result of an unartistic data scientist's dreadful realization that visualization is one of the most crucial components in the data science process, not just a mere afterthought.

Although it's name suggest tight coupling with Scikit-learn library it's flexible enought to work with different APIs as well.


To help you experiment and play with some of these libraries I have prepared a Docker image (see the Dockerfile to know what's included).

It can be used for running scripts or to perform as a remote interpreter.

To download and get into the console just type:

That's all.


If you know about other, hidden gems for doing data analysis in Python post them as the comment - I will be happy to review them and add to list.

Boolean Multiplexer in Practice

Introduction

There are two popular types of problems for evaluating learning classifier systems:

  • single-step - like "question-answer" systems,
  • multi-steps - problems where multiple consequential steps are needed to solve it. Most popular are different kind of mazes (in literature often referred as various kind of MAZE or WOODS environments).

This article will focus on a method for testing single-step systems, where the environment has the Markov property (each state is independent of it's predecessor).

A method referred as boolean multiplexer function will be first described, followed by some examples and a simple Python implementation.

Enter ...

Multiplexer

First, let's gain some intuition about the idea of a multiplexer:

Multiplexing is the generic term used to describe the operation of sending one or more analog or digital signals over a common transmission line at different times or speeds. [source]

In the following scheme, an example of the 4-1 multiplexer with 4 inputs, 2 control signals, and 1 output is presented. The output Q can be one of the input signal A, B, C or D depending on the value of a and b.

There are of course many different configuration options available but this knowledge should be sufficient for now.

Boolean multiplexer function

Boolean multiplexer is a case where each signal is represented in binary using either 0 or 1.

There is a convention that the incoming signal consists of two, concatenated parts - control and data bits

In the example, above we are dealing with 6-bit boolean multiplexer. First 2 bits are capable of addressing 4 inputs ( 2^2 = 4 ) that came along.

The output is a data bit at a location specified by converting control bit number into decimal (in this case bin(01) = dec(1)). Data bits indexing starts from zero.

Examples

Below you will find three examples of multiplexer functions.

3-bit

Control bits: 1, Data bits: 2

The output is the 0-th data bit.

6-bit

Control bits: 2, Data bits: 4

The output is the 3-rd data bit.

11-bit

Control bits: 3, Data bits: 8

The output is the 5-th data bit.

Implementation

The following implementation generates a random binary signal (user needs to provide a number of control bits), and prints the correct value of the signal.

Mind that you need to make sure the bitstring module is available in your OS.

Here is an example of using 2 bits for controlling the signal (6-bit multiplexer):

 

Integrating Apache Spark 2.0 with PyCharm CE

The following post presents how to configure JetBrains PyCharm CE IDE to develop applications with Apache Spark 2.0+ framework.

  1. Download Apache Spark distribution pre-built for Hadoop (link).
  2. Unpack the archive. This directory will be later referred as $SPARK_HOME.
  3. Start PyCharm and create a new project File → New Project. Call it "spark-demo".
  4. Inside project create a new Python file - New → Python File. Call it run.py.
  5. Write a simple script counting the occurrences of A's and B's inside Spark's README.md file. Don't worry about the errors, we will fix them in next steps.
  6. Add required librariesPyCharm → Preferences ... → Project spark-demo → Project Structure → Add Content Root. Select all ZIP files from $SPARK_HOME/python/lib. Apply changes.
  7. Create a new run configuration. Go into Run → Edit Configurations → + → Python. Name it "Run with Spark" and select the previously created file as the script to be executed.
  8. Add environment variables. Inside created configuration add the corresponding environment variables. Save all changes.
  9. Run the script - Run → Run 'Run with Spark'. You should see that the script is executed properly within Spark context.

Now you can improve your working experience with IDE advanced features like debugging or code completion.

Happy coding.