Python Multithreading and Multiprocessing Tutorial

Note: By popular request, I’ve added some updates at the end of the article that demonstrate alternative techniques, including async/await (available only since Python 3.5). Enjoy!

Discussions criticizing Python often talk about how difficult it is to use Python for multithreaded work, pointing fingers at what is known as the global interpreter lock (affectionately referred to as the GIL), which prevents multiple threads of Python code from running simultaneously. Because of this, the Python multithreading module doesn’t quite behave the way you would expect if you’re not a Python developer and are coming from other languages such as C++ or Java. It must be made clear that one can still write code in Python that runs concurrently or in parallel and makes a stark difference in performance, as long as certain things are taken into consideration. If you haven’t read it yet, I suggest you take a look at Eqbal Quran’s article on concurrency and parallelism in Ruby here on the Toptal Engineering Blog.

In this Python concurrency tutorial, we will write a small Python script to download the top popular images from Imgur. We will start with a version that downloads images sequentially, or one at a time. As a prerequisite, you will have to register an application on Imgur. If you do not have an Imgur account already, please create one first.

The scripts in this tutorial have been tested with Python 3.6.4. With some changes, they should also run with Python 2—urllib is what has changed the most between these two versions of Python.

Getting Started with Python Multithreading

Let us start by creating a Python module named download.py. This file will contain all the functions necessary to fetch the list of images and download them. We will split this functionality into three separate functions:

  • get_links
  • download_link
  • setup_download_dir

The third function, setup_download_dir, will be used to create a download destination directory if it doesn’t already exist.

Imgur’s API requires HTTP requests to bear the Authorization header with the client ID. You can find this client ID in the dashboard of the application that you registered on Imgur. The response will be JSON encoded, and we can use Python’s standard JSON library to decode it. Downloading the image is an even simpler task, as all you have to do is fetch the image by its URL and write it to a file.

This is what the script looks like:

import json
import logging
import os
from pathlib import Path
from urllib.request import urlopen, Request

logger = logging.getLogger(__name__)

def get_links(client_id):
   headers = {'Authorization': 'Client-ID {}'.format(client_id)}
   req = Request('https://api.imgur.com/3/gallery/', headers=headers, method='GET')
   with urlopen(req) as resp:
       # read() returns the whole body; readall() is not available on
       # HTTPResponse in recent Python 3 releases
       data = json.loads(resp.read().decode('utf-8'))
   return map(lambda item: item['link'], data['data'])

def download_link(directory, link):
   logger.info('Downloading %s', link)
   download_path = directory / os.path.basename(link)
   with urlopen(link) as image, download_path.open('wb') as f:
       f.write(image.read())

def setup_download_dir():
   download_dir = Path('images')
   if not download_dir.exists():
       download_dir.mkdir()
   return download_dir

Next, we will need to write a module that will use these functions to download the images, one by one. We will name this single.py. This will contain the main function of our first, naive version of the Imgur image downloader. The module will retrieve the Imgur client ID from the environment variable IMGUR_CLIENT_ID. It will invoke setup_download_dir to create the download destination directory. Finally, it will fetch a list of images using the get_links function, filter out all GIF and album URLs, and then use download_link to download and save each of those images to the disk. Here is what single.py looks like:

import logging
import os
from time import time

from download import setup_download_dir, get_links, download_link

logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logging.getLogger('requests').setLevel(logging.CRITICAL)
logger = logging.getLogger(__name__)

def main():
   ts = time()
   client_id = os.getenv('IMGUR_CLIENT_ID')
   if not client_id:
       raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!")
   download_dir = setup_download_dir()
   links = [l for l in get_links(client_id) if l.endswith('.jpg')]
   for link in links:
       download_link(download_dir, link)
   print('Took {}s'.format(time() - ts))

if __name__ == '__main__':
   main()

On my laptop, this script took 19.4 seconds to download 91 images. Please do note that these numbers may vary based on the network you are on. 19.4 seconds isn’t terribly long, but what if we wanted to download more pictures? Perhaps 900 images, instead of 90. With an average of 0.2 seconds per picture, 900 images would take approximately 3 minutes. For 9000 pictures it would take 30 minutes. The good news is that by introducing concurrency or parallelism, we can speed this up dramatically.
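The estimate above is plain multiplication, and it can be handy to keep it around as a small helper when trying other image counts. This is only a rough sketch: the function name estimate_sequential_minutes is made up for illustration, and the 0.2 seconds per image is the approximate average from my run, not a constant.

```python
def estimate_sequential_minutes(image_count, seconds_per_image=0.2):
    """Estimate how long a one-at-a-time download takes, in minutes."""
    return image_count * seconds_per_image / 60


for count in (90, 900, 9000):
    print('{} images: ~{:.1f} minutes'.format(count, estimate_sequential_minutes(count)))
```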

All subsequent code examples will only show import statements that are new and specific to those examples. For convenience, all of these Python scripts can be found in this GitHub repository.

Using Threads for Concurrency and Parallelism

Threading is one of the most well-known approaches to attaining Python concurrency and parallelism. Threading is a feature usually provided by the operating system. Threads are lighter than processes, and share the same memory space.
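To make the shared memory point concrete, here is a minimal sketch in which several threads update the same dictionary. The names counter, add_many, and run_shared_counter are made up for this illustration. Because the threads really do share the object, the updates are guarded with a lock.

```python
import threading

counter = {'value': 0}           # one object, visible to every thread
counter_lock = threading.Lock()  # protects the read-modify-write below


def add_many(times):
    for _ in range(times):
        with counter_lock:
            counter['value'] += 1


def run_shared_counter(thread_count=4, times=1000):
    counter['value'] = 0
    threads = [threading.Thread(target=add_many, args=(times,))
               for _ in range(thread_count)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return counter['value']


print(run_shared_counter())
```

Every thread sees the same counter, so the final value is the sum of all increments. Separate processes would each get their own copy instead.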

In our Python threading tutorial, we will write a new module to replace single.py. This module will create a pool of eight threads, making a total of nine threads including the main thread. I chose eight worker threads because my computer has eight CPU cores and one worker thread per core seemed a good number for how many threads to run at once. In practice, this number is chosen much more carefully based on other factors, such as other applications and services running on the same machine.

This module is almost the same as the previous one, with the exception that we now have a new class, DownloadWorker, which is a descendant of the Python Thread class. The run method has been overridden to run an infinite loop. On every iteration, it calls self.queue.get() to try to fetch a URL from a thread-safe queue. It blocks until there is an item in the queue for the worker to process. Once the worker receives an item from the queue, it calls the same download_link function that was used in the previous script to download the image to the images directory. After the download is finished, the worker signals the queue that the task is done. This is very important, because the Queue keeps track of how many tasks were enqueued. The call to queue.join() would block the main thread forever if the workers did not signal that they completed a task.

from queue import Queue
from threading import Thread

class DownloadWorker(Thread):
   def __init__(self, queue):
       Thread.__init__(self)
       self.queue = queue

   def run(self):
       while True:
           # Get the work from the queue and expand the tuple
           directory, link = self.queue.get()
           download_link(directory, link)
           self.queue.task_done()

def main():
   ts = time()
   client_id = os.getenv('IMGUR_CLIENT_ID')
   if not client_id:
       raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!")
   download_dir = setup_download_dir()
   links = [l for l in get_links(client_id) if l.endswith('.jpg')]
   # Create a queue to communicate with the worker threads
   queue = Queue()
   # Create 8 worker threads
   for x in range(8):
       worker = DownloadWorker(queue)
       # Setting daemon to True will let the main thread exit even though the workers are blocking
       worker.daemon = True
       worker.start()
   # Put the tasks into the queue as a tuple
   for link in links:
       logger.info('Queueing {}'.format(link))
       queue.put((download_dir, link))
   # Causes the main thread to wait for the queue to finish processing all the tasks
   queue.join()
   print('Took {}'.format(time() - ts))
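The same queue pattern can be tried without an Imgur account. The sketch below swaps the download for a trivial squaring task; square_worker and square_all are hypothetical names used only here. It also wraps the work in try/finally so that task_done() is called even if a task raises, which keeps queue.join() from waiting forever on a failed item.

```python
from queue import Queue
from threading import Thread


def square_worker(task_queue, results):
    while True:
        number = task_queue.get()            # blocks until a task is available
        try:
            results.append(number * number)  # list.append is thread-safe in CPython
        finally:
            task_queue.task_done()           # always signal completion, even on error


def square_all(numbers, worker_count=4):
    task_queue = Queue()
    results = []
    for _ in range(worker_count):
        Thread(target=square_worker, args=(task_queue, results), daemon=True).start()
    for number in numbers:
        task_queue.put(number)
    task_queue.join()  # returns once every put() has a matching task_done()
    return sorted(results)


print(square_all(range(10)))
```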

Running this Python threading example script on the same machine used earlier results in a download time of 4.1 seconds! That’s 4.7 times faster than the previous example. While this is much faster, it is worth mentioning that only one thread was executing Python code at any given moment due to the GIL. Therefore, this code is concurrent but not parallel. It is still faster because this is an IO-bound task: the processor is hardly breaking a sweat while downloading these images, and the majority of the time is spent waiting for the network. This is why Python multithreading can provide a large speed increase. The processor can switch between the threads whenever one of them is ready to do some work. For CPU-bound work, however, using the threading module in Python (or any other interpreted language with a GIL) can actually reduce performance. If your code is performing a CPU-bound task, such as decompressing gzip files, using the threading module will result in a slower execution time. For CPU-bound tasks and truly parallel execution, we can use the multiprocessing module.
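The effect of waiting on IO can be reproduced without a network. In this sketch, time.sleep stands in for a slow download (an assumption made purely for illustration; fake_download, time_sequential, and time_threaded are made-up names). Sleeping releases the GIL in much the same way that waiting on a socket does, so the threads overlap their waits.

```python
import time
from threading import Thread


def fake_download(delay=0.1):
    # time.sleep releases the GIL, much like waiting on a socket does
    time.sleep(delay)


def time_sequential(task_count=5):
    start = time.perf_counter()
    for _ in range(task_count):
        fake_download()
    return time.perf_counter() - start


def time_threaded(task_count=5):
    start = time.perf_counter()
    threads = [Thread(target=fake_download) for _ in range(task_count)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return time.perf_counter() - start


print('sequential: {:.2f}s'.format(time_sequential()))
print('threaded:   {:.2f}s'.format(time_threaded()))
```

The exact numbers will vary by machine, but the threaded run should take roughly as long as one wait rather than the sum of all of them.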

While the de facto reference Python implementation, CPython, has a GIL, this is not true of all Python implementations. For example, IronPython, a Python implementation using the .NET framework, does not have a GIL, and neither does Jython, the Java-based implementation. You can find a list of working Python implementations here.


Spawning Multiple Processes

The multiprocessing module is easier to drop in than the threading module, as we don’t need to add a class like the Python threading example. The only changes we need to make are in the main function.

To use multiple processes, we create a multiprocessing Pool. With the map method it provides, we will pass the list of URLs to the pool, which in turn will spawn eight new processes and use each one to download the images in parallel. This is true parallelism, but it comes with a cost. The entire memory of the script is copied into each subprocess that is spawned. In this simple example, it isn’t a big deal, but it can easily become serious overhead for non-trivial programs.

from functools import partial
from multiprocessing.pool import Pool

def main():
   ts = time()
   client_id = os.getenv('IMGUR_CLIENT_ID')
   if not client_id:
       raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!")
   download_dir = setup_download_dir()
   links = [l for l in get_links(client_id) if l.endswith('.jpg')]
   download = partial(download_link, download_dir)
   with Pool(8) as p:
       p.map(download, links)
   print('Took {}s'.format(time() - ts))

Distributing to Multiple Workers

While the threading and multiprocessing modules are great for scripts that are running on your personal computer, what should you do if you want the work to be done on a different machine, or you need to scale up to more than the CPU on one machine can handle? A great use case for this is long-running back-end tasks for web applications. If you have some long-running tasks, you don’t want to spin up a bunch of sub-processes or threads on the same machine that needs to be running the rest of your application code. This will degrade the performance of your application for all of your users. What would be great is to be able to run these jobs on another machine, or many other machines.

A great Python library for this task is RQ, a very simple yet powerful library. You first enqueue a function and its arguments using the library. This pickles the function call representation, which is then appended to a Redis list. Enqueueing the job is the first step, but will not do anything yet. We also need at least one worker to listen on that job queue.
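To get a feel for what a pickled function call representation looks like, here is a toy version that uses only the standard library. It is not RQ’s actual job format or API: a collections.deque stands in for the Redis list, the function is referenced by its dotted name, and enqueue, work_one, and the example URL are all made up for this sketch.

```python
import importlib
import pickle
from collections import deque

job_list = deque()  # stand-in for the Redis list


def enqueue(func_path, *args, **kwargs):
    """Serialize 'module.function' plus its arguments and append it to the list."""
    job_list.append(pickle.dumps((func_path, args, kwargs)))


def work_one():
    """Pop the oldest job, import the function by name, and call it."""
    func_path, args, kwargs = pickle.loads(job_list.popleft())
    module_name, _, func_name = func_path.rpartition('.')
    func = getattr(importlib.import_module(module_name), func_name)
    return func(*args, **kwargs)


enqueue('os.path.basename', 'https://i.imgur.com/example.jpg')
print(work_one())
```

Because the job is just bytes describing which function to call with which arguments, the process that enqueues it and the process that runs it do not have to be the same, or even on the same machine.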

The first step is to install and run a Redis server on your computer, or have access to a running Redis server. After that, there are only a few small changes made to the existing code. We first create an instance of an RQ Queue and pass it an instance of a Redis server from the redis-py library. Then, instead of just calling our download_link function, we call q.enqueue(download_link, download_dir, link). The enqueue method takes a function as its first argument, then any other arguments or keyword arguments are passed along to that function when the job is actually executed.

One last step we need to do is to start up some workers. RQ provides a handy script to run workers on the default queue. Just run rqworker in a terminal window and it will start a worker listening on the default queue. Please make sure your current working directory is the directory where the scripts reside. If you want to listen to a different queue, you can run rqworker queue_name and it will listen to that named queue. The great thing about RQ is that as long as you can connect to Redis, you can run as many workers as you like on as many different machines as you like; therefore, it is very easy to scale up as your application grows. Here is the source for the RQ version:

from redis import Redis
from rq import Queue

def main():
   client_id = os.getenv('IMGUR_CLIENT_ID')
   if not client_id:
       raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!")
   download_dir = setup_download_dir()
   links = [l for l in get_links(client_id) if l.endswith('.jpg')]
   q = Queue(connection=Redis(host='localhost', port=6379))
   for link in links:
       q.enqueue(download_link, download_dir, link)

However, RQ is not the only Python job queue solution. RQ is easy to use and covers simple use cases extremely well, but if more advanced options are required, other Python 3 queue solutions (such as Celery) can be used.

Python Multithreading vs. Multiprocessing

If your code is IO bound, both multiprocessing and multithreading in Python will work for you. Multiprocessing is easier to just drop in than threading but has a higher memory overhead. If your code is CPU bound, multiprocessing is most likely going to be the better choice, especially if the target machine has multiple cores or CPUs. For web applications, and when you need to scale the work across multiple machines, RQ is going to be better for you.


Update

Python concurrent.futures

Something new since Python 3.2 that wasn’t touched upon in the original article is the concurrent.futures package. This package provides yet another way to use concurrency and parallelism with Python.

In the original article, I mentioned that Python’s multiprocessing module would be easier to drop into existing code than the threading module. This was because the Python 3 threading module required subclassing the Thread class and also creating a Queue for the threads to monitor for work.

Using a concurrent.futures.ThreadPoolExecutor makes the code almost identical to the multiprocessing version.

import logging
import os
from concurrent.futures import ThreadPoolExecutor
from functools import partial
from time import time

from download import setup_download_dir, get_links, download_link

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

logger = logging.getLogger(__name__)


def main():
    client_id = os.getenv('IMGUR_CLIENT_ID')
    if not client_id:
        raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!")
    download_dir = setup_download_dir()
    links = get_links(client_id)

    # By placing the executor inside a with block, the executor's shutdown
    # method will be called on exit, cleaning up the threads.
    #
    # By default, the executor sets the number of workers to 5 times the
    # number of CPUs (Python 3.5 to 3.7; Python 3.8 and later use
    # min(32, os.cpu_count() + 4)).
    with ThreadPoolExecutor() as executor:

        # Create a new partially applied function that stores the directory
        # argument.
        #
        # This allows the download_link function that normally takes two
        # arguments to work with the map function that expects a function of a
        # single argument.
        fn = partial(download_link, download_dir)

        # Executes fn concurrently using threads on the links iterable. The
        # timeout is measured from this call for the whole batch, not for a
        # single download, and it is only enforced while the returned iterator
        # is consumed, so it has no effect here because the results are unused.
        executor.map(fn, links, timeout=30)


if __name__ == '__main__':
    main()
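One detail of executor.map is easy to miss: it returns a lazy iterator, and results, exceptions, and the timeout only surface when that iterator is consumed. The sketch below shows the pattern with a stand-in task; slow_double and double_all are hypothetical names, and the sleep merely imitates network latency.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_double(number):
    time.sleep(0.05)  # stand-in for network latency
    return number * 2


def double_all(numbers, max_workers=4):
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Consuming the iterator is what surfaces results, exceptions, and timeouts
        return list(executor.map(slow_double, numbers, timeout=30))


print(double_all([1, 2, 3, 4]))
```

Results come back in the order of the input, regardless of which thread finishes first. If you care about failed downloads in the script above, wrapping the map call in list() in the same way would re-raise the first exception instead of silently dropping it.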

Now that we have all these images downloaded with our Python ThreadPoolExecutor, we can use them to test a CPU-bound task. We can create thumbnail versions of all the images in a single-threaded, single-process script and then test a multiprocessing-based solution.

We are going to use the Pillow library to handle the resizing of the images.

Here is our initial script.

import logging
from pathlib import Path
from time import time

from PIL import Image

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

logger = logging.getLogger(__name__)


def create_thumbnail(size, path):
    """
    Creates a thumbnail of an image with the same name as image but with
    _thumbnail appended before the extension.  E.g.:

    >>> create_thumbnail((128, 128), 'image.jpg')

    A new thumbnail image is created with the name image_thumbnail.jpg

    :param size: A tuple of the width and height of the image
    :param path: The path to the image file
    :return: None
    """
    image = Image.open(path)
    image.thumbnail(size)
    path = Path(path)
    name = path.stem + '_thumbnail' + path.suffix
    thumbnail_path = path.with_name(name)
    image.save(thumbnail_path)


def main():
    ts = time()
    for image_path in Path('images').iterdir():
        create_thumbnail((128, 128), image_path)
    logging.info('Took %s', time() - ts)


if __name__ == '__main__':
    main()

This script iterates over the paths in the images folder and for each path it runs the create_thumbnail function. This function uses Pillow to open the image, create a thumbnail, and save the new, smaller image with the same name as the original but with _thumbnail appended to the name.
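The naming step can be looked at on its own, without Pillow. This small sketch isolates the pathlib logic from create_thumbnail; the helper name thumbnail_path_for and the example file name are made up for illustration.

```python
from pathlib import Path


def thumbnail_path_for(path):
    """Return the sibling path with _thumbnail inserted before the extension."""
    path = Path(path)
    return path.with_name(path.stem + '_thumbnail' + path.suffix)


print(thumbnail_path_for('images/cat.jpg'))
```

Note that the thumbnails land in the same folder as the originals, so running the script a second time as written would also pick up the existing thumbnails and create thumbnails of them.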

Running this script on 160 images totaling 36 million takes 2.32 seconds. Let’s see if we can speed this up using a ProcessPoolExecutor.

import logging
from pathlib import Path
from time import time
from functools import partial

from concurrent.futures import ProcessPoolExecutor

from PIL import Image

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

logger = logging.getLogger(__name__)


def create_thumbnail(size, path):
    """
    Creates a thumbnail of an image with the same name as image but with
    _thumbnail appended before the extension. E.g.:

    >>> create_thumbnail((128, 128), 'image.jpg')

    A new thumbnail image is created with the name image_thumbnail.jpg

    :param size: A tuple of the width and height of the image
    :param path: The path to the image file
    :return: None
    """
    path = Path(path)
    name = path.stem + '_thumbnail' + path.suffix
    thumbnail_path = path.with_name(name)
    image = Image.open(path)
    image.thumbnail(size)
    image.save(thumbnail_path)


def main():
    ts = time()
    # Partially apply the create_thumbnail method, setting the size to 128x128
    # and returning a function of a single argument.
    thumbnail_128 = partial(create_thumbnail, (128, 128))

    # Create the executor in a with block so shutdown is called when the block
    # is exited.
    with ProcessPoolExecutor() as executor:
        executor.map(thumbnail_128, Path('images').iterdir())
    logging.info('Took %s', time() - ts)


if __name__ == '__main__':
    main()

The create_thumbnail function is identical to the last script. The main difference is the creation of a ProcessPoolExecutor. The executor’s map method is used to create the thumbnails in parallel. By default, the ProcessPoolExecutor creates one subprocess per CPU. Running this script on the same 160 images took 1.05 seconds, which is 2.2 times faster!

Async/Await (Python 3.5+ only)

One of the most requested items in the comments on the original article was for an example using Python 3’s asyncio module. Compared to the other examples, this one involves some syntax and concepts that may be new to most people. An unfortunate additional layer of complexity is caused by Python’s built-in urllib module not being asynchronous. We will need to use an async HTTP library to get the full benefits of asyncio. For this, we’ll use aiohttp.

Let’s jump right into the code and a more detailed explanation will follow.

import asyncio
import logging
import os
from time import time

import aiohttp

from download import setup_download_dir, get_links

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)


async def async_download_link(session, directory, link):
    """
    Async version of the download_link method we've been using in the other examples.
    :param session: aiohttp ClientSession
    :param directory: directory to save downloads
    :param link: the url of the link to download
    :return:
    """
    download_path = directory / os.path.basename(link)
    async with session.get(link) as response:
        with download_path.open('wb') as f:
            while True:
                # await pauses execution until the 1024 (or less) bytes are read from the stream
                chunk = await response.content.read(1024)
                if not chunk:
                    # We are done reading the file, break out of the while loop
                    break
                f.write(chunk)
    logger.info('Downloaded %s', link)


# Main is now a coroutine
async def main():
    client_id = os.getenv('IMGUR_CLIENT_ID')
    if not client_id:
        raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!")
    download_dir = setup_download_dir()
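    # We use a session to take advantage of tcp keep-alive
    # Set a 3 second read and connect timeout. Default is 5 minutes
    # Note: aiohttp 3.3+ deprecates conn_timeout/read_timeout in favor of
    # timeout=aiohttp.ClientTimeout(...); adjust for the version you have installed.
    async with aiohttp.ClientSession(conn_timeout=3, read_timeout=3) as session: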
        tasks = [(async_download_link(session, download_dir, l)) for l in get_links(client_id)]
        # gather aggregates all the tasks and schedules them in the event loop
        await asyncio.gather(*tasks, return_exceptions=True)


if __name__ == '__main__':
    ts = time()
    # Create the asyncio event loop
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(main())
    finally:
        # Shutdown the loop even if there is an exception
        loop.close()
    logger.info('Took %s seconds to complete', time() - ts)

There is quite a bit to unpack here. Let’s start with the main entry point of the program. The first new thing we do with the asyncio module is to obtain the event loop. The event loop handles all of the asynchronous code. Then the main coroutine is passed to the loop, which runs until it completes. There is a piece of new syntax in the definition of main: async def. You’ll also notice await and async with.

The async/await syntax was introduced in PEP 492. The async def syntax marks a function as a coroutine. Internally, coroutines are based on Python generators, but aren’t exactly the same thing. Coroutines return a coroutine object similar to how generators return a generator object. Once you have a coroutine, you obtain its results with the await expression. When a coroutine calls await, execution of the coroutine is suspended until the awaitable completes. This suspension allows other work to be completed while the coroutine is suspended “awaiting” some result. In general, this result will be some kind of I/O like a database request or in our case an HTTP request.
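A stripped-down sketch may help make coroutine objects and await less abstract. Here asyncio.sleep stands in for the I/O wait, and fetch_after and fetch_all are hypothetical names. The sketch uses asyncio.run, which requires Python 3.7 or later; on 3.5 and 3.6 you would drive it with the event loop as in the script above.

```python
import asyncio


async def fetch_after(delay, label):
    # await hands control back to the event loop until the sleep finishes
    await asyncio.sleep(delay)
    return label


async def fetch_all():
    # Calling a coroutine function only creates a coroutine object;
    # nothing runs until it is awaited or scheduled.
    coroutines = [fetch_after(0.05, 'a'), fetch_after(0.01, 'b')]
    # gather runs them concurrently and returns results in argument order
    return await asyncio.gather(*coroutines)


print(asyncio.run(fetch_all()))
```

Even though the second coroutine finishes first, gather hands the results back in the order the coroutines were passed in.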

The download_link function had to be changed pretty significantly. Previously, we were relying on urllib to do the brunt of the work of reading the image for us. Now, to allow our function to work properly with the async programming paradigm, we’ve introduced a while loop that reads chunks of the image at a time and suspends execution while waiting for the I/O to complete. This allows the event loop to loop through downloading the different images as each one has new data available during the download.
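The chunked read loop can be exercised without aiohttp. In the sketch below, FakeStream is a hypothetical stand-in for a response body (it is not an aiohttp class), and copy_in_chunks mirrors the while loop from async_download_link. Two copies run at the same time, and each await gives the other one a chance to make progress. As before, asyncio.run assumes Python 3.7 or later.

```python
import asyncio
import io


class FakeStream:
    """Hypothetical stand-in for an async response body."""

    def __init__(self, payload):
        self._buffer = io.BytesIO(payload)

    async def read(self, size):
        await asyncio.sleep(0)  # yield to the event loop, as real I/O would
        return self._buffer.read(size)


async def copy_in_chunks(stream, sink, chunk_size=1024):
    while True:
        chunk = await stream.read(chunk_size)
        if not chunk:  # an empty bytes object means end of stream
            break
        sink.write(chunk)


async def demo():
    sink_a, sink_b = io.BytesIO(), io.BytesIO()
    await asyncio.gather(
        copy_in_chunks(FakeStream(b'a' * 3000), sink_a),
        copy_in_chunks(FakeStream(b'b' * 5000), sink_b),
    )
    return len(sink_a.getvalue()), len(sink_b.getvalue())


print(asyncio.run(demo()))
```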

There Should Be One—Preferably Only One—Obvious Way to Do It

While the Zen of Python tells us there should be one obvious way to do something, there are many ways in Python to introduce concurrency into our programs. The best method to choose is going to depend on your specific use case. The asynchronous paradigm scales better to high-concurrency workloads (like a webserver) compared to threading or multiprocessing, but it requires your code (and dependencies) to be async in order to fully benefit.

Hopefully this article—and update—will point you in the right direction so you have an idea of where to look in the Python standard library if you need to introduce concurrency into your programs.

Understanding the Basics

What is a thread in Python?

A thread is a lightweight process or task. A thread is one way to add concurrency to your programs. If your Python application is using multiple threads and you look at the processes running on your OS, you would only see a single entry for your script even though it is running multiple threads.
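A quick way to check the single-entry claim is to have every thread report the process ID it runs under. This is a small sketch; collect_pids and the worker names are made up for illustration.

```python
import os
import threading


def collect_pids(thread_count=4):
    pids = []

    def record():
        # Each thread records its own name and the ID of the process it lives in
        pids.append((threading.current_thread().name, os.getpid()))

    threads = [threading.Thread(target=record, name='worker-{}'.format(i))
               for i in range(thread_count)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return pids


for name, pid in collect_pids():
    print(name, pid)
```

Every line shows a different thread name but the same process ID, which is why the operating system lists the script only once.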

What is multithreading?

Multithreading (sometimes simply “threading”) is when a program creates multiple threads with execution cycling among them, so one longer-running task doesn’t block all the others. This works well for tasks that can be broken down into smaller subtasks, which can then each be given to a thread to be completed.

What’s the difference between Python threading and multiprocessing?

With threading, concurrency is achieved using multiple threads, but due to the GIL only one thread can be running at a time. In multiprocessing, the original process is forked into multiple child processes, bypassing the GIL. Each child process will have a copy of the entire program’s memory.

How are Python multithreading and multiprocessing related?

Both multithreading and multiprocessing allow Python code to run concurrently. Only multiprocessing will allow your code to be truly parallel. However, if your code is IO-heavy (like HTTP requests), then multithreading will still probably speed up your code.

About the author



Marcus McCurdy, United States


Marcus has a Bachelor’s in Computer Engineering and a Master’s in Computer Science. He is a talented programmer, and excels most at back-end development. However, he is comfortable creating polished products as a full stack developer.


Source: Toptal Engineering Blog
