Spark Repartition & Coalesce - Explained

November 20, 2018

All data processed by spark is stored in partitions. Today we discuss what are partitions, how partitioning works in Spark (Pyspark), why it matters and how the user can manually control the partitions using repartition and coalesce for effective distributed computing.

Introduction

Spark is a framework which provides parallel and distributed computing on big data. To perform it’s parallel processing, spark splits the data into smaller chunks(i.e. partitions) and distributes the same to each node in the cluster to provide a parallel execution of the data. This partitioning of data is performed by spark’s internals and the same can also be controlled by the user.

Table of Contents

1. What is Partitioning in Spark ?

A Partition in simple terms is a split in the input data, so partitions in spark are basically smaller logical chunks or divisions of the input data. Spark distributes this partitioned data among the different nodes to perform distributed processing on the data.

Let’s dive into a practical example and create a simple RDD using the sc.parallelize method to determine the number of partitions spark creates by default.

rdd = sc.parallelize(range(1,11))
rdd.getNumPartitions()
8


As shown above, spark creates 8 partitions by default here. Now, you must be wondering where this default number is come from.

Spark has a default parallelism parameter which is determined by, sc.defaultParallelism. When we run this method, it returns 8 as shown below,

sc.defaultParallelism
8


This is how the data is present inside each partition. Spark uses an internal Hash Partitioning Scheme to split the data into these smaller chunks.

We can use the rdd.glom() method to display the partitions in a list.

glom - returns an RDD having the elements within each partition in a separate list

rdd.glom().collect()
[[1], [2], [3], [4, 5], [6], [7], [8], [9, 10]]

Let’s visualize the above collect operation in the Spark WebUI(if you are using spark locally, navigate to https://localhost:4040 to see the spark webui or if spark is being accessed via a cluster navigate to your cluster specific localhost webUI)

Note: Spark shell notifies a port number before creating a Sparksession.

Since 8 partitions are present, 8 executors would be launched for this action, as shown below,

image.png

How to see number of Partitions in a Dataframe ?

The above method used for an RDD can also be applied to a dataframe. Let’s convert the RDD to a Dataframe and apply the same method. We can see that the same number of partitions are present.

df = rdd.map(lambda x: (x, )).toDF() # converting rdd to Dataframe
df.rdd.glom().collect()
[[Row(_1=1)],
 [Row(_1=2)],
 [Row(_1=3)],
 [Row(_1=4), Row(_1=5)],
 [Row(_1=6)],
 [Row(_1=7)],
 [Row(_1=8)],
 [Row(_1=9), Row(_1=10)]]

How to control the number of partitions when reading / writing files in Spark?

Now, you must’ve had the question in your mind as to how spark partitions the data when reading textfiles. Let’s read a simple textfile and see the number of partitions here.

Let’s read the CSV file which was the input dataset in my first post, [pyspark dataframe basic operations]

df2 = spark.read.format('csv').options(delimiter=',', header=True).load('/Users/ecom-ahmed.noufel/Downloads/population.csv')
df2.rdd.getNumPartitions()
1


We can see that only 1 partition is created here. Alright let’s break this down,

Spark by default creates 1 partition for every 128 MB of the file.
So if you are reading a file of size 1GB, it creates 10 partitions.

So how to control the number of bytes a single partition can hold ?

The file being read here is of 3.6 MB, hence only 1 partition is created in this case.

The no. of partitions is determined by spark.sql.files.maxPartitionBytes parameter, which is set to 128 MB, by default.

spark.conf.get("spark.sql.files.maxPartitionBytes") # -> 128 MB by default
'134217728'

Note: The files being read must be splittable by default for spark to create partitions when reading the file. So, in case of compressed files like snappy, gz or lzo etc, a single partition is created irrespective of the size of the file.

Let’s manually set the spark.sql.files.maxPartitionBytes to 1MB, so that we get 4 partitions upon reading the file.

spark.conf.set("spark.sql.files.maxPartitionBytes", 1000000)
spark.conf.get("spark.sql.files.maxPartitionBytes")
'1000000'
df3 = spark.read.format('csv').options(delimiter=',', header=True).load('/Users/ecom-ahmed.noufel/Downloads/population.csv')
df3.rdd.getNumPartitions()
4

As discussed, 4 partitions were created.

Caution: The above manual setting of the spark.sql.files.maxPartitionBytes to 1MB was just for the purpose of this demo and is not recommended in practical scenarios. It is recommended to use the default setting or set a value based on your input size and cluster hardware size.

2. Why is Partitioning required ?

Partitioning is the sole basis by which spark distributes data among different nodes to thereby producing a distributed and parallel execution of the data with reduced latency.

If this concept of partitioning is not present in spark, then there would be no parallel processing existing in spark.

Let’s analyze this with a simple example where we create an RDD with a single partition and perform an action on the same.

rdd2 = sc.parallelize(range(20), 1) # the second optional parameter specifies the number of partitions, which is 1 here
rdd2.collect()
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19]

When the above action is seen on the Spark WebUI, only a single executor would be issued to process this data.

So, imagine a case where you are processing a huge volume of data without the concept of partitioning, then the entire data would be processed by a single executor taking a lot of time and memory.

image.png

3. Repartition

Now what if you wish to control these partitions by yourself. This is where the methods of repartition and coalesce come to effect.

What is Repartition?

Repartition is a method in spark which is used to perform a full shuffle on the data present and creates partitions based on the user’s input.

The resulting data is hash partitioned and the data is equally distributed among the partitions.

Methods to Repartition a Dataframe

