Multiprocessing in Python

April 6, 2019

With increasing number of power hungry applications, the demand for speed and low latency has become a challenge in certain situations. However, the availability of machines with multiple processors/processors with multiple cores help us combat such situations. This post would guide you through using multiprocessing in python.

Introduction

In contemporary times, a lot of CPUs are being manufactured with multiple cores to boost performance by enabling parallelism and concurrency of applications. Consequently, such CPUs can be leveraged to write code which can run in parallel thereby speeding up your application. This is where the Multiprocessing package in python comes into play. This API is very similar to the threading module in Python.

The previous post on Multithreading in Python provides a clear explanation on the threading module, click here to read through the same if you haven’t.

Table of Contents

What is Multiprocessing ?

Multiprocessing in simple terms is defined as the use of two or more processors by an application within the bounds of a single central computing system. The Python multiprocessing module provides a clean and instinctive API to utilize parallel processing in python.

All processes are independent to each other and have their own share of resources (such as memory & processing power) for computation.

To access objects between the processes, the concept of Inter-Process Communication(IPC) must be used, which would also be discussed in this post.

Multiprocessing Example

Let’s start with a simple example to compute the square and square root of a set of numbers as 2 different processes.

We will create a Process object by importing the Process class and start both the processes.

Below is the Syntax for creating a Process Object

from multiprocessing import Process
Process(target=function_name, name='Process 1', args=(arg1, arg2, arg3), kwargs={arg4: 'abc'})

args : Accepts a tuple argument list
kwargs : Accepts a dict object
name : String

from multiprocessing import Process
import math

def calc_square(numbers):
    """
    Calculate the squares of the input set of numbers
    """
    squares = [i*i for i in numbers]
    print("Squares: {0}".format(squares))
        
def calc_square_root(numbers):
    """
    Calculate the square roots of the input set of numbers
    """
    square_roots = [round(math.sqrt(i), 2) for i in numbers]
    print("Square Roots: {0}".format(square_roots))
        
if __name__ == "__main__":
    number_set = list(range(1,11))
    
    # Create the Processes
    p1 = Process(target=calc_square, args=(number_set,))
    p2 = Process(target=calc_square_root, args=(number_set,))

    # Start the Processes
    p1.start()
    p2.start()

# Note: (1) is considered as int, whereas (1,) is considered as a tuple in python. So, in case 
# of a single argument an extra comma must be placed to project it as a tuple
Squares: [1, 4, 9, 16, 25, 36, 49, 64, 81, 100]
Square Roots: [1.0, 1.41, 1.73, 2.0, 2.24, 2.45, 2.65, 2.83, 3.0, 3.16]

Multiprocessing Use Cases

  • Multiprocessing is the best fit for heavyweight tasks.
  • When you have multiple tasks which can run independently.
  • When there is not a lot of data sharing involved between the tasks. Although, data sharing is possible, it is quite complicated to use in comparison to threading.
  • When your applications are CPU bound and not I/O bound.

Multiprocessing vs Multithreading

Often a lot of people get confused with the ideas of multiprocessing and multithreading.

Below are some key differences,

  • Multiprocessing launches different processes which are independent to each other. On the other hand, multithreading launches threads which are still dependent on the parent process.

  • Each Process in Multiprocessing has its own CPU Power and memory which is distinct to each process. Whereas, in multithreading the individual threads utilize the same resources(CPU + Memory) present in the parent process.

  • If any process fails due to an exception, the other processes would still be running in case of Multiprocessing. On the contrary, if any thread fails, all the other threads/tasks are terminated.

  • Multiprocessing requires some special objects or some sort of shared memory to access objects in different processes. In contrast to this, sharing objects/data between threads are much easier since they share the same memory space.

  • Multiprocessing is more suitable for CPU intensive applications, wheras Multithreading is the best fit when your applications are I/O bound.

image.png

For more details on MultiThreading in Python, click here.

Multiprocessing Pool

The Pool object is one of the best features provided by this package. This is where this module stands out when compared to the threading module.

Features of Pool:

  • Pool maps the data to the respective workers and aggregates it back to display the final result.
  • The Pool object gives you the liberty to specify the number of worker processes required for your execution. For example, if you specify 4 workers and you have 8 jobs to complete, 4 of them would start first and as and when a worker completes it’s job, it immediately starts executing the next process.
  • If the number of worker processes is not specified, Pool will use the number of CPU Cores present in your machine by default. This isn’t recommended because it might hog up all the resources in your machine causing other jobs to slow down.

The example present below demonstrates the usage of Pool. To better illustrate the way processes are being started, the process_id of the respective process is displayed using os.getpid method.

