Partitioning in Apache Spark

The following post should serve as a guide for those trying to understand the inner workings of Apache Spark. I created it initially to organize my own knowledge and extended it later on. It assumes, however, that you possess some basic knowledge of Spark.

All examples are written in Python 2.7 running with PySpark 2.1 but the rules are very similar for other APIs.


First, a few words about the most basic concept - a partition:

Partition - a logical chunk of a large data set.

Very often the data we are processing can be separated into logical partitions (e.g. payments from the same country, ads displayed for a given cookie, etc.). In Spark, partitions are distributed among nodes when shuffling occurs.

Spark can run one concurrent task for every partition of an RDD (up to the number of cores in the cluster). If your cluster has 20 cores, you should have at least 20 partitions (in practice 2-3 times more). On the other hand, a single partition typically shouldn't contain more than 128 MB and a single shuffle block cannot be larger than 2 GB (see SPARK-6235).

In general, more partitions allow the work to be distributed among more workers, while fewer partitions allow the work to be done in larger chunks (and often finish quicker).

Spark's partitioning feature is available on all RDDs of key/value pairs.

Why care?

For one quite important reason - performance. By having all relevant data in one place (on one node), we reduce the overhead of shuffling (the need for serialization and network traffic).

Understanding how Spark deals with partitions also allows us to control the application's parallelism (which leads to better cluster utilization and lower costs).

But keep in mind that partitioning will not be helpful in all applications. For example, if a given RDD is scanned only once, there is no point in partitioning it in advance. It's useful only when a dataset is reused multiple times (in key-oriented situations using functions like join()).

We will use the following list of numbers for investigating the behavior.
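
The exact list from the original snippet is not reproduced here, so the values below are just an assumption - any small list of numbers will do:

    # List of numbers used as the running example in the snippets below.
    nums = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]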

Playing with partitions

Let's start by creating a local context with only one thread allocated and parallelizing a collection using all defaults. We are going to use the glom() function, which exposes the structure of the created partitions.

From API: glom() - return an RDD created by coalescing all elements within each partition into a list.

Each RDD also holds information about its partitioning scheme (you will see later that it can be set explicitly or derived via some transformations).

From API: partitioner - inspect partitioner information used for the RDD.
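
A minimal sketch of this first experiment, reusing the nums list defined above:

    from pyspark import SparkContext

    # Local context with a single worker thread allocated.
    sc = SparkContext(master="local[1]", appName="partitioning-demo")

    rdd = sc.parallelize(nums)     # all defaults - no explicit number of partitions

    print(rdd.getNumPartitions())  # 1 - one local thread gives a default parallelism of 1
    print(rdd.glom().collect())    # [[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]] - a single partition
    print(rdd.partitioner)         # None - no partitioning scheme is set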

Ok, so what happened under the hood?

Spark uses different partitioning schemes for various types of RDDs and operations. In the case of parallelize(), data is evenly distributed between partitions based on element indices (no partitioning scheme is used).

If there is no partitioner, the partitioning is not based on any characteristic of the data; the distribution is random and uniform across nodes.

Different rules apply to various data sources and structures (e.g. when loading data using textFile() or working with tuple objects). A good summary is provided here.

If you look inside the parallelize() source code, you will see that the number of partitions is determined either by the numSlices argument or by the context's defaultParallelism property (which reads the context configuration).

Now let's try to allow our driver to use two local cores.
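
A sketch of the same experiment with two threads (here the context from the previous snippet is simply recreated):

    # Recreate the context with two local threads.
    sc.stop()
    sc = SparkContext(master="local[2]", appName="partitioning-demo")

    rdd = sc.parallelize(nums)
    print(rdd.getNumPartitions())  # 2
    print(rdd.glom().collect())    # e.g. [[1, 2, 3, 4, 5], [6, 7, 8, 9, 10]]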

Ok, that worked as expected - the data was distributed across two partitions and each will be executed in a separate thread.

But what will happen when the number of partitions exceeds the number of data records?
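
A quick check, asking for far more partitions than we have elements:

    rdd = sc.parallelize(nums, 15)
    print(rdd.getNumPartitions())  # 15
    print(rdd.glom().collect())
    # [[], [1], [2], [], [3], [4], [], [5], [6], [], [7], [8], [], [9], [10]]
    # - most of the partitions are empty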

You can see that Spark created the requested number of partitions, but most of them are empty. This is bad because the time needed to prepare a new thread for processing the data (a single element) is significantly greater than the processing time itself (you can analyze it in the Spark UI).

Custom partitions with partitionBy()

The partitionBy() transformation allows applying custom partitioning logic to an RDD.

Let's try to partition the data further by taking advantage of domain-specific knowledge.

Warning - to use partitionBy(), the RDD must consist of tuple (pair) objects. It's a transformation, so a new RDD will be returned; it's highly advisable to persist it so that later usage is more efficient.

Because partitionBy() requires data to be in key/value format we will need to transform the data.

In PySpark an object is considered valid for PairRDD operations if it can be unpacked as follows k, v = kv. You can read more about the requirements here.
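
A minimal sketch of that transformation (the split into even and odd keys in the comment follows from hashing small integers modulo 2):

    rdd = (sc.parallelize(nums)           # RDD with no partitioning scheme
             .map(lambda el: (el, el))    # transform every element into a (key, value) pair
             .partitionBy(2))             # split into 2 chunks with the default hash partitioner

    print(rdd.partitioner)                # <pyspark.rdd.Partitioner object ...>
    print(rdd.glom().collect())
    # [[(2, 2), (4, 4), (6, 6), (8, 8), (10, 10)],
    #  [(1, 1), (3, 3), (5, 5), (7, 7), (9, 9)]]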

You can see that now the elements are distributed differently. A few interesting things happened:

  1. parallelize(nums) - we are transforming a Python list into an RDD with no partitioning scheme,
  2. map(lambda el: (el, el)) - transforming the data into the form of tuples,
  3. partitionBy(2) - splitting the data into 2 chunks using the default hash partitioner.

Spark used a partitioner function to determine to which partition each record should be assigned. It can be specified as the second argument to partitionBy(). The partition number is then evaluated as follows: partition = partitionFunc(key) % num_partitions.

By default, the PySpark implementation uses hash partitioning as the partitioning function.

Let's perform an additional sanity check.
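
For example, we can reproduce the assignment rule by hand with the same hash function PySpark uses by default (portable_hash):

    from pyspark.rdd import portable_hash

    num_partitions = 2
    for el in nums:
        # Reproduce Spark's rule: partition = partitionFunc(key) % num_partitions
        print("element %d goes to partition %d" % (el, portable_hash(el) % num_partitions))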

But let's get into a more realistic example. Imagine that our data consists of various dummy transactions made across different countries.
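
The original data set is not shown in the text, so the records below are only a hypothetical stand-in:

    transactions = [
        {"name": "Bob",      "amount": 100, "country": "United Kingdom"},
        {"name": "James",    "amount": 15,  "country": "United Kingdom"},
        {"name": "Marek",    "amount": 51,  "country": "Poland"},
        {"name": "Johannes", "amount": 200, "country": "Germany"},
        {"name": "Paul",     "amount": 75,  "country": "Poland"},
    ]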

We know that further analysis will involve processing many similar records from the same country. To optimize network traffic, it seems to be a good idea to put all records from one country on one node.

To meet this requirement, we will need a custom partitioner:

Custom partitioner - a function returning an integer for a given object (the tuple's key).
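
One possible sketch of such a partitioner, keying the transactions by country (the choice of three partitions and of portable_hash as the hash function is an assumption, and the snippet continues with the sc and transactions objects from above):

    from pyspark.rdd import portable_hash

    def country_partitioner(country):
        # Return an integer for the given key; Spark applies the modulo itself.
        return portable_hash(country)

    by_country = (sc.parallelize(transactions)
                    .map(lambda t: (t["country"], t))       # key every record by its country
                    .partitionBy(3, country_partitioner))   # 3 partitions, custom partitioner

    print(by_country.glom().collect())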

By validating our partitioner we can see which partition is assigned to each country.
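
A quick validation, applying the same modulo rule Spark uses internally:

    num_partitions = 3
    for country in ("United Kingdom", "Poland", "Germany"):
        print("country %s goes to partition %d"
              % (country, country_partitioner(country) % num_partitions))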

Pay attention to potential data skews. If some keys are overrepresented in the dataset, this can result in suboptimal resource usage and potential failures.

It worked as expected - all records from a single country are within one partition. We can do some work directly on them, without worrying about shuffling, by using the mapPartitions() function.

From API: mapPartitions() converts each partition of the source RDD into multiple elements of the result (possibly none). One important usage is heavyweight initialization that should be done once for many elements. Using mapPartitions() it can be done once per worker task/thread/partition, instead of once per RDD element as with map().

In the example below, we will calculate the sum of sales in each partition (in this case such an operation makes little sense, but the point is to show how to pass data into the mapPartitions() function).
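
A sketch of that computation, continuing with the by_country RDD from above:

    def sum_sales(iterator):
        # Any heavyweight initialization would go here - it runs once per partition.
        # The iterator yields the (country, transaction) pairs of a single partition.
        yield sum(transaction["amount"] for _, transaction in iterator)

    print(by_country.mapPartitions(sum_sales).collect())
    # one aggregated sum per partition, e.g. [115, 126, 200]
    # (the exact grouping depends on how the country names hash)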

Working with DataFrames

Nowadays we are all advised to abandon operations on raw RDDs in favor of structured DataFrames (or Datasets if using Java or Scala) from the Spark SQL module. The creators made it very easy to control partitioning in this case.

DataFrames expose a modified repartition() method that can take a column name as an argument. When the number of partitions is not specified, a default value is used (taken from the config parameter spark.sql.shuffle.partitions).
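
A minimal sketch with a small DataFrame (spark.sql.shuffle.partitions is left at its default of 200 here):

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("partitioning-demo").getOrCreate()

    df = spark.createDataFrame(
        [("Bob", 100, "United Kingdom"),
         ("Marek", 51, "Poland"),
         ("Johannes", 200, "Germany")],
        ["name", "amount", "country"])

    # Repartition by a column - the number of partitions falls back to
    # spark.sql.shuffle.partitions.
    by_country_df = df.repartition("country")
    print(by_country_df.rdd.getNumPartitions())   # 200 unless the config was changed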

Let's take a closer look at this method in general.

coalesce() and repartition()

coalesce() and repartition() transformations are used for changing the number of partitions in the RDD.

repartition() simply calls coalesce() with shuffling explicitly enabled (shuffle=True).

The rules for using them are as follows:

  • if you are increasing the number of partitions use repartition() (performing full shuffle),
  • if you are decreasing the number of partitions use coalesce() (minimizes shuffles)

The code below shows how repartitioning works (the data is represented using DataFrames).
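
A sketch of both transformations, continuing with the df DataFrame from the previous snippet:

    print(df.rdd.getNumPartitions())              # the initial number of partitions

    repartitioned = df.repartition(8)             # full shuffle up to 8 partitions
    print(repartitioned.rdd.getNumPartitions())   # 8

    coalesced = repartitioned.coalesce(2)         # merge partitions, avoiding a full shuffle
    print(coalesced.rdd.getNumPartitions())       # 2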

Vanishing partitioning schema

Many available RDD operations will take advantage of the underlying partitioning. On the other hand, operations like map() cause the new RDD to forget the parent's partitioning information.

Operations that benefit from partitioning

All operations performing shuffling data by key will benefit from partitioning. Some examples are cogroup(), groupWith(), join(), leftOuterJoin(), rightOuterJoin(), groupByKey(), reduceByKey(), combineByKey() or lookup().

Operations that affect partitioning

Spark knows internally how each of its operations affects partitioning, and automatically sets the partitioner on RDDs created by operations that partition the data.

But there are some transformations that cannot guarantee a known partitioning - for example, calling map() could theoretically modify the key of each element.

Spark does not analyze your functions to check whether they retain the key.

Instead, there are some functions provided that guarantee that each tuple's key remains the same - mapValues(), flatMapValues() or filter() (if the parent has a partitioner).
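
A small illustration of the difference (the pair RDD setup mirrors the earlier partitionBy() example):

    pairs = sc.parallelize(nums).map(lambda el: (el, el)).partitionBy(2)

    # map() could change the key, so the partitioner is dropped.
    print(pairs.map(lambda kv: (kv[0], kv[1] * 2)).partitioner)   # None

    # mapValues() only touches the values, so the partitioner is kept.
    print(pairs.mapValues(lambda v: v * 2).partitioner)           # <pyspark.rdd.Partitioner object ...>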

Memory issues

Have you ever seen this mysterious piece of text - java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE ?

Looking into the stack trace, you can spot that it's not coming from within your app but from Spark internals. The reason is that in Spark you cannot have a shuffle block greater than 2 GB.

Shuffle block - data transferred across stages between executors.

This happens because Spark uses a ByteBuffer as the abstraction for storing a block, and it's limited to Integer.MAX_VALUE bytes (2 GB).

It's especially problematic for Spark SQL (various aggregation functions) because the default number of partitions to use when shuffling is set to 200 (which can lead to large shuffle block sizes that sometimes exceed 2 GB).

So what can be done:

  1. Increase the number of partitions (thereby reducing the average partition size) by increasing the value of spark.sql.shuffle.partitions for Spark SQL or by calling repartition() or coalesce() on RDDs,
  2. Get rid of the skew in your data.

It's good to know that Spark uses different logic for memory management when the number of partitions is greater than 2000 (it switches to a highly compressed data structure for shuffle bookkeeping). So if you have close to 2000 partitions, it's worth bumping the number up to 2001, which will result in a smaller memory footprint.

Take-aways

Spark partitioning is available on all RDDs of key/value pairs and causes the system to group elements based on a function of each key.

Features

  • tuples in the same partition are guaranteed to be on the same machine,
  • each node in the cluster can contain more than one partition,
  • the total number of partitions is configurable (by default it is set to the total number of cores on all executor nodes)

Performance tuning checklist

  • have the correct number of partitions (according to cluster specification) - check this and that for guidance,
  • consider using custom partitioners,
  • check if your transformations preserve partition schema,
  • check if memory usage could be optimized by bumping the number of partitions to 2001

Settings

  • spark.default.parallelism - sets the number of partitions to be used by HashPartitioner (it can be overridden when creating the SparkContext object),
  • spark.sql.shuffle.partitions - controls the number of partitions for operations on DataFrames (default is 200)

As a final thought, note that the number of partitions also determines how many files will be generated by actions that save an RDD to files.


Integrating Apache Spark 2.0 with PyCharm CE

The following post presents how to configure the JetBrains PyCharm CE IDE for developing applications with the Apache Spark 2.0+ framework.

  1. Download Apache Spark distribution pre-built for Hadoop (link).
  2. Unpack the archive. This directory will later be referred to as $SPARK_HOME.
  3. Start PyCharm and create a new project File → New Project. Call it "spark-demo".
  4. Inside the project create a new Python file - New → Python File. Call it run.py.
  5. Write a simple script counting the occurrences of A's and B's inside Spark's README.md file (see the sketch after this list). Don't worry about the errors; we will fix them in the next steps.
  6. Add the required libraries - PyCharm → Preferences ... → Project spark-demo → Project Structure → Add Content Root. Select all ZIP files from $SPARK_HOME/python/lib. Apply the changes.
  7. Create a new run configuration. Go into Run → Edit Configurations → + → Python. Name it "Run with Spark" and select the previously created file as the script to be executed.
  8. Add environment variables. Inside the created configuration, add the corresponding environment variables. Save all changes.
  9. Run the script - Run → Run 'Run with Spark'. You should see that the script is executed properly within Spark context.
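
For reference, here is one possible version of the script from step 5, modeled on the standard Spark quick-start example that counts lines containing the letters a and b (the path to README.md is a placeholder you need to adjust):

    from pyspark.sql import SparkSession

    # Adjust this path to point at the README.md inside your $SPARK_HOME.
    log_file = "/path/to/spark/README.md"

    spark = SparkSession.builder.appName("spark-demo").getOrCreate()

    log_data = spark.read.text(log_file).cache()
    num_as = log_data.filter(log_data.value.contains("a")).count()
    num_bs = log_data.filter(log_data.value.contains("b")).count()

    print("Lines with a: %i, lines with b: %i" % (num_as, num_bs))

    spark.stop()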

Now you can improve your working experience with advanced IDE features like debugging and code completion.

Happy coding.