There are 2 ways to repartition a dataframe, Specifying a set of columns or column along with the partition size (or) Specifying an int value to create the number of partitions

Method 1 : Repartition using Column Name
Now let’s repartition our dataset using the first method using the column present in the dataframe and check the number of partitions being created after repartition.

# df.printSchema()   #> root
                     # |-- _1: long (nullable = true)

df = df.repartition('_1')
df.rdd.getNumPartitions()
200


After our repartition, 200 partitions are created by default. This is because of the value present in spark.sql.shuffle.partitions. Spark uses the value present here to create the number of partitions after the shuffle operation. By default, 200 partitions are created if the number is not specified in the repartition clause.

spark.conf.get("spark.sql.shuffle.partitions")
'200'


Method 2 : Repartition using integer value
Now let’s use method 2 to specify an integer value to create the number of partitions as per our requirement. Let’s repartition the dataset to 3 partitions.

df = df.repartition(3)
df.rdd.glom().collect()
[[Row(_1=5), Row(_1=6), Row(_1=7), Row(_1=9)],
 [Row(_1=1), Row(_1=2), Row(_1=4)],
 [Row(_1=3), Row(_1=8), Row(_1=10)]]

Now, if you save the above dataframe as CSV, 3 files would be created with each one having contents as below,

Partition1 : 5,6,7,9
Partition2 : 1,2,4
Partition3 : 3,8,10

Let’s visualise the same on the Spark WebUI

The above execution when seen on the spark console would show 3 tasks being issued on the collect operation(stage 2).

image.png

Caution: Repartition performs a full shuffle on the input dataset, hence it would be a costly operation if done on large datasets.

Note: Use Repartition only when you want to increase the number of partitions (or) if you want to perform a full shuffle on the data.

4. Coalesce

Coalesce is another method to partition the data in a dataframe. This is mainly used to reduce the number of partitions in a dataframe.

Unlike repartition, coalesce doesn’t perform a shuffle to create the partitions.
Suppose there are 100 partitions with 10 records in each partition, and if the partition size is reduced to 50, it would retain 50 partitions and append the other values to these existing partitions thereby having 20 records in each partition.

Let’s analyze this using our dataset, let’s reduce the number of partitions to 2 now,

df3 = df.coalesce(2)
df3.rdd.glom().collect()
[[Row(_1=5), Row(_1=6), Row(_1=7), Row(_1=9)],
 [Row(_1=1), Row(_1=2), Row(_1=4), Row(_1=3), Row(_1=8), Row(_1=10)]]


As shown above, the data from the 3rd partition is removed and appended with the 2nd partition, proving that there is no shuffle process going on here. Hence, this would be much less memory intensive.

Now, if you save the above dataframe as CSV, 3 files would be created with each one having contents as below,

Partition1 : 5,6,7,9
Partition2 : 1,2,4,3,8,10

So, when your motive is to reduce the number of partitions from say 1000 to 500, then coalesce is a better option. But the output data would not be equally partitioned.

Can you increase the number of partitions using Coalesce ?

The Answer is no. Coalesce has no shuffle taking place and the algorithm is designed to move data from some partitions to existing partitions.

Let’s prove the above point with an example. If we try to increase the coalesce number to 8, we would still receive the existing number of partitions present before the coalesce function, so in our case only 3 partitions would be the output.

df3 = df3.coalesce(8)
df3.rdd.getNumPartitions()
3

5. Repartition vs Coalesce - When to use what ?

Repartition can be used under these scenarios,

  • when you want your output partitions to be of equally distributed chunks.
  • increase the number of partitions.
  • perform a shuffle of the data and create partitions.

Coalesce can be used under the following scenarios,

  • when you want to decrease the number of partitions.
  • in order to avoid shuffle when decreasing partitions.

Caution: Even though coalesce seems to be useful when decreasing partitions, it also reduces the degree of parallelism when you want to partition your data. Example, if you do an extreme coalesce to 1 partition, then all the computation would take place on a single node which is not a good practice. In this case repartition can be used.

Let’s make better sense of the above caution of what happens when we perform a coalesce(1).
We’ll create a dataframe with 8 partitions(default in our case) for our example and let’s use coalesce to reduce this to a single partition

from pyspark.sql.types import IntegerType
df4 = spark.createDataFrame(range(10), IntegerType())

df4.coalesce(1).write.format('csv').mode('Overwrite').save('/path-to-file/sample1')

For the above coalesce operation, all computation takes place on a single node and not utilizing the other ones. So, in a real world problem with a huge dataset, this would create a huge overhead on a single node thereby slowing down the process and degrading the performance. The snapshot present below clearly depicts a single task being issued for the above operation,

image.png

Now let’s perform the same process using repartition.

df4.repartition(1).write.format('csv').mode('Overwrite').save('/path-to-file/sample2')

For the above case, repartition first performs a full shuffle of the data. The data here resides in 8 partitions, so 8 executors are launched to perform the shuffle.

After completion of shuffle, the data is placed into a single node described by the second stage. So, the save operation is distributed among the different executors. Performance wise this might increase the latency a bit depending on the number of partitions created, but the overhead on a single node would be avoided. The snapshot present below indicates the same.

image.png

6. Conclusion

By now, you must be clear on the concepts of partitioning in spark and how spark utilizes partitioning to perform parallel distributed processing.

In addition to this, the repartition and coalesce methods should be much clearer and you can use either one depending upon your use case.

Comments and feedback are encouraged on the comment section below. Cheers!

comments powered by Disqus