As shown below, since we specified 4 workers, the processes 9990 to 9993 are recursively used until the execution is complete.

from multiprocessing import Pool
import os

def compute_squares(num):
    print(f"Process {os.getpid()} calculating square of {num}")
    return num*num

if __name__ == "__main__":
    # Create the Pool object with the number of workers, 4 in this case.
    p = Pool(4)
    
    # Map the jobs to the individual workers
    result = p.map(compute_squares, range(12))
    
    # Close the Pool object
    p.close()
    print(f"Result is: {result}")
Process 9991 calculating square of 1
Process 9992 calculating square of 2
Process 9990 calculating square of 0
Process 9993 calculating square of 3
Process 9990 calculating square of 5
Process 9992 calculating square of 4
Process 9991 calculating square of 6
Process 9992 calculating square of 7
Process 9990 calculating square of 8
Process 9991 calculating square of 9
Process 9993 calculating square of 10
Process 9990 calculating square of 11
Result is: [0, 1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121]

Tip: Instantiating and calling the Pool method can also be done in a much more concise way.

p = Pool(4)
result = p.map(calc_squares, range(12))
p.close()

Above 3 statements can be replaced with,

with Pool(4) as p:
    result = p.map(calc_squares, range(12)

What if you have more than 1 argument in your function ?

In this case, you can use the pool.starmap function (Python 3.3+) or use an alternate method via a workaround to send 2 arguments.

Below is an example of using more than 1 argument with map. A list of tuples can be passed to an intermediate function which further unpacks these tuples into args for the original function.

from multiprocessing import Pool
import os

def calc_product(num1, num2, num3):
    print(f"Process {os.getpid()} calculating product of {num1, num2, num3}")
    return num1*num2*num3

def exec_func(nums):
    """
    Intermediate method to call calc_product method
    """
    return calc_product(*nums)

if __name__ == "__main__":
    with Pool(4) as p:
        result = p.map(exec_func, [(1,2,3),(3,4,5)])
    print(f"Result is: {result}")
Process 11123 calculating product of (1, 2, 3)
Process 11124 calculating product of (3, 4, 5)
Result is: [6, 60]

Pool.starmap

Starmap is available from Python 3.3 . The elements passsed to a starmap must be an iterable consisting of iterables. If you are using an older version of Python the previous method can be used instead.

from multiprocessing import Pool
import os

def calc_product(num1, num2, num3):
    print(f"Process {os.getpid()} calculating product of {num1, num2, num3}")
    return num1*num2*num3

if __name__ == "__main__":
    with Pool(2) as p:
        result = p.starmap(calc_product, [(1,2,3),(3,4,5)])
    print(f"Result is: {result}")
Process 11144 calculating product of (1, 2, 3)
Process 11145 calculating product of (3, 4, 5)
Result is: [6, 60]

Multiprocessing vs Serial Processing Performance Test

Let’s use the Pool method to perform a simple benchmarking between serial and parallel processing in python

from multiprocessing import Pool
import time

def calc_cubes(_):
    """
    Calculate the cube of numbers ranging till 1000
    """
    return list(map(lambda n: n*n*n, range(1000)))

if __name__ == "__main__":
    t1 = time.time()
    with Pool(5) as p:
        p.map(calc_cubes, range(100000))  # Call the calc_cubes method 100000 times with 5 concurrent tasks.
    print("Parallel Processing Time Taken: {0}".format(time.time() - t1))
    
    print("Starting Serial Processing")
    t2 = time.time()
    list(map(calc_cubes, range(100000)))  # Call the calc_cubes method 100000 times sequentially.
    print("Sequential Processing Time Taken: {0}".format(time.time() - t2))
Parallel Processing Time Taken: 12.06329607963562
Starting Serial Processing
Sequential Processing Time Taken: 20.98725724220276

From the above example, we can see that it took almost double the time for our computation when done via a single process.

Sharing Data Between Processes

Before we understand how to share data between processes, let’s get familiar with the fundamentals of different processes.

Each Process will have its own share of resources, meaning the memory and CPU allocation are native to each process.

Every process will have it’s own memory or address space which is native to that process alone. So, data in these processes would be stored in these individual memories. Hence, sharing of objects would require some sort of communication between the processes.

image.png

To share data between these processes, some sort of communication must exist between the processes. This is where the concept of Inter Process Communication (IPC) comes into play. We would require a common storage area or shared memory to access these objects from the different launched processes.

The Multiprocessing Module provides some objects designed specifically to share data between processes. Some of these methods are described below.

Note: If your processes don’t require a lot of CPU Power but involve a lot of common shared variables, then threading module is recommended in this case.

MultiProcessing Queues

The MultiProcessing Queue is similar to the Queue class present in Python

Below is an example to fetch the even numbers from an input number set and place it inside a multiprocessing queue.

from multiprocessing import Queue, Process
import time

def get_even(nums, q):
    """
    Obtain the even numbers from a set of input numbers
    Multiprocessing Queue is passed as an additional argument
    Queue will consist of all even numbers
    """
    q.put([n for n in nums if n%2 == 0])
#     q.put(list(filter(lambda n: n%2==0, nums))) # alternate method

if __name__ == "__main__":
    q = Queue()
    p = Process(target=get_even, args=(range(100), q))
    p.start()
    p.join()
    
    # Display the contents of the queue until it is empty
    while not q.empty():
        print(f"Queue: {q.get()}")
Queue: [0, 2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30, 32, 34, 36, 38, 40, 42, 44, 46, 48, 50, 52, 54, 56, 58, 60, 62, 64, 66, 68, 70, 72, 74, 76, 78, 80, 82, 84, 86, 88, 90, 92, 94, 96, 98]

Note: Join must be used in scenarios where you want the program to wait before the code present after the join statement is executed.

For example:
create process
process.join()
print(“complete”) – this will be printed only after process is completed.

Multiprocessing Value and Lock

If you have a requirement to maintain and modify a shared variable between the processes, we can make use of the Value object from the module.

Syntax for creating a Value object is,

val = Value(typecode, *args)

Commonly used typecodes are,
‘i’ - Integer
’d’ - Float
‘u’ - Unicode String

Let’s simulate an example where in we multiply and divide a number by 10 to finally arrive at the result 1.

Before lock:

from multiprocessing import Process, Value
import time

def add(val):
    for i in range(10000):
        val.value += 1
        
def subtract(val):
    for i in range(10000):
        val.value -= 1
        
if __name__ == "__main__":
    
    # Create Value Object
    val = Value('i', 1) # i represents integer and 1 is the initial value
    
    p1 = Process(target=add, args=(val, ))
    p2 = Process(target=subtract, args=(val, ))
    
    # Start the Tasks
    p1.start()
    p2.start()
    
    # Wait for the Tasks to complete
    p1.join()
    p2.join()
    
    print("Value is {0}".format(val.value))
        
Value is 273

As you can see above, the actual result obtained is different from the expected result of 1. This is because both the processes are accessing the same object which leads to a conflict of both processes modifying the value at the same time.

Let’s run the same example with Multiprocessing Lock.

After Lock:

from multiprocessing import Process, Value, Lock
import time

def add(val, lock):
    for i in range(10000):
        lock.acquire() # Acquire Lock
        val.value += 1
        lock.release() # Release the lock
        
def subtract(val, lock):
    for i in range(10000):
        with lock:     # Alternate method to create a lock
            val.value -= 1
        
if __name__ == "__main__":
    lock = Lock()
    val = Value('i', 1) # i represents integer and 1 is the initial value
    p1 = Process(target=add, args=(val, lock))
    p2 = Process(target=subtract, args=(val, lock))
    
    p1.start()
    p2.start()
    
    p1.join()
    p2.join()
    
    print("Value is {0}".format(val.value))
Value in add: 976
Value in subtract: 1
Value is 1

Multiprocessing Manager

A Multiprocessing manager maintains an independent server process where in these python objects are held.

Multiprocessing Library also provides the Manager class which gives access to more synchronization objects to use between processes.

The Manager object supports types such as lists, dict, Array, Queue, Value etc.

When to use Multiprocessing Manager ?

  • When there is a requirement to use synchronization objects such as lists, dict etc.
  • If the processes are running across a network and require data sharing then the multiprocessing manager is a good fit.

Below example shows how to use a Manager List in a pool of workers

from multiprocessing import pool, Manager
import sys

def update_string(val, l):
    l.append(val)

if __name__ == "__main__":
    strs = ['with', 'great', 'power', 'comes', 'great', 'responsibilty']
    
    # Create the Manager Object
    mgr = Manager()
    
    # Create a manager list
    li = mgr.list()
    
    with Pool(4) as p:
        p.starmap(update_string, list(map(lambda x: (x,li), strs)))
    print(f"List: {li}")
List: ['with', 'great', 'power', 'comes', 'responsibilty', 'great']

Conclusion

By now you must be familiar with the concepts of multiprocessing in python and also be clear with the distinction between multiprocessing and multithreading.

The time has come to speed up your applications with parallel processing. But do remember, with great power comes great responsibilty.

Comments and feedback are welcome. Cheers!

comments powered by Disqus