Parallelism in Python¶
Ever wondered how to speed-up your operations by performing them in parallel? Turns out it's not as complicated as you might think.
First, we'll need some operations that we want to do in parallel. In this notebook, we'll uppercase each string in a list of strings. To simulate different inputs having different processing times, our uppercasing function, fn, will also sleep for as many seconds as there are characters in the string, e.g. "hello" will sleep for 5 seconds before returning "HELLO", "a" will sleep for 1 second, etc.
import time
tokens = ["hello", "world", "how", "are", "you", "a", "b", "c", "d", "e", "f", "g"]
def fn(t):
    time.sleep(len(t))
    return t.upper()
ThreadPoolExecutor¶
The first method of parallelism we'll look at is using threads. The concurrent.futures standard library module has a ThreadPoolExecutor which we can use to execute a function across a pool of threads.
Used as a context manager, the ThreadPoolExecutor provides a map method, which applies the function given as the first argument to every element of the iterable given as the second argument.
map returns a generator, so we explicitly convert it to a list before printing it out.
from concurrent.futures import ThreadPoolExecutor
t0 = time.monotonic()
with ThreadPoolExecutor() as executor:
    results = executor.map(fn, tokens)
dt = time.monotonic() - t0
print(list(results))
print(dt)
['HELLO', 'WORLD', 'HOW', 'ARE', 'YOU', 'A', 'B', 'C', 'D', 'E', 'F', 'G'] 5.0064839580000005
More explicitly, we can submit each element along with a function to apply (useful if we want to apply a different function to each element). Each submit schedules the function call on a worker thread and immediately returns a Future object. We can get the actual result from the Future by calling its result method, which blocks until the result is ready.
t0 = time.monotonic()
with ThreadPoolExecutor() as executor:
    futures = [executor.submit(fn, t) for t in tokens]
    results = [future.result() for future in futures]
dt = time.monotonic() - t0
print(results)
print(dt)
['HELLO', 'WORLD', 'HOW', 'ARE', 'YOU', 'A', 'B', 'C', 'D', 'E', 'F', 'G'] 5.007868625
If we don't care about the order in which the results are returned, i.e. we don't need the output order to match the input order, then we can use as_completed. This yields the futures as they complete. In our example, the single-character strings only wait for one second, so they should complete first, though the order is not deterministic.
from concurrent.futures import as_completed
t0 = time.monotonic()
with ThreadPoolExecutor() as executor:
    futures = [executor.submit(fn, t) for t in tokens]
    results = [future.result() for future in as_completed(futures)]
dt = time.monotonic() - t0
print(results)
print(dt)
['A', 'B', 'C', 'D', 'E', 'F', 'G', 'HOW', 'ARE', 'YOU', 'HELLO', 'WORLD'] 5.009487957999999
There's also a ProcessPoolExecutor, which has exactly the same interface as the ThreadPoolExecutor but creates processes instead of threads.
However, I've found that processes do not work in Jupyter Notebooks. This is apparently because the spawned worker processes need to import the function they run, and functions defined interactively in a notebook's __main__ can't be imported by the child processes.
#from concurrent.futures import ProcessPoolExecutor
#t0 = time.monotonic()
#with ProcessPoolExecutor() as executor:
#    futures = [executor.submit(fn, t) for t in tokens]
#    results = [future.result() for future in as_completed(futures)]
#dt = time.monotonic() - t0
#print(results)
#print(dt)
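Outside a notebook, the same pattern does work. Here's a minimal sketch, assuming it is saved as a plain script and run directly; the file name, the scaled-down sleeps, and the shorter token list are made up for illustration. The if __name__ == "__main__" guard is needed so that worker processes can safely re-import the file:

```python
# process_demo.py -- hypothetical file name; run as a plain script,
# not inside a notebook.
import time
from concurrent.futures import ProcessPoolExecutor, as_completed

# Shorter strings and scaled-down sleeps to keep the run quick.
tokens = ["aa", "b", "ccc"]

def fn(t):
    time.sleep(0.1 * len(t))
    return t.upper()

if __name__ == "__main__":
    with ProcessPoolExecutor() as executor:
        futures = [executor.submit(fn, t) for t in tokens]
        results = [future.result() for future in as_completed(futures)]
    print(sorted(results))
```

Because fn lives in a file on disk, the worker processes can import it, which is exactly what fails inside a notebook.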
When do you use a thread instead of a process?
When using processes, each worker runs in its own process, which lets it sidestep the GIL (global interpreter lock). The GIL ensures that only one thread executes Python bytecode at a time, which protects the interpreter's internal state but limits parallelism within a single process.
When using threads, each worker is a thread within the main process and is potentially slowed down by the GIL: if multiple threads want to run Python code at the same time, they have to take it in turns.
So, why not always use processes? There's an overhead in spawning processes and in passing data between them, since arguments and results have to be serialized. Which to use depends on what you're trying to do.
The general rule of thumb is:
- If your operations are I/O bound, e.g. they're mainly waiting for other things to run, such as external API calls, then use threads.
- If your operations are compute bound (aka CPU bound), e.g. they're mainly performing computations, then use processes.
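A quick sketch of the rule of thumb in action: for an I/O-bound task (simulated here, as in the rest of the notebook, with sleep), threads overlap the waiting, so the pooled version should finish in a fraction of the serial time. The io_task function and the 0.2-second delay are made up for illustration.

```python
import time
from concurrent.futures import ThreadPoolExecutor

def io_task(n):
    time.sleep(0.2)  # stands in for waiting on e.g. an external API call
    return n * 2

items = list(range(8))

# Serial: 8 sleeps of 0.2s run back to back, ~1.6 seconds total.
t0 = time.monotonic()
serial = [io_task(n) for n in items]
serial_dt = time.monotonic() - t0

# Threaded: all 8 sleeps overlap, ~0.2 seconds total.
t0 = time.monotonic()
with ThreadPoolExecutor(max_workers=8) as executor:
    threaded = list(executor.map(io_task, items))
threaded_dt = time.monotonic() - t0

print(serial == threaded)        # True: same results either way
print(threaded_dt < serial_dt)   # True: threads overlap the waiting
```

For a CPU-bound io_task (e.g. a tight numeric loop), the threaded version would show no such speed-up, because the GIL serializes the actual computation.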
Joblib¶
joblib is another library used for parallelism. Personally, I think the API looks a bit weird, but joblib is common enough that it's a good idea to know how it works.
One important thing to note with joblib is that you explicitly need to tell it the number of workers to use with the n_jobs argument. Unless you know better, n_jobs=-1 (which uses all CPUs) is probably what you want.
import joblib
t0 = time.monotonic()
results = joblib.Parallel(n_jobs=-1)(joblib.delayed(fn)(t) for t in tokens)
dt = time.monotonic() - t0
print(results)
print(dt)
['HELLO', 'WORLD', 'HOW', 'ARE', 'YOU', 'A', 'B', 'C', 'D', 'E', 'F', 'G'] 5.411525584
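By default joblib uses worker processes (its loky backend). For I/O-bound work like our sleeping fn, you can hint that threads are acceptable with the prefer argument. A sketch, with a made-up helper and scaled-down inputs to keep it quick:

```python
import time
import joblib

# Hypothetical scaled-down version of the notebook's fn.
def up(t):
    time.sleep(0.1 * len(t))
    return t.upper()

small_tokens = ["aa", "b"]

# prefer="threads" hints that the threading backend is fine here,
# avoiding the overhead of spawning worker processes. Results come
# back in input order.
results = joblib.Parallel(n_jobs=-1, prefer="threads")(
    joblib.delayed(up)(t) for t in small_tokens
)
print(results)  # ['AA', 'B']
```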
ThreadPool¶
The final methods of parallelism we'll look at are ThreadPool (threads) and Pool (processes) from the multiprocessing.pool standard library module. The official Python documentation recommends using ThreadPoolExecutor and ProcessPoolExecutor instead, but we'll go over them in case we see them in the wild; the APIs are similar in some respects.
We have a map function similar to the ThreadPoolExecutor's. This one, however, returns a list rather than a generator.
from multiprocessing.pool import ThreadPool
t0 = time.monotonic()
with ThreadPool() as pool:
    results = pool.map(fn, tokens)
dt = time.monotonic() - t0
print(results)
print(dt)
['HELLO', 'WORLD', 'HOW', 'ARE', 'YOU', 'A', 'B', 'C', 'D', 'E', 'F', 'G'] 5.0165569580000025
If we'd like a generator instead, we can use imap instead of map.
Note: I've found that calling list on the generator outside of the context manager occasionally causes Jupyter Notebooks to hang. I'm not sure what causes this, but I recommend calling it inside the context manager unless you have a good reason not to.
t0 = time.monotonic()
with ThreadPool() as pool:
    results = list(pool.imap(fn, tokens))
dt = time.monotonic() - t0
print(results)
print(dt)
['HELLO', 'WORLD', 'HOW', 'ARE', 'YOU', 'A', 'B', 'C', 'D', 'E', 'F', 'G'] 5.0115710000000036
Similar to the as_completed function, the pool objects have an imap_unordered method, which returns an unordered generator.
This seems to have the same issue as above when calling list outside the context manager.
t0 = time.monotonic()
with ThreadPool() as pool:
    results = list(pool.imap_unordered(fn, tokens))
dt = time.monotonic() - t0
print(results)
print(dt)
['A', 'C', 'B', 'E', 'F', 'D', 'HOW', 'ARE', 'YOU', 'G', 'HELLO', 'WORLD'] 5.008692875000001
As you may have guessed, ThreadPool creates threads. Processes are created by Pool, which has the same API as ThreadPool.
These processes have the same issue as the ProcessPoolExecutor: they throw errors in Jupyter Notebooks.
#from multiprocessing.pool import Pool
#t0 = time.monotonic()
#with Pool() as pool:
#    results = list(pool.imap_unordered(fn, tokens))
#dt = time.monotonic() - t0
#print(results)
#print(dt)
This should be enough information for basic parallel computation in Python.